Source code for dataclay.contrib.mqtt

"""
dataClay includes support for client communication via MQTT.

In order to use this functionality, the client class has to inherit the :class:`MQTTMixin` class.

The client will be able to specify how the messages should be handled, which topics will be subscribed to,
and send messages with a topic.
"""

import inspect
import logging
import os
from typing import Any

from dataclay import activemethod

try:
    import paho.mqtt.client as mqtt
except ImportError:
    import warnings

    warnings.warn("Warning: <import paho.mqtt.client> failed", category=ImportWarning)
try:
    from json import dumps
except ImportError:
    import warnings

    warnings.warn("Warning: <from json import dumps> failed", category=ImportWarning)
try:
    from dataclay.contrib.mqtt import MQTT_PRODUCERS
except ImportError:
    import warnings

    warnings.warn(
        "Warning: <from dataclay.contrib.mqtt import MQTT_PRODUCERS> failed", category=ImportWarning
    )

""" Mqtt pool of producers """
MQTT_PRODUCERS = dict()

logger = logging.getLogger(__name__)


[docs] class MQTTMixin: """MQTT mechanisms"""
[docs] @activemethod def message_handling(self, client, userdata, msg): """Placeholder for function message_handling. This function describes how the client will handle a message. Args: client (paho.mqtt.client.Client): The client instance for this callback. userdata: The private user data as set in Client() or user_data_set(). msg (MQTTMessage): The received message. This is a class with members topic, payload, qos, retain. Raises: NotImplementedError: If the message_handling function has not been implemented, an error is raised. """ raise NotImplementedError("Must override message_handling")
[docs] @activemethod def subscribe_to_topic(self, topic: str = "dataclay"): """Subscribes the client to a topic and indicates how it will handle a message received. Args: topic (str, optional): String representing the topic. Defaults to "dataclay". """ mqtt_host = os.getenv("MQTT_HOST", "mqtt5") mqtt_port = int(os.getenv("MQTT_PORT", "1883")) mqtt_client = os.getenv("MQTT_PRODUCER_ID", "dataclay_mqtt_producer") mqtt_address = f"{mqtt_host}:{mqtt_port}" mqtt_username = os.getenv("MQTT_USERNAME", "") mqtt_password = os.getenv("MQTT_PASSWORD", "") if mqtt_address in MQTT_PRODUCERS: mqtt_producer = MQTT_PRODUCERS[mqtt_address] else: mqtt_producer = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, mqtt_client) mqtt_producer.on_message = self.message_handling if mqtt_username and mqtt_password: mqtt_producer.username_pw_set(mqtt_username, mqtt_password) mqtt_producer.connect(mqtt_host, mqtt_port) mqtt_producer.loop_start() MQTT_PRODUCERS[mqtt_address] = mqtt_producer mqtt_producer.subscribe(topic)
[docs] @activemethod def produce_mqtt_msg(self, data: dict[str, Any], topic: str = "dataclay", **more): """The client is connected to the broker, and it sends the message to the chosen topic. Args: data (dict[str, Any]): Message. topic (str, optional): Topic of the message. Defaults to "dataclay". """ mqtt_host = os.getenv("MQTT_HOST", "mqtt5") mqtt_port = int(os.getenv("MQTT_PORT", "1883")) mqtt_client = os.getenv("MQTT_PRODUCER_ID", "dataclay_mqtt_producer") mqtt_address = f"{mqtt_host}:{mqtt_port}" if mqtt_address in MQTT_PRODUCERS: mqtt_producer = MQTT_PRODUCERS[mqtt_address] else: mqtt_producer = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, mqtt_client) mqtt_producer.connect(mqtt_host, mqtt_port) mqtt_producer.loop_start() MQTT_PRODUCERS[mqtt_address] = mqtt_producer data_str = dumps(data).encode("utf-8") mqtt_producer.publish(topic, data_str, qos=1)
[docs] @activemethod def send_to_mqtt(self): """Previous function to produce_mqtt_msg. Gets all the arguments needed from the calling class.""" attributes = inspect.getmembers(self.__class__, lambda a: not (inspect.isroutine(a))) field_values = {} for field in attributes: fieldname = field[0] if not (fieldname.startswith("_")): field_values[fieldname] = getattr(self, fieldname) self.produce_mqtt_msg(**field_values)