LCOV - code coverage report
Current view: top level - nnstreamer-edge-0.2.6/src/libnnstreamer-edge - nnstreamer-edge-mqtt-mosquitto.c (source / functions) Coverage Total Hit
Test: NNStreamer-edge 0.2.6-1 nnstreamer/nnstreamer-edge#121619a22eefb07aef74ab765d1eb0ec59b1416e Lines: 85.8 % 219 188
Test Date: 2025-06-06 05:20:40 Functions: 100.0 % 12 12

            Line data    Source code
       1              : /* SPDX-License-Identifier: Apache-2.0 */
       2              : /**
       3              :  * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
       4              :  *
       5              :  * @file   nnstreamer-edge-mqtt-mosquitto.c
       6              :  * @date   14 Oct 2022
       7              :  * @brief  Internal functions to support MQTT protocol (mosquitto Library).
       8              :  * @see    https://github.com/nnstreamer/nnstreamer-edge
       9              :  * @author Gichan Jang <gichan2.jang@samsung.com>
      10              :  * @bug    No known bugs except for NYI items
      11              :  */
      12              : 
      13              : #if !defined(ENABLE_MQTT)
      14              : #error "This file can be built with mosquitto library."
      15              : #endif
      16              : 
      17              : #include <mosquitto.h>
      18              : #include "nnstreamer-edge-mqtt.h"
      19              : #include "nnstreamer-edge-log.h"
      20              : #include "nnstreamer-edge-util.h"
      21              : #include "nnstreamer-edge-queue.h"
      22              : #include "nnstreamer-edge-data.h"
      23              : #include "nnstreamer-edge-event.h"
      24              : 
      25              : /**
      26              :  * @brief Data structure for mqtt broker handle.
      27              :  */
      28              : typedef struct
      29              : {
      30              :   void *mqtt_h;
      31              :   nns_edge_queue_h message_queue;
      32              :   char *id;
      33              :   char *topic;
      34              :   char *host;
      35              :   int port;
      36              :   bool connected;
      37              : 
      38              :   /* event callback for new message */
      39              :   nns_edge_event_cb event_cb;
      40              :   void *user_data;
      41              : 
      42              :   pthread_mutex_t lock;
      43              :   pthread_cond_t cond;
      44              :   bool cleared;
      45              : } nns_edge_broker_s;
      46              : 
      47              : /**
      48              :  * @brief Callback function to be called when a message is arrived.
      49              :  */
      50              : static void
      51           14 : on_message_callback (struct mosquitto *client, void *data,
      52              :     const struct mosquitto_message *message)
      53              : {
      54           14 :   nns_edge_broker_s *bh = (nns_edge_broker_s *) data;
      55           14 :   char *msg = NULL;
      56              :   nns_size_t msg_len;
      57              :   int ret;
      58              : 
      59           14 :   if (!bh) {
      60            0 :     nns_edge_loge ("Invalid param, given broker handle is invalid.");
      61            0 :     return;
      62              :   }
      63              : 
      64           14 :   if (0 >= message->payloadlen) {
      65            3 :     nns_edge_logw ("Invalid payload length: %d", message->payloadlen);
      66            3 :     return;
      67              :   }
      68              : 
      69           11 :   nns_edge_logd ("MQTT message is arrived (ID:%d, Topic:%s).",
      70              :       message->mid, message->topic);
      71              : 
      72           11 :   msg_len = (nns_size_t) message->payloadlen;
      73           11 :   msg = nns_edge_memdup (message->payload, msg_len);
      74              : 
      75           11 :   if (msg) {
      76           11 :     if (bh->event_cb) {
      77              :       nns_edge_data_h data_h;
      78              : 
      79           10 :       if (nns_edge_data_create (&data_h) != NNS_EDGE_ERROR_NONE) {
      80            0 :         nns_edge_loge ("Failed to create data handle in msg thread.");
      81            0 :         SAFE_FREE (msg);
      82            0 :         return;
      83              :       }
      84              : 
      85           10 :       nns_edge_data_deserialize (data_h, (void *) msg, (nns_size_t) msg_len);
      86              : 
      87           10 :       ret = nns_edge_event_invoke_callback (bh->event_cb, bh->user_data,
      88              :           NNS_EDGE_EVENT_NEW_DATA_RECEIVED, data_h, sizeof (nns_edge_data_h),
      89              :           NULL);
      90           10 :       if (ret != NNS_EDGE_ERROR_NONE)
      91            0 :         nns_edge_loge ("Failed to send an event for received message.");
      92              : 
      93           10 :       nns_edge_data_destroy (data_h);
      94           10 :       SAFE_FREE (msg);
      95              :     } else {
      96              :       /* Push received message into msg queue. DO NOT free msg here. */
      97            1 :       nns_edge_queue_push (bh->message_queue, msg, msg_len, nns_edge_free);
      98              :     }
      99              :   }
     100              : 
     101           11 :   return;
     102              : }
     103              : 
     104              : /**
     105              :  * @brief Initializes MQTT object.
     106              :  */
     107              : static int
     108           11 : _nns_edge_mqtt_init_client (const char *id, const char *topic, const char *host,
     109              :     const int port, nns_edge_broker_h * broker_h)
     110              : {
     111              :   nns_edge_broker_s *bh;
     112              :   int mret;
     113              :   char *client_id;
     114              :   struct mosquitto *handle;
     115           11 :   int ver = MQTT_PROTOCOL_V311; /** @todo check mqtt version (TizenRT repo) */
     116              : 
     117           11 :   nns_edge_logd ("Trying to connect MQTT (ID:%s, URL:%s:%d).", id, host, port);
     118              : 
     119           11 :   bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s));
     120           11 :   if (!bh) {
     121            0 :     nns_edge_loge ("Failed to allocate memory for broker handle.");
     122           11 :     return NNS_EDGE_ERROR_OUT_OF_MEMORY;
     123              :   }
     124              : 
     125           11 :   mosquitto_lib_init ();
     126           11 :   client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", id, getpid ());
     127              : 
     128           11 :   handle = mosquitto_new (client_id, TRUE, NULL);
     129           11 :   SAFE_FREE (client_id);
     130              : 
     131           11 :   if (!handle) {
     132            0 :     nns_edge_loge ("Failed to create mosquitto client instance.");
     133            0 :     goto error;
     134              :   }
     135              : 
     136           11 :   mosquitto_user_data_set (handle, bh);
     137              : 
     138           11 :   mret = mosquitto_opts_set (handle, MOSQ_OPT_PROTOCOL_VERSION, &ver);
     139           11 :   if (MOSQ_ERR_SUCCESS != mret) {
     140            0 :     nns_edge_loge ("Failed to set MQTT protocol version 3.1.1.");
     141            0 :     goto error;
     142              :   }
     143              : 
     144           11 :   mosquitto_message_callback_set (handle, on_message_callback);
     145              : 
     146           11 :   mret = mosquitto_loop_start (handle);
     147           11 :   if (mret != MOSQ_ERR_SUCCESS) {
     148            0 :     nns_edge_loge ("Failed to start mosquitto loop.");
     149            0 :     goto error;
     150              :   }
     151              : 
     152           11 :   mret = mosquitto_connect (handle, host, port, 60);
     153           11 :   if (mret != MOSQ_ERR_SUCCESS) {
     154            1 :     nns_edge_loge ("Failed to connect MQTT.");
     155            1 :     goto error;
     156              :   }
     157              : 
     158           10 :   mret = nns_edge_queue_create (&bh->message_queue);
     159           10 :   if (NNS_EDGE_ERROR_NONE != mret) {
     160            0 :     nns_edge_loge ("Failed to create message queue.");
     161            0 :     goto error;
     162              :   }
     163           10 :   bh->mqtt_h = handle;
     164           10 :   bh->id = nns_edge_strdup (id);
     165           10 :   bh->topic = nns_edge_strdup (topic);
     166           10 :   bh->host = nns_edge_strdup (host);
     167           10 :   bh->port = port;
     168           10 :   bh->connected = true;
     169           10 :   bh->event_cb = NULL;
     170           10 :   bh->user_data = NULL;
     171           10 :   bh->cleared = false;
     172           10 :   nns_edge_lock_init (bh);
     173           10 :   nns_edge_cond_init (bh);
     174              : 
     175           10 :   *broker_h = bh;
     176           10 :   return NNS_EDGE_ERROR_NONE;
     177              : 
     178            1 : error:
     179            1 :   SAFE_FREE (bh);
     180            1 :   if (handle)
     181            1 :     mosquitto_destroy (handle);
     182            1 :   mosquitto_lib_cleanup ();
     183            1 :   return NNS_EDGE_ERROR_CONNECTION_FAILURE;
     184              : }
     185              : 
     186              : /**
     187              :  * @brief Connect to MQTT.
     188              :  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
     189              :  */
     190              : int
     191           19 : nns_edge_mqtt_connect (const char *id, const char *topic, const char *host,
     192              :     const int port, nns_edge_broker_h * broker_h)
     193              : {
     194           19 :   int ret = NNS_EDGE_ERROR_NONE;
     195              : 
     196           19 :   if (!STR_IS_VALID (id)) {
     197            2 :     nns_edge_loge ("Invalid param, given id is invalid.");
     198            2 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     199              :   }
     200              : 
     201           17 :   if (!STR_IS_VALID (topic)) {
     202            2 :     nns_edge_loge ("Invalid param, given topic is invalid.");
     203            2 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     204              :   }
     205              : 
     206           15 :   if (!STR_IS_VALID (host)) {
     207            2 :     nns_edge_loge ("Invalid param, given host is invalid.");
     208            2 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     209              :   }
     210              : 
     211           13 :   if (!PORT_IS_VALID (port)) {
     212            1 :     nns_edge_loge ("Invalid param, given port is invalid.");
     213            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     214              :   }
     215              : 
     216           12 :   if (!broker_h) {
     217            1 :     nns_edge_loge ("Invalid param, broker_h should not be null.");
     218            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     219              :   }
     220              : 
     221           11 :   ret = _nns_edge_mqtt_init_client (id, topic, host, port, broker_h);
     222           11 :   if (NNS_EDGE_ERROR_NONE != ret)
     223            1 :     nns_edge_loge ("Failed to initialize the MQTT client object.");
     224              : 
     225           11 :   return ret;
     226              : }
     227              : 
     228              : /**
     229              :  * @brief Publish callback for clearing retained message.
     230              :  * @note This callback is called both if the message is sent successfully or if the broker responded with an error.
     231              :  */
     232              : static void
     233            7 : _clear_retained_cb (struct mosquitto *mosq, void *obj, int mid)
     234              : {
     235            7 :   nns_edge_broker_s *bh = NULL;
     236              : 
     237            7 :   bh = (nns_edge_broker_s *) mosquitto_userdata (mosq);
     238              : 
     239            7 :   if (!bh || bh->cleared)
     240            0 :     return;
     241              : 
     242            7 :   nns_edge_lock (bh);
     243            7 :   bh->cleared = true;
     244            7 :   nns_edge_cond_signal (bh);
     245            7 :   nns_edge_unlock (bh);
     246              : }
     247              : 
     248              : /**
     249              :  * @brief Clear retained message.
     250              :  */
     251              : static void
     252           10 : _nns_edge_clear_retained (nns_edge_broker_s * bh)
     253              : {
     254              :   struct mosquitto *handle;
     255           10 :   unsigned int wait = 0U;
     256              : 
     257           10 :   if (!bh)
     258            0 :     return;
     259              : 
     260           10 :   handle = bh->mqtt_h;
     261           10 :   if (handle) {
     262           10 :     nns_edge_lock (bh);
     263           10 :     bh->cleared = false;
     264              : 
     265           10 :     mosquitto_publish_callback_set (handle, _clear_retained_cb);
     266           10 :     mosquitto_publish (handle, NULL, bh->topic, 0, NULL, 1, true);
     267              : 
     268              :     /* Wait up to 10 seconds. */
     269         3016 :     while (!bh->cleared && ++wait < 1000U)
     270         3006 :       nns_edge_cond_wait_until (bh, 10);
     271              : 
     272           10 :     mosquitto_publish_callback_set (handle, NULL);
     273           10 :     bh->cleared = true;
     274           10 :     nns_edge_unlock (bh);
     275              :   }
     276              : }
     277              : 
     278              : /**
     279              :  * @brief Close the connection to MQTT.
     280              :  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
     281              :  */
     282              : int
     283           11 : nns_edge_mqtt_close (nns_edge_broker_h broker_h)
     284              : {
     285              :   nns_edge_broker_s *bh;
     286              :   struct mosquitto *handle;
     287              : 
     288           11 :   if (!broker_h) {
     289            1 :     nns_edge_loge ("Invalid param, given broker handle is invalid.");
     290            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     291              :   }
     292              : 
     293           10 :   bh = (nns_edge_broker_s *) broker_h;
     294           10 :   handle = bh->mqtt_h;
     295              : 
     296           10 :   if (handle) {
     297           10 :     nns_edge_logd ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
     298              :         bh->id, bh->host, bh->port);
     299              : 
     300           10 :     _nns_edge_clear_retained (bh);
     301              : 
     302           10 :     mosquitto_disconnect (handle);
     303           10 :     mosquitto_destroy (handle);
     304           10 :     mosquitto_lib_cleanup ();
     305              :   }
     306              : 
     307           10 :   bh->mqtt_h = NULL;
     308           10 :   bh->connected = false;
     309              : 
     310           10 :   nns_edge_queue_destroy (bh->message_queue);
     311           10 :   bh->message_queue = NULL;
     312           10 :   nns_edge_lock_destroy (bh);
     313           10 :   nns_edge_cond_destroy (bh);
     314           10 :   SAFE_FREE (bh->id);
     315           10 :   SAFE_FREE (bh->topic);
     316           10 :   SAFE_FREE (bh->host);
     317           10 :   SAFE_FREE (bh);
     318              : 
     319           10 :   return NNS_EDGE_ERROR_NONE;
     320              : }
     321              : 
     322              : /**
     323              :  * @brief Internal util function to send edge-data via MQTT connection.
     324              :  */
     325              : int
     326            5 : nns_edge_mqtt_publish_data (nns_edge_broker_h broker_h, nns_edge_data_h data_h)
     327              : {
     328              :   int ret;
     329            5 :   void *data = NULL;
     330              :   nns_size_t size;
     331              : 
     332            5 :   ret = nns_edge_data_serialize (data_h, &data, &size);
     333            5 :   if (NNS_EDGE_ERROR_NONE != ret) {
     334            0 :     nns_edge_loge ("Failed to serialize the edge data.");
     335            5 :     return ret;
     336              :   }
     337              : 
     338            5 :   ret = nns_edge_mqtt_publish (broker_h, data, size);
     339            5 :   if (NNS_EDGE_ERROR_NONE != ret)
     340            0 :     nns_edge_loge ("Failed to send data to destination.");
     341              : 
     342            5 :   SAFE_FREE (data);
     343            5 :   return ret;
     344              : }
     345              : 
     346              : /**
     347              :  * @brief Publish raw data.
     348              :  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
     349              :  */
     350              : int
     351            9 : nns_edge_mqtt_publish (nns_edge_broker_h broker_h, const void *data,
     352              :     const int length)
     353              : {
     354              :   nns_edge_broker_s *bh;
     355              :   struct mosquitto *handle;
     356              :   int ret;
     357              : 
     358            9 :   if (!broker_h) {
     359            1 :     nns_edge_loge ("Invalid param, given broker handle is invalid.");
     360            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     361              :   }
     362              : 
     363            8 :   if (!data || length <= 0) {
     364            2 :     nns_edge_loge ("Invalid param, given data is invalid.");
     365            2 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     366              :   }
     367              : 
     368            6 :   bh = (nns_edge_broker_s *) broker_h;
     369            6 :   handle = bh->mqtt_h;
     370              : 
     371            6 :   if (!handle) {
     372            0 :     nns_edge_loge ("Invalid state, MQTT connection was not completed.");
     373            0 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     374              :   }
     375              : 
     376            6 :   if (!bh->connected) {
     377            0 :     nns_edge_loge ("Failed to publish message, MQTT is not connected.");
     378            0 :     return NNS_EDGE_ERROR_IO;
     379              :   }
     380              : 
     381              :   /* Publish a message (default QoS 1 - at least once and retained true). */
     382            6 :   ret = mosquitto_publish (handle, NULL, bh->topic, length, data, 1, true);
     383            6 :   if (MOSQ_ERR_SUCCESS != ret) {
     384            0 :     nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).",
     385              :         bh->id, bh->topic);
     386            0 :     return NNS_EDGE_ERROR_IO;
     387              :   }
     388              : 
     389            6 :   return NNS_EDGE_ERROR_NONE;
     390              : }
     391              : 
     392              : /**
     393              :  * @brief Subscribe a topic.
     394              :  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
     395              :  */
     396              : int
     397            4 : nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h)
     398              : {
     399              :   nns_edge_broker_s *bh;
     400              :   void *handle;
     401              :   int ret;
     402              : 
     403            4 :   if (!broker_h) {
     404            1 :     nns_edge_loge ("Invalid param, given broker handle is invalid.");
     405            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     406              :   }
     407              : 
     408            3 :   bh = (nns_edge_broker_s *) broker_h;
     409            3 :   handle = bh->mqtt_h;
     410              : 
     411            3 :   if (!handle) {
     412            0 :     nns_edge_loge ("Invalid state, MQTT connection was not completed.");
     413            0 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     414              :   }
     415              : 
     416            3 :   if (!bh->connected) {
     417            0 :     nns_edge_loge ("Failed to subscribe, MQTT is not connected.");
     418            0 :     return NNS_EDGE_ERROR_IO;
     419              :   }
     420              : 
     421              :   /* Subscribe a topic (default QoS 1 - at least once). */
     422            3 :   ret = mosquitto_subscribe (handle, NULL, bh->topic, 1);
     423            3 :   if (MOSQ_ERR_SUCCESS != ret) {
     424            0 :     nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).",
     425              :         bh->id, bh->topic);
     426            0 :     return NNS_EDGE_ERROR_IO;
     427              :   }
     428              : 
     429            3 :   return NNS_EDGE_ERROR_NONE;
     430              : }
     431              : 
     432              : /**
     433              :  * @brief Get message from mqtt broker within timeout (0 for infinite timeout).
     434              :  */
     435              : int
     436            6 : nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg,
     437              :     nns_size_t * msg_len, unsigned int timeout)
     438              : {
     439              :   int ret;
     440              :   nns_edge_broker_s *bh;
     441              : 
     442            6 :   if (!broker_h) {
     443            1 :     nns_edge_loge ("Invalid param, given broker handle is invalid.");
     444            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     445              :   }
     446              : 
     447            5 :   if (!msg) {
     448            1 :     nns_edge_loge ("Invalid param, given msg param is invalid.");
     449            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     450              :   }
     451              : 
     452            4 :   bh = (nns_edge_broker_s *) broker_h;
     453              : 
     454              :   /**
     455              :    * The time to wait for new data, in milliseconds.
     456              :    * (Default: 0 for infinite timeout)
     457              :    */
     458            4 :   ret = nns_edge_queue_wait_pop (bh->message_queue, timeout, msg, msg_len);
     459            4 :   if (NNS_EDGE_ERROR_NONE != ret)
     460            3 :     nns_edge_loge ("Failed to get message from mqtt broker within timeout.");
     461              : 
     462            4 :   return ret;
     463              : }
     464              : 
     465              : /**
     466              :  * @brief Check mqtt connection
     467              :  */
     468              : bool
     469           11 : nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h)
     470              : {
     471              :   nns_edge_broker_s *bh;
     472              : 
     473           11 :   if (!broker_h) {
     474            6 :     nns_edge_loge ("Invalid param, given broker handle is invalid.");
     475            6 :     return false;
     476              :   }
     477              : 
     478            5 :   bh = (nns_edge_broker_s *) broker_h;
     479              : 
     480            5 :   return bh->connected;
     481              : }
     482              : 
     483              : /**
     484              :  * @brief Set event callback for new message.
     485              :  */
     486              : int
     487            4 : nns_edge_mqtt_set_event_callback (nns_edge_broker_h broker_h,
     488              :     nns_edge_event_cb cb, void *user_data)
     489              : {
     490              :   nns_edge_broker_s *bh;
     491              : 
     492            4 :   if (!broker_h) {
     493            1 :     nns_edge_loge ("Invalid param, given MQTT handle is invalid.");
     494            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     495              :   }
     496              : 
     497            3 :   bh = (nns_edge_broker_s *) broker_h;
     498              : 
     499            3 :   bh->event_cb = cb;
     500            3 :   bh->user_data = user_data;
     501              : 
     502            3 :   return NNS_EDGE_ERROR_NONE;
     503              : }
        

Generated by: LCOV version 2.0-1