Line data Source code
1 : /* SPDX-License-Identifier: Apache-2.0 */
2 : /**
3 : * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
4 : *
5 : * @file nnstreamer-edge-mqtt-mosquitto.c
6 : * @date 14 Oct 2022
7 : * @brief Internal functions to support MQTT protocol (mosquitto Library).
8 : * @see https://github.com/nnstreamer/nnstreamer-edge
9 : * @author Gichan Jang <gichan2.jang@samsung.com>
10 : * @bug No known bugs except for NYI items
11 : */
12 :
13 : #if !defined(ENABLE_MQTT)
14 : #error "This file can be built with mosquitto library."
15 : #endif
16 :
#include <limits.h>
#include <mosquitto.h>
#include "nnstreamer-edge-mqtt.h"
#include "nnstreamer-edge-log.h"
#include "nnstreamer-edge-util.h"
#include "nnstreamer-edge-queue.h"
#include "nnstreamer-edge-data.h"
#include "nnstreamer-edge-event.h"
24 :
25 : /**
26 : * @brief Data structure for mqtt broker handle.
27 : */
28 : typedef struct
29 : {
30 : void *mqtt_h;
31 : nns_edge_queue_h message_queue;
32 : char *id;
33 : char *topic;
34 : char *host;
35 : int port;
36 : bool connected;
37 :
38 : /* event callback for new message */
39 : nns_edge_event_cb event_cb;
40 : void *user_data;
41 :
42 : pthread_mutex_t lock;
43 : pthread_cond_t cond;
44 : bool cleared;
45 : } nns_edge_broker_s;
46 :
47 : /**
48 : * @brief Callback function to be called when a message is arrived.
49 : */
50 : static void
51 14 : on_message_callback (struct mosquitto *client, void *data,
52 : const struct mosquitto_message *message)
53 : {
54 14 : nns_edge_broker_s *bh = (nns_edge_broker_s *) data;
55 14 : char *msg = NULL;
56 : nns_size_t msg_len;
57 : int ret;
58 :
59 14 : if (!bh) {
60 0 : nns_edge_loge ("Invalid param, given broker handle is invalid.");
61 0 : return;
62 : }
63 :
64 14 : if (0 >= message->payloadlen) {
65 3 : nns_edge_logw ("Invalid payload length: %d", message->payloadlen);
66 3 : return;
67 : }
68 :
69 11 : nns_edge_logd ("MQTT message is arrived (ID:%d, Topic:%s).",
70 : message->mid, message->topic);
71 :
72 11 : msg_len = (nns_size_t) message->payloadlen;
73 11 : msg = nns_edge_memdup (message->payload, msg_len);
74 :
75 11 : if (msg) {
76 11 : if (bh->event_cb) {
77 : nns_edge_data_h data_h;
78 :
79 10 : if (nns_edge_data_create (&data_h) != NNS_EDGE_ERROR_NONE) {
80 0 : nns_edge_loge ("Failed to create data handle in msg thread.");
81 0 : SAFE_FREE (msg);
82 0 : return;
83 : }
84 :
85 10 : nns_edge_data_deserialize (data_h, (void *) msg, (nns_size_t) msg_len);
86 :
87 10 : ret = nns_edge_event_invoke_callback (bh->event_cb, bh->user_data,
88 : NNS_EDGE_EVENT_NEW_DATA_RECEIVED, data_h, sizeof (nns_edge_data_h),
89 : NULL);
90 10 : if (ret != NNS_EDGE_ERROR_NONE)
91 0 : nns_edge_loge ("Failed to send an event for received message.");
92 :
93 10 : nns_edge_data_destroy (data_h);
94 10 : SAFE_FREE (msg);
95 : } else {
96 : /* Push received message into msg queue. DO NOT free msg here. */
97 1 : nns_edge_queue_push (bh->message_queue, msg, msg_len, nns_edge_free);
98 : }
99 : }
100 :
101 11 : return;
102 : }
103 :
104 : /**
105 : * @brief Initializes MQTT object.
106 : */
107 : static int
108 11 : _nns_edge_mqtt_init_client (const char *id, const char *topic, const char *host,
109 : const int port, nns_edge_broker_h * broker_h)
110 : {
111 : nns_edge_broker_s *bh;
112 : int mret;
113 : char *client_id;
114 : struct mosquitto *handle;
115 11 : int ver = MQTT_PROTOCOL_V311; /** @todo check mqtt version (TizenRT repo) */
116 :
117 11 : nns_edge_logd ("Trying to connect MQTT (ID:%s, URL:%s:%d).", id, host, port);
118 :
119 11 : bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s));
120 11 : if (!bh) {
121 0 : nns_edge_loge ("Failed to allocate memory for broker handle.");
122 11 : return NNS_EDGE_ERROR_OUT_OF_MEMORY;
123 : }
124 :
125 11 : mosquitto_lib_init ();
126 11 : client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", id, getpid ());
127 :
128 11 : handle = mosquitto_new (client_id, TRUE, NULL);
129 11 : SAFE_FREE (client_id);
130 :
131 11 : if (!handle) {
132 0 : nns_edge_loge ("Failed to create mosquitto client instance.");
133 0 : goto error;
134 : }
135 :
136 11 : mosquitto_user_data_set (handle, bh);
137 :
138 11 : mret = mosquitto_opts_set (handle, MOSQ_OPT_PROTOCOL_VERSION, &ver);
139 11 : if (MOSQ_ERR_SUCCESS != mret) {
140 0 : nns_edge_loge ("Failed to set MQTT protocol version 3.1.1.");
141 0 : goto error;
142 : }
143 :
144 11 : mosquitto_message_callback_set (handle, on_message_callback);
145 :
146 11 : mret = mosquitto_loop_start (handle);
147 11 : if (mret != MOSQ_ERR_SUCCESS) {
148 0 : nns_edge_loge ("Failed to start mosquitto loop.");
149 0 : goto error;
150 : }
151 :
152 11 : mret = mosquitto_connect (handle, host, port, 60);
153 11 : if (mret != MOSQ_ERR_SUCCESS) {
154 1 : nns_edge_loge ("Failed to connect MQTT.");
155 1 : goto error;
156 : }
157 :
158 10 : mret = nns_edge_queue_create (&bh->message_queue);
159 10 : if (NNS_EDGE_ERROR_NONE != mret) {
160 0 : nns_edge_loge ("Failed to create message queue.");
161 0 : goto error;
162 : }
163 10 : bh->mqtt_h = handle;
164 10 : bh->id = nns_edge_strdup (id);
165 10 : bh->topic = nns_edge_strdup (topic);
166 10 : bh->host = nns_edge_strdup (host);
167 10 : bh->port = port;
168 10 : bh->connected = true;
169 10 : bh->event_cb = NULL;
170 10 : bh->user_data = NULL;
171 10 : bh->cleared = false;
172 10 : nns_edge_lock_init (bh);
173 10 : nns_edge_cond_init (bh);
174 :
175 10 : *broker_h = bh;
176 10 : return NNS_EDGE_ERROR_NONE;
177 :
178 1 : error:
179 1 : SAFE_FREE (bh);
180 1 : if (handle)
181 1 : mosquitto_destroy (handle);
182 1 : mosquitto_lib_cleanup ();
183 1 : return NNS_EDGE_ERROR_CONNECTION_FAILURE;
184 : }
185 :
186 : /**
187 : * @brief Connect to MQTT.
188 : * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
189 : */
190 : int
191 19 : nns_edge_mqtt_connect (const char *id, const char *topic, const char *host,
192 : const int port, nns_edge_broker_h * broker_h)
193 : {
194 19 : int ret = NNS_EDGE_ERROR_NONE;
195 :
196 19 : if (!STR_IS_VALID (id)) {
197 2 : nns_edge_loge ("Invalid param, given id is invalid.");
198 2 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
199 : }
200 :
201 17 : if (!STR_IS_VALID (topic)) {
202 2 : nns_edge_loge ("Invalid param, given topic is invalid.");
203 2 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
204 : }
205 :
206 15 : if (!STR_IS_VALID (host)) {
207 2 : nns_edge_loge ("Invalid param, given host is invalid.");
208 2 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
209 : }
210 :
211 13 : if (!PORT_IS_VALID (port)) {
212 1 : nns_edge_loge ("Invalid param, given port is invalid.");
213 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
214 : }
215 :
216 12 : if (!broker_h) {
217 1 : nns_edge_loge ("Invalid param, broker_h should not be null.");
218 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
219 : }
220 :
221 11 : ret = _nns_edge_mqtt_init_client (id, topic, host, port, broker_h);
222 11 : if (NNS_EDGE_ERROR_NONE != ret)
223 1 : nns_edge_loge ("Failed to initialize the MQTT client object.");
224 :
225 11 : return ret;
226 : }
227 :
228 : /**
229 : * @brief Publish callback for clearing retained message.
230 : * @note This callback is called both if the message is sent successfully or if the broker responded with an error.
231 : */
232 : static void
233 7 : _clear_retained_cb (struct mosquitto *mosq, void *obj, int mid)
234 : {
235 7 : nns_edge_broker_s *bh = NULL;
236 :
237 7 : bh = (nns_edge_broker_s *) mosquitto_userdata (mosq);
238 :
239 7 : if (!bh || bh->cleared)
240 0 : return;
241 :
242 7 : nns_edge_lock (bh);
243 7 : bh->cleared = true;
244 7 : nns_edge_cond_signal (bh);
245 7 : nns_edge_unlock (bh);
246 : }
247 :
248 : /**
249 : * @brief Clear retained message.
250 : */
251 : static void
252 10 : _nns_edge_clear_retained (nns_edge_broker_s * bh)
253 : {
254 : struct mosquitto *handle;
255 10 : unsigned int wait = 0U;
256 :
257 10 : if (!bh)
258 0 : return;
259 :
260 10 : handle = bh->mqtt_h;
261 10 : if (handle) {
262 10 : nns_edge_lock (bh);
263 10 : bh->cleared = false;
264 :
265 10 : mosquitto_publish_callback_set (handle, _clear_retained_cb);
266 10 : mosquitto_publish (handle, NULL, bh->topic, 0, NULL, 1, true);
267 :
268 : /* Wait up to 10 seconds. */
269 3016 : while (!bh->cleared && ++wait < 1000U)
270 3006 : nns_edge_cond_wait_until (bh, 10);
271 :
272 10 : mosquitto_publish_callback_set (handle, NULL);
273 10 : bh->cleared = true;
274 10 : nns_edge_unlock (bh);
275 : }
276 : }
277 :
278 : /**
279 : * @brief Close the connection to MQTT.
280 : * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
281 : */
282 : int
283 11 : nns_edge_mqtt_close (nns_edge_broker_h broker_h)
284 : {
285 : nns_edge_broker_s *bh;
286 : struct mosquitto *handle;
287 :
288 11 : if (!broker_h) {
289 1 : nns_edge_loge ("Invalid param, given broker handle is invalid.");
290 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
291 : }
292 :
293 10 : bh = (nns_edge_broker_s *) broker_h;
294 10 : handle = bh->mqtt_h;
295 :
296 10 : if (handle) {
297 10 : nns_edge_logd ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
298 : bh->id, bh->host, bh->port);
299 :
300 10 : _nns_edge_clear_retained (bh);
301 :
302 10 : mosquitto_disconnect (handle);
303 10 : mosquitto_destroy (handle);
304 10 : mosquitto_lib_cleanup ();
305 : }
306 :
307 10 : bh->mqtt_h = NULL;
308 10 : bh->connected = false;
309 :
310 10 : nns_edge_queue_destroy (bh->message_queue);
311 10 : bh->message_queue = NULL;
312 10 : nns_edge_lock_destroy (bh);
313 10 : nns_edge_cond_destroy (bh);
314 10 : SAFE_FREE (bh->id);
315 10 : SAFE_FREE (bh->topic);
316 10 : SAFE_FREE (bh->host);
317 10 : SAFE_FREE (bh);
318 :
319 10 : return NNS_EDGE_ERROR_NONE;
320 : }
321 :
322 : /**
323 : * @brief Internal util function to send edge-data via MQTT connection.
324 : */
325 : int
326 5 : nns_edge_mqtt_publish_data (nns_edge_broker_h broker_h, nns_edge_data_h data_h)
327 : {
328 : int ret;
329 5 : void *data = NULL;
330 : nns_size_t size;
331 :
332 5 : ret = nns_edge_data_serialize (data_h, &data, &size);
333 5 : if (NNS_EDGE_ERROR_NONE != ret) {
334 0 : nns_edge_loge ("Failed to serialize the edge data.");
335 5 : return ret;
336 : }
337 :
338 5 : ret = nns_edge_mqtt_publish (broker_h, data, size);
339 5 : if (NNS_EDGE_ERROR_NONE != ret)
340 0 : nns_edge_loge ("Failed to send data to destination.");
341 :
342 5 : SAFE_FREE (data);
343 5 : return ret;
344 : }
345 :
346 : /**
347 : * @brief Publish raw data.
348 : * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
349 : */
350 : int
351 9 : nns_edge_mqtt_publish (nns_edge_broker_h broker_h, const void *data,
352 : const int length)
353 : {
354 : nns_edge_broker_s *bh;
355 : struct mosquitto *handle;
356 : int ret;
357 :
358 9 : if (!broker_h) {
359 1 : nns_edge_loge ("Invalid param, given broker handle is invalid.");
360 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
361 : }
362 :
363 8 : if (!data || length <= 0) {
364 2 : nns_edge_loge ("Invalid param, given data is invalid.");
365 2 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
366 : }
367 :
368 6 : bh = (nns_edge_broker_s *) broker_h;
369 6 : handle = bh->mqtt_h;
370 :
371 6 : if (!handle) {
372 0 : nns_edge_loge ("Invalid state, MQTT connection was not completed.");
373 0 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
374 : }
375 :
376 6 : if (!bh->connected) {
377 0 : nns_edge_loge ("Failed to publish message, MQTT is not connected.");
378 0 : return NNS_EDGE_ERROR_IO;
379 : }
380 :
381 : /* Publish a message (default QoS 1 - at least once and retained true). */
382 6 : ret = mosquitto_publish (handle, NULL, bh->topic, length, data, 1, true);
383 6 : if (MOSQ_ERR_SUCCESS != ret) {
384 0 : nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).",
385 : bh->id, bh->topic);
386 0 : return NNS_EDGE_ERROR_IO;
387 : }
388 :
389 6 : return NNS_EDGE_ERROR_NONE;
390 : }
391 :
392 : /**
393 : * @brief Subscribe a topic.
394 : * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
395 : */
396 : int
397 4 : nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h)
398 : {
399 : nns_edge_broker_s *bh;
400 : void *handle;
401 : int ret;
402 :
403 4 : if (!broker_h) {
404 1 : nns_edge_loge ("Invalid param, given broker handle is invalid.");
405 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
406 : }
407 :
408 3 : bh = (nns_edge_broker_s *) broker_h;
409 3 : handle = bh->mqtt_h;
410 :
411 3 : if (!handle) {
412 0 : nns_edge_loge ("Invalid state, MQTT connection was not completed.");
413 0 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
414 : }
415 :
416 3 : if (!bh->connected) {
417 0 : nns_edge_loge ("Failed to subscribe, MQTT is not connected.");
418 0 : return NNS_EDGE_ERROR_IO;
419 : }
420 :
421 : /* Subscribe a topic (default QoS 1 - at least once). */
422 3 : ret = mosquitto_subscribe (handle, NULL, bh->topic, 1);
423 3 : if (MOSQ_ERR_SUCCESS != ret) {
424 0 : nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).",
425 : bh->id, bh->topic);
426 0 : return NNS_EDGE_ERROR_IO;
427 : }
428 :
429 3 : return NNS_EDGE_ERROR_NONE;
430 : }
431 :
432 : /**
433 : * @brief Get message from mqtt broker within timeout (0 for infinite timeout).
434 : */
435 : int
436 6 : nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg,
437 : nns_size_t * msg_len, unsigned int timeout)
438 : {
439 : int ret;
440 : nns_edge_broker_s *bh;
441 :
442 6 : if (!broker_h) {
443 1 : nns_edge_loge ("Invalid param, given broker handle is invalid.");
444 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
445 : }
446 :
447 5 : if (!msg) {
448 1 : nns_edge_loge ("Invalid param, given msg param is invalid.");
449 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
450 : }
451 :
452 4 : bh = (nns_edge_broker_s *) broker_h;
453 :
454 : /**
455 : * The time to wait for new data, in milliseconds.
456 : * (Default: 0 for infinite timeout)
457 : */
458 4 : ret = nns_edge_queue_wait_pop (bh->message_queue, timeout, msg, msg_len);
459 4 : if (NNS_EDGE_ERROR_NONE != ret)
460 3 : nns_edge_loge ("Failed to get message from mqtt broker within timeout.");
461 :
462 4 : return ret;
463 : }
464 :
465 : /**
466 : * @brief Check mqtt connection
467 : */
468 : bool
469 11 : nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h)
470 : {
471 : nns_edge_broker_s *bh;
472 :
473 11 : if (!broker_h) {
474 6 : nns_edge_loge ("Invalid param, given broker handle is invalid.");
475 6 : return false;
476 : }
477 :
478 5 : bh = (nns_edge_broker_s *) broker_h;
479 :
480 5 : return bh->connected;
481 : }
482 :
483 : /**
484 : * @brief Set event callback for new message.
485 : */
486 : int
487 4 : nns_edge_mqtt_set_event_callback (nns_edge_broker_h broker_h,
488 : nns_edge_event_cb cb, void *user_data)
489 : {
490 : nns_edge_broker_s *bh;
491 :
492 4 : if (!broker_h) {
493 1 : nns_edge_loge ("Invalid param, given MQTT handle is invalid.");
494 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
495 : }
496 :
497 3 : bh = (nns_edge_broker_s *) broker_h;
498 :
499 3 : bh->event_cb = cb;
500 3 : bh->user_data = user_data;
501 :
502 3 : return NNS_EDGE_ERROR_NONE;
503 : }
|