Source code for dataclay.alien

import asyncio
import functools
import logging
from typing import ClassVar, Generic, Type, TypeVar

from .config import get_runtime
from .dataclay_object import DataClayObject
from .event_loop import get_dc_event_loop

ignore_fields = frozenset(
    [
        "__class__",
        "__getattr__",
        "__getattribute__",
        "__setattr__",
        "__delattr__",
        "__dir__",
        "__repr__",
        "__slots__",
        "__weakref__",
        "__init__",
        "__new__",
        "__getstate__",
        "__setstate__",
        "__reduce__",
        "__reduce_ex__",
        "__dict__",
        "__module__",
    ]
)

local_fields = frozenset(
    ["_dc_meta", "_dc_is_local", "_dc_is_loaded", "_dc_is_registered", "_dc_is_replica", "__dict__"]
)


logger = logging.getLogger(__name__)


def virtualactivemethod(func):
    """Internal decorator for AlienDataClayObject methods.

    This is based on activemethod but with slight changes to accomodate its
    usage.
    """

    @functools.wraps(func)
    def wrapper(self: AlienDataClayObject, *args, **kwargs):
        try:
            # Example to make __init__ active:
            # if func.__name__ == "__init__" and not self._dc_is_registered:
            #     self.make_persistent()

            dc_meta = object.__getattribute__(self, "_dc_meta")

            if object.__getattribute__(self, "_dc_is_local"):
                logger.debug(
                    "(%s) Calling virtualactivemethod '%s' locally", dc_meta.id, func.__name__
                )

                return func(self._dc_base_object, *args, **kwargs)
            else:
                logger.debug(
                    "(%s) Calling virtualactivemethod '%s' remotely", dc_meta.id, func.__name__
                )

                # TODO what if it is a coroutinefunction?
                return asyncio.run_coroutine_threadsafe(
                    get_runtime().call_remote_method(self, func.__name__, args, kwargs),
                    get_dc_event_loop(),
                ).result()
        except Exception:
            logger.debug("Error calling virtualactivemethod '%s'", func.__name__, exc_info=True)
            raise

    # wrapper.is_activemethod = True
    return wrapper


T = TypeVar("T")


[docs] class AlienDataClayObject(DataClayObject, Generic[T]): """Base class for having Alien Persistent Objects. This class is used to create a proxy-like Python object and leverage dataClay features on non-DataClayObject instances. Using this class has certain limitations, but it can be used under certain scenarios in which having a DataClayObject class is challenging or inappropriate. For example, this class can accommodate the use of NumPy arrays or Pandas DataFrames. """ _dc_proxy_classes_cache: ClassVar[dict] = dict() _dc_base_object: T @classmethod def _create_class_proxy(cls, extraneous_class: Type[T]) -> Type[T]: """Create a new class (type instance) for an extraneous class. This class method will be called once per class to create the specialized proxy class mimicking the extraneous one. This method introspects the extraneous class and creates the appropriate active methods. """ try: return cls._dc_proxy_classes_cache[extraneous_class] except KeyError: cls_namespace = {} for name in dir(extraneous_class): subject = getattr(extraneous_class, name) if callable(subject) and name not in ignore_fields: cls_namespace[name] = virtualactivemethod(subject) new_cls = type( "%s[%s.%s]" % (cls.__name__, extraneous_class.__module__, extraneous_class.__name__), (cls,), cls_namespace, ) cls._dc_proxy_classes_cache[extraneous_class] = new_cls return new_cls def __new__(cls, obj: T, *args, **kwargs): """Creates a AlienDataClayObject instance referencing `obj`.""" proxy_class = cls._create_class_proxy(obj.__class__) instance = DataClayObject.__new__(proxy_class) object.__getattribute__(instance, "_dc_meta").class_name = ( f"AlienDataClayObject[{type(obj).__module__}.{type(obj).__name__}]" ) object.__setattr__(instance, "_dc_base_object", obj) return instance def __getattr__(self, name): if name in local_fields: return object.__getattribute__(self, name) elif object.__getattribute__(self, "_dc_is_local"): logger.debug("local get") if not object.__getattribute__(self, "_dc_is_loaded"): asyncio.run_coroutine_threadsafe( get_runtime().data_manager.load_object(self), get_dc_event_loop() ).result() return getattr(self._dc_base_object, name) else: logger.debug("local get") return asyncio.run_coroutine_threadsafe( get_runtime().call_remote_method(self, "__getattribute__", (name,), {}), get_dc_event_loop(), ).result() def __setattr__(self, name, value): if name in local_fields: return object.__setattr__(self, name, value) elif object.__getattribute__(self, "_dc_is_local"): logger.debug("local set") if not object.__getattribute__(self, "_dc_is_loaded"): asyncio.run_coroutine_threadsafe( get_runtime().data_manager.load_object(self), get_dc_event_loop() ).result() setattr(self._dc_base_object, name, value) else: logger.debug("remote set") return asyncio.run_coroutine_threadsafe( get_runtime().call_remote_method(self, "__setattr__", (name, value), {}), get_dc_event_loop(), ).result() def __delattr__(self, name: str): if name in local_fields: raise SystemError("Should not delete local field %s" % name) elif object.__getattribute__(self, "_dc_is_local"): logger.debug("local del") if not object.__getattribute__(self, "_dc_is_loaded"): asyncio.run_coroutine_threadsafe( get_runtime().data_manager.load_object(self), get_dc_event_loop() ).result() delattr(self._dc_base_object, name) else: logger.debug("remote del") return asyncio.run_coroutine_threadsafe( get_runtime().call_remote_method(self, "__delattr__", (name,), {}), get_dc_event_loop(), ).result() def __getstate__(self): return self._dc_base_object def __setstate__(self, state): object.__setattr__(self, "_dc_base_object", state)