Synchronization

In some experiments we need to be able to synchronize certain groups of participants to do the same things at the same time. For example, we might want to implement a behavioral economics game where participants have to make certain kinds of decisions and receive payouts depending on what the other participants in their group did. PsyNet provides advanced synchronization utilities for supporting such experiments.

There are two main timeline constructs that are used to implement such synchronization. The Grouper is responsible for creating groups of participants, whereas the GroupBarrier is responsible for synchronizing participants within groups.

Grouper

One straightforward type of Grouper is the SimpleGrouper. It may be included in the timeline as follows:

SimpleGrouper(
    group_type="rock_paper_scissors",
    initial_group_size=2,
),

This SimpleGrouper organizes participants into groups of 2. By default it will create a new group of 2 each time 2 participants are ready and waiting, but an optional batch_size parameter can be used to delay group formation until more participants are waiting.

The groups created by Groupers are represented by SyncGroup objects. If a participant is a member of just one active SyncGroup, then it can be accessed with code as follows:

participant.sync_group

If the participant is a member of multiple active SyncGroups, then they can be accessed via participant.active_sync_groups, which takes the form of a dictionary keyed by group_type. The full list of participants within the SyncGroup can then be accessed (and modified) via sync_group.participants, which is a list.

It is possible to put multiple Grouper constructs in a timeline. If they have different group_type parameters then they will be used to create different grouping namespaces. Groupers with the same group_type can be used to regroup the participants into different groupings as they progress through the experiment. However, it is not possible to be in multiple groups with the same group_type simultaneously; one must place a GroupCloser in the timeline to close the group before assigning the participants to a new one.

Group Barrier

A Group Barrier may be included in the timeline as follows:

GroupBarrier(
    id_="finished_trial",
    group_type="rock_paper_scissors",
    on_release=self.score_trial,
)

When participants reach this barrier, they will be told to wait until all participants in their group are also waiting at that barrier. An optional on_release function can be provided to the barrier, which will be executed on the group of participants at the point when they leave the barrier.

Synchronization in trial makers

It is perfectly possible to use these synchronization constructs within trial makers. In this case, it is usually wise to provide a sync_group_type argument to the trial maker, for example:

RockPaperScissorsTrialMaker(
    id_="rock_paper_scissors",
    trial_class=RockPaperScissorsTrial,
    nodes=[
        StaticNode(definition={"color": color})
        for color in ["red", "green", "blue"]
    ],
    expected_trials_per_participant=3,
    max_trials_per_participant=3,
    sync_group_type="rock_paper_scissors",
)

This tells the trial maker to synchronize the logic of assigning participants to nodes according to their SyncGroup. By default, each group has a randomly assigned leader; node allocation is determined by standard PsyNet logic for that leader, as if that person were taking that trial maker by themselves; the other participants in that group then ‘follow’ that leader, being assigned to the same nodes as the leader on each trial.

Using a sync_group_type parameter means that the beginning of each trial is synchronized across all participants within a given group. It is possible to synchronize other parts of the trial by including further GroupBarriers within the trial, for example:

def show_trial(self, experiment, participant):
    return join(
        GroupBarrier(
            id_="wait_for_trial",
            group_type="rock_paper_scissors",
        ),
        self.choose_action(color=self.definition["color"]),
        GroupBarrier(
            id_="finished_trial",
            group_type="rock_paper_scissors",
            on_release=self.score_trial,
        ),
    )

Demo

Several demos are available that illustrate these features, see below.

Rock, paper, scissors

from typing import List

from dominate import tags

import psynet.experiment
from psynet.bot import Bot, advance_past_wait_pages
from psynet.consent import NoConsent
from psynet.modular_page import ModularPage, PushButtonControl
from psynet.page import InfoPage, SuccessfulEndPage
from psynet.participant import Participant
from psynet.sync import GroupBarrier, SimpleGrouper
from psynet.timeline import Timeline, join
from psynet.trial.static import StaticNode, StaticTrial, StaticTrialMaker
from psynet.utils import as_plain_text, get_logger

