|             Line data    Source code 
       1              : /* SPDX-License-Identifier: LGPL-2.1-only */
       2              : /**
       3              :  * GStreamer Tensor_Sink_gRPC
       4              :  * Copyright (C) 2020 Dongju Chae <dongju.chae@samsung.com>
       5              :  */
       6              : /**
       7              :  * @file        tensor_sink_grpc.c
       8              :  * @date        22 Oct 2020
       9              :  * @brief       GStreamer plugin to support gRPC tensor sink
      10              :  * @see         http://github.com/nnstreamer/nnstreamer
      11              :  * @author      Dongju Chae <dongju.chae@samsung.com>
      12              :  * @bug         No known bugs except for NYI items
      13              :  */
      14              : 
      15              : /**
      16              :  * SECTION:element-tensor_sink_grpc
      17              :  *
      18              :  * #tensor_sink_grpc extends #gstbasesink sink element to emit gRPC
      19              :  * messages as either server or client.
      20              :  *
      21              :  * <refsect2>
      22              :  * <title>Example launch line</title>
      23              :  * |[
      24              :  * gst-launch -v -m videotestsrc
      25              :  *        ! video/x-raw,format=RGB,width=640,height=480,framerate=30/1
      26              :  *        ! tensor_converter ! tensor_sink_grpc
      27              :  * ]|
      28              :  * </refsect2>
      29              :  */
      30              : 
      31              : #ifdef HAVE_CONFIG_H
      32              : #include <config.h>
      33              : #endif
      34              : 
      35              : #include <string.h>
      36              : #include <errno.h>
      37              : 
      38              : #include <gst/gst.h>
      39              : #include <glib.h>
      40              : #include <gmodule.h>
      41              : 
      42              : #include <tensor_typedef.h>
      43              : #include <nnstreamer_plugin_api.h>
      44              : 
      45              : #include "tensor_sink_grpc.h"
      46              : #include "nnstreamer_grpc.h"
      47              : 
/**
 * @brief Macro for debug mode.
 *
 * Evaluates to true when the element's `silent` flag is cleared. The
 * expansion refers to a variable named `self`, so it is only usable where
 * such a variable is in scope.
 */
#ifndef DBG
#define DBG (!self->silent)
#endif

/**
 * @brief Macro for debug message.
 *
 * Forwards its arguments to GST_DEBUG_OBJECT () on `self`, but only when DBG
 * evaluates to true. Like DBG, it needs a variable named `self` in scope.
 */
#define silent_debug(...) do { \
    if (DBG) { \
      GST_DEBUG_OBJECT (self, __VA_ARGS__); \
    } \
  } while (0)

/** Debug category of this element; it is initialized in class_init. */
GST_DEBUG_CATEGORY_STATIC (gst_tensor_sink_grpc_debug);
#define GST_CAT_DEFAULT gst_tensor_sink_grpc_debug

/**
 * @brief Flag to print minimized log
 */
#define DEFAULT_PROP_SILENT TRUE

/**
 * @brief Default gRPC server mode for tensor sink
 *        (FALSE: the element works as a client unless told otherwise)
 */
#define DEFAULT_PROP_SERVER FALSE

/**
 * @brief Default gRPC blocking mode for tensor sink
 */
#define DEFAULT_PROP_BLOCKING TRUE

/**
 * @brief Default IDL for RPC comm.
 */
#define DEFAULT_PROP_IDL "protobuf"

/**
 * @brief Default host and port
 */
#define DEFAULT_PROP_HOST  "localhost"
#define DEFAULT_PROP_PORT  55115

/**
 * @brief Caps string of the sink pad, built by joining
 *        GST_TENSOR_CAP_DEFAULT and GST_TENSORS_CAP_DEFAULT with "; ".
 */
#define CAPS_STRING GST_TENSOR_CAP_DEFAULT "; " GST_TENSORS_CAP_DEFAULT

/**
 * @brief Always-present sink pad template using CAPS_STRING.
 */
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS (CAPS_STRING));
      99              : 
     100              : #define GET_GRPC_PRIVATE(arg) (grpc_private *) (arg->priv)
     101              : 
/** GObject method implementation (wired up in class_init) */
static void gst_tensor_sink_grpc_finalize (GObject * gobject);
static void gst_tensor_sink_grpc_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_tensor_sink_grpc_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);

/** GstBaseSink method implementation (wired up in class_init) */
static gboolean gst_tensor_sink_grpc_setcaps (GstBaseSink * sink,
    GstCaps * caps);
