Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1-only */
2 : /**
3 : * GStreamer / NNStreamer gRPC support
4 : * Copyright (C) 2020 Dongju Chae <dongju.chae@samsung.com>
5 : */
6 : /**
7 : * @file nnstreamer_grpc_common.cc
8 : * @date 21 Oct 2020
9 : * @brief gRPC wrappers for nnstreamer
10 : * @see https://github.com/nnstreamer/nnstreamer
11 : * @author Dongju Chae <dongju.chae@samsung.com>
12 : * @bug No known bugs except for NYI items
13 : */
14 :
15 : #include "nnstreamer_grpc_common.h"
16 : #include "nnstreamer_conf.h"
17 :
18 : #include <gmodule.h>
19 :
20 : #include <nnstreamer_log.h>
21 : #include <nnstreamer_plugin_api.h>
22 :
23 : #include <grpcpp/health_check_service_interface.h>
24 :
25 : static constexpr const char *NNS_GRPC_PROTOBUF_NAME = "libnnstreamer_grpc_protobuf";
26 : static constexpr const char *NNS_GRPC_FLATBUF_NAME = "libnnstreamer_grpc_flatbuf";
27 : static constexpr const char *NNS_GRPC_CREATE_INSTANCE = "create_instance";
28 :
29 : using namespace grpc;
30 :
31 : /** @brief create new instance of NNStreamerRPC */
32 : NNStreamerRPC *
33 28 : NNStreamerRPC::createInstance (const grpc_config *config)
34 : {
35 28 : gchar *name = NULL;
36 :
37 28 : if (config->idl == GRPC_IDL_PROTOBUF)
38 14 : name = g_strdup_printf ("%s%s", NNS_GRPC_PROTOBUF_NAME, NNSTREAMER_SO_FILE_EXTENSION);
39 14 : else if (config->idl == GRPC_IDL_FLATBUF)
40 14 : name = g_strdup_printf ("%s%s", NNS_GRPC_FLATBUF_NAME, NNSTREAMER_SO_FILE_EXTENSION);
41 :
42 28 : if (name == NULL) {
43 0 : ml_loge ("Unsupported IDL detected: %d\n", config->idl);
44 0 : return NULL;
45 : }
46 :
47 28 : GModule *module = g_module_open (name, G_MODULE_BIND_LAZY);
48 28 : if (!module) {
49 0 : ml_loge ("Error opening %s\n", name);
50 0 : g_free (name);
51 0 : return NULL;
52 : }
53 :
54 : using function_ptr = void *(*) (const grpc_config *config);
55 : function_ptr create_instance;
56 :
57 28 : if (!g_module_symbol (module, NNS_GRPC_CREATE_INSTANCE, (gpointer *) &create_instance)) {
58 0 : ml_loge ("Error loading create_instance: %s\n", g_module_error ());
59 0 : g_free (name);
60 0 : g_module_close (module);
61 0 : return NULL;
62 : }
63 :
64 28 : NNStreamerRPC *instance = (NNStreamerRPC *) create_instance (config);
65 28 : if (!instance) {
66 0 : ml_loge ("Error creating an instance\n");
67 0 : g_free (name);
68 0 : g_module_close (module);
69 0 : return NULL;
70 : }
71 :
72 28 : g_free (name);
73 28 : instance->setModuleHandle (module);
74 28 : return instance;
75 : }
76 :
77 : /** @brief constructor of NNStreamerRPC */
78 28 : NNStreamerRPC::NNStreamerRPC (const grpc_config *config)
79 28 : : host_ (config->host), port_ (config->port), is_server_ (config->is_server),
80 28 : is_blocking_ (config->is_blocking), direction_ (config->dir),
81 28 : cb_ (config->cb), cb_data_ (config->cb_data), config_ (config->config),
82 28 : server_instance_ (nullptr), handle_ (nullptr), stop_ (false)
83 : {
84 28 : queue_ = gst_data_queue_new (_data_queue_check_full_cb, NULL, NULL, NULL);
85 28 : }
86 :
87 : /** @brief destructor of NNStreamerRPC */
88 12 : NNStreamerRPC::~NNStreamerRPC ()
89 : {
90 12 : g_clear_pointer (&queue_, gst_object_unref);
91 12 : }
92 :
93 : /** @brief start gRPC server */
94 : gboolean
95 28 : NNStreamerRPC::start ()
96 : {
97 28 : if (direction_ == GRPC_DIRECTION_NONE)
98 0 : return FALSE;
99 :
100 28 : if (is_server_)
101 12 : return _start_server ();
102 : else
103 16 : return _start_client ();
104 : }
105 :
106 : /** @brief stop the thread */
107 : void
108 40 : NNStreamerRPC::stop ()
109 : {
110 40 : if (stop_)
111 12 : return;
112 :
113 : /* notify to the worker */
114 28 : stop_ = true;
115 :
116 28 : if (queue_) {
117 : /* wait until the queue's flushed */
118 28 : while (!gst_data_queue_is_empty (queue_))
119 0 : g_usleep (G_USEC_PER_SEC / 100);
120 :
121 28 : gst_data_queue_set_flushing (queue_, TRUE);
122 : }
123 :
124 28 : if (is_server_) {
125 12 : if (server_instance_.get ())
126 12 : server_instance_->Shutdown ();
127 :
128 12 : if (completion_queue_.get ())
129 4 : completion_queue_->Shutdown ();
130 : }
131 :
132 28 : if (worker_.joinable ())
133 20 : worker_.join ();
134 : }
135 :
136 : /** @brief send buffer holding tensors */
137 : gboolean
138 90 : NNStreamerRPC::send (GstBuffer *buffer)
139 : {
140 : GstDataQueueItem *item;
141 :
142 90 : buffer = gst_buffer_ref (buffer);
143 :
144 90 : item = g_new0 (GstDataQueueItem, 1);
145 90 : item->object = GST_MINI_OBJECT (buffer);
146 90 : item->size = gst_buffer_get_size (buffer);
147 90 : item->visible = TRUE;
148 90 : item->destroy = (GDestroyNotify) _data_queue_item_free;
149 :
150 90 : if (!gst_data_queue_push (queue_, item)) {
151 0 : item->destroy (item);
152 0 : return FALSE;
153 : }
154 :
155 90 : return TRUE;
156 : }
157 :
158 : /** @brief start server service */
159 : gboolean
160 12 : NNStreamerRPC::_start_server ()
161 : {
162 12 : std::string address (host_);
163 :
164 12 : address += ":" + std::to_string (port_);
165 :
166 12 : grpc::EnableDefaultHealthCheckService (true);
167 :
168 24 : return start_server (address);
169 12 : }
170 :
171 : /** @brief start client service */
172 : gboolean
173 16 : NNStreamerRPC::_start_client ()
174 : {
175 16 : std::string address (host_);
176 :
177 16 : address += ":" + std::to_string (port_);
178 :
179 32 : return start_client (address);
180 16 : }
181 :
182 : /** @brief private method to check full */
183 : gboolean
184 90 : NNStreamerRPC::_data_queue_check_full_cb (GstDataQueue *queue, guint visible,
185 : guint bytes, guint64 time, gpointer checkdata)
186 : {
187 : /* no full */
188 90 : return FALSE;
189 : }
190 :
191 : /** @brief private method to free a data item */
192 : void
193 90 : NNStreamerRPC::_data_queue_item_free (GstDataQueueItem *item)
194 : {
195 90 : if (item->object)
196 90 : gst_buffer_unref (GST_BUFFER (item->object));
197 90 : g_free (item);
198 90 : }
199 :
200 : /**
201 : * @brief get gRPC IDL enum from a given string
202 : */
203 : grpc_idl
204 64 : grpc_get_idl (const gchar *idl_str)
205 : {
206 64 : if (g_ascii_strcasecmp (idl_str, "protobuf") == 0)
207 50 : return GRPC_IDL_PROTOBUF;
208 14 : else if (g_ascii_strcasecmp (idl_str, "flatbuf") == 0)
209 14 : return GRPC_IDL_FLATBUF;
210 : else
211 0 : return GRPC_IDL_NONE;
212 : }
213 :
214 : /**
215 : * @brief gRPC C++ wrapper to create the class instance
216 : */
217 : void *
218 28 : grpc_new (const grpc_config *config)
219 : {
220 28 : g_return_val_if_fail (config != NULL, NULL);
221 :
222 28 : NNStreamerRPC *self = NNStreamerRPC::createInstance (config);
223 :
224 28 : return static_cast<void *> (self);
225 : }
226 :
227 : /**
228 : * @brief gRPC C++ wrapper to destroy the class instance
229 : */
230 : void
231 12 : grpc_destroy (void *instance)
232 : {
233 12 : g_return_if_fail (instance != NULL);
234 :
235 12 : NNStreamerRPC *self = static_cast<NNStreamerRPC *> (instance);
236 12 : void *handle = self->getModuleHandle ();
237 :
238 12 : delete self;
239 :
240 12 : if (handle)
241 12 : g_module_close ((GModule *) handle);
242 : }
243 :
244 : /**
245 : * @brief gRPC C++ wrapper to start gRPC service
246 : */
247 : gboolean
248 28 : grpc_start (void *instance)
249 : {
250 28 : g_return_val_if_fail (instance != NULL, FALSE);
251 :
252 28 : NNStreamerRPC *self = static_cast<NNStreamerRPC *> (instance);
253 :
254 28 : return self->start ();
255 : }
256 :
257 : /**
258 : * @brief gRPC C++ wrapper to stop service
259 : */
260 : void
261 40 : grpc_stop (void *instance)
262 : {
263 40 : g_return_if_fail (instance != NULL);
264 :
265 40 : grpc::NNStreamerRPC *self = static_cast<grpc::NNStreamerRPC *> (instance);
266 :
267 40 : self->stop ();
268 : }
269 :
270 : /**
271 : * @brief gRPC C++ wrapper to send messages
272 : */
273 : gboolean
274 90 : grpc_send (void *instance, GstBuffer *buffer)
275 : {
276 90 : g_return_val_if_fail (instance != NULL, FALSE);
277 :
278 90 : grpc::NNStreamerRPC *self = static_cast<grpc::NNStreamerRPC *> (instance);
279 :
280 90 : return self->send (buffer);
281 : }
282 :
283 : /**
284 : * @brief get gRPC listening port of the server instance
285 : */
286 : int
287 12 : grpc_get_listening_port (void *instance)
288 : {
289 12 : g_return_val_if_fail (instance != NULL, -EINVAL);
290 :
291 12 : NNStreamerRPC *self = static_cast<NNStreamerRPC *> (instance);
292 :
293 12 : return self->getListeningPort ();
294 : }
295 :
296 : #define silent_debug(...) \
297 : do { \
298 : if (*silent) { \
299 : GST_DEBUG_OBJECT (self, __VA_ARGS__); \
300 : } \
301 : } while (0)
302 :
303 : /**
304 : * @brief check the validity of hostname string
305 : */
306 : gboolean
307 12 : _check_hostname (gchar *str)
308 : {
309 12 : if (g_strcmp0 (str, "localhost") == 0 || g_hostname_is_ip_address (str))
310 10 : return TRUE;
311 :
312 2 : return FALSE;
313 : }
314 :
315 : /**
316 : * @brief set-prop common for both grpc elements
317 : */
318 : void
319 142 : grpc_common_set_property (GObject *self, gboolean *silent, grpc_private *grpc,
320 : guint prop_id, const GValue *value, GParamSpec *pspec)
321 : {
322 142 : switch (prop_id) {
323 2 : case PROP_SILENT:
324 2 : *silent = g_value_get_boolean (value);
325 2 : silent_debug ("Set silent = %d", *silent);
326 2 : break;
327 22 : case PROP_SERVER:
328 22 : grpc->config.is_server = g_value_get_boolean (value);
329 22 : silent_debug ("Set server = %d", grpc->config.is_server);
330 22 : break;
331 28 : case PROP_BLOCKING:
332 28 : grpc->config.is_blocking = g_value_get_boolean (value);
333 28 : silent_debug ("Set blocking = %d", grpc->config.is_blocking);
334 28 : break;
335 28 : case PROP_IDL:
336 : {
337 28 : const gchar *idl_str = g_value_get_string (value);
338 :
339 28 : if (idl_str) {
340 28 : grpc_idl idl = grpc_get_idl (idl_str);
341 28 : if (idl != GRPC_IDL_NONE) {
342 28 : grpc->config.idl = idl;
343 28 : silent_debug ("Set idl = %s", idl_str);
344 : } else {
345 0 : ml_loge ("Invalid IDL string provided: %s", idl_str);
346 : }
347 : }
348 28 : break;
349 : }
350 12 : case PROP_HOST:
351 : {
352 : gchar *host;
353 :
354 12 : if (!g_value_get_string (value))
355 0 : break;
356 :
357 12 : host = g_value_dup_string (value);
358 12 : if (_check_hostname (host)) {
359 10 : g_free (grpc->config.host);
360 10 : grpc->config.host = host;
361 10 : silent_debug ("Set host = %s", grpc->config.host);
362 : } else {
363 2 : g_free (host);
364 : }
365 12 : break;
366 : }
367 50 : case PROP_PORT:
368 50 : grpc->config.port = g_value_get_int (value);
369 50 : silent_debug ("Set port = %d", grpc->config.port);
370 50 : break;
371 0 : default:
372 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
373 0 : break;
374 : }
375 142 : }
376 :
377 : /**
378 : * @brief get-prop common for both grpc elements
379 : */
380 : void
381 24 : grpc_common_get_property (GObject *self, gboolean silent, guint out,
382 : grpc_private *grpc, guint prop_id, GValue *value, GParamSpec *pspec)
383 : {
384 24 : switch (prop_id) {
385 4 : case PROP_SILENT:
386 4 : g_value_set_boolean (value, silent);
387 4 : break;
388 4 : case PROP_SERVER:
389 4 : g_value_set_boolean (value, grpc->config.is_server);
390 4 : break;
391 0 : case PROP_BLOCKING:
392 0 : g_value_set_boolean (value, grpc->config.is_blocking);
393 0 : break;
394 0 : case PROP_IDL:
395 0 : switch (grpc->config.idl) {
396 0 : case GRPC_IDL_PROTOBUF:
397 0 : g_value_set_string (value, "protobuf");
398 0 : break;
399 0 : case GRPC_IDL_FLATBUF:
400 0 : g_value_set_string (value, "flatbuf");
401 0 : break;
402 0 : default:
403 0 : break;
404 : }
405 0 : break;
406 6 : case PROP_HOST:
407 6 : g_value_set_string (value, grpc->config.host);
408 6 : break;
409 8 : case PROP_PORT:
410 8 : g_value_set_int (value, grpc->config.port);
411 8 : break;
412 2 : case PROP_OUT:
413 2 : g_value_set_uint (value, out);
414 2 : break;
415 0 : default:
416 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
417 0 : break;
418 : }
419 24 : }
|