Source code for psynet.asset

import os
import shutil
import subprocess
import tempfile
import time
import urllib
import urllib.parse
import urllib.request
import uuid
import warnings
from functools import cached_property
from typing import Optional

import boto3
import paramiko
import requests
from dallinger import db
from sqlalchemy import Boolean, Column, Float, ForeignKey, Integer, String
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import deferred, relationship
from tqdm import tqdm

from psynet.timeline import NullElt

from . import deployment_info
from .data import SQLBase, SQLMixin, ingest_to_model, register_table
from .field import PythonDict, PythonObject  # , register_extra_var
from .media import get_aws_credentials
from .process import LocalAsyncProcess
from .serialize import prepare_function_for_serialization
from .utils import (
    cache,
    classproperty,
    get_args,
    get_extension,
    get_file_size_mb,
    get_folder_size_mb,
    get_from_config,
    get_logger,
    md5_directory,
    md5_file,
    md5_object,
)

logger = get_logger()


class AssetSpecification(NullElt):
    """
    A base class for asset specifications.

    An asset specification defines some kind of asset or collection of assets.
    It can be included within an experiment timeline.

    Parameters
    ----------

    local_key : str
        A string identifier for the asset, for example ``"stimulus"``.
        If provided, this string identifier should together with ``parent`` and ``module_id``
        uniquely identify that asset (i.e. no other asset should share that combination of properties).

    key_within_module : str
        Stored as-is on the instance as ``key_within_module``.

    key_within_experiment : str
        Stored as-is on the instance; it is also the starting point for the
        default export path (see ``generate_export_path``).

    description : str
        An optional longer string that provides further documentation about the asset.
    """

    def __init__(
        self, local_key, key_within_module, key_within_experiment, description
    ):
        # The export path starts out unset; nothing in this class fills it in.
        self.export_path = None

        self.local_key = local_key
        self.key_within_module = key_within_module
        self.key_within_experiment = key_within_experiment
        self.description = description

    def prepare_for_deployment(self, registry):
        """Hook to be implemented by subclasses; the base class always raises."""
        raise NotImplementedError

    def generate_export_path(self):
        """
        Build the default export path from ``key_within_experiment``.

        Returns
        -------

        ``None`` if ``key_within_experiment`` is ``None``; otherwise the key,
        with ``self.extension`` appended when the instance has a non-empty
        ``extension`` attribute that the key does not already end with.
        """
        export_path = self.key_within_experiment
        if export_path is None:
            return export_path

        # Not every specification defines an extension, hence the defensive lookup.
        suffix = getattr(self, "extension", None)
        if suffix and not export_path.endswith(suffix):
            export_path += suffix

        return export_path
class AssetCollection(AssetSpecification):
    """
    A base class for specifying a collection of assets.

    It adds no behavior of its own on top of ``AssetSpecification``.
    """
class InheritedAssets(AssetCollection):
    """
    Experimental: Provides a way to load assets from a previously
    deployed experiment into a new experiment.

    This class is currently disabled: the constructor unconditionally raises
    ``NotImplementedError``.

    Parameters
    ----------

    path : str
        Path to a CSV file specifying the previous assets. This CSV file should come from the
        ``db/asset.csv`` file of an experiment export. The CSV file can optionally be customized by
        deleting rows corresponding to unneeded assets, or it can be merged with analogous
        CSV files from other experiments.

    key : str
        A string that is used to identify the source of the imported assets
        in the ``Asset.inherited_from`` attribute.

    Raises
    ------

    NotImplementedError
        Always, on construction.
    """

    def __init__(self, path: str, key: str):
        # TODO: when this feature is revived, the constructor must call the
        # parent initializer with its current four-argument signature and
        # store both ``path`` and ``key`` on the instance, because
        # ``ingest_specification_to_db`` reads ``self.path`` and ``self.key``.
        # The statements that used to follow this ``raise`` were unreachable
        # and no longer matched the parent signature, so they were removed.
        raise NotImplementedError(
            "This code needs revisiting, the implementation has not been updated yet"
        )

    def prepare_for_deployment(self, registry):
        """Ingest the CSV specification into the database; ``registry`` is unused."""
        self.ingest_specification_to_db()

    def ingest_specification_to_db(self):
        """
        Read the CSV file at ``self.path`` into the ``Asset`` table.

        The ``parent`` column is passed as a column to clear, and every
        ingested row is marked as inherited from ``self.key``.
        """
        clear_columns = ["parent"]
        with open(self.path, "r") as file:
            ingest_to_model(
                file,
                Asset,
                clear_columns=clear_columns,
                replace_columns=dict(
                    inherited=True,
                    inherited_from=self.key,
                ),
            )
@register_table
class Asset(AssetSpecification, SQLBase, SQLMixin):
    """
    Assets represent particular files (or sometimes collections of files) that
    are used within an experiment. It is encouraged to register things like
    audio files and video files as assets so that they can be managed
    appropriately by PsyNet's deploy and export mechanisms.

    Parameters
    ----------

    local_key : str
        A string identifier for the asset, for example ``"stimulus"``.
        If provided, this string identifier should together with ``parent`` and ``module_id``
        uniquely identify that asset (i.e. no other asset should share that combination of properties).

    description : str
        An optional longer string that provides further documentation about the asset.

    is_folder : bool
        Whether the asset is a folder.

    data_type : str
        Experimental: the nature of the asset's data. Could be used to determine visualization methods etc.
        If omitted, it is inferred from the extension (see ``infer_data_type``).

    extension : str
        The file extension, if applicable.

    parent : object
        The object that 'owns' the asset, if applicable, for example a Participant or a Node.

    key_within_module : str
        A string that uniquely identifies the asset within a given module. If left unspecified,
        this will be automatically generated with reference to the ``parent`` and the ``local_key`` arguments.

    key_within_experiment : str
        A string that uniquely identifies the asset within a given experiment. If left unspecified,
        this will be automatically generated with reference to the ``key_within_module`` and the ``module_id`` arguments.

    module_id : str
        The module within which the asset is located.

    personal : bool
        Whether the asset is 'personal' and hence omitted from anonymous database exports.


    Attributes
    ----------

    needs_storage_backend : bool
        Whether the asset type needs a storage backend, e.g. a file server, or whether it can do without
        (e.g. in the case of an externally hosted resource accessible by a URL).

    psynet_version : str
        The version of PsyNet used to create the asset.

    deployment_id : str
        A string used to identify the particular experiment deployment.

    deposited : bool
        Whether the asset has been deposited yet.

    inherited : bool
        Whether the asset was inherited from a previous experiment, typically via the
        ``InheritedAssets`` functionality.

    inherited_from : str
        Identifies the source of an inherited asset.

    export_path : str
        A relative path that will be used by default when the asset is exported.

    participant_id : int
        ID of the participant who 'owns' the asset, if applicable.

    content_id : str
        A token used for checking whether the contents of two assets are equivalent.
        This takes various forms depending on the asset type.
        For a file, the ``content_id`` would typically be a hash;
        for an externally hosted asset, it would be the URL, etc.

    host_path : str
        The filepath used to host the asset within the storage repository, if applicable.

    url : str
        The URL that can be used to access the asset from the perspective of the experiment front-end.

    storage : AssetStorage
        The storage backend used for the asset.

    async_processes : list
        Lists all async processes that have been created for the asset, including completed ones.

    participant :
        If the parent is a ``Participant``, returns that participant;
        for any other non-null parent, returns the parent's ``participant``.

    participants : list
        Lists all participants associated with the asset.

    trial :
        If the parent is a ``Trial``, returns that trial.

    trials : list
        Lists all trials associated with the asset.

    node :
        If the parent is a ``TrialNode``, returns that node;
        if the parent is a ``Trial``, returns that trial's node.

    nodes : list
        Lists all nodes associated with the asset.

    network :
        If the parent is a ``TrialNetwork``, returns that network;
        if the parent is a ``Trial`` or ``TrialNode``, returns the parent's network.

    networks : list
        Lists all networks associated with the asset.

    errors : list
        Lists the errors associated with the asset.


    Linking assets to other database objects
    ----------------------------------------

    PsyNet assets may be linked to other database objects. There are two kinds of links that may be used.
    First, an asset may possess a *parent*. This parental relationship is strict in the sense that an asset
    may not possess more than one parent.

    However, in addition to the parental relationship, it is possible to link the asset to an arbitrary number
    of additional database objects. These latter links have a key-value construction, meaning that
    one can access a given asset by reference to a given key, for example: ``node.assets["response"]``.
    Importantly, the same asset can have different keys for different objects; for example,
    it might be the ``response`` for one node, but the ``stimulus`` for another node.
    These latter relationships are instantiated with logic like the following:

    ::

        participant.assets["stimulus"] = my_asset
    """

    # Inheriting from ``SQLBase`` and ``SQLMixin`` means that the ``Asset`` object is stored in the database.
    # Inheriting from ``NullElt`` means that the ``Asset`` object can be placed in the timeline.

    __tablename__ = "asset"
    __extra_vars__ = {}

    id = SQLMixin.id

    # Remove default SQL columns
    failed = None
    failed_reason = None
    time_of_death = None

    needs_storage_backend = True

    psynet_version = Column(String)
    deployment_id = Column(String)
    deposited = Column(Boolean)
    inherited = Column(Boolean, default=False)
    inherited_from = Column(String)
    module_id = Column(String, index=True)
    local_key = Column(String)
    key_within_module = Column(String, index=True)
    key_within_experiment = Column(String, index=True)  # , onupdate="cascade")
    export_path = Column(String, index=True, unique=True)
    is_global = Column(Integer, ForeignKey("experiment.id"), index=True)
    parent = deferred(Column(PythonObject))
    module_state_id = Column(Integer, ForeignKey("module_state.id"), index=True)
    participant_id = Column(Integer, ForeignKey("participant.id"), index=True)
    trial_id = Column(Integer, ForeignKey("info.id"), index=True)
    node_id = Column(Integer, ForeignKey("node.id"), index=True)
    network_id = Column(Integer, ForeignKey("network.id"), index=True)
    description = Column(String)
    personal = Column(Boolean)
    content_id = Column(String)
    host_path = Column(String)
    url = Column(String)
    is_folder = Column(Boolean)
    data_type = Column(String)
    extension = Column(String)
    storage = Column(PythonObject)
    node_definition = Column(PythonObject)

    async_processes = relationship("AsyncProcess")

    module_state_links = relationship(
        "AssetModuleState",
        order_by="AssetModuleState.creation_time",
    )
    module_states = association_proxy("module_state_links", "module_state")

    participant_links = relationship(
        "AssetParticipant",
        order_by="AssetParticipant.creation_time",
    )
    participants = association_proxy("participant_links", "participant")

    trial_links = relationship(
        "AssetTrial",
        order_by="AssetTrial.creation_time",
    )
    trials = association_proxy("trial_links", "trial")

    node_links = relationship(
        "AssetNode",
        order_by="AssetNode.creation_time",
    )
    nodes = association_proxy("node_links", "node")

    network_links = relationship(
        "AssetNetwork",
        order_by="AssetNetwork.creation_time",
    )
    networks = association_proxy("network_links", "network")

    errors = relationship("ErrorRecord")

    @property
    def trial(self):
        """The parent if it is a ``Trial``; otherwise ``None`` (implicitly)."""
        from .trial.main import Trial

        if isinstance(self.parent, Trial):
            return self.parent

    @property
    def node(self):
        """The parent's node (for a ``Trial`` parent) or the parent itself (for a ``TrialNode``)."""
        from .trial.main import Trial, TrialNode

        if isinstance(self.parent, Trial):
            return self.parent.node
        elif isinstance(self.parent, TrialNode):
            return self.parent

    @property
    def network(self):
        """The parent's network (``Trial``/``TrialNode`` parent) or the parent itself (``TrialNetwork``)."""
        from .trial.main import Trial, TrialNetwork, TrialNode

        if isinstance(self.parent, (Trial, TrialNode)):
            return self.parent.network
        elif isinstance(self.parent, TrialNetwork):
            return self.parent

    @property
    def participant(self):
        """The owning participant, resolved via the parent; ``None`` if there is no parent."""
        from .participant import Participant

        if self.parent is None:
            return None
        elif isinstance(self.parent, Participant):
            return self.parent
        else:
            return self.parent.participant

    @property
    def trial_maker(self):
        """Look up the trial maker identified by ``self.trial_maker_id``."""
        from psynet.experiment import get_trial_maker

        # NOTE(review): ``trial_maker_id`` is not declared in the visible part
        # of this class -- confirm that it is provided elsewhere.
        return get_trial_maker(self.trial_maker_id)

    @classproperty
    def experiment_class(cls):  # noqa
        from .experiment import import_local_experiment

        return import_local_experiment()["class"]

    @classproperty
    def registry(cls):  # noqa
        return cls.experiment_class.assets

    @classproperty
    def default_storage(cls):  # noqa
        return cls.registry.storage

    def __init__(
        self,
        *,
        local_key=None,
        key_within_module=None,
        key_within_experiment=None,
        description=None,
        is_folder=False,
        data_type=None,
        extension=None,
        parent=None,
        module_id=None,
        personal=False,
    ):
        self.deposit_on_the_fly = True
        self.key_within_module = key_within_module

        from . import __version__ as psynet_version

        self.psynet_version = psynet_version

        self.is_folder = is_folder
        # Fall back to the subclass-provided extension when none (or an empty
        # one) is given explicitly.
        self.extension = extension if extension else self.get_extension()

        if data_type is None:
            data_type = self.infer_data_type()
        self.data_type = data_type

        self.parent = parent

        from psynet.participant import Participant
        from psynet.trial import Trial
        from psynet.trial.main import TrialNetwork, TrialNode

        # Record the parent's ID in the foreign-key column matching its type.
        if isinstance(parent, Participant):
            self.participant_id = parent.id
        elif isinstance(parent, Trial):
            self.trial_id = parent.id
        elif isinstance(parent, TrialNode):
            self.node_id = parent.id
        elif isinstance(parent, TrialNetwork):
            self.network_id = parent.id

        # An explicit module_id wins; otherwise inherit it from the parent.
        if module_id:
            self.module_id = module_id
        else:
            if self.parent:
                self.module_id = self.parent.module_id

        if self.participant:
            if self.participant.module_state:
                self.module_state = self.participant.module_state

        self.personal = personal

        super().__init__(
            local_key, key_within_module, key_within_experiment, description
        )

    def get_ancestors(self):
        """
        Returns a dictionary with the IDs of the asset's network, node, trial
        and participant, plus the node's ``degree`` where the node has one.
        Entries that are not applicable are ``None``.
        """
        return {
            "network": self.network.id if self.network else None,
            "node": self.node.id if self.node else None,
            "degree": self.node.degree if hasattr(self.node, "degree") else None,
            "trial": self.trial.id if self.trial else None,
            "participant": self.participant.id if self.participant else None,
        }

    def set_keys(self):
        """
        Fills in any missing keys, then (re)computes ``host_path``,
        ``export_path`` and ``url`` from them.
        """
        if self.key_within_module is None:
            self.key_within_module = self.generate_key_within_module()

        if self.key_within_experiment is None:
            self.key_within_experiment = self.generate_key_within_experiment()

        if not self.local_key and self.key_within_module:
            self.local_key = self.key_within_module

        self.host_path = self.generate_host_path()
        self.export_path = self.generate_export_path()
        self.url = self.get_url()

    def generate_key_within_experiment(self):
        """Prefixes ``key_within_module`` with the module ID, or ``"common"`` if there is none."""
        if self.module_id is None:
            base = "common"
        else:
            base = self.module_id
        return base + "/" + self.key_within_module

    def generate_key_within_module(self):
        return os.path.join(
            self.generate_key_within_module_parents(),
            self.generate_key_within_module_child(),
        )

    def generate_key_within_module_parents(self):
        ids = []
        if self.participant:
            ids.append(f"participants/participant_{self.participant.id}")
        elif self.node:
            ids.append("nodes")
        return "/".join(ids)

    def generate_key_within_module_child(self):
        ids = self.generate_key_within_module_child_ids()
        if self.local_key:
            ids.append(f"{self.local_key}")
        return "__".join(ids)

    def generate_key_within_module_child_ids(self):
        from psynet.trial.static import StaticNetwork

        ancestors = self.get_ancestors()

        # Assets in static networks are keyed by node and trial only.
        if self.network and isinstance(self.network, StaticNetwork):
            id_types = ["node", "trial"]
        else:
            id_types = ["network", "degree", "node", "trial"]

        return [
            f"{id_type}_{ancestors[id_type]}"
            for id_type in id_types
            if ancestors[id_type] is not None
        ]

    def consume(self, experiment, participant):
        """
        Adopts the participant's module ID if the asset has none, sets the
        asset's keys, and deposits it when ``deposit_on_the_fly`` is set.
        """
        if not self.module_id:
            self.module_id = participant.module_id

        self.set_keys()

        if self.deposit_on_the_fly:
            self.deposit()

    def infer_data_type(self):
        """
        Infers a coarse data type from the asset's extension.

        The extension is compared case-insensitively and with any leading
        dot removed, so both ``".wav"`` and ``"wav"`` are recognised.

        Returns
        -------

        ``"audio"`` for wav/mp3, ``"video"`` for mp4/avi, otherwise ``None``.
        """
        # Other code in this module treats extensions as dot-prefixed (they
        # are appended directly to export paths), so normalise before matching.
        extension = (self.extension or "").lower().lstrip(".")
        if extension in ("wav", "mp3"):
            return "audio"
        elif extension in ("mp4", "avi"):
            return "video"
        else:
            return None

    def get_extension(self):
        raise NotImplementedError

    def prepare_for_deployment(self, registry):
        """Runs in advance of the experiment being deployed to the remote server."""
        self.deposit(self.default_storage)

    def deposit(
        self,
        storage=None,
        async_: bool = False,
        delete_input: bool = False,
    ):
        """

        Parameters
        ----------

        storage :
            If set to an ``AssetStorage`` object, the asset will be deposited to the provided storage location
            rather than defaulting to the Experiment class's storage location.

        async_ :
            If set to ``True``, then the asset deposit will be performed asynchronously and the program's execution
            will continue without waiting for the deposit to complete. It is sensible to set this to ``True``
            if you see that the participant interface is being held up noticeably by waiting for the deposit to complete.

        delete_input :
            If set to ``True``, then the input file will be deleted after it has been deposited.

        Returns
        -------

        The asset itself.
        """
        if storage is None:
            storage = self.default_storage
        self.storage = storage

        self.deployment_id = self.registry.deployment_id
        self.content_id = self.get_content_id()
        self.set_keys()

        db.session.add(self)

        if self.parent:
            # The parent stores its assets under their local key.
            assert self.local_key
            _local_key = self.local_key
            self.parent.assets[_local_key] = self

        ancestors = self.get_ancestors()
        self.network_id = ancestors["network"]
        self.node_id = ancestors["node"]
        self.trial_id = ancestors["trial"]
        self.participant_id = ancestors["participant"]

        if not self.participant_id:
            # Assets without a participant are registered as global assets.
            from .experiment import get_experiment

            exp = get_experiment()
            exp.global_assets.append(self)

        # Note: performing the deposit cues post-deposit actions as well (e.g. async_post_trial),
        # which may rely on the asset being in its complete state. Any information that may be needed
        # by these post-deposit actions must be saved before this step.
        self._deposit(self.storage, async_, delete_input)

        if not self.content_id:
            self.content_id = self.get_content_id()

        return self

    def _deposit(self, storage: "AssetStorage", async_: bool, delete_input: bool):
        """
        Performs the actual deposit, confident that no duplicates exist.

        Returns
        -------

        Returns ``True`` if the deposit has been completed,
        or ``False`` if the deposit has yet to be completed,
        typically because it is being performed in an asynchronous process
        which will take responsibility for marking the deposit as complete
        in due course.
        """
        raise NotImplementedError

    def delete_input(self):
        """
        Deletes the input file(s) that make(s) up the asset.
        """
        raise NotImplementedError

    def get_content_id(self):
        raise NotImplementedError

    def generate_host_path(self):
        raise NotImplementedError

    def export(self, path, ssh_host=None, ssh_user=None):
        """Exports the asset to ``path`` via its storage backend, logging and re-raising on failure."""
        try:
            self.storage.export(self, path, ssh_host=ssh_host, ssh_user=ssh_user)
        except Exception:
            from .command_line import log

            log(f"Failed to export the asset {self.id} to path {path}.")
            raise

    def export_subfile(self, subfile, path):
        """Exports a single file from a folder asset, logging and re-raising on failure."""
        assert self.is_folder
        try:
            self.storage.export_subfile(self, subfile, path)
        except Exception:
            from .command_line import log

            log(
                f"Failed to export the subfile {subfile} from asset {self.id} to path {path}."
            )
            raise

    def export_subfolder(self, subfolder, path):
        """Exports a subfolder of the asset, logging and re-raising on failure."""
        try:
            self.storage.export_subfolder(self, subfolder, path)
        except Exception:
            from .command_line import log

            log(
                f"Failed to export the subfolder {subfolder} from asset {self.id} to path {path}."
            )
            raise

    def receive_node_definition(self, definition):
        self.node_definition = definition

    def read_text(self):
        """Exports the (non-folder) asset to a temporary file and returns its text contents."""
        assert not self.is_folder
        with tempfile.NamedTemporaryFile() as f:
            self.export(f.name)
            with open(f.name, "r") as reader:
                return reader.read()