logger = get_logger()


class RockPaperScissorsTrialMaker(StaticTrialMaker):
    pass


class RockPaperScissorsTrial(StaticTrial):
    time_estimate = 5
    accumulate_answers = True

    def show_trial(self, experiment, participant):
        return join(
            GroupBarrier(
                id_="wait_for_trial",
                group_type="rock_paper_scissors",
            ),
            self.choose_action(color=self.definition["color"]),
            GroupBarrier(
                id_="finished_trial",
                group_type="rock_paper_scissors",
                on_release=self.score_trial,
            ),
        )

    def choose_action(self, color):
        prompt = tags.p("Choose your action:", style=f"color: {color};")
        return ModularPage(
            "choose_action",
            prompt,
            PushButtonControl(
                choices=["rock", "paper", "scissors"],
            ),
            time_estimate=5,
            save_answer="last_action",
            # extras=ChatRoom(),
        )

    def show_feedback(self, experiment, participant):
        score = participant.var.last_trial["score"]
        if score == -1:
            outcome = "You lost."
        elif score == 0:
            outcome = "You drew."
        else:
            assert score == 1
            outcome = "You won!"

        prompt = (
            f"You chose {participant.var.last_trial['action_self']}, "
            + f"your partner chose {participant.var.last_trial['action_other']}. "
            + outcome
        )

        return InfoPage(
            prompt,
            time_estimate=5,
        )

    def score_trial(self, participants: List[Participant]):
        assert len(participants) == 2
        answers = [participant.var.last_action for participant in participants]
        score_0 = self.scoring_matrix[answers[0]][answers[1]]
        score_1 = -score_0
        participants[0].var.last_trial = {
            "action_self": answers[0],
            "action_other": answers[1],
            "score": score_0,
        }
        participants[1].var.last_trial = {
            "action_self": answers[1],
            "action_other": answers[0],
            "score": score_1,
        }

    scoring_matrix = {
        "rock": {
            "rock": 0,
            "paper": -1,
            "scissors": 1,
        },
        "paper": {
            "rock": 1,
            "paper": 0,
            "scissors": -1,
        },
        "scissors": {"rock": -1, "paper": 1, "scissors": 0},
    }


