Source code for psynet.trial.main

# pylint: disable=unused-argument

import datetime
import random
from math import isnan
from typing import List, Optional, Union

import dallinger.experiment
import dallinger.models
import dallinger.nodes
from dallinger import db
from dallinger.models import Info, Network
from dominate import tags
from markupsafe import Markup
from sqlalchemy import (
    Boolean,
    Column,
    Float,
    ForeignKey,
    Integer,
    String,
    and_,
    func,
    not_,
    or_,
    select,
)
from sqlalchemy.exc import NoResultFound
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import column_property, declared_attr, deferred, relationship
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.orm.collections import attribute_mapped_collection

from psynet import field

from ..asset import Asset, AssetNetwork, AssetNode, AssetTrial
from ..data import SQLMixinDallinger
from ..error import (  # noqa  # Importing the error module is important to ensure sqlalchemy is happy
    ErrorRecord,
)
from ..field import PythonDict, PythonObject, VarStore
from ..page import InfoPage, UnsuccessfulEndPage, WaitPage, wait_while
from ..participant import Participant
from ..process import WorkerAsyncProcess
from ..sync import GroupBarrier, SyncGroup
from ..timeline import (
    CodeBlock,
    DatabaseCheck,
    Module,
    ModuleState,
    PageMaker,
    ParticipantFailRoutine,
    PreDeployRoutine,
    RecruitmentCriterion,
    RegisterTrialMaker,
    conditional,
    join,
    switch,
    while_loop,
)
from ..utils import (
    NoArgumentProvided,
    call_function,
    call_function_with_context,
    corr,
    get_logger,
    is_method_overridden,
    log_time_taken,
)

logger = get_logger()


def with_trial_maker_namespace(trial_maker_id: str, x: Optional[str] = None):
    if x is None:
        return trial_maker_id
    return f"{trial_maker_id}__{x}"


# Patch the relationship from Dallinger
Info.origin = relationship(
    "dallinger.models.Node", foreign_keys=[Info.origin_id], post_update=True
)  # type: TrialNode


