Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1-only */
2 : /**
3 : * GStreamer Tensor_Src_gRPC
4 : * Copyright (C) 2020 Dongju Chae <dongju.chae@samsung.com>
5 : */
6 : /**
7 : * @file tensor_src_grpc.c
8 : * @date 20 Oct 2020
9 : * @brief GStreamer plugin to support gRPC tensor source
10 : * @see http://github.com/nnstreamer/nnstreamer
11 : * @author Dongju Chae <dongju.chae@samsung.com>
12 : * @bug No known bugs except for NYI items
13 : */
14 :
15 : /**
16 : * SECTION:element-tensor_src_grpc
17 : *
18 : * #tensor_src_grpc extends #gstpushsrc source element to handle gRPC
19 : * requests as either server or client.
20 : *
21 : * <refsect2>
22 : * <title>Example launch line</title>
23 : * |[
24 : * gst-launch -v -m tensor_src_grpc ! 'other/tensor,dimension=(string)1:1:1:1,type=(string)uint8,framerate=(fraction)10/1' ! fakesink
25 : * ]|
26 : * </refsect2>
27 : */
28 :
29 : #ifdef HAVE_CONFIG_H
30 : #include <config.h>
31 : #endif
32 :
33 : #include <string.h>
34 : #include <errno.h>
35 :
36 : #include <gst/gst.h>
37 : #include <glib.h>
38 :
39 : #include <tensor_typedef.h>
40 : #include <nnstreamer_plugin_api.h>
41 : #include <nnstreamer_log.h>
42 :
43 : #include "tensor_src_grpc.h"
44 : #include "nnstreamer_grpc.h"
45 :
46 : /**
47 : * @brief Macro for debug mode.
48 : */
49 : #ifndef DBG
50 : #define DBG (!self->silent)
51 : #endif
52 :
53 : /**
54 : * @brief Macro for debug message.
55 : */
56 : #define silent_debug(...) do { \
57 : if (DBG) { \
58 : GST_DEBUG_OBJECT (self, __VA_ARGS__); \
59 : } \
60 : } while (0)
61 :
62 : GST_DEBUG_CATEGORY_STATIC (gst_tensor_src_grpc_debug);
63 : #define GST_CAT_DEFAULT gst_tensor_src_grpc_debug
64 :
65 : /**
66 : * @brief Flag to print minimized log
67 : */
68 : #define DEFAULT_PROP_SILENT TRUE
69 :
70 : /**
71 : * @brief Default gRPC server mode for tensor source
72 : */
73 : #define DEFAULT_PROP_SERVER TRUE
74 :
75 : /**
76 : * @brief Default gRPC blocking mode for tensor source
77 : */
78 : #define DEFAULT_PROP_BLOCKING TRUE
79 :
80 : /**
81 : * @brief Default IDL for RPC comm.
82 : */
83 : #define DEFAULT_PROP_IDL "protobuf"
84 :
85 : /**
86 : * @brief Default host and port
87 : */
88 : #define DEFAULT_PROP_HOST "localhost"
89 : #define DEFAULT_PROP_PORT 55115
90 :
91 : #define GST_TENSOR_SRC_GRPC_SCALED_TIME(self, count)\
92 : gst_util_uint64_scale (count, \
93 : self->config.rate_d * GST_SECOND, self->config.rate_n)
94 :
95 : #define CAPS_STRING GST_TENSOR_CAP_DEFAULT "; " GST_TENSORS_CAP_DEFAULT
96 :
97 : static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
98 : GST_PAD_SRC,
99 : GST_PAD_ALWAYS,
100 : GST_STATIC_CAPS (CAPS_STRING));
101 :
102 : #define GET_GRPC_PRIVATE(arg) (grpc_private *) (arg->priv)
103 :
104 : /** GObject method implementation */
105 : static void gst_tensor_src_grpc_finalize (GObject * object);
106 : static void gst_tensor_src_grpc_set_property (GObject * object, guint prop_id,
107 : const GValue * value, GParamSpec * pspec);
108 : static void gst_tensor_src_grpc_get_property (GObject * object, guint prop_id,
109 : GValue * value, GParamSpec * pspec);
110 :
111 : /** GstBaseSrc method implementation */
112 : static gboolean gst_tensor_src_grpc_start (GstBaseSrc * src);
113 : static gboolean gst_tensor_src_grpc_stop (GstBaseSrc * src);
114 : static gboolean gst_tensor_src_grpc_set_caps (GstBaseSrc * src, GstCaps * caps);
115 : static gboolean gst_tensor_src_grpc_unlock (GstBaseSrc * src);
116 : static gboolean gst_tensor_src_grpc_unlock_stop (GstBaseSrc * src);
117 :
118 : /** GstPushSrc method implementation */
119 : static GstFlowReturn gst_tensor_src_grpc_create (GstPushSrc * psrc,
120 : GstBuffer ** buf);
121 :
122 : /** internal functions */
123 : #define gst_tensor_src_grpc_parent_class parent_class
124 391 : G_DEFINE_TYPE (GstTensorSrcGRPC, gst_tensor_src_grpc, GST_TYPE_PUSH_SRC);
125 :
126 : /**
127 : * @brief initialize the tensor_src_grpc class.
128 : */
129 : static void
130 27 : gst_tensor_src_grpc_class_init (GstTensorSrcGRPCClass * klass)
131 : {
132 27 : GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
133 27 : GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
134 27 : GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
135 27 : GstPushSrcClass *gstpushsrc_class = GST_PUSH_SRC_CLASS (klass);
136 :
137 27 : gobject_class->set_property = gst_tensor_src_grpc_set_property;
138 27 : gobject_class->get_property = gst_tensor_src_grpc_get_property;
139 27 : gobject_class->finalize = gst_tensor_src_grpc_finalize;
140 :
141 : /* install properties */
142 27 : g_object_class_install_property (gobject_class, PROP_SILENT,
143 : g_param_spec_boolean ("silent", "Silent",
144 : "Dont' produce verbose output",
145 : DEFAULT_PROP_SILENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
146 :
147 27 : g_object_class_install_property (gobject_class, PROP_SERVER,
148 : g_param_spec_boolean ("server", "Server",
149 : "Specify its working mode either server or client",
150 : DEFAULT_PROP_SERVER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
151 :
152 27 : g_object_class_install_property (gobject_class, PROP_BLOCKING,
153 : g_param_spec_boolean ("blocking", "Blocking",
154 : "Specify its working mode either blocking or non-blocking",
155 : DEFAULT_PROP_BLOCKING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
156 :
157 27 : g_object_class_install_property (gobject_class, PROP_IDL,
158 : g_param_spec_string ("idl", "IDL",
159 : "Specify Interface Description Language (IDL) for communication",
160 : DEFAULT_PROP_IDL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
161 :
162 27 : g_object_class_install_property (gobject_class, PROP_HOST,
163 : g_param_spec_string ("host", "Host",
164 : "The hostname to listen as or connect",
165 : DEFAULT_PROP_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
166 :
167 27 : g_object_class_install_property (gobject_class, PROP_PORT,
168 : g_param_spec_int ("port", "Port",
169 : "The port to listen to (0=random available port) or connect",
170 : 0, G_MAXUSHORT, DEFAULT_PROP_PORT,
171 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
172 :
173 27 : g_object_class_install_property (gobject_class, PROP_OUT,
174 : g_param_spec_uint ("out", "Out",
175 : "The number of output buffers generated",
176 : 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
177 :
178 27 : gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
179 :
180 27 : gst_element_class_set_static_metadata (gstelement_class,
181 : "TensorSrcGRPC", "Source/Network",
182 : "Receive nnstreamer protocol buffers as a gRPC server/client",
183 : "Dongju Chae <dongju.chae@samsung.com>");
184 :
185 : /* GstBasrSrcClass */
186 27 : gstbasesrc_class->start = gst_tensor_src_grpc_start;
187 27 : gstbasesrc_class->stop = gst_tensor_src_grpc_stop;
188 27 : gstbasesrc_class->set_caps = gst_tensor_src_grpc_set_caps;
189 27 : gstbasesrc_class->unlock = gst_tensor_src_grpc_unlock;
190 27 : gstbasesrc_class->unlock_stop = gst_tensor_src_grpc_unlock_stop;
191 :
192 : /* GstPushSrcClass */
193 27 : gstpushsrc_class->create = gst_tensor_src_grpc_create;
194 :
195 27 : GST_DEBUG_CATEGORY_INIT (gst_tensor_src_grpc_debug,
196 : "tensor_src_grpc", 0,
197 : "src element to support protocol buffers as a gRPC server/client");
198 27 : }
199 :
200 : /**
201 : * @brief callback for checking data_queue full
202 : */
203 : static gboolean
204 124 : _data_queue_check_full_cb (GstDataQueue * queue, guint visible,
205 : guint bytes, guint64 time, gpointer checkdata)
206 : {
207 : /** it's dummy */
208 124 : return FALSE;
209 : }
210 :
211 : /**
212 : * @brief destroy callback for a data queue item
213 : */
214 : static void
215 4 : _data_queue_item_free (GstDataQueueItem * item)
216 : {
217 4 : if (item->object)
218 4 : gst_buffer_unref (GST_BUFFER (item->object));
219 4 : g_free (item);
220 4 : }
221 :
222 : /**
223 : * @brief send eos event to downstream elements
224 : */
225 : static void
226 0 : _send_eos_event (GstTensorSrcGRPC * self)
227 : {
228 0 : GstPad *srcpad = GST_BASE_SRC_PAD (&self->element);
229 0 : GstEvent *eos = gst_event_new_eos ();
230 0 : gst_pad_push_event (srcpad, eos);
231 0 : }
232 :
233 : /**
234 : * @brief callback function for gRPC requests
235 : */
236 : static void
237 124 : _grpc_callback (void *obj, void *data)
238 : {
239 : GstTensorSrcGRPC *self;
240 :
241 : GstBuffer *buffer;
242 : GstClockTime duration;
243 : GstClockTime timestamp;
244 : GstDataQueueItem *item;
245 :
246 124 : g_return_if_fail (obj != NULL);
247 124 : g_return_if_fail (data != NULL);
248 :
249 124 : self = GST_TENSOR_SRC_GRPC_CAST (obj);
250 124 : buffer = (GstBuffer *) data;
251 :
252 124 : GST_OBJECT_LOCK (self);
253 :
254 124 : if (!GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SRC_GRPC_STARTED) ||
255 124 : !GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SRC_GRPC_CONFIGURED)) {
256 0 : gst_buffer_unref (buffer);
257 :
258 0 : GST_OBJECT_UNLOCK (self);
259 0 : return;
260 : }
261 :
262 124 : if (self->config.rate_n != 0) {
263 124 : duration = GST_TENSOR_SRC_GRPC_SCALED_TIME (self, 1);
264 124 : timestamp = GST_TENSOR_SRC_GRPC_SCALED_TIME (self, self->out++);
265 : } else {
266 0 : duration = 0;
267 0 : timestamp = 0;
268 : }
269 :
270 124 : GST_BUFFER_DURATION (buffer) = duration;
271 124 : GST_BUFFER_PTS (buffer) = timestamp;
272 :
273 124 : item = g_new0 (GstDataQueueItem, 1);
274 124 : item->object = GST_MINI_OBJECT (buffer);
275 124 : item->size = gst_buffer_get_size (buffer);
276 124 : item->visible = TRUE;
277 124 : item->destroy = (GDestroyNotify) _data_queue_item_free;
278 :
279 124 : if (!gst_data_queue_push (self->queue, item)) {
280 0 : item->destroy (item);
281 0 : ml_logw ("Failed to push item because we're flushing");
282 : } else {
283 124 : silent_debug ("new buffer: timestamp %" GST_TIME_FORMAT " duration %"
284 : GST_TIME_FORMAT, GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
285 : }
286 :
287 124 : GST_OBJECT_UNLOCK (self);
288 :
289 : /* special case: framerate == (fraction)0/1 */
290 124 : if (duration == 0)
291 0 : _send_eos_event (self);
292 : }
293 :
294 : /**
295 : * @brief initialize grpc config.
296 : */
297 : static void
298 20 : grpc_config_init (GstTensorSrcGRPC * self)
299 : {
300 20 : grpc_private *grpc = GET_GRPC_PRIVATE (self);
301 :
302 20 : grpc->config.is_server = DEFAULT_PROP_SERVER;
303 20 : grpc->config.is_blocking = DEFAULT_PROP_BLOCKING;
304 20 : grpc->config.idl = grpc_get_idl (DEFAULT_PROP_IDL);
305 20 : grpc->config.dir = GRPC_DIRECTION_BUFFER_TO_TENSORS;
306 20 : grpc->config.port = DEFAULT_PROP_PORT;
307 20 : grpc->config.host = g_strdup (DEFAULT_PROP_HOST);
308 20 : grpc->config.cb = _grpc_callback;
309 20 : grpc->config.cb_data = (void *) self;
310 20 : grpc->config.config = &self->config;
311 20 : }
312 :
313 : /**
314 : * @brief initialize tensor_src_grpc element.
315 : */
316 : static void
317 20 : gst_tensor_src_grpc_init (GstTensorSrcGRPC * self)
318 : {
319 20 : gst_tensors_config_init (&self->config);
320 :
321 20 : self->queue = gst_data_queue_new (_data_queue_check_full_cb,
322 : NULL, NULL, NULL);
323 20 : self->silent = DEFAULT_PROP_SILENT;
324 20 : self->out = 0;
325 :
326 20 : self->priv = g_new0 (grpc_private, 1);
327 20 : grpc_config_init (self);
328 :
329 20 : GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SRC_GRPC_CONFIGURED);
330 20 : GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SRC_GRPC_STARTED);
331 20 : }
332 :
333 : /**
334 : * @brief finalize tensor_src_grpc element.
335 : */
336 : static void
337 20 : gst_tensor_src_grpc_finalize (GObject * object)
338 : {
339 20 : GstTensorSrcGRPC *self = GST_TENSOR_SRC_GRPC (object);
340 20 : grpc_private *grpc = GET_GRPC_PRIVATE (self);
341 :
342 20 : g_free (grpc->config.host);
343 20 : g_free (grpc);
344 20 : g_clear_pointer (&self->queue, gst_object_unref);
345 20 : gst_tensors_config_free (&self->config);
346 :
347 20 : G_OBJECT_CLASS (parent_class)->finalize (object);
348 20 : }
349 :
350 : /**
351 : * @brief start function
352 : */
353 : static gboolean
354 16 : gst_tensor_src_grpc_start (GstBaseSrc * src)
355 : {
356 16 : GstTensorSrcGRPC *self = GST_TENSOR_SRC_GRPC (src);
357 16 : grpc_private *grpc = GET_GRPC_PRIVATE (self);
358 : gboolean ret;
359 :
360 16 : if (grpc->instance)
361 0 : grpc_destroy (grpc->instance);
362 :
363 16 : grpc->instance = grpc_new (&grpc->config);
364 16 : if (!grpc->instance)
365 0 : return FALSE;
366 :
367 16 : ret = grpc_start (grpc->instance);
368 16 : if (ret) {
369 16 : GST_OBJECT_FLAG_SET (self, GST_TENSOR_SRC_GRPC_STARTED);
370 :
371 16 : if (grpc->config.is_server) {
372 8 : gint port = grpc_get_listening_port (grpc->instance);
373 8 : if (port > 0)
374 8 : g_object_set (self, "port", port, NULL);
375 : }
376 : }
377 :
378 16 : return ret;
379 : }
380 :
381 : /**
382 : * @brief stop function
383 : */
384 : static gboolean
385 16 : gst_tensor_src_grpc_stop (GstBaseSrc * src)
386 : {
387 16 : GstTensorSrcGRPC *self = GST_TENSOR_SRC_GRPC (src);
388 16 : grpc_private *grpc = GET_GRPC_PRIVATE (self);
389 :
390 16 : if (!GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SRC_GRPC_STARTED))
391 16 : return TRUE;
392 :
393 0 : _send_eos_event (self);
394 :
395 0 : if (grpc->instance)
396 0 : grpc_destroy (grpc->instance);
397 0 : grpc->instance = NULL;
398 :
399 0 : GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SRC_GRPC_STARTED);
400 :
401 0 : return TRUE;
402 : }
403 :
404 : /**
405 : * @brief unlock function, flush any pending data in the data queue
406 : */
407 : static gboolean
408 16 : gst_tensor_src_grpc_unlock (GstBaseSrc * src)
409 : {
410 16 : GstTensorSrcGRPC *self = GST_TENSOR_SRC_GRPC (src);
411 16 : grpc_private *grpc = GET_GRPC_PRIVATE (self);
412 :
413 : /* notify to gRPC */
414 16 : if (grpc->instance)
415 16 : grpc_stop (grpc->instance);
416 :
417 16 : silent_debug ("Unlocking create");
418 16 : gst_data_queue_set_flushing (self->queue, TRUE);
419 :
420 16 : return TRUE;
421 : }
422 :
423 : /**
424 : * @brief unlock_stop function, clear the previous unlock request
425 : */
426 : static gboolean
427 16 : gst_tensor_src_grpc_unlock_stop (GstBaseSrc * src)
428 : {
429 16 : GstTensorSrcGRPC *self = GST_TENSOR_SRC_GRPC (src);
430 :
431 16 : silent_debug ("Stopping unlock");
432 16 : gst_data_queue_set_flushing (self->queue, FALSE);
433 :
434 16 : return TRUE;
435 : }
436 :
437 : /**
438 : * @brief set caps and configure tensor_src_grpc
439 : */
440 : static gboolean
441 16 : gst_tensor_src_grpc_set_caps (GstBaseSrc * src, GstCaps * caps)
442 : {
443 : GstTensorSrcGRPC *self;
444 : GstStructure *structure;
445 :
446 16 : self = GST_TENSOR_SRC_GRPC (src);
447 :
448 16 : GST_OBJECT_LOCK (self);
449 :
450 16 : structure = gst_caps_get_structure (caps, 0);
451 16 : gst_tensors_config_from_structure (&self->config, structure);
452 :
453 16 : GST_OBJECT_FLAG_SET (self, GST_TENSOR_SRC_GRPC_CONFIGURED);
454 :
455 16 : GST_OBJECT_UNLOCK (self);
456 :
457 16 : return gst_tensors_config_validate (&self->config);
458 : }
459 :
460 : /**
461 : * @brief set a buffer which is a head item in data queue
462 : */
463 : static GstFlowReturn
464 120 : gst_tensor_src_grpc_create (GstPushSrc * src, GstBuffer ** buf)
465 : {
466 120 : GstTensorSrcGRPC *self = GST_TENSOR_SRC_GRPC_CAST (src);
467 : GstDataQueueItem *item;
468 :
469 120 : if (!gst_data_queue_pop (self->queue, &item)) {
470 0 : silent_debug ("We're flushing");
471 120 : return GST_FLOW_FLUSHING;
472 : }
473 :
474 120 : *buf = GST_BUFFER (item->object);
475 120 : g_free (item);
476 :
477 120 : return GST_FLOW_OK;
478 : }
479 :
480 : /**
481 : * @brief set tensor_src_grpc properties
482 : */
483 : static void
484 81 : gst_tensor_src_grpc_set_property (GObject * object, guint prop_id,
485 : const GValue * value, GParamSpec * pspec)
486 : {
487 : GstTensorSrcGRPC *self;
488 : grpc_private *grpc;
489 :
490 81 : g_return_if_fail (GST_IS_TENSOR_SRC_GRPC (object));
491 :
492 81 : self = GST_TENSOR_SRC_GRPC (object);
493 81 : grpc = GET_GRPC_PRIVATE (self);
494 :
495 81 : grpc_common_set_property (object, &self->silent, grpc, prop_id, value, pspec);
496 : }
497 :
498 : /**
499 : * @brief get tensor_src_grpc properties
500 : */
501 : static void
502 12 : gst_tensor_src_grpc_get_property (GObject * object, guint prop_id,
503 : GValue * value, GParamSpec * pspec)
504 : {
505 : GstTensorSrcGRPC *self;
506 : grpc_private *grpc;
507 :
508 12 : g_return_if_fail (GST_IS_TENSOR_SRC_GRPC (object));
509 :
510 12 : self = GST_TENSOR_SRC_GRPC (object);
511 12 : grpc = GET_GRPC_PRIVATE (self);
512 :
513 12 : grpc_common_get_property (object, self->silent, self->out, grpc, prop_id,
514 : value, pspec);
515 : }
|