LCOV - code coverage report
Current view: top level - nnstreamer-2.4.2/ext/nnstreamer/extra - nnstreamer_grpc_common.cc (source / functions) Coverage Total Hit
Test: nnstreamer 2.4.2-0 nnstreamer/nnstreamer#eca68b8d050408568af95d831a8eef62aaee7784 Lines: 80.1 % 201 161
Test Date: 2025-03-14 05:36:58 Functions: 95.2 % 21 20

            Line data    Source code
       1              : /* SPDX-License-Identifier: LGPL-2.1-only */
       2              : /**
       3              :  * GStreamer / NNStreamer gRPC support
       4              :  * Copyright (C) 2020 Dongju Chae <dongju.chae@samsung.com>
       5              :  */
       6              : /**
       7              :  * @file    nnstreamer_grpc_common.cc
       8              :  * @date    21 Oct 2020
       9              :  * @brief   gRPC wrappers for nnstreamer
      10              :  * @see     https://github.com/nnstreamer/nnstreamer
      11              :  * @author  Dongju Chae <dongju.chae@samsung.com>
      12              :  * @bug     No known bugs except for NYI items
      13              :  */
      14              : 
      15              : #include "nnstreamer_grpc_common.h"
      16              : #include "nnstreamer_conf.h"
      17              : 
      18              : #include <gmodule.h>
      19              : 
      20              : #include <nnstreamer_log.h>
      21              : #include <nnstreamer_plugin_api.h>
      22              : 
      23              : #include <grpcpp/health_check_service_interface.h>
      24              : 
      25              : static constexpr const char *NNS_GRPC_PROTOBUF_NAME = "libnnstreamer_grpc_protobuf";
      26              : static constexpr const char *NNS_GRPC_FLATBUF_NAME = "libnnstreamer_grpc_flatbuf";
      27              : static constexpr const char *NNS_GRPC_CREATE_INSTANCE = "create_instance";
      28              : 
      29              : using namespace grpc;
      30              : 
      31              : /** @brief create new instance of NNStreamerRPC */
      32              : NNStreamerRPC *
      33           28 : NNStreamerRPC::createInstance (const grpc_config *config)
      34              : {
      35           28 :   gchar *name = NULL;
      36              : 
      37           28 :   if (config->idl == GRPC_IDL_PROTOBUF)
      38           14 :     name = g_strdup_printf ("%s%s", NNS_GRPC_PROTOBUF_NAME, NNSTREAMER_SO_FILE_EXTENSION);
      39           14 :   else if (config->idl == GRPC_IDL_FLATBUF)
      40           14 :     name = g_strdup_printf ("%s%s", NNS_GRPC_FLATBUF_NAME, NNSTREAMER_SO_FILE_EXTENSION);
      41              : 
      42           28 :   if (name == NULL) {
      43            0 :     ml_loge ("Unsupported IDL detected: %d\n", config->idl);
      44            0 :     return NULL;
      45              :   }
      46              : 
      47           28 :   GModule *module = g_module_open (name, G_MODULE_BIND_LAZY);
      48           28 :   if (!module) {
      49            0 :     ml_loge ("Error opening %s\n", name);
      50            0 :     g_free (name);
      51            0 :     return NULL;
      52              :   }
      53              : 
      54              :   using function_ptr = void *(*) (const grpc_config *config);
      55              :   function_ptr create_instance;
      56              : 
      57           28 :   if (!g_module_symbol (module, NNS_GRPC_CREATE_INSTANCE, (gpointer *) &create_instance)) {
      58            0 :     ml_loge ("Error loading create_instance: %s\n", g_module_error ());
      59            0 :     g_free (name);
      60            0 :     g_module_close (module);
      61            0 :     return NULL;
      62              :   }
      63              : 
      64           28 :   NNStreamerRPC *instance = (NNStreamerRPC *) create_instance (config);
      65           28 :   if (!instance) {
      66            0 :     ml_loge ("Error creating an instance\n");
      67            0 :     g_free (name);
      68            0 :     g_module_close (module);
      69            0 :     return NULL;
      70              :   }
      71              : 
      72           28 :   g_free (name);
      73           28 :   instance->setModuleHandle (module);
      74           28 :   return instance;
      75              : }
      76              : 
      77              : /** @brief constructor of NNStreamerRPC */
      78           28 : NNStreamerRPC::NNStreamerRPC (const grpc_config *config)
      79           28 :     : host_ (config->host), port_ (config->port), is_server_ (config->is_server),
      80           28 :       is_blocking_ (config->is_blocking), direction_ (config->dir),
      81           28 :       cb_ (config->cb), cb_data_ (config->cb_data), config_ (config->config),
      82           28 :       server_instance_ (nullptr), handle_ (nullptr), stop_ (false)
      83              : {
      84           28 :   queue_ = gst_data_queue_new (_data_queue_check_full_cb, NULL, NULL, NULL);
      85           28 : }
      86              : 
      87              : /** @brief destructor of NNStreamerRPC */
      88           12 : NNStreamerRPC::~NNStreamerRPC ()
      89              : {
      90           12 :   g_clear_pointer (&queue_, gst_object_unref);
      91           12 : }
      92              : 
      93              : /** @brief start gRPC server */
      94              : gboolean
      95           28 : NNStreamerRPC::start ()
      96              : {
      97           28 :   if (direction_ == GRPC_DIRECTION_NONE)
      98            0 :     return FALSE;
      99              : 
     100           28 :   if (is_server_)
     101           12 :     return _start_server ();
     102              :   else
     103           16 :     return _start_client ();
     104              : }
     105              : 
     106              : /** @brief stop the thread */
     107              : void
     108           40 : NNStreamerRPC::stop ()
     109              : {
     110           40 :   if (stop_)
     111           12 :     return;
     112              : 
     113              :   /* notify to the worker */
     114           28 :   stop_ = true;
     115              : 
     116           28 :   if (queue_) {
     117              :     /* wait until the queue's flushed */
     118           28 :     while (!gst_data_queue_is_empty (queue_))
     119            0 :       g_usleep (G_USEC_PER_SEC / 100);
     120              : 
     121           28 :     gst_data_queue_set_flushing (queue_, TRUE);
     122              :   }
     123              : 
     124           28 :   if (is_server_) {
     125           12 :     if (server_instance_.get ())
     126           12 :       server_instance_->Shutdown ();
     127              : 
     128           12 :     if (completion_queue_.get ())
     129            4 :       completion_queue_->Shutdown ();
     130              :   }
     131              : 
     132           28 :   if (worker_.joinable ())
     133           20 :     worker_.join ();
     134              : }
     135              : 
     136              : /** @brief send buffer holding tensors */
     137              : gboolean
     138           90 : NNStreamerRPC::send (GstBuffer *buffer)
     139              : {
     140              :   GstDataQueueItem *item;
     141              : 
     142           90 :   buffer = gst_buffer_ref (buffer);
     143              : 
     144           90 :   item = g_new0 (GstDataQueueItem, 1);
     145           90 :   item->object = GST_MINI_OBJECT (buffer);
     146           90 :   item->size = gst_buffer_get_size (buffer);
     147           90 :   item->visible = TRUE;
     148           90 :   item->destroy = (GDestroyNotify) _data_queue_item_free;
     149              : 
     150           90 :   if (!gst_data_queue_push (queue_, item)) {
     151            0 :     item->destroy (item);
     152            0 :     return FALSE;
     153              :   }
     154              : 
     155           90 :   return TRUE;
     156              : }
     157              : 
     158              : /** @brief start server service */
     159              : gboolean
     160           12 : NNStreamerRPC::_start_server ()
     161              : {
     162           12 :   std::string address (host_);
     163              : 
     164           12 :   address += ":" + std::to_string (port_);
     165              : 
     166           12 :   grpc::EnableDefaultHealthCheckService (true);
     167              : 
     168           24 :   return start_server (address);
     169           12 : }
     170              : 
     171              : /** @brief start client service */
     172              : gboolean
     173           16 : NNStreamerRPC::_start_client ()
     174              : {
     175           16 :   std::string address (host_);
     176              : 
     177           16 :   address += ":" + std::to_string (port_);
     178              : 
     179           32 :   return start_client (address);
     180           16 : }
     181              : 
     182              : /** @brief private method to check full  */
     183              : gboolean
     184           90 : NNStreamerRPC::_data_queue_check_full_cb (GstDataQueue *queue, guint visible,
     185              :     guint bytes, guint64 time, gpointer checkdata)
     186              : {
     187              :   /* no full */
     188           90 :   return FALSE;
     189              : }
     190              : 
     191              : /** @brief private method to free a data item */
     192              : void
     193           90 : NNStreamerRPC::_data_queue_item_free (GstDataQueueItem *item)
     194              : {
     195           90 :   if (item->object)
     196           90 :     gst_buffer_unref (GST_BUFFER (item->object));
     197           90 :   g_free (item);
     198           90 : }
     199              : 
     200              : /**
     201              :  * @brief get gRPC IDL enum from a given string
     202              :  */
     203              : grpc_idl
     204           64 : grpc_get_idl (const gchar *idl_str)
     205              : {
     206           64 :   if (g_ascii_strcasecmp (idl_str, "protobuf") == 0)
     207           50 :     return GRPC_IDL_PROTOBUF;
     208           14 :   else if (g_ascii_strcasecmp (idl_str, "flatbuf") == 0)
     209           14 :     return GRPC_IDL_FLATBUF;
     210              :   else
     211            0 :     return GRPC_IDL_NONE;
     212              : }
     213              : 
     214              : /**
     215              :  * @brief gRPC C++ wrapper to create the class instance
     216              :  */
     217              : void *
     218           28 : grpc_new (const grpc_config *config)
     219              : {
     220           28 :   g_return_val_if_fail (config != NULL, NULL);
     221              : 
     222           28 :   NNStreamerRPC *self = NNStreamerRPC::createInstance (config);
     223              : 
     224           28 :   return static_cast<void *> (self);
     225              : }
     226              : 
     227              : /**
     228              :  * @brief gRPC C++ wrapper to destroy the class instance
     229              :  */
     230              : void
     231           12 : grpc_destroy (void *instance)
     232              : {
     233           12 :   g_return_if_fail (instance != NULL);
     234              : 
     235           12 :   NNStreamerRPC *self = static_cast<NNStreamerRPC *> (instance);
     236           12 :   void *handle = self->getModuleHandle ();
     237              : 
     238           12 :   delete self;
     239              : 
     240           12 :   if (handle)
     241           12 :     g_module_close ((GModule *) handle);
     242              : }
     243              : 
     244              : /**
     245              :  * @brief gRPC C++ wrapper to start gRPC service
     246              :  */
     247              : gboolean
     248           28 : grpc_start (void *instance)
     249              : {
     250           28 :   g_return_val_if_fail (instance != NULL, FALSE);
     251              : 
     252           28 :   NNStreamerRPC *self = static_cast<NNStreamerRPC *> (instance);
     253              : 
     254           28 :   return self->start ();
     255              : }
     256              : 
     257              : /**
     258              :  * @brief gRPC C++ wrapper to stop service
     259              :  */
     260              : void
     261           40 : grpc_stop (void *instance)
     262              : {
     263           40 :   g_return_if_fail (instance != NULL);
     264              : 
     265           40 :   grpc::NNStreamerRPC *self = static_cast<grpc::NNStreamerRPC *> (instance);
     266              : 
     267           40 :   self->stop ();
     268              : }
     269              : 
     270              : /**
     271              :  * @brief gRPC C++ wrapper to send messages
     272              :  */
     273              : gboolean
     274           90 : grpc_send (void *instance, GstBuffer *buffer)
     275              : {
     276           90 :   g_return_val_if_fail (instance != NULL, FALSE);
     277              : 
     278           90 :   grpc::NNStreamerRPC *self = static_cast<grpc::NNStreamerRPC *> (instance);
     279              : 
     280           90 :   return self->send (buffer);
     281              : }
     282              : 
     283              : /**
     284              :  * @brief get gRPC listening port of the server instance
     285              :  */
     286              : int
     287           12 : grpc_get_listening_port (void *instance)
     288              : {
     289           12 :   g_return_val_if_fail (instance != NULL, -EINVAL);
     290              : 
     291           12 :   NNStreamerRPC *self = static_cast<NNStreamerRPC *> (instance);
     292              : 
     293           12 :   return self->getListeningPort ();
     294              : }
     295              : 
     296              : #define silent_debug(...)                   \
     297              :   do {                                      \
     298              :     if (*silent) {                          \
     299              :       GST_DEBUG_OBJECT (self, __VA_ARGS__); \
     300              :     }                                       \
     301              :   } while (0)
     302              : 
     303              : /**
     304              :  * @brief check the validity of hostname string
     305              :  */
     306              : gboolean
     307           12 : _check_hostname (gchar *str)
     308              : {
     309           12 :   if (g_strcmp0 (str, "localhost") == 0 || g_hostname_is_ip_address (str))
     310           10 :     return TRUE;
     311              : 
     312            2 :   return FALSE;
     313              : }
     314              : 
     315              : /**
     316              :  * @brief set-prop common for both grpc elements
     317              :  */
     318              : void
     319          142 : grpc_common_set_property (GObject *self, gboolean *silent, grpc_private *grpc,
     320              :     guint prop_id, const GValue *value, GParamSpec *pspec)
     321              : {
     322          142 :   switch (prop_id) {
     323            2 :     case PROP_SILENT:
     324            2 :       *silent = g_value_get_boolean (value);
     325            2 :       silent_debug ("Set silent = %d", *silent);
     326            2 :       break;
     327           22 :     case PROP_SERVER:
     328           22 :       grpc->config.is_server = g_value_get_boolean (value);
     329           22 :       silent_debug ("Set server = %d", grpc->config.is_server);
     330           22 :       break;
     331           28 :     case PROP_BLOCKING:
     332           28 :       grpc->config.is_blocking = g_value_get_boolean (value);
     333           28 :       silent_debug ("Set blocking = %d", grpc->config.is_blocking);
     334           28 :       break;
     335           28 :     case PROP_IDL:
     336              :       {
     337           28 :         const gchar *idl_str = g_value_get_string (value);
     338              : 
     339           28 :         if (idl_str) {
     340           28 :           grpc_idl idl = grpc_get_idl (idl_str);
     341           28 :           if (idl != GRPC_IDL_NONE) {
     342           28 :             grpc->config.idl = idl;
     343           28 :             silent_debug ("Set idl = %s", idl_str);
     344              :           } else {
     345            0 :             ml_loge ("Invalid IDL string provided: %s", idl_str);
     346              :           }
     347              :         }
     348           28 :         break;
     349              :       }
     350           12 :     case PROP_HOST:
     351              :       {
     352              :         gchar *host;
     353              : 
     354           12 :         if (!g_value_get_string (value))
     355            0 :           break;
     356              : 
     357           12 :         host = g_value_dup_string (value);
     358           12 :         if (_check_hostname (host)) {
     359           10 :           g_free (grpc->config.host);
     360           10 :           grpc->config.host = host;
     361           10 :           silent_debug ("Set host = %s", grpc->config.host);
     362              :         } else {
     363            2 :           g_free (host);
     364              :         }
     365           12 :         break;
     366              :       }
     367           50 :     case PROP_PORT:
     368           50 :       grpc->config.port = g_value_get_int (value);
     369           50 :       silent_debug ("Set port = %d", grpc->config.port);
     370           50 :       break;
     371            0 :     default:
     372            0 :       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
     373            0 :       break;
     374              :   }
     375          142 : }
     376              : 
     377              : /**
     378              :  * @brief get-prop common for both grpc elements
     379              :  */
     380              : void
     381           24 : grpc_common_get_property (GObject *self, gboolean silent, guint out,
     382              :     grpc_private *grpc, guint prop_id, GValue *value, GParamSpec *pspec)
     383              : {
     384           24 :   switch (prop_id) {
     385            4 :     case PROP_SILENT:
     386            4 :       g_value_set_boolean (value, silent);
     387            4 :       break;
     388            4 :     case PROP_SERVER:
     389            4 :       g_value_set_boolean (value, grpc->config.is_server);
     390            4 :       break;
     391            0 :     case PROP_BLOCKING:
     392            0 :       g_value_set_boolean (value, grpc->config.is_blocking);
     393            0 :       break;
     394            0 :     case PROP_IDL:
     395            0 :       switch (grpc->config.idl) {
     396            0 :         case GRPC_IDL_PROTOBUF:
     397            0 :           g_value_set_string (value, "protobuf");
     398            0 :           break;
     399            0 :         case GRPC_IDL_FLATBUF:
     400            0 :           g_value_set_string (value, "flatbuf");
     401            0 :           break;
     402            0 :         default:
     403            0 :           break;
     404              :       }
     405            0 :       break;
     406            6 :     case PROP_HOST:
     407            6 :       g_value_set_string (value, grpc->config.host);
     408            6 :       break;
     409            8 :     case PROP_PORT:
     410            8 :       g_value_set_int (value, grpc->config.port);
     411            8 :       break;
     412            2 :     case PROP_OUT:
     413            2 :       g_value_set_uint (value, out);
     414            2 :       break;
     415            0 :     default:
     416            0 :       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
     417            0 :       break;
     418              :   }
     419           24 : }
        

Generated by: LCOV version 2.0-1