LCOV - code coverage report
Current view: top level - nnstreamer-2.4.2/ext/nnstreamer/tensor_source - tensor_src_grpc.c (source / functions) Coverage Total Hit
Test: nnstreamer 2.4.2-0 nnstreamer/nnstreamer#eca68b8d050408568af95d831a8eef62aaee7784 Lines: 86.7 % 165 143
Test Date: 2025-03-13 05:38:21 Functions: 94.7 % 19 18

            Line data    Source code
       1              : /* SPDX-License-Identifier: LGPL-2.1-only */
       2              : /**
       3              :  * GStreamer Tensor_Src_gRPC
       4              :  * Copyright (C) 2020 Dongju Chae <dongju.chae@samsung.com>
       5              :  */
       6              : /**
       7              :  * @file    tensor_src_grpc.c
       8              :  * @date    20 Oct 2020
       9              :  * @brief   GStreamer plugin to support gRPC tensor source
      10              :  * @see     http://github.com/nnstreamer/nnstreamer
      11              :  * @author  Dongju Chae <dongju.chae@samsung.com>
      12              :  * @bug     No known bugs except for NYI items
      13              :  */
      14              : 
      15              : /**
      16              :  * SECTION:element-tensor_src_grpc
      17              :  *
      18              :  * #tensor_src_grpc extends the #GstPushSrc source element to handle gRPC
      19              :  * requests as either server or client.
      20              :  *
      21              :  * <refsect2>
      22              :  * <title>Example launch line</title>
      23              :  * |[
      24              :  * gst-launch -v -m tensor_src_grpc ! 'other/tensor,dimension=(string)1:1:1:1,type=(string)uint8,framerate=(fraction)10/1' ! fakesink
      25              :  * ]|
      26              :  * </refsect2>
      27              :  */
      28              : 
      29              : #ifdef HAVE_CONFIG_H
      30              : #include <config.h>
      31              : #endif
      32              : 
      33              : #include <string.h>
      34              : #include <errno.h>
      35              : 
      36              : #include <gst/gst.h>
      37              : #include <glib.h>
      38              : 
      39              : #include <tensor_typedef.h>
      40              : #include <nnstreamer_plugin_api.h>
      41              : #include <nnstreamer_log.h>
      42              : 
      43              : #include "tensor_src_grpc.h"
      44              : #include "nnstreamer_grpc.h"
      45              : 
      46              : /**
      47              :  * @brief Debug-mode predicate: true when the element is not silent (expects a 'self' variable in scope).
      48              :  */
      49              : #ifndef DBG
      50              : #define DBG (!self->silent)
      51              : #endif
      52              : 
      53              : /**
      54              :  * @brief Emit a GST_DEBUG_OBJECT message on 'self', but only when DBG evaluates to true.
      55              :  */
      56              : #define silent_debug(...) do { \
      57              :     if (DBG) { \
      58              :       GST_DEBUG_OBJECT (self, __VA_ARGS__); \
      59              :     } \
      60              :   } while (0)
      61              : 
      62              : GST_DEBUG_CATEGORY_STATIC (gst_tensor_src_grpc_debug);
      63              : #define GST_CAT_DEFAULT gst_tensor_src_grpc_debug
      64              : 
      65              : /**
      66              :  * @brief Default value of the "silent" property (TRUE: print minimized log)
      67              :  */
      68              : #define DEFAULT_PROP_SILENT TRUE
      69              : 
      70              : /**
      71              :  * @brief Default gRPC server mode for tensor source
      72              :  */
      73              : #define DEFAULT_PROP_SERVER TRUE
      74              : 
      75              : /**
      76              :  * @brief Default gRPC blocking mode for tensor source
      77              :  */
      78              : #define DEFAULT_PROP_BLOCKING TRUE
      79              : 
      80              : /**
      81              :  * @brief Default IDL for RPC comm.
      82              :  */
      83              : #define DEFAULT_PROP_IDL "protobuf"
      84              : 
      85              : /**
      86              :  * @brief Default host and port
      87              :  */
      88              : #define DEFAULT_PROP_HOST  "localhost"
      89              : #define DEFAULT_PROP_PORT  55115
      90              : 
      91              : #define GST_TENSOR_SRC_GRPC_SCALED_TIME(self, count)\
      92              :   gst_util_uint64_scale (count, \
      93              :       self->config.rate_d * GST_SECOND, self->config.rate_n)
      94              : 
      95              : #define CAPS_STRING GST_TENSOR_CAP_DEFAULT "; " GST_TENSORS_CAP_DEFAULT
      96              : 
      97              : static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
      98              :     GST_PAD_SRC,
      99              :     GST_PAD_ALWAYS,
     100              :     GST_STATIC_CAPS (CAPS_STRING));
     101              : 
     102              : #define GET_GRPC_PRIVATE(arg) (grpc_private *) (arg->priv)
     103              : 
     104              : /** GObject method implementation */
     105              : static void gst_tensor_src_grpc_finalize (GObject * object);
     106              : static void gst_tensor_src_grpc_set_property (GObject * object, guint prop_id,
     107              :     const GValue * value, GParamSpec * pspec);
     108              : static void gst_tensor_src_grpc_get_property (GObject * object, guint prop_id,
     109              :     GValue * value, GParamSpec * pspec);
     110              : 
     111              : /** GstBaseSrc method implementation */
     112              : static gboolean gst_tensor_src_grpc_start (GstBaseSrc * src);
     113              : static gboolean gst_tensor_src_grpc_stop (GstBaseSrc * src);
     114              : static gboolean gst_tensor_src_grpc_set_caps (GstBaseSrc * src, GstCaps * caps);
     115              : static gboolean gst_tensor_src_grpc_unlock (GstBaseSrc * src);
     116              : static gboolean gst_tensor_src_grpc_unlock_stop (GstBaseSrc * src);
     117              : 
     118              : /** GstPushSrc method implementation */
     119              : static GstFlowReturn gst_tensor_src_grpc_create (GstPushSrc * psrc,
     120              :     GstBuffer ** buf);
     121              : 
     122              : /** GObject type definition; the alias below lets the code refer to the parent class as parent_class */
     123              : #define gst_tensor_src_grpc_parent_class parent_class
     124          391 : G_DEFINE_TYPE (GstTensorSrcGRPC, gst_tensor_src_grpc, GST_TYPE_PUSH_SRC);
     125              : 
     126              : /**
     127              :  * @brief Class initializer: installs the GObject properties, src pad template, element metadata, vmethods and debug category.
     128              :  */
     129              : static void
     130           27 : gst_tensor_src_grpc_class_init (GstTensorSrcGRPCClass * klass)
     131              : {
     132           27 :   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
     133           27 :   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
     134           27 :   GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
     135           27 :   GstPushSrcClass *gstpushsrc_class = GST_PUSH_SRC_CLASS (klass);
     136              : 
     137           27 :   gobject_class->set_property = gst_tensor_src_grpc_set_property;
     138           27 :   gobject_class->get_property = gst_tensor_src_grpc_get_property;
     139           27 :   gobject_class->finalize = gst_tensor_src_grpc_finalize;
     140              : 
     141              :   /* install properties: "out" is read-only, the rest are read/write. NOTE(review): "Dont'" in the "silent" blurb is a typo for "Don't". */
     142           27 :   g_object_class_install_property (gobject_class, PROP_SILENT,
     143              :       g_param_spec_boolean ("silent", "Silent",
     144              :           "Dont' produce verbose output",
     145              :           DEFAULT_PROP_SILENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     146              : 
     147           27 :   g_object_class_install_property (gobject_class, PROP_SERVER,
     148              :       g_param_spec_boolean ("server", "Server",
     149              :           "Specify its working mode either server or client",
     150              :           DEFAULT_PROP_SERVER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     151              : 
     152           27 :   g_object_class_install_property (gobject_class, PROP_BLOCKING,
     153              :       g_param_spec_boolean ("blocking", "Blocking",
     154              :           "Specify its working mode either blocking or non-blocking",
     155              :           DEFAULT_PROP_BLOCKING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     156              : 
     157           27 :   g_object_class_install_property (gobject_class, PROP_IDL,
     158              :       g_param_spec_string ("idl", "IDL",
     159              :           "Specify Interface Description Language (IDL) for communication",
     160              :           DEFAULT_PROP_IDL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     161              : 
     162           27 :   g_object_class_install_property (gobject_class, PROP_HOST,
     163              :       g_param_spec_string ("host", "Host",
     164              :           "The hostname to listen as or connect",
     165              :           DEFAULT_PROP_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     166              : 
     167           27 :   g_object_class_install_property (gobject_class, PROP_PORT,
     168              :       g_param_spec_int ("port", "Port",
     169              :           "The port to listen to (0=random available port) or connect",
     170              :           0, G_MAXUSHORT, DEFAULT_PROP_PORT,
     171              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     172              : 
     173           27 :   g_object_class_install_property (gobject_class, PROP_OUT,
     174              :       g_param_spec_uint ("out", "Out",
     175              :           "The number of output buffers generated",
     176              :           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
     177              : 
     178           27 :   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
     179              : 
     180           27 :   gst_element_class_set_static_metadata (gstelement_class,
     181              :       "TensorSrcGRPC", "Source/Network",
     182              :       "Receive nnstreamer protocol buffers as a gRPC server/client",
     183              :       "Dongju Chae <dongju.chae@samsung.com>");
     184              : 
     185              :   /* GstBaseSrcClass virtual methods */
     186           27 :   gstbasesrc_class->start = gst_tensor_src_grpc_start;
     187           27 :   gstbasesrc_class->stop = gst_tensor_src_grpc_stop;
     188           27 :   gstbasesrc_class->set_caps = gst_tensor_src_grpc_set_caps;
     189           27 :   gstbasesrc_class->unlock = gst_tensor_src_grpc_unlock;
     190           27 :   gstbasesrc_class->unlock_stop = gst_tensor_src_grpc_unlock_stop;
     191              : 
     192              :   /* GstPushSrcClass virtual method: create() hands out the queued buffers */
     193           27 :   gstpushsrc_class->create = gst_tensor_src_grpc_create;
     194              : 
     195           27 :   GST_DEBUG_CATEGORY_INIT (gst_tensor_src_grpc_debug,
     196              :       "tensor_src_grpc", 0,
     197              :       "src element to support protocol buffers as a gRPC server/client");
     198           27 : }
     199              : 
     200              : /**
     201              :  * @brief GstDataQueue "check full" callback; it always returns FALSE, so the queue is never reported as full.
     202              :  */
     203              : static gboolean
     204          124 : _data_queue_check_full_cb (GstDataQueue * queue, guint visible,
     205              :     guint bytes, guint64 time, gpointer checkdata)
     206              : {
     207              :   /** dummy implementation: every argument is ignored */
     208          124 :   return FALSE;
     209              : }
     210              : 
     211              : /**
     212              :  * @brief Destroy callback for a data queue item: unrefs the wrapped GstBuffer (if any) and frees the item itself.
     213              :  */
     214              : static void
     215            4 : _data_queue_item_free (GstDataQueueItem * item)
     216              : {
     217            4 :   if (item->object)
     218            4 :     gst_buffer_unref (GST_BUFFER (item->object));
     219            4 :   g_free (item);
     220            4 : }
     221              : 
     222              : /**
     223              :  * @brief Push a new EOS event on this element's source pad; the result of gst_pad_push_event() is ignored.
     224              :  */
     225              : static void
     226            0 : _send_eos_event (GstTensorSrcGRPC * self)
     227              : {
     228            0 :   GstPad *srcpad = GST_BASE_SRC_PAD (&self->element);
     229            0 :   GstEvent *eos = gst_event_new_eos ();
     230            0 :   gst_pad_push_event (srcpad, eos);
     231            0 : }
     232              : 
     233              : /**
     234              :  * @brief gRPC callback: @obj is the element, @data a GstBuffer that is consumed here (stamped and queued, or unreffed when dropped).
     235              :  */
     236              : static void
     237          124 : _grpc_callback (void *obj, void *data)
     238              : {
     239              :   GstTensorSrcGRPC *self;
     240              : 
     241              :   GstBuffer *buffer;
     242              :   GstClockTime duration;
     243              :   GstClockTime timestamp;
     244              :   GstDataQueueItem *item;
     245              : 
     246          124 :   g_return_if_fail (obj != NULL);
     247          124 :   g_return_if_fail (data != NULL);
     248              : 
     249          124 :   self = GST_TENSOR_SRC_GRPC_CAST (obj);
     250          124 :   buffer = (GstBuffer *) data;
     251              : 
     252          124 :   GST_OBJECT_LOCK (self);
     253              : 
     254          124 :   if (!GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SRC_GRPC_STARTED) ||
     255          124 :       !GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SRC_GRPC_CONFIGURED)) {
     256            0 :     gst_buffer_unref (buffer);
     257              : 
     258            0 :     GST_OBJECT_UNLOCK (self);
     259            0 :     return;
     260              :   }
     261              : 
     262          124 :   if (self->config.rate_n != 0) {
     263          124 :     duration = GST_TENSOR_SRC_GRPC_SCALED_TIME (self, 1);
     264          124 :     timestamp = GST_TENSOR_SRC_GRPC_SCALED_TIME (self, self->out++);
     265              :   } else {
     266            0 :     duration = 0;
     267            0 :     timestamp = 0;
     268              :   }
     269              : 
     270          124 :   GST_BUFFER_DURATION (buffer) = duration;
     271          124 :   GST_BUFFER_PTS (buffer) = timestamp;
     272              : 
     273          124 :   item = g_new0 (GstDataQueueItem, 1);
     274          124 :   item->object = GST_MINI_OBJECT (buffer);
     275          124 :   item->size = gst_buffer_get_size (buffer);
     276          124 :   item->visible = TRUE;
     277          124 :   item->destroy = (GDestroyNotify) _data_queue_item_free;
     278              : 
     279          124 :   if (!gst_data_queue_push (self->queue, item)) {
     280            0 :     item->destroy (item);
     281            0 :     ml_logw ("Failed to push item because we're flushing");
     282              :   } else {
     283          124 :     silent_debug ("new buffer: timestamp %" GST_TIME_FORMAT " duration %"
     284              :         GST_TIME_FORMAT, GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
     285              :   }
     286              : 
     287          124 :   GST_OBJECT_UNLOCK (self);
     288              : 
     289              :   /* special case: framerate == (fraction)0/1 leaves duration at 0; EOS is pushed only after the object lock is released */
     290          124 :   if (duration == 0)
     291            0 :     _send_eos_event (self);
     292              : }
     293              : 
     294              : /**
     295              :  * @brief Fill the private gRPC config with the DEFAULT_PROP_* values and register _grpc_callback with self as its callback data.
     296              :  */
     297              : static void
     298           20 : grpc_config_init (GstTensorSrcGRPC * self)
     299              : {
     300           20 :   grpc_private *grpc = GET_GRPC_PRIVATE (self);
     301              : 
     302           20 :   grpc->config.is_server = DEFAULT_PROP_SERVER;
     303           20 :   grpc->config.is_blocking = DEFAULT_PROP_BLOCKING;
     304           20 :   grpc->config.idl = grpc_get_idl (DEFAULT_PROP_IDL);
     305           20 :   grpc->config.dir = GRPC_DIRECTION_BUFFER_TO_TENSORS;
     306           20 :   grpc->config.port = DEFAULT_PROP_PORT;
     307           20 :   grpc->config.host = g_strdup (DEFAULT_PROP_HOST);
     308           20 :   grpc->config.cb = _grpc_callback;
     309           20 :   grpc->config.cb_data = (void *) self;
     310           20 :   grpc->config.config = &self->config;
     311           20 : }
     312              : 
     313              : /**
     314              :  * @brief Instance initializer: resets the tensors config, creates the data queue and private gRPC state, clears CONFIGURED/STARTED.
     315              :  */
     316              : static void
     317           20 : gst_tensor_src_grpc_init (GstTensorSrcGRPC * self)
     318              : {
     319           20 :   gst_tensors_config_init (&self->config);
     320              : 
     321           20 :   self->queue = gst_data_queue_new (_data_queue_check_full_cb,
     322              :       NULL, NULL, NULL);
     323           20 :   self->silent = DEFAULT_PROP_SILENT;
     324           20 :   self->out = 0;
     325              : 
     326           20 :   self->priv = g_new0 (grpc_private, 1);
     327           20 :   grpc_config_init (self);
     328              : 
     329           20 :   GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SRC_GRPC_CONFIGURED);
     330           20 :   GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SRC_GRPC_STARTED);
     331           20 : }
     332              : 
     333              : /**
     334              :  * @brief Finalizer: frees the host string, the private gRPC data, the data queue and the tensors config, then chains up to the parent.
     335              :  * NOTE(review): grpc->instance is not destroyed here; confirm it cannot still be set at this point, otherwise it leaks. */
     336              : static void
     337           20 : gst_tensor_src_grpc_finalize (GObject * object)
     338              : {
     339           20 :   GstTensorSrcGRPC *self = GST_TENSOR_SRC_GRPC (object);
     340           20 :   grpc_private *grpc = GET_GRPC_PRIVATE (self);
     341              : 
     342           20 :   g_free (grpc->config.host);
     343           20 :   g_free (grpc);
     344           20 :   g_clear_pointer (&self->queue, gst_object_unref);
     345           20 :   gst_tensors_config_free (&self->config);
     346              : 
     347           20 :   G_OBJECT_CLASS (parent_class)->finalize (object);
     348           20 : }
     349              : 
     350              : /**
     351              :  * @brief GstBaseSrc start(): destroy any previous gRPC instance, create and start a new one; returns FALSE if creation or start fails.
     352              :  * On success the STARTED flag is set and, in server mode, a positive listening port is written back to the "port" property. */
     353              : static gboolean
     354           16 : gst_tensor_src_grpc_start (GstBaseSrc * src)
     355              : {
     356           16 :   GstTensorSrcGRPC *self = GST_TENSOR_SRC_GRPC (src);
     357           16 :   grpc_private *grpc = GET_GRPC_PRIVATE (self);
     358              :   gboolean ret;
     359              : 
     360           16 :   if (grpc->instance)
     361            0 :     grpc_destroy (grpc->instance);
     362              : 
     363           16 :   grpc->instance = grpc_new (&grpc->config);
     364           16 :   if (!grpc->instance)
     365            0 :     return FALSE;
     366              : 
     367           16 :   ret = grpc_start (grpc->instance);
     368           16 :   if (ret) {
     369           16 :     GST_OBJECT_FLAG_SET (self, GST_TENSOR_SRC_GRPC_STARTED);
     370              : 
     371           16 :     if (grpc->config.is_server) {
     372            8 :       gint port = grpc_get_listening_port (grpc->instance);
     373            8 :       if (port > 0)
     374            8 :         g_object_set (self, "port", port, NULL);
     375              :     }
     376              :   }
     377              : 
     378           16 :   return ret;
     379              : }
     380              : 
     381              : /**
     382              :  * @brief GstBaseSrc stop(): returns TRUE immediately unless STARTED is set; otherwise sends EOS, destroys the gRPC instance and clears STARTED.
     383              :  * NOTE(review): in this report the early return took all 16 calls and the teardown path has 0 hits; verify STARTED is still set when stop() runs. */
     384              : static gboolean
     385           16 : gst_tensor_src_grpc_stop (GstBaseSrc * src)
     386              : {
     387           16 :   GstTensorSrcGRPC *self = GST_TENSOR_SRC_GRPC (src);
     388           16 :   grpc_private *grpc = GET_GRPC_PRIVATE (self);
     389              : 
     390           16 :   if (!GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SRC_GRPC_STARTED))
     391           16 :     return TRUE;
     392              : 
     393            0 :   _send_eos_event (self);
     394              : 
     395            0 :   if (grpc->instance)
     396            0 :     grpc_destroy (grpc->instance);
     397            0 :   grpc->instance = NULL;
     398              : 
     399            0 :   GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SRC_GRPC_STARTED);
     400              : 
     401            0 :   return TRUE;
     402              : }
     403              : 
     404              : /**
     405              :  * @brief GstBaseSrc unlock(): stop the gRPC instance (if any) and switch the data queue to flushing; always returns TRUE.
     406              :  */
     407              : static gboolean
     408           16 : gst_tensor_src_grpc_unlock (GstBaseSrc * src)
     409              : {
     410           16 :   GstTensorSrcGRPC *self = GST_TENSOR_SRC_GRPC (src);
     411           16 :   grpc_private *grpc = GET_GRPC_PRIVATE (self);
     412              : 
     413              :   /* ask the gRPC instance to stop before the queue is set to flushing */
     414           16 :   if (grpc->instance)
     415           16 :     grpc_stop (grpc->instance);
     416              : 
     417           16 :   silent_debug ("Unlocking create");
     418           16 :   gst_data_queue_set_flushing (self->queue, TRUE);
     419              : 
     420           16 :   return TRUE;
     421              : }
     422              : 
     423              : /**
     424              :  * @brief GstBaseSrc unlock_stop(): take the data queue out of flushing mode again; the gRPC instance is not touched here.
     425              :  */
     426              : static gboolean
     427           16 : gst_tensor_src_grpc_unlock_stop (GstBaseSrc * src)
     428              : {
     429           16 :   GstTensorSrcGRPC *self = GST_TENSOR_SRC_GRPC (src);
     430              : 
     431           16 :   silent_debug ("Stopping unlock");
     432           16 :   gst_data_queue_set_flushing (self->queue, FALSE);
     433              : 
     434           16 :   return TRUE;
     435              : }
     436              : 
     437              : /**
     438              :  * @brief GstBaseSrc set_caps(): parse the first caps structure into self->config under the object lock, set CONFIGURED, return the validation result.
     439              :  * NOTE(review): CONFIGURED is set even if validation then fails, and validation reads the config after the lock is released; confirm intended. */
     440              : static gboolean
     441           16 : gst_tensor_src_grpc_set_caps (GstBaseSrc * src, GstCaps * caps)
     442              : {
     443              :   GstTensorSrcGRPC *self;
     444              :   GstStructure *structure;
     445              : 
     446           16 :   self = GST_TENSOR_SRC_GRPC (src);
     447              : 
     448           16 :   GST_OBJECT_LOCK (self);
     449              : 
     450           16 :   structure = gst_caps_get_structure (caps, 0);
     451           16 :   gst_tensors_config_from_structure (&self->config, structure);
     452              : 
     453           16 :   GST_OBJECT_FLAG_SET (self, GST_TENSOR_SRC_GRPC_CONFIGURED);
     454              : 
     455           16 :   GST_OBJECT_UNLOCK (self);
     456              : 
     457           16 :   return gst_tensors_config_validate (&self->config);
     458              : }
     459              : 
     460              : /**
     461              :  * @brief GstPushSrc create(): pop the head item of the data queue, hand its buffer to the caller via @buf and free the item wrapper.
     462              :  * Returns GST_FLOW_FLUSHING when the pop fails, GST_FLOW_OK otherwise. */
     463              : static GstFlowReturn
     464          120 : gst_tensor_src_grpc_create (GstPushSrc * src, GstBuffer ** buf)
     465              : {
     466          120 :   GstTensorSrcGRPC *self = GST_TENSOR_SRC_GRPC_CAST (src);
     467              :   GstDataQueueItem *item;
     468              : 
     469          120 :   if (!gst_data_queue_pop (self->queue, &item)) {
     470            0 :     silent_debug ("We're flushing");
     471          120 :     return GST_FLOW_FLUSHING;
     472              :   }
     473              : 
     474          120 :   *buf = GST_BUFFER (item->object);
     475          120 :   g_free (item);
     476              : 
     477          120 :   return GST_FLOW_OK;
     478              : }
     479              : 
     480              : /**
     481              :  * @brief GObject set_property: delegates to grpc_common_set_property() with the element's silent flag and private gRPC data.
     482              :  */
     483              : static void
     484           81 : gst_tensor_src_grpc_set_property (GObject * object, guint prop_id,
     485              :     const GValue * value, GParamSpec * pspec)
     486              : {
     487              :   GstTensorSrcGRPC *self;
     488              :   grpc_private *grpc;
     489              : 
     490           81 :   g_return_if_fail (GST_IS_TENSOR_SRC_GRPC (object));
     491              : 
     492           81 :   self = GST_TENSOR_SRC_GRPC (object);
     493           81 :   grpc = GET_GRPC_PRIVATE (self);
     494              : 
     495           81 :   grpc_common_set_property (object, &self->silent, grpc, prop_id, value, pspec);
     496              : }
     497              : 
     498              : /**
     499              :  * @brief GObject get_property: delegates to grpc_common_get_property(), passing the silent flag, the "out" counter and the private gRPC data.
     500              :  */
     501              : static void
     502           12 : gst_tensor_src_grpc_get_property (GObject * object, guint prop_id,
     503              :     GValue * value, GParamSpec * pspec)
     504              : {
     505              :   GstTensorSrcGRPC *self;
     506              :   grpc_private *grpc;
     507              : 
     508           12 :   g_return_if_fail (GST_IS_TENSOR_SRC_GRPC (object));
     509              : 
     510           12 :   self = GST_TENSOR_SRC_GRPC (object);
     511           12 :   grpc = GET_GRPC_PRIVATE (self);
     512              : 
     513           12 :   grpc_common_get_property (object, self->silent, self->out, grpc, prop_id,
     514              :       value, pspec);
     515              : }
        

Generated by: LCOV version 2.0-1