Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1-only */
2 : /**
3 : * Copyright (C) 2021 Wook Song <wook16.song@samsung.com>
4 : */
5 : /**
6 : * @file mqttsrc.c
7 : * @date 08 Mar 2021
8 : * @brief Subscribe a MQTT topic and push incoming data to the GStreamer pipeline
9 : * @see https://github.com/nnstreamer/nnstreamer
10 : * @author Wook Song <wook16.song@samsung.com>
11 : * @bug No known bugs except for NYI items
12 : */
13 :
14 : #ifdef HAVE_CONFIG_H
15 : #include <config.h>
16 : #endif
17 :
18 : #ifdef G_OS_WIN32
19 : #include <process.h>
20 : #else
21 : #include <sys/types.h>
22 : #include <unistd.h>
23 : #endif
24 :
25 : #include <gst/base/gstbasesrc.h>
26 : #include <MQTTAsync.h>
27 : #include <nnstreamer_util.h>
28 :
29 : #include "mqttsrc.h"
30 :
/** The always-present "src" pad template; it advertises ANY caps */
static GstStaticPadTemplate src_pad_template = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);

/** Lets this file refer to the parent class pointer simply as 'parent_class' */
#define gst_mqtt_src_parent_class parent_class
G_DEFINE_TYPE (GstMqttSrc, gst_mqtt_src, GST_TYPE_BASE_SRC);

/** Debug category of this element; initialized in gst_mqtt_src_class_init () */
GST_DEBUG_CATEGORY_STATIC (gst_mqtt_src_debug);
#define GST_CAT_DEFAULT gst_mqtt_src_debug
39 :
/**
 * @brief Property IDs of mqttsrc. Each ID is bound to its property name in
 *        gst_mqtt_src_class_init ().
 */
enum
{
  PROP_0,

  PROP_DEBUG,                   /* "debug" */
  PROP_IS_LIVE,                 /* "is-live" */
  PROP_MQTT_CLIENT_ID,          /* "client-id" */
  PROP_MQTT_HOST_ADDRESS,       /* "host" */
  PROP_MQTT_HOST_PORT,          /* "port" */
  PROP_MQTT_SUB_TOPIC,          /* "sub-topic" */
  PROP_MQTT_SUB_TIMEOUT,        /* "sub-timeout" */
  PROP_MQTT_OPT_CLEANSESSION,   /* "cleansession" */
  PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL,    /* "keep-alive-interval" */
  PROP_MQTT_QOS,                /* "mqtt-qos" */

  PROP_LAST
};
57 :
/**
 * @brief Default values of the integer and boolean properties.
 *        The timeout values are expressed in microseconds.
 */
enum
{
  DEFAULT_DEBUG = FALSE,
  DEFAULT_IS_LIVE = TRUE,
  DEFAULT_MQTT_OPT_CLEANSESSION = TRUE,
  DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL = 60,    /* 1 minute (in seconds) */
  DEFAULT_MQTT_SUB_TIMEOUT = 10000000,  /* 10 seconds (in microseconds) */
  DEFAULT_MQTT_SUB_TIMEOUT_MIN = 1000000,       /* 1 second (in microseconds); also the polling slice used in gst_mqtt_src_create () */
  DEFAULT_MQTT_QOS = 2,         /* QoS level 2: exactly once */
};
68 :
/** Counter appended to generated client IDs; post-incremented in gst_mqtt_src_start () */
static guint8 src_client_id = 0;
/** Default value of the 'host' property */
static const gchar DEFAULT_MQTT_HOST_ADDRESS[] = "127.0.0.1";
/** Default value of the 'port' property */
static const gchar DEFAULT_MQTT_HOST_PORT[] = "1883";
/** String from which the GError domain quark is created in gst_mqtt_src_init () */
static const gchar TAG_ERR_MQTTSRC[] = "ERROR: MQTTSrc";
/**
 * Placeholder default of the 'client-id' property. When the property still
 * equals this string, gst_mqtt_src_start () replaces it with an ID built
 * from DEFAULT_MQTT_CLIENT_ID_FORMAT.
 */
static const gchar DEFAULT_MQTT_CLIENT_ID[] =
    "$HOSTNAME_$PID_^[0-9][0-9]?$|^255$";
/** Format of the generated client ID: host name, process ID, and src_client_id */
static const gchar DEFAULT_MQTT_CLIENT_ID_FORMAT[] = "%s_%u_src%u";
76 :
/** Function prototype declarations */

/* GObject virtual methods (registered in gst_mqtt_src_class_init ()) */
static void
gst_mqtt_src_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void
gst_mqtt_src_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
static void gst_mqtt_src_class_finalize (GObject * object);

/* GstElement virtual method */
static GstStateChangeReturn
gst_mqtt_src_change_state (GstElement * element, GstStateChange transition);

/*
 * GstBaseSrc virtual methods. Note that gst_mqtt_src_renegotiate () is not
 * registered as a vmethod in gst_mqtt_src_class_init ().
 */
static gboolean gst_mqtt_src_start (GstBaseSrc * basesrc);
static gboolean gst_mqtt_src_stop (GstBaseSrc * basesrc);
static GstCaps *gst_mqtt_src_get_caps (GstBaseSrc * basesrc, GstCaps * filter);
static gboolean gst_mqtt_src_renegotiate (GstBaseSrc * basesrc);

static void
gst_mqtt_src_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
    GstClockTime * start, GstClockTime * end);
static gboolean gst_mqtt_src_is_seekable (GstBaseSrc * basesrc);
static GstFlowReturn
gst_mqtt_src_create (GstBaseSrc * basesrc, guint64 offset, guint size,
    GstBuffer ** buf);
static gboolean gst_mqtt_src_query (GstBaseSrc * basesrc, GstQuery * query);

/* Per-property accessors used by set_property () and get_property () */
static gboolean gst_mqtt_src_get_debug (GstMqttSrc * self);
static void gst_mqtt_src_set_debug (GstMqttSrc * self, const gboolean flag);
static gboolean gst_mqtt_src_get_is_live (GstMqttSrc * self);
static void gst_mqtt_src_set_is_live (GstMqttSrc * self, const gboolean flag);
static gchar *gst_mqtt_src_get_client_id (GstMqttSrc * self);
static void gst_mqtt_src_set_client_id (GstMqttSrc * self, const gchar * id);
static gchar *gst_mqtt_src_get_host_address (GstMqttSrc * self);
static void gst_mqtt_src_set_host_address (GstMqttSrc * self,
    const gchar * addr);
static gchar *gst_mqtt_src_get_host_port (GstMqttSrc * self);
static void gst_mqtt_src_set_host_port (GstMqttSrc * self, const gchar * port);
static gint64 gst_mqtt_src_get_sub_timeout (GstMqttSrc * self);
static void gst_mqtt_src_set_sub_timeout (GstMqttSrc * self, const gint64 t);
static gchar *gst_mqtt_src_get_sub_topic (GstMqttSrc * self);
static void gst_mqtt_src_set_sub_topic (GstMqttSrc * self, const gchar * topic);
static gboolean gst_mqtt_src_get_opt_cleansession (GstMqttSrc * self);
static void gst_mqtt_src_set_opt_cleansession (GstMqttSrc * self,
    const gboolean val);
static gint gst_mqtt_src_get_opt_keep_alive_interval (GstMqttSrc * self);
static void gst_mqtt_src_set_opt_keep_alive_interval (GstMqttSrc * self,
    const gint num);
static gint gst_mqtt_src_get_mqtt_qos (GstMqttSrc * self);
static void gst_mqtt_src_set_mqtt_qos (GstMqttSrc * self, const gint qos);

/* Callbacks handed to the MQTTAsync client library */
static void cb_mqtt_on_connection_lost (void *context, char *cause);
static int cb_mqtt_on_message_arrived (void *context, char *topic_name,
    int topic_len, MQTTAsync_message * message);
static void cb_mqtt_on_connect (void *context,
    MQTTAsync_successData * response);
