Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1-only */
2 : /**
3 : * Copyright (C) 2021 Samsung Electronics Co., Ltd.
4 : *
5 : * @file tensor_query_serversrc.c
6 : * @date 09 Jul 2021
7 : * @brief GStreamer plugin to handle tensor query_server src
8 : * @author Junhwan Kim <jejudo.kim@samsung.com>
9 : * @see http://github.com/nnstreamer/nnstreamer
10 : * @bug No known bugs
11 : *
12 : */
13 : #ifdef HAVE_CONFIG_H
14 : #include <config.h>
15 : #endif
16 :
17 : #include <tensor_typedef.h>
18 : #include <tensor_common.h>
19 : #include "tensor_query_serversrc.h"
20 : #include "tensor_query_common.h"
21 : #include "nnstreamer_util.h"
22 :
/** @brief Debug category used by the logging macros in this file. */
GST_DEBUG_CATEGORY_STATIC (gst_tensor_query_serversrc_debug);
#define GST_CAT_DEFAULT gst_tensor_query_serversrc_debug

/** @brief Default value of the "port" property. */
#define DEFAULT_PORT_SRC 3000
/** @brief Default value of the "is-live" property. */
#define DEFAULT_IS_LIVE TRUE
/** @brief Default value of the "dest-host" property. */
#define DEFAULT_MQTT_HOST "127.0.0.1"
/** @brief Default value of the "dest-port" property. */
#define DEFAULT_MQTT_PORT 1883
/**
 * @brief Timeout handed to g_async_queue_timeout_pop() on each wait for
 *        incoming data (that API takes microseconds).
 */
#define DEFAULT_DATA_POP_TIMEOUT 100000U
31 :
32 : /**
33 : * @brief the capabilities of the outputs
34 : */
35 : static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
36 : GST_PAD_SRC,
37 : GST_PAD_ALWAYS,
38 : GST_STATIC_CAPS_ANY);
39 :
/**
 * @brief Property IDs of query_serversrc (PROP_0 is the unused zero ID).
 */
