Source code for psynet.timeline

# pylint: disable=abstract-method

import json
import random
import time
from collections import Counter
from datetime import datetime
from functools import cached_property, reduce
from importlib import resources
from statistics import median
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Sequence, Union

from dallinger import db
from dominate import tags
from markupsafe import Markup
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import backref, relationship
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.orm.collections import attribute_mapped_collection

from . import templates
from .data import SQLBase, SQLMixin, register_table
from .field import PythonObject, VarStore
from .utils import (
    NoArgumentProvided,
    call_function,
    call_function_with_context,
    check_function_args,
    dict_to_js_vars,
    format_datetime,
    get_args,
    get_language_dict,
    get_logger,
    log_time_taken,
    merge_dicts,
    pretty_format_seconds,
    render_string_with_translations,
    serialise,
    unserialise_datetime,
)

if TYPE_CHECKING:
    from .participant import Participant

logger = get_logger()


class Event(dict):
    """
    Defines an event that occurs on the front-end for a given page.
    This event is triggered once custom conditions are satisfied;
    it can then trigger future events to occur.
    One can define custom JS code to be run when these events execute
    in one of two ways.
    One approach is to register this custom JS code by writing something
    like this:

    ::

        psynet.trial.onEvent("myEventId", function() {
            // custom code goes here
        });

    A second approach is to add JS code directly to the ``js`` argument
    of the present function.

    The resulting object should be passed to the ``events`` parameter in
    :class:`~psynet.timeline.Page`.

    Parameters
    ----------

    is_triggered_by:
        Defines the triggers for the present event.
        A trigger can be specified either as a string corresponding to an event ID,
        for example ``"trialStart"``, or as an object of class :class:`~psynet.timeline.Trigger`.
        The latter case is more flexible because it allows a particular trigger to be delayed
        by a specified number of seconds.
        Multiple triggers can be defined by instead passing a list of these strings
        or :class:`~psynet.timeline.Trigger` objects.
        Alternatively, one can pass ``None``, in which case the event won't be triggered automatically,
        but instead will only be triggered if/when ``psynet.trial.registerEvent`` is called
        in the Javascript front-end.

    trigger_condition:
        If this is set to ``"all"`` (default), then all triggers must be satisfied before the
        event will be cued. If this is set to ``"any"``, then the event will be cued when
        any one of these triggers occurred.

    delay:
        Determines the time interval (in seconds) between the trigger condition being satisfied
        and the event being triggered (default = 0.0).

    once:
        If ``True``, then the event will only be cued once, at the point when the
        trigger condition is first satisfied. If ``False`` (default), then the event will be recued
        each time one of the triggers is hit again.

    message:
        Optional message to display when this event occurs (default = ``""``).

    message_color:
        CSS color specification for the message (default = ``"black"``).

    js:
        Optional Javascript code to execute when the event occurs (default = ``None``).

    """

    def __init__(
        self,
        is_triggered_by,
        trigger_condition: str = "all",
        delay: float = 0.0,
        once: bool = False,
        message: Optional[str] = None,
        message_color: str = "black",
        js: Optional[str] = None,
    ):
        if is_triggered_by is None:
            is_triggered_by = []
        elif not isinstance(is_triggered_by, list):
            is_triggered_by = [is_triggered_by]

        is_triggered_by = [
            x if isinstance(x, Trigger) else Trigger(x) for x in is_triggered_by
        ]

        super().__init__(
            is_triggered_by=is_triggered_by,
            trigger_condition=trigger_condition,
            delay=delay,
            once=once,
            message=message,
            message_color=message_color,
            js=js,
        )

    def add_trigger(self, trigger, **kwargs):
        if isinstance(trigger, str):
            t = Trigger(triggering_event=trigger, **kwargs)
        elif isinstance(trigger, Trigger):
            t = trigger
        else:
            raise ValueError("trigger must be an object of class str or Trigger.")
        self["is_triggered_by"].append(t)

    def add_triggers(self, *args):
        for arg in args:
            self.add_trigger(arg)


class Trigger(dict):
    def __init__(self, triggering_event, delay=0.0):
        assert isinstance(triggering_event, str)
        super().__init__(triggering_event=triggering_event, delay=float(delay))


def get_template(name):
    assert isinstance(name, str)
    path_all_templates = resources.files(templates)
    path_template = path_all_templates.joinpath(name)
    with open(path_template, "r") as file:
        return file.read()


class Elt:
    returns_time_credit = False
    time_estimate = None
    expected_repetitions = None
    id = None
    created_within_page_maker = False

    def __init__(self):
        self.links = {}

    def consume(self, experiment, participant):
        raise NotImplementedError

    def render(self, experiment, participant):
        raise NotImplementedError

    def multiply_expected_repetitions(self, factor):
        # pylint: disable=unused-argument
        if self.expected_repetitions is not None:
            self.expected_repetitions *= factor


class EltCollection:
    def resolve(self) -> Union[Elt, List[Elt]]:
        raise NotImplementedError


class NullElt(Elt):
    def consume(self, experiment, participant):
        pass

    def render(self, experiment, participant):
        pass


