import asyncio
import logging
import sys
from typing import NamedTuple
# Note that session_var is only needed in _get_by_alias, and maybe should be moved to DataClayRuntime (maybe, TODO, check)
from .config import get_runtime, session_var
from .dataclay_object import DataClayObject
from .event_loop import get_dc_event_loop
local_fields = frozenset(
[
"_dc_stub_classname",
"_dc_meta",
"_dc_is_local",
"_dc_is_loaded",
"_dc_is_registered",
"_dc_is_replica",
"__dict__",
]
)
logger = logging.getLogger(__name__)
class StubInfo(NamedTuple):
classname: str
properties: list[str]
activemethods: list[str]
def stubmethodwrapper(basename, method_name):
def remote_calling(self, *args, **kwargs):
logger.debug("Calling activemethod '%s' from stub %r", method_name, self)
return asyncio.run_coroutine_threadsafe(
get_runtime().call_remote_method(self, method_name, args, kwargs), get_dc_event_loop()
).result()
remote_calling.__name__ = method_name
remote_calling.__qualname__ = f"{basename}.{method_name}"
return remote_calling
class _StubMetaClass(type):
cached_classes = {}
def __getitem__(cls, key):
classname = key
# so only classname is used as the key for the StubDataClayObject["<classname>"]
if classname not in cls.cached_classes:
# Retrieve `properties` and `activemethods` from dataClay
properties, activemethods = asyncio.run_coroutine_threadsafe(
get_runtime().get_class_info(classname),
get_dc_event_loop(),
).result()
# Remove "__init__" from activemethods if it is there
if "__init__" in activemethods:
activemethods.remove("__init__")
# Prepare the StubInfo tuple
stub_info = StubInfo(
classname=classname,
properties=properties,
activemethods=activemethods,
)
# Extract original module from classname
_, basename = classname.rsplit(".", 1)
# The class dictionary containing all the methods
clsdict = {
method_name: stubmethodwrapper(basename, method_name)
for method_name in activemethods
}
# and also the stub info in its proper place
clsdict["_dc_stub_info"] = stub_info
# That should be enough for creating the class
cls.cached_classes[classname] = type(
f"StubDataClayObject[{classname}]",
(StubDataClayObject,),
clsdict,
)
return cls.cached_classes[classname]
[docs]
class StubDataClayObject(DataClayObject, metaclass=_StubMetaClass):
_dc_stub_info: StubInfo
@classmethod
def _check_stub_info(cls):
if not hasattr(cls, "_dc_stub_info"):
raise TypeError('Need to specify class first: StubDataClayObject["<classname>"]')
def __init__(self, *args, **kwargs):
self._check_stub_info()
# We override the class_name to make it work as if it was the original class
self._dc_meta.class_name = self._dc_stub_info.classname
# We need to first create a remote object and then initialize it
self.make_persistent()
return asyncio.run_coroutine_threadsafe(
get_runtime().call_remote_method(self, "__init__", args, kwargs),
get_dc_event_loop(),
).result()
@classmethod
async def _get_by_alias(cls, alias: str, dataset_name: str = None):
# This is adapted from DataClayRuntime.get_object_by_alias
# TODO: Maybe this should be done there instead? Not sure.
if dataset_name is None:
dataset_name = session_var.get()["dataset_name"]
object_md = await get_runtime().metadata_service.get_object_md_by_alias(alias, dataset_name)
new_stub = cls.__new__(cls)
new_stub._dc_meta = object_md
new_stub._dc_is_local = False
new_stub._dc_is_replica = False
new_stub._dc_is_loaded = False
new_stub._dc_is_registered = True
return new_stub
[docs]
@classmethod
def get_by_alias(cls, alias: str, dataset_name: str = None):
"""Create a new instance of the stub class by alias.
This class overrides the behavior of :meth:`~dataclay.DataClayObject.get_by_alias`
and creates stub instance from the alias.
Args:
alias: The alias of the object to retrieve.
dataset_name: Optional. The name of the dataset where the alias is stored.
If not provided, the active dataset is used.
Returns:
A stub instance of the object associated with the given alias.
"""
cls._check_stub_info()
return asyncio.run_coroutine_threadsafe(
cls._get_by_alias(alias, dataset_name), get_dc_event_loop()
).result()
def __getattr__(self, name):
if name in local_fields:
return object.__getattribute__(self, name)
elif name in self._dc_stub_info.properties:
logger.debug("remote get")
return asyncio.run_coroutine_threadsafe(
get_runtime().call_remote_method(self, "__getattribute__", (name,), {}),
get_dc_event_loop(),
).result()
else:
raise AttributeError(
f"Property {name} is not defined in {self._dc_stub_info.classname}"
)
def __setattr__(self, name, value):
if name in local_fields:
return object.__setattr__(self, name, value)
elif name in self._dc_stub_info.properties:
logger.debug("remote set")
return asyncio.run_coroutine_threadsafe(
get_runtime().call_remote_method(self, "__setattr__", (name, value), {}),
get_dc_event_loop(),
).result()
else:
raise AttributeError(
f"Property {name} is not defined in {self._dc_stub_info.classname}"
)
def __delattr__(self, name):
if name in local_fields:
raise SystemError("Should not delete local field %s" % name)
elif name in self._dc_stub_info.properties:
logger.debug("remote del")
return asyncio.run_coroutine_threadsafe(
get_runtime().call_remote_method(self, "__delattr__", (name,), {}),
get_dc_event_loop(),
).result()
else:
raise AttributeError(
f"Property {name} is not defined in {self._dc_stub_info.classname}"
)