LCOV - code coverage report
Current view: top level - nnstreamer-2.4.2/gst/nnstreamer/tensor_query - tensor_query_client.c (source / functions) Coverage Total Hit
Test: nnstreamer 2.4.2-0 nnstreamer/nnstreamer#eca68b8d050408568af95d831a8eef62aaee7784 Lines: 80.3 % 376 302
Test Date: 2025-03-13 05:38:21 Functions: 93.8 % 16 15

            Line data    Source code
       1              : /* SPDX-License-Identifier: LGPL-2.1-only */
       2              : /**
       3              :  * Copyright (C) 2021 Samsung Electronics Co., Ltd.
       4              :  *
       5              :  * @file    tensor_query_client.c
       6              :  * @date    09 Jul 2021
       7              :  * @brief   GStreamer plugin to handle tensor query client
       8              :  * @author  Junhwan Kim <jejudo.kim@samsung.com>
       9              :  * @see     http://github.com/nnstreamer/nnstreamer
      10              :  * @bug     No known bugs
      11              :  *
      12              :  */
      13              : #ifdef HAVE_CONFIG_H
      14              : #include <config.h>
      15              : #endif
      16              : 
      17              : #include "nnstreamer_util.h"
      18              : #include "tensor_query_client.h"
      19              : #include <gio/gio.h>
      20              : #include <glib.h>
      21              : #include <string.h>
      22              : #include "tensor_query_common.h"
      23              : 
      24              : #include <stdio.h>
      25              : #include <stdlib.h>
      26              : #include <unistd.h>
      27              : 
      28              : /**
      29              :  * @brief Macro for debug mode.
      30              :  */
      31              : #ifndef DBG
      32              : #define DBG (!self->silent)
      33              : #endif
      34              : 
      35              : /**
      36              :  * @brief Properties.
      37              :  */
      38              : enum
      39              : {
      40              :   PROP_0,
      41              :   PROP_HOST,
      42              :   PROP_PORT,
      43              :   PROP_DEST_HOST,
      44              :   PROP_DEST_PORT,
      45              :   PROP_CONNECT_TYPE,
      46              :   PROP_TOPIC,
      47              :   PROP_TIMEOUT,
      48              :   PROP_SILENT,
      49              :   PROP_MAX_REQUEST,
      50              : };
      51              : 
      52              : #define TCP_HIGHEST_PORT        65535
      53              : #define TCP_DEFAULT_HOST        "localhost"
      54              : #define TCP_DEFAULT_SRV_SRC_PORT 3000
      55              : #define TCP_DEFAULT_CLIENT_SRC_PORT 3001
      56              : #define DEFAULT_CLIENT_TIMEOUT  0
      57              : #define DEFAULT_SILENT TRUE
      58              : #define DEFAULT_MAX_REQUEST 2
      59              : 
      60              : GST_DEBUG_CATEGORY_STATIC (gst_tensor_query_client_debug);
      61              : #define GST_CAT_DEFAULT gst_tensor_query_client_debug
      62              : 
      63              : /**
      64              :  * @brief the capabilities of the inputs.
      65              :  */
      66              : static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
      67              :     GST_PAD_SINK,
      68              :     GST_PAD_ALWAYS,
      69              :     GST_STATIC_CAPS_ANY);
      70              : 
      71              : /**
      72              :  * @brief the capabilities of the outputs.
      73              :  */
      74              : static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
      75              :     GST_PAD_SRC,
      76              :     GST_PAD_ALWAYS,
      77              :     GST_STATIC_CAPS_ANY);
      78              : 
      79              : #define gst_tensor_query_client_parent_class parent_class
      80         1227 : G_DEFINE_TYPE (GstTensorQueryClient, gst_tensor_query_client, GST_TYPE_ELEMENT);
      81              : 
      82              : static void gst_tensor_query_client_finalize (GObject * object);
      83              : static void gst_tensor_query_client_set_property (GObject * object,
      84              :     guint prop_id, const GValue * value, GParamSpec * pspec);
      85              : static void gst_tensor_query_client_get_property (GObject * object,
      86              :     guint prop_id, GValue * value, GParamSpec * pspec);
      87              : 
      88              : static gboolean gst_tensor_query_client_sink_event (GstPad * pad,
      89              :     GstObject * parent, GstEvent * event);
      90              : static gboolean gst_tensor_query_client_sink_query (GstPad * pad,
      91              :     GstObject * parent, GstQuery * query);
      92              : static GstFlowReturn gst_tensor_query_client_chain (GstPad * pad,
      93              :     GstObject * parent, GstBuffer * buf);
      94              : static GstCaps *gst_tensor_query_client_query_caps (GstTensorQueryClient * self,
      95              :     GstPad * pad, GstCaps * filter);
      96              : 
      97              : /**
      98              :  * @brief initialize the class
      99              :  */
     100              : static void
     101           12 : gst_tensor_query_client_class_init (GstTensorQueryClientClass * klass)
     102              : {
     103              :   GObjectClass *gobject_class;
     104              :   GstElementClass *gstelement_class;
     105              : 
     106           12 :   gobject_class = (GObjectClass *) klass;
     107           12 :   gstelement_class = (GstElementClass *) klass;
     108              : 
     109           12 :   gobject_class->set_property = gst_tensor_query_client_set_property;
     110           12 :   gobject_class->get_property = gst_tensor_query_client_get_property;
     111           12 :   gobject_class->finalize = gst_tensor_query_client_finalize;
     112              : 
     113              :   /** install property goes here */
     114           12 :   g_object_class_install_property (gobject_class, PROP_HOST,
     115              :       g_param_spec_string ("host", "Host",
     116              :           "A host address to receive the packets from query server",
     117              :           TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     118           12 :   g_object_class_install_property (gobject_class, PROP_PORT,
     119              :       g_param_spec_uint ("port", "Port",
     120              :           "A port number to receive the packets from query server", 0,
     121              :           TCP_HIGHEST_PORT, TCP_DEFAULT_SRV_SRC_PORT,
     122              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     123           12 :   g_object_class_install_property (gobject_class, PROP_DEST_HOST,
     124              :       g_param_spec_string ("dest-host", "Destination Host",
     125              :           "A tenor query server host to send the packets",
     126              :           TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     127           12 :   g_object_class_install_property (gobject_class, PROP_DEST_PORT,
     128              :       g_param_spec_uint ("dest-port", "Destination Port",
     129              :           "The port of tensor query server to send the packets", 0,
     130              :           TCP_HIGHEST_PORT, TCP_DEFAULT_CLIENT_SRC_PORT,
     131              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     132           12 :   g_object_class_install_property (gobject_class, PROP_SILENT,
     133              :       g_param_spec_boolean ("silent", "Silent", "Produce verbose output",
     134              :           DEFAULT_SILENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     135           12 :   g_object_class_install_property (gobject_class, PROP_CONNECT_TYPE,
     136              :       g_param_spec_enum ("connect-type", "Connect Type",
     137              :           "The connections type between client and server.",
     138              :           GST_TYPE_QUERY_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
     139              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     140           12 :   g_object_class_install_property (gobject_class, PROP_TOPIC,
     141              :       g_param_spec_string ("topic", "Topic",
     142              :           "The main topic of the host.",
     143              :           "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     144              : 
     145           12 :   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
     146              :       g_param_spec_uint ("timeout", "timeout value",
     147              :           "A timeout value (in ms) to wait message from query server after sending buffer to server. 0 means no wait.",
     148              :           0, G_MAXUINT, DEFAULT_CLIENT_TIMEOUT,
     149              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     150           12 :   g_object_class_install_property (gobject_class, PROP_MAX_REQUEST,
     151              :       g_param_spec_uint ("max-request", "Maximum number of request",
     152              :           "Sets the maximum number of buffers to request to the query server. "
     153              :           "If the processing speed of query server is slower than the query client, the input buffer is dropped. "
     154              :           "Two buffers are requested by default, and 0 means that all buffers are sent to query server without drop. ",
     155              :           0, G_MAXUINT, DEFAULT_MAX_REQUEST,
     156              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     157           12 :   gst_element_class_add_pad_template (gstelement_class,
     158              :       gst_static_pad_template_get (&sinktemplate));
     159           12 :   gst_element_class_add_pad_template (gstelement_class,
     160              :       gst_static_pad_template_get (&srctemplate));
     161              : 
     162           12 :   gst_element_class_set_static_metadata (gstelement_class,
     163              :       "TensorQueryClient", "Filter/Tensor/Query",
     164              :       "Handle querying tensor data through the network",
     165              :       "Samsung Electronics Co., Ltd.");
     166              : 
     167           12 :   GST_DEBUG_CATEGORY_INIT (gst_tensor_query_client_debug, "tensor_query_client",
     168              :       0, "Tensor Query Client");
     169           12 : }
     170              : 
     171              : /**
     172              :  * @brief initialize the new element
     173              :  */
     174              : static void
     175           12 : gst_tensor_query_client_init (GstTensorQueryClient * self)
     176              : {
     177              :   /** setup sink pad */
     178           12 :   self->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
     179           12 :   gst_element_add_pad (GST_ELEMENT (self), self->sinkpad);
     180           12 :   gst_pad_set_event_function (self->sinkpad,
     181              :       GST_DEBUG_FUNCPTR (gst_tensor_query_client_sink_event));
     182           12 :   gst_pad_set_query_function (self->sinkpad,
     183              :       GST_DEBUG_FUNCPTR (gst_tensor_query_client_sink_query));
     184           12 :   gst_pad_set_chain_function (self->sinkpad,
     185              :       GST_DEBUG_FUNCPTR (gst_tensor_query_client_chain));
     186              : 
     187              :   /** setup src pad */
     188           12 :   self->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
     189           12 :   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
     190              : 
     191              :   /* init properties */
     192           12 :   self->silent = DEFAULT_SILENT;
     193           12 :   self->connect_type = DEFAULT_CONNECT_TYPE;
     194           12 :   self->host = g_strdup (TCP_DEFAULT_HOST);
     195           12 :   self->port = TCP_DEFAULT_CLIENT_SRC_PORT;
     196           12 :   self->dest_host = g_strdup (TCP_DEFAULT_HOST);
     197           12 :   self->dest_port = TCP_DEFAULT_SRV_SRC_PORT;
     198           12 :   self->topic = NULL;
     199           12 :   self->in_caps_str = NULL;
     200           12 :   self->timeout = DEFAULT_CLIENT_TIMEOUT;
     201           12 :   self->edge_h = NULL;
     202           12 :   self->msg_queue = g_async_queue_new ();
     203           12 :   self->max_request = DEFAULT_MAX_REQUEST;
     204           12 :   self->requested_num = 0;
     205           12 :   self->is_tensor = FALSE;
     206           12 :   gst_tensors_config_init (&self->config);
     207           12 : }
     208              : 
     209              : /**
     210              :  * @brief finalize the object
     211              :  */
     212              : static void
     213           11 : gst_tensor_query_client_finalize (GObject * object)
     214              : {
     215           11 :   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object);
     216              :   nns_edge_data_h data_h;
     217              : 
     218           11 :   g_free (self->host);
     219           11 :   self->host = NULL;
     220           11 :   g_free (self->dest_host);
     221           11 :   self->dest_host = NULL;
     222           11 :   g_free (self->topic);
     223           11 :   self->topic = NULL;
     224           11 :   g_free (self->in_caps_str);
     225           11 :   self->in_caps_str = NULL;
     226              : 
     227           17 :   while ((data_h = g_async_queue_try_pop (self->msg_queue))) {
     228            6 :     nns_edge_data_destroy (data_h);
     229              :   }
     230              : 
     231           11 :   if (self->msg_queue) {
     232           11 :     g_async_queue_unref (self->msg_queue);
     233           11 :     self->msg_queue = NULL;
     234              :   }
     235              : 
     236           11 :   if (self->edge_h) {
     237           11 :     nns_edge_release_handle (self->edge_h);
     238           11 :     self->edge_h = NULL;
     239              :   }
     240              : 
     241           11 :   gst_tensors_config_free (&self->config);
     242              : 
     243           11 :   G_OBJECT_CLASS (parent_class)->finalize (object);
     244           11 : }
     245              : 
     246              : /**
     247              :  * @brief set property
     248              :  */
     249              : static void
     250           28 : gst_tensor_query_client_set_property (GObject * object, guint prop_id,
     251              :     const GValue * value, GParamSpec * pspec)
     252              : {
     253           28 :   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object);
     254              : 
     255              :   /** @todo DO NOT update properties (host, port, ..) while pipeline is running. */
     256           28 :   switch (prop_id) {
     257            2 :     case PROP_HOST:
     258            2 :       if (!g_value_get_string (value)) {
     259            0 :         nns_logw ("Sink host property cannot be NULL");
     260            0 :         break;
     261              :       }
     262            2 :       g_free (self->host);
     263            2 :       self->host = g_value_dup_string (value);
     264            2 :       break;
     265            9 :     case PROP_PORT:
     266            9 :       self->port = g_value_get_uint (value);
     267            9 :       break;
     268            3 :     case PROP_DEST_HOST:
     269            3 :       if (!g_value_get_string (value)) {
     270            0 :         nns_logw ("Sink host property cannot be NULL");
     271            0 :         break;
     272              :       }
     273            3 :       g_free (self->dest_host);
     274            3 :       self->dest_host = g_value_dup_string (value);
     275            3 :       break;
     276           11 :     case PROP_DEST_PORT:
     277           11 :       self->dest_port = g_value_get_uint (value);
     278           11 :       break;
     279            1 :     case PROP_CONNECT_TYPE:
     280            1 :       self->connect_type = g_value_get_enum (value);
     281            1 :       break;
     282            1 :     case PROP_TOPIC:
     283            1 :       if (!g_value_get_string (value)) {
     284            0 :         nns_logw ("Topic property cannot be NULL. Query-hybrid is disabled.");
     285            0 :         break;
     286              :       }
     287            1 :       g_free (self->topic);
     288            1 :       self->topic = g_value_dup_string (value);
     289            1 :       break;
     290            0 :     case PROP_TIMEOUT:
     291            0 :       self->timeout = g_value_get_uint (value);
     292            0 :       break;
     293            0 :     case PROP_SILENT:
     294            0 :       self->silent = g_value_get_boolean (value);
     295            0 :       break;
     296            1 :     case PROP_MAX_REQUEST:
     297            1 :       self->max_request = g_value_get_uint (value);
     298            1 :       break;
     299            0 :     default:
     300            0 :       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
     301            0 :       break;
     302              :   }
     303           28 : }
     304              : 
     305              : /**
     306              :  * @brief get property
     307              :  */
     308              : static void
     309            0 : gst_tensor_query_client_get_property (GObject * object, guint prop_id,
     310              :     GValue * value, GParamSpec * pspec)
     311              : {
     312            0 :   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object);
     313              : 
     314            0 :   switch (prop_id) {
     315            0 :     case PROP_HOST:
     316            0 :       g_value_set_string (value, self->host);
     317            0 :       break;
     318            0 :     case PROP_PORT:
     319            0 :       g_value_set_uint (value, self->port);
     320            0 :       break;
     321            0 :     case PROP_DEST_HOST:
     322            0 :       g_value_set_string (value, self->dest_host);
     323            0 :       break;
     324            0 :     case PROP_DEST_PORT:
     325            0 :       g_value_set_uint (value, self->dest_port);
     326            0 :       break;
     327            0 :     case PROP_CONNECT_TYPE:
     328            0 :       g_value_set_enum (value, self->connect_type);
     329            0 :       break;
     330            0 :     case PROP_TOPIC:
     331            0 :       g_value_set_string (value, self->topic);
     332            0 :       break;
     333            0 :     case PROP_TIMEOUT:
     334            0 :       g_value_set_uint (value, self->timeout);
     335            0 :       break;
     336            0 :     case PROP_SILENT:
     337            0 :       g_value_set_boolean (value, self->silent);
     338            0 :       break;
     339            0 :     case PROP_MAX_REQUEST:
     340            0 :       g_value_set_uint (value, self->max_request);
     341            0 :       break;
     342            0 :     default:
     343            0 :       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
     344            0 :       break;
     345              :   }
     346            0 : }
     347              : 
     348              : /**
     349              :  * @brief Update src pad caps from tensors config.
     350              :  */
     351              : static gboolean
     352           11 : gst_tensor_query_client_update_caps (GstTensorQueryClient * self,
     353              :     const gchar * caps_str)
     354              : {
     355              :   GstCaps *curr_caps, *out_caps;
     356           11 :   gboolean ret = FALSE;
     357           11 :   out_caps = gst_caps_from_string (caps_str);
     358           11 :   silent_debug_caps (self, out_caps, "set out-caps");
     359              : 
     360              :   /* Update src pad caps if it is different. */
     361           11 :   curr_caps = gst_pad_get_current_caps (self->srcpad);
     362           11 :   if (curr_caps == NULL || !gst_caps_is_equal (curr_caps, out_caps)) {
     363           22 :     if (gst_caps_is_fixed (out_caps)) {
     364            9 :       ret = gst_pad_set_caps (self->srcpad, out_caps);
     365              : 
     366              :       /**
     367              :        * If pad caps is updated, prepare the tensor information here.
     368              :        * It will be used in chain function, to push tensor buffer into src pad.
     369              :        */
     370            9 :       if (ret) {
     371            9 :         GstStructure *s = gst_caps_get_structure (out_caps, 0);
     372              : 
     373            9 :         self->is_tensor = gst_structure_is_tensor_stream (s);
     374            9 :         if (self->is_tensor) {
     375            8 :           gst_tensors_config_free (&self->config);
     376            8 :           gst_tensors_config_from_structure (&self->config, s);
     377              :         }
     378              :       }
     379              :     } else {
     380            2 :       nns_loge ("out-caps from tensor_query_serversink is not fixed. "
     381              :           "Failed to update client src caps, out-caps: %s", caps_str);
     382              :     }
     383              :   } else {
     384              :     /** Don't need to update when the capability is the same. */
     385            0 :     ret = TRUE;
     386              :   }
     387              : 
     388           11 :   if (curr_caps)
     389            0 :     gst_caps_unref (curr_caps);
     390              : 
     391           11 :   gst_caps_unref (out_caps);
     392              : 
     393           11 :   return ret;
     394              : }
     395              : 
     396              : /**
     397              :  * @brief Parse caps from received event data.
     398              :  */
     399              : static gchar *
     400           22 : _nns_edge_parse_caps (gchar * caps_str, gboolean is_src)
     401              : {
     402              :   gchar **strv;
     403              :   gint num, i;
     404           22 :   gchar *find_key = NULL;
     405           22 :   gchar *ret_str = NULL;
     406              : 
     407           22 :   if (!caps_str)
     408            0 :     return NULL;
     409              : 
     410           22 :   strv = g_strsplit (caps_str, "@", -1);
     411           22 :   num = g_strv_length (strv);
     412              : 
     413           22 :   find_key =
     414              :       is_src ==
     415           11 :       TRUE ? g_strdup ("query_server_src_caps") :
     416           11 :       g_strdup ("query_server_sink_caps");
     417              : 
     418           33 :   for (i = 1; i < num; i += 2) {
     419           33 :     if (0 == g_strcmp0 (find_key, strv[i])) {
     420           22 :       ret_str = g_strdup (strv[i + 1]);
     421           22 :       break;
     422              :     }
     423              :   }
     424              : 
     425           22 :   g_free (find_key);
     426           22 :   g_strfreev (strv);
     427              : 
     428           22 :   return ret_str;
     429              : }
     430              : 
     431              : /**
     432              :  * @brief nnstreamer-edge event callback.
     433              :  */
     434              : static int
     435          108 : _nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
     436              : {
     437              :   nns_edge_event_e event_type;
     438          108 :   int ret = NNS_EDGE_ERROR_NONE;
     439          108 :   GstTensorQueryClient *self = (GstTensorQueryClient *) user_data;
     440              : 
     441          108 :   if (NNS_EDGE_ERROR_NONE != nns_edge_event_get_type (event_h, &event_type)) {
     442            0 :     nns_loge ("Failed to get event type!");
     443          108 :     return NNS_EDGE_ERROR_NOT_SUPPORTED;
     444              :   }
     445              : 
     446          108 :   switch (event_type) {
     447           11 :     case NNS_EDGE_EVENT_CAPABILITY:
     448              :     {
     449              :       GstCaps *server_caps, *client_caps;
     450              :       GstStructure *server_st, *client_st;
     451           11 :       gboolean result = FALSE;
     452              :       gchar *ret_str, *caps_str;
     453              : 
     454           11 :       nns_edge_event_parse_capability (event_h, &caps_str);
     455           11 :       ret_str = _nns_edge_parse_caps (caps_str, TRUE);
     456           11 :       nns_logd ("Received server-src caps: %s", GST_STR_NULL (ret_str));
     457           11 :       client_caps = gst_caps_from_string ((gchar *) self->in_caps_str);
     458           11 :       server_caps = gst_caps_from_string (ret_str);
     459           11 :       g_free (ret_str);
     460              : 
     461              :       /** Server framerate may vary. Let's skip comparing the framerate. */
     462           11 :       gst_caps_set_simple (server_caps, "framerate", GST_TYPE_FRACTION, 0, 1,
     463              :           NULL);
     464           11 :       gst_caps_set_simple (client_caps, "framerate", GST_TYPE_FRACTION, 0, 1,
     465              :           NULL);
     466              : 
     467           11 :       server_st = gst_caps_get_structure (server_caps, 0);
     468           11 :       client_st = gst_caps_get_structure (client_caps, 0);
     469              : 
     470           11 :       if (gst_structure_is_tensor_stream (server_st)) {
     471              :         GstTensorsConfig server_config, client_config;
     472              : 
     473            9 :         gst_tensors_config_from_structure (&server_config, server_st);
     474            9 :         gst_tensors_config_from_structure (&client_config, client_st);
     475              : 
     476            9 :         result = gst_tensors_config_is_equal (&server_config, &client_config);
     477              : 
     478            9 :         gst_tensors_config_free (&server_config);
     479            9 :         gst_tensors_config_free (&client_config);
     480              :       }
     481              : 
     482           11 :       if (result || gst_caps_can_intersect (client_caps, server_caps)) {
     483              :         /** Update client src caps */
     484           11 :         ret_str = _nns_edge_parse_caps (caps_str, FALSE);
     485           11 :         nns_logd ("Received server-sink caps: %s", GST_STR_NULL (ret_str));
     486           11 :         if (!gst_tensor_query_client_update_caps (self, ret_str)) {
     487            2 :           nns_loge ("Failed to update client source caps.");
     488            2 :           ret = NNS_EDGE_ERROR_UNKNOWN;
     489              :         }
     490           11 :         g_free (ret_str);
     491              :       } else {
     492              :         /* respond deny with src caps string */
     493            0 :         nns_loge ("Query caps is not acceptable!");
     494            0 :         ret = NNS_EDGE_ERROR_UNKNOWN;
     495              :       }
     496              : 
     497           11 :       gst_caps_unref (server_caps);
     498           11 :       gst_caps_unref (client_caps);
     499           11 :       g_free (caps_str);
     500           11 :       break;
     501              :     }
     502           88 :     case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
     503              :     {
     504              :       nns_edge_data_h data;
     505              : 
     506           88 :       nns_edge_event_parse_new_data (event_h, &data);
     507           88 :       g_async_queue_push (self->msg_queue, data);
     508           88 :       break;
     509              :     }
     510            9 :     default:
     511            9 :       break;
     512              :   }
     513              : 
     514          108 :   return ret;
     515              : }
     516              : 
     517              : /**
     518              :  * @brief Internal function to create edge handle.
     519              :  */
     520              : static gboolean
     521           16 : gst_tensor_query_client_create_edge_handle (GstTensorQueryClient * self)
     522              : {
     523           16 :   gboolean started = FALSE;
     524           16 :   gchar *prev_caps = NULL;
     525              :   int ret;
     526              : 
     527              :   /* Already created, compare caps string. */
     528           16 :   if (self->edge_h) {
     529            0 :     ret = nns_edge_get_info (self->edge_h, "CAPS", &prev_caps);
     530              : 
     531            0 :     if (ret != NNS_EDGE_ERROR_NONE || !prev_caps ||
     532            0 :         !g_str_equal (prev_caps, self->in_caps_str)) {
     533              :       /* Capability is changed, close old handle. */
     534            0 :       nns_edge_release_handle (self->edge_h);
     535            0 :       self->edge_h = NULL;
     536              :     } else {
     537           15 :       return TRUE;
     538              :     }
     539              :   }
     540              : 
     541           16 :   ret = nns_edge_create_handle ("TEMP_ID", self->connect_type,
     542              :       NNS_EDGE_NODE_TYPE_QUERY_CLIENT, &self->edge_h);
     543           16 :   if (ret != NNS_EDGE_ERROR_NONE)
     544            0 :     return FALSE;
     545              : 
     546           16 :   nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
     547              : 
     548           16 :   if (self->topic)
     549            1 :     nns_edge_set_info (self->edge_h, "TOPIC", self->topic);
     550           16 :   if (self->host)
     551           16 :     nns_edge_set_info (self->edge_h, "HOST", self->host);
     552           16 :   if (self->port > 0) {
     553            9 :     gchar *port = g_strdup_printf ("%u", self->port);
     554            9 :     nns_edge_set_info (self->edge_h, "PORT", port);
     555            9 :     g_free (port);
     556              :   }
     557           16 :   nns_edge_set_info (self->edge_h, "CAPS", self->in_caps_str);
     558              : 
     559           16 :   ret = nns_edge_start (self->edge_h);
     560           16 :   if (ret != NNS_EDGE_ERROR_NONE) {
     561            0 :     nns_loge
     562              :         ("Failed to start NNStreamer-edge. Please check server IP and port.");
     563            0 :     goto done;
     564              :   }
     565              : 
     566           16 :   ret = nns_edge_connect (self->edge_h, self->dest_host, self->dest_port);
     567           15 :   if (ret != NNS_EDGE_ERROR_NONE) {
     568            4 :     nns_loge ("Failed to connect to edge server!");
     569            4 :     goto done;
     570              :   }
     571              : 
     572           11 :   started = TRUE;
     573              : 
     574           15 : done:
     575           15 :   if (!started) {
     576            4 :     nns_edge_release_handle (self->edge_h);
     577            4 :     self->edge_h = NULL;
     578              :   }
     579              : 
     580           15 :   return started;
     581              : }
     582              : 
     583              : /**
     584              :  * @brief This function handles sink event.
     585              :  */
     586              : static gboolean
     587           50 : gst_tensor_query_client_sink_event (GstPad * pad,
     588              :     GstObject * parent, GstEvent * event)
     589              : {
     590           50 :   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent);
     591              : 
     592           50 :   GST_DEBUG_OBJECT (self, "Received %s event: %" GST_PTR_FORMAT,
     593              :       GST_EVENT_TYPE_NAME (event), event);
     594              : 
     595           50 :   switch (GST_EVENT_TYPE (event)) {
     596           16 :     case GST_EVENT_CAPS:
     597              :     {
     598              :       GstCaps *caps;
     599              :       gboolean ret;
     600              : 
     601           16 :       gst_event_parse_caps (event, &caps);
     602           16 :       g_free (self->in_caps_str);
     603           16 :       self->in_caps_str = gst_caps_to_string (caps);
     604              : 
     605           16 :       ret = gst_tensor_query_client_create_edge_handle (self);
     606           15 :       if (!ret)
     607            4 :         nns_loge ("Failed to create edge handle, cannot start query client.");
     608              : 
     609           15 :       gst_event_unref (event);
     610           15 :       return ret;
     611              :     }
     612           34 :     default:
     613           34 :       break;
     614              :   }
     615              : 
     616           34 :   return gst_pad_event_default (pad, parent, event);
     617              : }
     618              : 
     619              : /**
     620              :  * @brief This function handles sink pad query.
     621              :  */
     622              : static gboolean
     623          114 : gst_tensor_query_client_sink_query (GstPad * pad,
     624              :     GstObject * parent, GstQuery * query)
     625              : {
     626          114 :   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent);
     627              : 
     628          114 :   GST_DEBUG_OBJECT (self, "Received %s query: %" GST_PTR_FORMAT,
     629              :       GST_QUERY_TYPE_NAME (query), query);
     630              : 
     631          114 :   switch (GST_QUERY_TYPE (query)) {
     632           70 :     case GST_QUERY_CAPS:
     633              :     {
     634              :       GstCaps *caps;
     635              :       GstCaps *filter;
     636              : 
     637           70 :       gst_query_parse_caps (query, &filter);
     638           70 :       caps = gst_tensor_query_client_query_caps (self, pad, filter);
     639              : 
     640           70 :       gst_query_set_caps_result (query, caps);
     641           70 :       gst_caps_unref (caps);
     642           70 :       return TRUE;
     643              :     }
     644           44 :     case GST_QUERY_ACCEPT_CAPS:
     645              :     {
     646              :       GstCaps *caps;
     647              :       GstCaps *template_caps;
     648           44 :       gboolean res = FALSE;
     649              : 
     650           44 :       gst_query_parse_accept_caps (query, &caps);
     651           44 :       silent_debug_caps (self, caps, "accept-caps");
     652              : 
     653           44 :       if (gst_caps_is_fixed (caps)) {
     654           44 :         template_caps = gst_pad_get_pad_template_caps (pad);
     655              : 
     656           44 :         res = gst_caps_can_intersect (template_caps, caps);
     657           44 :         gst_caps_unref (template_caps);
     658              :       }
     659              : 
     660           44 :       gst_query_set_accept_caps_result (query, res);
     661           44 :       return TRUE;
     662              :     }
     663            0 :     default:
     664            0 :       break;
     665              :   }
     666              : 
     667            0 :   return gst_pad_query_default (pad, parent, query);
     668              : }
     669              : 
     670              : /**
     671              :  * @brief Chain function, this function does the actual processing.
     672              :  */
     673              : static GstFlowReturn
     674          150 : gst_tensor_query_client_chain (GstPad * pad,
     675              :     GstObject * parent, GstBuffer * buf)
     676              : {
     677          150 :   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent);
     678          150 :   GstBuffer *out_buf = NULL;
     679          150 :   GstFlowReturn res = GST_FLOW_OK;
     680          150 :   nns_edge_data_h data_h = NULL;
     681          150 :   guint i, num_tensors = 0, num_data = 0;
     682          150 :   int ret = NNS_EDGE_ERROR_NONE;
     683              :   GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
     684              :   GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
     685              :   gchar *val;
     686              :   UNUSED (pad);
     687              : 
     688          150 :   if (self->max_request > 0 && self->requested_num > self->max_request) {
     689           37 :     nns_logi
     690              :         ("The processing speed of the query server is too slow. Drop the input buffer.");
     691           37 :     goto try_pop;
     692              :   }
     693              : 
     694          113 :   ret = nns_edge_data_create (&data_h);
     695          113 :   if (ret != NNS_EDGE_ERROR_NONE) {
     696            0 :     nns_loge ("Failed to create data handle in client chain.");
     697            0 :     goto try_pop;
     698              :   }
     699              : 
     700          113 :   num_tensors = gst_tensor_buffer_get_count (buf);
     701          226 :   for (i = 0; i < num_tensors; i++) {
     702          113 :     mem[i] = gst_tensor_buffer_get_nth_memory (buf, i);
     703          113 :     if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
     704            0 :       ml_loge ("Cannot map the %uth memory in gst-buffer.", i);
     705            0 :       gst_memory_unref (mem[i]);
     706            0 :       num_tensors = i;
     707            0 :       goto try_pop;
     708              :     }
     709          113 :     nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
     710              :   }
     711              : 
     712          113 :   nns_edge_get_info (self->edge_h, "client_id", &val);
     713          113 :   nns_edge_data_set_info (data_h, "client_id", val);
     714          113 :   g_free (val);
     715              : 
     716          113 :   ret = nns_edge_send (self->edge_h, data_h);
     717          113 :   if (ret == NNS_EDGE_ERROR_NONE) {
     718           95 :     self->requested_num++;
     719              :   } else {
     720           18 :     nns_loge ("Failed to publish to server node.");
     721              :   }
     722              : 
     723          150 : try_pop:
     724          150 :   if (data_h)
     725          113 :     nns_edge_data_destroy (data_h);
     726              : 
     727          300 :   data_h = g_async_queue_timeout_pop (self->msg_queue,
     728          150 :       self->timeout * G_TIME_SPAN_MILLISECOND);
     729          150 :   if (data_h) {
     730           81 :     if (self->requested_num > 0)
     731           81 :       self->requested_num--;
     732           81 :     ret = nns_edge_data_get_count (data_h, &num_data);
     733              : 
     734           81 :     if (ret == NNS_EDGE_ERROR_NONE && num_data > 0) {
     735              :       GstMemory *new_mem;
     736              :       GstTensorInfo *_info;
     737              : 
     738           81 :       out_buf = gst_buffer_new ();
     739              : 
     740          162 :       for (i = 0; i < num_data; i++) {
     741           81 :         void *data = NULL;
     742              :         nns_size_t data_len;
     743              :         gpointer new_data;
     744              : 
     745           81 :         nns_edge_data_get (data_h, i, &data, &data_len);
     746           81 :         new_data = _g_memdup (data, data_len);
     747              : 
     748           81 :         new_mem = gst_memory_new_wrapped (0, new_data, data_len, 0, data_len,
     749              :             new_data, g_free);
     750              : 
     751           81 :         if (self->is_tensor) {
     752           72 :           _info = gst_tensors_info_get_nth_info (&self->config.info, i);
     753           72 :           gst_tensor_buffer_append_memory (out_buf, new_mem, _info);
     754              :         } else {
     755            9 :           gst_buffer_append_memory (out_buf, new_mem);
     756              :         }
     757              :       }
     758              : 
     759              :       /* metadata from incoming buffer */
     760           81 :       gst_buffer_copy_into (out_buf, buf, GST_BUFFER_COPY_METADATA, 0, -1);
     761              : 
     762           81 :       res = gst_pad_push (self->srcpad, out_buf);
     763              :     } else {
     764            0 :       nns_loge ("Failed to get the number of memories of the edge data.");
     765            0 :       res = GST_FLOW_ERROR;
     766              :     }
     767              : 
     768           81 :     nns_edge_data_destroy (data_h);
     769              :   }
     770              : 
     771          263 :   for (i = 0; i < num_tensors; i++) {
     772          113 :     gst_memory_unmap (mem[i], &map[i]);
     773          113 :     gst_memory_unref (mem[i]);
     774              :   }
     775              : 
     776          150 :   gst_buffer_unref (buf);
     777          150 :   return res;
     778              : }
     779              : 
     780              : /**
     781              :  * @brief Get pad caps for caps negotiation.
     782              :  */
     783              : static GstCaps *
     784           70 : gst_tensor_query_client_query_caps (GstTensorQueryClient * self, GstPad * pad,
     785              :     GstCaps * filter)
     786              : {
     787              :   GstCaps *caps;
     788              : 
     789           70 :   caps = gst_pad_get_current_caps (pad);
     790           70 :   if (!caps) {
     791              :     /** pad don't have current caps. use the template caps */
     792           70 :     caps = gst_pad_get_pad_template_caps (pad);
     793              :   }
     794              : 
     795           70 :   silent_debug_caps (self, caps, "caps");
     796           70 :   silent_debug_caps (self, filter, "filter");
     797              : 
     798           70 :   if (filter) {
     799              :     GstCaps *intersection;
     800              :     intersection =
     801            6 :         gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
     802              : 
     803            6 :     gst_caps_unref (caps);
     804            6 :     caps = intersection;
     805              :   }
     806              : 
     807           70 :   silent_debug_caps (self, caps, "result");
     808           70 :   return caps;
     809              : }
        

Generated by: LCOV version 2.0-1