MQTT¶
This example explains in a simple way how two clients can communicate with each other using mqtt. One client, called sender.py, will generate a random temperature value simulating a thermometer. We will also have another client, called receiver.py, which will be subscribed to the topic “tmp”. The sender client will send the temperature value to the broker with the topic “tmp”. This will trigger the reciever message_handler() function, which will categorize the temperature between 3 different options: freezing, cold or warm.
Note
This example is available in GitHub.
Class definition: The message_handler() function will overwritte the one from the class MQTTMixin at dataclay.contrib.mqtt
import logging
from typing import Any
from dataclay import DataClayObject, activemethod
from dataclay.contrib.mqtt import MQTTMixin
logger = logging.getLogger(__name__)
class MqttSubs(DataClayObject, MQTTMixin):
data: dict[str, Any]
topic: str
temperature: str
@activemethod
def __init__(self):
self = self
self.data = ""
self.topic = ""
self.temperature = "NO DATA"
@activemethod
def message_handling(self, client, userdata, msg):
from json import loads
tmp = loads(msg.payload.decode())
# self.data = tmp
int_tmp = int(tmp)
if int_tmp < 5:
self.temperature = "freezing(" + tmp + ")"
else:
if int_tmp < 17:
self.temperature = "cold(" + tmp + ")"
else:
self.temperature = "warm(" + tmp + ")"
logger.debug("Temperature is %s (%s)", self.temperature, tmp)
@activemethod
def set_topic(self, topic):
self.topic = topic
@activemethod
def set_data(self, data):
self.data = data
@activemethod
def get_temp(self):
return self.temperature
Mosquitto configuration: In this example we will be using the mosquitto broker with the lowest level of configuration.
# =================================================================
# Listeners
# =================================================================
listener 1883
# =================================================================
# Security
# =================================================================
allow_anonymous true
Then we can deploy dataclay using docker-compose. Create a file docker-compose.yml and use the following docker-compose file using docker-compose up. You can modify the environment variables MQTT_HOST, MQTT_PORT and MQTT_PRODUCER_ID
version: '3.9'
services:
redis:
image: redis:latest
ports:
- 6379:6379
metadata-service:
image: "ghcr.io/bsc-dom/dataclay:dev"
depends_on:
- redis
ports:
- 16587:16587
environment:
- DATACLAY_KV_HOST=redis
- DATACLAY_KV_PORT=6379
- DATACLAY_ID
- DATACLAY_PASSWORD=s3cret
- DATACLAY_USERNAME=testuser
- DATACLAY_DATASET=testdata
- DATACLAY_METADATA_PORT=16587
- DATACLAY_LOGLEVEL=DEBUG
command: python -m dataclay.metadata
backend:
image: "ghcr.io/bsc-dom/dataclay:dev"
depends_on:
- redis
ports:
- 6867:6867
environment:
- DATACLAY_KV_HOST=redis
- DATACLAY_KV_PORT=6379
- DATACLAY_BACKEND_ID
- DATACLAY_BACKEND_NAME
- DATACLAY_BACKEND_PORT=6867
- DATACLAY_LOGLEVEL=DEBUG
- MQTT_HOST=mqtt5
- MQTT_PORT=1883
- MQTT_PRODUCER_ID=client1
command: bash -c "pip install --upgrade paho-mqtt && python -m dataclay.backend"
volumes:
- ./model:/workdir/model:ro
backend_2:
image: "ghcr.io/bsc-dom/dataclay:dev"
depends_on:
- redis
ports:
- 6868:6868
environment:
- DATACLAY_KV_HOST=redis
- DATACLAY_KV_PORT=6379
- DATACLAY_BACKEND_ID
- DATACLAY_BACKEND_NAME
- DATACLAY_BACKEND_PORT=6868
- DATACLAY_LOGLEVEL=DEBUG
- MQTT_HOST=mqtt5
- MQTT_PORT=1883
- MQTT_PRODUCER_ID=client2
command: bash -c "pip install --upgrade paho-mqtt && python -m dataclay.backend"
volumes:
- ./model:/workdir/model:ro
# mqtt5 eclipse-mosquitto
mqtt5:
image: eclipse-mosquitto
container_name: mqtt5
ports:
- "1884:1883" #default mqtt port
- "9001:9001" #default mqtt port for websockets
volumes:
- ./config:/mosquitto/config:rw
restart: unless-stopped
Then we can run the receiver client application:
from model.mqttsubs import MqttSubs
from dataclay import Client
client = Client(host="127.0.0.1", username="testuser", password="s3cret", dataset="testdata")
client.start()
try:
mqttsub = MqttSubs.get_by_alias("receiver")
except Exception:
mqttsub = MqttSubs()
mqttsub.make_persistent(alias="receiver")
mqttsub.set_topic("tmp")
mqttsub.subscribe_to_topic(mqttsub.topic)
And run the sender client application:
import random
from model.mqttsubs import MqttSubs
from dataclay import Client
client = Client(host="127.0.0.1", username="testuser", password="s3cret", dataset="testdata")
client.start()
try:
mqttsub = MqttSubs.get_by_alias("sender")
except Exception:
mqttsub = MqttSubs()
mqttsub.make_persistent(alias="sender")
mqttsub.set_topic("tmp")
mqttsub.set_data(str(random.randint(0, 30)))
mqttsub.send_to_mqtt()
If we leave our receiver.py application running and we call the function “mqttsub.get_temp()”, we will get the temperature that the thermometer sent.