Source code for psynet.recruiters

import json
import os
import re
from math import ceil

import dallinger.recruiters
import dominate
import flask
import requests
from dallinger import db
from dallinger.config import get_config
from dallinger.db import session
from dallinger.notifications import admin_notifier, get_mailer
from dallinger.recruiters import RedisStore
from dallinger.utils import get_base_url
from dominate import tags
from dominate.util import raw
from sqlalchemy import Column, DateTime, ForeignKey, String
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound

from .consent import AudiovisualConsent, LucidConsent, OpenScienceConsent
from .data import SQLBase, SQLMixin, register_table
from .lucid import LucidService
from .utils import get_logger, render_template_with_translations

logger = get_logger()


[docs] class PsyNetRecruiter(dallinger.recruiters.CLIRecruiter): """ The PsyNetRecruiter base class """
[docs] def compensate_worker(self, *args, **kwargs): """A recruiter may provide a means to directly compensate a worker.""" raise RuntimeError("Compensation is not implemented.")
[docs] def notify_duration_exceeded(self, participants, reference_time): """ The participant has been working longer than the time defined in the "duration" config value. """ for participant in participants: participant.status = "abandoned" # We preserve this commit just in case Dallinger removes the external commit in the future session.commit()
[docs] def recruit(self, n=1): """Incremental recruitment isn't implemented for now, so we return an empty list.""" return []
# CAP Recruiter
[docs] class BaseCapRecruiter(PsyNetRecruiter): """ The CapRecruiter base class """
[docs] def open_recruitment(self, n=1): """ Return an empty list which otherwise would be a list of recruitment URLs. """ return {"items": [], "message": ""}
[docs] def close_recruitment(self): logger.info("No more participants required. Recruitment stopped.")
[docs] def reward_bonus(self, participant, amount, reason): """ Return values for `basePay` and `bonus` to cap-recruiter application. """ data = { "assignmentId": participant.assignment_id, "basePayment": self.config.get("base_payment"), "bonus": amount, "failed_reason": participant.failure_tags, } url = self.external_submission_url url += "/fail" if participant.failed else "/complete" requests.post( url, json=data, headers={"Authorization": os.environ.get("CAP_RECRUITER_AUTH_TOKEN")}, verify=False, # Temporary fix because of SSLCertVerificationError )
[docs] class CapRecruiter(BaseCapRecruiter): """ The production cap-recruiter. """ nickname = "cap-recruiter" external_submission_url = "https://cap-recruiter.ae.mpg.de/tasks"
[docs] class StagingCapRecruiter(BaseCapRecruiter): """ The staging cap-recruiter. """ nickname = "staging-cap-recruiter" external_submission_url = "https://staging-cap-recruiter.ae.mpg.de/tasks"
[docs] class DevCapRecruiter(BaseCapRecruiter): """ The development cap-recruiter. """ nickname = "dev-cap-recruiter" external_submission_url = "http://localhost:8000/tasks"
# Lucid Recruiter
[docs] @register_table class LucidRID(SQLBase, SQLMixin): __tablename__ = "lucid_rid" # These fields are removed from the database table as they are not needed. failed = None failed_reason = None time_of_death = None rid = Column(String, ForeignKey("participant.worker_id"), index=True) completed_at = Column(DateTime) terminated_at = Column(DateTime) termination_reason = Column(String)
[docs] class LucidRecruiterException(Exception): """Custom exception for LucidRecruiter"""
[docs] class BaseLucidRecruiter(PsyNetRecruiter): """ The LucidRecruiter base class """ required_consent_page = LucidConsent.LucidConsentPage optional_consent_pages = ( AudiovisualConsent.AudiovisualConsentPage, OpenScienceConsent.OpenScienceConsentPage, ) def __init__(self, *args, **kwargs): super().__init__() self.config = get_config() if self.config.get("show_reward"): raise RuntimeError( "Lucid recruitment requires `show_reward` to be set to `False`." ) self.mailer = get_mailer(self.config) self.notifies_admin = admin_notifier(self.config) self.lucidservice = LucidService( api_key=self.config.get("lucid_api_key"), sha1_hashing_key=self.config.get("lucid_sha1_hashing_key"), exp_config=self.config, recruitment_config=json.loads(self.config.get("lucid_recruitment_config")), ) self.store = kwargs.get("store", RedisStore()) @property def survey_number_storage_key(self): experiment_id = self.config.get("id") return "{}:{}".format(self.__class__.__name__, experiment_id) @property def in_progress(self): """Does a Lucid survey for the current experiment ID already exist?""" return self.current_survey_number() is not None def verify_consents(self, consents): error_msg = "Lucid recruitment requires consent 'LucidConsent' and optionally one of `AudiovisualConsent` or `OpenScienceConsent` (in this order)." if isinstance(consents[0], self.required_consent_page): if len(consents) == 1: pass elif len(consents) == 2 and isinstance( consents[1], self.optional_consent_pages ): pass else: raise RuntimeError(error_msg) else: raise RuntimeError(error_msg)
[docs] def current_survey_number(self): """ Return the survey number associated with the active experiment ID if any such survey exists. """ return self.store.get(self.survey_number_storage_key)
[docs] def open_recruitment(self, n=1): """Open a connection to Lucid and create a survey.""" from .experiment import get_and_load_config, get_experiment self.lucidservice.log(f"Opening initial recruitment for {n} participants.") if self.in_progress: raise LucidRecruiterException( "Tried to open_recruitment on already open recruiter." ) experiment = get_experiment() wage_per_hour = get_and_load_config().get("wage_per_hour") create_survey_request_params = { "bid_length_of_interview": ceil( experiment.estimated_completion_time(wage_per_hour) / 60 ), "live_url": self.ad_url.replace("http://", "https://"), "name": self.config.get("title"), "quota": n, "quota_cpi": round( experiment.estimated_max_reward(wage_per_hour), 2, ), } survey_info = self.lucidservice.create_survey(**create_survey_request_params) self._record_current_survey_number(survey_info["SurveyNumber"]) # Lucid Marketplace automatically adds 6 qualifications to US studies # when a survey is created (Age, Gender, Zip, Ethnicity, Hispanic, Standard HHI US). # We update the qualifications in this case to remove these constraints on the participants. # See https://developer.lucidhq.com/#post-create-a-survey if self.lucidservice.recruitment_config["survey"]["CountryLanguageID"] == 9: self.lucidservice.remove_default_qualifications_from_survey( self.current_survey_number() ) self.lucidservice.add_qualifications_to_survey(self.current_survey_number()) url = survey_info["ClientSurveyLiveURL"] self.lucidservice.log("Done creating Lucid project and survey.") self.lucidservice.log("----------") self.lucidservice.log("---------> " + url) self.lucidservice.log("----------") survey_id = self.current_survey_number() if survey_id is None: self.lucidservice.log("No survey in progress: Recruitment aborted.") return return { "items": [url], "message": f"Lucid survey {self.current_survey_number()} created successfully.", }
[docs] def close_recruitment(self): """ Lucid automatically ends recruitment when the number of completes has reached the target. """ self.lucidservice.log("Recruitment is automatically handled by Lucid.")
[docs] def normalize_entry_information(self, entry_information): """Accepts data from the recruited user and returns data needed to validate, create or load a Dallinger Participant. See :func:`~dallinger.experiment.Experiment.create_participant` for details. The default implementation extracts ``hit_id``, ``assignment_id``, and ``worker_id`` values directly from ``entry_information``. This implementation extracts the ``RID`` from ``entry_information`` and assigns the value to ``hit_id``, ``assignment_id``, and ``worker_id``. """ rid = entry_information.get("RID") hit_id = entry_information.get("hit_id") if hit_id is None: hit_id = entry_information.get("hitId") if rid is None and hit_id is None: raise LucidRecruiterException( "Either `RID` or `hit_id` has to be present in `entry_information`." ) if rid is None: rid = hit_id # Save RID info into the database try: LucidRID.query.filter_by(rid=rid).one() except NoResultFound: self.lucidservice.log(f"Saving RID '{rid}' into the database.") db.session.add(LucidRID(rid=rid)) db.session.commit() except MultipleResultsFound: raise MultipleResultsFound( f"Multiple rows for Lucid RID '{rid}' found. This should never happen." ) participant_data = { "hit_id": rid, "assignment_id": rid, "worker_id": rid, } if entry_information: participant_data["entry_information"] = entry_information return participant_data
[docs] def exit_response(self, experiment, participant): """ Delegate to the experiment for possible values to show to the participant and complete the survey. """ external_submit_url = self.external_submit_url(participant=participant) self.lucidservice.log(f"Exit redirect: {external_submit_url}") return render_template_with_translations( "exit_recruiter_lucid.html", external_submit_url=external_submit_url, )
[docs] def reward_bonus(self, participant, amount, reason): """ Set `completed_at` timestamp on participant's LucidRID entry """ if participant is not None and participant.progress == 1: self.complete_participant(participant.assignment_id) else: self.terminate_participant( participant.assignment_id, "Termination in 'reward_bonus' as 'participant.progress' was < 1", )
def _record_current_survey_number(self, survey_number): self.store.set(self.survey_number_storage_key, survey_number) def external_submit_url(self, participant=None, assignment_id=None): if participant is None and assignment_id is None: raise RuntimeError( "Error generating 'external_submit_url': One of 'participant' or 'assignment_id' needs to be provided." ) data = self.data_for_submit_url(participant, assignment_id) return self.lucidservice.generate_submit_url(ris=data["ris"], rid=data["rid"]) def data_for_submit_url(self, participant, assignment_id): # Standard terminate ris = 20 if participant is not None: assignment_id = participant.assignment_id if "performance_check" in participant.failure_tags: # Security terminate ris = 30 elif participant.progress == 1: # Complete ris = 10 if assignment_id is None: assignment_id = assignment_id return {"ris": ris, "rid": assignment_id} def error_page_content(self, _, _p, assignment_id, external_submit_url): if external_submit_url is None: external_submit_url = self.external_submit_url(assignment_id=assignment_id) html = tags.div() with html: tags.p( " ".join( [ _p( "lucid_error", "Redirecting to Lucid Marketplace...", ), ] ) ) tags.script( raw( 'setTimeout(() => { window.location = "' + external_submit_url + '"; }, 2000)' ) ) return html def time_until_termination_in_s(self, rid): return self.lucidservice.time_until_termination_in_s(rid) def complete_participant(self, rid): return self.lucidservice.complete_respondent(rid) def terminate_participant(self, rid, reason): return self.lucidservice.terminate_respondent(rid, reason) def set_termination_details(self, rid, reason): self.lucidservice.set_termination_details(rid, reason) @property def termination_time_in_s(self): lucid_recruitment_config = json.loads( self.config.get("lucid_recruitment_config") ) return lucid_recruitment_config.get("termination_time_in_s") @property def inactivity_timeout_in_s(self): lucid_recruitment_config = json.loads( self.config.get("lucid_recruitment_config") ) return lucid_recruitment_config.get("inactivity_timeout_in_s") @property def no_focus_timeout_in_s(self): lucid_recruitment_config = json.loads( self.config.get("lucid_recruitment_config") ) return lucid_recruitment_config.get("no_focus_timeout_in_s") @property def aggressive_no_focus_timeout_in_s(self): lucid_recruitment_config = json.loads( self.config.get("lucid_recruitment_config") ) return lucid_recruitment_config.get("aggressive_no_focus_timeout_in_s")
[docs] class DevLucidRecruiter(BaseLucidRecruiter): """ Development recruiter for the Lucid Marketplace. """ nickname = "dev-lucid-recruiter" def __init__(self, *args, **kwargs): super().__init__() self.ad_url = ( f"http://localhost.cap:5000/ad?recruiter={self.nickname}&RID=[%RID%]" )
[docs] class LucidRecruiter(BaseLucidRecruiter): """ The production Lucid recruiter. Recruit participants from the Lucid Marketplace. """ nickname = "lucid-recruiter" def __init__(self, *args, **kwargs): super().__init__() self.ad_url = f"{get_base_url()}/ad?recruiter={self.nickname}&RID=[%RID%]"
[docs] class GenericRecruiter(PsyNetRecruiter): """ An improved version of Dallinger's Hot-Air Recruiter. """ nickname = "generic"
[docs] def exit_response(self, experiment, participant): from psynet.timeline import Page message = experiment.render_exit_message(participant) if message is None: raise ValueError( "experiment.render_exit_message returned None. Did you forget to use 'return'?" ) elif isinstance(message, Page): raise ValueError( "Sorry, you can't return a Page from experiment.render_exit_message." ) elif message == "default_exit_message": return super().exit_response(experiment, participant) elif isinstance(message, str): html = dominate.tags.p(message).render() elif isinstance(message, dominate.dom_tag.dom_tag): html = message.render() else: raise ValueError( f"Invalid value of experiment.render_exit_message: {message}. " "You should return either a string or an HTML specification created using dominate tags " "(see https://pypi.org/project/dominate/)." ) return flask.render_template("custom_html.html", html=html)
[docs] def open_recruitment(self, n=1): res = super().open_recruitment(n=n) # Hide the Dallinger logs advice, because the advice doesn't work for SSH deployment res["message"] = re.sub( "Open the logs for this experiment.*", "", res["message"] ) res["message"] = re.sub( ".*in the logs for subsequent recruitment URLs\\.", "", res["message"] ) return res