@register_table
class AssetModuleState(AssetLink, SQLBase, SQLMixin):
    """Link table (``asset_module_state``) between assets and module states."""

    __tablename__ = "asset_module_state"

    module_state_id = Column(
        Integer,
        ForeignKey("module_state.id"),
        primary_key=True,
    )
    module_state = relationship(
        "psynet.timeline.ModuleState",
        back_populates="asset_links",
    )

    # Counterpart of ``Asset.module_state_links``.
    asset = relationship(
        "Asset",
        back_populates="module_state_links",
    )
@register_table
class AssetParticipant(AssetLink, SQLBase, SQLMixin):
    """Link table (``asset_participant``) between assets and participants."""

    __tablename__ = "asset_participant"

    participant_id = Column(
        Integer,
        ForeignKey("participant.id"),
        primary_key=True,
    )
    participant = relationship(
        "psynet.participant.Participant",
        back_populates="asset_links",
    )

    # Counterpart of ``Asset.participant_links``.
    asset = relationship(
        "Asset",
        back_populates="participant_links",
    )
@register_table
class AssetTrial(AssetLink, SQLBase, SQLMixin):
    """Link table (``asset_trial``) between assets and trials."""

    __tablename__ = "asset_trial"

    # The foreign key targets ``info.id``, matching ``Asset.trial_id``.
    trial_id = Column(
        Integer,
        ForeignKey("info.id"),
        primary_key=True,
    )
    trial = relationship(
        "psynet.trial.main.Trial",
        back_populates="asset_links",
    )

    # Counterpart of ``Asset.trial_links``.
    asset = relationship(
        "Asset",
        back_populates="trial_links",
    )
@register_table
class AssetNode(AssetLink, SQLBase, SQLMixin):
    """Link table (``asset_node``) between assets and trial nodes."""

    __tablename__ = "asset_node"

    node_id = Column(
        Integer,
        ForeignKey("node.id"),
        primary_key=True,
    )
    node = relationship(
        "TrialNode",
        back_populates="asset_links",
    )

    # Counterpart of ``Asset.node_links``.
    asset = relationship("Asset", back_populates="node_links")
@register_table
class AssetNetwork(AssetLink, SQLBase, SQLMixin):
    """Link table (``asset_network``) between assets and trial networks."""

    __tablename__ = "asset_network"

    network_id = Column(
        Integer,
        ForeignKey("network.id"),
        primary_key=True,
    )
    network = relationship(
        "TrialNetwork",
        back_populates="asset_links",
    )

    # Counterpart of ``Asset.network_links``.
    asset = relationship("Asset", back_populates="network_links")
