Zenoh

This example shows how a publisher and a subscriber can communicate with each other using Zenoh. The publisher simulates a thermometer: it generates a random temperature value and sends it to the topic “tmp”. We then run the subscriber, which first asks Zenoh for the last value stored for the topic “tmp” and then subscribes to that same topic. Finally, we run the publisher again so that it generates a new temperature value, and we see how the subscriber handles it.
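
The flow described above can be sketched without any Zenoh or dataClay dependency. The ToyBroker class below is a hypothetical in-memory stand-in, not the Zenoh API: it only mimics a router that keeps the last value of each topic and notifies subscribers.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Optional


class ToyBroker:
    """Hypothetical in-memory stand-in for a router with last-value storage."""

    def __init__(self) -> None:
        self._last: Dict[str, str] = {}
        self._subscribers: Dict[str, List[Callable[[str, str], None]]] = defaultdict(list)

    def put(self, topic: str, value: str) -> None:
        # Store the value, then notify every subscriber of the topic.
        self._last[topic] = value
        for callback in self._subscribers[topic]:
            callback(topic, value)

    def get_last(self, topic: str) -> Optional[str]:
        # Return the last stored value, or None if nothing was published yet.
        return self._last.get(topic)

    def subscribe(self, topic: str, callback: Callable[[str, str], None]) -> None:
        self._subscribers[topic].append(callback)


broker = ToyBroker()
received: List[str] = []

broker.put("tmp", "21")  # 1. the publisher sends a first reading
last = broker.get_last("tmp")  # 2. the subscriber asks for the last value
broker.subscribe("tmp", lambda topic, value: received.append(value))  # 3. it subscribes
broker.put("tmp", "25")  # 4. the publisher sends a new reading

print(last, received)
```

The first reading is only visible through the last-value query, because it was published before the subscription existed; the second one reaches the subscriber callback directly.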

Note

This example is available on GitHub.

Class definition: the handler() function overrides the one inherited from the ZenohMixin class in dataclay.contrib.zenoh_module.

zenohsubs.py
import logging

from dataclay import DataClayObject, activemethod
from dataclay.contrib.zenoh_module import ZenohMixin

logger = logging.getLogger(__name__)


class ZenohSubs(DataClayObject, ZenohMixin):

    data: str
    buf: str
    key: str

    @activemethod
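    def __init__(self, conf):
        super().__init__(conf)
        self.data = ""  # last message formatted by handler()
        self.buf = ""  # payload to publish
        self.key = ""  # topic to publish or subscribe to

    @activemethod
    def handler(self, sample):
        # Store a readable summary of each received sample.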
        self.data = (
            f"Received {sample.kind} ('{sample.key_expr}': '{sample.payload.decode('utf-8')}')"
        )
        logger.debug(self.data)

    @activemethod
    def set_buf(self, buf):
        self.buf = buf

    @activemethod
    def set_key(self, key):
        self.key = key

    @activemethod
    def get_data(self):
        return self.data
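
The way a class-level handler() replaces the one provided by a mixin can be illustrated with plain Python. FakeMixin and FakeSample below are hypothetical stand-ins for ZenohMixin and for the sample object; they only assume that the mixin calls self.handler and that a sample exposes kind, key_expr and payload, as the class above uses them.

```python
from dataclasses import dataclass


@dataclass
class FakeSample:
    """Hypothetical stand-in for the sample object passed to handler()."""

    kind: str
    key_expr: str
    payload: bytes


class FakeMixin:
    """Hypothetical stand-in for a mixin that ships a default handler."""

    def handler(self, sample):
        return "default handler"

    def dispatch(self, sample):
        # The mixin calls self.handler, so a subclass override is picked up.
        return self.handler(sample)


class Subscriber(FakeMixin):
    def handler(self, sample):
        # Same formatting as ZenohSubs.handler in the listing above.
        return f"Received {sample.kind} ('{sample.key_expr}': '{sample.payload.decode('utf-8')}')"


message = Subscriber().dispatch(FakeSample("PUT", "tmp", b"23"))
default = FakeMixin().dispatch(FakeSample("PUT", "tmp", b"23"))
print(message)
```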

Zenoh configuration: in this example we use a configuration that lets Zenoh store the last value of each topic.

zenoh-conf.json5
{
  plugins: {
    rest: {                        // activate and configure the REST plugin
      http_port: 8000              // with HTTP server listening on port 8000
    },
    storage_manager: {             // activate and configure the storage_manager plugin
      storages: {
        data: {                    // configure a "data" storage
          key_expr: "**",          // which subscribes and replies to queries on **
          volume: {                // and using the "memory" volume (always present by default)
            id: "memory"
          }
        }
      }
    }
  }
}
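
Since this configuration enables the REST plugin on port 8000, the stored value of a topic can also be inspected over HTTP. The sketch below assumes that the router is reachable on localhost and that the reply is a JSON array of objects with "key" and "value" fields; the helper names are hypothetical and the sample body is hand-written, not captured from a real router.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen


def rest_url(key: str, host: str = "localhost", port: int = 8000) -> str:
    # Keep "/" and "*" unescaped so that key expressions such as "**" survive.
    return f"http://{host}:{port}/{quote(key, safe='/*')}"


def parse_reply(body: str) -> dict:
    """Map each key in a reply to its value (assumed reply shape)."""
    return {entry["key"]: entry["value"] for entry in json.loads(body)}


def fetch_last(key: str) -> dict:
    # Network call: only works while the Zenoh router is running.
    with urlopen(rest_url(key), timeout=5) as response:
        return parse_reply(response.read().decode("utf-8"))


# Hand-written reply body following the shape assumed above.
sample_body = '[{"key": "tmp", "value": "23", "encoding": "text/plain"}]'
parsed = parse_reply(sample_body)
print(rest_url("tmp"), parsed)
```

With the services running, fetch_last("tmp") would perform the actual request; it is not called here so that the sketch stays runnable offline.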

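Then we can deploy dataClay using docker-compose. Create a file named docker-compose.yml with the following content and start the services with docker-compose up. Note that the compose file mounts ./model into the backends and ./zenoh_docker into the Zenoh container, so zenohsubs.py must be placed in ./model and zenoh-conf.json5 in ./zenoh_docker.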

docker-compose.yml
version: '3.9'
services:

  redis:
    image: redis:latest
    ports:
      - 6379:6379

  metadata-service:
    image: "ghcr.io/bsc-dom/dataclay:dev"
    depends_on:
      - redis
    ports:
      - 16587:16587
    environment:
      - DATACLAY_KV_HOST=redis
      - DATACLAY_KV_PORT=6379
      - DATACLAY_ID
      - DATACLAY_PASSWORD=s3cret
      - DATACLAY_USERNAME=testuser
      - DATACLAY_DATASET=testdata
      - DATACLAY_METADATA_PORT=16587
      - DATACLAY_LOGLEVEL=DEBUG
    command: python -m dataclay.metadata

  backend:
    image: "ghcr.io/bsc-dom/dataclay:dev"
    depends_on:
      - redis
    ports:
      - 6867:6867
    environment:
      - DATACLAY_KV_HOST=redis
      - DATACLAY_KV_PORT=6379
      - DATACLAY_BACKEND_ID
      - DATACLAY_BACKEND_NAME
      - DATACLAY_BACKEND_PORT=6867
      - DATACLAY_LOGLEVEL=DEBUG
    command: bash -c "pip install eclipse-zenoh && python -m dataclay.backend"
    volumes:
      - ./model:/workdir/model:ro

  backend_2:
    image: "ghcr.io/bsc-dom/dataclay:dev"
    depends_on:
      - redis
    ports:
      - 6868:6868
    environment:
      - DATACLAY_KV_HOST=redis
      - DATACLAY_KV_PORT=6379
      - DATACLAY_BACKEND_ID
      - DATACLAY_BACKEND_NAME
      - DATACLAY_BACKEND_PORT=6868
      - DATACLAY_LOGLEVEL=DEBUG
    command: bash -c "pip install eclipse-zenoh && python -m dataclay.backend"
    volumes:
      - ./model:/workdir/model:ro
  
  # Eclipse Zenoh router
  zenoh_net:
    image: eclipse/zenoh
    restart: unless-stopped
    ports:
      - 7447:7447
      - 8000:8000
    volumes:
      - ./zenoh_docker:/root/.zenoh
    environment:
      - RUST_LOG=debug
    command: -c /root/.zenoh/zenoh-conf.json5
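
Before running the client scripts, it can help to confirm that the published ports accept connections. The following is a minimal sketch that assumes the services are exposed on 127.0.0.1 with the host ports listed in the compose file above; the SERVICES table and helper names are hypothetical.

```python
import socket

# Host ports published by the compose file above.
SERVICES = {
    "redis": 6379,
    "metadata-service": 16587,
    "backend": 6867,
    "backend_2": 6868,
    "zenoh_net (router)": 7447,
    "zenoh_net (REST)": 8000,
}


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_services(host: str = "127.0.0.1") -> dict:
    return {name: port_open(host, port) for name, port in SERVICES.items()}


if __name__ == "__main__":
    for name, is_up in check_services().items():
        print(f"{name}: {'up' if is_up else 'down'}")
```

An open port only shows that the container is listening, not that the service inside has finished starting.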

At this point we can run the publisher (sensor) code:

sensor.py
import random

from model.zenohsubs import ZenohSubs

from dataclay import Client

client = Client(host="127.0.0.1", username="testuser", password="s3cret", dataset="testdata")
client.start()

try:
    zenohSub = ZenohSubs.get_by_alias()
except Exception:
    zenohSub = ZenohSubs()
    zenohSub.make_persistent()

zenohSub.set_key("tmp")  # topic the subscriber listens on
zenohSub.set_buf(str(random.randint(0, 30)))  # simulated temperature reading

zenohSub.send_to_zenoh()

Run the subscriber code:

subscriber.py
from model.zenohsubs import ZenohSubs

from dataclay import Client

client = Client(host="127.0.0.1", username="testuser", password="s3cret", dataset="testdata")
client.start()

try:
    zenohSub = ZenohSubs.get_by_alias("subs")
except Exception:
    zenohSub = ZenohSubs()
    zenohSub.make_persistent(alias="subs")

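zenohSub.set_key("tmp")

# Ask Zenoh for the last value stored for the topic.
zenohSub.get_last_data(zenohSub.key)

# Subscribe to the topic so that handler() processes new samples.
zenohSub.receive_data(zenohSub.key)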

Finally, run the sensor code again.

If we leave the subscriber.py application running and call the function “zenohSub.get_data()”, we get the temperature that the thermometer sent.
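
Because the new value arrives asynchronously, one way to observe it is to poll get_data() until its result changes. The helper below is a hypothetical sketch, not part of dataClay; it takes any zero-argument callable, so a stand-in getter is used here in place of zenohSub.get_data.

```python
import time
from typing import Callable


def wait_for_change(read: Callable[[], str], previous: str,
                    timeout: float = 10.0, interval: float = 0.5) -> str:
    """Poll read() until it returns something other than previous."""
    deadline = time.monotonic() + timeout
    while True:
        current = read()
        if current != previous:
            return current
        if time.monotonic() >= deadline:
            raise TimeoutError("no new value before the timeout")
        time.sleep(interval)


# Stand-in for zenohSub.get_data: empty twice, then a formatted message.
values = iter(["", "", "Received PUT ('tmp': '23')"])
result = wait_for_change(lambda: next(values), previous="", interval=0.01)
print(result)
```

In the subscriber script, the equivalent call would be wait_for_change(zenohSub.get_data, previous=zenohSub.get_data()).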