Zenoh
This example shows how a publisher and a subscriber can communicate with each other through Zenoh. The publisher simulates a thermometer: it generates a random temperature value and sends it to the topic “tmp”. We then run the subscriber, which first asks Zenoh for the last value stored for the topic “tmp” and then subscribes to that same topic (the one held in its key attribute). Finally, we run the publisher again so that it generates a new temperature value, and we see how the subscriber handles it.
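The flow described above can be pictured with a toy in-memory model. This is only an illustrative sketch: ToyBroker and its methods are hypothetical names that belong to neither Zenoh nor dataClay; they merely mimic a router that keeps the last value of each topic.

```python
from collections import defaultdict


class ToyBroker:
    """Hypothetical stand-in for a router with last-value storage (not the Zenoh API)."""

    def __init__(self):
        self.last = {}                      # topic -> last published value
        self.callbacks = defaultdict(list)  # topic -> subscriber callbacks

    def put(self, topic, value):
        # Store the value, then notify every subscriber of that topic.
        self.last[topic] = value
        for callback in self.callbacks[topic]:
            callback(topic, value)

    def get(self, topic):
        # Return the last stored value, or None if nothing was published yet.
        return self.last.get(topic)

    def subscribe(self, topic, callback):
        self.callbacks[topic].append(callback)


broker = ToyBroker()
received = []

broker.put("tmp", "21")    # 1. the publisher sends a first reading
first = broker.get("tmp")  # 2. the subscriber asks for the last value
broker.subscribe("tmp", lambda t, v: received.append((t, v)))  # 3. it subscribes
broker.put("tmp", "23")    # 4. the publisher sends a new reading
```

After these four steps, first holds the reading that was stored before the subscription existed, and received holds only the reading published afterwards.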
Note
This example is available on GitHub.
Class definition: The handler() method overrides the one inherited from the ZenohMixin class in dataclay.contrib.zenoh_module.
import logging

from dataclay import DataClayObject, activemethod
from dataclay.contrib.zenoh_module import ZenohMixin

logger = logging.getLogger(__name__)


class ZenohSubs(DataClayObject, ZenohMixin):
    data: str  # last message built by handler()
    buf: str   # value set by the publisher before sending
    key: str   # topic used by this object

    @activemethod
    def __init__(self, conf):
        super().__init__(conf)
        self.data = ""
        self.buf = ""
        self.key = ""

    @activemethod
    def handler(self, sample):
        # Overrides ZenohMixin.handler: keep a readable summary of the sample.
        self.data = (
            f"Received {sample.kind} ('{sample.key_expr}': '{sample.payload.decode('utf-8')}')"
        )
        logger.debug(self.data)

    @activemethod
    def set_buf(self, buf):
        self.buf = buf

    @activemethod
    def set_key(self, key):
        self.key = key

    @activemethod
    def get_data(self):
        return self.data
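To see what handler() stores in data, the formatting can be tried without a Zenoh session. The sketch below reuses the same f-string; format_sample and fake_sample are hypothetical names, and the fake object only carries the three attributes the handler reads. A real Zenoh sample may render its kind differently.

```python
from types import SimpleNamespace


def format_sample(sample):
    # Same f-string as in ZenohSubs.handler, extracted so it runs without Zenoh.
    return f"Received {sample.kind} ('{sample.key_expr}': '{sample.payload.decode('utf-8')}')"


# Hypothetical stand-in for a Zenoh sample: kind, key expression and raw payload.
fake_sample = SimpleNamespace(kind="PUT", key_expr="tmp", payload=b"21")

message = format_sample(fake_sample)
print(message)  # prints: Received PUT ('tmp': '21')
```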
Zenoh configuration: In this example we use a configuration that makes Zenoh store the last value published on each topic. The docker-compose file used later expects it at zenoh_docker/zenoh-conf.json5.
{
  plugins: {
    rest: {                        // activate and configure the REST plugin
      http_port: 8000              // with HTTP server listening on port 8000
    },
    storage_manager: {             // activate and configure the storage_manager plugin
      storages: {
        data: {                    // configure a "data" storage
          key_expr: "**",          // which subscribes and replies to query on **
          volume: {                // and using the "memory" volume (always present by default)
            id: "memory"
          }
        }
      }
    }
  }
}
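Because the configuration above enables the REST plugin on port 8000, the stored value of a topic can also be inspected over HTTP. The following is a minimal sketch, assuming the plugin answers a GET on a key with a JSON body, as the Zenoh documentation describes; the exact response shape may vary between Zenoh versions. rest_url and fetch_stored are hypothetical helper names.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen


def rest_url(key, host="localhost", port=8000):
    # Build the URL for a key; "/" separators in the key stay path separators.
    return f"http://{host}:{port}/{quote(key)}"


def fetch_stored(key, host="localhost", port=8000, timeout=5.0):
    # Issue an HTTP GET and decode the JSON body returned by the router.
    with urlopen(rest_url(key, host, port), timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the router running, fetch_stored("tmp") would return whatever the storage currently holds for the topic “tmp”.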
Then we can deploy dataClay with docker-compose. Save the following as docker-compose.yml and start the services with docker-compose up.
version: '3.9'
services:

  redis:
    image: redis:latest
    ports:
      - 6379:6379

  metadata-service:
    image: "ghcr.io/bsc-dom/dataclay:dev"
    depends_on:
      - redis
    ports:
      - 16587:16587
    environment:
      - DATACLAY_KV_HOST=redis
      - DATACLAY_KV_PORT=6379
      - DATACLAY_ID
      - DATACLAY_PASSWORD=s3cret
      - DATACLAY_USERNAME=testuser
      - DATACLAY_DATASET=testdata
      - DATACLAY_METADATA_PORT=16587
      - DATACLAY_LOGLEVEL=DEBUG
    command: python -m dataclay.metadata

  backend:
    image: "ghcr.io/bsc-dom/dataclay:dev"
    depends_on:
      - redis
    ports:
      - 6867:6867
    environment:
      - DATACLAY_KV_HOST=redis
      - DATACLAY_KV_PORT=6379
      - DATACLAY_BACKEND_ID
      - DATACLAY_BACKEND_NAME
      - DATACLAY_BACKEND_PORT=6867
      - DATACLAY_LOGLEVEL=DEBUG
    command: bash -c "pip install eclipse-zenoh && python -m dataclay.backend"
    volumes:
      - ./model:/workdir/model:ro

  backend_2:
    image: "ghcr.io/bsc-dom/dataclay:dev"
    depends_on:
      - redis
    ports:
      - 6868:6868
    environment:
      - DATACLAY_KV_HOST=redis
      - DATACLAY_KV_PORT=6379
      - DATACLAY_BACKEND_ID
      - DATACLAY_BACKEND_NAME
      - DATACLAY_BACKEND_PORT=6868
      - DATACLAY_LOGLEVEL=DEBUG
    command: bash -c "pip install eclipse-zenoh && python -m dataclay.backend"
    volumes:
      - ./model:/workdir/model:ro

  # Eclipse Zenoh router
  zenoh_net:
    image: eclipse/zenoh
    restart: unless-stopped
    ports:
      - 7447:7447
      - 8000:8000
    volumes:
      - ./zenoh_docker:/root/.zenoh
    environment:
      - RUST_LOG=debug
    command: -c /root/.zenoh/zenoh-conf.json5
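Before running the client scripts it can help to check that the containers are accepting connections. The sketch below only tries a TCP connection to each port published in the docker-compose file above; port_open, check_services and SERVICES are hypothetical names, and an open port does not guarantee that the service behind it has finished starting.

```python
import socket


def port_open(host, port, timeout=1.0):
    # Try a TCP connection; True if something is listening on host:port.
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Host ports published by the docker-compose file.
SERVICES = {
    "redis": 6379,
    "metadata-service": 16587,
    "backend": 6867,
    "backend_2": 6868,
    "zenoh (TCP)": 7447,
    "zenoh (REST)": 8000,
}


def check_services(host="127.0.0.1"):
    # Map each service name to whether its port currently accepts connections.
    return {name: port_open(host, port) for name, port in SERVICES.items()}
```

Calling check_services() after docker-compose up returns a dictionary such as {"redis": True, ...}, which makes it easy to spot a service that did not start.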
At this point we can run the publisher (sensor) code:
import random

from model.zenohsubs import ZenohSubs
from dataclay import Client

client = Client(host="127.0.0.1", username="testuser", password="s3cret", dataset="testdata")
client.start()

# "sensor" is an arbitrary alias chosen for this example.
try:
    zenohSub = ZenohSubs.get_by_alias("sensor")
except Exception:
    zenohSub = ZenohSubs()
    zenohSub.make_persistent(alias="sensor")

# Publish a random temperature on the topic the subscriber listens to.
zenohSub.set_key("tmp")
zenohSub.set_buf(str(random.randint(0, 30)))
zenohSub.send_to_zenoh()
Run the subscriber code:
from model.zenohsubs import ZenohSubs
from dataclay import Client

client = Client(host="127.0.0.1", username="testuser", password="s3cret", dataset="testdata")
client.start()

try:
    zenohSub = ZenohSubs.get_by_alias("subs")
except Exception:
    zenohSub = ZenohSubs()
    zenohSub.make_persistent(alias="subs")

zenohSub.set_key("tmp")
zenohSub.get_last_data(zenohSub.key)
zenohSub.receive_data(zenohSub.key)
Finally, run the sensor code again.
If we leave our subscriber.py application running and call zenohSub.get_data(), we get the temperature that the thermometer sent.
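Since the new value arrives asynchronously, the subscriber may need to poll until the handler has stored something. A minimal sketch of such a loop follows; wait_for_data is a hypothetical helper, not part of dataClay, and it accepts any callable that returns a string.

```python
import time


def wait_for_data(get_data, timeout=10.0, interval=0.5):
    """Call get_data() until it returns a non-empty string or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        data = get_data()
        if data:
            return data
        if time.monotonic() >= deadline:
            return None  # nothing arrived in time
        time.sleep(interval)
```

In the subscriber script this could be used as wait_for_data(zenohSub.get_data), which returns the stored message, or None if no sample arrived before the timeout.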