[docs] class ManagedAsset(Asset): """ This is a parent class for assets that are actively 'managed' by PsyNet. Active managing means that PsyNet takes responsibility for storing the asset in its own storage repositories. This class is not generally instantiated directly, but is instead instantiated via its subclasses. Parameters ---------- input_path : str Path to the file/folder from which the asset is to be created. local_key : str A string identifier for the asset, for example ``"stimulus"``. If provided, this string identifier should together with ``parent`` and ``module_id`` should uniquely identify that asset (i.e. no other asset should share that combination of properties). description : str An optional longer string that provides further documentation about the asset. is_folder : bool Whether the asset is a folder. data_type : str Experimental: the nature of the asset's data. Could be used to determine visualization methods etc. extension : str The file extension, if applicable. parent : object The object that 'owns' the asset, if applicable, for example a Participant or a Node. key_within_module : str An optional key that uniquely identifies the asset within a given module. If left unspecified, this will be automatically generated with reference to the ``parent`` and the ``local_key`` arguments. key_within_experiment : str A string that uniquely identifies the asset within a given experiment. If left unspecified, this will be automatically generated with reference to the ``key_within_module`` and the ``module_id`` arguments. module_id : str Identifies the module with which the asset should be associated. If left blank, PsyNet will attempt to infer the ``module_id`` from the ``parent`` parameter, if provided. personal : bool Whether the asset is 'personal' and hence omitted from anonymous database exports. obfuscate : int Determines the extent to which the asset's generated URL should be obfuscated. 
By default, ``obfuscate=1``, which means that the URL contains a human-readable component containing useful metadata (e.g the participant ID), but also contains a randomly generated string so that malicious agents cannot retrieve arbitrary assets by guessing URLs. If ``obfuscate=0``, then the randomly generated string is not added. If ``obfuscate=2``, then the human-readable component is omitted, and only the random portion is kept. This might be useful in cases where you're worried about participants cheating on the experiment by looking at file URLs. Attributes ---------- md5_contents : str Contains an automatically generated MD5 hash of the object's contents, where 'contents' is liberally defined; it could mean hashing the file itself, or hashing the arguments of the function used to generate that file. size_mb : float The size of the asset's file(s) (in MB). deposit_time_sec : float The time it took to deposit the asset. needs_storage_backend : bool Whether the asset type needs a storage backend, e.g. a file server, or whether it can do without (e.g. in the case of an externally hosted resource accessible by a URL). psynet_version : str The version of PsyNet used to create the asset. deployment_id : str A string used to identify the particular experiment deployment. deposited: bool Whether the asset has been deposited yet. inherited : bool Whether the asset was inherited from a previous experiment, typically via the ``InheritedAssets`` functionality. inherited_from : str Identifies the source of an inherited asset. export_path : str A relative path constructed from the key that will be used by default when the asset is exported. participant_id : int ID of the participant who 'owns' the asset, if applicable. content_id : str A token used for checking whether the contents of two assets are equivalent. This takes various forms depending on the asset type. 
For a file, the ``content_id`` would typically be a hash; for an externally hosted asset, it would be the URL, etc. host_path : str The filepath used to host the asset within the storage repository, if applicable. url : str The URL that can be used to access the asset from the perspective of the experiment front-end. storage : AssetStorage The storage backend used for the asset. async_processes : list Lists all async processes that have been created for the asset, including completed ones. participant : If the parent is a ``Participant``, returns that participant. participants : list Lists all participants associated with the asset. trial : If the parent is a ``trial``, returns that trial. trials : list Lists all trials associated with the asset. node : If the parent is a ``Node``, returns that participant. nodes : list Lists all nodes associated with the asset. network : If the parent is a ``Network``, returns that participant. networks : list Lists all networks associated with the asset. errors : list Lists the errors associated with the asset. Linking assets to other database objects ---------------------------------------- PsyNet assets may be linked to other database objects. There are two kinds of links that may be used. First, an asset may possess a *parent*. This parental relationship is strict in the sense that an asset may not possess more than one parent. However, in addition to the parental relationship, it is possible to link the asset to an arbitrary number of additional database objects. These latter links have a key-value construction, meaning that one can access a given asset by reference to a given key, for example: ``node.assets["response"]``. Importantly, the same asset can have different keys for different objects; for example, it might be the ``response`` for one node, but the ``stimulus`` for another node. 
These latter relationships are instantiated with logic like the following: :: participant.assets["stimulus"] = my_asset """ input_path = Column(String) obfuscate = Column(Integer) md5_contents = Column(String) size_mb = Column(Float) deposit_time_sec = Column(Float) def __init__( self, input_path: str, *, local_key=None, key_within_module=None, key_within_experiment=None, description=None, is_folder=None, data_type=None, extension=None, parent=None, module_id=None, personal=False, obfuscate=1, # 0: no obfuscation; 1: can't guess URL; 2: can't guess content ): self.deposited = False self.input_path = input_path self.obfuscate = obfuscate if is_folder is None: is_folder = os.path.isdir(input_path) super().__init__( local_key=local_key, key_within_module=key_within_module, key_within_experiment=key_within_experiment, is_folder=is_folder, description=description, data_type=data_type, extension=extension, module_id=module_id, parent=parent, personal=personal, ) def get_content_id(self): return self.get_md5_contents() def get_md5_contents(self): return self._get_md5_contents(self.input_path, self.is_folder) @cache def _get_md5_contents(self, path, is_folder): f = md5_directory if is_folder else md5_file return f(path) def get_extension(self): return get_extension(self.input_path) def _deposit(self, storage: "AssetStorage", async_: bool, delete_input: bool): if self.needs_storage_backend and isinstance(storage, NoStorage): raise RuntimeError( "Cannot deposit this asset " f"(type = {type(self).__name__}, id = {self.id}) " "without an asset storage backend. " "Please add one to your experiment class, for example by writing " "asset_storage = S3Storage('your-s3-bucket', 'your-subdirectory') " "in your experiment class." 
) self.set_keys() self.storage.update_asset_metadata(self) if self._needs_depositing(): time_start = time.perf_counter() self.prepare_input() self.size_mb = self.get_size_mb() self.md5_contents = self.get_md5_contents() storage.receive_deposit(self, self.host_path, async_, delete_input) time_end = time.perf_counter() self.deposit_time_sec = time_end - time_start else: self.deposited = True def prepare_input(self): pass def _needs_depositing(self): return True def after_deposit(self): # logger.info("Calling after_deposit.") if self.trial: logger.info( "Calling check_if_can_run_async_post_trial as part of after_deposit." ) self.trial.check_if_can_run_async_post_trial() self.trial.check_if_can_mark_as_finalized() def get_url(self): return self.storage.get_url(self.host_path)
[docs] def delete_input(self): if self.is_folder: shutil.rmtree(self.input_path) else: os.remove(self.input_path)
def get_size_mb(self):
    """
    Return the size (in MB) of whatever lives at ``self.input_path``.

    Folders are measured with ``get_folder_size_mb`` and single files with
    ``get_file_size_mb``.
    """
    measure = get_folder_size_mb if self.is_folder else get_file_size_mb
    return measure(self.input_path)

def generate_host_path(self):
    """
    Produce the path under which the asset is hosted.

    This implementation always raises ``NotImplementedError``.
    """
    raise NotImplementedError

@staticmethod
def generate_uuid():
    """
    Return a freshly generated random (version 4) UUID as a string.
    """
    fresh = uuid.uuid4()
    return str(fresh)
class ExperimentAsset(ManagedAsset):
    """
    The ``ExperimentAsset`` class is one of the most commonly used Asset classes.
    It refers to assets that are specific to the current experiment deployment.
    This would typically mean assets that are generated *during the course* of the
    experiment, for example recordings from a singer, or stimuli generated on the
    basis of participant responses.

    Examples
    --------

    ::

        import tempfile

        with tempfile.NamedTemporaryFile("w") as file:
            file.write(f"Your message here")
            file.flush()  # make sure the text is on disk before depositing
            asset = ExperimentAsset(
                local_key="my_message",
                input_path=file.name,
                extension=".txt",
                parent=participant,
            )
            asset.deposit()

    Parameters
    ----------

    input_path : str
        Path to the file/folder from which the asset is to be created.

    local_key : str
        A string identifier for the asset, for example ``"stimulus"``. If provided,
        this string identifier should together with ``parent`` and ``module_id``
        uniquely identify that asset (i.e. no other asset should share that
        combination of properties).

    description : str
        An optional longer string that provides further documentation about the asset.

    is_folder : bool
        Whether the asset is a folder.

    data_type : str
        Experimental: the nature of the asset's data. Could be used to determine
        visualization methods etc.

    extension : str
        The file extension, if applicable.

    parent : object
        The object that 'owns' the asset, if applicable, for example a Participant
        or a Node.

    key_within_module : str
        An optional key that uniquely identifies the asset within a given module.
        If left unspecified, this will be automatically generated with reference to
        the ``parent`` and the ``local_key`` arguments.

    key_within_experiment : str
        A string that uniquely identifies the asset within a given experiment.
        If left unspecified, this will be automatically generated with reference to
        the ``key_within_module`` and the ``module_id`` arguments.

    module_id : str
        Identifies the module with which the asset should be associated. If left
        blank, PsyNet will attempt to infer the ``module_id`` from the ``parent``
        parameter, if provided.

    personal : bool
        Whether the asset is 'personal' and hence omitted from anonymous database
        exports.

    obfuscate : int
        Determines the extent to which the asset's generated URL should be obfuscated.
        By default, ``obfuscate=1``, which means that the URL contains a human-readable
        component containing useful metadata (e.g. the participant ID), but also
        contains a randomly generated string so that malicious agents cannot retrieve
        arbitrary assets by guessing URLs. If ``obfuscate=0``, then the randomly
        generated string is not added. If ``obfuscate=2``, then the human-readable
        component is omitted, and only the random portion is kept. This might be
        useful in cases where you're worried about participants cheating on the
        experiment by looking at file URLs.

    Attributes
    ----------

    needs_storage_backend : bool
        Whether the asset type needs a storage backend, e.g. a file server, or
        whether it can do without (e.g. in the case of an externally hosted resource
        accessible by a URL).

    psynet_version : str
        The version of PsyNet used to create the asset.

    deployment_id : str
        A string used to identify the particular experiment deployment.

    deposited: bool
        Whether the asset has been deposited yet.

    inherited : bool
        Whether the asset was inherited from a previous experiment, typically via the
        ``InheritedAssets`` functionality.

    inherited_from : str
        Identifies the source of an inherited asset.

    export_path : str
        A relative path that will be used by default when the asset is exported.

    participant_id : int
        ID of the participant who 'owns' the asset, if applicable.

    content_id : str
        A token used for checking whether the contents of two assets are equivalent.
        This takes various forms depending on the asset type. For a file, the
        ``content_id`` would typically be a hash; for an externally hosted asset,
        it would be the URL, etc.

    host_path : str
        The filepath used to host the asset within the storage repository,
        if applicable.

    url : str
        The URL that can be used to access the asset from the perspective of the
        experiment front-end.

    storage : AssetStorage
        The storage backend used for the asset.

    async_processes : list
        Lists all async processes that have been created for the asset, including
        completed ones.

    participant :
        If the parent is a ``Participant``, returns that participant.

    participants : list
        Lists all participants associated with the asset.

    trial :
        If the parent is a ``trial``, returns that trial.

    trials : list
        Lists all trials associated with the asset.

    node :
        If the parent is a ``Node``, returns that node.

    nodes : list
        Lists all nodes associated with the asset.

    network :
        If the parent is a ``Network``, returns that network.

    networks : list
        Lists all networks associated with the asset.

    errors : list
        Lists the errors associated with the asset.

    Linking assets to other database objects
    ----------------------------------------

    PsyNet assets may be linked to other database objects. There are two kinds of
    links that may be used. First, an asset may possess a *parent*. This parental
    relationship is strict in the sense that an asset may not possess more than one
    parent. However, in addition to the parental relationship, it is possible to link
    the asset to an arbitrary number of additional database objects. These latter
    links have a key-value construction, meaning that one can access a given asset by
    reference to a given key, for example: ``node.assets["response"]``. Importantly,
    the same asset can have different keys for different objects; for example, it
    might be the ``response`` for one node, but the ``stimulus`` for another node.
    These latter relationships are instantiated with logic like the following:

    ::

        participant.assets["stimulus"] = my_asset
    """

    def generate_host_path(self):
        """
        Build the asset's path within the storage back-end.

        The (possibly obfuscated) ``key_within_experiment`` gets the asset's
        extension appended when one is set, and the result is placed under
        ``experiments/<deployment_id>/``.
        """
        filename = self.obfuscate_key(self.key_within_experiment)
        if self.extension:
            filename = filename + self.extension
        return os.path.join("experiments", self.deployment_id, filename)

    def obfuscate_key(self, key):
        """
        Apply the asset's ``obfuscate`` level to ``key``.

        * ``0`` -- ``key`` is returned unchanged;
        * ``1`` -- ``"__"`` plus a random UUID is appended to ``key``;
        * ``2`` -- ``key`` is discarded in favour of ``"private/"`` plus a random UUID.

        Raises
        ------

        ValueError
            If ``self.obfuscate`` is not one of the values listed above.
        """
        token = self.generate_uuid()
        level = self.obfuscate

        if level == 0:
            return key
        if level == 1:
            return key + "__" + token
        if level == 2:
            return "private/" + token

        raise ValueError(f"Invalid value of obfuscate: {self.obfuscate}")