class Exp(psynet.experiment.Experiment):
    label = "Rock paper scissors demo"

    initial_recruitment_size = 1

    timeline = Timeline(
        NoConsent(),
        SimpleGrouper(
            group_type="rock_paper_scissors",
            initial_group_size=2,
        ),
        RockPaperScissorsTrialMaker(
            id_="rock_paper_scissors",
            trial_class=RockPaperScissorsTrial,
            nodes=[
                StaticNode(definition={"color": color})
                for color in ["red", "green", "blue"]
            ],
            expected_trials_per_participant=3,
            max_trials_per_participant=3,
            sync_group_type="rock_paper_scissors",
        ),
        SuccessfulEndPage(),
    )

    test_n_bots = 2
    test_mode = "serial"

    def test_serial_run_bots(self, bots: List[Bot]):
        from psynet.page import WaitPage

        advance_past_wait_pages(bots)

        page = bots[0].get_current_page()
        assert page.label == "choose_action"
        bots[0].take_page(page, response="rock")
        page = bots[0].get_current_page()
        assert isinstance(page, WaitPage)

        page = bots[1].get_current_page()
        assert page.label == "choose_action"
        bots[1].take_page(page, response="paper")

        advance_past_wait_pages(bots)

        pages = [bot.get_current_page() for bot in bots]
        assert pages[0].content == "You chose rock, your partner chose paper. You lost."
        assert pages[1].content == "You chose paper, your partner chose rock. You won!"

        bots[0].take_page()
        bots[1].take_page()
        advance_past_wait_pages(bots)

        bots[0].take_page(page, response="scissors")
        bots[1].take_page(page, response="paper")
        advance_past_wait_pages(bots)

        pages = [bot.get_current_page() for bot in bots]

        logger.info("Bot 1: %s", pages[0].content)
        logger.info("Bot 2: %s", pages[1].content)

        assert (
            pages[0].content == "You chose scissors, your partner chose paper. You won!"
        )
        assert (
            pages[1].content
            == "You chose paper, your partner chose scissors. You lost."
        )

        bots[0].take_page()
        bots[1].take_page()
        advance_past_wait_pages(bots)

        bots[0].take_page(page, response="scissors")
        bots[1].take_page(page, response="scissors")
        advance_past_wait_pages(bots)

        pages = [bot.get_current_page() for bot in bots]
        assert (
            pages[0].content
            == "You chose scissors, your partner chose scissors. You drew."
        ), (
            "A rare error sometimes occurs here. If you see it, please report it to Peter Harrison (pmcharrison) for "
            "further debugging."
        )
        assert (
            pages[1].content
            == "You chose scissors, your partner chose scissors. You drew."
        ), (
            "A rare error sometimes occurs here. If you see it, please report it to Peter Harrison (pmcharrison) for "
            "further debugging."
        )

        bots[0].take_page()
        bots[1].take_page()
        advance_past_wait_pages(bots)

        pages = [bot.get_current_page() for bot in bots]
        for page in pages:
            text = as_plain_text(page.prompt.text)
            assert "That's the end of the experiment!" in text

Synchronized GSP

import random
from typing import List, Union

from dominate import tags
from markupsafe import Markup

import psynet.experiment
from psynet.bot import Bot, advance_past_wait_pages
from psynet.consent import NoConsent
from psynet.modular_page import ModularPage, Prompt, SliderControl
from psynet.page import InfoPage, SuccessfulEndPage
from psynet.participant import Participant
from psynet.sync import SimpleGrouper
from psynet.timeline import Timeline, join
from psynet.trial.gibbs import GibbsNode, GibbsTrial, GibbsTrialMaker
from psynet.utils import as_plain_text, get_logger

logger = get_logger()

TARGETS = ["tree", "rock", "carrot", "banana"]
COLORS = ["red", "green", "blue"]


class ColorSliderPage(ModularPage):
    def __init__(
        self,
        label: str,
        prompt: Union[str, Markup],
        selected_idx: int,
        starting_values: List[int],
        reverse_scale: bool,
        directional: bool,
        time_estimate=None,
        **kwargs,
    ):
        assert 0 <= selected_idx < len(COLORS)
        self.prompt = prompt
        self.selected_idx = selected_idx
        self.starting_values = starting_values

        not_selected_idxs = list(range(len(COLORS)))
        not_selected_idxs.remove(selected_idx)
        not_selected_colors = [COLORS[i] for i in not_selected_idxs]
        not_selected_values = [starting_values[i] for i in not_selected_idxs]
        hidden_inputs = dict(zip(not_selected_colors, not_selected_values))
        kwargs["template_arg"] = {
            "hidden_inputs": hidden_inputs,
        }
        super().__init__(
            label,
            Prompt(prompt),
            control=SliderControl(
                start_value=starting_values[selected_idx],
                min_value=0,
                max_value=255,
                slider_id=COLORS[selected_idx],
                reverse_scale=reverse_scale,
                directional=directional,
                template_filename="color-slider.html",
                template_args={
                    "hidden_inputs": hidden_inputs,
                },
                continuous_updates=False,
                bot_response=lambda: random.randint(0, 255),
            ),
            time_estimate=time_estimate,
        )


