Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1-only */
2 : /**
3 : * Copyright (C) 2021 Samsung Electronics Co., Ltd.
4 : *
5 : * @file tensor_query_server.c
6 : * @date 03 Aug 2021
7 : * @brief GStreamer plugin to handle meta_query for server elements
8 : * @author Junhwan Kim <jejudo.kim@samsung.com>
9 : * @see http://github.com/nnstreamer/nnstreamer
10 : * @bug No known bugs
11 : *
12 : */
13 : #ifdef HAVE_CONFIG_H
14 : #include "config.h"
15 : #endif
16 :
17 : #include "tensor_query_server.h"
18 : #include <tensor_typedef.h>
19 : #include <tensor_common.h>
20 :
21 : /**
22 : * @brief mutex for tensor-query server table.
23 : */
24 : G_LOCK_DEFINE_STATIC (query_server_table);
25 :
26 : /**
27 : * @brief Table for query server data.
28 : */
29 : static GHashTable *_qs_table = NULL;
30 :
31 : static void init_queryserver (void) __attribute__((constructor));
32 : static void fini_queryserver (void) __attribute__((destructor));
33 :
34 : /**
35 : * @brief Internal function to release query server data.
36 : */
37 : static void
38 4 : _release_server_data (gpointer data)
39 : {
40 4 : GstTensorQueryServer *_data = (GstTensorQueryServer *) data;
41 :
42 4 : if (!_data)
43 0 : return;
44 :
45 4 : g_mutex_lock (&_data->lock);
46 4 : if (_data->edge_h) {
47 0 : nns_edge_release_handle (_data->edge_h);
48 0 : _data->edge_h = NULL;
49 : }
50 4 : g_mutex_unlock (&_data->lock);
51 :
52 4 : g_mutex_clear (&_data->lock);
53 4 : g_cond_clear (&_data->cond);
54 :
55 4 : g_free (_data);
56 : }
57 :
58 : /**
59 : * @brief Get nnstreamer edge server handle.
60 : */
61 : static GstTensorQueryServer *
62 44 : gst_tensor_query_server_get_handle (const guint id)
63 : {
64 : GstTensorQueryServer *data;
65 :
66 44 : G_LOCK (query_server_table);
67 44 : data = g_hash_table_lookup (_qs_table, GUINT_TO_POINTER (id));
68 44 : G_UNLOCK (query_server_table);
69 :
70 44 : return data;
71 : }
72 :
73 : /**
74 : * @brief Add nnstreamer edge server handle into hash table.
75 : */
76 : gboolean
77 10 : gst_tensor_query_server_add_data (const guint id)
78 : {
79 : GstTensorQueryServer *data;
80 : gboolean ret;
81 :
82 10 : data = gst_tensor_query_server_get_handle (id);
83 :
84 10 : if (NULL != data) {
85 6 : return TRUE;
86 : }
87 :
88 4 : data = g_try_new0 (GstTensorQueryServer, 1);
89 4 : if (NULL == data) {
90 0 : nns_loge ("Failed to allocate memory for tensor query server data.");
91 0 : return FALSE;
92 : }
93 :
94 4 : g_mutex_init (&data->lock);
95 4 : g_cond_init (&data->cond);
96 4 : data->id = id;
97 4 : data->configured = FALSE;
98 :
99 4 : G_LOCK (query_server_table);
100 4 : ret = g_hash_table_insert (_qs_table, GUINT_TO_POINTER (id), data);
101 4 : if (!ret) {
102 0 : _release_server_data (data);
103 0 : nns_loge ("Failed to add tensor query server data into the table.");
104 : }
105 4 : G_UNLOCK (query_server_table);
106 :
107 4 : return ret;
108 : }
109 :
110 : /**
111 : * @brief Prepare edge connection and its handle.
112 : */
113 : gboolean
114 8 : gst_tensor_query_server_prepare (const guint id,
115 : nns_edge_connect_type_e connect_type, GstTensorQueryEdgeInfo * edge_info)
116 : {
117 : GstTensorQueryServer *data;
118 : gchar *port_str, *id_str;
119 8 : gboolean prepared = FALSE;
120 : gint ret;
121 :
122 8 : data = gst_tensor_query_server_get_handle (id);
123 8 : if (NULL == data) {
124 0 : return FALSE;
125 : }
126 :
127 8 : g_mutex_lock (&data->lock);
128 8 : if (data->edge_h == NULL) {
129 4 : id_str = g_strdup_printf ("%u", id);
130 :
131 4 : ret = nns_edge_create_handle (id_str, connect_type,
132 : NNS_EDGE_NODE_TYPE_QUERY_SERVER, &data->edge_h);
133 4 : g_free (id_str);
134 :
135 4 : if (NNS_EDGE_ERROR_NONE != ret) {
136 0 : GST_ERROR ("Failed to get nnstreamer edge handle.");
137 0 : goto done;
138 : }
139 : }
140 :
141 8 : if (edge_info) {
142 4 : if (edge_info->host) {
143 4 : nns_edge_set_info (data->edge_h, "HOST", edge_info->host);
144 : }
145 4 : if (edge_info->port > 0) {
146 4 : port_str = g_strdup_printf ("%u", edge_info->port);
147 4 : nns_edge_set_info (data->edge_h, "PORT", port_str);
148 4 : g_free (port_str);
149 : }
150 4 : if (edge_info->dest_host) {
151 4 : nns_edge_set_info (data->edge_h, "DEST_HOST", edge_info->dest_host);
152 : }
153 4 : if (edge_info->dest_port > 0) {
154 4 : port_str = g_strdup_printf ("%u", edge_info->dest_port);
155 4 : nns_edge_set_info (data->edge_h, "DEST_PORT", port_str);
156 4 : g_free (port_str);
157 : }
158 4 : if (edge_info->topic) {
159 0 : nns_edge_set_info (data->edge_h, "TOPIC", edge_info->topic);
160 : }
161 :
162 4 : nns_edge_set_event_callback (data->edge_h, edge_info->cb, edge_info->pdata);
163 :
164 4 : ret = nns_edge_start (data->edge_h);
165 4 : if (NNS_EDGE_ERROR_NONE != ret) {
166 1 : nns_loge
167 : ("Failed to start NNStreamer-edge. Please check server IP and port.");
168 1 : goto done;
169 : }
170 : }
171 :
172 7 : prepared = TRUE;
173 :
174 8 : done:
175 8 : g_mutex_unlock (&data->lock);
176 8 : return prepared;
177 : }
178 :
179 : /**
180 : * @brief Send buffer to connected edge device.
181 : */
182 : gboolean
183 0 : gst_tensor_query_server_send_buffer (const guint id, GstBuffer * buffer)
184 : {
185 : GstTensorQueryServer *data;
186 : GstMetaQuery *meta_query;
187 : nns_edge_data_h data_h;
188 0 : guint i, num_tensors = 0;
189 0 : gint ret = NNS_EDGE_ERROR_NONE;
190 : GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
191 : GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
192 : gchar *val;
193 0 : gboolean sent = FALSE;
194 :
195 0 : data = gst_tensor_query_server_get_handle (id);
196 :
197 0 : if (NULL == data) {
198 0 : nns_loge ("Failed to send buffer, server handle is null.");
199 0 : return FALSE;
200 : }
201 :
202 0 : meta_query = gst_buffer_get_meta_query (buffer);
203 0 : if (!meta_query) {
204 0 : nns_loge ("Failed to send buffer, cannot get tensor query meta.");
205 0 : return FALSE;
206 : }
207 :
208 0 : ret = nns_edge_data_create (&data_h);
209 0 : if (ret != NNS_EDGE_ERROR_NONE) {
210 0 : nns_loge ("Failed to create edge data handle in query server.");
211 0 : return FALSE;
212 : }
213 :
214 0 : num_tensors = gst_tensor_buffer_get_count (buffer);
215 0 : for (i = 0; i < num_tensors; i++) {
216 0 : mem[i] = gst_tensor_buffer_get_nth_memory (buffer, i);
217 :
218 0 : if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
219 0 : ml_loge ("Cannot map the %uth memory in gst-buffer.", i);
220 0 : gst_memory_unref (mem[i]);
221 0 : num_tensors = i;
222 0 : goto done;
223 : }
224 :
225 0 : nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
226 : }
227 :
228 0 : val = g_strdup_printf ("%lld", (long long) meta_query->client_id);
229 0 : nns_edge_data_set_info (data_h, "client_id", val);
230 0 : g_free (val);
231 :
232 0 : g_mutex_lock (&data->lock);
233 0 : ret = nns_edge_send (data->edge_h, data_h);
234 0 : g_mutex_unlock (&data->lock);
235 :
236 0 : if (ret != NNS_EDGE_ERROR_NONE) {
237 0 : nns_loge ("Failed to send edge data handle in query server.");
238 0 : goto done;
239 : }
240 :
241 0 : sent = TRUE;
242 :
243 0 : done:
244 0 : for (i = 0; i < num_tensors; i++) {
245 0 : gst_memory_unmap (mem[i], &map[i]);
246 0 : gst_memory_unref (mem[i]);
247 : }
248 :
249 0 : nns_edge_data_destroy (data_h);
250 :
251 0 : return sent;
252 : }
253 :
254 : /**
255 : * @brief Release nnstreamer edge handle of query server.
256 : */
257 : void
258 7 : gst_tensor_query_server_release_edge_handle (const guint id)
259 : {
260 : GstTensorQueryServer *data;
261 :
262 7 : data = gst_tensor_query_server_get_handle (id);
263 :
264 7 : if (NULL == data) {
265 0 : return;
266 : }
267 :
268 7 : g_mutex_lock (&data->lock);
269 7 : if (data->edge_h) {
270 4 : nns_edge_release_handle (data->edge_h);
271 4 : data->edge_h = NULL;
272 : }
273 7 : g_mutex_unlock (&data->lock);
274 : }
275 :
276 : /**
277 : * @brief Remove GstTensorQueryServer.
278 : */
279 : void
280 6 : gst_tensor_query_server_remove_data (const guint id)
281 : {
282 6 : G_LOCK (query_server_table);
283 6 : if (g_hash_table_lookup (_qs_table, GUINT_TO_POINTER (id)))
284 4 : g_hash_table_remove (_qs_table, GUINT_TO_POINTER (id));
285 6 : G_UNLOCK (query_server_table);
286 6 : }
287 :
288 : /**
289 : * @brief Wait until the sink is configured and get server info handle.
290 : */
291 : gboolean
292 5 : gst_tensor_query_server_wait_sink (const guint id)
293 : {
294 : gint64 end_time;
295 : GstTensorQueryServer *data;
296 :
297 5 : data = gst_tensor_query_server_get_handle (id);
298 :
299 5 : if (NULL == data) {
300 0 : return FALSE;
301 : }
302 :
303 5 : end_time = g_get_monotonic_time () +
304 : DEFAULT_QUERY_INFO_TIMEOUT * G_TIME_SPAN_SECOND;
305 5 : g_mutex_lock (&data->lock);
306 5 : while (!data->configured) {
307 0 : if (!g_cond_wait_until (&data->cond, &data->lock, end_time)) {
308 0 : g_mutex_unlock (&data->lock);
309 0 : ml_loge ("Failed to get server sink info.");
310 0 : return FALSE;
311 : }
312 : }
313 5 : g_mutex_unlock (&data->lock);
314 :
315 5 : return TRUE;
316 : }
317 :
318 : /**
319 : * @brief set query server sink configured.
320 : */
321 : void
322 5 : gst_tensor_query_server_set_configured (const guint id)
323 : {
324 : GstTensorQueryServer *data;
325 :
326 5 : data = gst_tensor_query_server_get_handle (id);
327 :
328 5 : if (NULL == data) {
329 0 : return;
330 : }
331 :
332 5 : g_mutex_lock (&data->lock);
333 5 : data->configured = TRUE;
334 5 : g_cond_broadcast (&data->cond);
335 5 : g_mutex_unlock (&data->lock);
336 : }
337 :
338 : /**
339 : * @brief set query server caps.
340 : */
341 : void
342 9 : gst_tensor_query_server_set_caps (const guint id, const gchar * caps_str)
343 : {
344 : GstTensorQueryServer *data;
345 : gchar *prev_caps_str, *new_caps_str;
346 :
347 9 : data = gst_tensor_query_server_get_handle (id);
348 :
349 9 : if (NULL == data) {
350 0 : return;
351 : }
352 :
353 9 : g_mutex_lock (&data->lock);
354 :
355 9 : prev_caps_str = new_caps_str = NULL;
356 9 : nns_edge_get_info (data->edge_h, "CAPS", &prev_caps_str);
357 9 : if (!prev_caps_str)
358 2 : prev_caps_str = g_strdup ("");
359 9 : new_caps_str = g_strdup_printf ("%s%s", prev_caps_str, caps_str);
360 9 : nns_edge_set_info (data->edge_h, "CAPS", new_caps_str);
361 :
362 9 : g_free (prev_caps_str);
363 9 : g_free (new_caps_str);
364 :
365 9 : g_mutex_unlock (&data->lock);
366 : }
367 :
368 : /**
369 : * @brief Initialize the query server.
370 : */
371 : static void
372 468 : init_queryserver (void)
373 : {
374 468 : G_LOCK (query_server_table);
375 468 : g_assert (NULL == _qs_table); /** Internal error (duplicated init call?) */
376 468 : _qs_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
377 : _release_server_data);
378 468 : G_UNLOCK (query_server_table);
379 468 : }
380 :
381 : /**
382 : * @brief Destruct the query server.
383 : */
384 : static void
385 468 : fini_queryserver (void)
386 : {
387 468 : G_LOCK (query_server_table);
388 468 : g_assert (_qs_table); /** Internal error (init not called?) */
389 468 : g_hash_table_destroy (_qs_table);
390 468 : _qs_table = NULL;
391 468 : G_UNLOCK (query_server_table);
392 468 : }
|