static void cb_mqtt_on_connect_failure (void *context,
    MQTTAsync_failureData * response);
static void cb_mqtt_on_subscribe (void *context,
    MQTTAsync_successData * response);
static void cb_mqtt_on_subscribe_failure (void *context,
    MQTTAsync_failureData * response);
static void cb_mqtt_on_unsubscribe (void *context,
    MQTTAsync_successData * response);
static void cb_mqtt_on_unsubscribe_failure (void *context,
    MQTTAsync_failureData * response);

static void cb_memory_wrapped_destroy (void *p);

/* Internal helpers */
static GstMQTTMessageHdr *_extract_mqtt_msg_hdr_from (GstMemory * mem,
    GstMemory ** hdr_mem, GstMapInfo * hdr_map_info);
static void _put_timestamp_on_gst_buf (GstMqttSrc * self,
    GstMQTTMessageHdr * hdr, GstBuffer * buf);
static gboolean _subscribe (GstMqttSrc * self);
static gboolean _unsubscribe (GstMqttSrc * self);
151 :
152 : /**
153 : * @brief A utility function to check whether the timestamp marked by _put_timestamp_on_gst_buf () is valid or not
154 : */
155 : static inline gboolean
156 4 : _is_gst_buffer_timestamp_valid (GstBuffer * buf)
157 : {
158 4 : if (!GST_BUFFER_PTS_IS_VALID (buf) && !GST_BUFFER_DTS_IS_VALID (buf) &&
159 0 : !GST_BUFFER_DURATION_IS_VALID (buf))
160 0 : return FALSE;
161 4 : return TRUE;
162 : }
163 :
164 : /** Function definitions */
165 : /**
166 : * @brief Initialize GstMqttSrc object
167 : */
168 : static void
169 7 : gst_mqtt_src_init (GstMqttSrc * self)
170 : {
171 7 : MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
172 7 : MQTTAsync_responseOptions respn_opts = MQTTAsync_responseOptions_initializer;
173 7 : GstBaseSrc *basesrc = GST_BASE_SRC (self);
174 :
175 7 : self->gquark_err_tag = g_quark_from_string (TAG_ERR_MQTTSRC);
176 :
177 7 : gst_base_src_set_format (basesrc, GST_FORMAT_TIME);
178 7 : gst_base_src_set_async (basesrc, FALSE);
179 :
180 : /** init mqttsrc properties */
181 7 : self->mqtt_client_handle = NULL;
182 7 : self->debug = DEFAULT_DEBUG;
183 7 : self->is_live = DEFAULT_IS_LIVE;
184 7 : self->mqtt_client_id = g_strdup (DEFAULT_MQTT_CLIENT_ID);
185 7 : self->mqtt_host_address = g_strdup (DEFAULT_MQTT_HOST_ADDRESS);
186 7 : self->mqtt_host_port = g_strdup (DEFAULT_MQTT_HOST_PORT);
187 7 : self->mqtt_topic = NULL;
188 7 : self->mqtt_sub_timeout = (gint64) DEFAULT_MQTT_SUB_TIMEOUT;
189 7 : self->mqtt_conn_opts = conn_opts;
190 7 : self->mqtt_conn_opts.cleansession = DEFAULT_MQTT_OPT_CLEANSESSION;
191 7 : self->mqtt_conn_opts.keepAliveInterval = DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL;
192 7 : self->mqtt_conn_opts.onSuccess = cb_mqtt_on_connect;
193 7 : self->mqtt_conn_opts.onFailure = cb_mqtt_on_connect_failure;
194 7 : self->mqtt_conn_opts.context = self;
195 7 : self->mqtt_respn_opts = respn_opts;
196 7 : self->mqtt_respn_opts.onSuccess = NULL;
197 7 : self->mqtt_respn_opts.onFailure = NULL;
198 7 : self->mqtt_respn_opts.context = self;
199 7 : self->mqtt_qos = DEFAULT_MQTT_QOS;
200 :
201 : /** init private member variables */
202 7 : self->err = NULL;
203 7 : self->aqueue = g_async_queue_new ();
204 7 : g_cond_init (&self->mqtt_src_gcond);
205 7 : g_mutex_init (&self->mqtt_src_mutex);
206 7 : g_mutex_lock (&self->mqtt_src_mutex);
207 7 : self->is_connected = FALSE;
208 7 : self->is_subscribed = FALSE;
209 7 : self->latency = GST_CLOCK_TIME_NONE;
210 7 : g_mutex_unlock (&self->mqtt_src_mutex);
211 7 : self->base_time_epoch = GST_CLOCK_TIME_NONE;
212 7 : self->caps = NULL;
213 7 : self->num_dumped = 0;
214 :
215 7 : gst_base_src_set_live (basesrc, self->is_live);
216 7 : }
217 :
218 : /**
219 : * @brief Initialize GstMqttSrcClass object
220 : */
221 : static void
222 1 : gst_mqtt_src_class_init (GstMqttSrcClass * klass)
223 : {
224 1 : GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
225 1 : GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
226 1 : GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
227 :
228 1 : GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_MQTT_ELEM_NAME_SRC, 0,
229 : "MQTT src");
230 :
231 1 : gobject_class->set_property = gst_mqtt_src_set_property;
232 1 : gobject_class->get_property = gst_mqtt_src_get_property;
233 1 : gobject_class->finalize = gst_mqtt_src_class_finalize;
234 :
235 1 : g_object_class_install_property (gobject_class, PROP_DEBUG,
236 : g_param_spec_boolean ("debug", "Debug",
237 : "Produce extra verbose output for debug purpose", DEFAULT_DEBUG,
238 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
239 :
240 1 : g_object_class_install_property (gobject_class, PROP_IS_LIVE,
241 : g_param_spec_boolean ("is-live", "Is Live",
242 : "Synchronize the incoming buffers' timestamp with the current running time",
243 : DEFAULT_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
244 :
245 1 : g_object_class_install_property (gobject_class, PROP_MQTT_CLIENT_ID,
246 : g_param_spec_string ("client-id", "Client ID",
247 : "The client identifier passed to the server (broker)",
248 : DEFAULT_MQTT_CLIENT_ID, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
249 :
250 1 : g_object_class_install_property (gobject_class, PROP_MQTT_HOST_ADDRESS,
251 : g_param_spec_string ("host", "Host", "Host (broker) to connect to",
252 : DEFAULT_MQTT_HOST_ADDRESS,
253 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
254 :
255 1 : g_object_class_install_property (gobject_class, PROP_MQTT_HOST_PORT,
256 : g_param_spec_string ("port", "Port",
257 : "Network port of host (broker) to connect to", DEFAULT_MQTT_HOST_PORT,
258 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
259 :
260 1 : g_object_class_install_property (gobject_class, PROP_MQTT_SUB_TIMEOUT,
261 : g_param_spec_int64 ("sub-timeout", "Timeout for receiving a message",
262 : "The timeout (in microseconds) for receiving a message from subscribed topic",
263 : 1000000, G_MAXINT64, DEFAULT_MQTT_SUB_TIMEOUT,
264 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
265 :
266 1 : g_object_class_install_property (gobject_class, PROP_MQTT_SUB_TOPIC,
267 : g_param_spec_string ("sub-topic", "Topic to Subscribe (mandatory)",
268 : "The topic's name to subscribe", NULL,
269 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
270 :
271 1 : g_object_class_install_property (gobject_class, PROP_MQTT_OPT_CLEANSESSION,
272 : g_param_spec_boolean ("cleansession", "Cleansession",
273 : "When it is TRUE, the state information is discarded at connect and disconnect.",
274 : DEFAULT_MQTT_OPT_CLEANSESSION,
275 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
276 :
277 1 : g_object_class_install_property (gobject_class,
278 : PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL,
279 : g_param_spec_int ("keep-alive-interval", "Keep Alive Interval",
280 : "The maximum time (in seconds) that should pass without communication between the client and the server (broker)",
281 : 1, G_MAXINT32, DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL,
282 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
283 :
284 1 : g_object_class_install_property (gobject_class, PROP_MQTT_QOS,
285 : g_param_spec_int ("mqtt-qos", "mqtt QoS level",
286 : "The QoS level of MQTT.\n"
287 : "\t\t\t 0: At most once\n"
288 : "\t\t\t 1: At least once\n"
289 : "\t\t\t 2: Exactly once\n"
290 : "\t\t\tsee also: https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/qos.html",
291 : 0, 2, DEFAULT_MQTT_QOS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
292 :
293 1 : gstelement_class->change_state =
294 1 : GST_DEBUG_FUNCPTR (gst_mqtt_src_change_state);
295 :
296 1 : gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_mqtt_src_start);
297 1 : gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_mqtt_src_stop);
298 1 : gstbasesrc_class->get_caps = GST_DEBUG_FUNCPTR (gst_mqtt_src_get_caps);
299 1 : gstbasesrc_class->get_times = GST_DEBUG_FUNCPTR (gst_mqtt_src_get_times);
300 1 : gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR (gst_mqtt_src_is_seekable);
301 1 : gstbasesrc_class->create = GST_DEBUG_FUNCPTR (gst_mqtt_src_create);
302 1 : gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_mqtt_src_query);
303 :
304 1 : gst_element_class_set_static_metadata (gstelement_class,
305 : "MQTT source", "Source/MQTT",
306 : "Subscribe a MQTT topic and push incoming data to the GStreamer pipeline",
307 : "Wook Song <wook16.song@samsung.com>");
308 1 : gst_element_class_add_static_pad_template (gstelement_class,
309 : &src_pad_template);
310 1 : }
311 :
312 : /**
313 : * @brief The setter for the mqttsrc's properties
314 : */
315 : static void
316 30 : gst_mqtt_src_set_property (GObject * object, guint prop_id,
317 : const GValue * value, GParamSpec * pspec)
318 : {
319 30 : GstMqttSrc *self = GST_MQTT_SRC (object);
320 :
321 30 : switch (prop_id) {
322 6 : case PROP_DEBUG:
323 6 : gst_mqtt_src_set_debug (self, g_value_get_boolean (value));
324 6 : break;
325 6 : case PROP_IS_LIVE:
326 6 : gst_mqtt_src_set_is_live (self, g_value_get_boolean (value));
327 6 : break;
328 1 : case PROP_MQTT_CLIENT_ID:
329 1 : gst_mqtt_src_set_client_id (self, g_value_get_string (value));
330 1 : break;
331 1 : case PROP_MQTT_HOST_ADDRESS:
332 1 : gst_mqtt_src_set_host_address (self, g_value_get_string (value));
333 1 : break;
334 1 : case PROP_MQTT_HOST_PORT:
335 1 : gst_mqtt_src_set_host_port (self, g_value_get_string (value));
336 1 : break;
337 6 : case PROP_MQTT_SUB_TIMEOUT:
338 6 : gst_mqtt_src_set_sub_timeout (self, g_value_get_int64 (value));
339 6 : break;
340 6 : case PROP_MQTT_SUB_TOPIC:
341 6 : gst_mqtt_src_set_sub_topic (self, g_value_get_string (value));
342 6 : break;
343 1 : case PROP_MQTT_OPT_CLEANSESSION:
344 1 : gst_mqtt_src_set_opt_cleansession (self, g_value_get_boolean (value));
345 1 : break;
346 1 : case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
347 1 : gst_mqtt_src_set_opt_keep_alive_interval (self, g_value_get_int (value));
348 1 : break;
349 1 : case PROP_MQTT_QOS:
350 1 : gst_mqtt_src_set_mqtt_qos (self, g_value_get_int (value));
351 1 : break;
352 0 : default:
353 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
354 0 : break;
355 : }
356 30 : }
357 :
358 : /**
359 : * @brief The getter for the mqttsrc's properties
360 : */
361 : static void
362 14 : gst_mqtt_src_get_property (GObject * object, guint prop_id,
363 : GValue * value, GParamSpec * pspec)
364 : {
365 14 : GstMqttSrc *self = GST_MQTT_SRC (object);
366 :
367 14 : switch (prop_id) {
368 2 : case PROP_DEBUG:
369 2 : g_value_set_boolean (value, gst_mqtt_src_get_debug (self));
370 2 : break;
371 1 : case PROP_IS_LIVE:
372 1 : g_value_set_boolean (value, gst_mqtt_src_get_is_live (self));
373 1 : break;
374 1 : case PROP_MQTT_CLIENT_ID:
375 1 : g_value_set_string (value, gst_mqtt_src_get_client_id (self));
376 1 : break;
377 1 : case PROP_MQTT_HOST_ADDRESS:
378 1 : g_value_set_string (value, gst_mqtt_src_get_host_address (self));
379 1 : break;
380 1 : case PROP_MQTT_HOST_PORT:
381 1 : g_value_set_string (value, gst_mqtt_src_get_host_port (self));
382 1 : break;
383 2 : case PROP_MQTT_SUB_TIMEOUT:
384 2 : g_value_set_int64 (value, gst_mqtt_src_get_sub_timeout (self));
385 2 : break;
386 1 : case PROP_MQTT_SUB_TOPIC:
387 1 : g_value_set_string (value, gst_mqtt_src_get_sub_topic (self));
388 1 : break;
389 1 : case PROP_MQTT_OPT_CLEANSESSION:
390 1 : g_value_set_boolean (value, gst_mqtt_src_get_opt_cleansession (self));
391 1 : break;
392 2 : case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
393 2 : g_value_set_int (value, gst_mqtt_src_get_opt_keep_alive_interval (self));
394 2 : break;
395 2 : case PROP_MQTT_QOS:
396 2 : g_value_set_int (value, gst_mqtt_src_get_mqtt_qos (self));
397 2 : break;
398 0 : default:
399 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
400 0 : break;
401 : }
402 14 : }
403 :
404 : /**
405 : * @brief Finalize GstMqttSrcClass object
406 : */
407 : static void
408 7 : gst_mqtt_src_class_finalize (GObject * object)
409 : {
410 7 : GstMqttSrc *self = GST_MQTT_SRC (object);
411 : GstBuffer *remained;
412 :
413 7 : if (self->mqtt_client_handle) {
414 0 : MQTTAsync_destroy (&self->mqtt_client_handle);
415 0 : self->mqtt_client_handle = NULL;
416 : }
417 :
418 7 : g_free (self->mqtt_client_id);
419 7 : g_free (self->mqtt_host_address);
420 7 : g_free (self->mqtt_host_port);
421 7 : g_free (self->mqtt_topic);
422 7 : gst_caps_replace (&self->caps, NULL);
423 :
424 7 : if (self->err)
425 2 : g_error_free (self->err);
426 :
427 7 : while ((remained = g_async_queue_try_pop (self->aqueue))) {
428 0 : gst_buffer_unref (remained);
429 : }
430 7 : g_clear_pointer (&self->aqueue, g_async_queue_unref);
431 :
432 7 : g_mutex_clear (&self->mqtt_src_mutex);
433 7 : G_OBJECT_CLASS (parent_class)->finalize (object);
434 7 : }
435 :
436 : /**
437 : * @brief Handle mqttsrc's state change
438 : */
439 : static GstStateChangeReturn
440 32 : gst_mqtt_src_change_state (GstElement * element, GstStateChange transition)
441 : {
442 32 : GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
443 32 : GstMqttSrc *self = GST_MQTT_SRC (element);
444 32 : gboolean no_preroll = FALSE;
445 : GstClock *elem_clock;
446 : GstClockTime base_time;
447 : GstClockTime cur_time;
448 : GstClockTimeDiff diff;
449 :
450 32 : switch (transition) {
451 5 : case GST_STATE_CHANGE_NULL_TO_READY:
452 5 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_NULL_TO_READY");
453 5 : if (self->err) {
454 0 : g_printerr ("%s: %s\n", g_quark_to_string (self->err->domain),
455 0 : self->err->message);
456 0 : return GST_STATE_CHANGE_FAILURE;
457 : }
458 5 : break;
459 5 : case GST_STATE_CHANGE_READY_TO_PAUSED:
460 5 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_PAUSED");
461 : /* Regardless of the 'is-live''s value, prerolling is not supported */
462 5 : no_preroll = TRUE;
463 5 : break;
464 5 : case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
465 5 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_PLAYING");
466 5 : self->base_time_epoch = GST_CLOCK_TIME_NONE;
467 5 : elem_clock = gst_element_get_clock (element);
468 5 : if (!elem_clock)
469 0 : break;
470 5 : base_time = gst_element_get_base_time (element);
471 5 : cur_time = gst_clock_get_time (elem_clock);
472 5 : gst_object_unref (elem_clock);
473 5 : diff = GST_CLOCK_DIFF (base_time, cur_time);
474 5 : self->base_time_epoch =
475 5 : g_get_real_time () * GST_US_TO_NS_MULTIPLIER - diff;
476 :
477 : /** This handles the case when the state is changed to PLAYING again */
478 5 : if (GST_BASE_SRC_IS_STARTED (GST_BASE_SRC (self)) &&
479 5 : (self->is_connected == FALSE)) {
480 0 : int conn = MQTTAsync_reconnect (self->mqtt_client_handle);
481 :
482 0 : if (conn != MQTTASYNC_SUCCESS) {
483 0 : GST_ERROR_OBJECT (self, "Failed to re-subscribe to %s",
484 : self->mqtt_topic);
485 :
486 0 : return GST_STATE_CHANGE_FAILURE;
487 : }
488 : }
489 5 : break;
490 17 : default:
491 17 : break;
492 : }
493 :
494 32 : ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
495 :
496 32 : switch (transition) {
497 5 : case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
498 5 : if (self->is_subscribed && !_unsubscribe (self)) {
499 0 : GST_ERROR_OBJECT (self, "Cannot unsubscribe to %s", self->mqtt_topic);
500 : }
501 5 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PLAYING_TO_PAUSED");
502 5 : break;
503 5 : case GST_STATE_CHANGE_PAUSED_TO_READY:
504 5 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_READY");
505 5 : break;
506 5 : case GST_STATE_CHANGE_READY_TO_NULL:
507 5 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_NULL");
508 : default:
509 22 : break;
510 : }
511 :
512 32 : if (no_preroll && ret == GST_STATE_CHANGE_SUCCESS)
513 0 : ret = GST_STATE_CHANGE_NO_PREROLL;
514 :
515 32 : return ret;
516 : }
517 :
518 : /**
519 : * @brief Start mqttsrc, called when state changed null to ready
520 : */
521 : static gboolean
522 5 : gst_mqtt_src_start (GstBaseSrc * basesrc)
523 : {
524 5 : GstMqttSrc *self = GST_MQTT_SRC (basesrc);
525 5 : gchar *haddr = g_strdup_printf ("%s:%s", self->mqtt_host_address,
526 : self->mqtt_host_port);
527 : int ret;
528 : gint64 end_time;
529 :
530 5 : if (!g_strcmp0 (DEFAULT_MQTT_CLIENT_ID, self->mqtt_client_id)) {
531 5 : g_free (self->mqtt_client_id);
532 5 : self->mqtt_client_id = g_strdup_printf (DEFAULT_MQTT_CLIENT_ID_FORMAT,
533 5 : g_get_host_name (), getpid (), src_client_id++);
534 : }
535 :
536 : /**
537 : * @todo Support other persistence mechanisms
538 : * MQTTCLIENT_PERSISTENCE_NONE: A memory-based persistence mechanism
539 : * MQTTCLIENT_PERSISTENCE_DEFAULT: The default file system-based
540 : * persistence mechanism
541 : * MQTTCLIENT_PERSISTENCE_USER: An application-specific persistence
542 : * mechanism
543 : */
544 10 : ret = MQTTAsync_create (&self->mqtt_client_handle, haddr,
545 5 : self->mqtt_client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL);
546 5 : g_free (haddr);
547 5 : if (ret != MQTTASYNC_SUCCESS)
548 0 : return FALSE;
549 :
550 5 : MQTTAsync_setCallbacks (self->mqtt_client_handle, self,
551 : cb_mqtt_on_connection_lost, cb_mqtt_on_message_arrived, NULL);
552 :
553 5 : ret = MQTTAsync_connect (self->mqtt_client_handle, &self->mqtt_conn_opts);
554 5 : if (ret != MQTTASYNC_SUCCESS)
555 0 : goto error;
556 :
557 : /* Waiting for the connection */
558 5 : end_time = g_get_monotonic_time () +
559 : DEFAULT_MQTT_CONN_TIMEOUT_SEC * G_TIME_SPAN_SECOND;
560 5 : g_mutex_lock (&self->mqtt_src_mutex);
561 5 : while (!self->is_connected) {
562 0 : if (!g_cond_wait_until (&self->mqtt_src_gcond, &self->mqtt_src_mutex,
563 : end_time)) {
564 0 : g_mutex_unlock (&self->mqtt_src_mutex);
565 0 : g_critical ("Failed to connect to MQTT broker from mqttsrc."
566 : "Please check broker is running status or broker host address.");
567 0 : goto error;
568 : }
569 : }
570 5 : g_mutex_unlock (&self->mqtt_src_mutex);
571 5 : return TRUE;
572 :
573 0 : error:
574 0 : MQTTAsync_destroy (&self->mqtt_client_handle);
575 0 : self->mqtt_client_handle = NULL;
576 0 : return FALSE;
577 : }
578 :
579 : /**
580 : * @brief Stop mqttsrc, called when state changed ready to null
581 : */
582 : static gboolean
583 5 : gst_mqtt_src_stop (GstBaseSrc * basesrc)
584 : {
585 5 : GstMqttSrc *self = GST_MQTT_SRC (basesrc);
586 :
587 : /* todo */
588 5 : MQTTAsync_disconnect (self->mqtt_client_handle, NULL);
589 5 : g_mutex_lock (&self->mqtt_src_mutex);
590 5 : self->is_connected = FALSE;
591 5 : g_mutex_unlock (&self->mqtt_src_mutex);
592 5 : MQTTAsync_destroy (&self->mqtt_client_handle);
593 5 : self->mqtt_client_handle = NULL;
594 5 : return TRUE;
595 : }
596 :
597 : /**
598 : * @brief Get caps of subclass
599 : */
600 : static GstCaps *
601 47 : gst_mqtt_src_get_caps (GstBaseSrc * basesrc, GstCaps * filter)
602 : {
603 47 : GstPad *pad = GST_BASE_SRC_PAD (basesrc);
604 47 : GstCaps *cur_caps = gst_pad_get_current_caps (pad);
605 47 : GstCaps *caps = gst_caps_new_any ();
606 : UNUSED (filter);
607 :
608 47 : if (cur_caps) {
609 : GstCaps *intersection =
610 0 : gst_caps_intersect_full (cur_caps, caps, GST_CAPS_INTERSECT_FIRST);
611 :
612 0 : gst_caps_unref (cur_caps);
613 0 : gst_caps_unref (caps);
614 0 : caps = intersection;
615 : }
616 :
617 47 : return caps;
618 : }
619 :
620 : /**
621 : * @brief Do negotiation procedure again if it needed
622 : */
623 : static gboolean
624 4 : gst_mqtt_src_renegotiate (GstBaseSrc * basesrc)
625 : {
626 4 : GstMqttSrc *self = GST_MQTT_SRC (basesrc);
627 4 : GstCaps *peercaps = NULL;
628 : GstCaps *thiscaps;
629 4 : gboolean result = FALSE;
630 :
631 4 : if (self->caps == NULL || gst_caps_is_any (self->caps))
632 0 : goto no_nego_needed;
633 :
634 4 : thiscaps = gst_pad_get_current_caps (GST_BASE_SRC_PAD (basesrc));
635 4 : if (thiscaps && gst_caps_is_equal (self->caps, thiscaps)) {
636 0 : gst_caps_unref (thiscaps);
637 0 : goto no_nego_needed;
638 : }
639 :
640 4 : if (thiscaps)
641 1 : gst_caps_unref (thiscaps);
642 :
643 4 : peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), self->caps);
644 4 : if (gst_caps_is_empty (peercaps) || peercaps == self->caps) {
645 1 : gst_caps_unref (peercaps);
646 1 : goto no_nego_needed;
647 : }
648 :
649 3 : if (gst_caps_is_any (peercaps)) {
650 0 : result = TRUE;
651 : } else {
652 3 : peercaps = gst_caps_fixate (peercaps);
653 3 : if (gst_caps_is_fixed (peercaps)) {
654 3 : result = gst_base_src_set_caps (basesrc, peercaps);
655 : }
656 : }
657 :
658 3 : gst_caps_unref (peercaps);
659 :
660 3 : return result;
661 :
662 1 : no_nego_needed:
663 : {
664 1 : GST_DEBUG_OBJECT (self, "no negotiation needed");
665 :
666 1 : return TRUE;
667 : }
668 : }
669 :
670 : /**
671 : * @brief Return the time information of the given buffer
672 : */
673 : static void
674 4 : gst_mqtt_src_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
675 : GstClockTime * start, GstClockTime * end)
676 : {
677 : GstClockTime sync_ts;
678 : GstClockTime duration;
679 : UNUSED (basesrc);
680 :
681 4 : sync_ts = GST_BUFFER_DTS (buffer);
682 4 : duration = GST_BUFFER_DURATION (buffer);
683 :
684 4 : if (!GST_CLOCK_TIME_IS_VALID (sync_ts))
685 0 : sync_ts = GST_BUFFER_PTS (buffer);
686 :
687 4 : if (GST_CLOCK_TIME_IS_VALID (sync_ts)) {
688 4 : *start = sync_ts;
689 4 : if (GST_CLOCK_TIME_IS_VALID (duration)) {
690 4 : *end = sync_ts + duration;
691 : }
692 : }
693 4 : }
694 :
695 : /**
696 : * @brief Check if source supports seeking
697 : * @note Seeking is not supported since this element handles live subscription data.
698 : */
699 : static gboolean
700 5 : gst_mqtt_src_is_seekable (GstBaseSrc * basesrc)
701 : {
702 : UNUSED (basesrc);
703 5 : return FALSE;
704 : }
705 :
706 : /**
707 : * @brief Create a buffer containing the subscribed data
708 : */
709 : static GstFlowReturn
710 6 : gst_mqtt_src_create (GstBaseSrc * basesrc, guint64 offset, guint size,
711 : GstBuffer ** buf)
712 : {
713 6 : GstMqttSrc *self = GST_MQTT_SRC (basesrc);
714 6 : gint64 elapsed = self->mqtt_sub_timeout;
715 : UNUSED (offset);
716 : UNUSED (size);
717 :
718 6 : g_mutex_lock (&self->mqtt_src_mutex);
719 6 : while ((!self->is_connected) || (!self->is_subscribed)) {
720 2 : gint64 end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND;
721 :
722 2 : g_cond_wait_until (&self->mqtt_src_gcond, &self->mqtt_src_mutex, end_time);
723 2 : if (self->err) {
724 2 : g_mutex_unlock (&self->mqtt_src_mutex);
725 2 : goto ret_flow_err;
726 : }
727 : }
728 4 : g_mutex_unlock (&self->mqtt_src_mutex);
729 :
730 4 : while (elapsed > 0) {
731 : /** @todo DEFAULT_MQTT_SUB_TIMEOUT_MIN is too long */
732 4 : *buf = g_async_queue_timeout_pop (self->aqueue,
733 : DEFAULT_MQTT_SUB_TIMEOUT_MIN);
734 4 : if (*buf) {
735 4 : GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
736 4 : GstClockTime ulatency = GST_CLOCK_TIME_NONE;
737 : GstClock *clock;
738 :
739 : /** This buffer is coming from the past. Drop it. */
740 4 : if (!_is_gst_buffer_timestamp_valid (*buf)) {
741 0 : if (self->debug) {
742 0 : GST_DEBUG_OBJECT (self,
743 : "%s: Dumped the received buffer! (total: %" G_GUINT64_FORMAT ")",
744 : self->mqtt_topic, ++self->num_dumped);
745 : }
746 0 : elapsed = self->mqtt_sub_timeout;
747 0 : gst_buffer_unref (*buf);
748 0 : continue;
749 : }
750 :
751 : /** Update latency */
752 4 : clock = gst_element_get_clock (GST_ELEMENT (self));
753 4 : if (clock) {
754 4 : GstClockTime cur_time = gst_clock_get_time (clock);
755 4 : GstClockTime buf_ts = GST_BUFFER_TIMESTAMP (*buf);
756 4 : GstClockTimeDiff latency = 0;
757 :
758 4 : if ((base_time != GST_CLOCK_TIME_NONE) &&
759 4 : (cur_time != GST_CLOCK_TIME_NONE) &&
760 : (buf_ts != GST_CLOCK_TIME_NONE)) {
761 4 : GstClockTimeDiff now = GST_CLOCK_DIFF (base_time, cur_time);
762 :
763 4 : latency = GST_CLOCK_DIFF (buf_ts, (GstClockTime) now);
764 : }
765 :
766 4 : if (latency > 0) {
767 0 : ulatency = (GstClockTime) latency;
768 :
769 0 : if (GST_BUFFER_DURATION_IS_VALID (*buf)) {
770 0 : GstClockTime duration = GST_BUFFER_DURATION (*buf);
771 :
772 0 : if (duration >= ulatency) {
773 0 : ulatency = GST_CLOCK_TIME_NONE;
774 : }
775 : }
776 : }
777 4 : gst_object_unref (clock);
778 : }
779 :
780 4 : g_mutex_lock (&self->mqtt_src_mutex);
781 4 : self->latency = ulatency;
782 4 : g_mutex_unlock (&self->mqtt_src_mutex);
783 : /**
784 : * @todo If the difference between new latency and old latency,
785 : * gst_element_post_message (GST_ELEMENT_CAST (self),
786 : * gst_message_new_latency (GST_OBJECT_CAST (self)));
787 : * is needed.
788 : */
789 4 : break;
790 0 : } else if (self->err) {
791 0 : break;
792 : }
793 0 : elapsed = elapsed - DEFAULT_MQTT_SUB_TIMEOUT_MIN;
794 : }
795 :
796 4 : if (*buf == NULL) {
797 : /** @todo: Send EoS here */
798 0 : if (!self->err)
799 0 : self->err = g_error_new (self->gquark_err_tag, GST_FLOW_EOS,
800 : "%s: Timeout for receiving a message has been expired. Regarding as an error",
801 : __func__);
802 0 : goto ret_flow_err;
803 : }
804 :
805 4 : return GST_FLOW_OK;
806 :
807 2 : ret_flow_err:
808 2 : if (self->err) {
809 2 : g_printerr ("%s: %s\n", g_quark_to_string (self->err->domain),
810 2 : self->err->message);
811 : }
812 2 : return GST_FLOW_ERROR;
813 : }
814 :
815 : /**
816 : * @brief An implementation of the GstBaseSrc vmethod that handles queries
817 : */
818 : static gboolean
819 55 : gst_mqtt_src_query (GstBaseSrc * basesrc, GstQuery * query)
820 : {
821 55 : GstQueryType type = GST_QUERY_TYPE (query);
822 55 : GstMqttSrc *self = GST_MQTT_SRC (basesrc);
823 55 : gboolean res = FALSE;
824 :
825 55 : if (self->debug)
826 53 : GST_DEBUG_OBJECT (self, "Got %s event", gst_query_type_get_name (type));
827 :
828 55 : switch (type) {
829 3 : case GST_QUERY_LATENCY:{
830 3 : GstClockTime min_latency = 0;
831 3 : GstClockTime max_latency = GST_CLOCK_TIME_NONE;
832 :
833 3 : g_mutex_lock (&self->mqtt_src_mutex);
834 3 : if (self->latency != GST_CLOCK_TIME_NONE) {
835 0 : min_latency = self->latency;
836 : }
837 3 : g_mutex_unlock (&self->mqtt_src_mutex);
838 :
839 3 : if (self->debug) {
840 3 : GST_DEBUG_OBJECT (self,
841 : "Reporting latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT,
842 : GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
843 : }
844 : /**
845 : * @brief The second argument of gst_query_set_latency should be always
846 : * TRUE.
847 : */
848 3 : gst_query_set_latency (query, TRUE, min_latency, max_latency);
849 :
850 3 : res = TRUE;
851 3 : break;
852 : }
853 52 : default:{
854 52 : res = GST_BASE_SRC_CLASS (parent_class)->query (basesrc, query);
855 : }
856 : }
857 :
858 55 : return res;
859 : }
860 :
861 : /**
862 : * @brief Getter for the 'debug' property.
863 : */
864 : static gboolean
865 2 : gst_mqtt_src_get_debug (GstMqttSrc * self)
866 : {
867 2 : return self->debug;
868 : }
869 :
870 : /**
871 : * @brief Setter for the 'debug' property.
872 : */
873 : static void
874 6 : gst_mqtt_src_set_debug (GstMqttSrc * self, const gboolean flag)
875 : {
876 6 : self->debug = flag;
877 6 : }
878 :
879 : /**
880 : * @brief Getter for the 'is-live' property.
881 : */
882 : static gboolean
883 1 : gst_mqtt_src_get_is_live (GstMqttSrc * self)
884 : {
885 1 : return self->is_live;
886 : }
887 :
888 : /**
889 : * @brief Setter for the 'is-live' property.
890 : */
891 : static void
892 6 : gst_mqtt_src_set_is_live (GstMqttSrc * self, const gboolean flag)
893 : {
894 6 : self->is_live = flag;
895 6 : gst_base_src_set_live (GST_BASE_SRC (self), self->is_live);
896 6 : }
897 :
898 : /**
899 : * @brief Getter for the 'client-id' property.
900 : */
901 : static gchar *
902 1 : gst_mqtt_src_get_client_id (GstMqttSrc * self)
903 : {
904 1 : return self->mqtt_client_id;
905 : }
906 :
907 : /**
908 : * @brief Setter for the 'client-id' property.
909 : */
910 : static void
911 1 : gst_mqtt_src_set_client_id (GstMqttSrc * self, const gchar * id)
912 : {
913 1 : g_free (self->mqtt_client_id);
914 1 : self->mqtt_client_id = g_strdup (id);
915 1 : }
916 :
917 : /**
918 : * @brief Getter for the 'host' property.
919 : */
920 : static gchar *
921 1 : gst_mqtt_src_get_host_address (GstMqttSrc * self)
922 : {
923 1 : return self->mqtt_host_address;
924 : }
925 :
926 : /**
927 : * @brief Setter for the 'host' property
928 : */
929 : static void
930 1 : gst_mqtt_src_set_host_address (GstMqttSrc * self, const gchar * addr)
931 : {
932 : /**
933 : * @todo Handle the case where the addr is changed at runtime
934 : */
935 1 : g_free (self->mqtt_host_address);
936 1 : self->mqtt_host_address = g_strdup (addr);
937 1 : }
938 :
939 : /**
940 : * @brief Getter for the 'port' property.
941 : */
942 : static gchar *
943 1 : gst_mqtt_src_get_host_port (GstMqttSrc * self)
944 : {
945 1 : return self->mqtt_host_port;
946 : }
947 :
948 : /**
949 : * @brief Setter for the 'port' property
950 : */
951 : static void
952 1 : gst_mqtt_src_set_host_port (GstMqttSrc * self, const gchar * port)
953 : {
954 1 : g_free (self->mqtt_host_port);
955 1 : self->mqtt_host_port = g_strdup (port);
956 1 : }
957 :
958 : /**
959 : * @brief Getter for the 'sub-timeout' property
960 : */
961 : static gint64
962 2 : gst_mqtt_src_get_sub_timeout (GstMqttSrc * self)
963 : {
964 2 : return self->mqtt_sub_timeout;
965 : }
966 :
967 : /**
968 : * @brief Setter for the 'sub-timeout' property
969 : */
970 : static void
971 6 : gst_mqtt_src_set_sub_timeout (GstMqttSrc * self, const gint64 t)
972 : {
973 6 : self->mqtt_sub_timeout = t;
974 6 : }
975 :
976 : /**
977 : * @brief Getter for the 'sub-topic' property
978 : */
979 : static gchar *
980 1 : gst_mqtt_src_get_sub_topic (GstMqttSrc * self)
981 : {
982 1 : return self->mqtt_topic;
983 : }
984 :
985 : /**
986 : * @brief Setter for the 'sub-topic' property
987 : */
988 : static void
989 6 : gst_mqtt_src_set_sub_topic (GstMqttSrc * self, const gchar * topic)
990 : {
991 6 : g_free (self->mqtt_topic);
992 6 : self->mqtt_topic = g_strdup (topic);
993 6 : }
994 :
995 : /**
996 : * @brief Getter for the 'cleansession' property.
997 : */
998 : static gboolean
999 1 : gst_mqtt_src_get_opt_cleansession (GstMqttSrc * self)
1000 : {
1001 1 : return self->mqtt_conn_opts.cleansession;
1002 : }
1003 :
1004 : /**
1005 : * @brief Setter for the 'cleansession' property.
1006 : */
1007 : static void
1008 1 : gst_mqtt_src_set_opt_cleansession (GstMqttSrc * self, const gboolean val)
1009 : {
1010 1 : self->mqtt_conn_opts.cleansession = val;
1011 1 : }
1012 :
1013 : /**
1014 : * @brief Getter for the 'keep-alive-interval' property
1015 : */
1016 : static gint
1017 2 : gst_mqtt_src_get_opt_keep_alive_interval (GstMqttSrc * self)
1018 : {
1019 2 : return self->mqtt_conn_opts.keepAliveInterval;
1020 : }
1021 :
1022 : /**
1023 : * @brief Setter for the 'keep-alive-interval' property
1024 : */
1025 : static void
1026 1 : gst_mqtt_src_set_opt_keep_alive_interval (GstMqttSrc * self, const gint num)
1027 : {
1028 1 : self->mqtt_conn_opts.keepAliveInterval = num;
1029 1 : }
1030 :
1031 : /**
1032 : * @brief Getter for the 'mqtt-qos' property
1033 : */
1034 : static gint
1035 2 : gst_mqtt_src_get_mqtt_qos (GstMqttSrc * self)
1036 : {
1037 2 : return self->mqtt_qos;
1038 : }
1039 :
1040 : /**
1041 : * @brief Setter for the 'mqtt-qos' property
1042 : */
1043 : static void
1044 1 : gst_mqtt_src_set_mqtt_qos (GstMqttSrc * self, const gint qos)
1045 : {
1046 1 : self->mqtt_qos = qos;
1047 1 : }
1048 :
1049 : /**
1050 : * @brief A callback to handle the connection lost to the broker
1051 : */
1052 : static void
1053 0 : cb_mqtt_on_connection_lost (void *context, char *cause)
1054 : {
1055 0 : GstMqttSrc *self = GST_MQTT_SRC_CAST (context);
1056 : UNUSED (cause);
1057 :
1058 0 : g_mutex_lock (&self->mqtt_src_mutex);
1059 0 : self->is_connected = FALSE;
1060 0 : self->is_subscribed = FALSE;
1061 0 : g_cond_broadcast (&self->mqtt_src_gcond);
1062 0 : if (!self->err) {
1063 0 : self->err = g_error_new (self->gquark_err_tag, EHOSTDOWN,
1064 : "Connection to the host (broker) has been lost: %s \n"
1065 : "\t\tfor detail, please check the log message of the broker",
1066 : g_strerror (EHOSTDOWN));
1067 : }
1068 0 : g_mutex_unlock (&self->mqtt_src_mutex);
1069 0 : }
1070 :
1071 : /**
1072 : * @brief A callback to handle the arrived message
1073 : */
1074 : static int
1075 6 : cb_mqtt_on_message_arrived (void *context, char *topic_name, int topic_len,
1076 : MQTTAsync_message * message)
1077 : {
1078 6 : const int size = message->payloadlen;
1079 6 : guint8 *data = message->payload;
1080 : GstMQTTMessageHdr *mqtt_msg_hdr;
1081 : GstMapInfo hdr_map_info;
1082 : GstMemory *received_mem;
1083 : GstMemory *hdr_mem;
1084 : GstBuffer *buffer;
1085 : GstBaseSrc *basesrc;
1086 : GstMqttSrc *self;
1087 : GstClock *clock;
1088 : GstCaps *recv_caps;
1089 : gsize offset;
1090 : guint i;
1091 : UNUSED (topic_name);
1092 : UNUSED (topic_len);
1093 :
1094 6 : self = GST_MQTT_SRC_CAST (context);
1095 6 : g_mutex_lock (&self->mqtt_src_mutex);
1096 6 : if (!self->is_subscribed) {
1097 2 : g_mutex_unlock (&self->mqtt_src_mutex);
1098 :
1099 6 : return TRUE;
1100 : }
1101 4 : g_mutex_unlock (&self->mqtt_src_mutex);
1102 :
1103 4 : basesrc = GST_BASE_SRC (self);
1104 4 : clock = gst_element_get_clock (GST_ELEMENT (self));
1105 4 : received_mem = gst_memory_new_wrapped (0, data, size, 0, size, message,
1106 : (GDestroyNotify) cb_memory_wrapped_destroy);
1107 4 : if (!received_mem) {
1108 0 : if (!self->err) {
1109 0 : self->err = g_error_new (self->gquark_err_tag, ENODATA,
1110 : "%s: failed to wrap the raw data of received message in GstMemory: %s",
1111 : __func__, g_strerror (ENODATA));
1112 : }
1113 0 : return TRUE;
1114 : }
1115 :
1116 4 : mqtt_msg_hdr = _extract_mqtt_msg_hdr_from (received_mem, &hdr_mem,
1117 : &hdr_map_info);
1118 4 : if (!mqtt_msg_hdr) {
1119 0 : if (!self->err) {
1120 0 : self->err = g_error_new (self->gquark_err_tag, ENODATA,
1121 : "%s: failed to extract header information from received message: %s",
1122 : __func__, g_strerror (ENODATA));
1123 : }
1124 0 : goto ret_unref_received_mem;
1125 : }
1126 :
1127 4 : recv_caps = gst_caps_from_string (mqtt_msg_hdr->gst_caps_str);
1128 4 : if (recv_caps) {
1129 4 : if (!self->caps || !gst_caps_is_equal (self->caps, recv_caps)) {
1130 4 : gst_caps_replace (&self->caps, recv_caps);
1131 4 : gst_mqtt_src_renegotiate (basesrc);
1132 : }
1133 :
1134 4 : gst_caps_unref (recv_caps);
1135 : }
1136 :
1137 4 : buffer = gst_buffer_new ();
1138 4 : offset = GST_MQTT_LEN_MSG_HDR;
1139 8 : for (i = 0; i < mqtt_msg_hdr->num_mems; ++i) {
1140 : GstMemory *each_memory;
1141 : int each_size;
1142 :
1143 4 : each_size = mqtt_msg_hdr->size_mems[i];
1144 4 : each_memory = gst_memory_share (received_mem, offset, each_size);
1145 4 : gst_buffer_append_memory (buffer, each_memory);
1146 4 : offset += each_size;
1147 : }
1148 :
1149 : /** Timestamp synchronization */
1150 4 : if (self->debug) {
1151 4 : GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
1152 :
1153 4 : if (clock) {
1154 4 : GST_DEBUG_OBJECT (self,
1155 : "A message has been arrived at %" GST_TIME_FORMAT
1156 : " and queue length is %d",
1157 : GST_TIME_ARGS (gst_clock_get_time (clock) - base_time),
1158 : g_async_queue_length (self->aqueue));
1159 :
1160 4 : gst_object_unref (clock);
1161 : }
1162 : }
1163 4 : _put_timestamp_on_gst_buf (self, mqtt_msg_hdr, buffer);
1164 4 : g_async_queue_push (self->aqueue, buffer);
1165 :
1166 4 : gst_memory_unmap (hdr_mem, &hdr_map_info);
1167 4 : gst_memory_unref (hdr_mem);
1168 :
1169 4 : ret_unref_received_mem:
1170 4 : gst_memory_unref (received_mem);
1171 :
1172 4 : return TRUE;
1173 : }
1174 :
1175 : /**
1176 : * @brief A callback invoked when destroying the GstMemory which wrapped the arrived message
1177 : */
1178 : static void
1179 4 : cb_memory_wrapped_destroy (void *p)
1180 : {
1181 4 : MQTTAsync_message *msg = p;
1182 :
1183 4 : MQTTAsync_freeMessage (&msg);
1184 4 : }
1185 :
1186 : /**
1187 : * @brief A callback invoked when the connection is established
1188 : */
1189 : static void
1190 5 : cb_mqtt_on_connect (void *context, MQTTAsync_successData * response)
1191 : {
1192 5 : GstMqttSrc *self = GST_MQTT_SRC (context);
1193 5 : GstBaseSrc *basesrc = GST_BASE_SRC (self);
1194 : int ret;
1195 : UNUSED (response);
1196 :
1197 5 : g_mutex_lock (&self->mqtt_src_mutex);
1198 5 : self->is_connected = TRUE;
1199 5 : g_cond_broadcast (&self->mqtt_src_gcond);
1200 5 : g_mutex_unlock (&self->mqtt_src_mutex);
1201 :
1202 : /** GstFlowReturn is an enum type. It is possible to use int here */
1203 5 : if (gst_base_src_is_async (basesrc) &&
1204 0 : (ret = gst_base_src_start_wait (basesrc)) != GST_FLOW_OK) {
1205 0 : g_mutex_lock (&self->mqtt_src_mutex);
1206 0 : self->err = g_error_new (self->gquark_err_tag, ret,
1207 : "%s: the virtual method, start (), in the GstBaseSrc class fails with return code %d",
1208 : __func__, ret);
1209 0 : g_cond_broadcast (&self->mqtt_src_gcond);
1210 0 : g_mutex_unlock (&self->mqtt_src_mutex);
1211 0 : return;
1212 : }
1213 :
1214 5 : if (!_subscribe (self)) {
1215 2 : GST_ERROR_OBJECT (self, "Failed to subscribe to %s", self->mqtt_topic);
1216 : }
1217 : }
1218 :
1219 : /**
1220 : * @brief A callback invoked when it is failed to connect to the broker
1221 : */
1222 : static void
1223 0 : cb_mqtt_on_connect_failure (void *context, MQTTAsync_failureData * response)
1224 : {
1225 0 : GstMqttSrc *self = GST_MQTT_SRC (context);
1226 :
1227 0 : g_mutex_lock (&self->mqtt_src_mutex);
1228 0 : self->is_connected = FALSE;
1229 :
1230 0 : if (!self->err) {
1231 0 : self->err = g_error_new (self->gquark_err_tag, response->code,
1232 : "%s: failed to connect to the broker: %s", __func__, response->message);
1233 : }
1234 0 : g_cond_broadcast (&self->mqtt_src_gcond);
1235 0 : g_mutex_unlock (&self->mqtt_src_mutex);
1236 0 : }
1237 :
1238 : /**
1239 : * @brief MQTTAsync_responseOptions's onSuccess callback for MQTTAsync_subscribe ()
1240 : */
1241 : static void
1242 3 : cb_mqtt_on_subscribe (void *context, MQTTAsync_successData * response)
1243 : {
1244 3 : GstMqttSrc *self = GST_MQTT_SRC (context);
1245 : UNUSED (response);
1246 :
1247 3 : g_mutex_lock (&self->mqtt_src_mutex);
1248 3 : self->is_subscribed = TRUE;
1249 3 : g_cond_broadcast (&self->mqtt_src_gcond);
1250 3 : g_mutex_unlock (&self->mqtt_src_mutex);
1251 3 : }
1252 :
1253 : /**
1254 : * @brief MQTTAsync_responseOptions's onFailure callback for MQTTAsync_subscribe ()
1255 : */
1256 : static void
1257 2 : cb_mqtt_on_subscribe_failure (void *context, MQTTAsync_failureData * response)
1258 : {
1259 2 : GstMqttSrc *self = GST_MQTT_SRC (context);
1260 :
1261 2 : g_mutex_lock (&self->mqtt_src_mutex);
1262 2 : if (!self->err) {
1263 2 : self->err = g_error_new (self->gquark_err_tag, response->code,
1264 : "%s: failed to subscribe the given topic, %s: %s", __func__,
1265 : self->mqtt_topic, response->message);
1266 : }
1267 2 : g_cond_broadcast (&self->mqtt_src_gcond);
1268 2 : g_mutex_unlock (&self->mqtt_src_mutex);
1269 2 : }
1270 :
1271 : /**
1272 : * @brief MQTTAsync_responseOptions's onSuccess callback for MQTTAsync_unsubscribe ()
1273 : */
1274 : static void
1275 3 : cb_mqtt_on_unsubscribe (void *context, MQTTAsync_successData * response)
1276 : {
1277 3 : GstMqttSrc *self = GST_MQTT_SRC (context);
1278 : UNUSED (response);
1279 :
1280 3 : g_mutex_lock (&self->mqtt_src_mutex);
1281 3 : self->is_subscribed = FALSE;
1282 3 : g_cond_broadcast (&self->mqtt_src_gcond);
1283 3 : g_mutex_unlock (&self->mqtt_src_mutex);
1284 3 : }
1285 :
1286 : /**
1287 : * @brief MQTTAsync_responseOptions's onFailure callback for MQTTAsync_unsubscribe ()
1288 : */
1289 : static void
1290 0 : cb_mqtt_on_unsubscribe_failure (void *context, MQTTAsync_failureData * response)
1291 : {
1292 0 : GstMqttSrc *self = GST_MQTT_SRC (context);
1293 :
1294 0 : g_mutex_lock (&self->mqtt_src_mutex);
1295 0 : if (!self->err) {
1296 0 : self->err = g_error_new (self->gquark_err_tag, response->code,
1297 : "%s: failed to unsubscribe the given topic, %s: %s", __func__,
1298 : self->mqtt_topic, response->message);
1299 : }
1300 0 : g_cond_broadcast (&self->mqtt_src_gcond);
1301 0 : g_mutex_unlock (&self->mqtt_src_mutex);
1302 0 : }
1303 :
1304 : /**
1305 : * @brief A helper function to properly invoke MQTTAsync_subscribe ()
1306 : */
1307 : static gboolean
1308 5 : _subscribe (GstMqttSrc * self)
1309 : {
1310 5 : MQTTAsync_responseOptions opts = self->mqtt_respn_opts;
1311 : int mqttasync_ret;
1312 :
1313 5 : opts.onSuccess = cb_mqtt_on_subscribe;
1314 5 : opts.onFailure = cb_mqtt_on_subscribe_failure;
1315 5 : opts.subscribeOptions.retainHandling = 1;
1316 :
1317 10 : mqttasync_ret = MQTTAsync_subscribe (self->mqtt_client_handle,
1318 5 : self->mqtt_topic, self->mqtt_qos, &opts);
1319 5 : if (mqttasync_ret != MQTTASYNC_SUCCESS)
1320 5 : return FALSE;
1321 3 : return TRUE;
1322 : }
1323 :
1324 : /**
1325 : * @brief A wrapper function that calls MQTTAsync_unsubscribe ()
1326 : */
1327 : static gboolean
1328 3 : _unsubscribe (GstMqttSrc * self)
1329 : {
1330 3 : MQTTAsync_responseOptions opts = self->mqtt_respn_opts;
1331 : int mqttasync_ret;
1332 :
1333 3 : opts.onSuccess = cb_mqtt_on_unsubscribe;
1334 3 : opts.onFailure = cb_mqtt_on_unsubscribe_failure;
1335 :
1336 6 : mqttasync_ret = MQTTAsync_unsubscribe (self->mqtt_client_handle,
1337 3 : self->mqtt_topic, &opts);
1338 3 : if (mqttasync_ret != MQTTASYNC_SUCCESS)
1339 3 : return FALSE;
1340 3 : return TRUE;
1341 : }
1342 :
1343 : /**
1344 : * @brief A utility function to extract header information from a received message
1345 : */
1346 : static GstMQTTMessageHdr *
1347 4 : _extract_mqtt_msg_hdr_from (GstMemory * mem, GstMemory ** hdr_mem,
1348 : GstMapInfo * hdr_map_info)
1349 : {
1350 4 : *hdr_mem = gst_memory_share (mem, 0, GST_MQTT_LEN_MSG_HDR);
1351 4 : g_return_val_if_fail (*hdr_mem != NULL, NULL);
1352 :
1353 4 : if (!gst_memory_map (*hdr_mem, hdr_map_info, GST_MAP_READ)) {
1354 0 : gst_memory_unref (*hdr_mem);
1355 0 : return NULL;
1356 : }
1357 :
1358 4 : return (GstMQTTMessageHdr *) hdr_map_info->data;
1359 : }
1360 :
1361 : /**
1362 : * @brief A utility function to put the timestamp information
1363 : * onto a GstBuffer-typed buffer using the given packet header
1364 : */
1365 : static void
1366 4 : _put_timestamp_on_gst_buf (GstMqttSrc * self, GstMQTTMessageHdr * hdr,
1367 : GstBuffer * buf)
1368 : {
1369 4 : gint64 diff_base_epoch = hdr->base_time_epoch - self->base_time_epoch;
1370 :
1371 4 : buf->pts = GST_CLOCK_TIME_NONE;
1372 4 : buf->dts = GST_CLOCK_TIME_NONE;
1373 4 : buf->duration = GST_CLOCK_TIME_NONE;
1374 :
1375 4 : if (hdr->sent_time_epoch < self->base_time_epoch)
1376 0 : return;
1377 :
1378 4 : if (((GstClockTimeDiff) hdr->pts + diff_base_epoch) < 0)
1379 0 : return;
1380 :
1381 4 : if (hdr->pts != GST_CLOCK_TIME_NONE) {
1382 4 : buf->pts = hdr->pts + diff_base_epoch;
1383 : }
1384 :
1385 4 : if (hdr->dts != GST_CLOCK_TIME_NONE) {
1386 4 : buf->dts = hdr->dts + diff_base_epoch;
1387 : }
1388 :
1389 4 : buf->duration = hdr->duration;
1390 :
1391 4 : if (self->debug) {
1392 4 : GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
1393 : GstClock *clock;
1394 :
1395 4 : clock = gst_element_get_clock (GST_ELEMENT (self));
1396 :
1397 4 : if (clock) {
1398 4 : GST_DEBUG_OBJECT (self,
1399 : "%s diff %" GST_STIME_FORMAT " now %" GST_TIME_FORMAT " ts (%"
1400 : GST_TIME_FORMAT " -> %" GST_TIME_FORMAT ")", self->mqtt_topic,
1401 : GST_STIME_ARGS (diff_base_epoch),
1402 : GST_TIME_ARGS (gst_clock_get_time (clock) - base_time),
1403 : GST_TIME_ARGS (hdr->pts), GST_TIME_ARGS (buf->pts));
1404 :
1405 4 : gst_object_unref (clock);
1406 : }
1407 : }
1408 : }
|