Source code for psynet.sync

import random
from math import floor
from typing import Callable, List, Optional, Union

from dallinger import db
from dallinger.models import timenow
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import backref, joinedload, relationship

from psynet.data import SQLBase, SQLMixin, register_table
from psynet.field import PythonClass
from psynet.page import UnsuccessfulEndPage, WaitPage
from psynet.participant import Participant
from psynet.timeline import CodeBlock, EltCollection, conditional
from psynet.utils import call_function_with_context, get_logger

logger = get_logger()


[docs] class Barrier(EltCollection): """ A barrier is a timeline construct that holds participants in a waiting area until certain conditions are satisfied to release them. The decision about which participants to release at any given point is taken by the ``choose_who_to_release`` method, which the user is expected to provide. Parameters ---------- id_ ID parameter for the barrier. Barriers with the same ID share waiting areas; this allows participants at different points in the timeline to share the same waiting areas. waiting_logic Either a single timeline element or a list of timeline elements (created by ``join``) that is to be displayed to the participant while they are waiting at the barrier. If left at the default value of ``None`` then the participant will be shown a default waiting page. waiting_logic_expected_repetitions The number of times that the participant is expected to experience the waiting_logic during a given barrier visit. This is used for time estimation. max_wait_time The maximum amount of time in seconds that the participant will be allowed to wait at the barrier; if this time is exceeded and the participant is still not released, then the participant will be failed and sent to the end of the experiment. fix_time_credit If set to ``True``, then the amount of time 'credit' that the participant receives will be capped according to the estimate derived from ``waiting_logic`` and ``waiting_logic_expected_repetitions``. """ def __init__( self, id_: str, waiting_logic=None, waiting_logic_expected_repetitions=3, max_wait_time=20, fix_time_credit=False, ): if waiting_logic is None: waiting_logic = WaitPage(wait_time=0.5) self.id = id_ self.waiting_logic = waiting_logic self.waiting_logic_expected_repetitions = waiting_logic_expected_repetitions self.max_wait_time = max_wait_time self.fix_time_credit = fix_time_credit
[docs] def choose_who_to_release( self, waiting_participants: List[Participant] ) -> List[Participant]: """ Given a list of waiting participants, decides which of these participants should be released from the barrier. Parameters ---------- waiting_participants A list of waiting participants. Returns ------- A list of participants to be released. """ raise NotImplementedError
def resolve(self): from psynet.timeline import join, while_loop elts = join( CodeBlock(lambda participant: self.receive_participant(participant)), while_loop( label=f"barrier:{self.id}", condition=lambda participant: not self.can_participant_exit( participant ), logic=self.waiting_logic, expected_repetitions=self.waiting_logic_expected_repetitions, max_loop_time=self.max_wait_time, fix_time_credit=self.fix_time_credit, ), conditional( "participant_failed", condition=lambda participant: participant.failed, logic_if_true=UnsuccessfulEndPage(), time_estimate=0, log_chosen_branch=False, ), ) for elt in elts: elt.links["barrier"] = self return elts def receive_participant(self, participant: Participant): link = ParticipantLinkBarrier( participant=participant, barrier_id=self.id, barrier_class=self.__class__, arrival_time=timenow(), ) participant.active_barriers[self.id] = link def get_waiting_participants(self, for_update: bool = False): return self.get_waiting_participants_from_barrier_id( self.id, for_update=for_update )
[docs] @classmethod def get_waiting_participants_from_barrier_id( cls, barrier_id: str, for_update: bool = False ) -> List[Participant]: """ Gets the participants currently waiting at a barrier. Parameters ---------- barrier_id The ID of the barrier to check. for_update Set to ``True`` if you plan to update the resulting participant objects and their barrier links. The objects will be locked for update in the database and only released at the end of the transaction. Returns ------- A list of waiting participants. Note that this only includes currently active participants (not participants who failed and left the experiment). """ query = ( ParticipantLinkBarrier.query.join(Participant) .filter( ParticipantLinkBarrier.barrier_id == barrier_id, ~ParticipantLinkBarrier.released, ~Participant.failed, Participant.status == "working", ) .options(joinedload(ParticipantLinkBarrier.participant, innerjoin=True)) ) if for_update: query = query.with_for_update(of=ParticipantLinkBarrier).populate_existing() links = query.all() participants = [link.participant for link in links] return participants
def release(self, participant: Participant): link = participant.active_barriers.get(self.id, None) if link is None: raise RuntimeError( "Could not find an appropriate barrier link to release the participant from " f"(participant_id = {participant.id}, barrier_id = '{self.id}')." ) link.release() def can_participant_exit(self, participant: "Participant"): barrier_is_active = self.id in participant.active_barriers return not barrier_is_active def process_potential_releases(self): waiting_participants = self.get_waiting_participants(for_update=True) waiting_participants.sort(key=lambda p: p.id) logger.info( "Barrier '%s' currently has %i participant(s) waiting (ids = %s)", self.id, len(waiting_participants), ", ".join([str(p.id) for p in waiting_participants]), ) participants_to_release = self.choose_who_to_release(waiting_participants) participants_to_release.sort(key=lambda p: p.id) if len(participants_to_release) > 0: logger.info( "Barrier '%s' is releasing %i participant(s) (ids = %s)", self.id, len(participants_to_release), ", ".join([str(p.id) for p in participants_to_release]), ) for participant in participants_to_release: self.release(participant)
[docs] class GroupBarrier(Barrier): """ A GroupBarrier is a Barrier that waits until all participants in a given :class:`~psynet.sync.SyncGroup` have reached the Barrier. It also checks the current group size against the group's minimum size parameter; the group won't be allowed to proceed if it's below this size. If ``join_existing_groups=True`` for that group, it'll wait just in case new participants join the group. If ``join_existing_groups=False``, then there's no hope for new participants, so the group will be released and failed. Parameters ---------- id_ ID parameter for the Barrier. Barriers with the same ID share waiting areas; this allows participants at different points in the timeline to share the same waiting areas. group_type Identifies the kind of groups that the Barrier is operating over (see :class:`~psynet.sync.Grouper`). waiting_logic Either a single timeline element or a list of timeline elements (created by ``join``) that is to be displayed to the participant while they are waiting at the barrier. If left at the default value of ``None`` then the participant will be shown a default waiting page. waiting_logic_expected_repetitions The number of times that the participant is expected to experience the waiting_logic during a given barrier visit. This is used for time estimation. max_wait_time The maximum amount of time in seconds that the participant will be allowed to wait at the barrier; if this time is exceeded and the participant is still not released, then the participant will be failed and sent to the end of the experiment. fix_time_credit If set to ``True``, then the amount of time 'credit' that the participant receives will be fixed according to the estimate derived from ``waiting_logic`` and ``waiting_logic_expected_repetitions``. """ def __init__( self, id_: str, group_type: str, waiting_logic=None, waiting_logic_expected_repetitions=3, max_wait_time=20, on_release: Optional[Callable] = None, fix_time_credit=False, ): super().__init__( id_=id_, waiting_logic=waiting_logic, waiting_logic_expected_repetitions=waiting_logic_expected_repetitions, max_wait_time=max_wait_time, fix_time_credit=fix_time_credit, ) self.group_type = group_type self.on_release = on_release
[docs] def choose_who_to_release(self, waiting_participants: List[Participant]): waiting_participant_ids = [p.id for p in waiting_participants] participants_to_release = [] groups = { participant.active_sync_groups[ self.group_type ].id: participant.active_sync_groups[self.group_type] for participant in waiting_participants } for group in groups.values(): if group.n_active_participants < group.min_group_size: # If join_existing_groups is False, then the group will never be able # to get to the minimum size, so we should fail all participants in the group # and release them. if not group.join_existing_groups: for participant in group.active_participants: participant.fail("sync group below minimum size") participants_to_release.append(participant) continue all_participants_present = all( [ participant.id in waiting_participant_ids for participant in group.active_participants ] ) if all_participants_present: group.check_leader() for participant in group.active_participants: participants_to_release.append(participant) if self.on_release: call_function_with_context( self.on_release, group=group, participants=group.active_participants, ) return participants_to_release
[docs] class Grouper(Barrier): """ A Grouper is a kind of Barrier that assigns incoming participants into groups. This is a generic class that requires several methods to be overrun, in particular ``ready_to_group`` and ``group``. Parameters ---------- group_type A textual label for the groups that are created. This label is used to link the Grouper with subsequent GroupBarriers. id_ Optional ID parameter for this grouper. If left blank the default value is ``group_type + "_" + "grouper"``. Groupers with the same ID are treated as equivalent and share the same participant waiting areas. waiting_logic Either a single timeline element or a list of timeline elements (created by ``join``) that is to be displayed to the participant while they are waiting at the barrier. If left at the default value of ``None`` then the participant will be shown a default waiting page. waiting_logic_expected_repetitions The number of times that the participant is expected to experience the waiting_logic during a given barrier visit. This is used for time estimation. max_wait_time The maximum amount of time in seconds that the participant will be allowed to wait at the barrier; if this time is exceeded and the participant is still not released, then the participant will be failed and sent to the end of the experiment. """ def __init__( self, group_type: str, id_: Optional[str] = None, waiting_logic=None, waiting_logic_expected_repetitions=3, max_wait_time=20, ): if not id_: id_ = group_type + "_" + "grouper" super().__init__( id_=id_, waiting_logic=waiting_logic, waiting_logic_expected_repetitions=waiting_logic_expected_repetitions, max_wait_time=max_wait_time, ) self.group_type = group_type
[docs] def ready_to_group(self, participants: List[Participant]) -> bool: """ Determines whether the Grouper is ready to group a given collection of participants. Note that not all participants need to be grouped at once; it's permissible to leave some participants still waiting. Parameters ---------- participants List of participants who are candidates for grouping. Returns ------- ``True`` if the grouper is ready to group (some of) the participants, ``False`` otherwise. """ raise NotImplementedError
[docs] def group(self, participants: List[Participant]) -> List["SyncGroup"]: """ This method is run if ``ready_to_group`` returns ``True``. It is responsible for grouping participants. Parameters ---------- participants Participants who are candidates for grouping. Returns ------- A list of SyncGroups who should be populated by the grouped participants. """ raise NotImplementedError
def receive_participant(self, participant: Participant): if self.group_type in participant.active_sync_groups: raise RuntimeError( f"Participant is already in a group with this group_type ('{self.group_type}'). " "You should close this group, typically by including a GroupCloser in the timeline, " "before reassigning it." ) super().receive_participant(participant)
[docs] def choose_who_to_release(self, waiting_participants: List[Participant]): participants_to_release = [] if self.ready_to_group(waiting_participants): groups = self.group(waiting_participants) if not isinstance(groups, list) and all( [isinstance(group, SyncGroup) for group in groups] ): raise ValueError("group() must return a list of SyncGroups.") for _group in groups: db.session.add(_group) for _participant in _group.participants: participants_to_release.append(_participant) return participants_to_release
[docs] def select_leader(self, participants: List[Participant]) -> Participant: """ By default the leader is randomly chosen from the list of available participants. Parameters ---------- participants Participants to choose from. Returns ------- A participant to be assigned 'leader' of the SyncGroup. """ return random.choice(participants)
[docs] class SimpleGrouper(Grouper): """ A Simple Grouper waits until ``batch_size`` many participants are waiting, and then randomly partitions this group of participants into groups of size ``initial_group_size``. Parameters ---------- group_type A textual label for the groups that are created. This label is used to link the Grouper with subsequent GroupBarriers. initial_group_size Size of the groups to create. max_group_size If ``join_existing_groups=True``, then participants will be allowed to join groups until they reach this maximum size. If set to ``"initial_group_size"`` (default), then the maximum size will be set to the initial group size. min_group_size If the current group size is below this value (taking into account failed participants and participants who have left the experiment), then the group will be considered under-quota. The group will not be allowed to pass through barriers until it is at or above this size. If set to ``"initial_group_size"`` (default), then the minimum size will be set to the initial group size. batch_size Number of participants that should be waiting until the groups are created. If set to ``"initial_group_size"`` (default), then the batch size will be set to the initial group size. join_existing_groups If set to ``True``, then before a new group is created, the Grouper will check if there are any existing groups that are under-quota (e.g. because some participants left the experiment early). If so, the arriving participant will be assigned to one of these groups instead. kwargs Further arguments to pass to Grouper. """ def __init__( self, group_type: str, *, initial_group_size: Optional[int] = None, max_group_size: Optional[Union[int, str]] = "initial_group_size", min_group_size: Union[int, str] = "initial_group_size", batch_size: Union[int, str] = "initial_group_size", join_existing_groups: bool = False, **kwargs, ): if "group_size" in kwargs: raise ValueError( "The group_size argument has been renamed to initial_group_size, " "please update your code accordingly.", ) if initial_group_size is None: raise ValueError("initial_group_size must be provided.") super().__init__(group_type=group_type, **kwargs) if max_group_size == "initial_group_size": max_group_size = initial_group_size else: if not join_existing_groups: raise ValueError( "If max_group_size != 'initial_group_size', you probably want to set join_existing_groups=True." ) if min_group_size == "initial_group_size": min_group_size = initial_group_size if batch_size == "initial_group_size": batch_size = initial_group_size self.initial_group_size = initial_group_size self.max_group_size = max_group_size self.min_group_size = min_group_size self.batch_size = batch_size self.join_existing_groups = join_existing_groups def resolve(self): from .timeline import conditional, join return join( CodeBlock(self._join_existing_groups), conditional( "joined_an_existing_group", condition=lambda participant: self.group_type in participant.active_sync_groups, logic_if_true=[], logic_if_false=super().resolve(), ), ) def _join_existing_groups(self, participant: Participant): if not self.join_existing_groups: return query = SimpleSyncGroup.query.filter( SimpleSyncGroup.group_type == self.group_type ) if self.max_group_size is not None: query = query.filter( SimpleSyncGroup.n_active_participants < self.max_group_size ) # Preferentially join the smallest groups, and among those, the oldest query = query.order_by( SimpleSyncGroup.n_active_participants, SimpleSyncGroup.id ) group = query.one_or_none() if group: group.participants.append(participant) assert participant.active_sync_groups[self.group_type] == group group.check_numbers()
[docs] def ready_to_group(self, participants: List[Participant]) -> bool: return len(participants) >= self.batch_size
[docs] def group(self, participants: List[Participant]) -> List["SyncGroup"]: n_groups = floor(len(participants) / self.initial_group_size) n_participants_to_group = n_groups * self.initial_group_size participants_to_group = participants[:n_participants_to_group] grouped_participants = self.randomly_partition_list( participants_to_group, group_size=self.initial_group_size ) groups = [] for _participants in grouped_participants: _group = SimpleSyncGroup( group_type=self.group_type, initial_group_size=self.initial_group_size, max_group_size=self.max_group_size, min_group_size=self.min_group_size, n_active_participants=len(_participants), join_existing_groups=self.join_existing_groups, ) groups.append(_group) for _participant in _participants: _group.participants.append(_participant) _group.leader = self.select_leader(_participants) return groups
@staticmethod def randomly_partition_list(lst: list, group_size: int): n_groups = len(lst) / group_size if not n_groups == floor(n_groups): raise ValueError( f"List size ({len(lst)}) is not an integer multiple of group_size ({group_size})" ) n_groups = floor(n_groups) lst = lst.copy() random.shuffle(lst) return [lst[i::n_groups] for i in range(n_groups)]
[docs] @register_table class SyncGroup(SQLBase, SQLMixin): """ A SyncGroup represents a group of participants that are synchronized at various points in the experiment. Such groups are created by Groupers and synchronized by GroupBarriers. Attributes ---------- leader : Participant The leader of the SyncGroup. This can be reassigned by logic such as ``group.leader = participant``. participants : List[Participant] A list of participants in that group. Additional participants can be added by logic such as ``group.participants.append(participant)``. """ __tablename__ = "sync_group" group_type = Column(String) active = Column(Boolean, default=True) end_time = Column(DateTime) leader_id = Column(Integer, ForeignKey("participant.id")) participant_links = relationship( "ParticipantLinkSyncGroup", cascade="all, delete-orphan", ) participants = association_proxy( "participant_links", "participant", creator=lambda participant: ParticipantLinkSyncGroup(participant=participant), ) n_active_participants = Column(Integer) @property def active_participants(self) -> List[Participant]: return [p for p in self.participants if not p.failed and p.status == "working"] leader = relationship( "psynet.participant.Participant", cascade="all", ) def check_leader(self): if self.leader not in self.active_participants: self.leader = sorted(self.active_participants, key=lambda p: p.id)[0] @property def active_followers(self): return [p for p in self.active_participants if p != self.leader] @classmethod def get_active_group( cls, participant: Participant, group_type: str, ) -> "SyncGroup": return participant.active_sync_groups[group_type] def close(self): self.active = False self.end_time = timenow() def check_numbers(self): self.n_active_participants = len(self.active_participants)
[docs] class SimpleSyncGroup(SyncGroup): """ A SyncGroup that is created by a SimpleGrouper. """ initial_group_size = Column(Integer) max_group_size = Column(Integer) min_group_size = Column(Integer) join_existing_groups = Column(Boolean)
[docs] @register_table class ParticipantLinkSyncGroup(SQLBase, SQLMixin): __tablename__ = "participant_link_sync_group" arrival_time = Column(DateTime) participant_id = Column(Integer, ForeignKey("participant.id"), index=True) participant = relationship( "psynet.participant.Participant", back_populates="sync_group_links" ) sync_group_id = Column(Integer, ForeignKey("sync_group.id"), index=True) sync_group = relationship("SyncGroup", back_populates="participant_links")
[docs] @register_table class ParticipantLinkBarrier(SQLBase, SQLMixin): __tablename__ = "participant_link_barrier" barrier_id = Column(String, index=True) barrier_class = Column(PythonClass) participant_id = Column(Integer, ForeignKey("participant.id"), index=True) participant = relationship( "psynet.participant.Participant", backref=backref( "barrier_links", cascade="all, delete-orphan" ), # for some reason backpopulates didn't work here ) arrival_time = Column(DateTime) departure_time = Column(DateTime) released = Column(Boolean, default=False) def get_barrier(self): from .experiment import get_experiment exp = get_experiment() timeline = exp.timeline elt = timeline.get_current_elt(exp, self.participant) try: barrier = elt.links["barrier"] assert barrier.id == self.barrier_id return barrier except (KeyError, AssertionError): raise RuntimeError( "The barrier can only be retrieved if the participant is currently at the barrier." ) def release(self): self.departure_time = timenow() self.released = True def get_waiting_participants(self, for_update: bool = False): return self.barrier_class.get_waiting_participants_from_barrier_id( self.barrier_id, for_update=for_update )
Participant.sync_group_links = relationship( "ParticipantLinkSyncGroup", cascade="all, delete-orphan", ) Participant.sync_groups = association_proxy( "sync_group_links", "sync_group", ) # No association proxy for barrier links because barriers aren't represented as database objects (yet)
[docs] class GroupCloser(GroupBarrier): """ A timeline construct for closing a previously created group. This is required before creating a new group with the same ``group_type``. """ def __init__(self, group_type: str, **kwargs): if "id_" not in kwargs: kwargs["id_"] = f"closer_{group_type}" super().__init__( group_type=group_type, on_release=lambda group: group.close(), **kwargs )