[docs] class CodeBlock(Elt): """ A timeline component that executes some back-end logic without showing anything to the participant. Parameters ---------- function: A function with up to two arguments named ``participant`` and ``experiment``, that is executed once the participant reaches the corresponding part of the timeline. """ def __init__(self, function): super().__init__() self.function = function def consume(self, experiment, participant): call_function_with_context( self.function, self=self, experiment=experiment, participant=participant, )
class StartFixElt(Elt): """ This class is not to be used directly; use instead ``with_fixed_time_credit`` and ``with_fixed_progress``. """ def __init__(self, time_credit: float, end_fix: "EndFixElt"): super().__init__() self.time_credit = time_credit self.expected_repetitions = 1 self.end_fix = end_fix class EndFixElt(Elt): def __init__(self, time_credit: float): super().__init__() self.time_credit = time_credit self.expected_repetitions = 1 class StartFixTimeCredit(StartFixElt): """ This class is not to be used directly; use instead ``with_fixed_time_credit`` and ``with_fixed_progress``. """ def consume(self, experiment, participant): bound = participant.time_credit + self.time_credit participant.time_credit_fixes.append(bound) class EndFixTimeCredit(EndFixElt): """ This class is not to be used directly; use instead ``with_fixed_time_credit`` and ``with_fixed_progress``. """ def consume(self, experiment, participant): participant.time_credit = participant.time_credit_fixes.pop() class StartFixProgress(StartFixElt): """ This class is not to be used directly; use instead ``with_fixed_time_credit`` and ``with_fixed_progress``. """ def consume(self, experiment, participant: "Participant"): if participant.estimated_max_time_credit == 0.0: new_bound = 1.0 else: try: old_bound = participant.progress_fixes[-1] except IndexError: old_bound = 1.0 new_bound = ( participant.progress + self.time_credit / participant.estimated_max_time_credit ) new_bound = min(new_bound, old_bound) participant.progress_fixes.append(new_bound) class EndFixProgress(EndFixElt): """ This class is not to be used directly; use instead ``with_fixed_time_credit`` and ``with_fixed_progress``. """ def consume(self, experiment, participant): participant.progress = participant.progress_fixes.pop() class GoTo(Elt): def __init__(self, target: Union[Elt, str, callable]): super().__init__() self.target = target def get_target(self, experiment, participant): # pylint: disable=unused-argument return self.target def consume(self, experiment, participant): target_elt = self.get_target(experiment, participant) participant.elt_id = target_elt.id # We subtract 1 because elt_id will be incremented again when # we return to the start of the advance page loop. # Remember that ``elt_id`` corresponds to a nested representation, # where each element corresponds to successively deeper and deeper # levels of page makers. # We therefore perform our subtraction to the last element # of the list. participant.elt_id[-1] -= 1 # Todo - remove ReactiveGoTo and move its code into Switch class ReactiveGoTo(GoTo): def __init__( self, function, # function taking experiment, participant and returning a key targets, # dict of possible target elements ): # pylint: disable=super-init-not-called super().__init__(target=None) self.function = function self.targets = targets self.check_args() def check_args(self): self.check_function() self.check_targets() def check_function(self): check_function_args( self.function, ("self", "experiment", "participant"), need_all=False ) def check_targets(self): try: assert isinstance(self.targets, dict) for target in self.targets.values(): assert isinstance(target, Elt) except AssertionError: raise TypeError("<targets> must be a dictionary of Elt objects.") def get_target(self, experiment, participant): val = call_function_with_context( self.function, self=self, experiment=experiment, participant=participant, ) try: return self.targets[val] except KeyError: raise ValueError( f"ReactiveGoTo returned {val}, which is not present among the target keys: " + f"{list(self.targets)}." ) class MediaSpec: """ This object enumerates the media assets available for a given :class:`~psynet.timeline.Page` object. Parameters ---------- audio: dict A dictionary of audio assets. Each item can either be a string, corresponding to the URL for a single file (e.g. "/static/audio/test.wav"), or a dictionary, corresponding to metadata for a batch of media assets. A batch dictionary must contain the field "url", providing the URL to the batch file, and the field "ids", providing the list of IDs for the batch's constituent assets. A valid audio argument might look like the following: :: { 'bier': '/static/bier.wav', 'my_batch': { 'url': '/static/file_concatenated.mp3', 'ids': ['funk_game_loop', 'honey_bee', 'there_it_is'], 'type': 'batch' } } html: dict An analogously structured dictionary of HTML stimuli (e.g., SVG stimuli). image: dict An analogously structured dictionary of image stimuli. video: dict An analogously structured dictionary of video stimuli. """ modalities = ["audio", "image", "html", "video"] def __init__( self, audio: Optional[dict] = None, image: Optional[dict] = None, html: Optional[dict] = None, video: Optional[dict] = None, ): from .asset import Asset if audio is None: audio = {} if image is None: image = {} if html is None: html = {} if video is None: video = {} self.data = {"audio": audio, "image": image, "html": html, "video": video} for modality in self.data.values(): for key, value in modality.items(): if isinstance(value, Asset): modality[key] = value.url elif isinstance(value, dict): for _key, _value in value.items(): if isinstance(_value, Asset): value[_key] = _value.url assert list(self.data) == self.modalities @property def audio(self): return self.data["audio"] @property def image(self): return self.data["image"] @property def html(self): return self.data["html"] @property def video(self): return self.data["video"] @property def ids(self): res = {} for media_type, media in self.data.items(): res[media_type] = set() for key, value in media.items(): if isinstance(value, str): res[media_type].add(key) else: assert isinstance(value, dict) res[media_type].update(value["ids"]) return res @property def num_files(self): counter = 0 for modality in self.data.values(): counter += len(modality) return counter def add(self, modality: str, entries: dict): if modality not in self.data: self.data[modality] = {} for key, value in entries.items(): self.data[modality][key] = value @classmethod def merge(self, *args, overwrite: bool = False): if len(args) == 0: return MediaSpec() new_args = {} for modality in self.modalities: new_args[modality] = merge_dicts( *[x.data[modality] for x in args], overwrite=overwrite ) return MediaSpec(**new_args) def check(self): assert isinstance(self.data, dict) for key, value in self.data.items(): assert key in self.modalities ids = set() for file_id, file in value.items(): if file_id in ids: raise ValueError( f"{file_id} occurred more than once in page's {key} specification." ) ids.add(file_id) if not isinstance(file, str): if not isinstance(file, dict): raise TypeError( f"Media entry must either be a string URL or a dict (got {file})." ) if not ("url" in file and "ids" in file): raise ValueError( "Batch specifications must contain both 'url' and 'ids' keys." ) batch_ids = file["ids"] if not isinstance(batch_ids, list): raise TypeError( f"The ids component of the batch specification must be a list (got {ids})." ) for _id in batch_ids: if not isinstance(_id, str): raise TypeError( f"Each id in the batch specification must be a string (got {_id})." ) ids.add(_id) def to_json(self): return json.dumps(self.data) class ProgressStage(dict): def __init__( self, time: Union[float, int, List], caption: str = "", color: str = "rgb(49, 124, 246)", persistent: bool = False, ): if isinstance(time, list): duration = time[1] - time[0] else: duration = time self["time"] = time self["duration"] = duration self["caption"] = caption self["color"] = color self["persistent"] = persistent class ProgressDisplay(dict): def __init__( self, stages: List, start="trialStart", show_bar: bool = True, show_caption: bool = True, **kwargs, ): self.consolidate_stages(stages) if len(stages) == 0: _duration = 0.0 else: last_stage = stages[-1] _duration = last_stage["time"][1] self["duration"] = _duration self["start"] = start self["show_bar"] = show_bar self["show_caption"] = show_caption self["stages"] = stages self.validate() if "duration" in kwargs: logger.warning( "ProgressDisplay no longer takes a 'duration' argument, please remove it." ) del kwargs["duration"] if (len(kwargs)) > 0: logger.warning( "The following unrecognized arguments were passed to ProgressDisplay: " + ", ".join(list(kwargs)) ) def consolidate_stages(self, stages): """ Goes through the list of stages, and whenever the ``time`` argument is a single number, replaces this argument with a pair of numbers corresponding to the computed start time and end time for that stage. """ _start_time = 0.0 for s in stages: if not isinstance(s["time"], list): _duration = s["time"] _end_time = _start_time + _duration s["time"] = [_start_time, _end_time] _end_time = s["time"][1] _start_time = _end_time def validate(self): stages = self["stages"] for i, stage in enumerate(stages): start_time = stage["time"][0] if i == 0: if start_time != 0.0: raise ValueError( "The first stage in the progress bar must have a start time of 0.0." ) else: prev_stage = stages[i - 1] prev_stage_end_time = prev_stage["time"][1] if start_time != prev_stage_end_time: raise ValueError( f"The start time of stages[{i}] did not match the end time of the previous stage." ) if i == len(stages) - 1: end_time = stage["time"][1] if end_time != self["duration"]: raise ValueError( "The final stage must have an end time equal to the progress bar's duration." )
[docs] class Page(Elt): """ The base class for pages, customised by passing values to the ``__init__`` function and by overriding the following methods: * :meth:`~psynet.timeline.Page.format_answer` * :meth:`~psynet.timeline.Page.validate` * :meth:`~psynet.timeline.Page.metadata` Parameters ---------- time_estimate: Time estimated for the page. template_path: Path to the jinja2 template to use for the page. template_str: Alternative way of specifying the jinja2 template as a string. template_arg: Dictionary of arguments to pass to the jinja2 template. label: Internal label to give the page, used for example in results saving. js_vars: Dictionary of arguments to instantiate as global Javascript variables. js_links: Optional list of paths to JavaScript scripts to include in the page. media: :class:`psynet.timeline.MediaSpec` Optional specification of media assets to preload (see the documentation for :class:`psynet.timeline.MediaSpec`). scripts: Optional list of scripts to include in the page. Each script should be represented as a string, which will be passed verbatim to the page's HTML. css: Optional list of CSS specification to include in the page. Each specification should be represented as a string, which will be passed verbatim to the page's HTML. A valid CSS specification might look like this: :: .modal-content { background-color: #4989C8; margin: auto; padding: 20px; border: 1px solid #888; width: 80%; } .close { color: #aaaaaa; float: right; font-size: 28px; font-weight: bold; } css_links: Optional list of links to CSS stylesheets to include in the page. contents: Optional dictionary to store some experiment specific data. For example, in an experiment about melodies, the contents property might look something like this: {”melody”: [1, 5, 2]}. save_answer: If ``True`` (default), then the answer generated by the page is saved to ``participant.answer``, and a link to the corresponding ``Response`` object is saved in ``participant.last_response_id``. If ``False``, these slots are left unchanged. If a string, then the answer is not only saved to ``participant.answer`` and ``participant.last_response_id``, but it is additionally saved as a participant variable named by that string. events: An optional dictionary of event specifications for the page. This determines the timing of various Javascript events that happen on the page. Each key of this dictionary corresponds to a particular event. Each value should then correspond to an object of class :class:`~psynet.timeline.Event`. The :class:`~psynet.timeline.Event` object specifies how the event is triggered by other events. For example, if I want to define an event that occurs 3 seconds after the trial starts, I would write ``events={"myEvent": Event(is_triggered_by="trialStart", delay=3.0)}``. Useful standard events to know are ``trialStart`` (start of the trial), ``promptStart`` (start of the prompt), ``promptEnd`` (end of the prompt), ``recordStart`` (beginning of a recording), ``recordEnd`` (end of a recording), ``responseEnable`` (enables the response options), and ``submitEnable`` (enables the user to submit their response). These events and their triggers are set to sensible defaults, but the user is welcome to modify them for greater customization. See also the ``update_events`` methods of :class:`~psynet.modular_page.Prompt` and :class:`~psynet.modular_page.Control`, which provide alternative ways to customize event sequences for modular pages. progress_display Optional :class:`~psynet.timeline.ProgressDisplay` object. show_termination_button: If ``True``, a button is displayed allowing the participant to terminate the experiment, Defaults to ``recruiter.show_termination_button`` which can be ``False`` for all recruiters except for the Lucid recruiter where it should be ``True``. start_trial_automatically If ``True`` (default), the trial starts automatically, e.g. by the playing of a queued audio file. Otherwise the trial will wait for the trialPrepare event to be triggered (e.g. by clicking a 'Play' button, or by calling `psynet.trial.registerEvent("trialPrepare")` in JS). bot_response Optional function to call when this page is consumed by a bot. This will override any ``bot_response`` function specified in the class's ``bot_response`` method. validate Optional validation function to use for the participant's response. Alternatively, the validation function can be set by overriding this class's ``validate`` method. If no validation function is found, no validation is performed. See :meth:`~psynet.timeline.Page.validate` for information about how to write this function. Validation functions provided via the present route may contain various optional arguments. Most typically the function will be of the form ``lambda answer: ...` or ``lambda answer, participant: ...``, but it is also possible to include the arguments ``raw_answer``, ``response``, ``page``, and ``experiment``. Note that ``raw_answer`` is the answer before applying ``format_answer``, and ``answer`` is the answer after applying ``format_answer``. Validation functions should return ``None`` if the validation passes, or if it fails a string corresponding to a message to pass to the participant. For example, a validation function testing that the answer contains exactly 3 characters might look like this: ``lambda answer: "Answer must contain exactly 3 characters!" if len(answer) != 3 else None``. Attributes ---------- contents : dict A dictionary containing experiment specific data. session_id : str If session_id is not None, then it must be a string. If two consecutive pages occur with the same session_id, then when it’s time to move to the second page, the browser will not navigate to a new page, but will instead update the Javascript variable psynet.page with metadata for the new page, and will trigger an event called pageUpdated. This event can be listened for with Javascript code like window.addEventListener(”pageUpdated”, ...). dynamically_update_progress_bar_and_reward : bool If ``True``, then the page will regularly poll for updates to the progress bar and the reward. If ``False`` (default), the progress bar and reward are updated only on page refresh or on transition to the next page. """ returns_time_credit = True dynamically_update_progress_bar_and_reward = False def __init__( self, *, time_estimate: Optional[float] = None, template_path: Optional[str] = None, template_str: Optional[str] = None, template_arg: Optional[Dict] = None, label: str = "untitled", js_vars: Optional[Dict] = None, js_links: Optional[List] = None, media: Optional[MediaSpec] = None, scripts: Optional[List] = None, css: Optional[List] = None, css_links: Optional[List] = None, contents: Optional[Dict] = None, session_id: Optional[str] = None, save_answer: bool = True, events: Optional[Dict] = None, progress_display: Optional[ProgressDisplay] = None, start_trial_automatically: bool = True, show_termination_button: bool = None, aggressive_termination_on_no_focus: bool = False, bot_response=NoArgumentProvided, validate: Optional[callable] = None, ): super().__init__() if template_arg is None: template_arg = {} if js_vars is None: js_vars = {} if js_links is None: js_links = [] if contents is None: contents = {} if css_links is None: css_links = [] if template_path is None and template_str is None: raise ValueError("Must provide either template_path or template_str.") if template_path is not None and template_str is not None: raise ValueError("Cannot provide both template_path and template_str.") if template_path is not None: with open(template_path, "r") as file: template_str = file.read() assert len(label) <= 250 assert isinstance(template_arg, dict) assert isinstance(label, str) self.time_estimate = time_estimate self.template_str = template_str self.template_arg = template_arg self.label = label self.js_vars = js_vars self.js_links = js_links self.expected_repetitions = 1 self.media = MediaSpec() if media is None else media self.media.check() self.scripts = [] if scripts is None else [Markup(x) for x in scripts] assert isinstance(self.scripts, list) self.css = [] if css is None else [Markup(x) for x in css] assert isinstance(self.css, list) self.css_links = css_links self._contents = contents self.session_id = session_id self.save_answer = save_answer self.start_trial_automatically = start_trial_automatically self.show_termination_button = show_termination_button self.aggressive_termination_on_no_focus = aggressive_termination_on_no_focus self.events = { **self.prepare_default_events(), **({} if events is None else events), } if progress_display is None: progress_display = ProgressDisplay( stages=[], show_bar=False, show_caption=False ) self.progress_display = progress_display self._bot_response = bot_response self._validate_function = validate def call__bot_response(self, experiment, bot, response=NoArgumentProvided): from .bot import BotResponse if response != NoArgumentProvided: res = response elif self._bot_response == NoArgumentProvided: res = self.get_bot_response(experiment, bot) elif callable(self._bot_response): res = call_function_with_context( self._bot_response, experiment=experiment, bot=bot, participant=bot, page=self, ) else: res = self._bot_response if not isinstance(res, BotResponse): res = BotResponse(answer=res) return res
[docs] def get_bot_response(self, experiment, bot): """ This function is used when a bot simulates a participant responding to a given page. In the simplest form, the function just returns the value of the answer that the bot returns. For more sophisticated treatment, the function can return a ``BotResponse`` object which contains other parameters such as ``blobs`` and ``metadata``. """ raise NotImplementedError
def prepare_default_events(self): return { "trialConstruct": Event(is_triggered_by=None, once=True), "trialManualRequest": Event( is_triggered_by=["trialConstruct", "buttonStart"], once=True, js="$('#buttonStart').attr('disabled', true)", ), "trialPrepare": Event( is_triggered_by=( "trialConstruct" if self.start_trial_automatically else "trialManualRequest" ), once=True, ), "trialStart": Event(is_triggered_by="trialPrepare", once=True), "responseEnable": Event(is_triggered_by="trialStart", delay=0.0, once=True), "submitEnable": Event(is_triggered_by="trialStart", delay=0.0, once=True), "trialFinish": Event( is_triggered_by=None ), # only called when trial comes to a natural end "trialFinished": Event(is_triggered_by="trialFinish"), "trialStop": Event(is_triggered_by=None), # only called at premature end "trialStopped": Event(is_triggered_by="trialStop"), } def __json__(self, participant): return { "attributes": self.attributes(participant), "contents": self.contents, }
[docs] def attributes(self, participant): """ Returns a dictionary containing the `session_id`, the page `type`, and the `page_uuid` . """ from psynet.page import UnityPage return { "session_id": self.session_id, "type": type(self).__name__, "unique_id": participant.unique_id, "page_uuid": participant.page_uuid, "is_unity_page": isinstance(self, UnityPage), }
@property def contents(self): return self._contents @contents.setter def contents(self, contents): self._contents = contents @property def initial_download_progress(self): if self.media.num_files > 0: return 0 else: return 100 def visualize(self, trial): return "" def consume(self, experiment, participant): participant.page_uuid = experiment.make_uuid() def on_complete(self, experiment, participant): pass @log_time_taken def process_response( self, raw_answer, blobs, metadata, experiment, participant, client_ip_address, answer=NoArgumentProvided, ): from psynet.trial.main import Trial if raw_answer == NoArgumentProvided and answer == NoArgumentProvided: raise ValueError("At least one of raw_answer and answer must be provided.") if blobs is None: blobs = {} if metadata is None: metadata = {} resp = Response( participant=participant, label=self.label, page_type=type(self).__name__, client_ip_address=client_ip_address, ) db.session.add(resp) trial = participant.current_trial if answer == NoArgumentProvided: answer = self.format_answer( raw_answer, blobs=blobs, metadata=metadata, experiment=experiment, participant=participant, trial=participant.current_trial, response=resp, ) extra_metadata = self.metadata( metadata=metadata, raw_answer=raw_answer, answer=answer, experiment=experiment, participant=participant, ) combined_metadata = {**metadata, **extra_metadata} resp.answer = answer resp.metadata = combined_metadata if isinstance(trial, Trial): trial.response = resp if trial.time_taken is None: trial.time_taken = resp.metadata["time_taken"] else: trial.time_taken += resp.metadata["time_taken"] if self.save_answer: if len(participant.answer_accumulators) > 0: page_label = self.label accumulator = participant.answer_accumulators[-1] answer_label = self._find_answer_label(page_label, accumulator) accumulator[answer_label] = resp.answer flag_modified(participant, "answer_accumulators") else: participant.answer = resp.answer participant.answer_is_fresh = True if isinstance(self.save_answer, str): participant.var.set(self.save_answer, resp.answer) else: participant.answer_is_fresh = False participant.browser_platform = metadata.get( "platform", "Browser platform info could not be retrieved." ) self.on_complete(experiment=experiment, participant=participant) return resp def _find_answer_label(self, page_label, accumulator): if page_label not in accumulator: return page_label else: i = 0 while i < 1e7: i += 1 label = f"{page_label}_{i}" if label not in accumulator: return label raise ValueError("Failed to construct an appropriate answer label")
[docs] def metadata(self, **kwargs): """ Compiles metadata about the page or its response from the participant. This metadata will be merged with the default metadata object returned from the browser, with any duplicate terms overwritten. Parameters ---------- **kwargs Keyword arguments, including: 1. ``raw_answer``: The raw answer returned from the participant's browser. 2. ``answer``: The formatted answer. 3. ``metadata``: The original metadata returned from the participant's browser. 3. ``experiment``: An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. 4. ``participant``: An instantiation of :class:`psynet.participant.Participant`, corresponding to the current participant. Returns ------- dict A dictionary of metadata. """ return {}
[docs] def format_answer(self, raw_answer, **kwargs): """ Formats the raw answer object returned from the participant's browser. Parameters ---------- raw_answer The raw answer object returned from the participant's browser. **kwargs Keyword arguments, including: 1. ``blobs``: A dictionary of any blobs that were returned from the participant's browser. 2. ``metadata``: The metadata returned from the participant's browser. 3. ``experiment``: An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. 4. ``participant``: An instantiation of :class:`psynet.participant.Participant`, corresponding to the current participant. Returns ------- Object The formatted answer, suitable for serialisation to JSON and storage in the database. """ # pylint: disable=unused-argument return raw_answer
[docs] def validate(self, response, **kwargs): # pylint: disable=unused-argument """ Takes the :class:`psynet.timeline.Response` object created by the page and runs a validation check to determine whether the participant may continue to the next page. Parameters ---------- response: An instance of :class:`psynet.timeline.Response`. Typically the ``answer`` attribute of this object is most useful for validation. **kwargs: Keyword arguments, including: 1. ``experiment``: An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. 2. ``participant``: An instantiation of :class:`psynet.participant.Participant`, corresponding to the current participant. 3. ``answer``: The formatted answer returned by the participant. 4. ``raw_answer``: The unformatted answer returned by the participant. 5. ``page``: The page to which the participant is responding. Returns ------- ``None`` or an object of class :class:`psynet.timeline.FailedValidation` On the case of failed validation, an instantiation of :class:`psynet.timeline.FailedValidation` containing a message to pass to the participant. """ if self._validate_function is not None: return call_function(self._validate_function, response=response, **kwargs)
[docs] def pre_render(self): """ This method is called immediately prior to rendering the page for the participant. It will be called again each time the participant refreshes the page. """ pass
def render(self, experiment, participant): from .utils import get_config internal_js_vars = { "uniqueId": participant.unique_id, "pageUuid": participant.page_uuid, "dynamicallyUpdateProgressBarAndReward": self.dynamically_update_progress_bar_and_reward, } locale = participant.get_locale(experiment) language_dict = get_language_dict(locale) config = get_config() js_vars = {**self.js_vars, **internal_js_vars} all_template_args = { **self.template_arg, "init_js_vars": Markup(dict_to_js_vars(js_vars)), "js_vars": js_vars, "page": self, "define_media_requests": Markup(self.define_media_requests), "initial_download_progress": self.initial_download_progress, "time_reward": "%.2f" % participant.time_reward, "performance_reward": "%.2f" % participant.performance_reward, "total_reward": "%.2f" % (participant.performance_reward + participant.time_reward), "progress_percentage": round(participant.progress * 100), "contact_email_on_error": config.get("contact_email_on_error"), "experiment_title": config.get("title"), "app_id": experiment.app_id, "participant": participant, "unique_id": participant.unique_id, "worker_id": participant.worker_id, "scripts": self.scripts, "js_links": self.js_links, "css": self.css + experiment.css, "css_links": self.css_links + experiment.css_links, "events": self.events, "trial_progress_display_config": self.progress_display, "attributes": self.attributes, "contents": self.contents, "supported_language_dict": { iso: language_dict[iso] for iso in json.loads(config.get("supported_locales")) }, "current_locale": locale, "start_experiment_in_popup_window": experiment.start_experiment_in_popup_window, "show_termination_button": self.show_termination_button, "aggressive_termination_on_no_focus": self.aggressive_termination_on_no_focus, } return render_string_with_translations( template_string=self.template_str, locale=locale, **all_template_args ) @property def define_media_requests(self): return f"psynet.media.requests = JSON.parse('{self.media.to_json()}');"
[docs] class PageMaker(Elt): """ A page maker is defined by a function that is executed when the participant requests the relevant page. Parameters ---------- function: A function that may take up to two arguments, named ``experiment`` and ``participant``. These arguments correspond to instantiations of the class objects :class:`psynet.experiment.Experiment` and :class:`psynet.participant.Participant` respectively. The function should return either a single test element (e.g. :class:`psynet.timeline.Page`, :class:`psynet.timeline.PageMaker`, :class:`psynet.timeline.CodeBlock`) or a list of such elements. Note that :class:`psynet.timeline.PageMaker` objects can be nested arbitrarily deeply. Note also that, if the page maker returns multiple pages, then the function will be recomputed each time the participant progresses to the next page. This functionality can be used to make the latter pages depend on the earlier pages in the page maker. time_estimate: Time estimated to complete the segment. This time estimate is used for predicting the overall length of the experiment and hence generating the progress bar. The actual time credit given to the participant is determined by ``time_estimate`` parameters provided to the pages generated by ``function``. However, there is an exception provided for back-compatibility: if ``function`` generates a list containing solely :class:`psynet.timeline.Page` or :class:`psynet.timeline.PageMaker` objects, and if those objects are all missing ``time_estimate`` values, then these ``time_estimate`` values will be imputed by dividing the parent :class:`psynet.timeline.PageMaker`'s ``time_estimate`` by the number of produced elements. """ returns_time_credit = True def __init__( self, function: Callable[..., "TimelineLogic"], time_estimate: Optional[float] = None, accumulate_answers: bool = False, label: str = "page_maker", ): super().__init__() assert callable(function) self.function = function self.time_estimate = time_estimate self.accumulate_answers = accumulate_answers self.expected_repetitions = 1 self.label = label def resolve(self, experiment, participant, position): """ This function 'resolves' the page maker by calling its underlying function and hence returning its underlying timeline logic. Parameters ---------- experiment : The experiment instance. participant : The participant instance. position : The position of the page maker within the timeline. This is used for setting the IDs of the timeline elements that are produced. Returns ------- A list of ``Elt`` objects. """ res = call_function_with_context( self.function, self=self, experiment=experiment, participant=participant, ) res = join(res) for elt in res: if isinstance(elt, StartModule): raise ValueError( "Sorry, you cannot use modules or trial makers inside the lambda functions of " "page makers or for loops. These need to be defined upon construction of the timeline." ) self.impute_time_estimates(res) self.check_time_estimates(res) res = join( StartAccumulateAnswers() if self.accumulate_answers else None, res, EndAccumulateAnswers() if self.accumulate_answers else None, ) res = with_fixed_progress(res, self.time_estimate) for i, elt in enumerate(res): elt.id = position + [i] elt.created_within_page_maker = True elt.links = {**self.links, **elt.links} return res def impute_time_estimates(self, elts): # This is performed for back-compatibility; # basically, if all the elements are pages or page makers # and none of them have time estimates, then we compute # their time estimates by equally subdividing the time estimate # for the parent page maker. if all( [ isinstance(elt, (Page, PageMaker)) and elt.time_estimate is None for elt in elts ] ): n = len(elts) for elt in elts: elt.time_estimate = self.time_estimate / n def check_time_estimates(self, elts): for elt in elts: if elt.returns_time_credit and elt.time_estimate is None: raise RuntimeError( f"One of the elements in the page maker was missing a time estimate ({elt})" )
class PageMakerFinishedError(Exception): pass class Timeline: def __init__(self, *args): # Todo - don't add SuccessfulEndLogic if it's already there. # To achieve this, we should refactor EltCollection to make # it easier to test for. from psynet.end import SuccessfulEndLogic self.elts = join(*args, SuccessfulEndLogic()) self.modules, self.module_list = self.compile_modules() self.check_elts() self.add_elt_ids() self.estimated_time_credit = CreditEstimate(self.elts) def compile_modules(self): modules = {} module_list = [] for elt in self.elts: if isinstance(elt, StartModule): module = elt.module if module.id in modules: raise ValueError(f"Duplicated module name detected: {module.id}") modules[module.id] = module module_list.append(module) return modules, module_list def check_elts(self): assert isinstance(self.elts, list) assert len(self.elts) > 0 # We used to check that the timeline finished with an EndPage, but this is no longer necessary, # as we now automatically add SuccessfulEndLogic to the main branch. self.check_for_time_estimate() self.check_for_consent() self.check_modules() def check_for_time_estimate(self): for i, elt in enumerate(self.elts): if ( isinstance(elt, Page) or isinstance(elt, PageMaker) ) and elt.time_estimate is None: raise ValueError( f"Element {i} of the timeline was missing a time_estimate value." ) def check_modules(self): modules = [x.label for x in self.elts if isinstance(x, StartModule)] counts = Counter(modules) duplicated = [key for key, value in counts.items() if value > 1] if len(duplicated) > 0: raise ValueError( "The following module ID(s) were duplicated in your timeline: " + ", ".join(duplicated) + ". PsyNet timelines may not contain duplicated module IDs. " + "You will need to update your timeline to fix this. " + "This will probably mean updating one or more `id_` arguments in your " + "trial makers and/or pre-screening tasks." ) def check_for_consent(self): from psynet.consent import Consent from psynet.page import InfoPage first_elt = self.elts[0] # ignore unless the timeline is fully initialized if ( isinstance(first_elt, InfoPage) and first_elt.content == "Placeholder timeline" ): return if all([not isinstance(elt, Consent) for elt in self.elts]): raise ValueError("At least one element in the timeline must be a consent.") @property def consents(self): from .consent import Consent return [elt for elt in self.elts if isinstance(elt, Consent)] def verify_consents(self, experiment): recruiter = experiment.recruiter if hasattr(recruiter, "verify_consents"): recruiter.verify_consents(self.consents) def get_module(self, module_id): try: return self.modules[module_id] except IndexError: raise RuntimeError(f"Couldn't find module with id = {module_id}.") @cached_property def trial_makers(self): return { e.trial_maker_id: e.trial_maker for e in self.elts if isinstance(e, RegisterTrialMaker) } def get_trial_maker(self, trial_maker_id): try: return self.trial_makers[trial_maker_id] except IndexError: raise RuntimeError(f"Couldn't find trial maker with id = {trial_maker_id}.") def add_elt_ids(self): for i, elt in enumerate(self.elts): if elt.id is not None and elt.id != [i]: raise ValueError( f"Failed to set unique IDs for each element in the timeline " f"(the same element was reused at positions {elt.id} and {i}). " "This usually means that the same Python object instantiation is reused multiple times " "in the same timeline. This kind of reusing is not permitted, instead you should " "create a fresh instantiation of each element, e.g. by calling a function twice." ) elt.id = [i] def __len__(self): return len(self.elts) def __getitem__(self, key: Union[str, list]): if isinstance(key, str): key = [key] selected = self.elts for k in key: selected = selected[k] return selected def index(self, elt: Elt): if elt.id is None: raise ValueError( "Cannot index an element that has yet to be assigned an ID." ) return elt.id @log_time_taken def get_current_elt(self, experiment, participant): # Remember, ``participant.elt_id`` corresponds to a list representation # of the participant's position in the timeline, where the first element corresponds # to the index of the participant within the timeline's underlying # list representation, and successive elements (if any) represent # the participant's position within (potentially nested) page makers. # For example, ``[10, 3, 2]`` would mean go to # element 10 in the timeline (0-indexing), # which must be a page maker; # go to element 3 within that page maker, which must also be a page maker; # go to element 2 within that page maker. # # The current function gets the ``Elt`` corresponding to the participant's # current ``elt_id``. It works by iterating through the ``participant.elt_id`` # list from first to last element, each time 'resolving' the corresponding # page maker (which means computing its underlying function), # taking the list of test elements that comes out, # going to the corresponding element within that list, # resolving it, and so on. # num_levels = len(participant.elt_id) selected = self.elts for depth, index in enumerate(participant.elt_id): # Suppose ``participant.elt_id`` = ``[10, 3, 2]`` # then: # depth: 0, 1, 2 # index: 10, 3, 2 try: # index_max tells us the maximum allowed elt_id at this level of the hierarchy. # The top level is the number of Elts in the timeline, minus one; # the next level is the number of Elts in the trialmaker minus one, and so on. index_max = participant.elt_id_max[depth] except IndexError: index_max = None if isinstance(selected, PageMaker): try: # ``position`` corresponds to the page maker's location within the timeline. # For example, suppose we are on the third level of the example above, then: # depth: 2 # index: 2 # position: ``[10, 3]`` if index_max is not None and index > index_max: raise IndexError position = participant.elt_id[0:depth] selected = selected.resolve(experiment, participant, position) if index_max is None: participant.elt_id_max.append(len(selected) - 1) except IndexError: # This occurs if the requested index goes past the number of # elements produced by the current page maker. # If this occurs in the deepest level of ``participant.elt_id``, # it's fine; it normally means that the participant has finished the # page maker that is currently under consideration, and is ready # to move to the next part of the timeline. In this case we therefore # raise a ``PageMakerFinishedError``. # However, if this happens at a higher level of ``participant.elt_id``, # something weird has happened. assert depth + 1 == num_levels raise PageMakerFinishedError selected = selected[index] return selected @log_time_taken def advance_page(self, experiment, participant): finished = False while not finished: participant.elt_id[-1] += 1 try: new_elt = self.get_current_elt(experiment, participant) except PageMakerFinishedError: participant.elt_id = participant.elt_id[:-1] participant.elt_id_max = participant.elt_id_max[:-1] continue if isinstance(new_elt, PageMaker): participant.elt_id.append(-1) continue new_elt.consume(experiment, participant) if isinstance(new_elt, Page): finished = True def estimated_max_reward(self, wage_per_hour): return self.estimated_time_credit.get_max("reward", wage_per_hour=wage_per_hour) def estimated_completion_time(self, wage_per_hour): return self.estimated_time_credit.get_max("time", wage_per_hour=wage_per_hour) class CreditEstimate: def __init__(self, elts): self._elts = join(elts) self._max_time = self._estimate_max_time(self._elts) def get_max(self, mode, wage_per_hour=None): if mode == "time": return self._max_time elif mode == "reward": assert wage_per_hour is not None return self._max_time * wage_per_hour / (60 * 60) elif mode == "all": return { "time_seconds": self._max_time, "time_minutes": self._max_time / 60, "time_hours": self._max_time / (60 * 60), "reward": self.get_max("reward", wage_per_hour=wage_per_hour), } def _estimate_max_time(self, elts: List[Elt]): time_credit = 0.0 pos = 0 while True: if pos == len(elts): return time_credit elt = elts[pos] if elt.returns_time_credit: time_credit += elt.time_estimate * elt.expected_repetitions if isinstance(elt, StartFixElt): pos = elts.index(elt.end_fix) elif isinstance(elt, EndFixElt): time_credit += elt.time_credit * elt.expected_repetitions pos += 1 elif isinstance(elt, StartSwitch): time_credit += self._estimate_switch_credit(elt, elts) pos = elts.index(elt.end_switch) elif isinstance(elt, EndSwitchBranch): pos = elts.index(elt.target) elif isinstance(elt, GoTo): pos = self._follow_go_to(go_to=elt, elts=elts) else: pos += 1 def _estimate_switch_credit(self, elt, elts): return max( [ self._estimate_max_time( elts[elts.index(branch_start) : (1 + elts.index(elt.end_switch))] ) for key, branch_start in elt.branch_start_elts.items() ] ) def _follow_go_to(self, go_to, elts) -> Union[List, int]: if callable(go_to.target): raise ValueError( "Cannot proceed with timeline simulation as this GoTo's target is only known at run time" ) elif isinstance(go_to.target, Elt): return elts.index(go_to.target) elif isinstance(go_to.target, list): for i, elt in enumerate(elts): if elt.id == go_to.target: return i raise ValueError(f"Failed to follow GoTo to target {go_to.target}") def estimate_duration(logic): # This join ensures that any modules are resolved into lists of Elts. elts = join(logic) return CreditEstimate(elts).get_max("time") class FailedValidation: def __init__(self, message="Invalid response, please try again."): self.message = message @register_table class _Response(SQLBase, SQLMixin): """ This virtual class is not to be used directly. We use it as the parent class for the ``Response`` class to sidestep the following SQLAlchemy error: sqlalchemy.exc.InvalidRequestError: Attribute name 'metadata' is reserved for the MetaData instance when using a declarative base class. """ __tablename__ = "response"
[docs] class Response(_Response): """ A database-backed object that stores the participant's response to a :class:`~psynet.timeline.Page`. By default, one such object is created each time the participant tries to advance to a new page. Attributes ---------- answer The participant's answer, after formatting. page_type: str The type of page administered. successful_validation: bool Whether the response validation was successful, allowing the participant to advance to the next page. client_ip_address : str The participant's IP address as reported by Flask. """ __extra_vars__ = {} participant_id = Column(Integer, ForeignKey("participant.id"), index=True) participant = relationship( "psynet.participant.Participant", back_populates="all_responses", foreign_keys=[participant_id], ) question = Column(String) answer = Column(PythonObject) page_type = Column(String) successful_validation = Column(Boolean) client_ip_address = Column(String) # metadata is a protected attribute in SQLAlchemy, hence the underscore # and the functional setter/getter. metadata_ = Column(PythonObject) @property def metadata(self): """ A dictionary of metadata associated with the Response object. Stored in the ``details`` field in the database. """ return self.metadata_ @metadata.setter def metadata(self, metadata): self.metadata_ = metadata async_processes = relationship("AsyncProcess") # assets = relationship( # "Asset", collection_class=attribute_mapped_collection("label_or_key") # ) errors = relationship("ErrorRecord") def __init__( self, participant, label, page_type, client_ip_address, answer=None, metadata=None, ): self.participant_id = participant.id self.question = label self.page_type = page_type self.metadata = metadata self.client_ip_address = client_ip_address self.answer = answer self.metadata = metadata
def is_list_of(x, what): if not isinstance(x, list): return False for val in x: if not isinstance(val, what): return False return True def join(*args): from .asset import AssetSpecification valid_classes = (AssetSpecification, Elt, EltCollection) for i, arg in enumerate(args): if not ( (arg is None) or (isinstance(arg, valid_classes) or is_list_of(arg, valid_classes)) ): raise TypeError( f"Element {i + 1} of the input to join() was neither an Asset/Elt/EltCollection nor a list of such objects: ({arg})." ) args = [a for a in args if a is not None] if len(args) == 0: return [] elif len(args) == 1: if isinstance(args[0], Elt): return [args[0]] elif isinstance(args[0], EltCollection): return args[0].resolve() else: return args[0] else: def f(x, y): if isinstance(x, EltCollection): x = x.resolve() if isinstance(y, EltCollection): y = y.resolve() if x is None: return y elif y is None: return x elif isinstance(x, Elt) and isinstance(y, Elt): return [x, y] elif isinstance(x, Elt) and isinstance(y, list): return [x] + y elif isinstance(x, list) and isinstance(y, Elt): return x + [y] elif isinstance(x, list) and isinstance(y, list): return x + y else: raise ValueError( f"Don't know how to join the following two timeline components: {x}, {y}." ) return reduce(f, args) class StartWhile(NullElt): def __init__(self, label): # targets = { # True: self, # False: end_while # } # super().__init__(condition, targets) super().__init__() self.label = label class EndWhile(NullElt): def __init__(self, label): super().__init__() self.label = label
[docs] def while_loop( label: str, condition: Callable, logic, expected_repetitions: int, max_loop_time: float = None, fix_time_credit=True, fail_on_timeout=True, ): """ Loops a series of elts while a given criterion is satisfied. The criterion function is evaluated once at the beginning of each loop. Parameters ---------- label: Internal label to assign to the construct. condition: A function with up to two arguments named ``participant`` and ``experiment``, that is executed once the participant reaches the corresponding part of the timeline, returning a Boolean. logic: An elt (or list of elts) to display while ``condition`` returns ``True``. expected_repetitions: The number of times the loop is expected to be seen by a given participant. This doesn't have to be completely accurate, but it is used for estimating the length of the total experiment. max_loop_time: The maximum time in seconds for staying in the loop. Once exceeded, the participant is is presented the ``UnsuccessfulEndPage``. Default: None. fix_time_credit: Whether participants should receive the same time credit irrespective of whether ``condition`` returns ``True`` or not; defaults to ``True``, so that all participants receive the same credit. fail_on_timeout: Whether the participants should be failed when the ``max_loop_time`` is reached. Setting this to ``False`` will not return the ``UnsuccessfulEndPage`` when maximum time has elapsed but allow them to proceed to the next page. Returns ------- list A list of elts that can be embedded in a timeline using :func:`psynet.timeline.join`. """ start_while = StartWhile(label) end_while = EndWhile(label) logic = join(logic) logic = multiply_expected_repetitions(logic, expected_repetitions) def condition_wrapped(participant, experiment): result = call_function_with_context( condition, participant=participant, experiment=experiment ) logger.info(f"Evaluating while_loop ({label}) condition: result = {result}") return result conditional_logic = join(logic, GoTo(start_while)) def with_namespace(x=None): prefix = f"__{label}__{x}" if x is None: return prefix return f"{prefix}__{x}" if max_loop_time is not None: max_loop_time_condition = ( lambda participant, experiment: ( datetime.now() - unserialise_datetime( participant.var.get(with_namespace("loop_start_time")) ) ).seconds > max_loop_time ) else: max_loop_time_condition = lambda participant, experiment: False # noqa: E731 from .page import UnsuccessfulEndPage if fail_on_timeout is True: after_timeout_logic = UnsuccessfulEndPage( failure_tags=[f"while_loop:{label}", "fail_on_timeout"] ) else: after_timeout_logic = GoTo(end_while) time_estimate = CreditEstimate(logic).get_max("time") elts = join( CodeBlock( lambda participant: participant.var.set( with_namespace("loop_start_time"), serialise(datetime.now()) ) ), start_while, conditional( "max_loop_time_condition", lambda participant, experiment: call_function_with_context( max_loop_time_condition, participant=participant, experiment=experiment, ), after_timeout_logic, # The while loop includes its own progress bounds, so we don't need to bound progress # within this inner component. bound_progress=False, log_chosen_branch=False, time_estimate=0.0, ), conditional( label, condition_wrapped, conditional_logic, # The while loop includes its own progress bounds, so we don't need to bound progress here. # Moreover, this conditional contains a GoTo, which will cause the progress bound logic # to fail if enabled here. bound_progress=False, log_chosen_branch=False, time_estimate=time_estimate, ), end_while, ) elts = with_fixed_progress(elts, time_estimate) if fix_time_credit: elts = with_fixed_time_credit(elts, time_estimate) return elts
def check_branches(branches): for branch_name, branch_elts in branches.items(): branches[branch_name] = join(branch_elts) return branches
[docs] def switch( label: str, function: Callable, branches: dict, fix_time_credit: bool = False, bound_progress: bool = True, log_chosen_branch: bool = True, time_estimate: float = None, ): """ Selects a series of elts to display to the participant according to a certain condition. Parameters ---------- label: Internal label to assign to the construct. function: A function with up to two arguments named ``participant`` and ``experiment``, that is executed once the participant reaches the corresponding part of the timeline, returning a key value with which to index ``branches``. branches: A dictionary indexed by the outputs of ``function``; each value should correspond to an elt (or list of elts) that can be selected by ``function``. fix_time_credit: Whether participants should receive the same time credit irrespective of the branch taken. Defaults to ``False``; if set to ``True``, all participants receive the same credit, corresponding to the branch with the maximum time credit. bound_progress: Whether the progress estimate should be 'bound' such that, whatever happens, when the participant exits the conditional construct, the progress estimate will be the same as if the participant had taken the branch with the maximum time credit. Defaults to ``True``. log_chosen_branch: Whether to keep a log of which participants took each branch; defaults to ``True``. time_estimate: An optional time estimate to use for the switch construct. If not provided, the time estimate will be estimated by computing time estimates for all branches and taking the maximum. Returns ------- list A list of elts that can be embedded in a timeline using :func:`psynet.timeline.join`. """ check_function_args(function, ("self", "experiment", "participant"), need_all=False) branches = check_branches(branches) all_branch_starts = dict() all_elts = [] end_switch = EndSwitch(label) for branch_name, branch_elts in branches.items(): branch_start = StartSwitchBranch(branch_name) branch_end = EndSwitchBranch(branch_name, end_switch) branch_elts = join(branch_elts) all_branch_starts[branch_name] = branch_start all_elts = all_elts + [branch_start] + branch_elts + [branch_end] start_switch = StartSwitch( label, function, branch_start_elts=all_branch_starts, end_switch=end_switch, log_chosen_branch=log_chosen_branch, ) combined_elts = [start_switch] + all_elts + [end_switch] if time_estimate is None: time_estimate = max( [ CreditEstimate(branch_elts).get_max("time") for branch_elts in branches.values() ] ) if bound_progress: combined_elts = with_fixed_progress(combined_elts, time_estimate) if fix_time_credit: combined_elts = with_fixed_time_credit(combined_elts, time_estimate) return combined_elts
class StartSwitch(ReactiveGoTo): def __init__( self, label, function, branch_start_elts, end_switch, log_chosen_branch=True ): if log_chosen_branch: def function_2(experiment, participant): val = call_function_with_context( function, experiment=experiment, participant=participant, ) log_entry = [label, val] participant.append_branch_log(log_entry) return val super().__init__(function_2, targets=branch_start_elts) else: super().__init__(function, targets=branch_start_elts) self.label = label self.branch_start_elts = branch_start_elts self.end_switch = end_switch self.log_chosen_branch = log_chosen_branch class EndSwitch(NullElt): def __init__(self, label): super().__init__() self.label = label class StartSwitchBranch(NullElt): def __init__(self, name): super().__init__() self.name = name class EndSwitchBranch(GoTo): def __init__(self, name, final_elt): super().__init__(target=final_elt) self.name = name
[docs] def conditional( label: str, condition: Callable, logic_if_true, logic_if_false=None, fix_time_credit: bool = False, bound_progress: bool = True, log_chosen_branch: bool = True, time_estimate: float = None, ): """ Executes a series of elts if and only if a certain condition is satisfied. Parameters ---------- label: Internal label to assign to the construct. condition: A function with up to two arguments named ``participant`` and ``experiment``, that is executed once the participant reaches the corresponding part of the timeline, returning a Boolean. logic_if_true: An elt (or list of elts) to display if ``condition`` returns ``True``. logic_if_false: An optional elt (or list of elts) to display if ``condition`` returns ``False``. fix_time_credit: Whether participants should receive the same time credit irrespective of the branch taken. Defaults to ``False``; if set to ``True``, all participants receive the same credit, corresponding to the branch with the maximum time credit. bound_progress: Whether the progress estimate should be 'bound' such that, whatever happens, when the participant exits the conditional construct, the progress estimate will be the same as if the participant had taken the branch with the maximum time credit. Defaults to ``True``. log_chosen_branch: Whether to keep a log of which participants took each branch; defaults to ``True``. time_estimate: An optional time estimate to use for the conditional construct. If not provided, the time estimate will be estimated by computing time estimates for the two branches and taking the maximum. Returns ------- list A list of elts that can be embedded in a timeline using :func:`psynet.timeline.join`. """ return switch( label, function=condition, branches={ True: logic_if_true, False: NullElt() if logic_if_false is None else logic_if_false, }, fix_time_credit=fix_time_credit, bound_progress=bound_progress, log_chosen_branch=log_chosen_branch, time_estimate=time_estimate, )
class ConditionalElt(Elt): def __init__(self, label: str): super().__init__() self.label = label class StartConditional(ConditionalElt): pass class EndConditional(ConditionalElt): pass def with_fixed_progress(elts: List[Elt], time_credit: float): """ Ensures that, when the provided list of elts has been consumed, the participant's progress corresponds exactly to the specified time credit, irrespective of whatever happens within those elts. Parameters ---------- elts : A list of timeline Elts. time_credit : The progress increment is calculated as if the participant had acquired this amount of time credit (in units of seconds). """ end_fix = EndFixProgress(time_credit) start_fix = StartFixProgress(time_credit, end_fix) return join( start_fix, elts, end_fix, ) def with_fixed_time_credit(elts, time_credit): """ Ensures that, when the provided list of elts has been consumed, the participant's resulting time credit corresponds exactly to the specified value, irrespective of whatever happens within those elts. Parameters ---------- elts : A list of timeline Elts. time_credit : The amount of time credit to allocate (in units of seconds). """ end_fix = EndFixTimeCredit(time_credit) start_fix = StartFixTimeCredit(time_credit, end_fix) return join(start_fix, elts, end_fix) def multiply_expected_repetitions(logic, factor: float): assert isinstance(logic, Elt) or is_list_of(logic, Elt) if isinstance(logic, Elt): logic.multiply_expected_repetitions(factor) else: for elt in logic: elt.multiply_expected_repetitions(factor) return logic @register_table class ModuleState(SQLBase, SQLMixin): __tablename__ = "module_state" id = Column(Integer, primary_key=True, index=True, autoincrement=True, unique=True) module_id = Column(String) # parent_id = Column(Integer, ForeignKey("module_state.id")) # parent = relationship("ModuleState", foreign_keys=[parent_id], post_update=True) participant_id = Column( Integer, ForeignKey("participant.id"), # back_populates="_module_states", ) participant = relationship( "psynet.participant.Participant", foreign_keys=[participant_id], backref=backref("_module_states", post_update=True, lazy="selectin"), post_update=True, ) # current_trial = Column( # PythonObject # ) # Note: this can sometimes be a trial object or alternatively a string @property def var(self): return VarStore(self) time_started = Column(DateTime) time_finished = Column(DateTime) time_aborted = Column(DateTime) started = Column(Boolean, default=False) finished = Column(Boolean, default=False) aborted = Column(Boolean, default=False) asset_links = relationship( "AssetModuleState", collection_class=attribute_mapped_collection("local_key"), cascade="all, delete-orphan", ) @staticmethod def _create_asset_module_state(local_key, asset): from psynet.asset import AssetModuleState return AssetModuleState(local_key=local_key, asset=asset) assets = association_proxy( "asset_links", "asset", creator=lambda k, v: _create_asset_module_state(local_key=k, asset=v), # noqa ) nodes = relationship("psynet.trial.main.TrialNode") def __init__(self, module, participant): self.module_id = module.id self.participant = participant def start(self): self.time_started = datetime.now() self.started = True def finish(self): self.time_finished = datetime.now() self.finished = True def abort(self): self.time_finished = datetime.now() self.aborted = True # def get(self, module_id: str): # return self.participant.get_module_state(module_id) class ModuleAssets: def __init__(self, module_id): self.module_id = module_id def __getitem__(self, item): from psynet.asset import Asset return Asset.query.filter_by( module_id=self.module_id, key_within_module=item ).one() class Module(EltCollection): default_id = None default_elts = None state_class = ModuleState # type: Type[ModuleState] def __init__( self, id_: str = None, *args, assets=None, nodes=None, state_class=None ): elts = join(*args) if self.default_id is None and id_ is None: raise ValueError("Either one of <default_id> or <id_> must not be None.") if self.default_elts is None and elts is None: raise ValueError("Either one of <default_elts> or <elts> must not be None.") self.id = id_ if id_ is not None else self.default_id self.elts = elts if elts is not None else self.default_elts self.nodes = nodes if nodes else [] if assets is None: self._staged_assets = [] elif isinstance(assets, dict): self._staged_assets = [] for _key_within_module, _asset in assets.items(): _asset.key_within_module = _key_within_module self._staged_assets.append(_asset) else: assert isinstance(assets, list) self._staged_assets = assets self.state_class = state_class if state_class else self.__class__.state_class from psynet.asset import Asset for elt in self.elts: if isinstance(elt, Asset): self._staged_assets.append(elt) for asset in self._staged_assets: asset.module_id = self.id for node in self.nodes: if node.module_id is not None and node.module_id != self.id: raise RuntimeError( "Nodes cannot belong to multiple modules/trial makers. " "Please make a separate node list for each one." ) node.module_id = self.id @property def assets(self): return ModuleAssets(self.id) def prepare_for_deployment(self, experiment): self.prepare_nodes_for_deployment(experiment) self.prepare_assets_for_deployment(experiment) def prepare_nodes_for_deployment(self, experiment): self.nodes_register_in_db() self.nodes_stage_assets(experiment) def prepare_assets_for_deployment(self, experiment): for asset in self._staged_assets: experiment.assets.stage(asset) db.session.commit() def deposit_assets_on_the_fly(self): assets_to_deposit = [ asset for asset in self._staged_assets if not asset.deposited ] if len(assets_to_deposit) > 0: logger.info( "Depositing %i assets on-the-fly (i.e. while the participant waits for the " "experiment to continue. This is a bad idea if the number of assets is large " "and if they need to be uploaded to a remote server. " "To avoid this, avoid defining your module/trial maker within a page maker.", len(assets_to_deposit), ) for asset in assets_to_deposit: # TODO - parallelize this deposit, see code in Experiment class asset.deposit() def nodes_register_in_db(self): for node in self.nodes: db.session.add(node) assert node.module_id == self.id if node.network is None: node.add_default_network() db.session.commit() for node in self.nodes: node.check_on_deploy() db.session.commit() def nodes_stage_assets(self, experiment): for node in self.nodes: node.stage_assets(experiment) db.session.commit() def start(self, participant): participant.start_module(self) def end(self, participant): participant.end_module(self) @classmethod def started_and_finished_times(cls, participants, module_id): logs = cls.state_class.query.filter_by(module_id=module_id, finished=True).all() return [ {"time_started": log.time_started, "time_finished": log.time_finished} # "time_aborted": log.time_aborted, for log in logs ] @classmethod def median_finish_time_in_s(cls, participants, module_id): started_and_finished_times = cls.started_and_finished_times( participants, module_id ) if not started_and_finished_times: return None durations_in_s = [] for start_end_times in started_and_finished_times: if not ( start_end_times["time_started"] and start_end_times["time_finished"] ): continue t1 = start_end_times["time_started"] t2 = start_end_times["time_finished"] durations_in_s.append((t2 - t1).total_seconds()) if not durations_in_s: return None return median(sorted(durations_in_s)) @classmethod def median_finish_time_in_min_and_s(cls, participants, module_id): return pretty_format_seconds( cls.median_finish_time_in_s(participants, module_id) ) @property def aborted_participants(self): from .participant import Participant aborted_participants = ( db.session.query(Participant) .filter(self.state_class.module_id == self.id, self.state_class.aborted) .all() ) return sorted( [p for p in aborted_participants if self.id in p.aborted_modules], key=lambda p: p.module_states[self.id][0].time_aborted, ) @property def started_participants(self): from .participant import Participant started_participants = ( db.session.query(Participant) .filter(self.state_class.module_id == self.id, self.state_class.started) .all() ) return sorted( [p for p in started_participants if self.id in p.started_modules], key=lambda p: p.module_states[self.id][0].time_started, ) @property def finished_participants(self): from .participant import Participant finished_participants = ( db.session.query(Participant) .filter(self.state_class.module_id == self.id, self.state_class.finished) .all() ) return sorted( [p for p in finished_participants if self.id in p.finished_modules], key=lambda p: p.module_states[self.id][0].time_finished, ) def resolve(self): return join( StartModule(self.id, module=self), self.elts, EndModule(self.id, module=self), ) def visualize(self): if self.started_participants: time_started_last = ( self.started_participants[-1].module_states[self.id][0].time_started ) if self.finished_participants: time_finished_last = ( self.finished_participants[-1].module_states[self.id][0].time_finished ) median_finish_time_in_min_and_s = Module.median_finish_time_in_min_and_s( self.finished_participants, self.id ) if self.aborted_participants: time_aborted_last = ( self.aborted_participants[-1].module_states[self.id][0].time_aborted ) div = tags.div() with div: with tags.h4(): tags.b(f"Module: {self.id}") with tags.ul(cls="details"): tags.b("Participants:") if self.started_participants: tags.li( f"{len(self.started_participants)} started (last at {format_datetime(time_started_last)})" ) if self.finished_participants: tags.li( f"{len(self.finished_participants)} finished (last at {format_datetime(time_finished_last)})" ) if self.aborted_participants: tags.li( f"{len(self.aborted_participants)} aborted (last at {format_datetime(time_aborted_last)})" ) if self.finished_participants: tags.br() tags.li( f"Median time spent to finish: {median_finish_time_in_min_and_s}" ) return div.render() def visualize_tooltip(self): if self.finished_participants: median_finish_time_in_min_and_s = Module.median_finish_time_in_min_and_s( self.finished_participants, self.id ) span = tags.span() with span: tags.b(self.id) tags.br() tags.span( f"{len(self.started_participants)} started, {len(self.finished_participants)} finished," ) tags.br() tags.span(f"{len(self.aborted_participants)} aborted") if self.finished_participants: tags.br() tags.span(f"{median_finish_time_in_min_and_s} (median)") return span.render() def get_progress_info(self, participant_counts, **kwargs): target_n_participants = ( self.target_n_participants if hasattr(self, "target_n_participants") else None ) # TODO a more sophisticated calculation of progress progress = ( participant_counts["finished"] / target_n_participants if target_n_participants is not None and target_n_participants > 0 else 1 ) return { self.id: { "started_n_participants": participant_counts["started"], "finished_n_participants": participant_counts["finished"], "aborted_n_participants": participant_counts["aborted"], "target_n_participants": target_n_participants, "progress": progress, } } class StartModule(NullElt): def __init__(self, label, module): super().__init__() self.label = label self.module = module def consume(self, experiment, participant): self.module.start(participant) if self.created_within_page_maker: self.module.deposit_assets_on_the_fly() class EndModule(NullElt): def __init__(self, label, module): super().__init__() self.label = label self.module = module def consume(self, experiment, participant): self.module.end(participant) class StartAccumulateAnswers(NullElt): def consume(self, experiment, participant): participant.answer_accumulators = participant.answer_accumulators + [{}] class EndAccumulateAnswers(NullElt): def consume(self, experiment, participant): participant.answer = participant.answer_accumulators[-1] participant.answer_accumulators = participant.answer_accumulators[:-1] class DatabaseCheck(NullElt): def __init__(self, label, function): super().__init__() check_function_args(function, args=[]) self.label = label self.function = function def run(self): start_time = time.monotonic() logger.info("Executing the database check '%s'...", self.label) try: self.function() end_time = time.monotonic() time_taken = end_time - start_time logger.info( "The database check '%s' completed in %s seconds.", self.label, f"{time_taken:.3f}", ) except Exception: logger.info( "An exception was thrown in the database check '%s'.", self.label, exc_info=True, )
[docs] class PreDeployRoutine(NullElt): """ A timeline component that allows for the definition of tasks to be performed before deployment. It is possible to make database changes as part of these routines and these will be propagated to the deployed experiment. Parameters ---------- label A label describing the pre-deployment task. function The name of a function to be executed. args The arguments for the function to be executed. """ def __init__(self, label, function, args=None): super().__init__() if args is None: args = {} provided_args = list(args.keys()) provided_args.append("experiment") check_function_args(function, args=provided_args, need_all=False) self.label = label self.function = function self.args = args
class ParticipantFailRoutine(NullElt): def __init__(self, label, function): super().__init__() check_function_args( function, args=["participant", "experiment"], need_all=False ) self.label = label self.function = function class RecruitmentCriterion(NullElt): def __init__(self, label, function): super().__init__() check_function_args(function, args=["experiment"], need_all=False) self.label = label self.function = function FOR_LOOP_STACK_DEPTH = -1 def for_loop( *, label: str, iterate_over: Union[Sequence, Callable[..., Sequence]], logic: Union["TimelineLogic", Callable[..., "TimelineLogic"]], time_estimate_per_iteration: Optional[float] = None, expected_repetitions=None, ): if time_estimate_per_iteration is None: if callable(logic): raise ValueError( "If logic is a callable, then time_estimate_per_iteration must be provided" ) else: time_estimate_per_iteration = CreditEstimate(logic).get_max("time") def estimate_num_repetitions(iterate_over): if not callable(iterate_over): return len(iterate_over) else: if len(get_args(iterate_over)) > 0: raise ValueError( "If iterate_over takes arguments then expected_repetitions cannot be inferred automatically " "and must be provided explicitly." ) return len(iterate_over()) def setup(experiment, participant): nonlocal iterate_over nonlocal label if callable(iterate_over): lst = call_function_with_context( iterate_over, experiment=experiment, participant=participant, ) else: lst = iterate_over state = {"lst": lst, "index": 0} # participant.for_loops.append(state) if label in participant.for_loops: raise ValueError( f"Duplicated for_loop label detected: {label}. " "This suggests that you have tried to nest two for loops with the same label, " "which is not permitted. Please disambiguate the labels." ) participant.for_loops[label] = state flag_modified(participant, "for_loops") def wrapup(experiment, participant): nonlocal label del participant.for_loops[label] flag_modified(participant, "for_loops") def content(experiment, participant): # global FOR_LOOP_STACK_DEPTH # FOR_LOOP_STACK_DEPTH += 1 # state = participant.for_loops[FOR_LOOP_STACK_DEPTH] if not callable(logic): return logic nonlocal label state = participant.for_loops[label] lst = state["lst"] index = state["index"] input = lst[index] return call_function_with_context( logic, input, experiment=experiment, participant=participant, ) def should_stay_in_loop(participant): nonlocal label # state = participant.for_loops[-1] state = participant.for_loops[label] return state["index"] < len(state["lst"]) def increment_counter(participant): # state = participant.for_loops[-1] nonlocal label state = participant.for_loops[label] state["index"] += 1 flag_modified(participant, "for_loops") return join( CodeBlock(setup), while_loop( "for_loop", should_stay_in_loop, logic=join( PageMaker(content, time_estimate_per_iteration), CodeBlock(increment_counter), ), expected_repetitions=( expected_repetitions if expected_repetitions else estimate_num_repetitions(iterate_over) ), fix_time_credit=False, ), CodeBlock(wrapup), ) def sequence( *, label: str, function: Callable, logic: list, ): """ Administers a sequence of logical units in an order determined by a function. This could be used, for example, to determine the order of a series of questionnaires. See ``randomize`` for a special case where the order is randomized. Parameters ---------- label: Internal label to assign to the construct. function: A function with up to two arguments named ``participant`` and ``experiment``, that is executed once the participant reaches the corresponding part of the timeline, returning a list of indices that will be used to determine the order of the sequence. logic: A list of logical units to be administered in the order determined by ``function``. Each element should be a unit of timeline logic, for example a trial maker or a sequence of Elts created through the join function. """ assert isinstance(logic, list) for elt in logic: if isinstance(elt, (StartModule, StartSwitch)): raise ValueError( f"Saw an unexpected element within `sequence`: f{elt} ." "Perhaps you are misusing the function? " "`logic` should be a list where each element is a unit of timeline to be inserted into a sequence. " "This could be a page, or it could be a module, a trial maker, or something like that. " "Note that you do NOT want to pass the output of `join` directly to `sequence`." ) sequence_length = len(logic) def initialize_sequence(participant, experiment): seq = call_function_with_context( function, participant=participant, experiment=experiment ) assert isinstance(seq, list) assert len(seq) == sequence_length participant.sequences.append(seq) flag_modified(participant, "sequences") def sequence_is_not_finished(participant): return len(participant.sequences[-1]) > 0 def get_current_position(participant): return participant.sequences[-1][0] def progress_sequence(participant): participant.sequences[-1].pop(0) flag_modified(participant, "sequences") def tear_down_sequence(participant): participant.sequences.pop() flag_modified(participant, "sequences") label = f"sequence_{label}" return join( CodeBlock(initialize_sequence), while_loop( label=label, condition=sequence_is_not_finished, logic=join( switch( label=label, function=get_current_position, branches={i: logic[i] for i in range(sequence_length)}, ), CodeBlock(progress_sequence), ), expected_repetitions=sequence_length, fix_time_credit=False, ), CodeBlock(tear_down_sequence), ) def randomize(*, label, logic): """ Randomizes the order of a series of logical units. This could be used, for example, to randomize the order of a series of questionnaires. Each participant will receive a different random order. Parameters ---------- label: Internal label to assign to the construct. logic: A list to be randomized. Each element should be a unit of timeline logic, for example a trial maker or a sequence of Elts created through the join function. """ n = len(logic) return sequence( label=label, function=lambda participant: random.sample(range(n), k=n), logic=logic, ) class RegisterTrialMaker(NullElt): def __init__(self, trial_maker): super().__init__() self.trial_maker_id = trial_maker.id self.trial_maker = trial_maker TimelineLogic = Union[Elt, List[Elt], EltCollection]