[docs] class Trial(SQLMixinDallinger, Info): """ Represents a trial in the experiment. The user is expected to override the following methods: * :meth:`~psynet.trial.main.Trial.make_definition`, responsible for deciding on the content of the trial. * :meth:`~psynet.trial.main.Trial.show_trial`, determines how the trial is turned into a webpage for presentation to the participant. * :meth:`~psynet.trial.main.Trial.show_feedback`, defines an optional feedback page to be displayed after the trial. The user must also override the ``time_estimate`` class attribute, providing the estimated duration of the trial in seconds. This is used for predicting the participant's performance reward and for constructing the progress bar. The user may also wish to override the :meth:`~psynet.trial.main.Trial.async_post_trial` method if they wish to implement asynchronous trial processing. They may also override the :meth:`~psynet.trial.main.Trial.score_answer` method if they wish to implement trial-level scoring; for scoring methods that work by analyzing multiple trials at the same time (e.g., test-retest correlations), see the trial maker method :meth:`~psynet.trial.main.TrialMaker.performance_check`. This class subclasses the :class:`~dallinger.models.Info` class from Dallinger, hence can be found in the ``Info`` table in the database. It inherits this class's methods, which the user is welcome to use if they seem relevant. Instances can be retrieved using *SQLAlchemy*; for example, the following command retrieves the ``Trial`` object with an ID of 1: :: Trial.query.filter_by(id=1).one() Parameters ---------- experiment: An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. node: An object of class :class:`dallinger.models.Node` to which the :class:`~dallinger.models.Trial` object should be attached. Complex experiments are often organised around networks of nodes, but in the simplest case one could just make one :class:`~dallinger.models.Network` for each type of trial and one :class:`~dallinger.models.Node` for each participant, and then assign the :class:`~dallinger.models.Trial` to this :class:`~dallinger.models.Node`. Ask us if you want to use this simple use case - it would be worth adding it as a default to this implementation, but we haven't done that yet, because most people are using more complex designs. participant: An instantiation of :class:`psynet.participant.Participant`, corresponding to the current participant. propagate_failure : bool Whether failure of a trial should be propagated to other parts of the experiment depending on that trial (for example, subsequent parts of a transmission chain). Attributes ---------- time_estimate : numeric The estimated duration of the trial (including any feedback), in seconds. This should generally correspond to the (sum of the) ``time_estimate`` parameters in the page(s) generated by ``show_trial``, plus the ``time_estimate`` parameter in the page generated by ``show_feedback`` (if defined). This is used for predicting the participant's performance reward and for constructing the progress bar. participant_id : int The ID of the associated participant. The user should not typically change this directly. Stored in ``property1`` in the database. node The :class:`dallinger.models.Node` to which the :class:`~dallinger.models.Trial` belongs. complete : bool Whether the trial has been completed (i.e. received a response from the participant). The user should not typically change this directly. finalized : bool Whether the trial has been finalized. This is a stronger condition than ``complete``; in particular, a trial is only marked as finalized once its async processes have completed (if it has any). One day we might extend this to include arbitrary conditions, for example waiting for another user to evaluate that trial, or similar. answer : Object The response returned by the participant. This is serialised to JSON, so it shouldn't be too big. The user should not typically change this directly. Stored in ``details`` in the database. parent_trial_id : int If the trial is a repeat trial, this attribute corresponds to the ID of the trial from which that repeat trial was cloned. earliest_async_process_start_time : Optional[datetime] Time at which the earliest pending async process was called. propagate_failure : bool Whether failure of a trial should be propagated to other parts of the experiment depending on that trial (for example, subsequent parts of a transmission chain). var : :class:`~psynet.field.VarStore` A repository for arbitrary variables; see :class:`~psynet.field.VarStore` for details. definition : Object An arbitrary Python object that somehow defines the content of a trial. Often this will be a dictionary comprising a few named parameters. The user should not typically change this directly, as it is instead determined by :meth:`~psynet.trial.main.Trial.make_definition`. run_async_post_trial : bool Set this to ``False`` if you want to disable :meth:`~psynet.trial.main.Trial.async_post_trial`. This is only included for back-compatibility. wait_for_feedback : bool Set this class attribute to ``False`` if you don't want to wait for asynchronous processes to complete before giving feedback. The default is to wait. accumulate_answers : bool Set this class attribute to ``True`` if the trial contains multiple pages and you want the answers to all of these pages to be stored as a dict in ``participant.answer``. Otherwise, the default behaviour is to only store the answer from the final page. time_credit_before_trial: float Reports the amount of time credit that the participant had before they started the trial (in seconds). time_credit_after_trial: float Reports the amount of time credit that the participant had after they finished the trial (in seconds). time_credit_from_trial: float Reports the amount of time credit that was allocated to the participant on the basis of this trial (in seconds). This should be equal to ``time_credit_after_trial - time_credit_before_trial``. progress_before_trial: float Reports the progress that the participant had before they started the trial. Mainly useful for debugging. progress_after_trial: float Reports the progress that the participant had after they finished the trial. Mainly useful for debugging. check_time_credit_received : bool If ``True`` (default), PsyNet will check at the end of the trial whether the participant received the expected amount of time credit. If the received amount is inconsistent with the amount specified by ``time_estimate``, then a warning message will be delivered, suggesting a revised value for ``time_estimate``. response_id : int ID of the associated :class:`~psynet.timeline.Response` object. Equals ``None`` if no such object has been created yet. response : The associated :class:`~psynet.timeline.Response` object, which records in detail the response received from the participant's web browser. Equals ``None`` if no such object has been created yet. """ # pylint: disable=unused-argument __extra_vars__ = SQLMixinDallinger.__extra_vars__.copy() node_id = Column(Integer, ForeignKey("node.id"), index=True) participant_id = Column(Integer, ForeignKey("participant.id"), index=True) # module_id = Column(String) module_id = association_proxy("module_state", "module_id") module_state_id = Column(Integer, ForeignKey("module_state.id"), index=True) module_state = relationship("ModuleState", foreign_keys=[module_state_id]) trial_maker_id = Column(String, index=True) definition = Column(PythonObject) @declared_attr def complete(cls): # Dallinger v9.6.0 adds an Info.complete column. # The following code inherits that column if it exists. return cls.__table__.c.get("complete", Column(Boolean)) finalized = Column(Boolean) is_repeat_trial = Column(Boolean) score = Column(Float) performance_reward = Column(Float) parent_trial_id = Column(Integer, ForeignKey("info.id"), index=True) answer = Column(PythonObject) propagate_failure = Column(Boolean) response_id = Column(Integer, ForeignKey("response.id"), index=True) repeat_trial_index = Column(Integer) n_repeat_trials = Column(Integer) time_taken = Column(Float) _initial_assets = deferred(Column(PythonDict)) time_credit_before_trial = Column(Float) time_credit_after_trial = Column(Float) time_credit_from_trial = Column(Float) progress_before_trial = Column(Float) progress_after_trial = Column(Float) async_post_trial_required = Column(Boolean, default=False, index=True) async_post_trial_requested = Column(Boolean, default=False, index=True) async_post_trial_complete = Column(Boolean, default=False, index=True) async_post_trial_failed = Column(Boolean, default=False, index=True) @hybrid_property def async_post_trial_pending(self): return self.async_post_trial_requested and not ( self.async_post_trial_complete or self.async_post_trial_failed ) @async_post_trial_pending.expression def async_post_trial_pending(cls): return and_( cls.async_post_trial_requested, not_( or_( cls.async_post_trial_complete, cls.async_post_trial_failed, ) ), ) node = relationship( "TrialNode", foreign_keys=[node_id], back_populates="all_trials", post_update=True, ) participant = relationship( "psynet.participant.Participant", foreign_keys=[participant_id], backref="all_trials", post_update=True, ) parent_trial = relationship( "psynet.trial.main.Trial", foreign_keys=[parent_trial_id], uselist=False ) response = relationship("psynet.timeline.Response") async_processes = relationship("AsyncProcess") asset_links = relationship( "AssetTrial", collection_class=attribute_mapped_collection("local_key"), cascade="all, delete-orphan", ) assets = association_proxy( "asset_links", "asset", creator=lambda k, v: AssetTrial(local_key=k, asset=v) ) errors = relationship("ErrorRecord") time_estimate = None check_time_credit_received = True wait_for_feedback = True # determines whether feedback waits for async_post_trial accumulate_answers = False @property def var(self): return VarStore(self) @property def position(self): """ Returns the position of the current trial within that participant's current trial maker (0-indexed). This can be used, for example, to display how many trials the participant has taken so far. """ trials = self.get_for_participant( self.participant_id, self.network.trial_maker_id ) trial_ids = [t.id for t in trials] return trial_ids.index(self.id)
[docs] @classmethod def get_for_participant(cls, participant_id: int, trial_maker_id: int = None): """ Returns all trials for a given participant. """ query = ( db.session.query(cls) .join(TrialNetwork) .filter(Trial.participant_id == participant_id) ) if trial_maker_id is not None: query = query.filter(TrialNetwork.trial_maker_id == trial_maker_id) return query.order_by(Trial.id).all()
@property def visualization_html(self): experiment = dallinger.experiment.load() participant = self.participant page = self.show_trial(experiment=experiment, participant=participant) return page.visualize(trial=self)
[docs] def fail(self, reason=None): if not self.failed: logger.info(f"Failing trial (id: {self.id}, reason: {reason})") super().fail(reason=reason)
@property def ready_for_feedback(self): """ Determines whether a trial is ready to give feedback to the participant. """ msg = f"Participant {self.participant.id}: Checking if the trial is ready for feedback... " if not self.complete: msg += "no, because the trial is not complete." outcome = False elif not self.wait_for_feedback: msg += "yes, because we don't need to wait for feedback." outcome = True elif self.asset_deposit_pending: msg += "no, because the trial is awaiting an asset deposit." outcome = False elif self.async_post_trial_pending: msg += "no, because the trial is awaiting async_post_trial." outcome = False else: msg += "yes, all conditions are satisfied." outcome = True logger.info(msg) return outcome def __init__( self, experiment, node, participant, propagate_failure, # If the trial fails, should its failure be propagated to its descendants? is_repeat_trial, # Is the trial a repeat trial? parent_trial=None, # If the trial is a repeat trial, what is its parent? repeat_trial_index=None, # Only relevant if the trial is a repeat trial n_repeat_trials=None, # Only relevant if the trial is a repeat trial assets=None, definition=NoArgumentProvided, # If provided, overrides make definition ): super().__init__(origin=node) self.node = node # self.node_id = node.id self.complete = False self.finalized = False self.participant_id = participant.id self.propagate_failure = propagate_failure self.is_repeat_trial = is_repeat_trial self.parent_trial = parent_trial self.repeat_trial_index = repeat_trial_index self.n_repeat_trials = n_repeat_trials self.score = None self.response_id = None self.time_taken = None self.trial_maker_id = node.trial_maker_id self.module_state = participant.module_state self.vars = {} self.async_post_trial_required = is_method_overridden( self, Trial, "async_post_trial" ) self.async_post_trial_requested = False self.async_post_trial_complete = False self.async_post_trial_failed = False # self.module_id = node.module_id if assets is None: assets = {} if is_repeat_trial: self.definition = parent_trial.definition for k, v in parent_trial._initial_assets.items(): self.assets[k] = v else: self.assets = { **node.assets, **assets, } if definition == NoArgumentProvided: self.definition = self.make_definition(experiment, participant) assert self.definition is not None self.definition = self.finalize_definition( self.definition, experiment, participant ) flag_modified(self, "definition") assert self.definition is not None else: self.definition = definition db.session.add(self)
[docs] def to_dict(self): x = super().to_dict() field.json_unpack_field(x, "definition") field.json_unpack_field(x, "answer") return x
@property def trial_maker(self) -> "TrialMaker": from ..experiment import get_trial_maker if self.trial_maker_id: return get_trial_maker(self.trial_maker_id) @property def contents(self): return self.answer @contents.setter def contents(self, x): self.answer = x def _allocate_performance_reward(self): reward = self.compute_performance_reward(score=self.score) assert isinstance(reward, (float, int)) self._log_performance_reward(reward) self.performance_reward = reward self.participant.inc_performance_reward(reward) def _log_performance_reward(self, reward): logger.info( "Allocating a performance reward of $%.2f to participant %i for trial %i.", reward, self.participant.id, self.id, ) def add_assets(self, assets: dict): for local_key, asset in assets.items(): self.add_asset(local_key, asset) def add_asset(self, local_key, asset): if not asset.parent: asset.parent = self asset.receive_node_definition(self.definition) asset.local_key = local_key asset.set_keys() self.assets[local_key] = asset if not asset.deposited: asset.deposit()
[docs] def score_answer(self, answer, definition): """ Scores the participant's answer. At the point that this method is called, any pending asynchronous processes should already have been completed. Parameters ---------- answer : The answer provided to the trial. definition : The trial's definition Returns ------- A numeric score quantifying the participant's success. The experimenter is free to decide the directionality (whether high scores are better than low scores, or vice versa). Alternatively, ``None`` indicating no score. """ return None
[docs] def compute_performance_reward(self, score): """ Computes a performance reward to allocate to the participant as a reward for the current trial. By default, no performance reward is given. Note: this trial-level performance reward system is complementary to the trial-maker-level performance reward system, which computes an overall performance reward for the participant at the end of a trial maker. It is possible to use these two performance reward systems independently or simultaneously. See :meth:`~psynet.trial.main.TrialMaker.compute_performance_reward` for more details. Parameters ---------- score: The score achieved by the participant, as computed by :meth:`~psynet.trial.main.Trial.score_answer`. Returns ------- The resulting performance reward, typically in dollars. """ return 0.0
[docs] def json_data(self): x = super().json_data() x["participant_id"] = self.participant_id return x
[docs] def make_definition(self, experiment, participant): """ Creates and returns a definition for the trial, which will be later stored in the ``definition`` attribute. This can be an arbitrary object as long as it is serialisable to JSON. Parameters ---------- experiment: An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. participant: An instantiation of :class:`psynet.participant.Participant`, corresponding to the current participant. """ raise NotImplementedError
[docs] def finalize_definition(self, definition, experiment, participant): """ This can be overridden to add additional randomized trial properties. For example: :: definition["bass_note"] = random.sample(10) It can also be used to add OnDemandAsset: :: self.add_assets({ "audio": OnDemandAsset( function=synth_stimulus, extension=".wav", ) """ return definition
def finalize_assets(self): for _, asset in self.assets.items(): asset.receive_node_definition(self.definition) if not asset.deposited: asset.deposit()
[docs] def show_trial(self, experiment, participant): """ Returns a :class:`~psynet.timeline.Page` object, or alternatively a list of such objects, that solicits an answer from the participant. Parameters ---------- experiment: An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. participant: An instantiation of :class:`psynet.participant.Participant`, corresponding to the current participant. """ raise NotImplementedError
[docs] def show_feedback(self, experiment, participant): """ Returns a Page object displaying feedback (or None, which means no feedback). Parameters ---------- experiment: An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. participant: An instantiation of :class:`psynet.participant.Participant`, corresponding to the current participant. """ return None
def _show_feedback(self, experiment, participant): return self.show_feedback(experiment=experiment, participant=participant) def gives_feedback(self, experiment, participant): return is_method_overridden(self, Trial, "show_feedback") # return ( # self._show_feedback(experiment=experiment, participant=participant) # is not None # ) run_async_post_trial = None
[docs] def async_post_trial(self): """ Optional function to be run after a trial is completed by the participant. Will only run if :attr:`~psynet.trial.main.Trial.run_async_post_trial` is set to ``True``. """ raise NotImplementedError
[docs] def format_answer(self, raw_answer, **kwargs): """ Optional function to be run after a trial is completed by the participant. """ return raw_answer
def call_async_post_trial(self): try: self.async_post_trial() except Exception: self.async_post_trial_failed = True raise self.async_post_trial_complete = True self.check_if_can_mark_as_finalized() def fail_async_processes(self, reason): super().fail_async_processes(reason) self.fail(reason="fail_async_processes") def new_repeat_trial(self, experiment, repeat_trial_index, n_repeat_trials): repeat_trial = self.__class__( experiment=experiment, node=self.origin, participant=self.participant, propagate_failure=False, is_repeat_trial=True, parent_trial=self, repeat_trial_index=repeat_trial_index, n_repeat_trials=n_repeat_trials, ) return repeat_trial def check_if_can_mark_as_finalized(self): if self.failed: logger.info("Cannot mark as finalized because the trial is failed.") elif self.asset_deposit_pending: logger.info( "Cannot mark as finalized yet because the trial is awaiting an asset deposit." ) elif self.async_post_trial_requested and not self.async_post_trial_complete: logger.info( "Cannot mark as finalized yet because the trial is awaiting async_post_trial." ) else: self.finalized = True self.on_finalized() def check_if_can_run_async_post_trial(self): msg = "Checking if we should run async_post_trial... " answer = False if self.async_post_trial_requested: msg += "no need, async_post_trial has already been requested." elif self.run_async_post_trial is not None and not self.run_async_post_trial: msg += "no need, as run_async_post_trial is False." elif not is_method_overridden(self, Trial, "async_post_trial"): msg += "no need, as no async_post_trial method is defined." elif self.asset_deposit_pending: msg += "the trial is awaiting an asset deposit, so we have to wait." else: msg = "All conditions seem to be satisfied, calling call_async_post_trial if it hasn't been called already." answer = True logger.info(msg) if answer: self.queue_async_post_trial() def queue_async_post_trial(self): self.async_post_trial_requested = True WorkerAsyncProcess( self.call_async_post_trial, label="post_trial", timeout=self.trial_maker.async_timeout_sec, trial=self, unique=True, ) def on_finalized(self): self.score = self.score_answer(answer=self.answer, definition=self.definition) self._allocate_performance_reward()
[docs] @classmethod def cue(cls, definition, assets=None): """ Use this method to add a trial directly into a timeline, without needing to create a corresponding trial maker. Parameters ---------- definition : This can be a ``dict`` object, which will then be saved to the trial's ``definition`` slot. Alternatively, it can be a ``Node`` object, in which case the ``Node`` object will be saved to ``trial.node``, and ``node.definition`` will be saved to ``trial.definition``. assets : Optional dictionary of assets to add to the trial (in addition to any provided by providing a ``Source`` containing assets to the ``definition`` parameter). """ from psynet.trial.chain import ChainNode if isinstance(definition, ChainNode): node = definition cls.check_node_is_valid(node) definition = node.definition elif isinstance(definition, dict): node = None else: raise TypeError(f"Invalid definition type: {type(definition)}") def _register_trial(experiment, participant): nonlocal node if not node: node = cls.get_default_parent_node(participant, experiment) trial = cls( experiment, node, participant, propagate_failure=False, is_repeat_trial=False, definition=definition, ) db.session.add(trial) participant.current_trial = trial if assets: trial.add_assets(assets) return join( CodeBlock(_register_trial), cls.trial_logic(), )
@classmethod def get_default_parent_node(cls, participant, experiment): module_id = participant.module_id try: return TrialNode.query.filter_by( module_id=module_id, ).one() except NoResultFound: node = GenericTrialNode(module_id, experiment) db.session.add(node) node.check_on_deploy() return node @classmethod def check_node_is_valid(cls, source): from sqlalchemy import inspect if not inspect(source).persistent: raise ValueError( f"The node with definition {source.definition} looks like it hasn't " "been registered in the database. This normally means that you are trying to " "access the node object in the wrong way. You should access it by writing " "(within a PageMaker or CodeBlock) " "lambda nodes: nodes['your_node_key'], " "or by writing a SQLAlchemy query, e.g. Node.query.filter_by(...).one()." ) @classmethod def trial_logic(cls, trial_maker=None): time_estimate = cls._get_trial_time_estimate(trial_maker) return join( CodeBlock(cls._log_time_credit_before_trial), CodeBlock(cls._log_progress_before_trial), PageMaker( lambda experiment, participant: participant.current_trial._show_trial( experiment, participant ), time_estimate=time_estimate, accumulate_answers=cls.accumulate_answers, ), cls._finalize_trial(trial_maker), cls._construct_feedback_logic(trial_maker), CodeBlock(cls._log_time_credit_after_trial), CodeBlock(cls._log_progress_after_trial), ) @classmethod def _get_trial_time_estimate(cls, trial_maker): if cls.time_estimate is not None: return cls.time_estimate elif trial_maker.time_estimate_per_trial is not None: return trial_maker.time_estimate_per_trial else: raise AttributeError( f"You need to provide either time_estimate as a class attribute of {cls.__name__} " "or time_estimate_per_trial as a class/instance attribute of the corresponding trial maker." ) def _show_trial(self, experiment, participant): return call_function_with_context( self.show_trial, experiment=experiment, participant=participant, trial_maker=self.trial_maker, ) @classmethod def _log_time_credit_before_trial(cls, participant): trial = participant.current_trial trial.time_credit_before_trial = participant.time_credit @classmethod def _log_progress_before_trial(cls, participant): trial = participant.current_trial trial.progress_before_trial = participant.progress @classmethod def _log_time_credit_after_trial(cls, participant): trial = participant.current_trial trial.time_credit_after_trial = participant.time_credit trial.time_credit_from_trial = ( trial.time_credit_after_trial - trial.time_credit_before_trial ) if trial.check_time_credit_received: original_estimate = cls._get_trial_time_estimate( trial_maker=trial.trial_maker ) actual = trial.time_credit_from_trial if actual != original_estimate: logger.info( f"Warning: Trial {trial.id} received an unexpected amount of time credit " f"(expected = {original_estimate} seconds; " f"actual = {actual} seconds). " f"Consider setting the trial's `time_estimate` parameter to {trial.time_credit_from_trial}." "You can disable this warning message by setting `Trial.check_time_credit_received = False`." ) @classmethod def _log_progress_after_trial(cls, participant): trial = participant.current_trial trial.progress_after_trial = participant.progress @classmethod def _finalize_trial(cls, trial_maker=None): def f(participant, experiment): logger.info("Calling _finalize_trial.") trial = participant.current_trial answer = participant.answer trial.answer = trial.format_answer(answer) trial.complete = True if trial_maker: trial_maker.finalize_trial( answer=trial.answer, trial=trial, experiment=experiment, participant=participant, ) trial.check_if_can_run_async_post_trial() trial.check_if_can_mark_as_finalized() return CodeBlock(f) @classmethod def _construct_feedback_logic(cls, trial_maker): if trial_maker: label = trial_maker.with_namespace("feedback") else: label = f"{cls.__name__}__feedback" return conditional( label=label, condition=lambda experiment, participant: ( participant.current_trial.gives_feedback(experiment, participant) ), logic_if_true=join( wait_while( lambda participant: not participant.current_trial.ready_for_feedback, expected_wait=0, log_message="Waiting for feedback to be ready.", check_interval=1.0, ), PageMaker( lambda experiment, participant: ( participant.current_trial.show_feedback( experiment=experiment, participant=participant ) ), time_estimate=0, ), ), fix_time_credit=False, log_chosen_branch=False, ) @hybrid_property def asset_deposit_pending(self): return any(not asset.deposited for asset in self.assets.values()) @asset_deposit_pending.expression def asset_deposit_pending(cls): return ( select(Asset.id) .where( Asset.trial_id == Trial.id, ~Asset.deposited, ) .exists() )
[docs] class TrialMakerState(ModuleState): participant_group = Column(String) in_repeat_phase = Column(Boolean) performance_check = Column(PythonDict) trials_to_repeat = Column(PythonObject) repeat_trial_index = Column(Integer) n_created_trials = Column(Integer) n_completed_trials = Column(Integer) trial_maker_initialized = Column(Boolean) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.n_created_trials = 0 self.n_completed_trials = 0 self.trial_maker_initialized = False
[docs] class TrialMaker(Module): """ Generic trial generation module, to be inserted in an experiment timeline. It is responsible for organising the administration of trials to the participant. Users are invited to override the following abstract methods/attributes: * :meth:`~psynet.trial.main.TrialMaker.prepare_trial`, which prepares the next trial to administer to the participant. * :meth:`~psynet.trial.main.TrialMaker.pre_deploy_routine` (optional), which defines a routine that sets up the experiment in advance of deployment (for example initialising and seeding networks). * :meth:`~psynet.trial.main.TrialMaker.init_participant` (optional), a function that is run when the participant begins this sequence of trials, intended to initialize the participant's state. Make sure you call ``super().init_participant`` when overriding this. * :meth:`~psynet.trial.main.TrialMaker.finalize_trial` (optional), which finalizes the trial after the participant has given their response. * :meth:`~psynet.trial.main.TrialMaker.on_complete` (optional), run once the sequence of trials is complete. * :meth:`~psynet.trial.main.TrialMaker.performance_check` (optional), which checks the performance of the participant with a view to rejecting poor-performing participants. * :meth:`~psynet.trial.main.TrialMaker.compute_performance_reward`; computes the final performance reward to assign to the participant. * :attr:`~psynet.trial.main.TrialMaker.n_trials_still_required` (optional), which is used to estimate how many more participants are still required in the case that ``recruit_mode="n_trials"``. * :attr:`~psynet.trial.main.TrialMaker.give_end_feedback_passed` (default = ``False``); if ``True``, then participants who pass the final performance check will be given feedback. This feedback can be customised by overriding :meth:`~psynet.trial.main.TrialMaker.get_end_feedback_passed_page`. Users are also invited to add new recruitment criteria for selection with the ``recruit_mode`` argument. This may be achieved using a custom subclass of :class:`~psynet.trial.main.TrialMaker` as follows: :: class CustomTrialMaker(TrialMaker): def new_recruit(self, experiment): if experiment.my_condition: return True # True means recruit more else: return False # False means don't recruit any more (for now) recruit_criteria = { **TrialMaker.recruit_criteria, "new_recruit": new_recruit } With the above code, you'd then be able to use ``recruit_mode="new_recruit"``. If you're subclassing a subclass of :class:`~psynet.trial.main.TrialMaker`, then just replace that subclass wherever :class:`~psynet.trial.main.TrialMaker` occurs in the above code. Parameters ---------- trial_class The class object for trials administered by this maker. expected_trials_per_participant Expected number of trials that the participant will take, including repeat trials (used for progress estimation). check_performance_at_end If ``True``, the participant's performance is evaluated at the end of the series of trials. check_performance_every_trial If ``True``, the participant's performance is evaluated after each trial. fail_trials_on_premature_exit If ``True``, a participant's trials are marked as failed if they leave the experiment prematurely. fail_trials_on_participant_performance_check If ``True``, a participant's trials are marked as failed if the participant fails a performance check. propagate_failure If ``True``, the failure of a trial is propagated to other parts of the experiment (the nature of this propagation is left up to the implementation). recruit_mode Selects a recruitment criterion for determining whether to recruit another participant. The built-in criteria are ``"n_participants"`` and ``"n_trials"``, though the latter requires overriding of :attr:`~psynet.trial.main.TrialMaker.n_trials_still_required`. target_n_participants Target number of participants to recruit for the experiment. All participants must successfully finish the experiment to count towards this quota. This target is only relevant if ``recruit_mode="n_participants"``. n_repeat_trials Number of repeat trials to present to the participant. These trials are typically used to estimate the reliability of the participant's responses. Defaults to ``0``. Attributes ---------- check_timeout_interval_sec : float How often to check for timeouts, in seconds (default = 30). Users are invited to override this. response_timeout_sec : float How long until a trial's response times out, in seconds (default = 60) (i.e. how long PsyNet will wait for the participant's response to a trial). This is a lower bound on the actual timeout time, which depends on when the timeout daemon next runs, which in turn depends on :attr:`~psynet.trial.main.TrialMaker.check_timeout_interval_sec`. Users are invited to override this. async_timeout_sec : float How long until an async process times out, in seconds (default = 300). This is a lower bound on the actual timeout time, which depends on when the timeout daemon next runs, which in turn depends on :attr:`~psynet.trial.main.TrialMaker.check_timeout_interval_sec`. Users are invited to override this. introduction An optional event or list of elts to execute prior to beginning the trial loop. give_end_feedback_passed : bool If ``True``, then participants who pass the final performance check are given feedback. This feedback can be customised by overriding :meth:`~psynet.trial.main.TrialMaker.get_end_feedback_passed_page`. performance_threshold : float Score threshold used by the default performance check method, defaults to 0.0. By default, corresponds to the minimum proportion of non-failed trials that the participant must achieve to pass the performance check. end_performance_check_waits : bool If ``True`` (default), then the final performance check waits until all trials no longer have any pending asynchronous processes. sync_group_type Optional SyncGroup type to use for synchronizing participant allocation to nodes. When this is set, then the ordinary node allocation logic will only apply to the 'leader' of each SyncGroup. The other members of this SyncGroup will follow that leader around, so that in every given trial the SyncGroup works on the same node together. sync_group_max_wait_time The maximum time that the participant will be allowed to wait for the SyncGroup to be ready. If this time is exceeded then the participant will be failed and the experiment will terminate early. Defaults to 45.0 seconds. """ state_class = TrialMakerState def __init__( self, id_: str, trial_class, expected_trials_per_participant: Union[int, float], check_performance_at_end: bool, check_performance_every_trial: bool, fail_trials_on_premature_exit: bool, fail_trials_on_participant_performance_check: bool, propagate_failure: bool, recruit_mode: str, target_n_participants: Optional[int], n_repeat_trials: int, assets: List, sync_group_type: Optional[str] = None, sync_group_max_wait_time: float = 45.0, ): if recruit_mode == "n_participants" and target_n_participants is None: raise ValueError( "If <recruit_mode> == 'n_participants', then <target_n_participants> must be provided." ) if recruit_mode == "n_trials" and target_n_participants is not None: raise ValueError( "If <recruit_mode> == 'n_trials', then <target_n_participants> must be None." ) if hasattr(self, "performance_check_threshold"): raise AttributeError( f"Please rename performance_check_threshold to performance_threshold in trial maker '{id_}'." ) if hasattr(self, "compute_bonus"): raise AttributeError( f"Please rename compute_bonus to compute_performance_reward in trial maker '{id_}'." ) self.trial_class = trial_class self.id = id_ self.expected_trials_per_participant = expected_trials_per_participant self.check_performance_at_end = check_performance_at_end self.check_performance_every_trial = check_performance_every_trial self.fail_trials_on_premature_exit = fail_trials_on_premature_exit self.fail_trials_on_participant_performance_check = ( fail_trials_on_participant_performance_check ) self.propagate_failure = propagate_failure self.recruit_mode = recruit_mode self.target_n_participants = target_n_participants self.n_repeat_trials = n_repeat_trials self.sync_group_type = sync_group_type self.sync_group_max_wait_time = sync_group_max_wait_time elts = self.compile_elts() self.check_time_estimates() super().__init__(id_, elts, assets=assets) participant_progress_threshold = 0.1 performance_threshold = 0.0 time_estimate_per_trial = None introduction = None def compile_elts(self): return join( RegisterTrialMaker(self), self._setup_core, self._setup_extra, self.introduction, self._trial_loop(), self._wrapup_core, ( self._check_performance_logic(type="end") if self.check_performance_at_end else None ), ) def custom(self, *args, assets=None, nodes=None): return Module( self.id, join( RegisterTrialMaker(self), CodeBlock(lambda participant: participant.select_module(self.id)), self._setup_core, *args, CodeBlock(lambda participant: participant.select_module(self.id)), self._wrapup_core, ), assets=assets, nodes=nodes, state_class=self.state_class, ) @property def _setup_core(self): return join( PreDeployRoutine(self.with_namespace(), self.pre_deploy_routine), ParticipantFailRoutine( self.with_namespace(), self.participant_fail_routine ), self.check_timeout_task, self._init_participant(), ) def _init_participant(self): return conditional( "init_participant", # If the participant is in a sync group and the leader has not been initialized, # then we put a GroupBarrier to ensure that the leader can be initialized first. # Otherwise we go ahead and initialize the participant. lambda participant: ( self.sync_group_type is not None and not self._leader_is_initialized(participant) ), logic_if_true=GroupBarrier( "init_participant", group_type=self.sync_group_type, max_wait_time=self.sync_group_max_wait_time, on_release=self._init_participants_in_sync_group, ), logic_if_false=CodeBlock(self.init_participant), time_estimate=0.0 if self.sync_group_type is None else 3.0, ) def _leader_is_initialized(self, participant): group = participant.active_sync_groups[self.sync_group_type] try: leader_state = group.leader.module_states[self.id][-1] except (KeyError, IndexError): return False return leader_state.trial_maker_initialized def _init_participants_in_sync_group(self, group: SyncGroup, experiment): self.init_participant(experiment, group.leader) for participant in group.participants: if participant != group.leader: self.init_participant(experiment, participant) @property def _setup_extra(self): return join( RecruitmentCriterion(self.with_namespace(), self.selected_recruit_criterion) ) @property def _wrapup_core(self): return join( CodeBlock(self.on_complete), ) @property def n_complete_participants(self): return Participant.query.filter_by(complete=True).count() @property def n_working_participants(self): return Participant.query.filter_by(status="working", failed=False).count() @property def n_viable_participants(self): return
[docs] def prepare_trial(self, experiment, participant): """ Prepares and returns a trial to administer the participant. Parameters ---------- experiment An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. participant An instantiation of :class:`psynet.participant.Participant`, corresponding to the current participant. Returns _______ A tuple of (:class:`~psynet.trial.main.Trial`, ``str``), where the first is a Trial object, and the second is a status string. """ raise NotImplementedError
[docs] def on_first_launch(self, experiment): """ Defines a routine to run when the experiment is launched for the first time. Parameters ---------- experiment An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. """ pass
[docs] def pre_deploy_routine(self, experiment): """ Defines a routine for setting up the experiment prior to deployment. This is a good place to prepare networks etc. Parameters ---------- experiment An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. """ pass
check_timeout_interval_sec = 30 response_timeout_sec = 60 * 5 async_timeout_sec = 300 end_performance_check_waits = True def participant_fail_routine(self, participant, experiment): if ( self.fail_trials_on_participant_performance_check and "performance_check" in participant.failure_tags ) or ( self.fail_trials_on_premature_exit and "premature_exit" in participant.failure_tags ): self.fail_participant_trials( participant, reason=", ".join(participant.failure_tags) ) @property def check_timeout_task(self): return DatabaseCheck(self.with_namespace("check_timeout"), self.check_timeout) def check_timeout(self): # pylint: disable=no-member self.check_old_trials() WorkerAsyncProcess.check_timeouts() def selected_recruit_criterion(self, experiment): if self.recruit_mode not in self.recruit_criteria: raise ValueError( f"Invalid recruitment mode: {self.recruit_mode}. Valid options: f{self.recruit_criteria}" ) function = self.recruit_criteria[self.recruit_mode] return call_function(function, self=self, experiment=experiment) def null_criterion(self, experiment): logger.info("Recruitment is disabled for this module.") return False def n_participants_criterion(self, experiment): logger.info( "Target number of participants = %i, number of completed participants = %i, number of working participants = %i.", self.target_n_participants, self.n_complete_participants, self.n_working_participants, ) return ( self.n_complete_participants + self.n_working_participants ) < self.target_n_participants def n_trials_criterion(self, experiment): n_trials_still_required = self.n_trials_still_required n_trials_pending = self.n_trials_pending logger.info( "Number of trials still required = %i, number of pending trials = %i.", n_trials_still_required, n_trials_pending, ) return n_trials_still_required > n_trials_pending recruit_criteria = { None: null_criterion, "n_participants": n_participants_criterion, "n_trials": n_trials_criterion, } give_end_feedback_passed = False
[docs] def get_end_feedback_passed_page(self, score): """ Defines the feedback given to participants who pass the final performance check. This feedback is only given if :attr:`~psynet.trial.main.TrialMaker.give_end_feedback_passed` is set to ``True``. Parameters ---------- score : The participant's score on the performance check. Returns ------- :class:`~psynet.timeline.Page` : A feedback page. """ score_to_display = "NA" if score is None else f"{(100 * score):.0f}" return InfoPage( Markup( f"Your performance score was <strong>{score_to_display}&#37;</strong>." ), time_estimate=5, )
def _get_end_feedback_passed_logic(self): if self.give_end_feedback_passed: def f(participant): score = participant.module_state.performance_check["score"] return self.get_end_feedback_passed_page(score) return PageMaker(f, time_estimate=5) else: return [] def visualize(self): rendered_div = super().visualize() div = tags.div() with div: with tags.ul(cls="details"): if ( hasattr(self, "expected_trials_per_participant") and self.expected_trials_per_participant is not None ): tags.li( f"Expected number of trials: {self.expected_trials_per_participant}" ) if ( hasattr(self, "target_n_participants") and self.target_n_participants is not None ): tags.li( f"Target number of participants: {self.target_n_participants}" ) if hasattr(self, "recruit_mode") and self.recruit_mode is not None: tags.li(f"Recruitment mode: {self.recruit_mode}") return rendered_div + div.render() def visualize_tooltip(self): return super().visualize_tooltip() @property def n_trials_pending(self): return sum( [ self.estimate_n_pending_trials(p) for p in self._established_working_participants ] ) @property def n_trials_still_required(self): raise NotImplementedError def estimate_n_pending_trials(self, participant: Participant): assert participant.status == "working" assert not participant.failed module_states = participant.module_states.get(self.id, []) n_completed_trials = sum( [module_state.n_completed_trials for module_state in module_states] ) # This will be an underestimate in the unusual case of a trial maker inside a while loop. # Ideally we'd multiply it by the number of repetitions of the while loop but that's hard to get at from here. # It shouldn't really matter though because all that'll happen is we'll recruit a few too many participants. expected_total_n_trials = self.expected_trials_per_participant return expected_total_n_trials - n_completed_trials @property def _working_participants(self): # Looks across the whole experiment, not just that trial maker. # Should migrate this to the experiment class eventually. return Participant.query.filter_by(status="working", failed=False) @property def _established_working_participants(self): # Returns the number of established working participants across the whole # experiment (not just that trial maker!). Should migrate this to the # Experiment class eventually. return [ p for p in self._working_participants if p.progress > self.participant_progress_threshold ] def check_old_trials(self): time_threshold = datetime.datetime.now() - datetime.timedelta( seconds=self.response_timeout_sec ) trials_to_fail = ( self.trial_class.query.filter_by(complete=False, failed=False) .filter(self.trial_class.creation_time < time_threshold) .with_for_update(of=self.trial_class) .populate_existing() .all() ) logger.info("Found %i old trial(s) to fail.", len(trials_to_fail)) for trial in trials_to_fail: trial.fail(reason="response_timeout")
[docs] def init_participant(self, experiment, participant): # pylint: disable=unused-argument """ Initializes the participant at the beginning of the sequence of trials. If you override this, make sure you call ``super().init_particiant(...)`` somewhere in your new method. Parameters ---------- experiment An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. participant An instantiation of :class:`psynet.participant.Participant`, corresponding to the current participant. """ participant.select_module(self.id) participant.module_state.n_created_trials = 0 participant.module_state.n_completed_trials = 0 participant.module_state.in_repeat_phase = False self.init_participant_group(experiment, participant) participant.module_state.trial_maker_initialized = True
def init_participant_group(self, experiment, participant): if participant.module_state.participant_group: return sync_group = ( participant.active_sync_groups[self.sync_group_type] if self.sync_group_type else None ) is_follower = sync_group and participant != sync_group.leader if is_follower: participant_group = sync_group.leader.module_state.participant_group else: if self.choose_participant_group is None: participant_group = "default" else: participant_group = self.choose_participant_group( participant=participant, ) participant.module_state.participant_group = participant_group
[docs] def on_complete(self, experiment, participant): """ An optional function run once the participant completes their sequence of trials. Parameters ---------- experiment An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. participant An instantiation of :class:`psynet.participant.Participant`, corresponding to the current participant. """
[docs] def finalize_trial(self, answer, trial, experiment, participant): # pylint: disable=unused-argument,no-self-use """ This function is run after the participant completes the trial. It can be optionally customised, for example to add some more postprocessing. If you override this, make sure you call ``super().finalize_trial(...)`` somewhere in your new method. Parameters ---------- answer The ``answer`` object provided by the trial. trial The :class:`~psynet.trial.main.Trial` object representing the trial. experiment An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. participant An instantiation of :class:`psynet.participant.Participant`, corresponding to the current participant. """ participant.module_state.n_completed_trials += 1
[docs] def performance_check(self, experiment, participant, participant_trials): # pylint: disable=unused-argument """ Defines an automated check for evaluating the participant's current performance. The default behaviour is to take the sum of the trial 'scores' (as computed by the Trial.score_answer), but this can be overridden if desired. Parameters ---------- experiment An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. participant An instantiation of :class:`psynet.participant.Participant`, corresponding to the current participant. participant_trials A list of all trials completed so far by the participant. Returns ------- dict The dictionary should include the following values: - ``score``, expressed as a ``float`` or ``None``. - ``passed`` (Boolean), identifying whether the participant passed the check. """ score = sum( [trial.score for trial in participant_trials if trial.score is not None] ) passed = score > self.performance_threshold return {"score": score, "passed": passed}
def with_namespace(self, x=None): return with_trial_maker_namespace(self.id, x=x) def fail_participant_trials(self, participant, reason=None): trials_to_fail = ( Trial.query.filter_by(participant_id=participant.id, failed=False) .with_for_update(of=Trial) .populate_existing() .join(TrialNetwork) .filter_by(trial_maker_id=self.id) ) for trial in trials_to_fail: trial.fail(reason=reason)
[docs] def check_fail_logic(self): """ Determines the timeline logic for when a participant fails the performance check. By default, the participant is shown an :class:`~psynet.timeline.UnsuccessfulEndPage`. Returns ------- An :class:`~psynet.timeline.Elt` or a list of :class:`~psynet.timeline.Elt` s. """ return join(UnsuccessfulEndPage(failure_tags=["performance_check"]))
def _check_performance_logic(self, type): assert type in ["trial", "end"] def eval_checks(experiment, participant): participant_trials = self.get_participant_trials(participant) results = self.performance_check( experiment=experiment, participant=participant, participant_trials=participant_trials, ) assert isinstance(results["passed"], bool) participant.module_state.performance_check = results if type == "end": reward = call_function(self.compute_performance_reward, **results) participant.module_state.performance_reward = reward participant.inc_performance_reward(reward) return results["passed"] logic = switch( "performance_check", function=eval_checks, branches={ True: [] if type == "trial" else self._get_end_feedback_passed_logic(), False: self.check_fail_logic(), }, fix_time_credit=False, log_chosen_branch=False, ) if type == "end" and self.end_performance_check_waits: def any_trials_awaiting_processing(participant): return ( db.session.query(func.count(Trial.id)) .filter( Trial.participant_id == participant.id, Trial.async_post_trial_pending | Trial.asset_deposit_pending, ) .scalar() ) > 0 return join( wait_while( lambda participant: any_trials_awaiting_processing(participant), expected_wait=5, log_message="Waiting for remaining trials that are awaiting further processing.", ), logic, ) else: return logic def get_all_participant_performance_check_results(self): records = ( db.session.query(self.state_class.performance_check) .filter_by(module_id=self.id) .filter(self.state_class.performance_check.isnot(None)) .all() ) return [record[0] for record in records]
[docs] def get_participant_trials(self, participant): """ Returns all trials (complete and incomplete) owned by the current participant, including repeat trials. Not intended for overriding. Parameters ---------- participant: An instantiation of :class:`psynet.participant.Participant`, corresponding to the current participant. """ all_participant_trials = self.trial_class.query.filter_by( participant_id=participant.id ).all() return [t for t in all_participant_trials if t.trial_maker_id == self.id]
@log_time_taken def _prepare_trial(self, experiment, participant, leader=None): if not participant.module_state.in_repeat_phase: if leader is None: trial, trial_status = self.prepare_trial( experiment=experiment, participant=participant ) else: assert participant.id != leader.id trial, trial_status = self.prepare_follower_trial( experiment=experiment, participant=participant, leader=leader ) if trial_status == "exit" and self.n_repeat_trials > 0: participant.module_state.in_repeat_phase = True if participant.module_state.in_repeat_phase: trial, trial_status = self._prepare_repeat_trial( experiment=experiment, participant=participant ) if trial_status == "available": assert trial is not None return trial, trial_status def _prepare_repeat_trial(self, experiment, participant): if not participant.module_state.trials_to_repeat: self._init_trials_to_repeat(participant) trials_to_repeat = participant.module_state.trials_to_repeat repeat_trial_index = participant.module_state.repeat_trial_index trial_status = "available" try: trial_to_repeat_id = trials_to_repeat[repeat_trial_index] trial_to_repeat = self.trial_class.query.filter_by( id=trial_to_repeat_id ).one() trial = trial_to_repeat.new_repeat_trial( experiment, repeat_trial_index, len(trials_to_repeat) ) participant.module_state.repeat_trial_index += 1 db.session.add(trial) except IndexError: trial = None trial_status = "exit" return trial, trial_status def _init_trials_to_repeat(self, participant): completed_trial_ids = [t.id for t in self.get_participant_trials(participant)] actual_n_repeat_trials = min(len(completed_trial_ids), self.n_repeat_trials) participant.module_state.trials_to_repeat = random.sample( completed_trial_ids, actual_n_repeat_trials ) participant.module_state.repeat_trial_index = 0
[docs] def cue_trial(self): """ You can use this in combination with init_participant to administer trials outside of a trialmaker. """ return join( CodeBlock(lambda participant: participant.select_module(self.id)), self._wait_for_trial(), conditional( "is_trial_available", condition=lambda participant: participant.trial_status != "exit", logic_if_true=self.trial_class.trial_logic(trial_maker=self), logic_if_false=CodeBlock( lambda: logger.info("No trial found, skipping") ), fix_time_credit=False, log_chosen_branch=False, ), )
def _trial_loop(self): return join( self._wait_for_trial(), while_loop( self.with_namespace("trial_loop"), condition=lambda participant: participant.trial_status != "exit", logic=join( self.trial_class.trial_logic(trial_maker=self), ( self._check_performance_logic(type="trial") if self.check_performance_every_trial else None ), self._wait_for_trial(), ), expected_repetitions=self.expected_trials_per_participant, fix_time_credit=False, ), ) def _wait_for_trial(self): def _try_to_prepare_trial__solo(experiment, participant): trial, trial_status = self._prepare_trial(experiment, participant) participant.current_trial = trial participant.trial_status = trial_status def _try_to_prepare_trial__group(group: SyncGroup): from ..experiment import get_experiment experiment = get_experiment() leader = group.leader leader.current_trial, leader.trial_status = self._prepare_trial( experiment=experiment, participant=group.leader ) for follower in group.active_followers: follower.current_trial, follower.trial_status = self._prepare_trial( experiment=experiment, participant=follower, leader=group.leader, ) def try_to_prepare_trial(): if self.sync_group_type: return join( GroupBarrier( id_="prepare_trial", group_type=self.sync_group_type, on_release=_try_to_prepare_trial__group, fix_time_credit=False, # we're already within a while loop with fixed time credit max_wait_time=self.sync_group_max_wait_time, ) ) else: return CodeBlock(_try_to_prepare_trial__solo) return join( try_to_prepare_trial(), while_loop( "Waiting for trial", lambda participant: participant.trial_status == "wait", logic=join( try_to_prepare_trial(), WaitPage(wait_time=2.0), ), expected_repetitions=0, max_loop_time=self.max_time_waiting_for_trial, fix_time_credit=False, ), ) max_time_waiting_for_trial = 60 def check_time_estimates(self): if ( self.trial_class.time_estimate is None and self.time_estimate_per_trial is None ): raise AttributeError( f"You need to provide either time_estimate as a class attribute of {self.trial_class.__name__} " f"or time_estimate_per_trial as an instance or class attribute of trial maker {self.id}." ) @classmethod def extra_files(cls): return []
[docs] class NetworkTrialMakerState(TrialMakerState): pass
[docs] class NetworkTrialMaker(TrialMaker): """ Trial maker for network-based experiments. These experiments are organised around networks in an analogous way to the network-based experiments in Dallinger. A :class:`~dallinger.models.Network` comprises a collection of :class:`~dallinger.models.Node` objects organised in some kind of structure. Here the role of :class:`~dallinger.models.Node` objects is to generate :class:`~dallinger.models.Trial` objects. Typically the :class:`~dallinger.models.Node` object represents some kind of current experiment state, such as the last datum in a transmission chain. In some cases, a :class:`~dallinger.models.Network` or a :class:`~dallinger.models.Node` will be owned by a given participant; in other cases they will be shared between participants. An important feature of these networks is that their structure can change over time. This typically involves adding new nodes that somehow respond to the trials that have been submitted previously. The present class facilitates this behaviour by providing a built-in :meth:`~psynet.trial.main.TrialMaker.prepare_trial` implementation that comprises the following steps: 1. Find the available networks from which to source the next trial, ordered by preference (:meth:`~psynet.trial.main.NetworkTrialMaker.find_networks`). These may be created on demand, or alternatively pre-created by :meth:`~psynet.trial.main.NetworkTrialMaker.pre_deploy_routine`. 2. Give these networks an opportunity to grow (i.e. update their structure based on the trials that they've received so far) (:meth:`~psynet.trial.main.NetworkTrialMaker.grow_network`). 3. Iterate through these networks, and find the first network that has a node available for the participant to attach to. (:meth:`~psynet.trial.main.NetworkTrialMaker.find_node`). 4. Create a trial from this node (:meth:`psynet.trial.main.Trial.__init__`). The trial is then administered to the participant, and a response elicited. Once the trial is finished, the network is given another opportunity to grow. The implementation also provides support for asynchronous processing, for example to prepare the stimuli available at a given node, or to postprocess trials submitted to a given node. There is some sophisticated logic to make sure that a participant is not assigned to a :class:`~dallinger.models.Node` object if that object is still waiting for an asynchronous process, and likewise a trial won't contribute to a growing network if it is still pending the outcome of an asynchronous process. The user is expected to override the following abstract methods/attributes: * :meth:`~psynet.trial.main.NetworkTrialMaker.pre_deploy_routine`, (optional), which defines a routine that sets up the experiment (for example initialising and seeding networks). * :meth:`~psynet.trial.main.NetworkTrialMaker.find_networks`, which finds the available networks from which to source the next trial, ordered by preference. * :meth:`~psynet.trial.main.NetworkTrialMaker.grow_network`, which give these networks an opportunity to grow (i.e. update their structure based on the trials that they've received so far). * :meth:`~psynet.trial.main.NetworkTrialMaker.find_node`, which takes a given network and finds a node which the participant can be attached to, if one exists. Do not override prepare_trial. Parameters ---------- trial_class The class object for trials administered by this maker. network_class The class object for the networks used by this maker. This should subclass :class`~psynet.trial.main.TrialNetwork`. expected_trials_per_participant Expected number of trials that the participant will take, including repeat trials (used for progress estimation). check_performance_at_end If ``True``, the participant's performance is evaluated at the end of the series of trials. check_performance_every_trial If ``True``, the participant's performance is evaluated after each trial. fail_trials_on_premature_exit If ``True``, a participant's trials are marked as failed if they leave the experiment prematurely. fail_trials_on_participant_performance_check If ``True``, a participant's trials are marked as failed if the participant fails a performance check. propagate_failure If ``True``, the failure of a trial is propagated to other parts of the experiment (the nature of this propagation is left up to the implementation). recruit_mode Selects a recruitment criterion for determining whether to recruit another participant. The built-in criteria are ``"n_participants"`` and ``"n_trials"``, though the latter requires overriding of :attr:`~psynet.trial.main.TrialMaker.n_trials_still_required`. target_n_participants Target number of participants to recruit for the experiment. All participants must successfully finish the experiment to count towards this quota. This target is only relevant if ``recruit_mode="n_participants"``. n_repeat_trials Number of repeat trials to present to the participant. These trials are typically used to estimate the reliability of the participant's responses. Defaults to ``0``. wait_for_networks If ``True``, then the participant will be made to wait if there are still more networks to participate in, but these networks are pending asynchronous processes. sync_group_type Optional SyncGroup type to use for synchronizing participant allocation to nodes. When this is set, then the ordinary node allocation logic will only apply to the 'leader' of each SyncGroup. The other members of this SyncGroup will follow that leader around, so that in every given trial the SyncGroup works on the same node together. sync_group_max_wait_time The maximum time that the participant will be allowed to wait for the SyncGroup to be ready. If this time is exceeded then the participant will be failed and the experiment will terminate early. Defaults to 45.0 seconds. Attributes ---------- check_timeout_interval_sec : float How often to check for trials that have timed out, in seconds (default = 30). Users are invited to override this. response_timeout_sec : float How long until a trial's response times out, in seconds (default = 60) (i.e. how long PsyNet will wait for the participant's response to a trial). This is a lower bound on the actual timeout time, which depends on when the timeout daemon next runs, which in turn depends on :attr:`~psynet.trial.main.TrialMaker.check_timeout_interval_sec`. Users are invited to override this. async_timeout_sec : float How long until an async process times out, in seconds (default = 300). This is a lower bound on the actual timeout time, which depends on when the timeout daemon next runs, which in turn depends on :attr:`~psynet.trial.main.TrialMaker.check_timeout_interval_sec`. Users are invited to override this. network_query : sqlalchemy.orm.Query An SQLAlchemy query for retrieving all networks owned by the current trial maker. Can be used for operations such as the following: ``self.network_query.count()``. n_networks : int Returns the number of networks owned by the trial maker. networks : list Returns the networks owned by the trial maker. performance_threshold : float Score threshold used by the default performance check method, defaults to 0.0. By default, corresponds to the minimum proportion of non-failed trials that the participant must achieve to pass the performance check. end_performance_check_waits : bool If ``True`` (default), then the final performance check waits until all trials no longer have any pending asynchronous processes. performance_threshold : float (default = -1.0) The performance threshold that is used in the :meth:`~psynet.trial.main.NetworkTrialMaker.performance_check` method. """ state_class = NetworkTrialMakerState def __init__( self, id_, trial_class, network_class, expected_trials_per_participant, check_performance_at_end, check_performance_every_trial, fail_trials_on_premature_exit, fail_trials_on_participant_performance_check, # latest performance check is saved in as a participant variable (value, success) propagate_failure, recruit_mode, target_n_participants, n_repeat_trials: int, wait_for_networks: bool, assets=None, sync_group_type: Optional[str] = None, sync_group_max_wait_time: float = 45.0, ): performance_check_is_enabled = ( check_performance_at_end or check_performance_every_trial ) has_custom_performance_check = is_method_overridden( self, NetworkTrialMaker, "performance_check" ) if ( performance_check_is_enabled and self.performance_check_type is None and not has_custom_performance_check ): raise ValueError( f"Trial Maker '{id_}' has performance checks enabled but performance_check_type is not yet set. " "Please set this as a class attribute for your custom TrialMaker class, writing for example:\n\n" "class ConsonanceTrialMaker(StaticTrialMaker):\n" " performance_check_type = 'score'\n\n" "Note: previous versions of PsyNet made this attribute default to " "performance_check_type = 'consistency', but we now force experimenters to be explicit " "with this decision." ) super().__init__( id_=id_, trial_class=trial_class, expected_trials_per_participant=expected_trials_per_participant, check_performance_at_end=check_performance_at_end, check_performance_every_trial=check_performance_every_trial, fail_trials_on_premature_exit=fail_trials_on_premature_exit, fail_trials_on_participant_performance_check=fail_trials_on_participant_performance_check, propagate_failure=propagate_failure, recruit_mode=recruit_mode, target_n_participants=target_n_participants, n_repeat_trials=n_repeat_trials, assets=assets, sync_group_type=sync_group_type, sync_group_max_wait_time=sync_group_max_wait_time, ) self.network_class = network_class self.wait_for_networks = wait_for_networks
[docs] @log_time_taken def prepare_trial(self, experiment, participant: Participant): logger.info("Preparing trial for participant %i.", participant.id) networks = self.find_networks(participant=participant, experiment=experiment) if networks in ["wait", "exit"]: logger.info("Outcome of find_networks: %s", networks) trial = None trial_status = networks return trial, trial_status logger.info( "Outcome: found %i candidate network(s) for participant %i.", len(networks), participant.id, ) assert len(networks) > 0 for network in networks: node = self.find_node( network=network, participant=participant, experiment=experiment ) if node is not None: logger.info( "Selected node %i from network %i to give to participant %i.", node.id, node.network.id, participant.id, ) trial = self._create_trial( node=node, participant=participant, experiment=experiment ) if trial is None: continue trial_status = "available" return trial, trial_status logger.info( "Failed to create any nodes from these networks for participant %i, exiting.", participant.id, ) trial = None trial_status = "exit" return trial, trial_status
def prepare_follower_trial( self, experiment, participant: Participant, leader: Participant ): logger.info( f"Will follow the SyncGroup leader (participant {leader.id}, status = {leader.trial_status})." ) if leader.trial_status in ["wait", "exit"]: assert leader.current_trial is None trial, trial_status = leader.current_trial, leader.trial_status else: node = leader.current_trial.node trial = self._create_trial( node=node, participant=participant, experiment=experiment ) assert trial is not None trial_status = "available" return trial, trial_status ####
[docs] def find_networks(self, participant, experiment): """ Returns a list of all available networks for the participant's next trial, ordered in preference (most preferred to least preferred). Parameters ---------- participant An instantiation of :class:`psynet.participant.Participant`, corresponding to the current participant. experiment An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. """ raise NotImplementedError
[docs] def grow_network(self, network, experiment): """ Extends the network if necessary by adding one or more nodes. Returns ``True`` if any nodes were added. Parameters ---------- network The network to be potentially extended. experiment An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. """ raise NotImplementedError
[docs] def get_trial_class(self, node, participant, experiment): """ Returns the class of trial to be used for this trial maker. """ return self.trial_class
[docs] def find_node(self, network, participant, experiment): """ Finds the node to which the participant should be attached for the next trial. Parameters ---------- network The network to be potentially extended. participant An instantiation of :class:`psynet.participant.Participant`, corresponding to the current participant. experiment An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. """ raise NotImplementedError
@log_time_taken def _create_trial(self, node, participant, experiment): trial_class = self.get_trial_class(node, participant, experiment) if trial_class is None: return None trial = trial_class( experiment=experiment, node=node, participant=participant, propagate_failure=self.propagate_failure, is_repeat_trial=False, ) trial.finalize_assets() trial._initial_assets = dict(trial.assets) db.session.add(trial) participant.module_state.n_created_trials += 1 return trial def call_grow_network(self, network): # pylint: disable=no-member from psynet.experiment import get_experiment experiment = get_experiment() grown = self.grow_network(network, experiment) assert isinstance(grown, bool) if grown: self._check_run_async_post_grow_network(network) def _check_run_async_post_grow_network(self, network): if ( network.run_async_post_grow_network is not None and not network.run_async_post_grow_network ): return elif not is_method_overridden(network, TrialNetwork, "async_post_grow_network"): return else: self.queue_async_post_grow_network(network) def queue_async_post_grow_network(self, network): WorkerAsyncProcess( network.call_async_post_grow_network, label="post_grow_network", timeout=self.async_timeout_sec, network=network, ) @property def network_query(self): return self.network_class.query.filter_by(trial_maker_id=self.id) @property def n_networks(self): return self.network_query.count() @property def networks(self): return self.network_query.all() performance_threshold = -1.0 min_nodes_for_performance_check = 2 performance_check_type = None consistency_check_type = "spearman_correlation"
[docs] def compute_performance_reward(self, score, passed): """ Computes the performance reward to allocate to the participant at the end of a trial maker on the basis of the results of the final performance check. Note: if `check_performance_at_end = False`, then this function will not be run and the performance reward will not be assigned. """ return 0.0
[docs] def performance_check(self, experiment, participant, participant_trials): if self.performance_check_type == "consistency": return self.performance_check_consistency( experiment, participant, participant_trials ) elif self.performance_check_type == "performance": return self.performance_check_accuracy( experiment, participant, participant_trials ) elif self.performance_check_type == "score": return self.performance_check_score( experiment, participant, participant_trials ) else: raise NotImplementedError
def performance_check_accuracy(self, experiment, participant, participant_trials): n_trials = len(participant_trials) if n_trials == 0: p = None passed = True else: n_failed_trials = len([t for t in participant_trials if t.failed]) p = 1 - n_failed_trials / n_trials passed = p >= self.performance_threshold return {"score": p, "passed": passed} def performance_check_score(self, experiment, participant, participant_trials): score = sum(t.score for t in participant_trials) passed = score >= self.performance_threshold return {"score": score, "passed": passed} def get_answer_for_consistency_check(self, trial): # Must return a number return float(trial.answer) def performance_check_consistency( self, experiment, participant, participant_trials ): repeat_trials = [t for t in participant_trials if t.is_repeat_trial] parent_trials = [t.parent_trial for t in repeat_trials] repeat_trial_answers = [ self.get_answer_for_consistency_check(t) for t in repeat_trials ] parent_trial_answers = [ self.get_answer_for_consistency_check(t) for t in parent_trials ] assert self.min_nodes_for_performance_check >= 2 if len(repeat_trials) < self.min_nodes_for_performance_check: logger.info( "min_nodes_for_performance_check was not reached, so consistency score cannot be calculated." ) score = None passed = True else: consistency = self.get_consistency( repeat_trial_answers, parent_trial_answers ) if isnan(consistency): logger.info( """ get_consistency returned 'nan' in the performance check. This commonly indicates that the participant gave the same response to all repeat trials. The participant will be failed. """ ) score = None passed = False else: score = float(consistency) passed = bool(score >= self.performance_threshold) logger.info( "Performance check for participant %i: consistency = %s, passed = %s", participant.id, "NA" if score is None else f"{score:.3f}", passed, ) return {"score": score, "passed": passed} def get_consistency(self, x, y): if self.consistency_check_type == "pearson_correlation": return corr(x, y) elif self.consistency_check_type == "spearman_correlation": return corr(x, y, method="spearman") elif self.consistency_check_type == "percent_agreement": n_cases = len(x) n_agreements = sum([a == b for a, b in zip(x, y)]) return n_agreements / n_cases else: raise NotImplementedError @staticmethod def group_trials_by_parent(trials): res = {} for trial in trials: parent_id = trial.parent_trial.id if parent_id not in res: res[parent_id] = [] res[parent_id].append(trial) return res
[docs] class TrialNetwork(SQLMixinDallinger, Network): """ A network class to be used by :class:`~psynet.trial.main.NetworkTrialMaker`. The user must override the abstract method :meth:`~psynet.trial.main.TrialNetwork.add_node`. The user may also wish to override the :meth:`~psynet.trial.main.TrialNetwork.async_post_grow_network` method if they wish to implement asynchronous network processing. Parameters ---------- experiment An instantiation of :class:`psynet.experiment.Experiment`, corresponding to the current experiment. Attributes ---------- target_n_trials : int or None Indicates the target number of trials for that network. Left empty by default, but can be set by custom ``__init__`` functions. Stored as the field ``property2`` in the database. participant : Optional[Participant] Returns the network's :class:`~psynet.participant.Participant`, or ``None`` if none can be found. sync_group_type : Optional[str] The ``sync_group_type`` attribute of the trial maker that owns this network. sync_group : Optional[SyncGroup] The SyncGroup that owns this network (normally only relevant for within-style chains). sync_group_id : Optional[int] The ID of the SyncGroup that owns this network (normally only relevant for within-style chains). n_alive_nodes : int Returns the number of non-failed nodes in the network. n_completed_trials : int Returns the number of completed and non-failed trials in the network (irrespective of asynchronous processes, but excluding repeat trials). all_trials : list A list of all trials owned by that network. alive_trials : list A list of all non-failed trials owned by that network. failed_trials : list A list of all failed trials owned by that network. var : :class:`~psynet.field.VarStore` A repository for arbitrary variables; see :class:`~psynet.field.VarStore` for details. run_async_post_grow_network : bool Set this to ``True`` if you want the :meth:`~psynet.trial.main.TrialNetwork.async_post_grow_network` method to run after the network is grown. """ __extra_vars__ = { **SQLMixinDallinger.__extra_vars__.copy(), } def __repr__(self): return ("<Network-{}-{} with {} nodes>").format( self.id, self.type, len(self.alive_nodes) ) trial_maker_id = Column(String) module_id = Column(String) target_n_trials = Column(Integer) participant_group = Column(String) sync_group_type = Column(String) sync_group_id = Column(Integer, ForeignKey("sync_group.id"), index=True) sync_group = relationship("SyncGroup", backref="networks") participant_id = Column(Integer, ForeignKey("participant.id"), index=True) participant = relationship( Participant, foreign_keys=[participant_id], post_update=True ) participants = relationship( Participant, secondary="info", # The info table is where Trials are stored (for historic reasons) primaryjoin="psynet.trial.main.TrialNetwork.id == psynet.trial.main.Trial.network_id", secondaryjoin="psynet.trial.main.Trial.participant_id == psynet.participant.Participant.id", viewonly=True, ) async_post_grow_network_required = Column(Boolean, default=False, index=True) async_post_grow_network_requested = Column(Boolean, default=False, index=True) async_post_grow_network_complete = Column(Boolean, default=False, index=True) async_post_grow_network_failed = Column(Boolean, default=False, index=True) @hybrid_property def async_post_grow_network_pending(self): return self.async_post_grow_network_requested and not ( self.async_post_grow_network_complete or self.async_post_grow_network_failed ) @async_post_grow_network_pending.expression def async_post_grow_network_pending(cls): return and_( cls.async_post_grow_network_requested, not_( or_( cls.async_post_grow_network_complete, cls.async_post_grow_network_failed, ) ), ) id_within_participant = Column(Integer) all_trials = relationship("psynet.trial.main.Trial") @property def alive_nodes(self): return [node for node in self.all_nodes if not self.failed] @property def failed_nodes(self): return [node for node in self.all_nodes if self.failed] @property def alive_trials(self): return [t for t in self.all_trials if not t.failed] @property def failed_trials(self): return [t for t in self.all_trials if t.failed] @property def trials(self): raise RuntimeError( "The .trials attribute has been removed, please use .all_trials, .alive_trials, or .failed_trials instead." ) async_processes = relationship("AsyncProcess") asset_links = relationship( "AssetNetwork", collection_class=attribute_mapped_collection("local_key"), cascade="all, delete-orphan", ) assets = association_proxy( "asset_links", "asset", creator=lambda k, v: AssetNetwork(local_key=k, asset=v) ) errors = relationship("ErrorRecord") def grow(self, experiment): if self.trial_maker: self.trial_maker.call_grow_network(self) @property def trial_maker(self) -> "TrialMaker": from ..experiment import get_trial_maker if self.trial_maker_id: return get_trial_maker(self.trial_maker_id)
[docs] def calculate_full(self): raise RuntimeError("This should not be called directly.")
[docs] def add_node(self, node): """ Adds a node to the network. This method is responsible for setting ``self.full = True`` if the network is full as a result. """ raise NotImplementedError
[docs] def fail(self, reason=None): if not self.failed: logger.info(f"Failing network (id: {self.id}, reason: {reason})") super().fail(reason=reason)
# vars = Column(PythonObject) @property def var(self): return VarStore(self) #### def __init__( self, trial_maker_id: str, experiment, # noqa module_id: Optional[str] = None, sync_group_type: Optional[str] = None, sync_group: Optional[SyncGroup] = None, ): # pylint: disable=unused-argument self.trial_maker_id = trial_maker_id self.assets = {} self.sync_group_type = sync_group_type self.sync_group = sync_group if not module_id: module_id = trial_maker_id self.module_id = module_id self.async_post_grow_network_required = is_method_overridden( self, TrialNetwork, "async_post_grow_network" ) self.async_post_grow_network_requested = False self.async_post_grow_network_complete = False self.async_post_grow_network_failed = False run_async_post_grow_network = None
[docs] def async_post_grow_network(self): """ Optional function to be run after the network is grown. Will only run if :attr:`~psynet.trial.main.TrialNetwork.run_async_post_grow_network` is set to ``True``. """
def call_async_post_grow_network(self): try: self.async_post_grow_network() except Exception: self.async_post_grow_network_failed = True raise self.async_post_grow_network_complete = True
[docs] class TrialNode(SQLMixinDallinger, dallinger.models.Node): __extra_vars__ = { **SQLMixinDallinger.__extra_vars__.copy(), } trial_maker_id = Column(String, index=True) module_id = Column(String, index=True) module_state_id = Column(Integer, ForeignKey("module_state.id"), index=True) is_global = Column(Integer, ForeignKey("experiment.id"), index=True) on_deploy_complete = Column(Boolean, default=False, index=True) async_on_deploy_required = Column(Boolean, default=False, index=True) async_on_deploy_requested = Column(Boolean, default=False, index=True) async_on_deploy_complete = Column(Boolean, default=False, index=True) async_on_deploy_failed = Column(Boolean, default=False, index=True) @hybrid_property def async_on_deploy_pending(self): return self.async_on_deploy_requested and not ( self.async_on_deploy_complete or self.async_on_deploy_failed ) @async_on_deploy_pending.expression def async_on_deploy_pending(cls): return and_( cls.async_on_deploy_requested, not_( or_( cls.async_on_deploy_complete, cls.async_on_deploy_failed, ) ), ) module_state = relationship("ModuleState") async_processes = relationship("AsyncProcess") asset_links = relationship( "AssetNode", collection_class=attribute_mapped_collection("local_key"), cascade="all, delete-orphan", ) assets = association_proxy( "asset_links", "asset", creator=lambda k, v: AssetNode(local_key=k, asset=v) ) errors = relationship("ErrorRecord") all_trials = relationship("psynet.trial.main.Trial", foreign_keys=[Trial.node_id]) @property def trial(self): alive_trials = self.alive_trials if len(alive_trials) == 0: return None elif len(alive_trials) == 1: return alive_trials[0] else: raise RuntimeError(f"Node {self.id} has multiple trials.") @property def alive_trials(self) -> List[Trial]: return [t for t in self.all_trials if not t.failed] @property def pending_trials(self) -> List[Trial]: return [t for t in self.alive_trials if not t.finalized] @property def failed_trials(self) -> List[Trial]: return [t for t in self.all_trials if t.failed] @property def trials(self): raise RuntimeError( "The .trials attribute has been removed, please use .all_trials, .alive_trials, or .failed_trials instead." ) # assets = Column(PythonDict) # @property # def assets(self): # assets_from_trials = Asset.query.filter(Asset.node_id == self.id, Asset.trial_id != None).all() # assets_not_from_trials = Asset.query.filter(Asset.node_id == self.id, Asset.trial_id == None).all() # # assets = {} # for asset in assets_not_from_trials: # assets[asset.label_or_key] = asset # # return { # organize_by_key(assets_from_trials, lambda asset: asset.label_or_key) # **assets, # } def __init__(self, network=None, participant=None, module_id=None): # Note: We purposefully do not call super().__init__(), because this parent constructor # requires the prior existence of the node's parent network, which is impractical for us. self.module_id = module_id if network is not None: self.set_network(network) if participant is not None: self.participant = participant self.participant_id = participant.id self.module_state = participant.module_state self.on_deploy_complete = False self.async_on_deploy_required = is_method_overridden( self, TrialNode, "async_on_deploy" ) self.async_on_deploy_requested = False self.async_on_deploy_complete = False self.async_on_deploy_failed = False def check_on_deploy(self): from psynet.experiment import get_experiment, in_deployment_package if (not in_deployment_package()) or self.on_deploy_complete: return exp = get_experiment() if self not in exp.global_nodes: exp.global_nodes.append(self) if self.async_on_deploy_required and not ( self.async_on_deploy_requested or self.async_on_deploy_complete ): self.queue_async_on_deploy() self.on_deploy_complete = True def queue_async_on_deploy(self): WorkerAsyncProcess( function=self.call_async_on_deploy, node=self, timeout=self.trial_maker.async_timeout_sec, unique=True, ) self.async_on_deploy_requested = True def call_async_on_deploy(self): try: self.async_on_deploy() except Exception: self.async_on_deploy_failed = True db.session.commit() raise self.async_on_deploy_complete = True
[docs] def async_on_deploy(self): """ Called when the node is deployed to the remote server. This includes both deploying nodes from the local machine to the remote machine (e.g. when we have static stimuli that are preregistered in the database) and creating new nodes on the remote machine (e.g. when we have a chain experiment). """ pass
def set_network(self, network): self.network = network self.trial_maker_id = network.trial_maker_id if not self.module_id: self.module_id = network.module_id if not self.participant: self.participant = network.participant @property def trial_maker(self) -> "TrialMaker": from ..experiment import get_trial_maker if self.trial_maker_id: return get_trial_maker(self.trial_maker_id)
[docs] def fail(self, reason=None): if not self.failed: logger.info(f"Failing trial node (id: {self.id}, reason: {reason})") super().fail(reason=reason)
def add_default_network(self): from psynet.experiment import get_experiment network = GenericTrialNetwork( experiment=get_experiment(), module_id=self.module_id ) db.session.add(network) self.set_network(network)
[docs] class GenericTrialNetwork(TrialNetwork): trials_per_node = None def __init__(self, module_id, experiment): super().__init__( trial_maker_id=None, module_id=module_id, experiment=experiment, ) def grow(self, experiment): pass
[docs] class GenericTrialNode(TrialNode): def __init__(self, module_id, experiment): super().__init__( network=self.get_default_network(module_id, experiment), participant=None, ) def get_default_network(self, module_id, experiment): network = GenericTrialNetwork(module_id, experiment) db.session.add(network) return network
TrialNetwork.n_all_trials = column_property( select(func.count(Trial.id)) .where( Trial.network_id == TrialNetwork.id, ) .scalar_subquery() ) TrialNetwork.n_alive_trials = column_property( select(func.count(Trial.id)) .where( Trial.network_id == TrialNetwork.id, ~Trial.failed, ) .scalar_subquery() ) TrialNetwork.n_failed_trials = column_property( select(func.count(Trial.id)) .where( Trial.network_id == TrialNetwork.id, Trial.failed, ) .scalar_subquery() ) TrialNetwork.n_completed_trials = column_property( select(func.count(Trial.id)) .where( Trial.network_id == TrialNetwork.id, ~Trial.failed, Trial.complete, ~Trial.is_repeat_trial, ) .scalar_subquery() ) TrialNetwork.n_all_nodes = column_property( select(func.count(TrialNode.id)) .where( TrialNode.network_id == TrialNetwork.id, ) .scalar_subquery() ) TrialNetwork.n_alive_nodes = column_property( select(func.count(TrialNode.id)) .where( TrialNode.network_id == TrialNetwork.id, ~TrialNode.failed, ) .scalar_subquery() ) TrialNetwork.n_failed_nodes = column_property( select(func.count(TrialNode.id)) .where( TrialNode.network_id == TrialNetwork.id, TrialNode.failed, ) .scalar_subquery() )