import re
from datetime import datetime
from jsonpickle.unpickler import loadclass
from jsonpickle.util import importable_name
from sqlalchemy import Boolean, Column, Float, Integer, String, types
from sqlalchemy.ext.mutable import MutableDict, MutableList
from sqlalchemy.types import TypeDecorator
from .utils import get_logger
logger = get_logger()
marker = object()
[docs]
class PythonObject(TypeDecorator):
@property
def python_type(self):
return object
impl = types.String
def sanitize(self, value):
return value
[docs]
def process_bind_param(self, value, dialect):
if value is None:
return value
try:
return self.serialize(value)
except Exception:
logger.error(
f"An error occurred when trying to serialize the following Python object to the database: {value}"
)
raise
[docs]
def process_literal_param(self, value, dialect):
return value
[docs]
def process_result_value(self, value, dialect):
if value is None:
return None
try:
return self.unserialize(value)
except Exception:
logger.error(
f"An error occurred when trying to unserialize the following Python object from the database: {value}"
)
raise
@classmethod
def serialize(cls, value):
from .serialize import serialize
return serialize(value)
@classmethod
def unserialize(cls, value):
from .serialize import unserialize
return unserialize(value)
class _PythonList(PythonObject):
def serialize(self, value):
return super().serialize(list(value))
PythonList = MutableList.as_mutable(_PythonList)
[docs]
class PythonClass(PythonObject):
@property
def python_type(self):
return type
@classmethod
def serialize(cls, value):
return importable_name(value)
@classmethod
def unserialize(cls, value):
return loadclass(value)
def register_extra_var(extra_vars, name, overwrite=False, **kwargs):
if (not overwrite) and (name in extra_vars):
raise ValueError(f"tried to overwrite the variable {name}")
extra_vars[name] = {**kwargs}
# Don't apply this decorator to time consuming operations, especially database queries!
def extra_var(extra_vars):
def real_decorator(function):
register_extra_var(extra_vars, function.__name__, overwrite=True)
return function
return real_decorator
def claim_field(name: str, extra_vars: dict, field_type=object):
# Todo - add new argument corresponding to the default value of the field
register_extra_var(extra_vars, name, field_type=field_type)
if field_type is int:
col = Column(Integer, nullable=True)
elif field_type is float:
col = Column(Float, nullable=True)
elif field_type is bool:
col = Column(Boolean, nullable=True)
elif field_type is str:
col = Column(String, nullable=True)
elif field_type is list:
col = Column(PythonList, nullable=True)
elif field_type is dict:
col = Column(PythonDict, nullable=True)
elif field_type is object:
col = Column(PythonObject, nullable=True)
else:
raise NotImplementedError
return col
def claim_var(
name,
extra_vars: dict,
use_default=False,
default=lambda: None,
serialise=lambda x: x,
unserialise=lambda x: x,
):
@property
def function(self):
try:
return unserialise(getattr(self.var, name))
except KeyError:
if use_default:
return default()
raise
@function.setter
def function(self, value):
setattr(self.var, name, serialise(value))
register_extra_var(extra_vars, name)
return function
def check_type(x, allowed):
match = False
for t in allowed:
if isinstance(x, t):
match = True
if not match:
raise TypeError(f"{x} did not have a type in the approved list ({allowed}).")
class BaseVarStore:
def __getattr__(self, name):
raise NotImplementedError
def __setattr__(self, key, value):
raise NotImplementedError
def get(self, name: str, default=marker):
"""
Gets a variable with a specified name.
Parameters
----------
name
Name of variable to retrieve.
default
Optional default value to return when the variable is uninitialized.
Returns
-------
object
Retrieved variable.
Raises
------
KeyError
Thrown if the variable doesn't exist and no default value is provided.
"""
try:
return self.__getattr__(name)
except KeyError:
if default == marker:
raise
else:
return default
def set(self, name, value):
"""
Sets a variable. Calls can be chained, e.g.
``participant.var.set("a", 1).set("b", 2)``.
Parameters
----------
name
Name of variable to set.
value
Value to assign to the variable.
Returns
-------
VarStore
The original ``VarStore`` object (useful for chaining).
"""
self.__setattr__(name, value)
return self
def has(self, name):
"""
Tests for the existence of a variable.
Parameters
----------
name
Name of variable to look for.
Returns
-------
bool
``True`` if the variable exists, ``False`` otherwise.
"""
try:
self.get(name)
return True
except KeyError:
return False
def inc(self, name, value=1):
"""
Increments a variable. Calls can be chained, e.g.
``participant.var.inc("a").inc("b")``.
Parameters
----------
name
Name of variable to increment.
value
Value by which to increment the varibable (default = 1).
Returns
-------
VarStore
The original ``VarStore`` object (useful for chaining).
Raises
------
KeyError
Thrown if the variable doesn't exist.
"""
original = self.get(name)
new = original + value
self.set(name, new)
return self
def new(self, name, value):
"""
Like :meth:`~psynet.field.VarStore.set`, except throws
an error if the variable exists already.
Parameters
----------
name
Name of variable to set.
value
Value to assign to the variable.
Returns
-------
VarStore
The original ``VarStore`` object (useful for chaining).
Raises
------
KeyError
Thrown if the variable doesn't exist.
"""
if self.has(name):
raise ValueError(f"There is already a variable called {name}.")
self.set(name, value)
[docs]
class ImmutableVarStore(BaseVarStore, dict):
def __init__(self, data):
dict.__init__(self, **data)
def __getattr__(self, name):
return self[name]
def __setattr__(self, key, value):
raise RuntimeError(
"The variable store is locked and cannot currently be edited."
)
[docs]
class VarStore(BaseVarStore):
"""
A repository for arbitrary variables which will be serialized to JSON for storage into the
database, specifically in the ``details`` field. Variables can be set with the following syntax:
``participant.var.my_var_name = "value_to_set"``.
The variable can then be accessed with ``participant.var.my_var_name``.
See the methods below for an alternative API.
**TIP 1:** the standard setter function is unavailable in lambda functions,
which are otherwise convenient to use when defining e.g.
:class:`~psynet.timeline.CodeBlock` objects.
Use :meth:`psynet.field.VarStore.set` instead, for example:
::
from psynet.timeline import CodeBlock
CodeBlock(lambda participant: participant.var.set("my_var", 3))
**TIP 2:** by convention, the ``VarStore`` object is placed in an object's ``var`` slot.
You can add a ``VarStore`` object to a custom object (e.g. a Dallinger ``Node``) as follows:
::
from dallinger.models import Node
from psynet.field import VarStore
class CustomNode(Node):
__mapper_args__ = {"polymorphic_identity": "custom_node"}
@property
def var(self):
return VarStore(self)
**TIP 3:** avoid storing large objects here on account of the performance cost
of converting to and from JSON.
"""
def __init__(self, owner):
self._owner = owner
def __repr__(self):
# data = {
# key: self.decode_string(value) for key, value in self.get_vars().items()
# }
return f"VarStore: {self.__dict__['_owner'].vars}"
def __getattr__(self, name):
owner = self.__dict__["_owner"]
if name == "_owner":
return owner
else:
data = self.__dict__["_owner"].vars
if data is None:
raise KeyError("The VarStore has not been initialized yet")
else:
return data[name]
def items(self):
return self.__dict__["_owner"].vars.items()
def __setattr__(self, name, value):
if name == "_owner":
self.__dict__["_owner"] = value
else:
if self.__dict__["_owner"].vars is None:
self.__dict__["_owner"].vars = {}
self.__dict__["_owner"].vars[name] = value
# self[name] = value
# self.set_var(name, value)
# class DotDict(dict, BaseVarStore):
# def __setattr__(self, key, value):
# self[key] = value
#
# def __getattr__(self, key):
# return self[key]
#
#
# class _PythonDotDict(_PythonDict):
# def unserialize(cls, value):
# return DotDict(super().unserialize(value))
#
#
# PythonDotDict = MutableDict.as_mutable(_PythonDotDict)
def json_clean(x, details=False, contents=False):
for i in range(5):
try:
del x[f"property{i + 1}"]
except KeyError:
pass
if details:
del x["details"]
if contents:
del x["contents"]
if "metadata_" in x and "metadata" in x:
del x["metadata_"]
def json_unpack_field(x: dict, field: str, replace: bool = False):
if field in x and isinstance(x[field], dict):
for key, value in x[field].items():
if replace or (key not in x):
x[key] = value
return x
def json_add_extra_vars(x, obj):
def valid_key(key):
return not re.search("^_", key)
for key in obj.__extra_vars__.keys():
if valid_key(key):
try:
val = getattr(obj, key)
except KeyError:
val = None
x[key] = val
if hasattr(obj, "var") and isinstance(obj.var, VarStore):
for key, value in obj.var.items():
if valid_key(key):
x[key] = value
return x
def is_basic_type(value):
return value is None or isinstance(value, (int, float, str, bool))
def json_format_vars(x):
for key, value in x.items():
# TODO - revisit this? Will need some concurrent edits in Dallinger,
# e.g. the logic for sending __json__() outputs to the dashboard.
if isinstance(value, datetime):
value = value.strftime("%Y-%m-%d %H:%M:%S")
elif isinstance(value, MutableList):
value = list(value)
elif isinstance(value, MutableDict):
value = dict(value)
# if not is_basic_type(value):
# value = serialize(value)
x[key] = value
# class MutableDotDict(MutableDict, dict, BaseVarStore):
# def __setattr__(self, key, value):
# if self.is_internal(key):
# super().__setattr__(key, value)
# else:
# self[key] = value
#
# def __getattr__(self, item):
# if self.is_internal(item):
# return super().__getattr__(item)
# return self[item]
#
# def is_internal(self, key):
# return key.startswith("_")
class _PythonDict(PythonObject):
def serialize(cls, value):
return super().serialize(dict(value))
PythonDict = MutableDict.as_mutable(_PythonDict)