class CachedAsset(ManagedAsset):
    """
    The classic use of a ``CachedAsset`` would be to store some kind of stimulus that
    is pre-defined in advance of experiment launch. For example:

    ::

        asset = CachedAsset(
            key_within_module="bier",
            input_path="bier.wav",
            description="A recording of someone saying 'bier'",
        )

    Cached assets are most commonly instantiated by passing them to the ``assets``
    arguments of modules or nodes when defining the Experiment timeline. PsyNet
    compiles these assets together before experiment deployment and makes sure they
    are uploaded if necessary.

    In contrast to Experiment Assets, Cached Assets are shared between different
    experiments, so as to avoid duplicating time-consuming file generation or upload
    routines. The cached assets are stored in the selected asset storage back-end,
    and if PsyNet detects that the requested asset exists already then it will skip
    creating/uploading that asset. Under the hood there is some special logic to
    ensure that the caches are invalidated if the file content has changed.

    Parameters
    ----------

    input_path : str
        Path to the file/folder from which the asset is to be created.

    local_key : str
        A string identifier for the asset, for example ``"stimulus"``. If provided,
        this string identifier should together with ``parent`` and ``module_id``
        uniquely identify that asset (i.e. no other asset should share that
        combination of properties).

    description : str
        An optional longer string that provides further documentation about the asset.

    is_folder : bool
        Whether the asset is a folder.

    data_type : str
        Experimental: the nature of the asset's data. Could be used to determine
        visualization methods etc.

    extension : str
        The file extension, if applicable.

    parent : object
        The object that 'owns' the asset, if applicable, for example a Participant
        or a Node.

    key_within_module : str
        An optional key that uniquely identifies the asset within a given module.
        If left unspecified, this will be automatically generated with reference to
        the ``parent`` and the ``local_key`` arguments.

    key_within_experiment : str
        A string that uniquely identifies the asset within a given experiment.
        If left unspecified, this will be automatically generated with reference to
        the ``key_within_module`` and the ``module_id`` arguments.

    module_id : str
        Identifies the module with which the asset should be associated. If left
        blank, PsyNet will attempt to infer the ``module_id`` from the ``parent``
        parameter, if provided.

    personal : bool
        Whether the asset is 'personal' and hence omitted from anonymous database
        exports.

    obfuscate : int
        Determines the extent to which the asset's generated URL should be obfuscated.
        By default, ``obfuscate=1``, which means that the URL contains a human-readable
        component containing useful metadata (e.g. the participant ID), but also
        contains a randomly generated string so that malicious agents cannot retrieve
        arbitrary assets by guessing URLs. If ``obfuscate=0``, then the randomly
        generated string is not added. If ``obfuscate=2``, then the human-readable
        component is omitted, and only the random portion is kept. This might be
        useful in cases where you're worried about participants cheating on the
        experiment by looking at file URLs.

    Attributes
    ----------

    md5_contents : str
        Contains an automatically generated MD5 hash of the object's contents, where
        'contents' is liberally defined; it could mean hashing the file itself, or
        hashing the arguments of the function used to generate that file.

    size_mb : float
        The size of the asset's file(s) (in MB).

    deposit_time_sec : float
        The time it took to deposit the asset.

    needs_storage_backend : bool
        Whether the asset type needs a storage backend, e.g. a file server, or
        whether it can do without (e.g. in the case of an externally hosted resource
        accessible by a URL).

    psynet_version : str
        The version of PsyNet used to create the asset.

    deployment_id : str
        A string used to identify the particular experiment deployment.

    deposited: bool
        Whether the asset has been deposited yet.

    inherited : bool
        Whether the asset was inherited from a previous experiment, typically via the
        ``InheritedAssets`` functionality.

    inherited_from : str
        Identifies the source of an inherited asset.

    export_path : str
        A relative path that will be used by default when the asset is exported.

    participant_id : int
        ID of the participant who 'owns' the asset, if applicable.

    content_id : str
        A token used for checking whether the contents of two assets are equivalent.
        This takes various forms depending on the asset type. For a file, the
        ``content_id`` would typically be a hash; for an externally hosted asset,
        it would be the URL, etc.

    host_path : str
        The filepath used to host the asset within the storage repository,
        if applicable.

    url : str
        The URL that can be used to access the asset from the perspective of the
        experiment front-end.

    storage : AssetStorage
        The storage backend used for the asset.

    async_processes : list
        Lists all async processes that have been created for the asset, including
        completed ones.

    participant :
        If the parent is a ``Participant``, returns that participant.

    participants : list
        Lists all participants associated with the asset.

    trial :
        If the parent is a ``trial``, returns that trial.

    trials : list
        Lists all trials associated with the asset.

    node :
        If the parent is a ``Node``, returns that node.

    nodes : list
        Lists all nodes associated with the asset.

    network :
        If the parent is a ``Network``, returns that network.

    networks : list
        Lists all networks associated with the asset.

    errors : list
        Lists the errors associated with the asset.

    Linking assets to other database objects
    ----------------------------------------

    PsyNet assets may be linked to other database objects. There are two kinds of
    links that may be used. First, an asset may possess a *parent*. This parental
    relationship is strict in the sense that an asset may not possess more than one
    parent. However, in addition to the parental relationship, it is possible to link
    the asset to an arbitrary number of additional database objects. These latter
    links have a key-value construction, meaning that one can access a given asset by
    reference to a given key, for example: ``node.assets["response"]``. Importantly,
    the same asset can have different keys for different objects; for example, it
    might be the ``response`` for one node, but the ``stimulus`` for another node.
    These latter relationships are instantiated with logic like the following:

    ::

        participant.assets["stimulus"] = my_asset
    """

    # Records the outcome of the cache lookup performed in ``_needs_depositing``.
    used_cache = Column(Boolean)

    @cached_property
    def cache_key(self):
        """
        The token that distinguishes cache entries for this asset: the value
        returned by ``get_md5_contents``, computed once per instance.
        """
        return self.get_md5_contents()

    def generate_host_path(self):
        """
        Build the asset's path within the storage back-end.

        The path has the form ``cached/<base>/<cache_key>``, where ``<base>`` is
        ``"private"`` when ``obfuscate == 2`` and ``key_within_experiment``
        otherwise. The extension is appended only when one is set.
        """
        key = self.key_within_experiment  # e.g. big-audio-file.wav
        cache_key = self.cache_key

        if self.obfuscate == 2:
            base = "private"
        else:
            base = key

        host_path = os.path.join("cached", base, cache_key)

        # Guard against a missing extension (``None``), which would otherwise make
        # the concatenation raise ``TypeError``; this mirrors the ``if self.extension``
        # guard in ``ExperimentAsset.generate_host_path``.
        # NOTE(review): the other methods in this file test ``self.is_folder`` rather
        # than ``self.type``; confirm that ``type`` can actually equal "folder".
        if self.type != "folder" and self.extension:
            host_path += self.extension

        return host_path

    def _needs_depositing(self):
        """
        Ask the storage back-end whether ``host_path`` is already cached.

        The answer is stored in ``used_cache``; a deposit is needed only when the
        cache check comes back false.
        """
        exists_in_cache = self.storage.check_cache(
            self.host_path, is_folder=self.is_folder
        )
        self.used_cache = exists_in_cache
        return not exists_in_cache

    def retrieve_contents(self):
        """No-op in this class."""
        pass

    def delete_input(self):
        """No-op: unlike the parent implementation, the input is never removed."""
        pass
class FunctionAssetMixin:
    """
    This Mixin is used to define Asset classes that create their assets not from
    input files but from functions that are called with a specified set of arguments.
    It is not to be instantiated directly.

    Parameters
    ----------

    function : callable
        A function responsible for generating the asset. The function should receive
        an argument called ``path`` and create a file or a folder at that path. It can
        also receive additional arguments specified via the ``arguments`` parameter.

    arguments : dict
        An optional dictionary of arguments that should be passed to the function.

    Attributes
    ----------

    computation_time_sec : float
        The time taken to generate the asset.
    """

    # The following conditional logic in the column definitions is required
    # to prevent column conflict errors, see
    # https://docs.sqlalchemy.org/en/13/orm/extensions/declarative/inheritance.html#resolving-column-conflicts
    @declared_attr
    def function(cls):
        return cls.__table__.c.get("function", Column(PythonObject))

    @declared_attr
    def arguments(cls):
        return cls.__table__.c.get("arguments", deferred(Column(PythonDict)))

    @declared_attr
    def computation_time_sec(cls):
        return cls.__table__.c.get("computation_time_sec", Column(Float))

    def __init__(
        self,
        function,
        *,
        arguments: Optional[dict] = None,
        is_folder=False,
        description=None,
        data_type=None,
        extension=None,
        local_key: Optional[str] = None,
        key_within_module=None,
        key_within_experiment=None,
        module_id=None,
        parent=None,
        personal=False,
        obfuscate=1,  # 0: no obfuscation; 1: can't guess URL; 2: can't guess content
    ):
        """
        Store the generating function and its arguments, then delegate the remaining
        arguments to the next ``__init__`` in the MRO with ``input_path=None``.
        """
        if arguments is None:
            arguments = {}

        function, arguments = prepare_function_for_serialization(function, arguments)

        self.function = function
        self.arguments = arguments
        self.temp_dir = None
        # No input exists yet; a path is only allocated when ``deposit`` is called.
        self.input_path = None

        super().__init__(
            local_key=local_key,
            key_within_module=key_within_module,
            key_within_experiment=key_within_experiment,
            input_path=self.input_path,
            is_folder=is_folder,
            description=description,
            data_type=data_type,
            extension=extension,
            parent=parent,
            module_id=module_id,
            personal=personal,
            obfuscate=obfuscate,
        )

    def __del__(self):
        # ``hasattr`` guards against instances on which ``temp_dir`` was never set.
        if hasattr(self, "temp_dir") and self.temp_dir:
            self.temp_dir.cleanup()

    def deposit(
        self,
        storage=None,
        async_: bool = False,
    ):
        """
        Allocate a temporary input path, then deposit via the parent implementation,
        always asking for the input to be deleted afterwards.
        """
        self.input_path = self.generate_input_path()
        super().deposit(
            storage,
            async_,
            delete_input=True,
        )

    def generate_input_path(self):
        """
        Create a temporary location for the function's output and return its path.

        For folders this is a fresh temporary directory; otherwise it is an empty
        temporary file carrying the asset's extension (if any) as its suffix.
        """
        if self.is_folder:
            return tempfile.mkdtemp()

        suffix = self.extension if self.extension else ""
        # Close the handle straight away (``delete=False`` keeps the file on disk),
        # so that no open file object is leaked while the path is in use.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as handle:
            return handle.name

    @property
    def instructions(self):
        """
        The 'instructions' that define how to create the asset.
        """
        return dict(function=self.function, arguments=self.arguments)

    def get_md5_instructions(self):
        """Return the ``md5_object`` hash of ``instructions``."""
        return md5_object(self.instructions)

    def get_md5_contents(self):
        """Return ``None`` while no input path exists, else defer to the parent."""
        # TODO - consider whether this should be deleted
        if self.input_path is None:
            return None
        else:
            return super().get_md5_contents()

    def get_size_mb(self):
        """Return ``None`` while no input path exists, else defer to the parent."""
        if self.input_path is None:
            return None
        else:
            return super().get_size_mb()

    def prepare_input(self):
        """
        Run the generating function against ``input_path`` and record how long the
        call took in ``computation_time_sec``.
        """
        time_start = time.perf_counter()
        self.function(path=self.input_path, **self.arguments)
        time_end = time.perf_counter()
        self.computation_time_sec = time_end - time_start

    def receive_node_definition(self, definition):
        """
        After the parent's handling, copy into ``arguments`` every entry of
        ``definition`` whose key is among the arguments that ``get_args`` reports
        for the function.
        """
        super().receive_node_definition(definition)
        requested_args = get_args(self.function)
        for key, value in definition.items():
            if key in requested_args:
                self.arguments[key] = value
# class FunctionAsset(FunctionAssetMixin, ExperimentAsset): # # FunctionAssetMixin comes first in the inheritance hierarchy # # because we need to use its ``__init__`` method. # """ # # """ # pass
class OnDemandAsset(FunctionAssetMixin, ExperimentAsset):
    """
    An on-demand asset is an asset whose files are not stored directly in any storage
    back-end, but instead are created on demand when the asset is requested. This
    creation is typically triggered by making a call to the asset's URL, accessible
    via the ``OnDemandAsset.url`` attribute.

    Parameters
    ----------

    function : callable
        A function responsible for generating the asset. The function should receive
        an argument called ``path`` and create a file or a folder at that path. It can
        also receive additional arguments specified via the ``arguments`` parameter.

    local_key : str
        A string identifier for the asset, for example ``"stimulus"``. If provided,
        this string identifier should together with ``parent`` and ``module_id``
        uniquely identify that asset (i.e. no other asset should share that
        combination of properties).

    arguments : dict
        An optional dictionary of arguments that should be passed to the function.

    is_folder : bool
        Whether the asset is a folder.

    description : str
        An optional longer string that provides further documentation about the asset.

    data_type : str
        Experimental: the nature of the asset's data. Could be used to determine
        visualization methods etc.

    extension : str
        The file extension, if applicable.

    key_within_module : str
        An optional key that uniquely identifies the asset within a given module.
        If left unspecified, this will be automatically generated with reference to
        the ``parent`` and the ``local_key`` arguments.

    key_within_experiment : str
        A string that uniquely identifies the asset within a given experiment.
        If left unspecified, this will be automatically generated with reference to
        the ``key_within_module`` and the ``module_id`` arguments.

    module_id : str
        Identifies the module with which the asset should be associated. If left
        blank, PsyNet will attempt to infer the ``module_id`` from the ``parent``
        parameter, if provided.

    parent : object
        The object that 'owns' the asset, if applicable, for example a Participant
        or a Node.

    personal : bool
        Whether the asset is 'personal' and hence omitted from anonymous database
        exports.

    obfuscate : int
        Determines the extent to which the asset's generated URL should be obfuscated.
        By default, ``obfuscate=1``, which means that the URL contains a human-readable
        component containing useful metadata (e.g. the participant ID), but also
        contains a randomly generated string so that malicious agents cannot retrieve
        arbitrary assets by guessing URLs. If ``obfuscate=0``, then the randomly
        generated string is not added. If ``obfuscate=2``, then the human-readable
        component is omitted, and only the random portion is kept. This might be
        useful in cases where you're worried about participants cheating on the
        experiment by looking at file URLs.

    Attributes
    ----------

    secret : str
        A randomly generated string that is used to prevent malicious agents from
        guessing the asset's URL.
        TODO - check whether the URL obfuscation functionality makes this redundant.

    computation_time_sec : float
        The time taken to generate the asset.
        TODO - check whether this is populated in practice.

    psynet_version : str
        The version of PsyNet used to create the asset.

    deployment_id : str
        A string used to identify the particular experiment deployment.

    deposited: bool
        Whether the asset has been deposited yet.

    inherited : bool
        Whether the asset was inherited from a previous experiment, typically via the
        ``InheritedAssets`` functionality.

    inherited_from : str
        Identifies the source of an inherited asset.

    export_path : str
        A relative path that will be used by default when the asset is exported.

    participant_id : int
        ID of the participant who 'owns' the asset, if applicable.

    content_id : str
        A token used for checking whether the contents of two assets are equivalent.
        This takes various forms depending on the asset type. For a file, the
        ``content_id`` would typically be a hash; for an externally hosted asset,
        it would be the URL, etc.

    host_path : str
        The filepath used to host the asset within the storage repository,
        if applicable.

    url : str
        The URL that can be used to access the asset from the perspective of the
        experiment front-end.

    storage : AssetStorage
        The storage backend used for the asset.

    async_processes : list
        Lists all async processes that have been created for the asset, including
        completed ones.

    participant :
        If the parent is a ``Participant``, returns that participant.

    participants : list
        Lists all participants associated with the asset.

    trial :
        If the parent is a ``trial``, returns that trial.

    trials : list
        Lists all trials associated with the asset.

    node :
        If the parent is a ``Node``, returns that node.

    nodes : list
        Lists all nodes associated with the asset.

    network :
        If the parent is a ``Network``, returns that network.

    networks : list
        Lists all networks associated with the asset.

    errors : list
        Lists the errors associated with the asset.

    Linking assets to other database objects
    ----------------------------------------

    PsyNet assets may be linked to other database objects. There are two kinds of
    links that may be used. First, an asset may possess a *parent*. This parental
    relationship is strict in the sense that an asset may not possess more than one
    parent. However, in addition to the parental relationship, it is possible to link
    the asset to an arbitrary number of additional database objects. These latter
    links have a key-value construction, meaning that one can access a given asset by
    reference to a given key, for example: ``node.assets["response"]``. Importantly,
    the same asset can have different keys for different objects; for example, it
    might be the ``response`` for one node, but the ``stimulus`` for another node.
    These latter relationships are instantiated with logic like the following:

    ::

        participant.assets["stimulus"] = my_asset
    """

    secret = Column(String)

    needs_storage_backend = False

    def __init__(
        self,
        *,
        function,
        local_key=None,
        key_within_module: Optional[str] = None,
        key_within_experiment=None,
        arguments: Optional[dict] = None,
        is_folder: bool = False,
        description=None,
        data_type=None,
        extension=None,
        module_id: Optional[str] = None,
        parent=None,
        personal=False,
        obfuscate=1,  # 0: no obfuscation; 1: can't guess URL; 2: can't guess content
    ):
        """
        Forward all arguments to the parent constructor, then generate the random
        ``secret`` that is embedded in the asset's URL.
        """
        super().__init__(
            function=function,
            local_key=local_key,
            key_within_module=key_within_module,
            key_within_experiment=key_within_experiment,
            arguments=arguments,
            is_folder=is_folder,
            description=description,
            data_type=data_type,
            extension=extension,
            module_id=module_id,
            parent=parent,
            personal=personal,
            obfuscate=obfuscate,
        )
        # Used to protect unauthorized access. Stored as ``str`` because the
        # ``secret`` column is declared as ``String``; a raw ``UUID`` object would
        # have a different type before and after a database round-trip.
        self.secret = str(uuid.uuid4())

    @classproperty
    def default_storage(cls):  # noqa
        return NoStorage()

    def _needs_depositing(self):
        """Always false: nothing is deposited for this asset type."""
        return False

    def generate_input_path(self):
        """Always ``None``: no temporary input path is allocated."""
        return None

    def export(self, path, **kwargs):
        """
        Generate the asset at ``path`` by calling the stored function with the stored
        arguments. Extra keyword arguments are accepted but not used.
        """
        self.function(path=path, **self.arguments)

    def export_subfile(self, subfile, path):
        """
        Generate the whole (folder) asset into a temporary directory, then copy the
        single file ``subfile`` from it to ``path``.
        """
        assert self.is_folder
        with tempfile.TemporaryDirectory() as tempdir:
            self.export(tempdir)
            shutil.copyfile(tempdir + "/" + subfile, path)

    def export_subfolder(self, subfolder, path):
        """
        Generate the whole (folder) asset into a temporary directory, then copy the
        directory ``subfolder`` from it to ``path``.
        """
        assert self.is_folder
        with tempfile.TemporaryDirectory() as tempdir:
            self.export(tempdir)
            shutil.copytree(tempdir + "/" + subfolder, path)

    def get_url(self):
        """Return the relative URL, carrying the asset's ``id`` and ``secret``."""
        # We need to flush to make sure that self.id is populated
        db.session.flush()
        return f"/on-demand-asset?id={self.id}&secret={self.secret}"

    def generate_host_path(self):
        """Always ``None``: this asset type has no host path."""
        return None
