Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1-only */
2 : /**
3 : * GStreamer Tensor_Sink_gRPC
4 : * Copyright (C) 2020 Dongju Chae <dongju.chae@samsung.com>
5 : */
6 : /**
7 : * @file tensor_sink_grpc.c
8 : * @date 22 Oct 2020
9 : * @brief GStreamer plugin to support gRPC tensor sink
10 : * @see http://github.com/nnstreamer/nnstreamer
11 : * @author Dongju Chae <dongju.chae@samsung.com>
12 : * @bug No known bugs except for NYI items
13 : */
14 :
15 : /**
16 : * SECTION:element-tensor_sink_grpc
17 : *
18 : * #tensor_sink_grpc extends #gstbasesink sink element to emit gRPC
19 : * messages as either server or client.
20 : *
21 : * <refsect2>
22 : * <title>Example launch line</title>
23 : * |[
24 : * gst-launch -v -m videotestsrc
25 : * ! video/x-raw,format=RGB,width=640,height=480,framerate=30/1
26 : * ! tensor_converter ! tensor_sink_grpc
27 : * ]|
28 : * </refsect2>
29 : */
30 :
31 : #ifdef HAVE_CONFIG_H
32 : #include <config.h>
33 : #endif
34 :
35 : #include <string.h>
36 : #include <errno.h>
37 :
38 : #include <gst/gst.h>
39 : #include <glib.h>
40 : #include <gmodule.h>
41 :
42 : #include <tensor_typedef.h>
43 : #include <nnstreamer_plugin_api.h>
44 :
45 : #include "tensor_sink_grpc.h"
46 : #include "nnstreamer_grpc.h"
47 :
/**
 * @brief Debug-mode predicate: true when the element is not silent.
 * Expects a variable named `self` (the element instance) in the calling scope.
 */
#ifndef DBG
#define DBG (!self->silent)
#endif

/**
 * @brief Emit a GStreamer debug message only when debug mode (DBG) is on.
 * Uses `self` from the calling scope as the log object.
 */
#define silent_debug(...) \
  do { \
    if (DBG) { \
      GST_DEBUG_OBJECT (self, __VA_ARGS__); \
    } \
  } while (0)

/** Debug category of this element; also the default category for GST_* logs */
GST_DEBUG_CATEGORY_STATIC (gst_tensor_sink_grpc_debug);
#define GST_CAT_DEFAULT gst_tensor_sink_grpc_debug
66 :
/**
 * @brief Default value of the "silent" property (TRUE: minimized log)
 */
#define DEFAULT_PROP_SILENT TRUE

/**
 * @brief Default value of the "server" property (FALSE: work as a gRPC client)
 */
#define DEFAULT_PROP_SERVER FALSE

/**
 * @brief Default value of the "blocking" property (TRUE: blocking mode)
 */
#define DEFAULT_PROP_BLOCKING TRUE

/**
 * @brief Default value of the "idl" property: IDL used for RPC communication
 */
#define DEFAULT_PROP_IDL "protobuf"

/**
 * @brief Default values of the "host" and "port" properties
 */
#define DEFAULT_PROP_HOST "localhost"
#define DEFAULT_PROP_PORT 55115

/**
 * @brief Caps accepted by the sink pad: single-tensor or multi-tensor streams
 */
#define CAPS_STRING GST_TENSOR_CAP_DEFAULT "; " GST_TENSORS_CAP_DEFAULT

/**
 * @brief Always-present sink pad template accepting CAPS_STRING
 */
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS (CAPS_STRING));
99 :
/**
 * @brief Get the grpc_private data attached to an element instance.
 *
 * Both the argument and the whole expansion are parenthesized so that the
 * macro works with arbitrary pointer expressions (e.g. `ptr + 1`) and can be
 * followed directly by a member access (e.g. `GET_GRPC_PRIVATE (x)->config`).
 */
