LCOV - code coverage report
Current view: top level - nnstreamer-2.4.2/gst/mqtt - mqttsrc.c (source / functions) Coverage Total Hit
Test: nnstreamer 2.4.2-0 nnstreamer/nnstreamer#eca68b8d050408568af95d831a8eef62aaee7784 Lines: 83.3 % 600 500
Test Date: 2025-03-13 05:38:21 Functions: 94.1 % 51 48

            Line data    Source code
       1              : /* SPDX-License-Identifier: LGPL-2.1-only */
       2              : /**
       3              :  * Copyright (C) 2021 Wook Song <wook16.song@samsung.com>
       4              :  */
       5              : /**
       6              :  * @file    mqttsrc.c
       7              :  * @date    08 Mar 2021
       8              :  * @brief   Subscribe a MQTT topic and push incoming data to the GStreamer pipeline
       9              :  * @see     https://github.com/nnstreamer/nnstreamer
      10              :  * @author  Wook Song <wook16.song@samsung.com>
      11              :  * @bug     No known bugs except for NYI items
      12              :  */
      13              : 
      14              : #ifdef HAVE_CONFIG_H
      15              : #include <config.h>
      16              : #endif
      17              : 
      18              : #ifdef G_OS_WIN32
      19              : #include <process.h>
      20              : #else
      21              : #include <sys/types.h>
      22              : #include <unistd.h>
      23              : #endif
      24              : 
      25              : #include <gst/base/gstbasesrc.h>
      26              : #include <MQTTAsync.h>
      27              : #include <nnstreamer_util.h>
      28              : 
      29              : #include "mqttsrc.h"
      30              : 
      31              : static GstStaticPadTemplate src_pad_template = GST_STATIC_PAD_TEMPLATE ("src",
      32              :     GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
      33              : 
      34              : #define gst_mqtt_src_parent_class parent_class
      35          176 : G_DEFINE_TYPE (GstMqttSrc, gst_mqtt_src, GST_TYPE_BASE_SRC);
      36              : 
      37              : GST_DEBUG_CATEGORY_STATIC (gst_mqtt_src_debug);
      38              : #define GST_CAT_DEFAULT gst_mqtt_src_debug
      39              : 
/**
 * @brief Property identifiers of mqttsrc, dispatched on by
 *        gst_mqtt_src_set_property () and gst_mqtt_src_get_property ()
 */
enum
{
  PROP_0,

  PROP_DEBUG,
  PROP_IS_LIVE,
  PROP_MQTT_CLIENT_ID,
  PROP_MQTT_HOST_ADDRESS,
  PROP_MQTT_HOST_PORT,
  PROP_MQTT_SUB_TOPIC,
  PROP_MQTT_SUB_TIMEOUT,
  PROP_MQTT_OPT_CLEANSESSION,
  PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL,
  PROP_MQTT_QOS,

  PROP_LAST
};

/**
 * @brief Default values of the integer and boolean properties of mqttsrc
 */
enum
{
  DEFAULT_DEBUG = FALSE,
  DEFAULT_IS_LIVE = TRUE,
  DEFAULT_MQTT_OPT_CLEANSESSION = TRUE,
  DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL = 60,    /* 1 minute (in seconds) */
  DEFAULT_MQTT_SUB_TIMEOUT = 10000000,  /* 10 seconds (in microseconds) */
  DEFAULT_MQTT_SUB_TIMEOUT_MIN = 1000000,       /* 1 second (in microseconds) */
  DEFAULT_MQTT_QOS = 2,         /* Exactly once */
};

/** Counter appended to each automatically generated client ID */
static guint8 src_client_id = 0;
static const gchar DEFAULT_MQTT_HOST_ADDRESS[] = "127.0.0.1";
static const gchar DEFAULT_MQTT_HOST_PORT[] = "1883";
/** String turned into the GQuark stored in GstMqttSrc::gquark_err_tag */
static const gchar TAG_ERR_MQTTSRC[] = "ERROR: MQTTSrc";
/**
 * Placeholder shown as the default of the 'client-id' property. While the
 * property still holds this value, gst_mqtt_src_start () replaces it with an
 * ID built from DEFAULT_MQTT_CLIENT_ID_FORMAT.
 */
static const gchar DEFAULT_MQTT_CLIENT_ID[] =
    "$HOSTNAME_$PID_^[0-9][0-9]?$|^255$";
/** printf-style format of the generated client ID: host name, PID, counter */
static const gchar DEFAULT_MQTT_CLIENT_ID_FORMAT[] = "%s_%u_src%u";
      76              : 
      77              : /** Function prototype declarations */
      78              : static void
      79              : gst_mqtt_src_set_property (GObject * object, guint prop_id,
      80              :     const GValue * value, GParamSpec * pspec);
      81              : static void
      82              : gst_mqtt_src_get_property (GObject * object, guint prop_id,
      83              :     GValue * value, GParamSpec * pspec);
      84              : static void gst_mqtt_src_class_finalize (GObject * object);
      85              : 
      86              : static GstStateChangeReturn
      87              : gst_mqtt_src_change_state (GstElement * element, GstStateChange transition);
      88              : 
      89              : static gboolean gst_mqtt_src_start (GstBaseSrc * basesrc);
      90              : static gboolean gst_mqtt_src_stop (GstBaseSrc * basesrc);
      91              : static GstCaps *gst_mqtt_src_get_caps (GstBaseSrc * basesrc, GstCaps * filter);
      92              : static gboolean gst_mqtt_src_renegotiate (GstBaseSrc * basesrc);
      93              : 
      94              : static void
      95              : gst_mqtt_src_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
      96              :     GstClockTime * start, GstClockTime * end);
      97              : static gboolean gst_mqtt_src_is_seekable (GstBaseSrc * basesrc);
      98              : static GstFlowReturn
      99              : gst_mqtt_src_create (GstBaseSrc * basesrc, guint64 offset, guint size,
     100              :     GstBuffer ** buf);
     101              : static gboolean gst_mqtt_src_query (GstBaseSrc * basesrc, GstQuery * query);
     102              : 
     103              : static gboolean gst_mqtt_src_get_debug (GstMqttSrc * self);
     104              : static void gst_mqtt_src_set_debug (GstMqttSrc * self, const gboolean flag);
     105              : static gboolean gst_mqtt_src_get_is_live (GstMqttSrc * self);
     106              : static void gst_mqtt_src_set_is_live (GstMqttSrc * self, const gboolean flag);
     107              : static gchar *gst_mqtt_src_get_client_id (GstMqttSrc * self);
     108              : static void gst_mqtt_src_set_client_id (GstMqttSrc * self, const gchar * id);
     109              : static gchar *gst_mqtt_src_get_host_address (GstMqttSrc * self);
     110              : static void gst_mqtt_src_set_host_address (GstMqttSrc * self,
     111              :     const gchar * addr);
     112              : static gchar *gst_mqtt_src_get_host_port (GstMqttSrc * self);
     113              : static void gst_mqtt_src_set_host_port (GstMqttSrc * self, const gchar * port);
     114              : static gint64 gst_mqtt_src_get_sub_timeout (GstMqttSrc * self);
     115              : static void gst_mqtt_src_set_sub_timeout (GstMqttSrc * self, const gint64 t);
     116              : static gchar *gst_mqtt_src_get_sub_topic (GstMqttSrc * self);
     117              : static void gst_mqtt_src_set_sub_topic (GstMqttSrc * self, const gchar * topic);
     118              : static gboolean gst_mqtt_src_get_opt_cleansession (GstMqttSrc * self);
     119              : static void gst_mqtt_src_set_opt_cleansession (GstMqttSrc * self,
     120              :     const gboolean val);
     121              : static gint gst_mqtt_src_get_opt_keep_alive_interval (GstMqttSrc * self);
     122              : static void gst_mqtt_src_set_opt_keep_alive_interval (GstMqttSrc * self,
     123              :     const gint num);
     124              : static gint gst_mqtt_src_get_mqtt_qos (GstMqttSrc * self);
     125              : static void gst_mqtt_src_set_mqtt_qos (GstMqttSrc * self, const gint qos);
     126              : 
     127              : static void cb_mqtt_on_connection_lost (void *context, char *cause);
     128              : static int cb_mqtt_on_message_arrived (void *context, char *topic_name,
     129              :     int topic_len, MQTTAsync_message * message);
     130              : static void cb_mqtt_on_connect (void *context,
     131              :     MQTTAsync_successData * response);
     132              : static void cb_mqtt_on_connect_failure (void *context,
     133              :     MQTTAsync_failureData * response);
     134              : static void cb_mqtt_on_subscribe (void *context,
     135              :     MQTTAsync_successData * response);
     136              : static void cb_mqtt_on_subscribe_failure (void *context,
     137              :     MQTTAsync_failureData * response);
     138              : static void cb_mqtt_on_unsubscribe (void *context,
     139              :     MQTTAsync_successData * response);
     140              : static void cb_mqtt_on_unsubscribe_failure (void *context,
     141              :     MQTTAsync_failureData * response);
     142              : 
     143              : static void cb_memory_wrapped_destroy (void *p);
     144              : 
     145              : static GstMQTTMessageHdr *_extract_mqtt_msg_hdr_from (GstMemory * mem,
     146              :     GstMemory ** hdr_mem, GstMapInfo * hdr_map_info);
     147              : static void _put_timestamp_on_gst_buf (GstMqttSrc * self,
     148              :     GstMQTTMessageHdr * hdr, GstBuffer * buf);
     149              : static gboolean _subscribe (GstMqttSrc * self);
     150              : static gboolean _unsubscribe (GstMqttSrc * self);
     151              : 
     152              : /**
     153              :  * @brief A utility function to check whether the timestamp marked by _put_timestamp_on_gst_buf () is valid or not
     154              :  */
     155              : static inline gboolean
     156            4 : _is_gst_buffer_timestamp_valid (GstBuffer * buf)
     157              : {
     158            4 :   if (!GST_BUFFER_PTS_IS_VALID (buf) && !GST_BUFFER_DTS_IS_VALID (buf) &&
     159            0 :       !GST_BUFFER_DURATION_IS_VALID (buf))
     160            0 :     return FALSE;
     161            4 :   return TRUE;
     162              : }
     163              : 
     164              : /** Function definitions */
     165              : /**
     166              :  * @brief Initialize GstMqttSrc object
     167              :  */
     168              : static void
     169            7 : gst_mqtt_src_init (GstMqttSrc * self)
     170              : {
     171            7 :   MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
     172            7 :   MQTTAsync_responseOptions respn_opts = MQTTAsync_responseOptions_initializer;
     173            7 :   GstBaseSrc *basesrc = GST_BASE_SRC (self);
     174              : 
     175            7 :   self->gquark_err_tag = g_quark_from_string (TAG_ERR_MQTTSRC);
     176              : 
     177            7 :   gst_base_src_set_format (basesrc, GST_FORMAT_TIME);
     178            7 :   gst_base_src_set_async (basesrc, FALSE);
     179              : 
     180              :   /** init mqttsrc properties */
     181            7 :   self->mqtt_client_handle = NULL;
     182            7 :   self->debug = DEFAULT_DEBUG;
     183            7 :   self->is_live = DEFAULT_IS_LIVE;
     184            7 :   self->mqtt_client_id = g_strdup (DEFAULT_MQTT_CLIENT_ID);
     185            7 :   self->mqtt_host_address = g_strdup (DEFAULT_MQTT_HOST_ADDRESS);
     186            7 :   self->mqtt_host_port = g_strdup (DEFAULT_MQTT_HOST_PORT);
     187            7 :   self->mqtt_topic = NULL;
     188            7 :   self->mqtt_sub_timeout = (gint64) DEFAULT_MQTT_SUB_TIMEOUT;
     189            7 :   self->mqtt_conn_opts = conn_opts;
     190            7 :   self->mqtt_conn_opts.cleansession = DEFAULT_MQTT_OPT_CLEANSESSION;
     191            7 :   self->mqtt_conn_opts.keepAliveInterval = DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL;
     192            7 :   self->mqtt_conn_opts.onSuccess = cb_mqtt_on_connect;
     193            7 :   self->mqtt_conn_opts.onFailure = cb_mqtt_on_connect_failure;
     194            7 :   self->mqtt_conn_opts.context = self;
     195            7 :   self->mqtt_respn_opts = respn_opts;
     196            7 :   self->mqtt_respn_opts.onSuccess = NULL;
     197            7 :   self->mqtt_respn_opts.onFailure = NULL;
     198            7 :   self->mqtt_respn_opts.context = self;
     199            7 :   self->mqtt_qos = DEFAULT_MQTT_QOS;
     200              : 
     201              :   /** init private member variables */
     202            7 :   self->err = NULL;
     203            7 :   self->aqueue = g_async_queue_new ();
     204            7 :   g_cond_init (&self->mqtt_src_gcond);
     205            7 :   g_mutex_init (&self->mqtt_src_mutex);
     206            7 :   g_mutex_lock (&self->mqtt_src_mutex);
     207            7 :   self->is_connected = FALSE;
     208            7 :   self->is_subscribed = FALSE;
     209            7 :   self->latency = GST_CLOCK_TIME_NONE;
     210            7 :   g_mutex_unlock (&self->mqtt_src_mutex);
     211            7 :   self->base_time_epoch = GST_CLOCK_TIME_NONE;
     212            7 :   self->caps = NULL;
     213            7 :   self->num_dumped = 0;
     214              : 
     215            7 :   gst_base_src_set_live (basesrc, self->is_live);
     216            7 : }
     217              : 
     218              : /**
     219              :  * @brief Initialize GstMqttSrcClass object
     220              :  */
     221              : static void
     222            1 : gst_mqtt_src_class_init (GstMqttSrcClass * klass)
     223              : {
     224            1 :   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
     225            1 :   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
     226            1 :   GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
     227              : 
     228            1 :   GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_MQTT_ELEM_NAME_SRC, 0,
     229              :       "MQTT src");
     230              : 
     231            1 :   gobject_class->set_property = gst_mqtt_src_set_property;
     232            1 :   gobject_class->get_property = gst_mqtt_src_get_property;
     233            1 :   gobject_class->finalize = gst_mqtt_src_class_finalize;
     234              : 
     235            1 :   g_object_class_install_property (gobject_class, PROP_DEBUG,
     236              :       g_param_spec_boolean ("debug", "Debug",
     237              :           "Produce extra verbose output for debug purpose", DEFAULT_DEBUG,
     238              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     239              : 
     240            1 :   g_object_class_install_property (gobject_class, PROP_IS_LIVE,
     241              :       g_param_spec_boolean ("is-live", "Is Live",
     242              :           "Synchronize the incoming buffers' timestamp with the current running time",
     243              :           DEFAULT_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     244              : 
     245            1 :   g_object_class_install_property (gobject_class, PROP_MQTT_CLIENT_ID,
     246              :       g_param_spec_string ("client-id", "Client ID",
     247              :           "The client identifier passed to the server (broker)",
     248              :           DEFAULT_MQTT_CLIENT_ID, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     249              : 
     250            1 :   g_object_class_install_property (gobject_class, PROP_MQTT_HOST_ADDRESS,
     251              :       g_param_spec_string ("host", "Host", "Host (broker) to connect to",
     252              :           DEFAULT_MQTT_HOST_ADDRESS,
     253              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     254              : 
     255            1 :   g_object_class_install_property (gobject_class, PROP_MQTT_HOST_PORT,
     256              :       g_param_spec_string ("port", "Port",
     257              :           "Network port of host (broker) to connect to", DEFAULT_MQTT_HOST_PORT,
     258              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     259              : 
     260            1 :   g_object_class_install_property (gobject_class, PROP_MQTT_SUB_TIMEOUT,
     261              :       g_param_spec_int64 ("sub-timeout", "Timeout for receiving a message",
     262              :           "The timeout (in microseconds) for receiving a message from subscribed topic",
     263              :           1000000, G_MAXINT64, DEFAULT_MQTT_SUB_TIMEOUT,
     264              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     265              : 
     266            1 :   g_object_class_install_property (gobject_class, PROP_MQTT_SUB_TOPIC,
     267              :       g_param_spec_string ("sub-topic", "Topic to Subscribe (mandatory)",
     268              :           "The topic's name to subscribe", NULL,
     269              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     270              : 
     271            1 :   g_object_class_install_property (gobject_class, PROP_MQTT_OPT_CLEANSESSION,
     272              :       g_param_spec_boolean ("cleansession", "Cleansession",
     273              :           "When it is TRUE, the state information is discarded at connect and disconnect.",
     274              :           DEFAULT_MQTT_OPT_CLEANSESSION,
     275              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     276              : 
     277            1 :   g_object_class_install_property (gobject_class,
     278              :       PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL,
     279              :       g_param_spec_int ("keep-alive-interval", "Keep Alive Interval",
     280              :           "The maximum time (in seconds) that should pass without communication between the client and the server (broker)",
     281              :           1, G_MAXINT32, DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL,
     282              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     283              : 
     284            1 :   g_object_class_install_property (gobject_class, PROP_MQTT_QOS,
     285              :       g_param_spec_int ("mqtt-qos", "mqtt QoS level",
     286              :           "The QoS level of MQTT.\n"
     287              :           "\t\t\t  0: At most once\n"
     288              :           "\t\t\t  1: At least once\n"
     289              :           "\t\t\t  2: Exactly once\n"
     290              :           "\t\t\tsee also: https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/qos.html",
     291              :           0, 2, DEFAULT_MQTT_QOS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     292              : 
     293            1 :   gstelement_class->change_state =
     294            1 :       GST_DEBUG_FUNCPTR (gst_mqtt_src_change_state);
     295              : 
     296            1 :   gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_mqtt_src_start);
     297            1 :   gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_mqtt_src_stop);
     298            1 :   gstbasesrc_class->get_caps = GST_DEBUG_FUNCPTR (gst_mqtt_src_get_caps);
     299            1 :   gstbasesrc_class->get_times = GST_DEBUG_FUNCPTR (gst_mqtt_src_get_times);
     300            1 :   gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR (gst_mqtt_src_is_seekable);
     301            1 :   gstbasesrc_class->create = GST_DEBUG_FUNCPTR (gst_mqtt_src_create);
     302            1 :   gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_mqtt_src_query);
     303              : 
     304            1 :   gst_element_class_set_static_metadata (gstelement_class,
     305              :       "MQTT source", "Source/MQTT",
     306              :       "Subscribe a MQTT topic and push incoming data to the GStreamer pipeline",
     307              :       "Wook Song <wook16.song@samsung.com>");
     308            1 :   gst_element_class_add_static_pad_template (gstelement_class,
     309              :       &src_pad_template);
     310            1 : }
     311              : 
     312              : /**
     313              :  * @brief The setter for the mqttsrc's properties
     314              :  */
     315              : static void
     316           30 : gst_mqtt_src_set_property (GObject * object, guint prop_id,
     317              :     const GValue * value, GParamSpec * pspec)
     318              : {
     319           30 :   GstMqttSrc *self = GST_MQTT_SRC (object);
     320              : 
     321           30 :   switch (prop_id) {
     322            6 :     case PROP_DEBUG:
     323            6 :       gst_mqtt_src_set_debug (self, g_value_get_boolean (value));
     324            6 :       break;
     325            6 :     case PROP_IS_LIVE:
     326            6 :       gst_mqtt_src_set_is_live (self, g_value_get_boolean (value));
     327            6 :       break;
     328            1 :     case PROP_MQTT_CLIENT_ID:
     329            1 :       gst_mqtt_src_set_client_id (self, g_value_get_string (value));
     330            1 :       break;
     331            1 :     case PROP_MQTT_HOST_ADDRESS:
     332            1 :       gst_mqtt_src_set_host_address (self, g_value_get_string (value));
     333            1 :       break;
     334            1 :     case PROP_MQTT_HOST_PORT:
     335            1 :       gst_mqtt_src_set_host_port (self, g_value_get_string (value));
     336            1 :       break;
     337            6 :     case PROP_MQTT_SUB_TIMEOUT:
     338            6 :       gst_mqtt_src_set_sub_timeout (self, g_value_get_int64 (value));
     339            6 :       break;
     340            6 :     case PROP_MQTT_SUB_TOPIC:
     341            6 :       gst_mqtt_src_set_sub_topic (self, g_value_get_string (value));
     342            6 :       break;
     343            1 :     case PROP_MQTT_OPT_CLEANSESSION:
     344            1 :       gst_mqtt_src_set_opt_cleansession (self, g_value_get_boolean (value));
     345            1 :       break;
     346            1 :     case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
     347            1 :       gst_mqtt_src_set_opt_keep_alive_interval (self, g_value_get_int (value));
     348            1 :       break;
     349            1 :     case PROP_MQTT_QOS:
     350            1 :       gst_mqtt_src_set_mqtt_qos (self, g_value_get_int (value));
     351            1 :       break;
     352            0 :     default:
     353            0 :       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
     354            0 :       break;
     355              :   }
     356           30 : }
     357              : 
     358              : /**
     359              :  * @brief The getter for the mqttsrc's properties
     360              :  */
     361              : static void
     362           14 : gst_mqtt_src_get_property (GObject * object, guint prop_id,
     363              :     GValue * value, GParamSpec * pspec)
     364              : {
     365           14 :   GstMqttSrc *self = GST_MQTT_SRC (object);
     366              : 
     367           14 :   switch (prop_id) {
     368            2 :     case PROP_DEBUG:
     369            2 :       g_value_set_boolean (value, gst_mqtt_src_get_debug (self));
     370            2 :       break;
     371            1 :     case PROP_IS_LIVE:
     372            1 :       g_value_set_boolean (value, gst_mqtt_src_get_is_live (self));
     373            1 :       break;
     374            1 :     case PROP_MQTT_CLIENT_ID:
     375            1 :       g_value_set_string (value, gst_mqtt_src_get_client_id (self));
     376            1 :       break;
     377            1 :     case PROP_MQTT_HOST_ADDRESS:
     378            1 :       g_value_set_string (value, gst_mqtt_src_get_host_address (self));
     379            1 :       break;
     380            1 :     case PROP_MQTT_HOST_PORT:
     381            1 :       g_value_set_string (value, gst_mqtt_src_get_host_port (self));
     382            1 :       break;
     383            2 :     case PROP_MQTT_SUB_TIMEOUT:
     384            2 :       g_value_set_int64 (value, gst_mqtt_src_get_sub_timeout (self));
     385            2 :       break;
     386            1 :     case PROP_MQTT_SUB_TOPIC:
     387            1 :       g_value_set_string (value, gst_mqtt_src_get_sub_topic (self));
     388            1 :       break;
     389            1 :     case PROP_MQTT_OPT_CLEANSESSION:
     390            1 :       g_value_set_boolean (value, gst_mqtt_src_get_opt_cleansession (self));
     391            1 :       break;
     392            2 :     case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
     393            2 :       g_value_set_int (value, gst_mqtt_src_get_opt_keep_alive_interval (self));
     394            2 :       break;
     395            2 :     case PROP_MQTT_QOS:
     396            2 :       g_value_set_int (value, gst_mqtt_src_get_mqtt_qos (self));
     397            2 :       break;
     398            0 :     default:
     399            0 :       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
     400            0 :       break;
     401              :   }
     402           14 : }
     403              : 
     404              : /**
     405              :  * @brief Finalize GstMqttSrcClass object
     406              :  */
     407              : static void
     408            7 : gst_mqtt_src_class_finalize (GObject * object)
     409              : {
     410            7 :   GstMqttSrc *self = GST_MQTT_SRC (object);
     411              :   GstBuffer *remained;
     412              : 
     413            7 :   if (self->mqtt_client_handle) {
     414            0 :     MQTTAsync_destroy (&self->mqtt_client_handle);
     415            0 :     self->mqtt_client_handle = NULL;
     416              :   }
     417              : 
     418            7 :   g_free (self->mqtt_client_id);
     419            7 :   g_free (self->mqtt_host_address);
     420            7 :   g_free (self->mqtt_host_port);
     421            7 :   g_free (self->mqtt_topic);
     422            7 :   gst_caps_replace (&self->caps, NULL);
     423              : 
     424            7 :   if (self->err)
     425            2 :     g_error_free (self->err);
     426              : 
     427            7 :   while ((remained = g_async_queue_try_pop (self->aqueue))) {
     428            0 :     gst_buffer_unref (remained);
     429              :   }
     430            7 :   g_clear_pointer (&self->aqueue, g_async_queue_unref);
     431              : 
     432            7 :   g_mutex_clear (&self->mqtt_src_mutex);
     433            7 :   G_OBJECT_CLASS (parent_class)->finalize (object);
     434            7 : }
     435              : 
     436              : /**
     437              :  * @brief Handle mqttsrc's state change
     438              :  */
     439              : static GstStateChangeReturn
     440           32 : gst_mqtt_src_change_state (GstElement * element, GstStateChange transition)
     441              : {
     442           32 :   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
     443           32 :   GstMqttSrc *self = GST_MQTT_SRC (element);
     444           32 :   gboolean no_preroll = FALSE;
     445              :   GstClock *elem_clock;
     446              :   GstClockTime base_time;
     447              :   GstClockTime cur_time;
     448              :   GstClockTimeDiff diff;
     449              : 
     450           32 :   switch (transition) {
     451            5 :     case GST_STATE_CHANGE_NULL_TO_READY:
     452            5 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_NULL_TO_READY");
     453            5 :       if (self->err) {
     454            0 :         g_printerr ("%s: %s\n", g_quark_to_string (self->err->domain),
     455            0 :             self->err->message);
     456            0 :         return GST_STATE_CHANGE_FAILURE;
     457              :       }
     458            5 :       break;
     459            5 :     case GST_STATE_CHANGE_READY_TO_PAUSED:
     460            5 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_PAUSED");
     461              :       /* Regardless of the 'is-live''s value, prerolling is not supported */
     462            5 :       no_preroll = TRUE;
     463            5 :       break;
     464            5 :     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
     465            5 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_PLAYING");
     466            5 :       self->base_time_epoch = GST_CLOCK_TIME_NONE;
     467            5 :       elem_clock = gst_element_get_clock (element);
     468            5 :       if (!elem_clock)
     469            0 :         break;
     470            5 :       base_time = gst_element_get_base_time (element);
     471            5 :       cur_time = gst_clock_get_time (elem_clock);
     472            5 :       gst_object_unref (elem_clock);
     473            5 :       diff = GST_CLOCK_DIFF (base_time, cur_time);
     474            5 :       self->base_time_epoch =
     475            5 :           g_get_real_time () * GST_US_TO_NS_MULTIPLIER - diff;
     476              : 
     477              :       /** This handles the case when the state is changed to PLAYING again */
     478            5 :       if (GST_BASE_SRC_IS_STARTED (GST_BASE_SRC (self)) &&
     479            5 :           (self->is_connected == FALSE)) {
     480            0 :         int conn = MQTTAsync_reconnect (self->mqtt_client_handle);
     481              : 
     482            0 :         if (conn != MQTTASYNC_SUCCESS) {
     483            0 :           GST_ERROR_OBJECT (self, "Failed to re-subscribe to %s",
     484              :               self->mqtt_topic);
     485              : 
     486            0 :           return GST_STATE_CHANGE_FAILURE;
     487              :         }
     488              :       }
     489            5 :       break;
     490           17 :     default:
     491           17 :       break;
     492              :   }
     493              : 
     494           32 :   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
     495              : 
     496           32 :   switch (transition) {
     497            5 :     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
     498            5 :       if (self->is_subscribed && !_unsubscribe (self)) {
     499            0 :         GST_ERROR_OBJECT (self, "Cannot unsubscribe to %s", self->mqtt_topic);
     500              :       }
     501            5 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PLAYING_TO_PAUSED");
     502            5 :       break;
     503            5 :     case GST_STATE_CHANGE_PAUSED_TO_READY:
     504            5 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_READY");
     505            5 :       break;
     506            5 :     case GST_STATE_CHANGE_READY_TO_NULL:
     507            5 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_NULL");
     508              :     default:
     509           22 :       break;
     510              :   }
     511              : 
     512           32 :   if (no_preroll && ret == GST_STATE_CHANGE_SUCCESS)
     513            0 :     ret = GST_STATE_CHANGE_NO_PREROLL;
     514              : 
     515           32 :   return ret;
     516              : }
     517              : 
     518              : /**
     519              :  * @brief Start mqttsrc, called when state changed null to ready
                       :  * @param basesrc the base source instance; cast to GstMqttSrc
                       :  * @return TRUE once is_connected has been observed; FALSE when
                       :  *         MQTTAsync_create () or MQTTAsync_connect () fails, or when the
                       :  *         wait for the connection times out
                       :  * @note If the client id still equals DEFAULT_MQTT_CLIENT_ID it is
                       :  *       replaced by one formatted from the host name, the pid and the
                       :  *       src_client_id counter (which is incremented).
                       :  * @note The broker URI handed to MQTTAsync_create () is
                       :  *       "<mqtt_host_address>:<mqtt_host_port>"; the temporary string is
                       :  *       freed right after the call.
                       :  * @note On the connect-failure and timeout paths the client handle is
                       :  *       destroyed and reset to NULL. is_connected is set to TRUE by
                       :  *       cb_mqtt_on_connect (), which also broadcasts the condition waited
                       :  *       on here.
                       :  * @note NOTE(review): GstBaseSrc normally invokes start () while the pads
                       :  *       are being activated rather than on null to ready; confirm the
                       :  *       transition named in the brief.
                       :  * @note NOTE(review): the two sentences of the g_critical () text are
                       :  *       concatenated without a separating space.
     520              :  */
     521              : static gboolean
     522            5 : gst_mqtt_src_start (GstBaseSrc * basesrc)
     523              : {
     524            5 :   GstMqttSrc *self = GST_MQTT_SRC (basesrc);
     525            5 :   gchar *haddr = g_strdup_printf ("%s:%s", self->mqtt_host_address,
     526              :       self->mqtt_host_port);
     527              :   int ret;
     528              :   gint64 end_time;
     529              : 
     530            5 :   if (!g_strcmp0 (DEFAULT_MQTT_CLIENT_ID, self->mqtt_client_id)) {
     531            5 :     g_free (self->mqtt_client_id);
     532            5 :     self->mqtt_client_id = g_strdup_printf (DEFAULT_MQTT_CLIENT_ID_FORMAT,
     533            5 :         g_get_host_name (), getpid (), src_client_id++);
     534              :   }
     535              : 
     536              :   /**
     537              :    * @todo Support other persistence mechanisms
     538              :    *    MQTTCLIENT_PERSISTENCE_NONE: A memory-based persistence mechanism
     539              :    *    MQTTCLIENT_PERSISTENCE_DEFAULT: The default file system-based
     540              :    *                                    persistence mechanism
     541              :    *    MQTTCLIENT_PERSISTENCE_USER: An application-specific persistence
     542              :    *                                 mechanism
     543              :    */
     544           10 :   ret = MQTTAsync_create (&self->mqtt_client_handle, haddr,
     545            5 :       self->mqtt_client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL);
     546            5 :   g_free (haddr);
     547            5 :   if (ret != MQTTASYNC_SUCCESS)
     548            0 :     return FALSE;
     549              : 
     550            5 :   MQTTAsync_setCallbacks (self->mqtt_client_handle, self,
     551              :       cb_mqtt_on_connection_lost, cb_mqtt_on_message_arrived, NULL);
     552              : 
     553            5 :   ret = MQTTAsync_connect (self->mqtt_client_handle, &self->mqtt_conn_opts);
     554            5 :   if (ret != MQTTASYNC_SUCCESS)
     555            0 :     goto error;
     556              : 
     557              :   /* Waiting for the connection: sleep on the condition variable until
                       :    * is_connected is set or the absolute deadline (now +
                       :    * DEFAULT_MQTT_CONN_TIMEOUT_SEC seconds) passes. */
     558            5 :   end_time = g_get_monotonic_time () +
     559              :       DEFAULT_MQTT_CONN_TIMEOUT_SEC * G_TIME_SPAN_SECOND;
     560            5 :   g_mutex_lock (&self->mqtt_src_mutex);
     561            5 :   while (!self->is_connected) {
     562            0 :     if (!g_cond_wait_until (&self->mqtt_src_gcond, &self->mqtt_src_mutex,
     563              :             end_time)) {
     564            0 :       g_mutex_unlock (&self->mqtt_src_mutex);
     565            0 :       g_critical ("Failed to connect to MQTT broker from mqttsrc."
     566              :           "Please check broker is running status or broker host address.");
     567            0 :       goto error;
     568              :     }
     569              :   }
     570            5 :   g_mutex_unlock (&self->mqtt_src_mutex);
     571            5 :   return TRUE;
     572              : 
     573            0 : error:
     574            0 :   MQTTAsync_destroy (&self->mqtt_client_handle);
     575            0 :   self->mqtt_client_handle = NULL;
     576            0 :   return FALSE;
     577              : }
     578              : 
     579              : /**
     580              :  * @brief Stop mqttsrc, called when state changed ready to null
                       :  * @param basesrc the base source instance; cast to GstMqttSrc
                       :  * @return always TRUE
                       :  * @note Requests a disconnect, clears is_connected under the mutex, then
                       :  *       destroys the client handle and resets it to NULL.
                       :  * @note NOTE(review): the return value of MQTTAsync_disconnect () is
                       :  *       ignored and the handle is destroyed immediately afterwards
                       :  *       without waiting for the disconnect to complete; confirm this is
                       :  *       intended.
     581              :  */
     582              : static gboolean
     583            5 : gst_mqtt_src_stop (GstBaseSrc * basesrc)
     584              : {
     585            5 :   GstMqttSrc *self = GST_MQTT_SRC (basesrc);
     586              : 
     587              :   /* todo */
     588            5 :   MQTTAsync_disconnect (self->mqtt_client_handle, NULL);
     589            5 :   g_mutex_lock (&self->mqtt_src_mutex);
     590            5 :   self->is_connected = FALSE;
     591            5 :   g_mutex_unlock (&self->mqtt_src_mutex);
     592            5 :   MQTTAsync_destroy (&self->mqtt_client_handle);
     593            5 :   self->mqtt_client_handle = NULL;
     594            5 :   return TRUE;
     595              : }
     596              : 
     597              : /**
     598              :  * @brief Get caps of subclass
                       :  * @param basesrc the base source whose source pad is inspected
                       :  * @param filter unused
                       :  * @return ANY caps when the source pad has no current caps; otherwise the
                       :  *         result of intersecting the pad's current caps with ANY
                       :  *         (GST_CAPS_INTERSECT_FIRST). The temporary references taken here
                       :  *         are released before returning.
     599              :  */
     600              : static GstCaps *
     601           47 : gst_mqtt_src_get_caps (GstBaseSrc * basesrc, GstCaps * filter)
     602              : {
     603           47 :   GstPad *pad = GST_BASE_SRC_PAD (basesrc);
     604           47 :   GstCaps *cur_caps = gst_pad_get_current_caps (pad);
     605           47 :   GstCaps *caps = gst_caps_new_any ();
     606              :   UNUSED (filter);
     607              : 
     608           47 :   if (cur_caps) {
     609              :     GstCaps *intersection =
     610            0 :         gst_caps_intersect_full (cur_caps, caps, GST_CAPS_INTERSECT_FIRST);
     611              : 
     612            0 :     gst_caps_unref (cur_caps);
     613            0 :     gst_caps_unref (caps);
     614            0 :     caps = intersection;
     615              :   }
     616              : 
     617           47 :   return caps;
     618              : }
     619              : 
     620              : /**
     621              :  * @brief Do negotiation procedure again if it needed
                       :  * @param basesrc the base source instance; cast to GstMqttSrc
                       :  * @return TRUE when no negotiation is needed, when the peer accepts ANY,
                       :  *         or when gst_base_src_set_caps () succeeds with the fixated peer
                       :  *         caps; FALSE when the fixated caps are not fixed or setting
                       :  *         them fails
                       :  * @note Negotiation is skipped when self->caps is NULL or ANY, when it
                       :  *       already equals the pad's current caps, or when the peer query
                       :  *       (filtered by self->caps) comes back empty or as the very same
                       :  *       caps object.
                       :  * @note NOTE(review): `peercaps == self->caps` compares pointers, not
                       :  *       caps contents; confirm that identity is what is meant.
                       :  * @note NOTE(review): self->caps is read here without holding
                       :  *       mqtt_src_mutex; the visible caller is the message-arrived
                       :  *       callback.
     622              :  */
     623              : static gboolean
     624            4 : gst_mqtt_src_renegotiate (GstBaseSrc * basesrc)
     625              : {
     626            4 :   GstMqttSrc *self = GST_MQTT_SRC (basesrc);
     627            4 :   GstCaps *peercaps = NULL;
     628              :   GstCaps *thiscaps;
     629            4 :   gboolean result = FALSE;
     630              : 
     631            4 :   if (self->caps == NULL || gst_caps_is_any (self->caps))
     632            0 :     goto no_nego_needed;
     633              : 
     634            4 :   thiscaps = gst_pad_get_current_caps (GST_BASE_SRC_PAD (basesrc));
     635            4 :   if (thiscaps && gst_caps_is_equal (self->caps, thiscaps)) {
     636            0 :     gst_caps_unref (thiscaps);
     637            0 :     goto no_nego_needed;
     638              :   }
     639              : 
     640            4 :   if (thiscaps)
     641            1 :     gst_caps_unref (thiscaps);
     642              : 
     643            4 :   peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), self->caps);
     644            4 :   if (gst_caps_is_empty (peercaps) || peercaps == self->caps) {
     645            1 :     gst_caps_unref (peercaps);
     646            1 :     goto no_nego_needed;
     647              :   }
     648              : 
     649            3 :   if (gst_caps_is_any (peercaps)) {
     650            0 :     result = TRUE;
     651              :   } else {
     652            3 :     peercaps = gst_caps_fixate (peercaps);
     653            3 :     if (gst_caps_is_fixed (peercaps)) {
     654            3 :       result = gst_base_src_set_caps (basesrc, peercaps);
     655              :     }
     656              :   }
     657              : 
     658            3 :   gst_caps_unref (peercaps);
     659              : 
     660            3 :   return result;
     661              : 
     662            1 : no_nego_needed:
     663              :   {
     664            1 :     GST_DEBUG_OBJECT (self, "no negotiation needed");
     665              : 
     666            1 :     return TRUE;
     667              :   }
     668              : }
     669              : 
     670              : /**
     671              :  * @brief Return the time information of the given buffer
                       :  * @param basesrc unused
                       :  * @param buffer the buffer whose timestamps are read
                       :  * @param[out] start set to the buffer's DTS, or to its PTS when the DTS is
                       :  *             invalid; left untouched when both are invalid
                       :  * @param[out] end set to start + duration only when both the chosen
                       :  *             timestamp and the duration are valid; otherwise left
                       :  *             untouched
     672              :  */
     673              : static void
     674            4 : gst_mqtt_src_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
     675              :     GstClockTime * start, GstClockTime * end)
     676              : {
     677              :   GstClockTime sync_ts;
     678              :   GstClockTime duration;
     679              :   UNUSED (basesrc);
     680              : 
     681            4 :   sync_ts = GST_BUFFER_DTS (buffer);
     682            4 :   duration = GST_BUFFER_DURATION (buffer);
     683              : 
     684            4 :   if (!GST_CLOCK_TIME_IS_VALID (sync_ts))
     685            0 :     sync_ts = GST_BUFFER_PTS (buffer);
     686              : 
     687            4 :   if (GST_CLOCK_TIME_IS_VALID (sync_ts)) {
     688            4 :     *start = sync_ts;
     689            4 :     if (GST_CLOCK_TIME_IS_VALID (duration)) {
     690            4 :       *end = sync_ts + duration;
     691              :     }
     692              :   }
     693            4 : }
     694              : 
     695              : /**
     696              :  * @brief Check if source supports seeking
     697              :  * @note Seeking is not supported since this element handles live subscription data.
                       :  * @param basesrc unused
                       :  * @return always FALSE
     698              :  */
     699              : static gboolean
     700            5 : gst_mqtt_src_is_seekable (GstBaseSrc * basesrc)
     701              : {
     702              :   UNUSED (basesrc);
     703            5 :   return FALSE;
     704              : }
     705              : 
     706              : /**
     707              :  * @brief Create a buffer containing the subscribed data
                       :  * @param basesrc the base source instance; cast to GstMqttSrc
                       :  * @param offset unused
                       :  * @param size unused
                       :  * @param[out] buf receives the buffer popped from the internal async queue
                       :  * @return GST_FLOW_OK when a buffer is handed out; GST_FLOW_ERROR when
                       :  *         self->err is set while waiting or when no buffer was obtained
                       :  * @note Step 1: under the mutex, waits in one-second slices until both
                       :  *       is_connected and is_subscribed are set, and bails out as soon as
                       :  *       self->err is found set after a wait.
                       :  * @note Step 2: pops from aqueue, waiting DEFAULT_MQTT_SUB_TIMEOUT_MIN
                       :  *       per attempt, while the remaining budget (initially
                       :  *       mqtt_sub_timeout) is positive. A buffer rejected by
                       :  *       _is_gst_buffer_timestamp_valid () is unreffed and the budget is
                       :  *       restarted.
                       :  * @note Step 3: self->latency is updated under the mutex. It becomes
                       :  *       (clock time - base time) - buffer timestamp when that difference
                       :  *       is positive and is not covered by a valid buffer duration;
                       :  *       otherwise GST_CLOCK_TIME_NONE.
                       :  * @note When the budget runs out without a buffer and no error is
                       :  *       pending, a GError carrying GST_FLOW_EOS as its code is created,
                       :  *       yet the function still returns GST_FLOW_ERROR.
                       :  * @note NOTE(review): `*buf` is tested against NULL after the pop loop
                       :  *       even when the loop body never ran (mqtt_sub_timeout <= 0); this
                       :  *       relies on the caller having initialised *buf. Confirm.
                       :  * @note NOTE(review): num_dumped is incremented only inside the
                       :  *       self->debug branch (as a GST_DEBUG_OBJECT argument), so the
                       :  *       counter does not advance when debug is off.
                       :  * @note NOTE(review): self->err is read and written in step 2 and at
                       :  *       ret_flow_err without holding mqtt_src_mutex, while the
                       :  *       connection-lost callback sets it under the mutex.
     708              :  */
     709              : static GstFlowReturn
     710            6 : gst_mqtt_src_create (GstBaseSrc * basesrc, guint64 offset, guint size,
     711              :     GstBuffer ** buf)
     712              : {
     713            6 :   GstMqttSrc *self = GST_MQTT_SRC (basesrc);
     714            6 :   gint64 elapsed = self->mqtt_sub_timeout;
     715              :   UNUSED (offset);
     716              :   UNUSED (size);
     717              : 
     718            6 :   g_mutex_lock (&self->mqtt_src_mutex);
     719            6 :   while ((!self->is_connected) || (!self->is_subscribed)) {
     720            2 :     gint64 end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND;
     721              : 
     722            2 :     g_cond_wait_until (&self->mqtt_src_gcond, &self->mqtt_src_mutex, end_time);
     723            2 :     if (self->err) {
     724            2 :       g_mutex_unlock (&self->mqtt_src_mutex);
     725            2 :       goto ret_flow_err;
     726              :     }
     727              :   }
     728            4 :   g_mutex_unlock (&self->mqtt_src_mutex);
     729              : 
     730            4 :   while (elapsed > 0) {
     731              :     /** @todo DEFAULT_MQTT_SUB_TIMEOUT_MIN is too long */
     732            4 :     *buf = g_async_queue_timeout_pop (self->aqueue,
     733              :         DEFAULT_MQTT_SUB_TIMEOUT_MIN);
     734            4 :     if (*buf) {
     735            4 :       GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
     736            4 :       GstClockTime ulatency = GST_CLOCK_TIME_NONE;
     737              :       GstClock *clock;
     738              : 
     739              :       /** This buffer is coming from the past. Drop it. */
     740            4 :       if (!_is_gst_buffer_timestamp_valid (*buf)) {
     741            0 :         if (self->debug) {
     742            0 :           GST_DEBUG_OBJECT (self,
     743              :               "%s: Dumped the received buffer! (total: %" G_GUINT64_FORMAT ")",
     744              :               self->mqtt_topic, ++self->num_dumped);
     745              :         }
     746            0 :         elapsed = self->mqtt_sub_timeout;
     747            0 :         gst_buffer_unref (*buf);
     748            0 :         continue;
     749              :       }
     750              : 
     751              :       /** Update latency */
     752            4 :       clock = gst_element_get_clock (GST_ELEMENT (self));
     753            4 :       if (clock) {
     754            4 :         GstClockTime cur_time = gst_clock_get_time (clock);
     755            4 :         GstClockTime buf_ts = GST_BUFFER_TIMESTAMP (*buf);
     756            4 :         GstClockTimeDiff latency = 0;
     757              : 
     758            4 :         if ((base_time != GST_CLOCK_TIME_NONE) &&
     759            4 :             (cur_time != GST_CLOCK_TIME_NONE) &&
     760              :             (buf_ts != GST_CLOCK_TIME_NONE)) {
     761            4 :           GstClockTimeDiff now = GST_CLOCK_DIFF (base_time, cur_time);
     762              : 
     763            4 :           latency = GST_CLOCK_DIFF (buf_ts, (GstClockTime) now);
     764              :         }
     765              : 
     766            4 :         if (latency > 0) {
     767            0 :           ulatency = (GstClockTime) latency;
     768              : 
     769            0 :           if (GST_BUFFER_DURATION_IS_VALID (*buf)) {
     770            0 :             GstClockTime duration = GST_BUFFER_DURATION (*buf);
     771              : 
     772            0 :             if (duration >= ulatency) {
     773            0 :               ulatency = GST_CLOCK_TIME_NONE;
     774              :             }
     775              :           }
     776              :         }
     777            4 :         gst_object_unref (clock);
     778              :       }
     779              : 
     780            4 :       g_mutex_lock (&self->mqtt_src_mutex);
     781            4 :       self->latency = ulatency;
     782            4 :       g_mutex_unlock (&self->mqtt_src_mutex);
     783              :       /**
     784              :        * @todo If the difference between new latency and old latency,
     785              :        *      gst_element_post_message (GST_ELEMENT_CAST (self),
     786              :        *          gst_message_new_latency (GST_OBJECT_CAST (self)));
     787              :        *      is needed.
     788              :        */
     789            4 :       break;
     790            0 :     } else if (self->err) {
     791            0 :       break;
     792              :     }
     793            0 :     elapsed = elapsed - DEFAULT_MQTT_SUB_TIMEOUT_MIN;
     794              :   }
     795              : 
     796            4 :   if (*buf == NULL) {
     797              :     /** @todo: Send EoS here */
     798            0 :     if (!self->err)
     799            0 :       self->err = g_error_new (self->gquark_err_tag, GST_FLOW_EOS,
     800              :           "%s: Timeout for receiving a message has been expired. Regarding as an error",
     801              :           __func__);
     802            0 :     goto ret_flow_err;
     803              :   }
     804              : 
     805            4 :   return GST_FLOW_OK;
     806              : 
     807            2 : ret_flow_err:
     808            2 :   if (self->err) {
     809            2 :     g_printerr ("%s: %s\n", g_quark_to_string (self->err->domain),
     810            2 :         self->err->message);
     811              :   }
     812            2 :   return GST_FLOW_ERROR;
     813              : }
     814              : 
     815              : /**
     816              :  * @brief An implementation of the GstBaseSrc vmethod that handles queries
                       :  * @param basesrc the base source instance; cast to GstMqttSrc
                       :  * @param query the query to answer
                       :  * @return TRUE for latency queries; for every other query type, whatever
                       :  *         the parent class' query handler returns
                       :  * @note A latency query is answered as live, with a minimum latency of
                       :  *       self->latency (read under the mutex; 0 when it is
                       :  *       GST_CLOCK_TIME_NONE) and a maximum of GST_CLOCK_TIME_NONE.
                       :  * @note NOTE(review): the debug message logs "Got %s event" although the
                       :  *       value printed is a query type name.
     817              :  */
     818              : static gboolean
     819           55 : gst_mqtt_src_query (GstBaseSrc * basesrc, GstQuery * query)
     820              : {
     821           55 :   GstQueryType type = GST_QUERY_TYPE (query);
     822           55 :   GstMqttSrc *self = GST_MQTT_SRC (basesrc);
     823           55 :   gboolean res = FALSE;
     824              : 
     825           55 :   if (self->debug)
     826           53 :     GST_DEBUG_OBJECT (self, "Got %s event", gst_query_type_get_name (type));
     827              : 
     828           55 :   switch (type) {
     829            3 :     case GST_QUERY_LATENCY:{
     830            3 :       GstClockTime min_latency = 0;
     831            3 :       GstClockTime max_latency = GST_CLOCK_TIME_NONE;
     832              : 
     833            3 :       g_mutex_lock (&self->mqtt_src_mutex);
     834            3 :       if (self->latency != GST_CLOCK_TIME_NONE) {
     835            0 :         min_latency = self->latency;
     836              :       }
     837            3 :       g_mutex_unlock (&self->mqtt_src_mutex);
     838              : 
     839            3 :       if (self->debug) {
     840            3 :         GST_DEBUG_OBJECT (self,
     841              :             "Reporting latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT,
     842              :             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
     843              :       }
     844              :       /**
     845              :        * @brief The second argument of gst_query_set_latency should be always
     846              :        *        TRUE.
     847              :        */
     848            3 :       gst_query_set_latency (query, TRUE, min_latency, max_latency);
     849              : 
     850            3 :       res = TRUE;
     851            3 :       break;
     852              :     }
     853           52 :     default:{
     854           52 :       res = GST_BASE_SRC_CLASS (parent_class)->query (basesrc, query);
     855              :     }
     856              :   }
     857              : 
     858           55 :   return res;
     859              : }
     860              : 
     861              : /**
     862              :  * @brief Getter for the 'debug' property.
                       :  * @param self the mqttsrc instance
                       :  * @return the current value of the debug flag
     863              :  */
     864              : static gboolean
     865            2 : gst_mqtt_src_get_debug (GstMqttSrc * self)
     866              : {
     867            2 :   return self->debug;
     868              : }
     869              : 
     870              : /**
     871              :  * @brief Setter for the 'debug' property.
                       :  * @param self the mqttsrc instance
                       :  * @param flag the new value stored in the debug flag
     872              :  */
     873              : static void
     874            6 : gst_mqtt_src_set_debug (GstMqttSrc * self, const gboolean flag)
     875              : {
     876            6 :   self->debug = flag;
     877            6 : }
     878              : 
     879              : /**
     880              :  * @brief Getter for the 'is-live' property.
                       :  * @param self the mqttsrc instance
                       :  * @return the is_live flag cached in the instance
     881              :  */
     882              : static gboolean
     883            1 : gst_mqtt_src_get_is_live (GstMqttSrc * self)
     884              : {
     885            1 :   return self->is_live;
     886              : }
     887              : 
     888              : /**
     889              :  * @brief Setter for the 'is-live' property.
                       :  * @param self the mqttsrc instance
                       :  * @param flag the new liveness value
                       :  * @note Besides caching the flag, the value is forwarded to the base
                       :  *       class through gst_base_src_set_live ().
     890              :  */
     891              : static void
     892            6 : gst_mqtt_src_set_is_live (GstMqttSrc * self, const gboolean flag)
     893              : {
     894            6 :   self->is_live = flag;
     895            6 :   gst_base_src_set_live (GST_BASE_SRC (self), self->is_live);
     896            6 : }
     897              : 
     898              : /**
     899              :  * @brief Getter for the 'client-id' property.
                       :  * @param self the mqttsrc instance
                       :  * @return the instance's own client id string (the stored pointer itself,
                       :  *         not a copy)
     900              :  */
     901              : static gchar *
     902            1 : gst_mqtt_src_get_client_id (GstMqttSrc * self)
     903              : {
     904            1 :   return self->mqtt_client_id;
     905              : }
     906              : 
     907              : /**
     908              :  * @brief Setter for the 'client-id' property.
                       :  * @param self the mqttsrc instance
                       :  * @param id the new client id; it is duplicated, and the previously stored
                       :  *           string is freed
     909              :  */
     910              : static void
     911            1 : gst_mqtt_src_set_client_id (GstMqttSrc * self, const gchar * id)
     912              : {
     913            1 :   g_free (self->mqtt_client_id);
     914            1 :   self->mqtt_client_id = g_strdup (id);
     915            1 : }
     916              : 
     917              : /**
     918              :  * @brief Getter for the 'host' property.
                       :  * @param self the mqttsrc instance
                       :  * @return the instance's own host address string (the stored pointer
                       :  *         itself, not a copy)
     919              :  */
     920              : static gchar *
     921            1 : gst_mqtt_src_get_host_address (GstMqttSrc * self)
     922              : {
     923            1 :   return self->mqtt_host_address;
     924              : }
     925              : 
     926              : /**
     927              :  * @brief Setter for the 'host' property
                       :  * @param self the mqttsrc instance
                       :  * @param addr the new host address; it is duplicated, and the previously
                       :  *             stored string is freed
     928              :  */
     929              : static void
     930            1 : gst_mqtt_src_set_host_address (GstMqttSrc * self, const gchar * addr)
     931              : {
     932              :   /**
     933              :    * @todo Handle the case where the addr is changed at runtime
     934              :    */
     935            1 :   g_free (self->mqtt_host_address);
     936            1 :   self->mqtt_host_address = g_strdup (addr);
     937            1 : }
     938              : 
     939              : /**
     940              :  * @brief Getter for the 'port' property.
                       :  * @param self the mqttsrc instance
                       :  * @return the instance's own port string (the stored pointer itself, not
                       :  *         a copy)
     941              :  */
     942              : static gchar *
     943            1 : gst_mqtt_src_get_host_port (GstMqttSrc * self)
     944              : {
     945            1 :   return self->mqtt_host_port;
     946              : }
     947              : 
     948              : /**
     949              :  * @brief Setter for the 'port' property
                       :  * @param self the mqttsrc instance
                       :  * @param port the new port, kept as a string; it is duplicated, and the
                       :  *             previously stored string is freed
     950              :  */
     951              : static void
     952            1 : gst_mqtt_src_set_host_port (GstMqttSrc * self, const gchar * port)
     953              : {
     954            1 :   g_free (self->mqtt_host_port);
     955            1 :   self->mqtt_host_port = g_strdup (port);
     956            1 : }
     957              : 
     958              : /**
     959              :  * @brief Getter for the 'sub-timeout' property
                       :  * @param self the mqttsrc instance
                       :  * @return the stored subscription timeout
     960              :  */
     961              : static gint64
     962            2 : gst_mqtt_src_get_sub_timeout (GstMqttSrc * self)
     963              : {
     964            2 :   return self->mqtt_sub_timeout;
     965              : }
     966              : 
     967              : /**
     968              :  * @brief Setter for the 'sub-timeout' property
                       :  * @param self the mqttsrc instance
                       :  * @param t the new subscription timeout; stored as is, without any range
                       :  *          check in this function
     969              :  */
     970              : static void
     971            6 : gst_mqtt_src_set_sub_timeout (GstMqttSrc * self, const gint64 t)
     972              : {
     973            6 :   self->mqtt_sub_timeout = t;
     974            6 : }
     975              : 
     976              : /**
     977              :  * @brief Getter for the 'sub-topic' property
                       :  * @param self the mqttsrc instance
                       :  * @return the instance's own topic string (the stored pointer itself, not
                       :  *         a copy)
     978              :  */
     979              : static gchar *
     980            1 : gst_mqtt_src_get_sub_topic (GstMqttSrc * self)
     981              : {
     982            1 :   return self->mqtt_topic;
     983              : }
     984              : 
     985              : /**
     986              :  * @brief Setter for the 'sub-topic' property
                       :  * @param self the mqttsrc instance
                       :  * @param topic the new topic; it is duplicated, and the previously stored
                       :  *              string is freed
     987              :  */
     988              : static void
     989            6 : gst_mqtt_src_set_sub_topic (GstMqttSrc * self, const gchar * topic)
     990              : {
     991            6 :   g_free (self->mqtt_topic);
     992            6 :   self->mqtt_topic = g_strdup (topic);
     993            6 : }
     994              : 
     995              : /**
     996              :  * @brief Getter for the 'cleansession' property.
                       :  * @param self the mqttsrc instance
                       :  * @return the cleansession field of the stored connect options
     997              :  */
     998              : static gboolean
     999            1 : gst_mqtt_src_get_opt_cleansession (GstMqttSrc * self)
    1000              : {
    1001            1 :   return self->mqtt_conn_opts.cleansession;
    1002              : }
    1003              : 
    1004              : /**
    1005              :  * @brief Setter for the 'cleansession' property.
                       :  * @param self the mqttsrc instance
                       :  * @param val the value written to the cleansession field of the stored
                       :  *            connect options
    1006              :  */
    1007              : static void
    1008            1 : gst_mqtt_src_set_opt_cleansession (GstMqttSrc * self, const gboolean val)
    1009              : {
    1010            1 :   self->mqtt_conn_opts.cleansession = val;
    1011            1 : }
    1012              : 
    1013              : /**
    1014              :  * @brief Getter for the 'keep-alive-interval' property
                       :  * @param self the mqttsrc instance
                       :  * @return the keepAliveInterval field of the stored connect options
    1015              :  */
    1016              : static gint
    1017            2 : gst_mqtt_src_get_opt_keep_alive_interval (GstMqttSrc * self)
    1018              : {
    1019            2 :   return self->mqtt_conn_opts.keepAliveInterval;
    1020              : }
    1021              : 
    1022              : /**
    1023              :  * @brief Setter for the 'keep-alive-interval' property
                       :  * @param self the mqttsrc instance
                       :  * @param num the value written to the keepAliveInterval field of the
                       :  *            stored connect options
    1024              :  */
    1025              : static void
    1026            1 : gst_mqtt_src_set_opt_keep_alive_interval (GstMqttSrc * self, const gint num)
    1027              : {
    1028            1 :   self->mqtt_conn_opts.keepAliveInterval = num;
    1029            1 : }
    1030              : 
    1031              : /**
    1032              :  * @brief Getter for the 'mqtt-qos' property
                       :  * @param self the mqttsrc instance
                       :  * @return the stored QoS value
    1033              :  */
    1034              : static gint
    1035            2 : gst_mqtt_src_get_mqtt_qos (GstMqttSrc * self)
    1036              : {
    1037            2 :   return self->mqtt_qos;
    1038              : }
    1039              : 
    1040              : /**
    1041              :  * @brief Setter for the 'mqtt-qos' property
                       :  * @param self the mqttsrc instance
                       :  * @param qos the new QoS value; stored as is, without any range check in
                       :  *            this function
    1042              :  */
    1043              : static void
    1044            1 : gst_mqtt_src_set_mqtt_qos (GstMqttSrc * self, const gint qos)
    1045              : {
    1046            1 :   self->mqtt_qos = qos;
    1047            1 : }
    1048              : 
    1049              : /**
    1050              :   * @brief A callback to handle the connection lost to the broker
                       :   * @param context the GstMqttSrc instance registered with the callbacks
                       :   * @param cause unused
                       :   * @note Under the mutex: clears is_connected and is_subscribed, wakes
                       :   *       every waiter on the condition variable and, unless an error is
                       :   *       already pending, records an EHOSTDOWN error in self->err.
    1051              :   */
    1052              : static void
    1053            0 : cb_mqtt_on_connection_lost (void *context, char *cause)
    1054              : {
    1055            0 :   GstMqttSrc *self = GST_MQTT_SRC_CAST (context);
    1056              :   UNUSED (cause);
    1057              : 
    1058            0 :   g_mutex_lock (&self->mqtt_src_mutex);
    1059            0 :   self->is_connected = FALSE;
    1060            0 :   self->is_subscribed = FALSE;
    1061            0 :   g_cond_broadcast (&self->mqtt_src_gcond);
    1062            0 :   if (!self->err) {
    1063            0 :     self->err = g_error_new (self->gquark_err_tag, EHOSTDOWN,
    1064              :         "Connection to the host (broker) has been lost: %s \n"
    1065              :         "\t\tfor detail, please check the log message of the broker",
    1066              :         g_strerror (EHOSTDOWN));
    1067              :   }
    1068            0 :   g_mutex_unlock (&self->mqtt_src_mutex);
    1069            0 : }
    1070              : 
    1071              : /**
    1072              :   * @brief A callback to handle the arrived message
                       :   * @param context the GstMqttSrc instance registered with the callbacks
                       :   * @param topic_name unused
                       :   * @param topic_len unused
                       :   * @param message the received message; its payload is wrapped, without
                       :   *                copying, in a GstMemory whose destroy notify is
                       :   *                cb_memory_wrapped_destroy ()
                       :   * @return always TRUE
                       :   * @note Messages arriving while is_subscribed is FALSE are ignored.
                       :   * @note When the caps string in the extracted header parses and differs
                       :   *       from self->caps (or self->caps is unset), self->caps is replaced
                       :   *       and gst_mqtt_src_renegotiate () is called; its result is ignored.
                       :   * @note The payload after the first GST_MQTT_LEN_MSG_HDR bytes is split
                       :   *       into num_mems shared sub-memories of size_mems[i] bytes each,
                       :   *       which are appended to a new buffer; the buffer is timestamped by
                       :   *       _put_timestamp_on_gst_buf () and pushed to aqueue.
                       :   * @note NOTE(review): the clock reference taken by
                       :   *       gst_element_get_clock () is released only inside the
                       :   *       `self->debug` branch, so it appears to leak when debug is off
                       :   *       and on both error paths.
                       :   * @note NOTE(review): topic_name is never released here; the Paho
                       :   *       message-arrived contract reportedly expects MQTTAsync_free () on
                       :   *       it when the callback returns true. Confirm.
                       :   * @note NOTE(review): `message` is not freed on the not-subscribed early
                       :   *       return nor on the !received_mem path.
                       :   * @note NOTE(review): num_mems and size_mems[] come from the received
                       :   *       header and are not checked against the payload length in this
                       :   *       function; verify _extract_mqtt_msg_hdr_from () validates them.
                       :   * @note NOTE(review): self->err is written here without holding
                       :   *       mqtt_src_mutex.
    1073              :   */
    1074              : static int
    1075            6 : cb_mqtt_on_message_arrived (void *context, char *topic_name, int topic_len,
    1076              :     MQTTAsync_message * message)
    1077              : {
    1078            6 :   const int size = message->payloadlen;
    1079            6 :   guint8 *data = message->payload;
    1080              :   GstMQTTMessageHdr *mqtt_msg_hdr;
    1081              :   GstMapInfo hdr_map_info;
    1082              :   GstMemory *received_mem;
    1083              :   GstMemory *hdr_mem;
    1084              :   GstBuffer *buffer;
    1085              :   GstBaseSrc *basesrc;
    1086              :   GstMqttSrc *self;
    1087              :   GstClock *clock;
    1088              :   GstCaps *recv_caps;
    1089              :   gsize offset;
    1090              :   guint i;
    1091              :   UNUSED (topic_name);
    1092              :   UNUSED (topic_len);
    1093              : 
    1094            6 :   self = GST_MQTT_SRC_CAST (context);
    1095            6 :   g_mutex_lock (&self->mqtt_src_mutex);
    1096            6 :   if (!self->is_subscribed) {
    1097            2 :     g_mutex_unlock (&self->mqtt_src_mutex);
    1098              : 
    1099            6 :     return TRUE;
    1100              :   }
    1101            4 :   g_mutex_unlock (&self->mqtt_src_mutex);
    1102              : 
    1103            4 :   basesrc = GST_BASE_SRC (self);
    1104            4 :   clock = gst_element_get_clock (GST_ELEMENT (self));
    1105            4 :   received_mem = gst_memory_new_wrapped (0, data, size, 0, size, message,
    1106              :       (GDestroyNotify) cb_memory_wrapped_destroy);
    1107            4 :   if (!received_mem) {
    1108            0 :     if (!self->err) {
    1109            0 :       self->err = g_error_new (self->gquark_err_tag, ENODATA,
    1110              :           "%s: failed to wrap the raw data of received message in GstMemory: %s",
    1111              :           __func__, g_strerror (ENODATA));
    1112              :     }
    1113            0 :     return TRUE;
    1114              :   }
    1115              : 
    1116            4 :   mqtt_msg_hdr = _extract_mqtt_msg_hdr_from (received_mem, &hdr_mem,
    1117              :       &hdr_map_info);
    1118            4 :   if (!mqtt_msg_hdr) {
    1119            0 :     if (!self->err) {
    1120            0 :       self->err = g_error_new (self->gquark_err_tag, ENODATA,
    1121              :           "%s: failed to extract header information from received message: %s",
    1122              :           __func__, g_strerror (ENODATA));
    1123              :     }
    1124            0 :     goto ret_unref_received_mem;
    1125              :   }
    1126              : 
    1127            4 :   recv_caps = gst_caps_from_string (mqtt_msg_hdr->gst_caps_str);
    1128            4 :   if (recv_caps) {
    1129            4 :     if (!self->caps || !gst_caps_is_equal (self->caps, recv_caps)) {
    1130            4 :       gst_caps_replace (&self->caps, recv_caps);
    1131            4 :       gst_mqtt_src_renegotiate (basesrc);
    1132              :     }
    1133              : 
    1134            4 :     gst_caps_unref (recv_caps);
    1135              :   }
    1136              : 
    1137            4 :   buffer = gst_buffer_new ();
    1138            4 :   offset = GST_MQTT_LEN_MSG_HDR;
    1139            8 :   for (i = 0; i < mqtt_msg_hdr->num_mems; ++i) {
    1140              :     GstMemory *each_memory;
    1141              :     int each_size;
    1142              : 
    1143            4 :     each_size = mqtt_msg_hdr->size_mems[i];
    1144            4 :     each_memory = gst_memory_share (received_mem, offset, each_size);
    1145            4 :     gst_buffer_append_memory (buffer, each_memory);
    1146            4 :     offset += each_size;
    1147              :   }
    1148              : 
    1149              :   /** Timestamp synchronization */
    1150            4 :   if (self->debug) {
    1151            4 :     GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
    1152              : 
    1153            4 :     if (clock) {
    1154            4 :       GST_DEBUG_OBJECT (self,
    1155              :           "A message has been arrived at %" GST_TIME_FORMAT
    1156              :           " and queue length is %d",
    1157              :           GST_TIME_ARGS (gst_clock_get_time (clock) - base_time),
    1158              :           g_async_queue_length (self->aqueue));
    1159              : 
    1160            4 :       gst_object_unref (clock);
    1161              :     }
    1162              :   }
    1163            4 :   _put_timestamp_on_gst_buf (self, mqtt_msg_hdr, buffer);
    1164            4 :   g_async_queue_push (self->aqueue, buffer);
    1165              : 
    1166            4 :   gst_memory_unmap (hdr_mem, &hdr_map_info);
    1167            4 :   gst_memory_unref (hdr_mem);
    1168              : 
    1169            4 : ret_unref_received_mem:
    1170            4 :   gst_memory_unref (received_mem);
    1171              : 
    1172            4 :   return TRUE;
    1173              : }
    1174              : 
    1175              : /**
    1176              :   * @brief A callback invoked when destroying the GstMemory which wrapped the arrived message
                      :   * @param p Pointer that is interpreted as the MQTTAsync_message to be released
                      :   *
                      :   * The message is released through MQTTAsync_freeMessage (). That call receives
                      :   * the address of the local copy of the pointer, so the pointer variable owned
                      :   * by whoever triggered this callback is not modified here.
    1177              :   */
    1178              : static void
    1179            4 : cb_memory_wrapped_destroy (void *p)
    1180              : {
    1181            4 :   MQTTAsync_message *msg = p;
    1182              : 
    1183            4 :   MQTTAsync_freeMessage (&msg);
    1184            4 : }
    1185              : 
    1186              : /**
    1187              :   * @brief A callback invoked when the connection is established
                      :   * @param context The GstMqttSrc instance (converted with GST_MQTT_SRC ())
                      :   * @param response Success data from the MQTT library; not used
                      :   *
                      :   * Steps performed:
                      :   *  1. Under mqtt_src_mutex, set is_connected to TRUE and wake every thread
                      :   *     waiting on mqtt_src_gcond.
                      :   *  2. If the base source is asynchronous and gst_base_src_start_wait () does
                      :   *     not return GST_FLOW_OK, store a GError in self->err, wake the waiters
                      :   *     again and return without subscribing.
                      :   *  3. Otherwise call _subscribe (); a FALSE result is only logged.
                      :   *
                      :   * NOTE(review): step 2 assigns self->err without first checking whether it
                      :   * already holds an error (the failure callbacks in this file test
                      :   * "!self->err" before assigning). Confirm that no earlier error can be
                      :   * pending at this point, otherwise it would be overwritten without being
                      :   * freed.
    1188              :   */
    1189              : static void
    1190            5 : cb_mqtt_on_connect (void *context, MQTTAsync_successData * response)
    1191              : {
    1192            5 :   GstMqttSrc *self = GST_MQTT_SRC (context);
    1193            5 :   GstBaseSrc *basesrc = GST_BASE_SRC (self);
    1194              :   int ret;
    1195              :   UNUSED (response);
    1196              : 
    1197            5 :   g_mutex_lock (&self->mqtt_src_mutex);
    1198            5 :   self->is_connected = TRUE;
    1199            5 :   g_cond_broadcast (&self->mqtt_src_gcond);
    1200            5 :   g_mutex_unlock (&self->mqtt_src_mutex);
    1201              : 
    1202              :   /** GstFlowReturn is an enum type. It is possible to use int here */
    1203            5 :   if (gst_base_src_is_async (basesrc) &&
    1204            0 :       (ret = gst_base_src_start_wait (basesrc)) != GST_FLOW_OK) {
    1205            0 :     g_mutex_lock (&self->mqtt_src_mutex);
    1206            0 :     self->err = g_error_new (self->gquark_err_tag, ret,
    1207              :         "%s: the virtual method, start (), in the GstBaseSrc class fails with return code %d",
    1208              :         __func__, ret);
    1209            0 :     g_cond_broadcast (&self->mqtt_src_gcond);
    1210            0 :     g_mutex_unlock (&self->mqtt_src_mutex);
    1211            0 :     return; /* the subscription below is skipped on this error path */
    1212              :   }
    1213              : 
    1214            5 :   if (!_subscribe (self)) {
    1215            2 :     GST_ERROR_OBJECT (self, "Failed to subscribe to %s", self->mqtt_topic);
    1216              :   }
    1217              : }
    1218              : 
    1219              : /**
    1220              :   * @brief A callback invoked when connecting to the broker fails
                      :   * @param context The GstMqttSrc instance (converted with GST_MQTT_SRC ())
                      :   * @param response Failure data; its code and message fields are read
                      :   *
                      :   * Under mqtt_src_mutex this clears is_connected, records a GError in
                      :   * self->err unless one is already stored (the first error is kept), and
                      :   * wakes every thread waiting on mqtt_src_gcond.
                      :   *
                      :   * NOTE(review): response is dereferenced without a NULL check and
                      :   * response->message is passed to a "%s" conversion as-is. Confirm that the
                      :   * MQTT library always supplies non-NULL failure data and message here.
    1221              :   */
    1222              : static void
    1223            0 : cb_mqtt_on_connect_failure (void *context, MQTTAsync_failureData * response)
    1224              : {
    1225            0 :   GstMqttSrc *self = GST_MQTT_SRC (context);
    1226              : 
    1227            0 :   g_mutex_lock (&self->mqtt_src_mutex);
    1228            0 :   self->is_connected = FALSE;
    1229              : 
    1230            0 :   if (!self->err) {
    1231            0 :     self->err = g_error_new (self->gquark_err_tag, response->code,
    1232              :         "%s: failed to connect to the broker: %s", __func__, response->message);
    1233              :   }
    1234            0 :   g_cond_broadcast (&self->mqtt_src_gcond);
    1235            0 :   g_mutex_unlock (&self->mqtt_src_mutex);
    1236            0 : }
    1237              : 
    1238              : /**
    1239              :  * @brief MQTTAsync_responseOptions's onSuccess callback for MQTTAsync_subscribe ()
                      :  * @param context The GstMqttSrc instance (converted with GST_MQTT_SRC ())
                      :  * @param response Success data from the MQTT library; not used
                      :  *
                      :  * Sets is_subscribed to TRUE while holding mqtt_src_mutex and wakes every
                      :  * thread waiting on mqtt_src_gcond.
    1240              :  */
    1241              : static void
    1242            3 : cb_mqtt_on_subscribe (void *context, MQTTAsync_successData * response)
    1243              : {
    1244            3 :   GstMqttSrc *self = GST_MQTT_SRC (context);
    1245              :   UNUSED (response);
    1246              : 
    1247            3 :   g_mutex_lock (&self->mqtt_src_mutex);
    1248            3 :   self->is_subscribed = TRUE;
    1249            3 :   g_cond_broadcast (&self->mqtt_src_gcond);
    1250            3 :   g_mutex_unlock (&self->mqtt_src_mutex);
    1251            3 : }
    1252              : 
    1253              : /**
    1254              :  * @brief MQTTAsync_responseOptions's onFailure callback for MQTTAsync_subscribe ()
                      :  * @param context The GstMqttSrc instance (converted with GST_MQTT_SRC ())
                      :  * @param response Failure data; its code and message fields are read
                      :  *
                      :  * Under mqtt_src_mutex this records a GError (naming the topic) in self->err
                      :  * unless one is already stored, then wakes every thread waiting on
                      :  * mqtt_src_gcond. is_subscribed is not touched here.
                      :  *
                      :  * NOTE(review): response is dereferenced without a NULL check and
                      :  * response->message is passed to a "%s" conversion as-is. Confirm that the
                      :  * MQTT library always supplies non-NULL failure data and message here.
    1255              :  */
    1256              : static void
    1257            2 : cb_mqtt_on_subscribe_failure (void *context, MQTTAsync_failureData * response)
    1258              : {
    1259            2 :   GstMqttSrc *self = GST_MQTT_SRC (context);
    1260              : 
    1261            2 :   g_mutex_lock (&self->mqtt_src_mutex);
    1262            2 :   if (!self->err) {
    1263            2 :     self->err = g_error_new (self->gquark_err_tag, response->code,
    1264              :         "%s: failed to subscribe the given topic, %s: %s", __func__,
    1265              :         self->mqtt_topic, response->message);
    1266              :   }
    1267            2 :   g_cond_broadcast (&self->mqtt_src_gcond);
    1268            2 :   g_mutex_unlock (&self->mqtt_src_mutex);
    1269            2 : }
    1270              : 
    1271              : /**
    1272              :  * @brief MQTTAsync_responseOptions's onSuccess callback for MQTTAsync_unsubscribe ()
                      :  * @param context The GstMqttSrc instance (converted with GST_MQTT_SRC ())
                      :  * @param response Success data from the MQTT library; not used
                      :  *
                      :  * Sets is_subscribed to FALSE while holding mqtt_src_mutex and wakes every
                      :  * thread waiting on mqtt_src_gcond.
    1273              :  */
    1274              : static void
    1275            3 : cb_mqtt_on_unsubscribe (void *context, MQTTAsync_successData * response)
    1276              : {
    1277            3 :   GstMqttSrc *self = GST_MQTT_SRC (context);
    1278              :   UNUSED (response);
    1279              : 
    1280            3 :   g_mutex_lock (&self->mqtt_src_mutex);
    1281            3 :   self->is_subscribed = FALSE;
    1282            3 :   g_cond_broadcast (&self->mqtt_src_gcond);
    1283            3 :   g_mutex_unlock (&self->mqtt_src_mutex);
    1284            3 : }
    1285              : 
    1286              : /**
    1287              :  * @brief MQTTAsync_responseOptions's onFailure callback for MQTTAsync_unsubscribe ()
                      :  * @param context The GstMqttSrc instance (converted with GST_MQTT_SRC ())
                      :  * @param response Failure data; its code and message fields are read
                      :  *
                      :  * Under mqtt_src_mutex this records a GError (naming the topic) in self->err
                      :  * unless one is already stored, then wakes every thread waiting on
                      :  * mqtt_src_gcond. is_subscribed keeps its previous value.
                      :  *
                      :  * NOTE(review): response is dereferenced without a NULL check and
                      :  * response->message is passed to a "%s" conversion as-is. Confirm that the
                      :  * MQTT library always supplies non-NULL failure data and message here.
    1288              :  */
    1289              : static void
    1290            0 : cb_mqtt_on_unsubscribe_failure (void *context, MQTTAsync_failureData * response)
    1291              : {
    1292            0 :   GstMqttSrc *self = GST_MQTT_SRC (context);
    1293              : 
    1294            0 :   g_mutex_lock (&self->mqtt_src_mutex);
    1295            0 :   if (!self->err) {
    1296            0 :     self->err = g_error_new (self->gquark_err_tag, response->code,
    1297              :         "%s: failed to unsubscribe the given topic, %s: %s", __func__,
    1298              :         self->mqtt_topic, response->message);
    1299              :   }
    1300            0 :   g_cond_broadcast (&self->mqtt_src_gcond);
    1301            0 :   g_mutex_unlock (&self->mqtt_src_mutex);
    1302            0 : }
    1303              : 
    1304              : /**
    1305              :  * @brief A helper function to properly invoke MQTTAsync_subscribe ()
                      :  * @param self The GstMqttSrc instance providing the client handle, topic, QoS
                      :  *             and the response-option template (mqtt_respn_opts)
                      :  * @return TRUE if MQTTAsync_subscribe () returned MQTTASYNC_SUCCESS,
                      :  *         FALSE otherwise
                      :  *
                      :  * The response options are copied into a local so that the template stored
                      :  * in self is left untouched; the copy gets the subscribe success and failure
                      :  * callbacks of this file before it is passed to the library.
                      :  *
                      :  * NOTE(review): the return value only reflects the result of the
                      :  * MQTTAsync_subscribe () call itself; the final outcome is presumably
                      :  * delivered later through the two callbacks - confirm against the library
                      :  * documentation.
    1306              :  */
    1307              : static gboolean
    1308            5 : _subscribe (GstMqttSrc * self)
    1309              : {
    1310            5 :   MQTTAsync_responseOptions opts = self->mqtt_respn_opts;
    1311              :   int mqttasync_ret;
    1312              : 
    1313            5 :   opts.onSuccess = cb_mqtt_on_subscribe;
    1314            5 :   opts.onFailure = cb_mqtt_on_subscribe_failure;
    1315            5 :   opts.subscribeOptions.retainHandling = 1; /* NOTE(review): meaning of 1 is defined by the MQTT library - TODO confirm */
    1316              : 
    1317           10 :   mqttasync_ret = MQTTAsync_subscribe (self->mqtt_client_handle,
    1318            5 :       self->mqtt_topic, self->mqtt_qos, &opts);
    1319            5 :   if (mqttasync_ret != MQTTASYNC_SUCCESS)
    1320            5 :     return FALSE;
    1321            3 :   return TRUE;
    1322              : }
    1323              : 
    1324              : /**
    1325              :  * @brief A wrapper function that calls MQTTAsync_unsubscribe ()
                      :  * @param self The GstMqttSrc instance providing the client handle, topic and
                      :  *             the response-option template (mqtt_respn_opts)
                      :  * @return TRUE if MQTTAsync_unsubscribe () returned MQTTASYNC_SUCCESS,
                      :  *         FALSE otherwise
                      :  *
                      :  * A local copy of the response options is given the unsubscribe success and
                      :  * failure callbacks of this file; the template stored in self is not
                      :  * modified.
    1326              :  */
    1327              : static gboolean
    1328            3 : _unsubscribe (GstMqttSrc * self)
    1329              : {
    1330            3 :   MQTTAsync_responseOptions opts = self->mqtt_respn_opts;
    1331              :   int mqttasync_ret;
    1332              : 
    1333            3 :   opts.onSuccess = cb_mqtt_on_unsubscribe;
    1334            3 :   opts.onFailure = cb_mqtt_on_unsubscribe_failure;
    1335              : 
    1336            6 :   mqttasync_ret = MQTTAsync_unsubscribe (self->mqtt_client_handle,
    1337            3 :       self->mqtt_topic, &opts);
    1338            3 :   if (mqttasync_ret != MQTTASYNC_SUCCESS)
    1339            3 :     return FALSE;
    1340            3 :   return TRUE;
    1341              : }
    1342              : 
    1343              : /**
    1344              :  * @brief A utility function to extract header information from a received message
                      :  * @param mem The GstMemory holding the whole received message
                      :  * @param[out] hdr_mem Receives a shared sub-memory covering the first
                      :  *                     GST_MQTT_LEN_MSG_HDR bytes of mem
                      :  * @param[out] hdr_map_info Map info filled by mapping *hdr_mem for reading
                      :  * @return Pointer to the mapped header data viewed as GstMQTTMessageHdr, or
                      :  *         NULL if sharing or mapping failed
                      :  *
                      :  * On success this function neither unmaps nor unrefs *hdr_mem, so both the
                      :  * mapping and the reference stay alive until released elsewhere with
                      :  * gst_memory_unmap () and gst_memory_unref ().
                      :  *
                      :  * When mapping fails, *hdr_mem is unreffed here but the pointer is not
                      :  * reset, so it must not be used or unreffed again after a NULL return.
                      :  *
                      :  * NOTE(review): no check is visible here that mem actually holds at least
                      :  * GST_MQTT_LEN_MSG_HDR bytes - confirm the caller validates the size.
    1345              :  */
    1346              : static GstMQTTMessageHdr *
    1347            4 : _extract_mqtt_msg_hdr_from (GstMemory * mem, GstMemory ** hdr_mem,
    1348              :     GstMapInfo * hdr_map_info)
    1349              : {
    1350            4 :   *hdr_mem = gst_memory_share (mem, 0, GST_MQTT_LEN_MSG_HDR);
    1351            4 :   g_return_val_if_fail (*hdr_mem != NULL, NULL);
    1352              : 
    1353            4 :   if (!gst_memory_map (*hdr_mem, hdr_map_info, GST_MAP_READ)) {
    1354            0 :     gst_memory_unref (*hdr_mem);
    1355            0 :     return NULL;
    1356              :   }
    1357              : 
    1358            4 :   return (GstMQTTMessageHdr *) hdr_map_info->data;
    1359              : }
    1360              : 
    1361              : /**
    1362              :   * @brief A utility function to put the timestamp information
    1363              :   *        onto a GstBuffer-typed buffer using the given packet header
                      :   * @param self The GstMqttSrc instance (base_time_epoch, debug and mqtt_topic
                      :   *             are read)
                      :   * @param hdr The header of the received message
                      :   * @param buf The buffer whose pts, dts and duration are written
                      :   *
                      :   * The buffer's pts, dts and duration are first reset to
                      :   * GST_CLOCK_TIME_NONE. They stay that way if the header's sent_time_epoch
                      :   * is smaller than this element's base_time_epoch, or if the header's pts
                      :   * shifted by the base-epoch difference would be negative.
                      :   *
                      :   * Otherwise pts and dts are each set to the header value plus the
                      :   * difference (header base_time_epoch minus self base_time_epoch), but only
                      :   * when the header value is not GST_CLOCK_TIME_NONE; duration is copied
                      :   * unchanged. With self->debug set and a clock available, the conversion is
                      :   * written to the debug log.
                      :   *
                      :   * NOTE(review): the negative-value check is evaluated on hdr->pts before
                      :   * the GST_CLOCK_TIME_NONE test, so it also runs when the header carries no
                      :   * pts, and an early return there leaves dts and duration unset as well.
                      :   * Confirm this ordering is intended.
    1364              :   */
    1365              : static void
    1366            4 : _put_timestamp_on_gst_buf (GstMqttSrc * self, GstMQTTMessageHdr * hdr,
    1367              :     GstBuffer * buf)
    1368              : {
    1369            4 :   gint64 diff_base_epoch = hdr->base_time_epoch - self->base_time_epoch;
    1370              : 
    1371            4 :   buf->pts = GST_CLOCK_TIME_NONE;
    1372            4 :   buf->dts = GST_CLOCK_TIME_NONE;
    1373            4 :   buf->duration = GST_CLOCK_TIME_NONE;
    1374              : 
    1375            4 :   if (hdr->sent_time_epoch < self->base_time_epoch)
    1376            0 :     return;
    1377              : 
    1378            4 :   if (((GstClockTimeDiff) hdr->pts + diff_base_epoch) < 0)
    1379            0 :     return;
    1380              : 
    1381            4 :   if (hdr->pts != GST_CLOCK_TIME_NONE) {
    1382            4 :     buf->pts = hdr->pts + diff_base_epoch;
    1383              :   }
    1384              : 
    1385            4 :   if (hdr->dts != GST_CLOCK_TIME_NONE) {
    1386            4 :     buf->dts = hdr->dts + diff_base_epoch;
    1387              :   }
    1388              : 
    1389            4 :   buf->duration = hdr->duration;
    1390              : 
    1391            4 :   if (self->debug) {
    1392            4 :     GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
    1393              :     GstClock *clock;
    1394              : 
    1395            4 :     clock = gst_element_get_clock (GST_ELEMENT (self));
    1396              : 
    1397            4 :     if (clock) {
    1398            4 :       GST_DEBUG_OBJECT (self,
    1399              :           "%s diff %" GST_STIME_FORMAT " now %" GST_TIME_FORMAT " ts (%"
    1400              :           GST_TIME_FORMAT " -> %" GST_TIME_FORMAT ")", self->mqtt_topic,
    1401              :           GST_STIME_ARGS (diff_base_epoch),
    1402              :           GST_TIME_ARGS (gst_clock_get_time (clock) - base_time),
    1403              :           GST_TIME_ARGS (hdr->pts), GST_TIME_ARGS (buf->pts));
    1404              : 
    1405            4 :       gst_object_unref (clock); /* drop the reference obtained from gst_element_get_clock () */
    1406              :     }
    1407              :   }
    1408              : }
        

Generated by: LCOV version 2.0-1