class FastFunctionAsset(OnDemandAsset):
    """
    .. deprecated:: 11.7.0

        Use ``OnDemandAsset`` instead.
    """

    def __init__(
        self,
        *,
        function,
        local_key=None,
        key_within_module: Optional[str] = None,
        key_within_experiment=None,
        arguments: Optional[dict] = None,
        is_folder: bool = False,
        description=None,
        data_type=None,
        extension=None,
        module_id: Optional[str] = None,
        parent=None,
        personal=False,
        obfuscate=1,  # 0: no obfuscation; 1: can't guess URL; 2: can't guess content
    ):
        """
        Emit a ``DeprecationWarning`` naming the concrete class, then construct the
        asset exactly as ``OnDemandAsset`` would.
        """
        message = (
            f"{self.__class__.__name__} is deprecated and will be removed in future versions. "
            "Please use OnDemandAsset instead."
        )
        warnings.warn(message, DeprecationWarning)

        # Every argument is handed to the parent constructor unchanged.
        forwarded = dict(
            function=function,
            local_key=local_key,
            key_within_module=key_within_module,
            key_within_experiment=key_within_experiment,
            arguments=arguments,
            is_folder=is_folder,
            description=description,
            data_type=data_type,
            extension=extension,
            module_id=module_id,
            parent=parent,
            personal=personal,
            obfuscate=obfuscate,
        )
        super().__init__(**forwarded)
class CachedFunctionAsset(FunctionAssetMixin, CachedAsset):
    """
    A Cached Function Asset is a type of asset whose files are created by running a
    function, and whose outputs are stored in a general repository that is shared
    between multiple experiment deployments, to avoid redundant computation or file
    uploads.

    Parameters
    ----------

    function : callable
        A function responsible for generating the asset. The function should receive
        an argument called ``path`` and create a file or a folder at that path. It can
        also receive additional arguments specified via the ``arguments`` parameter.

    local_key : str
        A string identifier for the asset, for example ``"stimulus"``. If provided,
        this string identifier should together with ``parent`` and ``module_id``
        uniquely identify that asset (i.e. no other asset should share that
        combination of properties).

    arguments : dict
        An optional dictionary of arguments that should be passed to the function.

    is_folder : bool
        Whether the asset is a folder.

    description : str
        An optional longer string that provides further documentation about the asset.

    data_type : str
        Experimental: the nature of the asset's data. Could be used to determine
        visualization methods etc.

    extension : str
        The file extension, if applicable.

    key_within_module : str
        An optional key that uniquely identifies the asset within a given module.
        If left unspecified, this will be automatically generated with reference to
        the ``parent`` and the ``local_key`` arguments.

    key_within_experiment : str
        A string that uniquely identifies the asset within a given experiment.
        If left unspecified, this will be automatically generated with reference to
        the ``key_within_module`` and the ``module_id`` arguments.

    module_id : str
        Identifies the module with which the asset should be associated. If left
        blank, PsyNet will attempt to infer the ``module_id`` from the ``parent``
        parameter, if provided.

    parent : object
        The object that 'owns' the asset, if applicable, for example a Participant
        or a Node.

    personal : bool
        Whether the asset is 'personal' and hence omitted from anonymous database
        exports.

    obfuscate : int
        Determines the extent to which the asset's generated URL should be obfuscated.
        By default, ``obfuscate=1``, which means that the URL contains a human-readable
        component containing useful metadata (e.g. the participant ID), but also
        contains a randomly generated string so that malicious agents cannot retrieve
        arbitrary assets by guessing URLs. If ``obfuscate=0``, then the randomly
        generated string is not added. If ``obfuscate=2``, then the human-readable
        component is omitted, and only the random portion is kept. This might be
        useful in cases where you're worried about participants cheating on the
        experiment by looking at file URLs.

    Attributes
    ----------

    computation_time_sec : float
        The time taken to generate the asset.

    md5_contents : str
        Contains an automatically generated MD5 hash of the object's contents, where
        'contents' is liberally defined; it could mean hashing the file itself, or
        hashing the arguments of the function used to generate that file.

    size_mb : float
        The size of the asset's file(s) (in MB).

    deposit_time_sec : float
        The time it took to deposit the asset.

    needs_storage_backend : bool
        Whether the asset type needs a storage backend, e.g. a file server, or
        whether it can do without (e.g. in the case of an externally hosted resource
        accessible by a URL).

    psynet_version : str
        The version of PsyNet used to create the asset.

    deployment_id : str
        A string used to identify the particular experiment deployment.

    deposited: bool
        Whether the asset has been deposited yet.

    inherited : bool
        Whether the asset was inherited from a previous experiment, typically via the
        ``InheritedAssets`` functionality.

    inherited_from : str
        Identifies the source of an inherited asset.

    export_path : str
        A relative path that will be used by default when the asset is exported.

    participant_id : int
        ID of the participant who 'owns' the asset, if applicable.

    content_id : str
        A token used for checking whether the contents of two assets are equivalent.
        This takes various forms depending on the asset type. For a file, the
        ``content_id`` would typically be a hash; for an externally hosted asset,
        it would be the URL, etc.

    host_path : str
        The filepath used to host the asset within the storage repository,
        if applicable.

    url : str
        The URL that can be used to access the asset from the perspective of the
        experiment front-end.

    storage : AssetStorage
        The storage backend used for the asset.

    async_processes : list
        Lists all async processes that have been created for the asset, including
        completed ones.

    participant :
        If the parent is a ``Participant``, returns that participant.

    participants : list
        Lists all participants associated with the asset.

    trial :
        If the parent is a ``trial``, returns that trial.

    trials : list
        Lists all trials associated with the asset.

    node :
        If the parent is a ``Node``, returns that node.

    nodes : list
        Lists all nodes associated with the asset.

    network :
        If the parent is a ``Network``, returns that network.

    networks : list
        Lists all networks associated with the asset.

    errors : list
        Lists the errors associated with the asset.

    Linking assets to other database objects
    ----------------------------------------

    PsyNet assets may be linked to other database objects. There are two kinds of
    links that may be used. First, an asset may possess a *parent*. This parental
    relationship is strict in the sense that an asset may not possess more than one
    parent. However, in addition to the parental relationship, it is possible to link
    the asset to an arbitrary number of additional database objects. These latter
    links have a key-value construction, meaning that one can access a given asset by
    reference to a given key, for example: ``node.assets["response"]``. Importantly,
    the same asset can have different keys for different objects; for example, it
    might be the ``response`` for one node, but the ``stimulus`` for another node.
    These latter relationships are instantiated with logic like the following:

    ::

        participant.assets["stimulus"] = my_asset
        db.session.commit()
    """

    @property
    def cache_key(self):
        """
        The token that distinguishes cache entries for this asset: the value returned
        by ``get_md5_instructions``, recomputed on every access.
        """
        instructions_hash = self.get_md5_instructions()
        return instructions_hash
class ExternalAsset(Asset):
    """
    An asset that is not managed by PsyNet.

    Typically this is a file hosted on a remote web server and reachable
    via a URL. Because PsyNet does not store the file itself, the asset is
    marked as deposited as soon as it is constructed, and its ``host_path``
    and ``url`` are both set to the supplied URL.

    Parameters
    ----------

    url : str
        The URL at which the external asset may be accessed.

    local_key : str
        A string identifier for the asset, for example ``"stimulus"``. If provided, this string identifier
        should together with ``parent`` and ``module_id`` uniquely identify that asset (i.e. no other asset
        should share that combination of properties).

    description : str
        An optional longer string that provides further documentation about the asset.

    is_folder : bool
        Whether the asset is a folder.

    data_type : str
        Experimental: the nature of the asset's data. Could be used to determine visualization methods etc.

    extension : str
        The file extension, if applicable.

    parent : object
        The object that 'owns' the asset, if applicable, for example a Participant or a Node.

    key_within_module : str
        A string that uniquely identifies the asset within a given module. If left unspecified,
        this will be automatically generated with reference to the ``parent`` and the ``local_key`` arguments.

    key_within_experiment : str
        A string that uniquely identifies the asset within a given experiment. If left unspecified,
        this will be automatically generated with reference to the ``key_within_module`` and the ``module_id``
        arguments.

    module_id : str
        The module within which the asset is located.

    personal : bool
        Whether the asset is 'personal' and hence omitted from anonymous database exports.

    Attributes
    ----------

    psynet_version : str
        The version of PsyNet used to create the asset.

    deployment_id : str
        A string used to identify the particular experiment deployment.

    deposited : bool
        Whether the asset has been deposited yet.

    inherited : bool
        Whether the asset was inherited from a previous experiment, typically via the
        ``InheritedAssets`` functionality.

    inherited_from : str
        Identifies the source of an inherited asset.

    export_path : str
        A relative path that will be used by default when the asset is exported.

    participant_id : int
        ID of the participant who 'owns' the asset, if applicable.

    content_id : str
        A token used for checking whether the contents of two assets are equivalent.
        This takes various forms depending on the asset type.
        For a file, the ``content_id`` would typically be a hash;
        for an externally hosted asset, it would be the URL, etc.

    host_path : str
        The filepath used to host the asset within the storage repository, if applicable.

    url : str
        The URL that can be used to access the asset from the perspective of the experiment front-end.

    storage : AssetStorage
        The storage backend used for the asset.

    async_processes : list
        Lists all async processes that have been created for the asset, including completed ones.

    participant :
        If the parent is a ``Participant``, returns that participant.

    participants : list
        Lists all participants associated with the asset.

    trial :
        If the parent is a ``Trial``, returns that trial.

    trials : list
        Lists all trials associated with the asset.

    node :
        If the parent is a ``Node``, returns that node.

    nodes : list
        Lists all nodes associated with the asset.

    network :
        If the parent is a ``Network``, returns that network.

    networks : list
        Lists all networks associated with the asset.

    errors : list
        Lists the errors associated with the asset.

    Linking assets to other database objects
    ----------------------------------------

    PsyNet assets may be linked to other database objects. There are two kinds of links that may be used.
    First, an asset may possess a *parent*. This parental relationship is strict in the sense that an asset
    may not possess more than one parent.

    However, in addition to the parental relationship, it is possible to link the asset to an arbitrary number
    of additional database objects. These latter links have a key-value construction, meaning that one can access
    a given asset by reference to a given key, for example: ``node.assets["response"]``.
    Importantly, the same asset can have different keys for different objects; for example, it might
    be the ``response`` for one node, but the ``stimulus`` for another node.
    These latter relationships are instantiated with logic like the following:

    ::

        participant.assets["stimulus"] = my_asset
        db.session.commit()
    """

    def __init__(
        self,
        url,
        *,
        local_key=None,
        key_within_module=None,
        key_within_experiment=None,
        description=None,
        is_folder=False,
        data_type=None,
        extension=None,
        parent=None,
        module_id=None,
        personal=False,
    ):
        # An external asset is hosted at its own URL, so the host path and
        # the URL coincide and there is nothing left to deposit.
        self.host_path = url
        self.url = url
        self.deposited = True

        base_kwargs = dict(
            local_key=local_key,
            key_within_module=key_within_module,
            key_within_experiment=key_within_experiment,
            is_folder=is_folder,
            description=description,
            data_type=data_type,
            extension=extension,
            module_id=module_id,
            parent=parent,
            personal=personal,
        )
        super().__init__(**base_kwargs)

    @classproperty
    def default_storage(cls):  # noqa
        """A fresh ``WebStorage`` instance, the notional back-end for web-hosted assets."""
        return WebStorage()

    @property
    def identifiers(self):
        """The parent class's identifiers, extended with this asset's ``url``."""
        merged = dict(super().identifiers)
        merged["url"] = self.url
        return merged

    def get_extension(self):
        """Derive the file extension from the asset's URL."""
        return get_extension(self.url)

    def get_content_id(self):
        """The URL doubles as the content identifier for external assets."""
        return self.url

    def get_url(self):
        """Return the URL supplied at construction."""
        return self.url

    def generate_host_path(self):
        """External assets have no generated host path; always returns ``None``."""
        return None

    def _deposit(self, storage: "AssetStorage", async_: bool, delete_input: bool):
        """No-op: there is nothing to deposit for an externally hosted asset."""
        pass

    def delete_input(self):
        """Not supported: an external asset has no local input file to delete."""
        raise NotImplementedError
