Source code for psynet.bot

import time
import uuid
from datetime import datetime
from statistics import mean
from typing import List

import requests
from cached_property import cached_property
from dallinger import db
from sqlalchemy import Column, Integer

from .db import transaction
from .participant import Participant
from .timeline import Page
from .utils import NoArgumentProvided, get_logger, log_time_taken, wait_until

# Module-level logger used by the bot code in this file.
logger = get_logger()


class Bot(Participant):
    """
    A participant that is driven programmatically instead of by a human.

    A bot steps through the experiment timeline page by page, asking each page
    for its bot response and submitting that response to the experiment.
    """

    # Starts at 1 and is incremented each time ``take_page`` completes.
    page_count = Column(Integer, default=1)

    def __init__(
        self,
        recruiter_id="bot_recruiter",
        worker_id=None,
        assignment_id=None,
        unique_id=None,
        hit_id="",
        mode="debug",
    ):
        """
        Create the bot, register it with the database session, and commit.

        Parameters
        ----------
        recruiter_id :
            Passed through to the ``Participant`` constructor.

        worker_id :
            Passed through to the ``Participant`` constructor.
            A random UUID string is generated if omitted.

        assignment_id :
            Passed through to the ``Participant`` constructor.
            A random UUID string is generated if omitted.

        unique_id :
            Accepted for signature compatibility; not used by this constructor.

        hit_id :
            Passed through to the ``Participant`` constructor.

        mode :
            Passed through to the ``Participant`` constructor.
        """
        # Bots must not be created before the experiment has finished launching.
        self.wait_until_experiment_launch_is_complete()

        if worker_id is None:
            worker_id = str(uuid.uuid4())

        if assignment_id is None:
            assignment_id = str(uuid.uuid4())

        logger.info("Initializing bot with worker ID %s.", worker_id)

        super().__init__(
            self.experiment,
            recruiter_id=recruiter_id,
            worker_id=worker_id,
            assignment_id=assignment_id,
            hit_id=hit_id,
            mode=mode,
        )

        db.session.add(self)
        db.session.commit()

    def initialize(self, experiment):
        """Run the experiment's bot-specific initialization, then the parent's ``initialize``."""
        self.experiment.initialize_bot(bot=self)
        super().initialize(experiment)

    def wait_until_experiment_launch_is_complete(self):
        """
        Block until ``is_experiment_launched()`` reports success.

        Delegates the polling to ``wait_until`` with ``max_wait=60``; the error
        message below is handed to ``wait_until`` for the failure case.
        """
        from .experiment import is_experiment_launched

        def launch_is_complete():
            logger.info("Waiting for experiment launch to complete....")
            return is_experiment_launched()

        wait_until(
            launch_is_complete,
            max_wait=60,
            error_message="Experiment launch didn't finish in time",
        )

    @cached_property
    def experiment(self):
        """The experiment object returned by ``get_experiment()`` (cached per bot instance)."""
        from .experiment import get_experiment

        return get_experiment()

    @cached_property
    def timeline(self):
        """The timeline of the bot's experiment (cached per bot instance)."""
        return self.experiment.timeline

    def get_current_page(self):
        """Ask the experiment which page this bot is currently on."""
        return self.experiment.get_current_page(self.experiment, self)

    @log_time_taken
    def take_experiment(self, time_factor=0, render_pages: bool = False):
        """
        Take the whole experiment from the bot's current position to the end.

        Parameters
        ----------

        time_factor :
            Determines how long the bot spends on each page.
            If 0, the bot spends no time on each page.
            If 1, the bot spends ``time_estimate`` time on each page.

        render_pages :
            Whether to run page rendering code (default: False).
            This is generally only useful for testing.
        """
        logger.info(f"Bot {self.id} is starting the experiment.")
        self.run_to_completion(time_factor, render_pages)

    def run_to_completion(self, time_factor=0, render_pages: bool = False):
        """
        Take pages until the bot's status is no longer ``"working"``.

        Parameters
        ----------

        time_factor :
            Forwarded to ``take_page`` for every page.

        render_pages :
            Forwarded to ``take_page`` (as ``render_page``) for every page.

        Returns
        -------

        dict
            Summary statistics with the keys ``page_count``, ``progress``,
            ``mean_page_processing_time``, ``total_wait_page_time`` and
            ``total_experiment_time``.
        """
        # We tried the following code to simulate the Flask server and thereby
        # run Page.render() functions directly. However the approach fails
        # when we try to run multiple tests in succession, because Flask
        # doesn't let us deregister the old apps.
        #
        # from gunicorn import util
        # from .utils import working_directory
        # with working_directory(self.experiment.var.server_working_directory):
        #     app = util.import_app("dallinger.experiment_server.sockets:app")
        #     with app.app_context(), app.test_request_context():

        page_processing_times = []

        # At least one page is always taken before the status is checked, so
        # ``page_processing_times`` is never empty when the mean is computed.
        while True:
            page_time_started = time.monotonic()

            # When rendering is enabled, take_page commits the session before
            # issuing its HTTP request, so that participant changes are visible
            # to the server.
            sleep_time = self.take_page(
                time_factor=time_factor, render_page=render_pages
            )["sleep_time"]

            page_total_time = time.monotonic() - page_time_started
            # Processing time excludes the deliberate sleep on the page.
            page_processing_times.append(page_total_time - sleep_time)

            if self.status != "working":
                break

        mean_page_processing_time = mean(page_processing_times)
        total_experiment_time = (datetime.now() - self.creation_time).total_seconds()

        # Todo - migrate these metrics to generic Participants (not just bots) so that we can report them
        # everywhere
        stats = {
            "page_count": self.page_count,
            "progress": self.progress,
            "mean_page_processing_time": mean_page_processing_time,
            "total_wait_page_time": self.total_wait_page_time,
            "total_experiment_time": total_experiment_time,
        }
        logger.info(
            f"Bot {self.id} has finished the experiment (took {stats['page_count']} page(s), "
            f"progress = {100 * stats['progress']:.0f}%, "
            f"mean processing time per page = {stats['mean_page_processing_time']:.3f} seconds, "
            f"total WaitPage time = {stats['total_wait_page_time']:.3f} seconds, "
            f"total experiment time = {stats['total_experiment_time']:.3f} seconds)."
        )
        return stats

    # In a real launched experiment, taking a page involves a single HTTP request that is wrapped in a transaction.
    # We therefore do the same here, to ensure that the bot's behavior is as close as possible to that of a real
    # participant.
    def take_page(
        self, page=None, time_factor=0, response=NoArgumentProvided, render_page=False
    ):
        """
        Take a single page: optionally render it, sleep, and submit the bot's response.

        Parameters
        ----------

        page :
            The page to take. Defaults to the bot's current page.

        time_factor :
            Multiplier applied to the page's ``time_estimate`` to obtain the
            time slept on the page. Wait pages always sleep for at least
            0.5 seconds.

        response :
            Forwarded to the page's ``call__bot_response``.

        render_page :
            If true, the page is first requested from the local experiment
            server over HTTP.

        Returns
        -------

        dict
            ``sleep_time`` (seconds slept on the page) and ``processing_time``
            (elapsed time within the transaction minus ``sleep_time``).
        """
        from .page import WaitPage

        if render_page:
            db.session.commit()  # Make sure that any local changes to the participant are visible to the server
            req = requests.get(
                f"http://localhost:5000/timeline?unique_id={self.unique_id}"
            )
            assert (
                req.status_code == 200
            ), f"Rendering the page failed with HTTP status {req.status_code}"
            db.session.commit()  # Make sure any server-side changes are visible to us

        with transaction():
            # Locks the present participant row. ``self`` is deliberately
            # rebound to the freshly loaded (and locked) instance.
            self = (
                self.__class__.query.with_for_update(of=self.__class__)
                .populate_existing()
                .get(self.id)
            )

            start_time = time.monotonic()

            if page is None:
                page = self.get_current_page()

            bot = self
            experiment = self.experiment

            assert isinstance(
                page, Page
            ), f"Expected a Page but got {type(page).__name__}"

            sleep_time = page.time_estimate * time_factor

            if sleep_time == 0 and isinstance(page, WaitPage):
                sleep_time = 0.5

            if sleep_time > 0:
                time.sleep(sleep_time)

            response = page.call__bot_response(experiment, bot, response)

            if "time_taken" not in response.metadata:
                response.metadata["time_taken"] = sleep_time

            try:
                experiment.process_response(
                    participant_id=self.id,
                    raw_answer=response.raw_answer,
                    blobs=response.blobs,
                    metadata=response.metadata,
                    page_uuid=self.page_uuid,
                    client_ip_address=response.client_ip_address,
                    answer=response.answer,
                )
            except RuntimeError as err:
                if "Working outside of request context" in str(err):
                    err.args = (
                        err.args[0]
                        + "\n\nNote: The 'working outside of request context' error can usually be ignored "
                        "during testing as it typically comes from Flask trying to construct an "
                        "error page without a valid request context. The real error probably "
                        "happened earlier though.",
                    )
                raise

            self.page_count += 1

        end_time = time.monotonic()
        processing_time = end_time - start_time - sleep_time

        return {
            "sleep_time": sleep_time,
            "processing_time": processing_time,
        }

    def submit_response(self, response=NoArgumentProvided):
        """Take the bot's current page, submitting ``response`` to it."""
        page = self.get_current_page()
        self.take_page(page, response=response)

    def run_until(self, condition, render_pages=False):
        """
        Take pages until ``condition(current_page)`` is truthy.

        Parameters
        ----------

        condition :
            Callable receiving the bot's current page; the loop stops, without
            taking that page, as soon as it returns a truthy value.

        render_pages :
            Forwarded to ``take_page`` (as ``render_page``) for every page.

        Raises
        ------

        RuntimeError
            If the bot's status stops being ``"working"`` before the condition
            is met.
        """
        while True:
            current_page = self.get_current_page()
            if condition(current_page):
                break
            self.take_page(current_page, render_page=render_pages)
            if self.status != "working":
                raise RuntimeError(
                    "Bot finished the experiment before condition was met."
                )
