Source code for dataclay.contrib.zenoh_module
"""
dataClay includes support for publisher/subscriber communication via Zenoh.
In order to use this functionality, the client class has to inherit the :class:`ZenohMixin` class. Communications can
be established using pub/sub and queries.
The subscriber can specify how the messages will be handled and to which topics it will be subscribed. Also, it
can ask for the last value stored in an existing subscription using the queries.
The publisher can send messages to a specific topic.
"""
try:
import zenoh
except ImportError:
import warnings
warnings.warn("Warning: <import zenoh> failed", category=ImportWarning)
import logging
import time
from threading import Thread
from dataclay import activemethod
logger = logging.getLogger(__name__)
[docs]
class ZenohMixin:
"""Zenoh mechanisms"""
subscriptions: str
conf: str
@activemethod
def __init__(self, conf: str = "{}"):
"""Class constructor.
Args:
conf (str, optional): Change the configuration if needed. Defaults to '{}'.
"""
self.subscriptions = []
self.conf = conf
[docs]
@activemethod
def handler(self, sample):
"""Placeholder for function handler. This function describes how the client will handle a message.
Args:
sample (zenoh.Sample): Information sent by Zenoh to the subscriber.
Raises:
NotImplementedError: If the handler function has not been implemented an error is raised.
"""
raise NotImplementedError("Must override handler")
[docs]
@activemethod
def produce_zenoh_msg(self, buf: str = "", key: str = "dataclay", **more):
"""Sends the message "buf" to the topic "key".
Args:
buf (str, optional): Message. Defaults to "".
key (str, optional): Topic. Defaults to "dataclay".
"""
session = zenoh.open(zenoh.Config.from_json5(self.conf))
pub = session.declare_publisher(key)
pub.put(buf)
[docs]
@activemethod
def send_to_zenoh(self):
"""Previous function to produce_zenoh_msg. Gets all the arguments needed from the calling class."""
import inspect
attributes = inspect.getmembers(self.__class__, lambda a: not (inspect.isroutine(a)))
field_values = {}
for field in attributes:
fieldname = field[0]
if not (fieldname.startswith("_")):
field_values[fieldname] = getattr(self, fieldname)
self.produce_zenoh_msg(**field_values)
@activemethod
def __receive_data(self, key):
"""While the client is subscribed to a topic, it will check and handle the receipt of a message.
Args:
key (str): Topic.
"""
session = zenoh.open(zenoh.Config.from_json5(self.conf))
sub = session.declare_subscriber(key, self.handler)
while key in self.subscriptions:
time.sleep(0.1)
return
[docs]
@activemethod
def receive_data(self, key: str = "dataclay"):
"""Create a thread which will check if a message from the topic arrives and will handle it.
Args:
key (str): Topic.
"""
if key not in self.subscriptions:
t = Thread(target=self.__receive_data, args=(key,))
self.subscriptions.append(key)
t.start()
[docs]
@activemethod
def unsubscribe(self, key: str = "dataclay"):
"""Unsubscribes the client from a topic.
Args:
key (str): Topic.
"""
try:
self.subscriptions.remove(key)
except:
logger.error("The client was not subscribed to this topic")
[docs]
@activemethod
def get_last_data(self, key: str = "dataclay"):
"""Returns the latest value stored in this Zenoh topic.
Args:
key (str): Topic.
Returns:
list[str{reply.key_expr};{reply.playload}]: Latest value stored on a specific Zenoh
topic.
"""
returns = []
session = zenoh.open(zenoh.Config.from_json5(self.conf))
replies = session.get(key, zenoh.ListCollector())
for reply in replies():
try:
returns.append(
"Received ('{}': '{}')".format(
reply.ok.key_expr, reply.ok.payload.decode("utf-8")
)
)
except:
returns.append("Received (ERROR: '{}')".format(reply.err.payload.decode("utf-8")))
return returns