class ExternalS3Asset(ExternalAsset):
    """
    Represents an external asset that is stored in an Amazon Web Services S3 bucket.

    The asset's URL is derived from ``s3_bucket`` and ``s3_key`` (see
    :meth:`generate_url`) and passed on to :class:`ExternalAsset`.
    """

    s3_bucket = Column(String)
    s3_key = Column(String)

    def __init__(
        self,
        *,
        s3_bucket: str,
        s3_key: str,
        local_key=None,
        key_within_module=None,
        key_within_experiment=None,
        is_folder=False,
        description=None,
        data_type=None,
        module_id=None,
        parent=None,
        personal=False,
    ):
        # The bucket and key must be set first because generate_url() reads them.
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key

        base_kwargs = dict(
            url=self.generate_url(),
            is_folder=is_folder,
            description=description,
            data_type=data_type,
            local_key=local_key,
            module_id=module_id,
            key_within_module=key_within_module,
            key_within_experiment=key_within_experiment,
            parent=parent,
            personal=personal,
        )
        super().__init__(**base_kwargs)

    def generate_url(self):
        """Build the public ``s3.amazonaws.com`` URL from the bucket name and key."""
        bucket, key = self.s3_bucket, self.s3_key
        return f"https://s3.amazonaws.com/{bucket}/{key}"

    @property
    def identifiers(self):
        """The parent class's identifiers, extended with ``s3_bucket`` and ``s3_key``."""
        merged = dict(super().identifiers)
        merged["s3_bucket"] = self.s3_bucket
        merged["s3_key"] = self.s3_key
        return merged

    @cached_property
    def default_storage(self):  # noqa
        """An ``S3Storage`` pointing at this asset's bucket, with an empty root."""
        return S3Storage(self.s3_bucket, root="")

    def delete_input(self):
        """Not supported: an external asset has no local input file to delete."""
        raise NotImplementedError
class AssetStorage:
    """
    Defines a storage back-end for storing assets.

    This base class provides the deposit dispatching logic (synchronous or
    asynchronous) and helpers for exporting assets over HTTP. Concrete
    back-ends are expected to override ``_receive_deposit``, ``export``,
    ``get_url`` and ``check_cache``.
    """

    heroku_compatible = True

    @property
    def experiment(self):
        # Imported lazily, at call time rather than module import time.
        from .experiment import get_experiment

        return get_experiment()

    @property
    def deployment_id(self):
        return self.experiment.deployment_id

    def on_every_launch(self):
        """Hook called on every launch; does nothing by default."""
        pass

    def update_asset_metadata(self, asset: "Asset"):
        """Hook for storing back-end specific metadata on ``asset``; no-op by default."""
        pass

    def receive_deposit(self, asset, host_path: str, async_: bool, delete_input: bool):
        """
        Deposit ``asset`` at ``host_path``.

        Parameters
        ----------

        asset :
            The asset to deposit.

        host_path :
            Destination path within the storage back-end.

        async_ :
            If ``True``, the deposit is dispatched via a ``LocalAsyncProcess``;
            otherwise it is performed immediately.

        delete_input :
            Whether to call ``asset.delete_input()`` once the deposit has completed.
        """
        if async_:
            f = self._async__call_receive_deposit
        else:
            f = self._call_receive_deposit

        f(asset, host_path, delete_input)

    def _receive_deposit(self, asset: "Asset", host_path: str):
        """Perform the actual deposit; must be overridden by concrete back-ends."""
        self.raise_not_implemented_error()

    def _call_receive_deposit(
        self,
        asset: "Asset",
        host_path: str,
        delete_input: bool,
    ):
        # We include this for compatibility with threaded dispatching.
        # Without it, SQLAlchemy complains that the object has become disconnected
        # from the SQLAlchemy session. This command 'merges' it back into the session.
        asset = db.session.merge(asset)

        self._receive_deposit(asset, host_path)
        asset.deposited = True
        asset.after_deposit()

        if delete_input:
            asset.delete_input()

    def _async__call_receive_deposit(
        self, asset: "Asset", host_path: str, delete_input: bool
    ):
        """Schedule ``_call_receive_deposit`` via a ``LocalAsyncProcess``."""
        LocalAsyncProcess(
            self._call_receive_deposit,
            arguments=dict(
                asset=asset,
                host_path=host_path,
                delete_input=delete_input,
            ),
            asset=asset,
        )

    def export(self, asset, path, **kwargs):
        """Export ``asset`` to the local ``path``; must be overridden by concrete back-ends."""
        self.raise_not_implemented_error()

    def prepare_for_deployment(self):
        """Hook called when preparing for deployment; does nothing by default."""
        pass

    def get_url(self, host_path: str):
        """Return the URL for ``host_path``; must be overridden by concrete back-ends."""
        self.raise_not_implemented_error()

    @staticmethod
    def raise_not_implemented_error():
        """Raise a ``NotImplementedError`` explaining how to configure a storage back-end."""
        raise NotImplementedError(
            "If your experiment uses assets you must specify a storage back-end in your experiment class, "
            "typically by writing something like\n\n"
            "    asset_storage = LocalStorage()\n\n"
            "in your experiment class. You will probably need to add the following to your imports too:\n\n"
            "    from psynet.asset import LocalStorage"
        )

    def check_cache(self, host_path: str, is_folder: bool):
        """
        Checks whether the registry can find an asset cached at ``host_path``.
        The implementation is permitted to make optimizations for speed
        that may result in missed caches, i.e. returning ``False`` when
        the cache does actually exist. However, the implementation should
        only return ``True`` if it is certain that the asset cache exists.

        Returns
        -------

        ``True`` or ``False``.
        """
        raise NotImplementedError

    @classmethod
    def http_export(cls, asset, path):
        """
        Export ``asset`` to ``path`` by downloading it from ``asset.url``.

        For folder assets no download is attempted; an explanatory text file
        is written to ``path`` instead (see ``_http_folder_export``).
        """
        url = cls._prepare_url_for_http_export(asset.url)

        if asset.is_folder:
            cls._http_folder_export(url, path)
        else:
            cls._http_file_export(url, path)

    @staticmethod
    def _prepare_url_for_http_export(url):
        """
        Turn a relative URL into an absolute one.

        URLs that already start with ``"http"`` are returned unchanged.
        Otherwise the configured ``host`` is used as prefix, with
        ``"0.0.0.0"`` mapped to ``http://localhost:5000``.
        """
        if not url.startswith("http"):
            host = get_from_config("host")
            if host == "0.0.0.0":
                prefix = "http://localhost:5000"
            else:
                prefix = host
            url = prefix + url
        return url

    def export_subfile(self, asset, subfile, path):
        """Download the file ``subfile`` located inside the folder asset ``asset`` to ``path``."""
        url = asset.url + "/" + subfile
        url = self._prepare_url_for_http_export(url)
        self._http_file_export(url, path)

    def export_subfolder(self, asset, subfolder, path):
        """
        Not supported over HTTP.

        Raises
        ------

        RuntimeError
            Always, because folder contents cannot be listed over plain HTTP.
        """
        # Each fragment ends with a space so that the concatenated message
        # reads as proper sentences.
        raise RuntimeError(
            "export_subfolder is not supported for assets being exported over HTTP. "
            "This is because the internet provides "
            "no standard way to list the contents of a folder hosted "
            "on an arbitrary web server. You can avoid this issue in future "
            "by listing each asset as a separate file."
        )

    @staticmethod
    def _http_folder_export(url, path):
        """Write an explanatory note to ``path`` in lieu of the folder's contents."""
        with open(path, "w") as f:
            f.write(
                "It is not possible to automatically export assets over HTTP "
                "with type='folder'. This is because the internet provides "
                "no standard way to list the contents of a folder hosted "
                "on an arbitrary web server. You can avoid this issue in the "
                "future by listing each asset as a separate file."
            )

    @staticmethod
    def _http_file_export(url, path):
        """
        Download ``url`` and write the response body to ``path``.

        Raises
        ------

        ConnectionError
            If the server responds with a status code other than 200.
        """
        try:
            r = requests.get(url)
            if r.status_code != 200:
                raise ConnectionError(
                    f"Failed to download from the following URL: {url} "
                    f"(status code = {r.status_code})"
                )
            with open(path, "wb") as file:
                file.write(r.content)
        except Exception:
            # Report which URL failed before propagating the original error.
            print(
                f"An error occurred when trying to download from the following URL: {url}"
            )
            raise
class WebStorage(AssetStorage):
    """
    The notional storage back-end for external web-hosted assets.
    """

    def export(self, asset, path, **kwargs):
        """
        Export ``asset`` to ``path`` by downloading it over HTTP.

        Any extra keyword arguments are accepted but ignored.
        """
        self.http_export(asset, path)
class NoStorage(AssetStorage):
    """
    A 'null' storage back-end for assets that don't require any storage.
    """

    def update_asset_metadata(self, asset: Asset):
        """No metadata to record for a null back-end."""
        return None

    def _receive_deposit(self, asset, host_path: str):
        """Always fails: a null back-end cannot accept deposits."""
        message = "Asset depositing is not supported by 'NoStorage' objects."
        raise RuntimeError(message)