class BotResponse:
    """
    Defines a bot's response to a given page.

    Exactly one of ``raw_answer`` and ``answer`` must be provided; the one that
    is not provided is stored as ``NoArgumentProvided``.

    Parameters
    ----------

    raw_answer :
        The raw_answer returned from the page.

    answer :
        The (formatted) answer, as would ordinarily be computed by ``format_answer``.

    metadata :
        A dictionary of metadata. Defaults to an empty dictionary.

    blobs :
        A dictionary of blobs returned from the front-end. Defaults to an empty dictionary.

    client_ip_address :
        The client's IP address. Defaults to ``None``.

    Raises
    ------

    ValueError
        If both, or neither, of ``raw_answer`` and ``answer`` are provided.
    """

    def __init__(
        self,
        *,
        raw_answer=NoArgumentProvided,
        answer=NoArgumentProvided,
        metadata=NoArgumentProvided,
        blobs=NoArgumentProvided,
        client_ip_address=NoArgumentProvided,
    ):
        # The sentinel is checked by identity so that the caller's values are
        # never asked to evaluate ``==`` (array-like answers would otherwise
        # yield an element-wise result with an ambiguous truth value).
        raw_answer_given = raw_answer is not NoArgumentProvided
        answer_given = answer is not NoArgumentProvided

        if raw_answer_given and answer_given:
            raise ValueError(
                "raw_answer and answer cannot both be provided; you should probably just provide raw_answer."
            )

        if not raw_answer_given and not answer_given:
            raise ValueError("At least one of raw_answer and answer must be provided.")

        # Fresh containers are created per instance so that no state is shared.
        if blobs is NoArgumentProvided:
            blobs = {}

        if metadata is NoArgumentProvided:
            metadata = {}

        if client_ip_address is NoArgumentProvided:
            client_ip_address = None

        self.raw_answer = raw_answer
        self.answer = answer
        self.metadata = metadata
        self.blobs = blobs
        self.client_ip_address = client_ip_address
def advance_past_wait_pages(bots: List[Bot], max_iterations=10):
    """
    Repeatedly push bots through their wait pages until none is waiting.

    Each pass visits the bots in order; a bot whose current page is a
    ``WaitPage`` takes that page immediately, before the next bot is inspected.
    The function returns after the first pass in which no bot was waiting.

    Parameters
    ----------

    bots :
        The bots to advance.

    max_iterations :
        Number of passes after which the function gives up.

    Raises
    ------

    RuntimeError
        If some bot was still on a wait page during pass number
        ``max_iterations`` (or any later pass).
    """
    from .page import WaitPage

    passes_done = 0
    while True:
        passes_done += 1
        found_waiting_bot = False

        for candidate in bots:
            page_now = candidate.get_current_page()
            if not isinstance(page_now, WaitPage):
                continue
            found_waiting_bot = True
            candidate.take_page(page_now)

        if not found_waiting_bot:
            return

        if passes_done >= max_iterations:
            raise RuntimeError("Not all bots finished waiting in time.")