class CustomTrial(GibbsTrial):
    time_estimate = 5

    def show_trial(self, experiment, participant):
        return join(
            self.see_last_trial_responses(participant) if self.degree > 0 else None,
            self.choose_response(),
        )

    def choose_response(self):
        target = self.context["target"]

        prompt = tags.span()
        with prompt:
            tags.span(
                "Adjust the slider to match the following word as well as possible: "
            )
            tags.strong(target)

        return ColorSliderPage(
            "color_trial",
            prompt,
            starting_values=self.initial_vector,
            selected_idx=self.active_index,
            reverse_scale=self.reverse_scale,
            directional=False,
        )

    def see_last_trial_responses(self, participant: Participant):
        last_node = self.node.parent
        last_trials = last_node.alive_trials
        last_trials.sort(key=lambda t: t.participant_id)
        try:
            participant_answer = [
                t.answer for t in last_trials if t.participant == participant
            ][0]
        except IndexError:
            participant_answer = None
        other_participant_answers = [
            t.answer for t in last_trials if t.participant != participant
        ]

        html = tags.span()
        with html:
            if participant_answer is not None:
                tags.p(f"You chose: {participant_answer}")
            tags.p("Other participants chose:")
            with tags.ul():
                for response in other_participant_answers:
                    tags.li(response)
            tags.p(
                f"The summarized response was {last_node.var.summarize_trials_output}."
            )

        return InfoPage(html)


class CustomNode(GibbsNode):
    vector_length = 3

    def random_sample(self, i):
        return random.randint(0, 255)


trial_maker = GibbsTrialMaker(
    id_="gibbs_demo",
    start_nodes=lambda: [CustomNode(context={"target": random.sample(TARGETS, 1)[0]})],
    sync_group_type="gibbs",
    trial_class=CustomTrial,
    node_class=CustomNode,
    chain_type="within",
    expected_trials_per_participant=4,
    max_trials_per_participant=4,
    max_nodes_per_chain=4,
    chains_per_participant=1,
    recruit_mode="n_participants",
    target_n_participants=3,
    # propagate_failure means that when a trial fails the chain is aggressively pruned
    # such that no nodes are 'contaminated' by the failed trial. This is often desirable,
    # but if we want to make maximum use of participant trials, we can set propagate_failure=False.
    propagate_failure=False,
)