#define GET_GRPC_PRIVATE(arg) ((grpc_private *) ((arg)->priv))
101 :
102 : /** GObject method implementation */
103 : static void gst_tensor_sink_grpc_finalize (GObject * gobject);
104 : static void gst_tensor_sink_grpc_set_property (GObject * object, guint prop_id,
105 : const GValue * value, GParamSpec * pspec);
106 : static void gst_tensor_sink_grpc_get_property (GObject * object, guint prop_id,
107 : GValue * value, GParamSpec * pspec);
108 :
109 : /** GstBaseSink method implementation */
110 : static gboolean gst_tensor_sink_grpc_setcaps (GstBaseSink * sink,
111 : GstCaps * caps);
112 : static GstFlowReturn gst_tensor_sink_grpc_render (GstBaseSink * sink,
113 : GstBuffer * buf);
114 : static gboolean gst_tensor_sink_grpc_start (GstBaseSink * sink);
115 : static gboolean gst_tensor_sink_grpc_stop (GstBaseSink * sink);
116 :
117 : static gboolean gst_tensor_sink_grpc_unlock (GstBaseSink * sink);
118 :
119 : /** internal functions */
120 : #define gst_tensor_sink_grpc_parent_class parent_class
121 413 : G_DEFINE_TYPE (GstTensorSinkGRPC, gst_tensor_sink_grpc, GST_TYPE_BASE_SINK);
122 :
123 : /**
124 : * @brief initialize the tensor_sink_grpc class.
125 : */
126 : static void
127 23 : gst_tensor_sink_grpc_class_init (GstTensorSinkGRPCClass * klass)
128 : {
129 : GObjectClass *gobject_class;
130 : GstElementClass *gstelement_class;
131 : GstBaseSinkClass *gstbasesink_class;
132 :
133 23 : gobject_class = (GObjectClass *) klass;
134 23 : gstelement_class = (GstElementClass *) klass;
135 23 : gstbasesink_class = (GstBaseSinkClass *) klass;
136 :
137 23 : parent_class = g_type_class_peek_parent (klass);
138 :
139 23 : gobject_class->set_property = gst_tensor_sink_grpc_set_property;
140 23 : gobject_class->get_property = gst_tensor_sink_grpc_get_property;
141 23 : gobject_class->finalize = gst_tensor_sink_grpc_finalize;
142 :
143 : /* install properties */
144 23 : g_object_class_install_property (gobject_class, PROP_SILENT,
145 : g_param_spec_boolean ("silent", "Silent",
146 : "Dont' produce verbose output",
147 : DEFAULT_PROP_SILENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
148 :
149 23 : g_object_class_install_property (gobject_class, PROP_SERVER,
150 : g_param_spec_boolean ("server", "Server",
151 : "Specify its working mode either server or client",
152 : DEFAULT_PROP_SERVER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
153 :
154 23 : g_object_class_install_property (gobject_class, PROP_BLOCKING,
155 : g_param_spec_boolean ("blocking", "Blocking",
156 : "Specify its working mode either blocking or non-blocking",
157 : DEFAULT_PROP_BLOCKING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
158 :
159 23 : g_object_class_install_property (gobject_class, PROP_IDL,
160 : g_param_spec_string ("idl", "IDL",
161 : "Specify Interface Description Language (IDL) for communication",
162 : DEFAULT_PROP_IDL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
163 :
164 23 : g_object_class_install_property (gobject_class, PROP_HOST,
165 : g_param_spec_string ("host", "Host", "The host/IP to send the packets to",
166 : DEFAULT_PROP_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
167 :
168 23 : g_object_class_install_property (gobject_class, PROP_PORT,
169 : g_param_spec_int ("port", "Port", "The port to send the packets to",
170 : 0, G_MAXUSHORT, DEFAULT_PROP_PORT,
171 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
172 :
173 23 : g_object_class_install_property (gobject_class, PROP_OUT,
174 : g_param_spec_uint ("out", "Out",
175 : "The number of output messages generated",
176 : 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
177 :
178 23 : gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
179 :
180 23 : gst_element_class_set_static_metadata (gstelement_class,
181 : "TensorSinkGRPC", "Sink/Network",
182 : "Send nnstreamer protocol buffers as gRPC server/client",
183 : "Dongju Chae <dongju.chae@samsung.com>");
184 :
185 : /* GstBaseSinkClass */
186 23 : gstbasesink_class->start = gst_tensor_sink_grpc_start;
187 23 : gstbasesink_class->stop = gst_tensor_sink_grpc_stop;
188 23 : gstbasesink_class->set_caps = gst_tensor_sink_grpc_setcaps;
189 23 : gstbasesink_class->render = gst_tensor_sink_grpc_render;
190 23 : gstbasesink_class->unlock = gst_tensor_sink_grpc_unlock;
191 :
192 23 : GST_DEBUG_CATEGORY_INIT (gst_tensor_sink_grpc_debug,
193 : "tensor_sink_grpc", 0,
194 : "sink element to support protocol buffers as a gRPC server/client");
195 23 : }
196 :
197 : /**
198 : * @brief initialize grpc config.
199 : */
200 : static void
201 16 : grpc_config_init (GstTensorSinkGRPC * self)
202 : {
203 16 : grpc_private *grpc = GET_GRPC_PRIVATE (self);
204 :
205 16 : grpc->config.is_server = DEFAULT_PROP_SERVER;
206 16 : grpc->config.is_blocking = DEFAULT_PROP_BLOCKING;
207 16 : grpc->config.idl = grpc_get_idl (DEFAULT_PROP_IDL);
208 16 : grpc->config.dir = GRPC_DIRECTION_TENSORS_TO_BUFFER;
209 16 : grpc->config.port = DEFAULT_PROP_PORT;
210 16 : grpc->config.host = g_strdup (DEFAULT_PROP_HOST);
211 16 : grpc->config.config = &self->config;
212 16 : }
213 :
214 : /**
215 : * @brief initialize tensor_sink_grpc element.
216 : */
217 : static void
218 16 : gst_tensor_sink_grpc_init (GstTensorSinkGRPC * self)
219 : {
220 16 : gst_tensors_config_init (&self->config);
221 :
222 16 : self->silent = DEFAULT_PROP_SILENT;
223 16 : self->out = 0;
224 :
225 16 : self->priv = g_new0 (grpc_private, 1);
226 16 : grpc_config_init (self);
227 :
228 16 : GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SINK_GRPC_CONFIGURED);
229 16 : GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SINK_GRPC_STARTED);
230 16 : }
231 :
232 : /**
233 : * @brief finalize tensor_sink_grpc element.
234 : */
235 : static void
236 16 : gst_tensor_sink_grpc_finalize (GObject * gobject)
237 : {
238 16 : GstTensorSinkGRPC *self = GST_TENSOR_SINK_GRPC (gobject);
239 16 : grpc_private *grpc = GET_GRPC_PRIVATE (self);
240 :
241 16 : g_free (grpc->config.host);
242 16 : g_free (grpc);
243 16 : gst_tensors_config_free (&self->config);
244 :
245 16 : G_OBJECT_CLASS (parent_class)->finalize (gobject);
246 16 : }
247 :
248 : /**
249 : * @brief set caps of tensor_sink_grpc element.
250 : */
251 : static gboolean
252 12 : gst_tensor_sink_grpc_setcaps (GstBaseSink * sink, GstCaps * caps)
253 : {
254 : GstTensorSinkGRPC *self;
255 : GstStructure *structure;
256 :
257 12 : self = GST_TENSOR_SINK_GRPC (sink);
258 :
259 12 : GST_OBJECT_LOCK (self);
260 :
261 12 : structure = gst_caps_get_structure (caps, 0);
262 12 : gst_tensors_config_from_structure (&self->config, structure);
263 :
264 12 : GST_OBJECT_FLAG_SET (self, GST_TENSOR_SINK_GRPC_CONFIGURED);
265 :
266 12 : GST_OBJECT_UNLOCK (self);
267 :
268 12 : return gst_tensors_config_validate (&self->config);
269 : }
270 :
271 : /**
272 : * @brief render function of tensor_sink_grpc element.
273 : */
274 : static GstFlowReturn
275 90 : gst_tensor_sink_grpc_render (GstBaseSink * sink, GstBuffer * buf)
276 : {
277 90 : GstTensorSinkGRPC *self = GST_TENSOR_SINK_GRPC (sink);
278 90 : grpc_private *grpc = GET_GRPC_PRIVATE (self);
279 : gboolean ret;
280 :
281 90 : g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (self,
282 : GST_TENSOR_SINK_GRPC_STARTED), GST_FLOW_FLUSHING);
283 :
284 90 : ret = grpc_send (grpc->instance, buf);
285 :
286 90 : return ret ? GST_FLOW_OK : GST_FLOW_ERROR;
287 : }
288 :
289 : /**
290 : * @brief set properties of tensor_sink_grpc element.
291 : */
292 : static void
293 61 : gst_tensor_sink_grpc_set_property (GObject * object, guint prop_id,
294 : const GValue * value, GParamSpec * pspec)
295 : {
296 : GstTensorSinkGRPC *self;
297 : grpc_private *grpc;
298 :
299 61 : g_return_if_fail (GST_IS_TENSOR_SINK_GRPC (object));
300 :
301 61 : self = GST_TENSOR_SINK_GRPC (object);
302 61 : grpc = GET_GRPC_PRIVATE (self);
303 :
304 61 : grpc_common_set_property (object, &self->silent, grpc, prop_id, value, pspec);
305 : }
306 :
307 : /**
308 : * @brief get properties of tensor_sink_grpc element.
309 : */
310 : static void
311 12 : gst_tensor_sink_grpc_get_property (GObject * object, guint prop_id,
312 : GValue * value, GParamSpec * pspec)
313 : {
314 : GstTensorSinkGRPC *self;
315 : grpc_private *grpc;
316 :
317 12 : g_return_if_fail (GST_IS_TENSOR_SINK_GRPC (object));
318 :
319 12 : self = GST_TENSOR_SINK_GRPC (object);
320 12 : grpc = GET_GRPC_PRIVATE (self);
321 :
322 12 : grpc_common_get_property (object, self->silent, self->out, grpc, prop_id, value, pspec);
323 : }
324 :
325 : /**
326 : * @brief start tensor_sink_grpc element.
327 : */
328 : static gboolean
329 12 : gst_tensor_sink_grpc_start (GstBaseSink * sink)
330 : {
331 12 : GstTensorSinkGRPC *self = GST_TENSOR_SINK_GRPC (sink);
332 12 : grpc_private *grpc = GET_GRPC_PRIVATE (self);
333 : gboolean ret;
334 :
335 12 : if (GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SINK_GRPC_STARTED))
336 0 : return TRUE;
337 :
338 12 : if (grpc->instance)
339 0 : grpc_destroy (grpc->instance);
340 :
341 12 : grpc->instance = grpc_new (&grpc->config);
342 12 : if (!grpc->instance)
343 0 : return FALSE;
344 :
345 12 : ret = grpc_start (grpc->instance);
346 12 : if (ret) {
347 12 : GST_OBJECT_FLAG_SET (self, GST_TENSOR_SINK_GRPC_STARTED);
348 :
349 12 : if (grpc->config.is_server) {
350 4 : gint port = grpc_get_listening_port (grpc->instance);
351 4 : if (port > 0)
352 4 : g_object_set (self, "port", port, NULL);
353 : }
354 : }
355 :
356 12 : return TRUE;
357 : }
358 :
359 : /**
360 : * @brief stop tensor_sink_grpc element.
361 : */
362 : static gboolean
363 12 : gst_tensor_sink_grpc_stop (GstBaseSink * sink)
364 : {
365 12 : GstTensorSinkGRPC *self = GST_TENSOR_SINK_GRPC (sink);
366 12 : grpc_private *grpc = GET_GRPC_PRIVATE (self);
367 :
368 12 : if (!GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SINK_GRPC_STARTED))
369 0 : return TRUE;
370 :
371 12 : if (grpc->instance)
372 12 : grpc_destroy (grpc->instance);
373 12 : grpc->instance = NULL;
374 :
375 12 : GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SINK_GRPC_STARTED);
376 :
377 12 : return TRUE;
378 : }
379 :
380 : /**
381 : * @brief unlock any blocking operations
382 : */
383 : static gboolean
384 24 : gst_tensor_sink_grpc_unlock (GstBaseSink * sink)
385 : {
386 24 : GstTensorSinkGRPC *self = GST_TENSOR_SINK_GRPC (sink);
387 24 : grpc_private *grpc = GET_GRPC_PRIVATE (self);
388 :
389 : /* notify to gRPC */
390 24 : if (grpc->instance)
391 24 : grpc_stop (grpc->instance);
392 :
393 24 : silent_debug ("Unlocking create");
394 :
395 24 : return TRUE;
396 : }
|