enum
{
  PROP_0,
  PROP_HOST,                    /* "host" */
  PROP_PORT,                    /* "port" */
  PROP_DEST_HOST,               /* "dest-host" */
  PROP_DEST_PORT,               /* "dest-port" */
  PROP_CONNECT_TYPE,            /* "connect-type" */
  PROP_TIMEOUT,                 /* "timeout" */
  PROP_TOPIC,                   /* "topic" */
  PROP_ID,                      /* "id" */
  PROP_IS_LIVE                  /* "is-live" */
};
56 :
57 : #define gst_tensor_query_serversrc_parent_class parent_class
58 933 : G_DEFINE_TYPE (GstTensorQueryServerSrc, gst_tensor_query_serversrc,
59 : GST_TYPE_PUSH_SRC);
60 :
61 : static GstStateChangeReturn gst_tensor_query_serversrc_change_state (GstElement
62 : * element, GstStateChange transition);
63 : static void gst_tensor_query_serversrc_set_property (GObject * object,
64 : guint prop_id, const GValue * value, GParamSpec * pspec);
65 : static void gst_tensor_query_serversrc_get_property (GObject * object,
66 : guint prop_id, GValue * value, GParamSpec * pspec);
67 : static void gst_tensor_query_serversrc_finalize (GObject * object);
68 : static GstFlowReturn gst_tensor_query_serversrc_create (GstPushSrc * psrc,
69 : GstBuffer ** buf);
70 : static gboolean gst_tensor_query_serversrc_set_caps (GstBaseSrc * bsrc,
71 : GstCaps * caps);
72 :
73 : /**
74 : * @brief initialize the query_serversrc class
75 : */
76 : static void
77 2 : gst_tensor_query_serversrc_class_init (GstTensorQueryServerSrcClass * klass)
78 : {
79 : GObjectClass *gobject_class;
80 : GstElementClass *gstelement_class;
81 : GstBaseSrcClass *gstbasesrc_class;
82 : GstPushSrcClass *gstpushsrc_class;
83 :
84 2 : gstpushsrc_class = (GstPushSrcClass *) klass;
85 2 : gstbasesrc_class = (GstBaseSrcClass *) gstpushsrc_class;
86 2 : gstelement_class = (GstElementClass *) gstbasesrc_class;
87 2 : gobject_class = (GObjectClass *) gstelement_class;
88 :
89 2 : gobject_class->set_property = gst_tensor_query_serversrc_set_property;
90 2 : gobject_class->get_property = gst_tensor_query_serversrc_get_property;
91 2 : gobject_class->finalize = gst_tensor_query_serversrc_finalize;
92 2 : gstelement_class->change_state = gst_tensor_query_serversrc_change_state;
93 2 : gstpushsrc_class->create = gst_tensor_query_serversrc_create;
94 :
95 2 : g_object_class_install_property (gobject_class, PROP_HOST,
96 : g_param_spec_string ("host", "Host", "The hostname to listen as",
97 : DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
98 2 : g_object_class_install_property (gobject_class, PROP_PORT,
99 : g_param_spec_uint ("port", "Port",
100 : "The port to listen to (0=random available port)", 0,
101 : 65535, DEFAULT_PORT_SRC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
102 2 : g_object_class_install_property (gobject_class, PROP_DEST_HOST,
103 : g_param_spec_string ("dest-host", "Destination Host",
104 : "The destination hostname to connect", DEFAULT_MQTT_HOST,
105 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
106 2 : g_object_class_install_property (gobject_class, PROP_DEST_PORT,
107 : g_param_spec_uint ("dest-port", "Destination Port",
108 : "The destination port to connect to (0=random available port)", 0,
109 : 65535, DEFAULT_MQTT_PORT,
110 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
111 2 : g_object_class_install_property (gobject_class, PROP_CONNECT_TYPE,
112 : g_param_spec_enum ("connect-type", "Connect Type", "The connection type.",
113 : GST_TYPE_QUERY_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
114 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
115 2 : g_object_class_install_property (gobject_class, PROP_TIMEOUT,
116 : g_param_spec_uint ("timeout", "Timeout",
117 : "The timeout as seconds to maintain connection", 0, 3600,
118 : QUERY_DEFAULT_TIMEOUT_SEC,
119 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
120 2 : g_object_class_install_property (gobject_class, PROP_TOPIC,
121 : g_param_spec_string ("topic", "Topic",
122 : "The main topic of the host and option if necessary. "
123 : "(topic)/(optional topic for main topic).", "",
124 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
125 2 : g_object_class_install_property (gobject_class, PROP_ID,
126 : g_param_spec_uint ("id", "ID", "ID for distinguishing query servers.", 0,
127 : G_MAXUINT, DEFAULT_SERVER_ID,
128 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
129 2 : g_object_class_install_property (gobject_class, PROP_IS_LIVE,
130 : g_param_spec_boolean ("is-live", "Is Live",
131 : "Synchronize the incoming buffers' timestamp with the current running time",
132 : DEFAULT_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
133 :
134 2 : gst_element_class_add_pad_template (gstelement_class,
135 : gst_static_pad_template_get (&srctemplate));
136 :
137 2 : gst_element_class_set_static_metadata (gstelement_class,
138 : "TensorQueryServerSrc", "Source/Tensor/Query",
139 : "Receive tensor data as a server over the network",
140 : "Samsung Electronics Co., Ltd.");
141 :
142 2 : GST_DEBUG_CATEGORY_INIT (gst_tensor_query_serversrc_debug,
143 : "tensor_query_serversrc", 0, "Tensor Query Server Source");
144 2 : }
145 :
146 : /**
147 : * @brief initialize the new query_serversrc element
148 : */
149 : static void
150 4 : gst_tensor_query_serversrc_init (GstTensorQueryServerSrc * src)
151 : {
152 4 : src->host = g_strdup (DEFAULT_HOST);
153 4 : src->port = DEFAULT_PORT_SRC;
154 4 : src->dest_host = g_strdup (DEFAULT_MQTT_HOST);
155 4 : src->dest_port = DEFAULT_MQTT_PORT;
156 4 : src->connect_type = DEFAULT_CONNECT_TYPE;
157 4 : src->timeout = QUERY_DEFAULT_TIMEOUT_SEC;
158 4 : src->topic = NULL;
159 4 : src->src_id = DEFAULT_SERVER_ID;
160 4 : src->configured = FALSE;
161 4 : src->msg_queue = g_async_queue_new ();
162 4 : src->playing = FALSE;
163 :
164 4 : gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME);
165 : /** set the timestamps on each buffer */
166 4 : gst_base_src_set_do_timestamp (GST_BASE_SRC (src), TRUE);
167 : /** set the source to be live */
168 4 : gst_base_src_set_live (GST_BASE_SRC (src), DEFAULT_IS_LIVE);
169 4 : }
170 :
171 : /**
172 : * @brief finalize the query_serversrc object
173 : */
174 : static void
175 3 : gst_tensor_query_serversrc_finalize (GObject * object)
176 : {
177 3 : GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (object);
178 : nns_edge_data_h data_h;
179 :
180 3 : g_free (src->host);
181 3 : src->host = NULL;
182 3 : g_free (src->dest_host);
183 3 : src->dest_host = NULL;
184 3 : g_free (src->topic);
185 3 : src->topic = NULL;
186 :
187 3 : while ((data_h = g_async_queue_try_pop (src->msg_queue))) {
188 0 : nns_edge_data_destroy (data_h);
189 : }
190 3 : g_async_queue_unref (src->msg_queue);
191 :
192 3 : G_OBJECT_CLASS (parent_class)->finalize (object);
193 3 : }
194 :
195 : /**
196 : * @brief nnstreamer-edge event callback.
197 : */
198 : static int
199 0 : _nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
200 : {
201 : nns_edge_event_e event_type;
202 0 : int ret = NNS_EDGE_ERROR_NONE;
203 :
204 0 : GstTensorQueryServerSrc *src = (GstTensorQueryServerSrc *) user_data;
205 0 : ret = nns_edge_event_get_type (event_h, &event_type);
206 0 : if (NNS_EDGE_ERROR_NONE != ret) {
207 0 : nns_loge ("Failed to get event type!");
208 0 : return ret;
209 : }
210 :
211 0 : switch (event_type) {
212 0 : case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
213 : {
214 : nns_edge_data_h data;
215 :
216 0 : ret = nns_edge_event_parse_new_data (event_h, &data);
217 0 : if (NNS_EDGE_ERROR_NONE != ret) {
218 0 : nns_loge ("Failed to parse new data received from new data event");
219 0 : return ret;
220 : }
221 0 : g_async_queue_push (src->msg_queue, data);
222 0 : break;
223 : }
224 0 : default:
225 0 : break;
226 : }
227 :
228 0 : return ret;
229 : }
230 :
231 : /**
232 : * @brief start processing of query_serversrc, setting up the server
233 : */
234 : static gboolean
235 5 : _gst_tensor_query_serversrc_start (GstTensorQueryServerSrc * src)
236 : {
237 5 : gboolean ret = FALSE;
238 :
239 5 : if (gst_tensor_query_server_add_data (src->src_id))
240 5 : ret = gst_tensor_query_server_wait_sink (src->src_id);
241 :
242 5 : if (!ret)
243 0 : nns_loge ("Failed to get server information from query server.");
244 :
245 5 : return ret;
246 : }
247 :
248 : /**
249 : * @brief start processing of query_serversrc, setting up the server
250 : */
251 : static gboolean
252 4 : _gst_tensor_query_serversrc_playing (GstTensorQueryServerSrc * src,
253 : nns_edge_connect_type_e connect_type)
254 : {
255 4 : GstTensorQueryEdgeInfo edge_info = { 0 };
256 : gboolean ret;
257 :
258 4 : edge_info.host = src->host;
259 4 : edge_info.port = src->port;
260 4 : edge_info.dest_host = src->dest_host;
261 4 : edge_info.dest_port = src->dest_port;
262 4 : edge_info.topic = src->topic;
263 4 : edge_info.cb = _nns_edge_event_cb;
264 4 : edge_info.pdata = src;
265 :
266 4 : ret = gst_tensor_query_server_prepare (src->src_id, connect_type, &edge_info);
267 :
268 4 : return ret;
269 : }
270 :
271 : /**
272 : * @brief Change state of query server src.
273 : */
274 : static GstStateChangeReturn
275 24 : gst_tensor_query_serversrc_change_state (GstElement * element,
276 : GstStateChange transition)
277 : {
278 24 : GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (element);
279 24 : GstBaseSrc *bsrc = GST_BASE_SRC (element);
280 24 : GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
281 : GstCaps *caps;
282 :
283 24 : switch (transition) {
284 4 : case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
285 4 : if (!_gst_tensor_query_serversrc_playing (src, src->connect_type)) {
286 1 : nns_loge ("Failed to change state from PAUSED to PLAYING.");
287 1 : return GST_STATE_CHANGE_FAILURE;
288 : }
289 :
290 3 : caps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (bsrc), NULL);
291 3 : gst_tensor_query_serversrc_set_caps (bsrc, caps);
292 3 : gst_caps_unref (caps);
293 :
294 3 : src->playing = TRUE;
295 3 : break;
296 5 : case GST_STATE_CHANGE_READY_TO_PAUSED:
297 5 : if (!_gst_tensor_query_serversrc_start (src)) {
298 0 : nns_loge ("Failed to change state from READY to PAUSED.");
299 0 : return GST_STATE_CHANGE_FAILURE;
300 : }
301 5 : break;
302 15 : default:
303 15 : break;
304 : }
305 :
306 23 : ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
307 23 : if (ret == GST_STATE_CHANGE_FAILURE) {
308 0 : nns_loge ("Failed to change state");
309 0 : return ret;
310 : }
311 :
312 23 : switch (transition) {
313 3 : case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
314 3 : src->playing = FALSE;
315 3 : gst_tensor_query_server_release_edge_handle (src->src_id);
316 3 : break;
317 4 : case GST_STATE_CHANGE_PAUSED_TO_READY:
318 4 : gst_tensor_query_server_remove_data (src->src_id);
319 4 : break;
320 16 : default:
321 16 : break;
322 : }
323 :
324 23 : return ret;
325 : }
326 :
327 : /**
328 : * @brief set property of query_serversrc
329 : */
330 : static void
331 14 : gst_tensor_query_serversrc_set_property (GObject * object, guint prop_id,
332 : const GValue * value, GParamSpec * pspec)
333 : {
334 14 : GstTensorQueryServerSrc *serversrc = GST_TENSOR_QUERY_SERVERSRC (object);
335 :
336 14 : switch (prop_id) {
337 4 : case PROP_HOST:
338 4 : if (!g_value_get_string (value)) {
339 0 : nns_logw ("host property cannot be NULL");
340 0 : break;
341 : }
342 4 : g_free (serversrc->host);
343 4 : serversrc->host = g_value_dup_string (value);
344 4 : break;
345 4 : case PROP_PORT:
346 4 : serversrc->port = g_value_get_uint (value);
347 4 : break;
348 1 : case PROP_DEST_HOST:
349 1 : if (!g_value_get_string (value)) {
350 0 : nns_logw ("host property cannot be NULL");
351 0 : break;
352 : }
353 1 : g_free (serversrc->dest_host);
354 1 : serversrc->dest_host = g_value_dup_string (value);
355 1 : break;
356 1 : case PROP_DEST_PORT:
357 1 : serversrc->dest_port = g_value_get_uint (value);
358 1 : break;
359 1 : case PROP_CONNECT_TYPE:
360 1 : serversrc->connect_type = g_value_get_enum (value);
361 1 : break;
362 1 : case PROP_TIMEOUT:
363 1 : serversrc->timeout = g_value_get_uint (value);
364 1 : break;
365 1 : case PROP_TOPIC:
366 1 : if (!g_value_get_string (value)) {
367 0 : nns_logw ("topic property cannot be NULL. Query-hybrid is disabled.");
368 0 : break;
369 : }
370 1 : g_free (serversrc->topic);
371 1 : serversrc->topic = g_value_dup_string (value);
372 1 : break;
373 1 : case PROP_ID:
374 1 : serversrc->src_id = g_value_get_uint (value);
375 1 : break;
376 0 : case PROP_IS_LIVE:
377 0 : gst_base_src_set_live (GST_BASE_SRC (serversrc),
378 : g_value_get_boolean (value));
379 0 : break;
380 0 : default:
381 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
382 0 : break;
383 : }
384 14 : }
385 :
386 : /**
387 : * @brief get property of query_serversrc
388 : */
389 : static void
390 21 : gst_tensor_query_serversrc_get_property (GObject * object, guint prop_id,
391 : GValue * value, GParamSpec * pspec)
392 : {
393 21 : GstTensorQueryServerSrc *serversrc = GST_TENSOR_QUERY_SERVERSRC (object);
394 :
395 21 : switch (prop_id) {
396 3 : case PROP_HOST:
397 3 : g_value_set_string (value, serversrc->host);
398 3 : break;
399 3 : case PROP_PORT:
400 3 : g_value_set_uint (value, serversrc->port);
401 3 : break;
402 2 : case PROP_DEST_HOST:
403 2 : g_value_set_string (value, serversrc->dest_host);
404 2 : break;
405 2 : case PROP_DEST_PORT:
406 2 : g_value_set_uint (value, serversrc->dest_port);
407 2 : break;
408 3 : case PROP_CONNECT_TYPE:
409 3 : g_value_set_enum (value, serversrc->connect_type);
410 3 : break;
411 3 : case PROP_TIMEOUT:
412 3 : g_value_set_uint (value, serversrc->timeout);
413 3 : break;
414 2 : case PROP_TOPIC:
415 2 : g_value_set_string (value, serversrc->topic);
416 2 : break;
417 2 : case PROP_ID:
418 2 : g_value_set_uint (value, serversrc->src_id);
419 2 : break;
420 1 : case PROP_IS_LIVE:
421 1 : g_value_set_boolean (value,
422 1 : gst_base_src_is_live (GST_BASE_SRC (serversrc)));
423 1 : break;
424 0 : default:
425 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
426 0 : break;
427 : }
428 21 : }
429 :
430 : /**
431 : * @brief Get buffer from message queue.
432 : */
433 : static GstBuffer *
434 3 : _gst_tensor_query_serversrc_get_buffer (GstTensorQueryServerSrc * src)
435 : {
436 3 : nns_edge_data_h data_h = NULL;
437 3 : GstBuffer *buffer = NULL;
438 : guint i, num_data;
439 : GstMetaQuery *meta_query;
440 : int ret;
441 :
442 5 : while (src->playing && !data_h) {
443 2 : data_h = g_async_queue_timeout_pop (src->msg_queue,
444 : DEFAULT_DATA_POP_TIMEOUT);
445 : }
446 :
447 3 : if (!data_h) {
448 3 : nns_loge ("Failed to get message from the server message queue.");
449 3 : return NULL;
450 : }
451 :
452 0 : ret = nns_edge_data_get_count (data_h, &num_data);
453 0 : if (ret != NNS_EDGE_ERROR_NONE || num_data == 0) {
454 0 : nns_loge ("Failed to get the number of memories of the edge data.");
455 0 : goto done;
456 : }
457 :
458 0 : buffer = gst_buffer_new ();
459 0 : for (i = 0; i < num_data; i++) {
460 0 : void *data = NULL;
461 0 : nns_size_t data_len = 0;
462 : gpointer new_data;
463 :
464 0 : nns_edge_data_get (data_h, i, &data, &data_len);
465 0 : new_data = _g_memdup (data, data_len);
466 :
467 0 : gst_buffer_append_memory (buffer,
468 : gst_memory_new_wrapped (0, new_data, data_len, 0, data_len, new_data,
469 : g_free));
470 : }
471 :
472 0 : meta_query = gst_buffer_add_meta_query (buffer);
473 0 : if (meta_query) {
474 : char *val;
475 :
476 0 : ret = nns_edge_data_get_info (data_h, "client_id", &val);
477 0 : if (NNS_EDGE_ERROR_NONE != ret) {
478 0 : gst_buffer_unref (buffer);
479 0 : buffer = NULL;
480 : } else {
481 0 : meta_query->client_id = g_ascii_strtoll (val, NULL, 10);
482 0 : g_free (val);
483 : }
484 : }
485 :
486 0 : done:
487 0 : nns_edge_data_destroy (data_h);
488 0 : return buffer;
489 : }
490 :
491 : /**
492 : * @brief create query_serversrc, wait on socket and receive data
493 : */
494 : static GstFlowReturn
495 3 : gst_tensor_query_serversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
496 : {
497 3 : GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (psrc);
498 3 : GstBaseSrc *bsrc = GST_BASE_SRC (psrc);
499 : GstStateChangeReturn sret;
500 3 : GstState state = GST_STATE_NULL;
501 :
502 3 : if (!src->configured) {
503 1 : GstCaps *caps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (bsrc), NULL);
504 1 : if (gst_caps_is_fixed (caps)) {
505 1 : gst_base_src_set_caps (bsrc, caps);
506 : }
507 :
508 1 : gst_tensor_query_serversrc_set_caps (bsrc, caps);
509 :
510 1 : gst_caps_unref (caps);
511 1 : src->configured = TRUE;
512 : }
513 :
514 3 : *outbuf = _gst_tensor_query_serversrc_get_buffer (src);
515 3 : if (*outbuf == NULL) {
516 3 : sret = gst_element_get_state (GST_ELEMENT (psrc), &state, NULL, 0);
517 3 : if (sret != GST_STATE_CHANGE_SUCCESS || state != GST_STATE_PLAYING) {
518 3 : nns_logw ("Failed to get buffer for query server, not in PLAYING state.");
519 3 : return GST_FLOW_FLUSHING;
520 : }
521 :
522 0 : nns_loge ("Failed to get buffer to push to the tensor query serversrc.");
523 0 : return GST_FLOW_ERROR;
524 : }
525 :
526 0 : return GST_FLOW_OK;
527 : }
528 :
529 : /**
530 : * @brief An implementation of the set_caps vmethod in GstBaseSrcClass
531 : */
532 : static gboolean
533 4 : gst_tensor_query_serversrc_set_caps (GstBaseSrc * bsrc, GstCaps * caps)
534 : {
535 4 : GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (bsrc);
536 : gchar *caps_str, *new_caps_str;
537 :
538 4 : caps_str = gst_caps_to_string (caps);
539 :
540 4 : new_caps_str = g_strdup_printf ("@query_server_src_caps@%s", caps_str);
541 4 : gst_tensor_query_server_set_caps (src->src_id, new_caps_str);
542 :
543 4 : g_free (new_caps_str);
544 4 : g_free (caps_str);
545 :
546 4 : return TRUE;
547 : }
|