class LocalStorage(AssetStorage):
    """
    Stores assets in a local folder on the same computer that is running your Python code.
    This approach is suitable when you are running experiments on a single local machine (e.g. when doing
    fieldwork or laboratory-based data collection), and when you are deploying your experiments to your own
    remote web server via Docker. It is *not* appropriate if you deploy your experiments via Heroku,
    because Heroku deployments split the processing over multiple web servers, and these different web servers
    do not share the same file system.

    The class attribute ``label`` names the storage object; it determines the
    name of the symlink created under ``static/``.
    """

    label = "assets"
    heroku_compatible = False

    def __init__(self, root=None):
        """

        Parameters
        ----------

        root :
            Optional path to the directory to be used for storage.
            Tilde expansion (e.g. '~/psynet') is performed automatically.
        """
        super().__init__()

        self._initialized = False
        self._root = root

    def setup_files(self):
        """Create the root directory and the ``static/<label>`` symlink when running on the storage host."""
        if self.on_deployed_server() or deployment_info.read("is_local_deployment"):
            self._ensure_root_dir_exists()
            self._create_symlink()

    def prepare_for_deployment(self):
        self.setup_files()

    def on_every_launch(self):
        self.setup_files()

    @cached_property
    def root(self):
        """
        We defer the registration of the root until as late as possible
        to avoid circular imports when loading the experiment.
        """
        if self._root:
            # Honour the documented promise of automatic tilde expansion;
            # otherwise a literal '~' directory would be created/symlinked.
            return os.path.expanduser(self._root)
        else:
            if deployment_info.read("is_ssh_deployment"):
                return "/psynet-data/assets"
            else:
                return os.path.expanduser("~/psynet-data/assets")

    def _ensure_root_dir_exists(self):
        from pathlib import Path

        Path(self.root).mkdir(parents=True, exist_ok=True)

    @property
    def local_path(self):
        """Path of the symlink, relative to the experiment directory."""
        return os.path.join("static", self.label)

    @property
    def public_path(self):
        """
        This is the publicly exposed path by which the web browser can access the storage registry.
        This corresponds to a (symlinked) directory inside the experiment directory.
        """
        return "/" + self.local_path

    def _create_symlink(self):
        """(Re)create the ``static/<label>`` symlink pointing at ``root``."""
        # Remove whatever currently occupies local_path: first try it as a
        # file/symlink, then as a directory tree. Failure is tolerated here
        # (best effort), because the symlink step below copes with leftovers.
        try:
            os.unlink(self.local_path)
        except (FileNotFoundError, IsADirectoryError, PermissionError):
            try:
                shutil.rmtree(self.local_path)
            except OSError:
                pass

        os.makedirs("static", exist_ok=True)

        try:
            os.symlink(self.root, self.local_path)
        except FileExistsError:
            pass

    def update_asset_metadata(self, asset: Asset):
        """Record the asset's file-system path in ``asset.var.file_system_path``."""
        host_path = asset.host_path
        file_system_path = self.get_file_system_path(host_path)
        asset.var.file_system_path = file_system_path

    @staticmethod
    @cache
    def sftp_connection(ssh_host, ssh_user):
        from dallinger.command_line.docker_ssh import get_sftp

        return get_sftp(ssh_host, user=ssh_user)

    @staticmethod
    @cache
    def ssh_executor(ssh_host, ssh_user):
        from dallinger.command_line.docker_ssh import Executor

        return Executor(ssh_host, user=ssh_user)

    def _receive_deposit(self, asset: Asset, host_path: str):
        """
        Copy the asset's input file/folder into the storage directory.

        When running on the storage host the data is copied locally; for SSH
        deployments it is uploaded via SFTP. Any other configuration raises
        ``NotImplementedError``.
        """
        file_system_path = self.get_file_system_path(host_path)

        if self.on_deployed_server() or deployment_info.read("is_local_deployment"):
            # We are depositing an asset that sits on the present server already,
            # so we can just copy it.
            os.makedirs(os.path.dirname(file_system_path), exist_ok=True)

            if asset.is_folder:
                shutil.copytree(
                    asset.input_path,
                    os.path.expanduser(file_system_path),
                    dirs_exist_ok=True,
                )
            else:
                shutil.copyfile(asset.input_path, os.path.expanduser(file_system_path))
        else:
            if deployment_info.read("is_ssh_deployment"):
                ssh_host = deployment_info.read("ssh_host")
                ssh_user = deployment_info.read("ssh_user")
                # The storage root is located relative to the remote user's home directory.
                docker_host_path = (
                    self.ssh_host_home_dir(ssh_host, ssh_user) + file_system_path
                )
                if asset.is_folder:
                    self._put_folder(
                        asset.input_path,
                        docker_host_path,
                        ssh_host,
                        ssh_user,
                    )
                else:
                    self._put_file(
                        asset.input_path,
                        docker_host_path,
                        ssh_host,
                        ssh_user,
                    )
            else:
                raise NotImplementedError

        asset.deposited = True

    def _put_file(self, input_path, dest_path, ssh_host, ssh_user):
        """Upload a single local file to ``dest_path`` on the SSH host."""
        sftp = self.sftp_connection(ssh_host, ssh_user)
        self._mk_dir_tree(os.path.dirname(dest_path), ssh_host, ssh_user)
        with open(input_path, "rb") as file:
            # Stream the open file directly rather than buffering it all in memory.
            sftp.putfo(file, dest_path)

    def _put_folder(self, input_path, dest_path, ssh_host, ssh_user):
        """Recursively upload a local folder to ``dest_path`` on the SSH host."""
        sftp = self.sftp_connection(ssh_host, ssh_user)
        self._mk_dir_tree(dest_path, ssh_host, ssh_user)

        # Traverse the local directory
        for dirpath, dirnames, filenames in os.walk(input_path):
            # For each directory in the local structure, create it remotely
            for dirname in dirnames:
                local_path = os.path.join(dirpath, dirname)
                relative_path = os.path.relpath(local_path, input_path)
                remote_path = os.path.join(dest_path, relative_path)

                self._mk_dir_tree(remote_path, ssh_host, ssh_user)

            # For each file, copy it to the remote directory
            for filename in filenames:
                local_path = os.path.join(dirpath, filename)
                relative_path = os.path.relpath(local_path, input_path)
                remote_path = os.path.join(dest_path, relative_path)

                with open(local_path, "rb") as file:
                    sftp.putfo(file, remote_path)

    def _mk_dir_tree(self, dir, ssh_host, ssh_user):
        """Run ``mkdir -p`` for ``dir`` on the SSH host."""
        executor = self.ssh_executor(ssh_host, ssh_user)
        executor.run(f'mkdir -p "{dir}"')

    def on_deployed_server(self):
        from psynet.experiment import in_deployment_package

        return in_deployment_package()

    def export(self, asset, path, ssh_host=None, ssh_user=None):
        """
        Export ``asset`` to ``path``.

        Copies directly when running on the deployed server, downloads via
        SFTP when ``ssh_host`` is given, and otherwise falls back to HTTP.
        """
        if self.on_deployed_server():
            self._export_via_copying(asset, path)
        elif ssh_host is not None:
            self._export_via_ssh(asset, path, ssh_host, ssh_user)
        else:
            AssetStorage.http_export(asset, path)

    def _export_via_ssh(self, asset, local_path, ssh_host=None, ssh_user=None):
        if ssh_host is None or ssh_user is None:
            raise ValueError(
                "To export via SSH you need to provide an ssh_host and ssh_user. If you are seeing this error "
                "it means that probably these values haven't been propagated properly through their caller functions."
            )
        docker_host_path = (
            self.ssh_host_home_dir(ssh_host, ssh_user) + asset.var.file_system_path
        )
        sftp = self.sftp_connection(ssh_host, ssh_user)
        paramiko.sftp_file.SFTPFile.MAX_REQUEST_SIZE = pow(
            2, 22
        )  # 4 MB per chunk, prevents SFTPError('Garbage packet received')
        sftp.get(docker_host_path, local_path)

    def _export_via_copying(self, asset: Asset, path):
        from_ = self.get_file_system_path(asset.host_path)
        to_ = path
        if asset.is_folder:
            shutil.copytree(from_, to_, dirs_exist_ok=True)
        else:
            shutil.copyfile(from_, to_)

    def export_subfile(self, asset, subfile, path):
        """Export one file from within a folder asset, copying locally when possible."""
        if self.on_deployed_server() or deployment_info.read("is_local_deployment"):
            from_ = self.get_file_system_path(asset.host_path) + "/" + subfile
            to_ = path
            shutil.copyfile(from_, to_)
        else:
            super().export_subfile(asset, subfile, path)

    def export_subfolder(self, asset, subfolder, path):
        """Export one subfolder from within a folder asset, copying locally when possible."""
        if self.on_deployed_server() or deployment_info.read("is_local_deployment"):
            from_ = self.get_file_system_path(asset.host_path) + "/" + subfolder
            to_ = path
            shutil.copytree(from_, to_, dirs_exist_ok=True)
        else:
            super().export_subfolder(asset, subfolder, path)

    def get_file_system_path(self, host_path):
        """Join ``root`` and ``host_path``; returns ``None`` if ``host_path`` is empty."""
        if host_path:
            return os.path.join(self.root, host_path)
        else:
            return None

    def get_url(self, host_path):
        assert (
            self.root
        )  # Makes sure that the root storage location has been instantiated
        return urllib.parse.quote(os.path.join(self.public_path, host_path))

    def check_cache(self, host_path: str, is_folder: bool):
        """
        Check whether an asset already exists at ``host_path``.

        Raises
        ------

        RuntimeError
            If the deployment is neither local nor SSH-based.
        """
        if self.on_deployed_server() or deployment_info.read("is_local_deployment"):
            return self.check_local_cache(host_path, is_folder)
        elif deployment_info.read("is_ssh_deployment"):
            return self.check_ssh_cache(
                host_path,
                is_folder,
                ssh_host=deployment_info.read("ssh_host"),
                ssh_user=deployment_info.read("ssh_user"),
            )
        else:
            raise RuntimeError(
                f"Not sure how to check cache given the current run configuration: {deployment_info.read_all()}"
            )

    def check_local_cache(self, host_path: str, is_folder: bool):
        """Return ``True`` if the path exists locally and has the expected kind (file vs. folder)."""
        file_system_path = self.get_file_system_path(host_path)
        return os.path.exists(file_system_path) and (
            (is_folder and os.path.isdir(file_system_path))
            or (not is_folder and os.path.isfile(file_system_path))
        )

    def check_ssh_cache(
        self, host_path: str, is_folder: bool, ssh_host: str, ssh_user: str
    ):
        """Return ``True`` if the path can be listed/stat'ed on the SSH host."""
        sftp = self.sftp_connection(ssh_host, ssh_user)

        # At some point, we need to refactor the logic for get_file_system_path to clarify
        # whether we are running in Docker or not.
        # Docker: /psynet-data/assets
        # SSH: /home/pmch2/psynet-data/assets
        # local machine: ~/psynet-data/assets
        #
        # For now we hard-code...
        file_system_path = self.ssh_host_home_dir(
            ssh_host, ssh_user
        ) + self.get_file_system_path(host_path)

        try:
            if is_folder:
                sftp.listdir(file_system_path)
            else:
                sftp.stat(file_system_path)
            return True
        except FileNotFoundError:
            return False

    @cache
    def ssh_host_home_dir(self, ssh_host, ssh_user):
        """Return the value of ``$HOME`` as reported by the SSH host."""
        executor = self.ssh_executor(ssh_host, ssh_user)
        return executor.run("echo $HOME").strip()
class DebugStorage(LocalStorage):
    """
    A local storage back-end used for debugging.

    .. deprecated:: 11.0.0

        Use ``LocalStorage`` instead.
    """

    def __init__(self, *args, **kwargs):
        """Emit a ``DeprecationWarning`` and otherwise behave like ``LocalStorage``."""
        warnings.warn(
            "DebugStorage is deprecated, please replace it with LocalStorage.",
            DeprecationWarning,
            # Attribute the warning to the code instantiating DebugStorage,
            # not to this constructor.
            stacklevel=2,
        )
        super().__init__(*args, **kwargs)
# def create_bucket_if_necessary(fun):
#     @wraps(fun)
#     def wrapper(self, *args, **kwargs):
#         try:
#             return fun(self, *args, **kwargs)
#         except botocore.exceptions.ClientError as ex:
#             if ex.response["Error"]["Code"] == "NoSuchBucket":
#                 create_bucket(self.s3_bucket)
#                 return fun(self, *args, **kwargs)
#             else:
#                 raise
#
#     return wrapper


@cache
def get_boto3_s3_session():
    """Return a cached ``boto3.Session`` built from the AWS credentials."""
    credentials = get_aws_credentials()
    return boto3.Session(**credentials)


@cache
def get_boto3_s3_client():
    """Return a cached boto3 S3 client built from the AWS credentials."""
    credentials = get_aws_credentials()
    return boto3.client("s3", **credentials)


@cache
def get_boto3_s3_resource():
    """Return a cached boto3 S3 resource derived from the cached session."""
    session = get_boto3_s3_session()
    return session.resource("s3")


@cache
def get_boto3_s3_bucket(name):
    """Return a cached boto3 ``Bucket`` object for the bucket called ``name``."""
    resource = get_boto3_s3_resource()
    return resource.Bucket(name)
def list_files_in_s3_bucket(
    bucket_name: str,
    prefix: str = "",
):
    """
    Lists files in an S3 bucket.

    Parameters
    ----------
    bucket_name :
        Bucket to list files within.

    prefix :
        Only lists files whose keys begin with this string.

    Returns
    -------

    A list of keys (strings). The listing is paginated via
    ``list_objects_v2`` and all pages are collected before returning.

    """
    logger.info(
        "Listing files in S3 bucket %s with prefix '%s'...", bucket_name, prefix
    )
    paginator = get_boto3_s3_client().get_paginator("list_objects_v2")

    # A page has no "Contents" entry when nothing matches, hence the default.
    return [
        content["Key"]
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix)
        for content in page.get("Contents", ())
    ]
@cache
def list_files_in_s3_bucket__cached(*args, **kwargs):
    """
    Cached variant of ``list_files_in_s3_bucket``.

    Accepts the same arguments; repeated calls with identical arguments
    reuse the previously obtained listing.
    """
    listing = list_files_in_s3_bucket(*args, **kwargs)
    return listing
class AwsCliError(RuntimeError):
    """Raised when an AWS CLI command fails."""
class S3TransferBackend:
    """
    Base class for the mechanisms used to move files to and from an S3 bucket.

    Parameters
    ----------

    s3_bucket : str
        Name of the bucket that this backend operates on.
    """

    def __init__(self, s3_bucket: str):
        self.s3_bucket = s3_bucket

    def get_s3_url(self, s3_key: str):
        """Return the ``s3://`` URL for ``s3_key`` within this backend's bucket."""
        return f"s3://{self.s3_bucket}/{s3_key}"

    def check_recursive(self, recursive, local_path):
        """Assert that ``recursive`` is set exactly when ``local_path`` is a directory."""
        assert recursive == os.path.isdir(local_path)

    def upload(self, path, s3_key, recursive):
        raise NotImplementedError

    def download(self, s3_key, target_path, recursive):
        raise NotImplementedError

    def delete(self, s3_key, recursive):
        raise NotImplementedError


class S3Boto3TransferBackend(S3TransferBackend):
    """Transfer backend implemented with the ``boto3`` library."""

    def upload(self, path, s3_key, recursive):
        """Upload the file or folder at ``path`` to ``s3_key``."""
        client = get_boto3_s3_client()
        self.check_recursive(recursive, path)
        if os.path.isfile(path):
            client.upload_file(path, self.s3_bucket, s3_key)
        else:
            # Mirror the local directory tree underneath s3_key.
            for _dir_path, _dir_names, _file_names in os.walk(path):
                _rel_dir_path = os.path.relpath(_dir_path, path)
                for _file_name in _file_names:
                    _local_path = os.path.join(_dir_path, _file_name)
                    if _rel_dir_path == ".":
                        _file_key = os.path.join(s3_key, _file_name)
                    else:
                        _file_key = os.path.join(s3_key, _rel_dir_path, _file_name)
                    client.upload_file(_local_path, self.s3_bucket, _file_key)

    def _download(self, client, s3_key, target_path):
        """
        Download a single object.

        Raises
        ------

        FileNotFoundError
            If the object does not exist in the bucket.
        """
        import botocore

        try:
            client.download_file(self.s3_bucket, s3_key, target_path)
        except botocore.exceptions.ClientError as e:
            # A missing object is reported as "404" when detected by the
            # initial HeadObject request, or as "NoSuchKey" by GetObject.
            if e.response["Error"]["Code"] in ("NoSuchKey", "404"):
                raise FileNotFoundError(
                    f"s3://{self.s3_bucket}/{s3_key} does not exist"
                ) from e
            raise
        return True

    @staticmethod
    def _relative_key(object_key, s3_key):
        """Strip only the leading ``s3_key + "/"`` prefix from ``object_key``."""
        prefix = s3_key + "/"
        if object_key.startswith(prefix):
            return object_key[len(prefix):]
        return object_key

    def download(self, s3_key, target_path, recursive):
        """
        Download ``s3_key`` to ``target_path``.

        If ``recursive`` is true, every object whose key starts with
        ``s3_key + "/"`` is downloaded, preserving the relative layout.
        """
        client = get_boto3_s3_client()
        if recursive:
            bucket = get_boto3_s3_bucket(self.s3_bucket)
            for obj in bucket.objects.filter(Prefix=s3_key + "/"):
                server_path = obj.key
                # Only the leading prefix is removed; str.replace would also
                # strip later occurrences of the same folder name in the key.
                relative_path = self._relative_key(server_path, s3_key)
                if not relative_path or server_path.endswith("/"):
                    # Zero-byte "folder placeholder" objects have no file content.
                    continue
                _target_path = os.path.join(target_path, relative_path)
                target_dir = os.path.dirname(_target_path)
                os.makedirs(target_dir, exist_ok=True)
                self._download(client, server_path, _target_path)
        else:
            return self._download(client, s3_key, target_path)

    def delete(self, s3_key, recursive):
        """Delete the object at ``s3_key``, or everything under ``s3_key + "/"`` if ``recursive``."""
        bucket = get_boto3_s3_bucket(self.s3_bucket)
        if recursive:
            bucket.objects.filter(Prefix=s3_key + "/").delete()
        else:
            bucket.Object(s3_key).delete()