class Exp(psynet.experiment.Experiment):
    label = "Gibbs within sync demo"
    initial_recruitment_size = 1

    timeline = Timeline(
        NoConsent(),
        InfoPage("Welcome to the experiment!", time_estimate=5),
        SimpleGrouper(
            group_type="gibbs",
            initial_group_size=3,
            join_existing_groups=True,
        ),
        trial_maker,
        SuccessfulEndPage(),
    )

    test_n_bots = 4

    def test_serial_run_bots(self, bots: List[Bot]):
        from psynet.page import WaitPage

        original_bots = bots[:3]

        for bot in original_bots:
            assert bot.get_current_page().content == "Welcome to the experiment!"
            bot.take_page()
            assert isinstance(bot.get_current_page(), WaitPage)

        # Send the first three bots into the trial maker
        advance_past_wait_pages(original_bots)

        # Trial 1 (degree = 0)
        for bot, response in zip(original_bots, [100, 110, 120]):
            page = bot.get_current_page()
            assert page.label == "color_trial"
            bot.take_page(page, response=response)
            assert isinstance(bot.get_current_page(), WaitPage)

        # Going now to the next trial;
        # Trial 2 (degree = 1)
        advance_past_wait_pages(original_bots)

        # Check that the trials have been aggregated appropriately
        page = bots[0].get_current_page()
        info_message = "You chose: 100 Other participants chose: * 110 * 120 The summarized response was 110."
        assert as_plain_text(page.prompt.text) == info_message

        group = bots[0].sync_group

        # Now we make one of the bots fail during a trial
        bots[0].fail(reason="simulated_failure")
        assert group.n_active_participants < group.min_group_size

        # Bring in a new bot to replace the failed one
        new_bot = bots[3]
        assert new_bot.get_current_page().content == "Welcome to the experiment!"

        # If we send the new bot into the trial maker, it should be able to join the group
        new_bot.take_page()
        assert new_bot in group.participants

        # Now the participant should be waiting at the prepare_trial barrier.
        # The other two bots need to finish the previous trial before this new trial can begin
        assert isinstance(new_bot.get_current_page(), WaitPage)
        assert "prepare_trial" in new_bot.active_barriers

        # Let's have them finish the trial, then
        for bot in [bots[1], bots[2]]:
            page = bot.get_current_page()
            assert isinstance(page, InfoPage)
            bot.take_page(page)

            page = bot.get_current_page()
            assert page.label == "color_trial"
            bot.take_page(page)

        # Now all three remaining bots should be at the prepare_trial barrier
        # Trial 3 (degree = 2)
        bots = [bots[1], bots[2], new_bot]
        for bot in bots:
            assert isinstance(bot.get_current_page(), WaitPage)
            assert "prepare_trial" in bot.active_barriers

        # Now we can advance past the prepare_trial barrier
        advance_past_wait_pages(bots)

        for bot in bots:
            assert bot.current_trial is not None
            assert isinstance(bot.get_current_page(), InfoPage)

        # They should all be assigned to the same node
        assert len(set([bot.current_trial.node for bot in bots])) == 1

        # Great, the new bot has successfully joined the team! They can go ahead and finish the experiment now.
        # There should be two more trials to complete, including this one, because max_nodes_per_chain == 4.
        # We want to keep an eye out for the new bot, and make sure it follows the other two bots in finishing the
        # trial maker, which will mean it only taking three trials instead of four.

        for remaining_nodes in range(2):
            for bot in bots:
                page = bot.get_current_page()
                assert isinstance(
                    page, InfoPage
                ), f"Bot {bot.id} unexpectedly saw {page} instead of an InfoPage, on remaining_nodes = {remaining_nodes}."
                bot.take_page(page)

                page = bot.get_current_page()
                assert page.label == "color_trial"
                bot.take_page(page)
            advance_past_wait_pages(bots)

        for bot in bots:
            page = bot.get_current_page()
            text = as_plain_text(page.prompt.text)
            assert "That's the end of the experiment!" in text

    def test_check_bot(self, bot: Bot, **kwargs):
        assert not bot.failed or bot.failed_reason == "simulated_failure"

Quorum

import time
from typing import List

import psynet.experiment
from psynet.bot import Bot
from psynet.consent import NoConsent
from psynet.modular_page import ModularPage, PushButtonControl
from psynet.page import InfoPage, SuccessfulEndPage
from psynet.sync import SimpleGrouper
from psynet.timeline import PageMaker, Timeline, conditional, for_loop, join
from psynet.trial.static import StaticNode, StaticTrial, StaticTrialMaker

nodes = [
    StaticNode(
        definition={"animal": animal},
    )
    for animal in ["cats", "dogs", "fish", "ponies", "zebras", "giraffes"]
]


class AnimalTrial(StaticTrial):
    time_estimate = 3

    def show_trial(self, experiment, participant):
        return ModularPage(
            "animal_trial",
            f"While we are waiting for other participants, please tell us: How much do you like {self.definition['animal']}?",
            PushButtonControl(
                ["Not at all", "A little", "Very much"],
            ),
        )


trial_maker = StaticTrialMaker(
    id_="animals",
    trial_class=AnimalTrial,
    nodes=nodes,
    expected_trials_per_participant=6,
    max_trials_per_participant=99,
    allow_repeated_nodes=True,
)

waiting_logic = PageMaker(
    trial_maker.cue_trial, time_estimate=AnimalTrial.time_estimate
)


