Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1-only */
2 : /**
3 : * Copyright (C) 2022 Samsung Electronics Co., Ltd.
4 : *
5 : * @file edge_sink.c
6 : * @date 01 Aug 2022
7 : * @brief Publish incoming streams
8 : * @author Yechan Choi <yechan9.choi@samsung.com>
9 : * @see http://github.com/nnstreamer/nnstreamer
10 : * @bug No known bugs
11 : *
12 : */
13 : #ifdef HAVE_CONFIG_H
14 : #include <config.h>
15 : #endif
16 :
17 : #include "edge_sink.h"
18 :
19 : GST_DEBUG_CATEGORY_STATIC (gst_edgesink_debug);
20 : #define GST_CAT_DEFAULT gst_edgesink_debug
21 :
22 : /**
23 : * @brief the capabilities of the inputs.
24 : */
25 : static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
26 : GST_PAD_SINK,
27 : GST_PAD_ALWAYS,
28 : GST_STATIC_CAPS_ANY);
29 :
30 : /**
31 : * @brief edgesink properties
32 : */
33 : enum
34 : {
35 : PROP_0,
36 :
37 : PROP_HOST,
38 : PROP_PORT,
39 : PROP_DEST_HOST,
40 : PROP_DEST_PORT,
41 : PROP_CONNECT_TYPE,
42 : PROP_TOPIC,
43 : PROP_WAIT_CONNECTION,
44 : PROP_CONNECTION_TIMEOUT,
45 : PROP_CUSTOM_LIB,
46 :
47 : PROP_LAST
48 : };
49 : #define DEFAULT_MQTT_HOST "127.0.0.1"
50 : #define DEFAULT_MQTT_PORT 1883
51 :
52 : #define gst_edgesink_parent_class parent_class
53 86 : G_DEFINE_TYPE (GstEdgeSink, gst_edgesink, GST_TYPE_BASE_SINK);
54 :
55 : static void gst_edgesink_set_property (GObject * object,
56 : guint prop_id, const GValue * value, GParamSpec * pspec);
57 :
58 : static void gst_edgesink_get_property (GObject * object,
59 : guint prop_id, GValue * value, GParamSpec * pspec);
60 :
61 : static void gst_edgesink_finalize (GObject * object);
62 :
63 : static gboolean gst_edgesink_start (GstBaseSink * basesink);
64 : static gboolean gst_edgesink_stop (GstBaseSink * basesink);
65 : static GstFlowReturn gst_edgesink_render (GstBaseSink * basesink,
66 : GstBuffer * buffer);
67 : static gboolean gst_edgesink_set_caps (GstBaseSink * basesink, GstCaps * caps);
68 :
69 : static gchar *gst_edgesink_get_host (GstEdgeSink * self);
70 : static void gst_edgesink_set_host (GstEdgeSink * self, const gchar * host);
71 :
72 : static guint16 gst_edgesink_get_port (GstEdgeSink * self);
73 : static void gst_edgesink_set_port (GstEdgeSink * self, const guint16 port);
74 :
75 : static nns_edge_connect_type_e gst_edgesink_get_connect_type (GstEdgeSink *
76 : self);
77 : static void gst_edgesink_set_connect_type (GstEdgeSink * self,
78 : const nns_edge_connect_type_e connect_type);
79 :
80 : /**
81 : * @brief initialize the class
82 : */
83 : static void
84 2 : gst_edgesink_class_init (GstEdgeSinkClass * klass)
85 : {
86 2 : GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
87 2 : GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
88 2 : GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
89 :
90 2 : gobject_class->set_property = gst_edgesink_set_property;
91 2 : gobject_class->get_property = gst_edgesink_get_property;
92 2 : gobject_class->finalize = gst_edgesink_finalize;
93 :
94 2 : g_object_class_install_property (gobject_class, PROP_HOST,
95 : g_param_spec_string ("host", "Host",
96 : "A self host address to accept connection from edgesrc", DEFAULT_HOST,
97 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
98 2 : g_object_class_install_property (gobject_class, PROP_PORT,
99 : g_param_spec_uint ("port", "Port",
100 : "A self port address to accept connection from edgesrc. "
101 : "If the port is set to 0 then the available port is allocated. ",
102 : 0, 65535, DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
103 2 : g_object_class_install_property (gobject_class, PROP_CONNECT_TYPE,
104 : g_param_spec_enum ("connect-type", "Connect Type",
105 : "The connections type between edgesink and edgesrc.",
106 : GST_TYPE_EDGE_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
107 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
108 2 : g_object_class_install_property (gobject_class, PROP_DEST_HOST,
109 : g_param_spec_string ("dest-host", "Destination Host",
110 : "The destination hostname of the broker", DEFAULT_MQTT_HOST,
111 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
112 2 : g_object_class_install_property (gobject_class, PROP_DEST_PORT,
113 : g_param_spec_uint ("dest-port", "Destination Port",
114 : "The destination port of the broker", 0,
115 : 65535, DEFAULT_MQTT_PORT,
116 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
117 2 : g_object_class_install_property (gobject_class, PROP_TOPIC,
118 : g_param_spec_string ("topic", "Topic",
119 : "The main topic of the host and option if necessary. "
120 : "(topic)/(optional topic for main topic).", "",
121 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
122 2 : g_object_class_install_property (gobject_class, PROP_WAIT_CONNECTION,
123 : g_param_spec_boolean ("wait-connection", "Wait connection to edgesrc",
124 : "Wait until edgesink is connected to edgesrc. "
125 : "In case of false(default), the buffers entering the edgesink are dropped.",
126 : FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
127 2 : g_object_class_install_property (gobject_class, PROP_CONNECTION_TIMEOUT,
128 : g_param_spec_uint64 ("connection-timeout",
129 : "Timeout for waiting a connection",
130 : "The timeout (in milliseconds) for waiting a connection to receiver. "
131 : "0 timeout (default) means infinite wait.", 0, G_MAXUINT64, 0,
132 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
133 2 : g_object_class_install_property (gobject_class, PROP_CUSTOM_LIB,
134 : g_param_spec_string ("custom-lib", "Custom connection lib path",
135 : "User defined custom connection lib path.",
136 : "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
137 :
138 2 : gst_element_class_add_pad_template (gstelement_class,
139 : gst_static_pad_template_get (&sinktemplate));
140 :
141 2 : gst_element_class_set_static_metadata (gstelement_class,
142 : "EdgeSink", "Sink/Edge",
143 : "Publish incoming streams", "Samsung Electronics Co., Ltd.");
144 :
145 2 : gstbasesink_class->start = gst_edgesink_start;
146 2 : gstbasesink_class->stop = gst_edgesink_stop;
147 2 : gstbasesink_class->render = gst_edgesink_render;
148 2 : gstbasesink_class->set_caps = gst_edgesink_set_caps;
149 :
150 2 : GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT,
151 : GST_EDGE_ELEM_NAME_SINK, 0, "Edge sink");
152 2 : }
153 :
154 : /**
155 : * @brief initialize the new element
156 : */
157 : static void
158 7 : gst_edgesink_init (GstEdgeSink * self)
159 : {
160 7 : self->host = g_strdup (DEFAULT_HOST);
161 7 : self->port = DEFAULT_PORT;
162 7 : self->dest_host = g_strdup (DEFAULT_HOST);
163 7 : self->dest_port = DEFAULT_PORT;
164 7 : self->topic = NULL;
165 7 : self->connect_type = DEFAULT_CONNECT_TYPE;
166 7 : self->wait_connection = FALSE;
167 7 : self->connection_timeout = 0;
168 7 : self->custom_lib = NULL;
169 7 : self->is_connected = FALSE;
170 7 : g_mutex_init (&self->lock);
171 7 : g_cond_init (&self->cond);
172 7 : }
173 :
174 : /**
175 : * @brief set property
176 : */
177 : static void
178 18 : gst_edgesink_set_property (GObject * object, guint prop_id,
179 : const GValue * value, GParamSpec * pspec)
180 : {
181 18 : GstEdgeSink *self = GST_EDGESINK (object);
182 :
183 18 : switch (prop_id) {
184 2 : case PROP_HOST:
185 2 : gst_edgesink_set_host (self, g_value_get_string (value));
186 2 : break;
187 7 : case PROP_PORT:
188 7 : gst_edgesink_set_port (self, g_value_get_uint (value));
189 7 : break;
190 1 : case PROP_DEST_HOST:
191 1 : if (!g_value_get_string (value)) {
192 0 : nns_logw ("dest host property cannot be NULL");
193 0 : break;
194 : }
195 1 : g_free (self->dest_host);
196 1 : self->dest_host = g_value_dup_string (value);
197 1 : break;
198 1 : case PROP_DEST_PORT:
199 1 : self->dest_port = g_value_get_uint (value);
200 1 : break;
201 4 : case PROP_CONNECT_TYPE:
202 4 : gst_edgesink_set_connect_type (self, g_value_get_enum (value));
203 4 : break;
204 1 : case PROP_TOPIC:
205 1 : if (!g_value_get_string (value)) {
206 0 : nns_logw ("topic property cannot be NULL. Query-hybrid is disabled.");
207 0 : break;
208 : }
209 1 : g_free (self->topic);
210 1 : self->topic = g_value_dup_string (value);
211 1 : break;
212 0 : case PROP_WAIT_CONNECTION:
213 0 : self->wait_connection = g_value_get_boolean (value);
214 0 : break;
215 0 : case PROP_CONNECTION_TIMEOUT:
216 0 : self->connection_timeout = g_value_get_uint64 (value);
217 0 : break;
218 2 : case PROP_CUSTOM_LIB:
219 2 : g_free (self->custom_lib);
220 2 : self->custom_lib = g_value_dup_string (value);
221 2 : break;
222 0 : default:
223 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
224 0 : break;
225 : }
226 18 : }
227 :
228 : /**
229 : * @brief get property
230 : */
231 : static void
232 16 : gst_edgesink_get_property (GObject * object, guint prop_id, GValue * value,
233 : GParamSpec * pspec)
234 : {
235 16 : GstEdgeSink *self = GST_EDGESINK (object);
236 :
237 16 : switch (prop_id) {
238 2 : case PROP_HOST:
239 2 : g_value_set_string (value, gst_edgesink_get_host (self));
240 2 : break;
241 3 : case PROP_PORT:
242 3 : g_value_set_uint (value, gst_edgesink_get_port (self));
243 3 : break;
244 2 : case PROP_DEST_HOST:
245 2 : g_value_set_string (value, self->dest_host);
246 2 : break;
247 2 : case PROP_DEST_PORT:
248 2 : g_value_set_uint (value, self->dest_port);
249 2 : break;
250 2 : case PROP_CONNECT_TYPE:
251 2 : g_value_set_enum (value, gst_edgesink_get_connect_type (self));
252 2 : break;
253 2 : case PROP_TOPIC:
254 2 : g_value_set_string (value, self->topic);
255 2 : break;
256 1 : case PROP_WAIT_CONNECTION:
257 1 : g_value_set_boolean (value, self->wait_connection);
258 1 : break;
259 1 : case PROP_CONNECTION_TIMEOUT:
260 1 : g_value_set_uint64 (value, self->connection_timeout);
261 1 : break;
262 1 : case PROP_CUSTOM_LIB:
263 1 : g_value_set_string (value, self->custom_lib);
264 1 : break;
265 0 : default:
266 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
267 0 : break;
268 : }
269 16 : }
270 :
271 : /**
272 : * @brief finalize the object
273 : */
274 : static void
275 6 : gst_edgesink_finalize (GObject * object)
276 : {
277 6 : GstEdgeSink *self = GST_EDGESINK (object);
278 :
279 6 : g_free (self->host);
280 6 : self->host = NULL;
281 :
282 6 : g_free (self->dest_host);
283 6 : self->dest_host = NULL;
284 :
285 6 : g_free (self->topic);
286 6 : self->topic = NULL;
287 :
288 6 : g_free (self->custom_lib);
289 6 : self->custom_lib = NULL;
290 6 : g_mutex_clear (&self->lock);
291 6 : g_cond_clear (&self->cond);
292 :
293 6 : if (self->edge_h) {
294 2 : nns_edge_release_handle (self->edge_h);
295 2 : self->edge_h = NULL;
296 : }
297 :
298 6 : G_OBJECT_CLASS (parent_class)->finalize (object);
299 6 : }
300 :
301 :
302 : /**
303 : * @brief nnstreamer-edge event callback.
304 : */
305 : static int
306 1 : _nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
307 : {
308 : nns_edge_event_e event_type;
309 1 : int ret = NNS_EDGE_ERROR_NONE;
310 :
311 1 : GstEdgeSink *self = GST_EDGESINK (user_data);
312 1 : ret = nns_edge_event_get_type (event_h, &event_type);
313 1 : if (NNS_EDGE_ERROR_NONE != ret) {
314 0 : nns_loge ("Failed to get event type!");
315 1 : return ret;
316 : }
317 :
318 1 : switch (event_type) {
319 1 : case NNS_EDGE_EVENT_CONNECTION_COMPLETED:
320 : {
321 1 : g_mutex_lock (&self->lock);
322 1 : self->is_connected = TRUE;
323 1 : g_cond_broadcast (&self->cond);
324 1 : g_mutex_unlock (&self->lock);
325 1 : break;
326 : }
327 0 : default:
328 0 : break;
329 : }
330 :
331 1 : return ret;
332 : }
333 :
334 : /**
335 : * @brief start processing of edgesink
336 : */
337 : static gboolean
338 5 : gst_edgesink_start (GstBaseSink * basesink)
339 : {
340 5 : GstEdgeSink *self = GST_EDGESINK (basesink);
341 :
342 : int ret;
343 5 : char *port = NULL;
344 :
345 5 : if (NNS_EDGE_CONNECT_TYPE_CUSTOM != self->connect_type) {
346 2 : ret = nns_edge_create_handle (NULL, self->connect_type,
347 : NNS_EDGE_NODE_TYPE_PUB, &self->edge_h);
348 : } else {
349 3 : if (!self->custom_lib) {
350 1 : nns_loge ("Failed to start edgesink. Custom library is not set.");
351 1 : return FALSE;
352 : }
353 2 : ret = nns_edge_custom_create_handle (NULL, self->custom_lib,
354 : NNS_EDGE_NODE_TYPE_PUB, &self->edge_h);
355 : }
356 :
357 4 : if (NNS_EDGE_ERROR_NONE != ret) {
358 1 : nns_loge ("Failed to get nnstreamer edge handle.");
359 :
360 1 : if (self->edge_h) {
361 1 : nns_edge_release_handle (self->edge_h);
362 1 : self->edge_h = NULL;
363 : }
364 :
365 1 : return FALSE;
366 : }
367 :
368 3 : if (self->host)
369 3 : nns_edge_set_info (self->edge_h, "HOST", self->host);
370 3 : if (self->port > 0) {
371 1 : port = g_strdup_printf ("%u", self->port);
372 1 : nns_edge_set_info (self->edge_h, "PORT", port);
373 1 : g_free (port);
374 : }
375 3 : if (self->dest_host)
376 3 : nns_edge_set_info (self->edge_h, "DEST_HOST", self->dest_host);
377 3 : if (self->dest_port > 0) {
378 3 : port = g_strdup_printf ("%u", self->dest_port);
379 3 : nns_edge_set_info (self->edge_h, "DEST_PORT", port);
380 3 : g_free (port);
381 : }
382 3 : if (self->topic)
383 0 : nns_edge_set_info (self->edge_h, "TOPIC", self->topic);
384 :
385 3 : nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
386 :
387 3 : if (0 != nns_edge_start (self->edge_h)) {
388 1 : nns_loge
389 : ("Failed to start NNStreamer-edge. Please check server IP and port");
390 1 : return FALSE;
391 : }
392 :
393 2 : return TRUE;
394 : }
395 :
396 : /**
397 : * @brief If wait-connection is enabled, wait for connection until the connection is established or timeout occurs. Otherwise, return immediately.
398 : */
399 : static gboolean
400 13 : _wait_connection (GstEdgeSink *sink)
401 : {
402 : gint64 end_time;
403 : gboolean connected;
404 :
405 13 : if (!sink->wait_connection)
406 13 : return TRUE;
407 :
408 0 : if (0 == sink->connection_timeout) {
409 0 : end_time = G_MAXINT64;
410 : } else {
411 0 : end_time = g_get_monotonic_time ()
412 0 : + sink->connection_timeout * G_TIME_SPAN_MILLISECOND;
413 : }
414 :
415 0 : g_mutex_lock (&sink->lock);
416 0 : while (!sink->is_connected) {
417 0 : if (!g_cond_wait_until (&sink->cond, &sink->lock, end_time)) {
418 0 : nns_loge ("Failed to wait connection.");
419 0 : break;
420 : }
421 : }
422 0 : connected = sink->is_connected;
423 0 : g_mutex_unlock (&sink->lock);
424 :
425 0 : return connected;
426 : }
427 :
428 : /**
429 : * @brief Stop processing of edgesink
430 : */
431 : static gboolean
432 1 : gst_edgesink_stop (GstBaseSink * basesink)
433 : {
434 1 : GstEdgeSink *self = GST_EDGESINK (basesink);
435 : int ret;
436 :
437 1 : ret = nns_edge_stop (self->edge_h);
438 1 : if (NNS_EDGE_ERROR_NONE != ret) {
439 0 : nns_loge ("Failed to stop edge. error code(%d)", ret);
440 0 : return FALSE;
441 : }
442 :
443 1 : return TRUE;
444 : }
445 :
446 : /**
447 : * @brief render buffer, send buffer
448 : */
449 : static GstFlowReturn
450 13 : gst_edgesink_render (GstBaseSink * basesink, GstBuffer * buffer)
451 : {
452 13 : GstEdgeSink *self = GST_EDGESINK (basesink);
453 : GstCaps *caps;
454 : GstStructure *structure;
455 : gboolean is_tensor;
456 : nns_edge_data_h data_h;
457 : guint i, num_mems;
458 : int ret;
459 : GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
460 : GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
461 :
462 13 : if (!_wait_connection (self)) {
463 0 : nns_loge ("Failed to send buffer.");
464 13 : return GST_FLOW_ERROR;
465 : }
466 :
467 13 : ret = nns_edge_data_create (&data_h);
468 13 : if (ret != NNS_EDGE_ERROR_NONE) {
469 0 : nns_loge ("Failed to create data handle in edgesink");
470 0 : return GST_FLOW_ERROR;
471 : }
472 :
473 13 : caps = gst_pad_get_current_caps (GST_BASE_SINK_PAD (basesink));
474 13 : structure = gst_caps_get_structure (caps, 0);
475 13 : is_tensor = gst_structure_is_tensor_stream (structure);
476 13 : gst_caps_unref (caps);
477 :
478 13 : if (is_tensor)
479 13 : num_mems = gst_tensor_buffer_get_count (buffer);
480 : else
481 0 : num_mems = gst_buffer_n_memory (buffer);
482 :
483 26 : for (i = 0; i < num_mems; i++) {
484 13 : if (is_tensor)
485 13 : mem[i] = gst_tensor_buffer_get_nth_memory (buffer, i);
486 : else
487 0 : mem[i] = gst_buffer_get_memory (buffer, i);
488 :
489 13 : if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
490 0 : nns_loge ("Cannot map the %uth memory in gst-buffer.", i);
491 0 : gst_memory_unref (mem[i]);
492 0 : num_mems = i;
493 0 : goto done;
494 : }
495 :
496 13 : ret = nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
497 13 : if (ret != NNS_EDGE_ERROR_NONE) {
498 0 : nns_loge ("Failed to append %u-th memory into edge data.", i);
499 0 : num_mems = i + 1;
500 0 : goto done;
501 : }
502 : }
503 :
504 13 : ret = nns_edge_send (self->edge_h, data_h);
505 13 : if (ret != NNS_EDGE_ERROR_NONE)
506 12 : nns_loge ("Failed to send edge data, connection lost or internal error.");
507 :
508 1 : done:
509 13 : if (data_h)
510 13 : nns_edge_data_destroy (data_h);
511 :
512 26 : for (i = 0; i < num_mems; i++) {
513 13 : gst_memory_unmap (mem[i], &map[i]);
514 13 : gst_memory_unref (mem[i]);
515 : }
516 :
517 13 : return GST_FLOW_OK;
518 : }
519 :
520 : /**
521 : * @brief An implementation of the set_caps vmethod in GstBaseSinkClass
522 : */
523 : static gboolean
524 2 : gst_edgesink_set_caps (GstBaseSink * basesink, GstCaps * caps)
525 : {
526 2 : GstEdgeSink *sink = GST_EDGESINK (basesink);
527 : gchar *caps_str, *prev_caps_str, *new_caps_str;
528 : int set_rst;
529 :
530 2 : caps_str = gst_caps_to_string (caps);
531 :
532 2 : nns_edge_get_info (sink->edge_h, "CAPS", &prev_caps_str);
533 2 : if (!prev_caps_str) {
534 0 : prev_caps_str = g_strdup ("");
535 : }
536 : new_caps_str =
537 2 : g_strdup_printf ("%s@edge_sink_caps@%s", prev_caps_str, caps_str);
538 2 : set_rst = nns_edge_set_info (sink->edge_h, "CAPS", new_caps_str);
539 :
540 2 : g_free (prev_caps_str);
541 2 : g_free (new_caps_str);
542 2 : g_free (caps_str);
543 :
544 2 : return set_rst == NNS_EDGE_ERROR_NONE;
545 : }
546 :
547 : /**
548 : * @brief getter for the 'host' property.
549 : */
550 : static gchar *
551 2 : gst_edgesink_get_host (GstEdgeSink * self)
552 : {
553 2 : return self->host;
554 : }
555 :
556 : /**
557 : * @brief setter for the 'host' property.
558 : */
559 : static void
560 2 : gst_edgesink_set_host (GstEdgeSink * self, const gchar * host)
561 : {
562 2 : if (self->host)
563 2 : g_free (self->host);
564 2 : self->host = g_strdup (host);
565 2 : }
566 :
567 : /**
568 : * @brief getter for the 'port' property.
569 : */
570 : static guint16
571 3 : gst_edgesink_get_port (GstEdgeSink * self)
572 : {
573 3 : return self->port;
574 : }
575 :
576 : /**
577 : * @brief setter for the 'port' property.
578 : */
579 : static void
580 7 : gst_edgesink_set_port (GstEdgeSink * self, const guint16 port)
581 : {
582 7 : self->port = port;
583 7 : }
584 :
585 : /**
586 : * @brief getter for the 'connect_type' property.
587 : */
588 : static nns_edge_connect_type_e
589 2 : gst_edgesink_get_connect_type (GstEdgeSink * self)
590 : {
591 2 : return self->connect_type;
592 : }
593 :
594 : /**
595 : * @brief setter for the 'connect_type' property.
596 : */
597 : static void
598 4 : gst_edgesink_set_connect_type (GstEdgeSink * self,
599 : const nns_edge_connect_type_e connect_type)
600 : {
601 4 : self->connect_type = connect_type;
602 4 : }
|