Source code for dataclay.client.api

"""Common library client API for dataClay.

Runtime.
Note that importing this module has a basic semantic: it prepares the dataClay
core and sets the "client" mode for the library.
"""

from __future__ import annotations

import threading

__all__ = ["init", "finish", "DataClayObject"]

import asyncio
import logging
import logging.config
from typing import TYPE_CHECKING, Optional

from dataclay.config import (
    ClientSettings,
    get_runtime,
    logger_config,
    session_var,
    set_runtime,
    settings,
)
from dataclay.event_loop import EventLoopThread, get_dc_event_loop, set_dc_event_loop
from dataclay.proxy import generate_jwt
from dataclay.runtime import ClientRuntime
from dataclay.utils.telemetry import trace

if TYPE_CHECKING:
    from uuid import UUID

    from dataclay.backend.client import BackendClient
    from dataclay.dataclay_object import DataClayObject

# This will be populated during initialization
# LOCAL = _UNDEFINED_LOCAL

tracer = trace.get_tracer(__name__)
logger = logging.getLogger(__name__)

_telemetry_started = False


def start_telemetry():
    global _telemetry_started
    if _telemetry_started:
        return

    if settings.service_name is None:
        settings.service_name = "client"

    if settings.tracing:
        # pylint: disable=import-outside-toplevel
        import dataclay.utils.telemetry

        dataclay.utils.telemetry.set_tracing(
            settings.service_name,
            settings.tracing_host,
            settings.tracing_port,
            settings.tracing_exporter,
        )

    if settings.metrics:
        # pylint: disable=import-outside-toplevel
        import dataclay.utils.metrics

        dataclay.utils.metrics.set_metrics(
            settings.metrics_host,
            settings.metrics_port,
            settings.metrics_exporter,
        )

    _telemetry_started = True


def init():
    client_api = Client()
    client_api.start()
    return client_api


def finish():
    pass


[docs] class Client: """Client API for dataClay. Usually you create client instance and call start() method to initialize it. .. code-block:: python from dataclay import Client client = Client(host="127.0.0.1") client.start() All the Client configuration variables are detailed in :class:`ClientSettings`. :param host: Metadata Service host. It is mutually exclusive with `proxy_host`. :param port: Metadata Service port. Optional. Use if you want to override the default port. :param username: Username. Authentication and authorization happens only at the Proxy. :param password: `DEPRECATED` :param dataset: Dataset name. All objects created by this client will be associated with this dataset. :param local_backend: Name of the local backend. If set, the client will use this backend for all operations. :param proxy_host: Proxy host. It is mutually exclusive with `host`. If this is set, this client will connect to the proxy instead for communicating with the metadata service or any backend. :param proxy_port: Proxy port. Optional. Use if you want to override the default port. """ settings: ClientSettings runtime: ClientRuntime previous_settings: Optional[ClientSettings] previous_runtime: Optional[ClientRuntime] is_active: bool = False _token: bytes _TOKEN_EXPIRATION = 24 * 30 def __init__( self, host: Optional[str] = None, port: Optional[int] = None, username: Optional[str] = None, password: Optional[str] = None, dataset: Optional[str] = None, local_backend: Optional[str] = None, proxy_host: Optional[str] = None, proxy_port: Optional[int] = None, ): # Set settings settings_kwargs = {} if host: settings_kwargs["dataclay_host"] = host if port: settings_kwargs["dataclay_port"] = port if username: settings_kwargs["username"] = username if password: settings_kwargs["password"] = password if dataset: settings_kwargs["dataset"] = dataset if local_backend: settings_kwargs["local_backend"] = local_backend if proxy_host: settings_kwargs["proxy_host"] = proxy_host settings_kwargs["proxy_enabled"] = True if proxy_port: settings_kwargs["proxy_port"] = proxy_port settings_kwargs["proxy_enabled"] = True self._token = b"" self.settings = ClientSettings(**settings_kwargs) start_telemetry() # Set local backend # if settings.LOCAL_BACKEND: # for ee_id, ee_info in self.runtime.ee_infos.items(): # if ee_info.sl_name == settings.LOCAL_BACKEND: # global LOCAL # LOCAL = ee_id # break # else: # logger.warning( # "Backend with name '%s' not found, ignoring", settings.LOCAL_BACKEND # )
[docs] @tracer.start_as_current_span("start") def start(self): """Start the client runtime""" logger_config(level=settings.loglevel) if self.is_active: logger.warning("Client already active. Ignoring") return logger.info("Starting client runtime") loop = get_dc_event_loop() if loop is None: logger.info("Creating event loop in new thread") loop = asyncio.new_event_loop() set_dc_event_loop(loop) event_loop_thread = EventLoopThread(loop) event_loop_thread.start() event_loop_thread.ready.wait() else: logger.info("Using existing event loop") # Replace settings self.previous_settings = settings.client settings.client = self.settings # Create and replace runtime self.previous_runtime = get_runtime() if settings.client.proxy_enabled: logger.debug( "Using proxy connection to %s:%s", settings.client.proxy_host, settings.client.proxy_port, ) self.runtime = ClientRuntime(settings.client.proxy_host, settings.client.proxy_port) # Generate the JWT(JSON web token) self._token = generate_jwt( settings.client.password, settings.client.username, self._TOKEN_EXPIRATION ) else: self.runtime = ClientRuntime( settings.client.dataclay_host, settings.client.dataclay_port ) logger.info("Starting client runtime coroutine in event loop") assert loop._thread_id != threading.get_ident() # Redundancy check future = asyncio.run_coroutine_threadsafe(self.runtime.start(), loop) future.result() set_runtime(self.runtime) session_var.set( { "dataset_name": settings.client.dataset, "username": settings.client.username, "token": self._token, } ) # Cache the dataclay_id, to avoid later request # self.runtime.dataclay_id self.is_active = True logger.info("Client runtime started")
[docs] @tracer.start_as_current_span("stop") def stop(self): """Stop the client runtime""" if not self.is_active: logger.warning("Client is not active. Ignoring") return logger.info("Stopping client runtime") asyncio.run_coroutine_threadsafe(self.runtime.stop(), get_dc_event_loop()).result() settings.client = self.previous_settings set_runtime(self.previous_runtime) self.is_active = False logger.info("Client runtime stopped")
def __del__(self): # BUG: When calling stop() from __del__, it hangs the program at `run_coroutine_threadsafe` # self.stop() if self.is_active: logger.warning("Client instance deleted without calling stop() for a clean shutdown") pass def __enter__(self): self.start() return self def __exit__(self, *args): self.stop() async def __aenter__(self): self.start() return self async def __aexit__(self, *excinfo): self.stop()
[docs] @tracer.start_as_current_span("get_backends") def get_backends(self) -> dict[UUID, BackendClient]: """Get all backends available in the system. This method connects to the metadata service and retrieves an up-to-date list of backends. """ if not self.is_active: raise RuntimeError("Client is not active") asyncio.run_coroutine_threadsafe( self.runtime.backend_clients.update(), get_dc_event_loop() ).result() return self.runtime.backend_clients
[docs] @tracer.start_as_current_span("a_get_backends") async def a_get_backends(self) -> dict[UUID, BackendClient]: """Asynchronous version of :meth:`get_backends`""" if not self.is_active: raise RuntimeError("Client is not active") future = asyncio.run_coroutine_threadsafe( self.runtime.backend_clients.update(), get_dc_event_loop() ) await asyncio.wrap_future(future) return self.runtime.backend_clients