class Exp(psynet.experiment.Experiment):
    label = "Quorum experiment"

    timeline = Timeline(
        NoConsent(),
        InfoPage(
            (
                "This demo demonstrates an experiment that operates with a quorum. "
                "In particular, there is a central part of the timeline that participants "
                "can only access once sufficiently many other participants are present at "
                "the same time. This would be useful for e.g. a multiplayer game. "
                "While participants are waiting for other participants, they take trials "
                "from another trial maker, so they don't waste their time."
            ),
            time_estimate=5,
        ),
        trial_maker.custom(
            SimpleGrouper(
                "quorum",
                initial_group_size=3,
                max_group_size=None,
                min_group_size=3,
                join_existing_groups=True,
                waiting_logic=waiting_logic,
                waiting_logic_expected_repetitions=10,
                max_wait_time=120,
            ),
            for_loop(
                label="quorate",
                iterate_over=range(
                    3
                ),  # Participant will only be allowed to visit the quorate page 3 times
                logic=join(
                    conditional(
                        "check_quorate",
                        condition=lambda participant: participant.sync_group.n_active_participants
                        >= participant.sync_group.min_group_size,
                        logic_if_true=PageMaker(
                            lambda participant: InfoPage(
                                f"We are now quorate. There are {participant.sync_group.n_active_participants - 1} other participants present."
                            ),
                            time_estimate=5,
                        ),
                        logic_if_false=waiting_logic,
                    ),
                ),
            ),
        ),
        SuccessfulEndPage(),
    )

    test_n_bots = 5

    def test_serial_run_bots(self, bots: List[Bot]):
        assert isinstance(bots[0].get_current_page(), InfoPage)
        bots[0].take_page()
        assert bots[0].get_current_page().label == "animal_trial"
        bots[0].take_page()
        assert bots[0].get_current_page().label == "animal_trial"

        assert isinstance(bots[1].get_current_page(), InfoPage)
        bots[1].take_page()
        assert bots[1].get_current_page().label == "animal_trial"

        assert isinstance(bots[2].get_current_page(), InfoPage)
        bots[2].take_page()

        # Currently barriers are checked in a background process, so
        # a participant should never be released instantly from a grouper,
        # even if they are the last participant to arrive.
        # We may change this in due course, once we're satisfied we're not going
        # to run into deadlocks.

        page = bots[2].get_current_page()
        if isinstance(page, ModularPage) and page.label == "animal_trial":
            # 2 seconds should be enough for the background process to run once
            time.sleep(2)
            bots[2].take_page()

        page = bots[2].get_current_page()
        assert isinstance(page, InfoPage)
        assert (
            "We are now quorate. There are 2 other participants present."
            in page.content
        )

        for bot in [bots[0], bots[1]]:
            bot.take_page()
            page = bot.get_current_page()
            assert (
                "We are now quorate. There are 2 other participants present."
                in page.content
            )

        bots[0].fail("simulated_failure")

        for bot in [bots[1], bots[2]]:
            bot.take_page()
            page = bot.get_current_page()
            assert page.label == "animal_trial"

        # Bring in a new bot to join the group
        bots[3].take_page()

        # As before, because of the background processes, bot 3 will probably need to
        # take one trial before they can continue.
        page = bots[3].get_current_page()
        if isinstance(page, ModularPage) and page.label == "animal_trial":
            time.sleep(2)
            bots[3].take_page()

        page = bots[3].get_current_page()
        assert (
            "We are now quorate. There are 2 other participants present."
            in page.content
        )

        # If we bring in a 5th participant, they should be able to join the main room right away.
        assert isinstance(bots[4].get_current_page(), InfoPage)
        bots[4].take_page()
        page = bots[4].get_current_page()
        assert (
            "We are now quorate. There are 3 other participants present."
            in page.content
        )

        for bot in [bots[1], bots[2], bots[3]]:
            bot.run_to_completion(render_pages=True)

    def test_check_bot(self, bot: Bot, **kwargs):
        assert not bot.failed or bot.failed_reason == "simulated_failure"