static GstFlowReturn gst_tensor_sink_grpc_render (GstBaseSink * sink,
    GstBuffer * buf);
static gboolean gst_tensor_sink_grpc_start (GstBaseSink * sink);
static gboolean gst_tensor_sink_grpc_stop (GstBaseSink * sink);

static gboolean gst_tensor_sink_grpc_unlock (GstBaseSink * sink);

/**
 * Type definition. The alias below makes the parent-class pointer declared
 * by G_DEFINE_TYPE available under the short name `parent_class`.
 */
#define gst_tensor_sink_grpc_parent_class parent_class
G_DEFINE_TYPE (GstTensorSinkGRPC, gst_tensor_sink_grpc, GST_TYPE_BASE_SINK);
     122              : 
     123              : /**
     124              :  * @brief initialize the tensor_sink_grpc class.
     125              :  */
     126              : static void
     127           37 : gst_tensor_sink_grpc_class_init (GstTensorSinkGRPCClass * klass)
     128              : {
     129              :   GObjectClass *gobject_class;
     130              :   GstElementClass *gstelement_class;
     131              :   GstBaseSinkClass *gstbasesink_class;
     132              : 
     133           37 :   gobject_class = (GObjectClass *) klass;
     134           37 :   gstelement_class = (GstElementClass *) klass;
     135           37 :   gstbasesink_class = (GstBaseSinkClass *) klass;
     136              : 
     137           37 :   parent_class = g_type_class_peek_parent (klass);
     138              : 
     139           37 :   gobject_class->set_property = gst_tensor_sink_grpc_set_property;
     140           37 :   gobject_class->get_property = gst_tensor_sink_grpc_get_property;
     141           37 :   gobject_class->finalize = gst_tensor_sink_grpc_finalize;
     142              : 
     143              :   /* install properties */
     144           37 :   g_object_class_install_property (gobject_class, PROP_SILENT,
     145              :       g_param_spec_boolean ("silent", "Silent",
     146              :           "Dont' produce verbose output",
     147              :           DEFAULT_PROP_SILENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     148              : 
     149           37 :   g_object_class_install_property (gobject_class, PROP_SERVER,
     150              :       g_param_spec_boolean ("server", "Server",
     151              :           "Specify its working mode either server or client",
     152              :           DEFAULT_PROP_SERVER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     153              : 
     154           37 :   g_object_class_install_property (gobject_class, PROP_BLOCKING,
     155              :       g_param_spec_boolean ("blocking", "Blocking",
     156              :           "Specify its working mode either blocking or non-blocking",
     157              :           DEFAULT_PROP_BLOCKING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     158              : 
     159           37 :   g_object_class_install_property (gobject_class, PROP_IDL,
     160              :       g_param_spec_string ("idl", "IDL",
     161              :           "Specify Interface Description Language (IDL) for communication",
     162              :           DEFAULT_PROP_IDL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     163              : 
     164           37 :   g_object_class_install_property (gobject_class, PROP_HOST,
     165              :       g_param_spec_string ("host", "Host", "The host/IP to send the packets to",
     166              :           DEFAULT_PROP_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     167              : 
     168           37 :   g_object_class_install_property (gobject_class, PROP_PORT,
     169              :       g_param_spec_int ("port", "Port", "The port to send the packets to",
     170              :           0, G_MAXUSHORT, DEFAULT_PROP_PORT,
     171              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     172              : 
     173           37 :   g_object_class_install_property (gobject_class, PROP_OUT,
     174              :       g_param_spec_uint ("out", "Out",
     175              :           "The number of output messages generated",
     176              :           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
     177              : 
     178           37 :   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
     179              : 
     180           37 :   gst_element_class_set_static_metadata (gstelement_class,
     181              :       "TensorSinkGRPC", "Sink/Network",
     182              :       "Send nnstreamer protocol buffers as gRPC server/client",
     183              :       "Dongju Chae <dongju.chae@samsung.com>");
     184              : 
     185              :   /* GstBaseSinkClass */
     186           37 :   gstbasesink_class->start = gst_tensor_sink_grpc_start;
     187           37 :   gstbasesink_class->stop = gst_tensor_sink_grpc_stop;
     188           37 :   gstbasesink_class->set_caps = gst_tensor_sink_grpc_setcaps;
     189           37 :   gstbasesink_class->render = gst_tensor_sink_grpc_render;
     190           37 :   gstbasesink_class->unlock = gst_tensor_sink_grpc_unlock;
     191              : 
     192           37 :   GST_DEBUG_CATEGORY_INIT (gst_tensor_sink_grpc_debug,
     193              :       "tensor_sink_grpc", 0,
     194              :       "sink element to support protocol buffers as a gRPC server/client");
     195           37 : }
     196              : 
     197              : /**
     198              :  * @brief initialize grpc config.
     199              :  */
     200              : static void
     201           16 : grpc_config_init (GstTensorSinkGRPC * self)
     202              : {
     203           16 :   grpc_private *grpc = GET_GRPC_PRIVATE (self);
     204              : 
     205           16 :   grpc->config.is_server = DEFAULT_PROP_SERVER;
     206           16 :   grpc->config.is_blocking = DEFAULT_PROP_BLOCKING;
     207           16 :   grpc->config.idl = grpc_get_idl (DEFAULT_PROP_IDL);
     208           16 :   grpc->config.dir = GRPC_DIRECTION_TENSORS_TO_BUFFER;
     209           16 :   grpc->config.port = DEFAULT_PROP_PORT;
     210           16 :   grpc->config.host = g_strdup (DEFAULT_PROP_HOST);
     211           16 :   grpc->config.config = &self->config;
     212           16 : }
     213              : 
     214              : /**
     215              :  * @brief initialize tensor_sink_grpc element.
     216              :  */
     217              : static void
     218           16 : gst_tensor_sink_grpc_init (GstTensorSinkGRPC * self)
     219              : {
     220           16 :   gst_tensors_config_init (&self->config);
     221              : 
     222           16 :   self->silent = DEFAULT_PROP_SILENT;
     223           16 :   self->out = 0;
     224              : 
     225           16 :   self->priv = g_new0 (grpc_private, 1);
     226           16 :   grpc_config_init (self);
     227              : 
     228           16 :   GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SINK_GRPC_CONFIGURED);
     229           16 :   GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SINK_GRPC_STARTED);
     230           16 : }
     231              : 
     232              : /**
     233              :  * @brief finalize tensor_sink_grpc element.
     234              :  */
     235              : static void
     236           16 : gst_tensor_sink_grpc_finalize (GObject * gobject)
     237              : {
     238           16 :   GstTensorSinkGRPC *self = GST_TENSOR_SINK_GRPC (gobject);
     239           16 :   grpc_private *grpc = GET_GRPC_PRIVATE (self);
     240              : 
     241           16 :   g_free (grpc->config.host);
     242           16 :   g_free (grpc);
     243           16 :   gst_tensors_config_free (&self->config);
     244              : 
     245           16 :   G_OBJECT_CLASS (parent_class)->finalize (gobject);
     246           16 : }
     247              : 
     248              : /**
     249              :  * @brief set caps of tensor_sink_grpc element.
     250              :  */
     251              : static gboolean
     252           12 : gst_tensor_sink_grpc_setcaps (GstBaseSink * sink, GstCaps * caps)
     253              : {
     254              :   GstTensorSinkGRPC *self;
     255              :   gboolean ret;
     256              : 
     257           12 :   self = GST_TENSOR_SINK_GRPC (sink);
     258              : 
     259           12 :   GST_OBJECT_LOCK (self);
     260              : 
     261           12 :   ret = gst_tensors_config_from_caps (&self->config, caps, TRUE);
     262           12 :   if (ret) {
     263           12 :     GST_OBJECT_FLAG_SET (self, GST_TENSOR_SINK_GRPC_CONFIGURED);
     264              :   }
     265              : 
     266           12 :   GST_OBJECT_UNLOCK (self);
     267              : 
     268           12 :   return ret;
     269              : }
     270              : 
     271              : /**
     272              :  * @brief render function of tensor_sink_grpc element.
     273              :  */
     274              : static GstFlowReturn
     275           90 : gst_tensor_sink_grpc_render (GstBaseSink * sink, GstBuffer * buf)
     276              : {
     277           90 :   GstTensorSinkGRPC *self = GST_TENSOR_SINK_GRPC (sink);
     278           90 :   grpc_private *grpc = GET_GRPC_PRIVATE (self);
     279              :   gboolean ret;
     280              : 
     281           90 :   g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (self,
     282              :           GST_TENSOR_SINK_GRPC_STARTED), GST_FLOW_FLUSHING);
     283              : 
     284           90 :   ret = grpc_send (grpc->instance, buf);
     285              : 
     286           90 :   return ret ? GST_FLOW_OK : GST_FLOW_ERROR;
     287              : }
     288              : 
     289              : /**
     290              :  * @brief set properties of tensor_sink_grpc element.
     291              :  */
     292              : static void
     293           61 : gst_tensor_sink_grpc_set_property (GObject * object, guint prop_id,
     294              :     const GValue * value, GParamSpec * pspec)
     295              : {
     296              :   GstTensorSinkGRPC *self;
     297              :   grpc_private *grpc;
     298              : 
     299           61 :   g_return_if_fail (GST_IS_TENSOR_SINK_GRPC (object));
     300              : 
     301           61 :   self = GST_TENSOR_SINK_GRPC (object);
     302           61 :   grpc = GET_GRPC_PRIVATE (self);
     303              : 
     304           61 :   grpc_common_set_property (object, &self->silent, grpc, prop_id, value, pspec);
     305              : }
     306              : 
     307              : /**
     308              :  * @brief get properties of tensor_sink_grpc element.
     309              :  */
     310              : static void
     311           12 : gst_tensor_sink_grpc_get_property (GObject * object, guint prop_id,
     312              :     GValue * value, GParamSpec * pspec)
     313              : {
     314              :   GstTensorSinkGRPC *self;
     315              :   grpc_private *grpc;
     316              : 
     317           12 :   g_return_if_fail (GST_IS_TENSOR_SINK_GRPC (object));
     318              : 
     319           12 :   self = GST_TENSOR_SINK_GRPC (object);
     320           12 :   grpc = GET_GRPC_PRIVATE (self);
     321              : 
     322           12 :   grpc_common_get_property (object, self->silent, self->out, grpc, prop_id, value, pspec);
     323              : }
     324              : 
     325              : /**
     326              :  * @brief start tensor_sink_grpc element.
     327              :  */
     328              : static gboolean
     329           12 : gst_tensor_sink_grpc_start (GstBaseSink * sink)
     330              : {
     331           12 :   GstTensorSinkGRPC *self = GST_TENSOR_SINK_GRPC (sink);
     332           12 :   grpc_private *grpc = GET_GRPC_PRIVATE (self);
     333              :   gboolean ret;
     334              : 
     335           12 :   if (GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SINK_GRPC_STARTED))
     336            0 :     return TRUE;
     337              : 
     338           12 :   if (grpc->instance)
     339            0 :     grpc_destroy (grpc->instance);
     340              : 
     341           12 :   grpc->instance = grpc_new (&grpc->config);
     342           12 :   if (!grpc->instance)
     343            0 :     return FALSE;
     344              : 
     345           12 :   ret = grpc_start (grpc->instance);
     346           12 :   if (ret) {
     347           12 :     GST_OBJECT_FLAG_SET (self, GST_TENSOR_SINK_GRPC_STARTED);
     348              : 
     349           12 :     if (grpc->config.is_server) {
     350            4 :       gint port = grpc_get_listening_port (grpc->instance);
     351            4 :       if (port > 0)
     352            4 :         g_object_set (self, "port", port, NULL);
     353              :     }
     354              :   }
     355              : 
     356           12 :   return TRUE;
     357              : }
     358              : 
     359              : /**
     360              :  * @brief stop tensor_sink_grpc element.
     361              :  */
     362              : static gboolean
     363           12 : gst_tensor_sink_grpc_stop (GstBaseSink * sink)
     364              : {
     365           12 :   GstTensorSinkGRPC *self = GST_TENSOR_SINK_GRPC (sink);
     366           12 :   grpc_private *grpc = GET_GRPC_PRIVATE (self);
     367              : 
     368           12 :   if (!GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SINK_GRPC_STARTED))
     369            0 :     return TRUE;
     370              : 
     371           12 :   if (grpc->instance)
     372           12 :     grpc_destroy (grpc->instance);
     373           12 :   grpc->instance = NULL;
     374              : 
     375           12 :   GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SINK_GRPC_STARTED);
     376              : 
     377           12 :   return TRUE;
     378              : }
     379              : 
     380              : /**
     381              :  * @brief unlock any blocking operations
     382              :  */
     383              : static gboolean
     384           24 : gst_tensor_sink_grpc_unlock (GstBaseSink * sink)
     385              : {
     386           24 :   GstTensorSinkGRPC *self = GST_TENSOR_SINK_GRPC (sink);
     387           24 :   grpc_private *grpc = GET_GRPC_PRIVATE (self);
     388              : 
     389              :   /* notify to gRPC */
     390           24 :   if (grpc->instance)
     391           24 :     grpc_stop (grpc->instance);
     392              : 
     393           24 :   silent_debug ("Unlocking create");
     394              : 
     395           24 :   return TRUE;
     396              : }
         |