MQTT

This example explains in a simple way how two clients can communicate with each other using mqtt. One client, called sender.py, will generate a random temperature value simulating a thermometer. We will also have another client, called receiver.py, which will be subscribed to the topic “tmp”. The sender client will send the temperature value to the broker with the topic “tmp”. This will trigger the reciever message_handler() function, which will categorize the temperature between 3 different options: freezing, cold or warm.

Note

This example is available in GitHub.

Class definition: The message_handler() function will overwritte the one from the class MQTTMixin at dataclay.contrib.mqtt

mqttsubs.py
import logging
from typing import Any

from dataclay import DataClayObject, activemethod
from dataclay.contrib.mqtt import MQTTMixin

logger = logging.getLogger(__name__)


class MqttSubs(DataClayObject, MQTTMixin):

    data: dict[str, Any]
    topic: str
    temperature: str

    @activemethod
    def __init__(self):
        self = self
        self.data = ""
        self.topic = ""
        self.temperature = "NO DATA"

    @activemethod
    def message_handling(self, client, userdata, msg):
        from json import loads

        tmp = loads(msg.payload.decode())
        # self.data = tmp
        int_tmp = int(tmp)
        if int_tmp < 5:
            self.temperature = "freezing(" + tmp + ")"
        else:
            if int_tmp < 17:
                self.temperature = "cold(" + tmp + ")"
            else:
                self.temperature = "warm(" + tmp + ")"

        logger.debug("Temperature is %s (%s)", self.temperature, tmp)

    @activemethod
    def set_topic(self, topic):
        self.topic = topic

    @activemethod
    def set_data(self, data):
        self.data = data

    @activemethod
    def get_temp(self):
        return self.temperature

Mosquitto configuration: In this example we will be using the mosquitto broker with the lowest level of configuration.

mosquitto.conf
# =================================================================
# Listeners
# =================================================================
listener 1883

# =================================================================
# Security
# =================================================================
allow_anonymous true

Then we can deploy dataclay using docker-compose. Create a file docker-compose.yml and use the following docker-compose file using docker-compose up. You can modify the environment variables MQTT_HOST, MQTT_PORT and MQTT_PRODUCER_ID

docker-compose.yml
version: '3.9'
services:

  redis:
    image: redis:latest
    ports:
      - 6379:6379

  metadata-service:
    image: "ghcr.io/bsc-dom/dataclay:dev"
    depends_on:
      - redis
    ports:
      - 16587:16587
    environment:
      - DATACLAY_KV_HOST=redis
      - DATACLAY_KV_PORT=6379
      - DATACLAY_ID
      - DATACLAY_PASSWORD=s3cret
      - DATACLAY_USERNAME=testuser
      - DATACLAY_DATASET=testdata
      - DATACLAY_METADATA_PORT=16587
      - DATACLAY_LOGLEVEL=DEBUG
    command: python -m dataclay.metadata

  backend:
    image: "ghcr.io/bsc-dom/dataclay:dev"
    depends_on:
      - redis
    ports:
      - 6867:6867
    environment:
      - DATACLAY_KV_HOST=redis
      - DATACLAY_KV_PORT=6379
      - DATACLAY_BACKEND_ID
      - DATACLAY_BACKEND_NAME
      - DATACLAY_BACKEND_PORT=6867
      - DATACLAY_LOGLEVEL=DEBUG
      - MQTT_HOST=mqtt5
      - MQTT_PORT=1883
      - MQTT_PRODUCER_ID=client1
    command: bash -c "pip install --upgrade paho-mqtt && python -m dataclay.backend"
    volumes:
      - ./model:/workdir/model:ro

  backend_2:
    image: "ghcr.io/bsc-dom/dataclay:dev"
    depends_on:
      - redis
    ports:
      - 6868:6868
    environment:
      - DATACLAY_KV_HOST=redis
      - DATACLAY_KV_PORT=6379
      - DATACLAY_BACKEND_ID
      - DATACLAY_BACKEND_NAME
      - DATACLAY_BACKEND_PORT=6868
      - DATACLAY_LOGLEVEL=DEBUG
      - MQTT_HOST=mqtt5
      - MQTT_PORT=1883
      - MQTT_PRODUCER_ID=client2
    command: bash -c "pip install --upgrade paho-mqtt && python -m dataclay.backend"
    volumes:
      - ./model:/workdir/model:ro
  
  # mqtt5 eclipse-mosquitto
  mqtt5:
    image: eclipse-mosquitto
    container_name: mqtt5
    ports:
      - "1884:1883" #default mqtt port
      - "9001:9001" #default mqtt port for websockets
    volumes:
      - ./config:/mosquitto/config:rw
    restart: unless-stopped


    

Then we can run the receiver client application:

receiver.py
from model.mqttsubs import MqttSubs

from dataclay import Client

client = Client(host="127.0.0.1", username="testuser", password="s3cret", dataset="testdata")
client.start()

try:
    mqttsub = MqttSubs.get_by_alias("receiver")
except Exception:
    mqttsub = MqttSubs()
    mqttsub.make_persistent(alias="receiver")

mqttsub.set_topic("tmp")

mqttsub.subscribe_to_topic(mqttsub.topic)

And run the sender client application:

sender.py
import random

from model.mqttsubs import MqttSubs

from dataclay import Client

client = Client(host="127.0.0.1", username="testuser", password="s3cret", dataset="testdata")
client.start()

try:
    mqttsub = MqttSubs.get_by_alias("sender")
except Exception:
    mqttsub = MqttSubs()
    mqttsub.make_persistent(alias="sender")

mqttsub.set_topic("tmp")

mqttsub.set_data(str(random.randint(0, 30)))

mqttsub.send_to_mqtt()

If we leave our receiver.py application running and we call the function “mqttsub.get_temp()”, we will get the temperature that the thermometer sent.