class S3AwscliTransferBackend(S3TransferBackend):
    """Transfer backend that shells out to the ``aws`` command-line client."""

    def __init__(self, s3_bucket):
        super().__init__(s3_bucket)
        try:
            self.run_command(["aws", "--version"], verbose=False)
        except AwsCliError as err:
            raise RuntimeError(
                "AWS CLI is not installed. Please install it and try again."
            ) from err

    def copy(self, source, target, recursive):
        """Run ``aws s3 cp`` from ``source`` to ``target``."""
        cmd = ["aws", "s3", "cp", source, target]
        if recursive:
            cmd.append("--recursive")
        self.run_command(cmd)

    def upload(self, path, s3_key, recursive):
        """Upload ``path`` to ``s3_key``, creating the bucket first if it does not exist."""
        self.check_recursive(recursive, path)
        url = self.get_s3_url(s3_key)
        try:
            self.copy(path, url, recursive)
        except AwsCliError as err:
            if "NoSuchBucket" in str(err):
                S3Storage.create_bucket(self.s3_bucket)
                self.copy(path, url, recursive)
            else:
                raise

    def download(self, s3_key, target_path, recursive):
        logger.info(f"Downloading from AWS: {s3_key}")
        url = self.get_s3_url(s3_key)
        self.copy(url, target_path, recursive)

    def delete(self, s3_key, recursive):
        url = self.get_s3_url(s3_key)
        cmd = ["aws", "s3", "rm", url]
        if recursive:
            cmd.append("--recursive")
        self.run_command(cmd)

    def run_command(self, cmd, verbose=True):
        """
        Run an AWS CLI command with the AWS credentials added to the environment.

        Raises
        ------

        AwsCliError
            If the command exits with a non-zero status (the message is the
            command's stderr), or if the executable cannot be found.
        """
        if verbose:
            logger.info(f"Running AWS CLI command: {cmd}")
        try:
            subprocess.run(
                cmd,
                check=True,
                capture_output=True,
                env={
                    **os.environ,
                    **get_aws_credentials(capitalize=True),
                },
            )
        except subprocess.CalledProcessError as err:
            message = err.stderr.decode("utf8")
            raise AwsCliError(message) from err
        except FileNotFoundError as err:
            # subprocess raises FileNotFoundError (not CalledProcessError)
            # when the executable itself is missing.
            raise AwsCliError(f"Executable not found: {cmd[0]}") from err
class S3Storage(AssetStorage):
    """
    A storage back-end that stores assets using Amazon Web Services' S3 Storage system.
    This service is relatively inexpensive as long as your file collection does not
    number more than a few gigabytes. To use this service you will need to sign up for
    an Amazon Web Services account.

    Parameters
    ----------

    s3_bucket : str
        The name of the S3 bucket to use.

    root : str
        The root directory within the bucket to use. Must not end with ``"/"``.

    backend : str
        The backend to use for transferring files to S3. Can be either "boto3" or "awscli".
        "awscli" relies on aws client being installed. It is faster than "boto3" (especially for uploading folders)
        but requires more dependencies which are not supported on Heroku.
        The default is "boto3".

    Raises
    ------

    NotImplementedError
        If ``backend`` is not one of the supported values.
    """

    def __init__(self, s3_bucket, root, backend="boto3"):
        super().__init__()

        assert not root.endswith("/")

        self.s3_bucket = s3_bucket
        self.root = root
        if backend == "boto3":
            self.backend = S3Boto3TransferBackend(s3_bucket)
        elif backend == "awscli":
            self.backend = S3AwscliTransferBackend(s3_bucket)
        else:
            # Fail at construction time; otherwise self.backend would be left
            # unset and only fail later with an opaque AttributeError.
            raise NotImplementedError(
                f"Transfer backend {backend} is not supported."
            )

    def prepare_for_deployment(self):
        """Ensure the bucket exists and make it publicly readable."""
        from .media import make_bucket_public

        if not self.bucket_exists(self.s3_bucket):
            self.create_bucket(self.s3_bucket)

        # NOTE(review): applied on every deployment, not only when the bucket
        # was just created -- confirm this matches the intended behaviour.
        make_bucket_public(self.s3_bucket)

    def _receive_deposit(self, asset, host_path):
        """Upload the asset's input file or folder to its key in the bucket."""
        s3_key = self.get_s3_key(host_path)

        if asset.is_folder:
            self.upload_folder(asset.input_path, s3_key)
        else:
            self.upload_file(asset.input_path, s3_key)

    def get_url(self, host_path: str):
        """Return the public ``https://s3.amazonaws.com/...`` URL for ``host_path``."""
        s3_key = self.get_s3_key(host_path)
        return os.path.join(
            "https://s3.amazonaws.com", self.s3_bucket, self.escape_s3_key(s3_key)
        )

    @staticmethod
    def bucket_exists(bucket_name):
        """
        Return ``False`` if S3 reports the bucket as not found (404), else ``True``.

        Other client errors are treated as "exists".
        """
        import botocore

        resource = get_boto3_s3_resource()
        try:
            resource.meta.client.head_bucket(Bucket=bucket_name)
        except botocore.exceptions.ClientError as e:
            # Compare as strings so that non-numeric error codes cannot
            # trigger a ValueError.
            if str(e.response["Error"]["Code"]) == "404":
                return False
        return True

    def get_s3_key(self, host_path: str):
        """Prefix ``host_path`` with the storage root to obtain the S3 key."""
        return os.path.join(self.root, host_path)

    def escape_s3_key(self, s3_key):
        # This might need revisiting as and when we find special characters that aren't quoted correctly
        return urllib.parse.quote_plus(s3_key, safe="/~()*!.'")

    def check_cache(self, host_path: str, is_folder: bool, use_cache=None):
        """
        Checks whether a file or folder is present in the remote bucket.
        Uses caching where appropriate for efficiency.

        Parameters
        ----------

        host_path :
            Path of the asset relative to the storage root.

        is_folder :
            Whether the asset is a folder.

        use_cache :
            Whether to rely on a cached listing of the bucket. If ``None``,
            defaults to the value returned by ``is_experiment_launched()``.
        """
        s3_key = self.get_s3_key(host_path)

        if use_cache is None:
            from .experiment import is_experiment_launched

            use_cache = is_experiment_launched()

        if is_folder:
            return self.check_cache_for_folder(s3_key, use_cache)
        else:
            return self.check_cache_for_file(s3_key, use_cache)

    def check_cache_for_folder(self, s3_key, use_cache):
        """A folder counts as cached if at least one key lies beneath it."""
        files = self.list_files_with_prefix(s3_key + "/", use_cache)
        return len(files) > 0

    def check_cache_for_file(self, s3_key, use_cache):
        """A file counts as cached only if its exact key is present."""
        files = self.list_files_with_prefix(s3_key, use_cache)
        return s3_key in files

    def list_files_with_prefix(self, prefix, use_cache):
        """
        List the keys in the bucket that start with ``prefix``.

        Returns an empty list if the bucket does not exist.
        """
        try:
            if use_cache:
                # If we are in the 'preparation' phase of deployment, then we rely on a cached listing
                # of the files in the S3 bucket. This is necessary because the preparation phase
                # may involve checking caches for thousands of files at a time, and it would be slow
                # to talk to S3 separately for each one. This wouldn't catch situations where
                # the cache has been added during the preparation phase itself, but this shouldn't happen very often,
                # so doesn't need to be optimized for just yet.
                return [
                    x
                    for x in list_files_in_s3_bucket__cached(
                        self.s3_bucket, prefix=self.root
                    )
                    if x.startswith(prefix)
                ]
            else:
                return list_files_in_s3_bucket(self.s3_bucket, prefix)
        except Exception as err:
            if "NoSuchBucket" in str(err):
                return []
            raise

    def export(self, asset, path, **kwargs):
        """Download the asset (file or folder) from the bucket to ``path``."""
        s3_key = self.get_s3_key(asset.host_path)
        if asset.is_folder:
            self.download_folder(s3_key, path)
        else:
            self.download_file(s3_key, path)

    def export_subfile(self, asset, subfile, path):
        assert asset.is_folder
        s3_key = self.get_s3_key(asset.host_path) + "/" + subfile
        self.download_file(s3_key, path)

    def export_subfolder(self, asset, subfolder, path):
        assert asset.is_folder
        s3_key = self.get_s3_key(asset.host_path) + "/" + subfolder
        self.download_folder(s3_key, path)

    def download_file(self, s3_key, target_path):
        return self._download(s3_key, target_path, recursive=False)

    def download_folder(self, s3_key, target_path):
        return self._download(s3_key, target_path, recursive=True)

    def _download(self, s3_key, target_path, recursive):
        return self.backend.download(s3_key, target_path, recursive)

    def upload_file(self, path, s3_key):
        return self._upload(path, s3_key, recursive=False)

    def upload_folder(self, path, s3_key):
        return self._upload(path, s3_key, recursive=True)

    def _upload(self, path, s3_key, recursive):
        return self.backend.upload(path, s3_key, recursive)

    @staticmethod
    def create_bucket(s3_bucket):
        client = get_boto3_s3_client()
        client.create_bucket(Bucket=s3_bucket)

    def delete_file(self, s3_key):
        self.backend.delete(s3_key, recursive=False)

    def delete_folder(self, s3_key):
        self.backend.delete(s3_key, recursive=True)

    def delete_all(self):
        """Delete everything stored under the storage root."""
        self.delete_folder(self.root)
class AssetRegistry:
    """
    Keeps track of the asset specifications staged for an experiment and
    forwards deposits to the underlying storage back-end.
    """

    initial_asset_manifesto_path = "pre_deployed_assets.csv"

    def __init__(self, storage: AssetStorage, n_parallel=None):
        self._staged_asset_specifications = []
        self._staged_asset_lookup_table = {}
        self.n_parallel = n_parallel
        self.storage = storage

    @property
    def experiment(self):
        from .experiment import get_experiment

        return get_experiment()

    @property
    def deployment_id(self):
        return self.storage.deployment_id

    def stage(self, *args):
        """Queue one or more ``AssetSpecification`` objects for deployment."""
        for specification in args:
            assert isinstance(specification, AssetSpecification)
            self._staged_asset_specifications.append(specification)

    def update_asset_metadata(self, asset: Asset):
        """No-op at the registry level."""
        pass

    def receive_deposit(
        self, asset: Asset, host_path: str, async_: bool, delete_input: bool
    ):
        """Delegate the deposit to the storage back-end and return its result."""
        storage = self.storage
        return storage.receive_deposit(asset, host_path, async_, delete_input)

    def prepare_for_deployment(self):
        """Prepare the staged assets first, then the storage back-end itself."""
        self.prepare_assets_for_deployment()
        self.storage.prepare_for_deployment()

    def prepare_assets_for_deployment(self):
        """
        Call ``prepare_for_deployment`` on every staged specification, one at
        a time, and commit the database session afterwards.
        """
        # OLD NOTES, may not be relevant any more
        #
        # The parallel implementation is not reliable yet;
        # the language_tests demo fails due to a deadlock between
        # competing transactions. As a patch for now we disable
        # parallel processing.
        #
        # If you wish to revisit this, you may find the following Postgres
        # code useful: it displays all blocking processes along with the
        # responsible queries.
        #
        # SELECT
        #     activity.pid,
        #     activity.usename,
        #     activity.query,
        #     blocking.pid AS blocking_id,
        #     blocking.query AS blocking_query
        # FROM pg_stat_activity AS activity
        # JOIN pg_stat_activity AS blocking ON blocking.pid = ANY(pg_blocking_pids(activity.pid));

        # SSH currently fails if we try to open more than one connection at the same time,
        # so for now assets are processed serially. It would be good to revisit this.
        # Uploading all the files over one SSH connection shouldn't be slower than uploading them
        # over multiple connections. The main limitation with the current situation though
        # is that we can no longer programmatically generate stimuli in parallel.
        progress = tqdm(
            self._staged_asset_specifications, desc="Generating/uploading assets..."
        )
        for specification in progress:
            specification.prepare_for_deployment(registry=self)

        db.session.commit()


def threadsafe__prepare_asset_for_deployment(asset, registry):
    """
    Merge ``asset`` into the current database session and then prepare the
    merged object for deployment against ``registry``.
    """
    merged = db.session.merge(asset)
    merged.prepare_for_deployment(registry=registry)