LCOV - code coverage report
Current view: top level - nnstreamer-2.4.2/gst/nnstreamer/tensor_query - tensor_query_server.c (source / functions) Coverage Total Hit
Test: nnstreamer 2.4.2-0 nnstreamer/nnstreamer#eca68b8d050408568af95d831a8eef62aaee7784 Lines: 66.1 % 177 117
Test Date: 2025-03-13 05:38:21 Functions: 91.7 % 12 11

            Line data    Source code
       1              : /* SPDX-License-Identifier: LGPL-2.1-only */
       2              : /**
       3              :  * Copyright (C) 2021 Samsung Electronics Co., Ltd.
       4              :  *
       5              :  * @file    tensor_query_server.c
       6              :  * @date    03 Aug 2021
       7              :  * @brief   GStreamer plugin to handle meta_query for server elements
       8              :  * @author  Junhwan Kim <jejudo.kim@samsung.com>
       9              :  * @see     http://github.com/nnstreamer/nnstreamer
      10              :  * @bug     No known bugs
      11              :  *
      12              :  */
      13              : #ifdef HAVE_CONFIG_H
      14              : #include "config.h"
      15              : #endif
      16              : 
      17              : #include "tensor_query_server.h"
      18              : #include <tensor_typedef.h>
      19              : #include <tensor_common.h>
      20              : 
      21              : /**
      22              :  * @brief mutex for tensor-query server table.
      23              :  */
      24              : G_LOCK_DEFINE_STATIC (query_server_table);
      25              : 
      26              : /**
      27              :  * @brief Table for query server data.
      28              :  */
      29              : static GHashTable *_qs_table = NULL;
      30              : 
      31              : static void init_queryserver (void) __attribute__((constructor));
      32              : static void fini_queryserver (void) __attribute__((destructor));
      33              : 
      34              : /**
      35              :  * @brief Internal function to release query server data.
      36              :  */
      37              : static void
      38            4 : _release_server_data (gpointer data)
      39              : {
      40            4 :   GstTensorQueryServer *_data = (GstTensorQueryServer *) data;
      41              : 
      42            4 :   if (!_data)
      43            0 :     return;
      44              : 
      45            4 :   g_mutex_lock (&_data->lock);
      46            4 :   if (_data->edge_h) {
      47            0 :     nns_edge_release_handle (_data->edge_h);
      48            0 :     _data->edge_h = NULL;
      49              :   }
      50            4 :   g_mutex_unlock (&_data->lock);
      51              : 
      52            4 :   g_mutex_clear (&_data->lock);
      53            4 :   g_cond_clear (&_data->cond);
      54              : 
      55            4 :   g_free (_data);
      56              : }
      57              : 
      58              : /**
      59              :  * @brief Get nnstreamer edge server handle.
      60              :  */
      61              : static GstTensorQueryServer *
      62           44 : gst_tensor_query_server_get_handle (const guint id)
      63              : {
      64              :   GstTensorQueryServer *data;
      65              : 
      66           44 :   G_LOCK (query_server_table);
      67           44 :   data = g_hash_table_lookup (_qs_table, GUINT_TO_POINTER (id));
      68           44 :   G_UNLOCK (query_server_table);
      69              : 
      70           44 :   return data;
      71              : }
      72              : 
      73              : /**
      74              :  * @brief Add nnstreamer edge server handle into hash table.
      75              :  */
      76              : gboolean
      77           10 : gst_tensor_query_server_add_data (const guint id)
      78              : {
      79              :   GstTensorQueryServer *data;
      80              :   gboolean ret;
      81              : 
      82           10 :   data = gst_tensor_query_server_get_handle (id);
      83              : 
      84           10 :   if (NULL != data) {
      85            6 :     return TRUE;
      86              :   }
      87              : 
      88            4 :   data = g_try_new0 (GstTensorQueryServer, 1);
      89            4 :   if (NULL == data) {
      90            0 :     nns_loge ("Failed to allocate memory for tensor query server data.");
      91            0 :     return FALSE;
      92              :   }
      93              : 
      94            4 :   g_mutex_init (&data->lock);
      95            4 :   g_cond_init (&data->cond);
      96            4 :   data->id = id;
      97            4 :   data->configured = FALSE;
      98              : 
      99            4 :   G_LOCK (query_server_table);
     100            4 :   ret = g_hash_table_insert (_qs_table, GUINT_TO_POINTER (id), data);
     101            4 :   if (!ret) {
     102            0 :     _release_server_data (data);
     103            0 :     nns_loge ("Failed to add tensor query server data into the table.");
     104              :   }
     105            4 :   G_UNLOCK (query_server_table);
     106              : 
     107            4 :   return ret;
     108              : }
     109              : 
     110              : /**
     111              :  * @brief Prepare edge connection and its handle.
     112              :  */
     113              : gboolean
     114            8 : gst_tensor_query_server_prepare (const guint id,
     115              :     nns_edge_connect_type_e connect_type, GstTensorQueryEdgeInfo * edge_info)
     116              : {
     117              :   GstTensorQueryServer *data;
     118              :   gchar *port_str, *id_str;
     119            8 :   gboolean prepared = FALSE;
     120              :   gint ret;
     121              : 
     122            8 :   data = gst_tensor_query_server_get_handle (id);
     123            8 :   if (NULL == data) {
     124            0 :     return FALSE;
     125              :   }
     126              : 
     127            8 :   g_mutex_lock (&data->lock);
     128            8 :   if (data->edge_h == NULL) {
     129            4 :     id_str = g_strdup_printf ("%u", id);
     130              : 
     131            4 :     ret = nns_edge_create_handle (id_str, connect_type,
     132              :         NNS_EDGE_NODE_TYPE_QUERY_SERVER, &data->edge_h);
     133            4 :     g_free (id_str);
     134              : 
     135            4 :     if (NNS_EDGE_ERROR_NONE != ret) {
     136            0 :       GST_ERROR ("Failed to get nnstreamer edge handle.");
     137            0 :       goto done;
     138              :     }
     139              :   }
     140              : 
     141            8 :   if (edge_info) {
     142            4 :     if (edge_info->host) {
     143            4 :       nns_edge_set_info (data->edge_h, "HOST", edge_info->host);
     144              :     }
     145            4 :     if (edge_info->port > 0) {
     146            4 :       port_str = g_strdup_printf ("%u", edge_info->port);
     147            4 :       nns_edge_set_info (data->edge_h, "PORT", port_str);
     148            4 :       g_free (port_str);
     149              :     }
     150            4 :     if (edge_info->dest_host) {
     151            4 :       nns_edge_set_info (data->edge_h, "DEST_HOST", edge_info->dest_host);
     152              :     }
     153            4 :     if (edge_info->dest_port > 0) {
     154            4 :       port_str = g_strdup_printf ("%u", edge_info->dest_port);
     155            4 :       nns_edge_set_info (data->edge_h, "DEST_PORT", port_str);
     156            4 :       g_free (port_str);
     157              :     }
     158            4 :     if (edge_info->topic) {
     159            0 :       nns_edge_set_info (data->edge_h, "TOPIC", edge_info->topic);
     160              :     }
     161              : 
     162            4 :     nns_edge_set_event_callback (data->edge_h, edge_info->cb, edge_info->pdata);
     163              : 
     164            4 :     ret = nns_edge_start (data->edge_h);
     165            4 :     if (NNS_EDGE_ERROR_NONE != ret) {
     166            1 :       nns_loge
     167              :           ("Failed to start NNStreamer-edge. Please check server IP and port.");
     168            1 :       goto done;
     169              :     }
     170              :   }
     171              : 
     172            7 :   prepared = TRUE;
     173              : 
     174            8 : done:
     175            8 :   g_mutex_unlock (&data->lock);
     176            8 :   return prepared;
     177              : }
     178              : 
     179              : /**
     180              :  * @brief Send buffer to connected edge device.
     181              :  */
     182              : gboolean
     183            0 : gst_tensor_query_server_send_buffer (const guint id, GstBuffer * buffer)
     184              : {
     185              :   GstTensorQueryServer *data;
     186              :   GstMetaQuery *meta_query;
     187              :   nns_edge_data_h data_h;
     188            0 :   guint i, num_tensors = 0;
     189            0 :   gint ret = NNS_EDGE_ERROR_NONE;
     190              :   GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
     191              :   GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
     192              :   gchar *val;
     193            0 :   gboolean sent = FALSE;
     194              : 
     195            0 :   data = gst_tensor_query_server_get_handle (id);
     196              : 
     197            0 :   if (NULL == data) {
     198            0 :     nns_loge ("Failed to send buffer, server handle is null.");
     199            0 :     return FALSE;
     200              :   }
     201              : 
     202            0 :   meta_query = gst_buffer_get_meta_query (buffer);
     203            0 :   if (!meta_query) {
     204            0 :     nns_loge ("Failed to send buffer, cannot get tensor query meta.");
     205            0 :     return FALSE;
     206              :   }
     207              : 
     208            0 :   ret = nns_edge_data_create (&data_h);
     209            0 :   if (ret != NNS_EDGE_ERROR_NONE) {
     210            0 :     nns_loge ("Failed to create edge data handle in query server.");
     211            0 :     return FALSE;
     212              :   }
     213              : 
     214            0 :   num_tensors = gst_tensor_buffer_get_count (buffer);
     215            0 :   for (i = 0; i < num_tensors; i++) {
     216            0 :     mem[i] = gst_tensor_buffer_get_nth_memory (buffer, i);
     217              : 
     218            0 :     if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
     219            0 :       ml_loge ("Cannot map the %uth memory in gst-buffer.", i);
     220            0 :       gst_memory_unref (mem[i]);
     221            0 :       num_tensors = i;
     222            0 :       goto done;
     223              :     }
     224              : 
     225            0 :     nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
     226              :   }
     227              : 
     228            0 :   val = g_strdup_printf ("%lld", (long long) meta_query->client_id);
     229            0 :   nns_edge_data_set_info (data_h, "client_id", val);
     230            0 :   g_free (val);
     231              : 
     232            0 :   g_mutex_lock (&data->lock);
     233            0 :   ret = nns_edge_send (data->edge_h, data_h);
     234            0 :   g_mutex_unlock (&data->lock);
     235              : 
     236            0 :   if (ret != NNS_EDGE_ERROR_NONE) {
     237            0 :     nns_loge ("Failed to send edge data handle in query server.");
     238            0 :     goto done;
     239              :   }
     240              : 
     241            0 :   sent = TRUE;
     242              : 
     243            0 : done:
     244            0 :   for (i = 0; i < num_tensors; i++) {
     245            0 :     gst_memory_unmap (mem[i], &map[i]);
     246            0 :     gst_memory_unref (mem[i]);
     247              :   }
     248              : 
     249            0 :   nns_edge_data_destroy (data_h);
     250              : 
     251            0 :   return sent;
     252              : }
     253              : 
     254              : /**
     255              :  * @brief Release nnstreamer edge handle of query server.
     256              :  */
     257              : void
     258            7 : gst_tensor_query_server_release_edge_handle (const guint id)
     259              : {
     260              :   GstTensorQueryServer *data;
     261              : 
     262            7 :   data = gst_tensor_query_server_get_handle (id);
     263              : 
     264            7 :   if (NULL == data) {
     265            0 :     return;
     266              :   }
     267              : 
     268            7 :   g_mutex_lock (&data->lock);
     269            7 :   if (data->edge_h) {
     270            4 :     nns_edge_release_handle (data->edge_h);
     271            4 :     data->edge_h = NULL;
     272              :   }
     273            7 :   g_mutex_unlock (&data->lock);
     274              : }
     275              : 
     276              : /**
     277              :  * @brief Remove GstTensorQueryServer.
     278              :  */
     279              : void
     280            6 : gst_tensor_query_server_remove_data (const guint id)
     281              : {
     282            6 :   G_LOCK (query_server_table);
     283            6 :   if (g_hash_table_lookup (_qs_table, GUINT_TO_POINTER (id)))
     284            4 :     g_hash_table_remove (_qs_table, GUINT_TO_POINTER (id));
     285            6 :   G_UNLOCK (query_server_table);
     286            6 : }
     287              : 
     288              : /**
     289              :  * @brief Wait until the sink is configured and get server info handle.
     290              :  */
     291              : gboolean
     292            5 : gst_tensor_query_server_wait_sink (const guint id)
     293              : {
     294              :   gint64 end_time;
     295              :   GstTensorQueryServer *data;
     296              : 
     297            5 :   data = gst_tensor_query_server_get_handle (id);
     298              : 
     299            5 :   if (NULL == data) {
     300            0 :     return FALSE;
     301              :   }
     302              : 
     303            5 :   end_time = g_get_monotonic_time () +
     304              :       DEFAULT_QUERY_INFO_TIMEOUT * G_TIME_SPAN_SECOND;
     305            5 :   g_mutex_lock (&data->lock);
     306            5 :   while (!data->configured) {
     307            0 :     if (!g_cond_wait_until (&data->cond, &data->lock, end_time)) {
     308            0 :       g_mutex_unlock (&data->lock);
     309            0 :       ml_loge ("Failed to get server sink info.");
     310            0 :       return FALSE;
     311              :     }
     312              :   }
     313            5 :   g_mutex_unlock (&data->lock);
     314              : 
     315            5 :   return TRUE;
     316              : }
     317              : 
     318              : /**
     319              :  * @brief set query server sink configured.
     320              :  */
     321              : void
     322            5 : gst_tensor_query_server_set_configured (const guint id)
     323              : {
     324              :   GstTensorQueryServer *data;
     325              : 
     326            5 :   data = gst_tensor_query_server_get_handle (id);
     327              : 
     328            5 :   if (NULL == data) {
     329            0 :     return;
     330              :   }
     331              : 
     332            5 :   g_mutex_lock (&data->lock);
     333            5 :   data->configured = TRUE;
     334            5 :   g_cond_broadcast (&data->cond);
     335            5 :   g_mutex_unlock (&data->lock);
     336              : }
     337              : 
     338              : /**
     339              :  * @brief set query server caps.
     340              :  */
     341              : void
     342            9 : gst_tensor_query_server_set_caps (const guint id, const gchar * caps_str)
     343              : {
     344              :   GstTensorQueryServer *data;
     345              :   gchar *prev_caps_str, *new_caps_str;
     346              : 
     347            9 :   data = gst_tensor_query_server_get_handle (id);
     348              : 
     349            9 :   if (NULL == data) {
     350            0 :     return;
     351              :   }
     352              : 
     353            9 :   g_mutex_lock (&data->lock);
     354              : 
     355            9 :   prev_caps_str = new_caps_str = NULL;
     356            9 :   nns_edge_get_info (data->edge_h, "CAPS", &prev_caps_str);
     357            9 :   if (!prev_caps_str)
     358            2 :     prev_caps_str = g_strdup ("");
     359            9 :   new_caps_str = g_strdup_printf ("%s%s", prev_caps_str, caps_str);
     360            9 :   nns_edge_set_info (data->edge_h, "CAPS", new_caps_str);
     361              : 
     362            9 :   g_free (prev_caps_str);
     363            9 :   g_free (new_caps_str);
     364              : 
     365            9 :   g_mutex_unlock (&data->lock);
     366              : }
     367              : 
     368              : /**
     369              :  * @brief Initialize the query server.
     370              :  */
     371              : static void
     372          468 : init_queryserver (void)
     373              : {
     374          468 :   G_LOCK (query_server_table);
     375          468 :   g_assert (NULL == _qs_table); /** Internal error (duplicated init call?) */
     376          468 :   _qs_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
     377              :       _release_server_data);
     378          468 :   G_UNLOCK (query_server_table);
     379          468 : }
     380              : 
     381              : /**
     382              :  * @brief Destruct the query server.
     383              :  */
     384              : static void
     385          468 : fini_queryserver (void)
     386              : {
     387          468 :   G_LOCK (query_server_table);
     388          468 :   g_assert (_qs_table); /** Internal error (init not called?) */
     389          468 :   g_hash_table_destroy (_qs_table);
     390          468 :   _qs_table = NULL;
     391          468 :   G_UNLOCK (query_server_table);
     392          468 : }
        

Generated by: LCOV version 2.0-1