Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1-only */
2 : /**
3 : * Copyright (C) 2021 Samsung Electronics Co., Ltd.
4 : *
5 : * @file tensor_query_client.c
6 : * @date 09 Jul 2021
7 : * @brief GStreamer plugin to handle tensor query client
8 : * @author Junhwan Kim <jejudo.kim@samsung.com>
9 : * @see http://github.com/nnstreamer/nnstreamer
10 : * @bug No known bugs
11 : *
12 : */
13 : #ifdef HAVE_CONFIG_H
14 : #include <config.h>
15 : #endif
16 :
17 : #include "nnstreamer_util.h"
18 : #include "tensor_query_client.h"
19 : #include <gio/gio.h>
20 : #include <glib.h>
21 : #include <string.h>
22 : #include "tensor_query_common.h"
23 :
24 : #include <stdio.h>
25 : #include <stdlib.h>
26 : #include <unistd.h>
27 :
/**
 * @brief Debug-mode predicate. Expands to an expression on the variable
 *        named `self` in the enclosing scope and is true when that element
 *        is not silent.
 */
#ifndef DBG
#define DBG (!self->silent)
#endif

/**
 * @brief Identifiers of the GObject properties installed by this element.
 */
enum
{
  PROP_0,
  PROP_HOST,
  PROP_PORT,
  PROP_DEST_HOST,
  PROP_DEST_PORT,
  PROP_CONNECT_TYPE,
  PROP_TOPIC,
  PROP_TIMEOUT,
  PROP_SILENT,
  PROP_MAX_REQUEST,
};

/* Network related defaults and limits. */
#define TCP_HIGHEST_PORT 65535
#define TCP_DEFAULT_HOST "localhost"
#define TCP_DEFAULT_SRV_SRC_PORT 3000
#define TCP_DEFAULT_CLIENT_SRC_PORT 3001

/* Defaults of the remaining element properties. */
#define DEFAULT_CLIENT_TIMEOUT 0
#define DEFAULT_SILENT TRUE
#define DEFAULT_MAX_REQUEST 2

/* GStreamer debug category used by the GST_* logging macros in this file. */
GST_DEBUG_CATEGORY_STATIC (gst_tensor_query_client_debug);
#define GST_CAT_DEFAULT gst_tensor_query_client_debug
62 :
63 : /**
64 : * @brief the capabilities of the inputs.
65 : */
66 : static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
67 : GST_PAD_SINK,
68 : GST_PAD_ALWAYS,
69 : GST_STATIC_CAPS_ANY);
70 :
71 : /**
72 : * @brief the capabilities of the outputs.
73 : */
74 : static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
75 : GST_PAD_SRC,
76 : GST_PAD_ALWAYS,
77 : GST_STATIC_CAPS_ANY);
78 :
79 : #define gst_tensor_query_client_parent_class parent_class
80 1227 : G_DEFINE_TYPE (GstTensorQueryClient, gst_tensor_query_client, GST_TYPE_ELEMENT);
81 :
82 : static void gst_tensor_query_client_finalize (GObject * object);
83 : static void gst_tensor_query_client_set_property (GObject * object,
84 : guint prop_id, const GValue * value, GParamSpec * pspec);
85 : static void gst_tensor_query_client_get_property (GObject * object,
86 : guint prop_id, GValue * value, GParamSpec * pspec);
87 :
88 : static gboolean gst_tensor_query_client_sink_event (GstPad * pad,
89 : GstObject * parent, GstEvent * event);
90 : static gboolean gst_tensor_query_client_sink_query (GstPad * pad,
91 : GstObject * parent, GstQuery * query);
92 : static GstFlowReturn gst_tensor_query_client_chain (GstPad * pad,
93 : GstObject * parent, GstBuffer * buf);
94 : static GstCaps *gst_tensor_query_client_query_caps (GstTensorQueryClient * self,
95 : GstPad * pad, GstCaps * filter);
96 :
97 : /**
98 : * @brief initialize the class
99 : */
100 : static void
101 12 : gst_tensor_query_client_class_init (GstTensorQueryClientClass * klass)
102 : {
103 : GObjectClass *gobject_class;
104 : GstElementClass *gstelement_class;
105 :
106 12 : gobject_class = (GObjectClass *) klass;
107 12 : gstelement_class = (GstElementClass *) klass;
108 :
109 12 : gobject_class->set_property = gst_tensor_query_client_set_property;
110 12 : gobject_class->get_property = gst_tensor_query_client_get_property;
111 12 : gobject_class->finalize = gst_tensor_query_client_finalize;
112 :
113 : /** install property goes here */
114 12 : g_object_class_install_property (gobject_class, PROP_HOST,
115 : g_param_spec_string ("host", "Host",
116 : "A host address to receive the packets from query server",
117 : TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
118 12 : g_object_class_install_property (gobject_class, PROP_PORT,
119 : g_param_spec_uint ("port", "Port",
120 : "A port number to receive the packets from query server", 0,
121 : TCP_HIGHEST_PORT, TCP_DEFAULT_SRV_SRC_PORT,
122 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
123 12 : g_object_class_install_property (gobject_class, PROP_DEST_HOST,
124 : g_param_spec_string ("dest-host", "Destination Host",
125 : "A tenor query server host to send the packets",
126 : TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
127 12 : g_object_class_install_property (gobject_class, PROP_DEST_PORT,
128 : g_param_spec_uint ("dest-port", "Destination Port",
129 : "The port of tensor query server to send the packets", 0,
130 : TCP_HIGHEST_PORT, TCP_DEFAULT_CLIENT_SRC_PORT,
131 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
132 12 : g_object_class_install_property (gobject_class, PROP_SILENT,
133 : g_param_spec_boolean ("silent", "Silent", "Produce verbose output",
134 : DEFAULT_SILENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
135 12 : g_object_class_install_property (gobject_class, PROP_CONNECT_TYPE,
136 : g_param_spec_enum ("connect-type", "Connect Type",
137 : "The connections type between client and server.",
138 : GST_TYPE_QUERY_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
139 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
140 12 : g_object_class_install_property (gobject_class, PROP_TOPIC,
141 : g_param_spec_string ("topic", "Topic",
142 : "The main topic of the host.",
143 : "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
144 :
145 12 : g_object_class_install_property (gobject_class, PROP_TIMEOUT,
146 : g_param_spec_uint ("timeout", "timeout value",
147 : "A timeout value (in ms) to wait message from query server after sending buffer to server. 0 means no wait.",
148 : 0, G_MAXUINT, DEFAULT_CLIENT_TIMEOUT,
149 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
150 12 : g_object_class_install_property (gobject_class, PROP_MAX_REQUEST,
151 : g_param_spec_uint ("max-request", "Maximum number of request",
152 : "Sets the maximum number of buffers to request to the query server. "
153 : "If the processing speed of query server is slower than the query client, the input buffer is dropped. "
154 : "Two buffers are requested by default, and 0 means that all buffers are sent to query server without drop. ",
155 : 0, G_MAXUINT, DEFAULT_MAX_REQUEST,
156 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
157 12 : gst_element_class_add_pad_template (gstelement_class,
158 : gst_static_pad_template_get (&sinktemplate));
159 12 : gst_element_class_add_pad_template (gstelement_class,
160 : gst_static_pad_template_get (&srctemplate));
161 :
162 12 : gst_element_class_set_static_metadata (gstelement_class,
163 : "TensorQueryClient", "Filter/Tensor/Query",
164 : "Handle querying tensor data through the network",
165 : "Samsung Electronics Co., Ltd.");
166 :
167 12 : GST_DEBUG_CATEGORY_INIT (gst_tensor_query_client_debug, "tensor_query_client",
168 : 0, "Tensor Query Client");
169 12 : }
170 :
171 : /**
172 : * @brief initialize the new element
173 : */
174 : static void
175 12 : gst_tensor_query_client_init (GstTensorQueryClient * self)
176 : {
177 : /** setup sink pad */
178 12 : self->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
179 12 : gst_element_add_pad (GST_ELEMENT (self), self->sinkpad);
180 12 : gst_pad_set_event_function (self->sinkpad,
181 : GST_DEBUG_FUNCPTR (gst_tensor_query_client_sink_event));
182 12 : gst_pad_set_query_function (self->sinkpad,
183 : GST_DEBUG_FUNCPTR (gst_tensor_query_client_sink_query));
184 12 : gst_pad_set_chain_function (self->sinkpad,
185 : GST_DEBUG_FUNCPTR (gst_tensor_query_client_chain));
186 :
187 : /** setup src pad */
188 12 : self->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
189 12 : gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
190 :
191 : /* init properties */
192 12 : self->silent = DEFAULT_SILENT;
193 12 : self->connect_type = DEFAULT_CONNECT_TYPE;
194 12 : self->host = g_strdup (TCP_DEFAULT_HOST);
195 12 : self->port = TCP_DEFAULT_CLIENT_SRC_PORT;
196 12 : self->dest_host = g_strdup (TCP_DEFAULT_HOST);
197 12 : self->dest_port = TCP_DEFAULT_SRV_SRC_PORT;
198 12 : self->topic = NULL;
199 12 : self->in_caps_str = NULL;
200 12 : self->timeout = DEFAULT_CLIENT_TIMEOUT;
201 12 : self->edge_h = NULL;
202 12 : self->msg_queue = g_async_queue_new ();
203 12 : self->max_request = DEFAULT_MAX_REQUEST;
204 12 : self->requested_num = 0;
205 12 : self->is_tensor = FALSE;
206 12 : gst_tensors_config_init (&self->config);
207 12 : }
208 :
209 : /**
210 : * @brief finalize the object
211 : */
212 : static void
213 11 : gst_tensor_query_client_finalize (GObject * object)
214 : {
215 11 : GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object);
216 : nns_edge_data_h data_h;
217 :
218 11 : g_free (self->host);
219 11 : self->host = NULL;
220 11 : g_free (self->dest_host);
221 11 : self->dest_host = NULL;
222 11 : g_free (self->topic);
223 11 : self->topic = NULL;
224 11 : g_free (self->in_caps_str);
225 11 : self->in_caps_str = NULL;
226 :
227 17 : while ((data_h = g_async_queue_try_pop (self->msg_queue))) {
228 6 : nns_edge_data_destroy (data_h);
229 : }
230 :
231 11 : if (self->msg_queue) {
232 11 : g_async_queue_unref (self->msg_queue);
233 11 : self->msg_queue = NULL;
234 : }
235 :
236 11 : if (self->edge_h) {
237 11 : nns_edge_release_handle (self->edge_h);
238 11 : self->edge_h = NULL;
239 : }
240 :
241 11 : gst_tensors_config_free (&self->config);
242 :
243 11 : G_OBJECT_CLASS (parent_class)->finalize (object);
244 11 : }
245 :
246 : /**
247 : * @brief set property
248 : */
249 : static void
250 28 : gst_tensor_query_client_set_property (GObject * object, guint prop_id,
251 : const GValue * value, GParamSpec * pspec)
252 : {
253 28 : GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object);
254 :
255 : /** @todo DO NOT update properties (host, port, ..) while pipeline is running. */
256 28 : switch (prop_id) {
257 2 : case PROP_HOST:
258 2 : if (!g_value_get_string (value)) {
259 0 : nns_logw ("Sink host property cannot be NULL");
260 0 : break;
261 : }
262 2 : g_free (self->host);
263 2 : self->host = g_value_dup_string (value);
264 2 : break;
265 9 : case PROP_PORT:
266 9 : self->port = g_value_get_uint (value);
267 9 : break;
268 3 : case PROP_DEST_HOST:
269 3 : if (!g_value_get_string (value)) {
270 0 : nns_logw ("Sink host property cannot be NULL");
271 0 : break;
272 : }
273 3 : g_free (self->dest_host);
274 3 : self->dest_host = g_value_dup_string (value);
275 3 : break;
276 11 : case PROP_DEST_PORT:
277 11 : self->dest_port = g_value_get_uint (value);
278 11 : break;
279 1 : case PROP_CONNECT_TYPE:
280 1 : self->connect_type = g_value_get_enum (value);
281 1 : break;
282 1 : case PROP_TOPIC:
283 1 : if (!g_value_get_string (value)) {
284 0 : nns_logw ("Topic property cannot be NULL. Query-hybrid is disabled.");
285 0 : break;
286 : }
287 1 : g_free (self->topic);
288 1 : self->topic = g_value_dup_string (value);
289 1 : break;
290 0 : case PROP_TIMEOUT:
291 0 : self->timeout = g_value_get_uint (value);
292 0 : break;
293 0 : case PROP_SILENT:
294 0 : self->silent = g_value_get_boolean (value);
295 0 : break;
296 1 : case PROP_MAX_REQUEST:
297 1 : self->max_request = g_value_get_uint (value);
298 1 : break;
299 0 : default:
300 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
301 0 : break;
302 : }
303 28 : }
304 :
305 : /**
306 : * @brief get property
307 : */
308 : static void
309 0 : gst_tensor_query_client_get_property (GObject * object, guint prop_id,
310 : GValue * value, GParamSpec * pspec)
311 : {
312 0 : GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object);
313 :
314 0 : switch (prop_id) {
315 0 : case PROP_HOST:
316 0 : g_value_set_string (value, self->host);
317 0 : break;
318 0 : case PROP_PORT:
319 0 : g_value_set_uint (value, self->port);
320 0 : break;
321 0 : case PROP_DEST_HOST:
322 0 : g_value_set_string (value, self->dest_host);
323 0 : break;
324 0 : case PROP_DEST_PORT:
325 0 : g_value_set_uint (value, self->dest_port);
326 0 : break;
327 0 : case PROP_CONNECT_TYPE:
328 0 : g_value_set_enum (value, self->connect_type);
329 0 : break;
330 0 : case PROP_TOPIC:
331 0 : g_value_set_string (value, self->topic);
332 0 : break;
333 0 : case PROP_TIMEOUT:
334 0 : g_value_set_uint (value, self->timeout);
335 0 : break;
336 0 : case PROP_SILENT:
337 0 : g_value_set_boolean (value, self->silent);
338 0 : break;
339 0 : case PROP_MAX_REQUEST:
340 0 : g_value_set_uint (value, self->max_request);
341 0 : break;
342 0 : default:
343 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
344 0 : break;
345 : }
346 0 : }
347 :
348 : /**
349 : * @brief Update src pad caps from tensors config.
350 : */
351 : static gboolean
352 11 : gst_tensor_query_client_update_caps (GstTensorQueryClient * self,
353 : const gchar * caps_str)
354 : {
355 : GstCaps *curr_caps, *out_caps;
356 11 : gboolean ret = FALSE;
357 11 : out_caps = gst_caps_from_string (caps_str);
358 11 : silent_debug_caps (self, out_caps, "set out-caps");
359 :
360 : /* Update src pad caps if it is different. */
361 11 : curr_caps = gst_pad_get_current_caps (self->srcpad);
362 11 : if (curr_caps == NULL || !gst_caps_is_equal (curr_caps, out_caps)) {
363 22 : if (gst_caps_is_fixed (out_caps)) {
364 9 : ret = gst_pad_set_caps (self->srcpad, out_caps);
365 :
366 : /**
367 : * If pad caps is updated, prepare the tensor information here.
368 : * It will be used in chain function, to push tensor buffer into src pad.
369 : */
370 9 : if (ret) {
371 9 : GstStructure *s = gst_caps_get_structure (out_caps, 0);
372 :
373 9 : self->is_tensor = gst_structure_is_tensor_stream (s);
374 9 : if (self->is_tensor) {
375 8 : gst_tensors_config_free (&self->config);
376 8 : gst_tensors_config_from_structure (&self->config, s);
377 : }
378 : }
379 : } else {
380 2 : nns_loge ("out-caps from tensor_query_serversink is not fixed. "
381 : "Failed to update client src caps, out-caps: %s", caps_str);
382 : }
383 : } else {
384 : /** Don't need to update when the capability is the same. */
385 0 : ret = TRUE;
386 : }
387 :
388 11 : if (curr_caps)
389 0 : gst_caps_unref (curr_caps);
390 :
391 11 : gst_caps_unref (out_caps);
392 :
393 11 : return ret;
394 : }
395 :
396 : /**
397 : * @brief Parse caps from received event data.
398 : */
399 : static gchar *
400 22 : _nns_edge_parse_caps (gchar * caps_str, gboolean is_src)
401 : {
402 : gchar **strv;
403 : gint num, i;
404 22 : gchar *find_key = NULL;
405 22 : gchar *ret_str = NULL;
406 :
407 22 : if (!caps_str)
408 0 : return NULL;
409 :
410 22 : strv = g_strsplit (caps_str, "@", -1);
411 22 : num = g_strv_length (strv);
412 :
413 22 : find_key =
414 : is_src ==
415 11 : TRUE ? g_strdup ("query_server_src_caps") :
416 11 : g_strdup ("query_server_sink_caps");
417 :
418 33 : for (i = 1; i < num; i += 2) {
419 33 : if (0 == g_strcmp0 (find_key, strv[i])) {
420 22 : ret_str = g_strdup (strv[i + 1]);
421 22 : break;
422 : }
423 : }
424 :
425 22 : g_free (find_key);
426 22 : g_strfreev (strv);
427 :
428 22 : return ret_str;
429 : }
430 :
431 : /**
432 : * @brief nnstreamer-edge event callback.
433 : */
434 : static int
435 108 : _nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
436 : {
437 : nns_edge_event_e event_type;
438 108 : int ret = NNS_EDGE_ERROR_NONE;
439 108 : GstTensorQueryClient *self = (GstTensorQueryClient *) user_data;
440 :
441 108 : if (NNS_EDGE_ERROR_NONE != nns_edge_event_get_type (event_h, &event_type)) {
442 0 : nns_loge ("Failed to get event type!");
443 108 : return NNS_EDGE_ERROR_NOT_SUPPORTED;
444 : }
445 :
446 108 : switch (event_type) {
447 11 : case NNS_EDGE_EVENT_CAPABILITY:
448 : {
449 : GstCaps *server_caps, *client_caps;
450 : GstStructure *server_st, *client_st;
451 11 : gboolean result = FALSE;
452 : gchar *ret_str, *caps_str;
453 :
454 11 : nns_edge_event_parse_capability (event_h, &caps_str);
455 11 : ret_str = _nns_edge_parse_caps (caps_str, TRUE);
456 11 : nns_logd ("Received server-src caps: %s", GST_STR_NULL (ret_str));
457 11 : client_caps = gst_caps_from_string ((gchar *) self->in_caps_str);
458 11 : server_caps = gst_caps_from_string (ret_str);
459 11 : g_free (ret_str);
460 :
461 : /** Server framerate may vary. Let's skip comparing the framerate. */
462 11 : gst_caps_set_simple (server_caps, "framerate", GST_TYPE_FRACTION, 0, 1,
463 : NULL);
464 11 : gst_caps_set_simple (client_caps, "framerate", GST_TYPE_FRACTION, 0, 1,
465 : NULL);
466 :
467 11 : server_st = gst_caps_get_structure (server_caps, 0);
468 11 : client_st = gst_caps_get_structure (client_caps, 0);
469 :
470 11 : if (gst_structure_is_tensor_stream (server_st)) {
471 : GstTensorsConfig server_config, client_config;
472 :
473 9 : gst_tensors_config_from_structure (&server_config, server_st);
474 9 : gst_tensors_config_from_structure (&client_config, client_st);
475 :
476 9 : result = gst_tensors_config_is_equal (&server_config, &client_config);
477 :
478 9 : gst_tensors_config_free (&server_config);
479 9 : gst_tensors_config_free (&client_config);
480 : }
481 :
482 11 : if (result || gst_caps_can_intersect (client_caps, server_caps)) {
483 : /** Update client src caps */
484 11 : ret_str = _nns_edge_parse_caps (caps_str, FALSE);
485 11 : nns_logd ("Received server-sink caps: %s", GST_STR_NULL (ret_str));
486 11 : if (!gst_tensor_query_client_update_caps (self, ret_str)) {
487 2 : nns_loge ("Failed to update client source caps.");
488 2 : ret = NNS_EDGE_ERROR_UNKNOWN;
489 : }
490 11 : g_free (ret_str);
491 : } else {
492 : /* respond deny with src caps string */
493 0 : nns_loge ("Query caps is not acceptable!");
494 0 : ret = NNS_EDGE_ERROR_UNKNOWN;
495 : }
496 :
497 11 : gst_caps_unref (server_caps);
498 11 : gst_caps_unref (client_caps);
499 11 : g_free (caps_str);
500 11 : break;
501 : }
502 88 : case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
503 : {
504 : nns_edge_data_h data;
505 :
506 88 : nns_edge_event_parse_new_data (event_h, &data);
507 88 : g_async_queue_push (self->msg_queue, data);
508 88 : break;
509 : }
510 9 : default:
511 9 : break;
512 : }
513 :
514 108 : return ret;
515 : }
516 :
517 : /**
518 : * @brief Internal function to create edge handle.
519 : */
520 : static gboolean
521 16 : gst_tensor_query_client_create_edge_handle (GstTensorQueryClient * self)
522 : {
523 16 : gboolean started = FALSE;
524 16 : gchar *prev_caps = NULL;
525 : int ret;
526 :
527 : /* Already created, compare caps string. */
528 16 : if (self->edge_h) {
529 0 : ret = nns_edge_get_info (self->edge_h, "CAPS", &prev_caps);
530 :
531 0 : if (ret != NNS_EDGE_ERROR_NONE || !prev_caps ||
532 0 : !g_str_equal (prev_caps, self->in_caps_str)) {
533 : /* Capability is changed, close old handle. */
534 0 : nns_edge_release_handle (self->edge_h);
535 0 : self->edge_h = NULL;
536 : } else {
537 15 : return TRUE;
538 : }
539 : }
540 :
541 16 : ret = nns_edge_create_handle ("TEMP_ID", self->connect_type,
542 : NNS_EDGE_NODE_TYPE_QUERY_CLIENT, &self->edge_h);
543 16 : if (ret != NNS_EDGE_ERROR_NONE)
544 0 : return FALSE;
545 :
546 16 : nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
547 :
548 16 : if (self->topic)
549 1 : nns_edge_set_info (self->edge_h, "TOPIC", self->topic);
550 16 : if (self->host)
551 16 : nns_edge_set_info (self->edge_h, "HOST", self->host);
552 16 : if (self->port > 0) {
553 9 : gchar *port = g_strdup_printf ("%u", self->port);
554 9 : nns_edge_set_info (self->edge_h, "PORT", port);
555 9 : g_free (port);
556 : }
557 16 : nns_edge_set_info (self->edge_h, "CAPS", self->in_caps_str);
558 :
559 16 : ret = nns_edge_start (self->edge_h);
560 16 : if (ret != NNS_EDGE_ERROR_NONE) {
561 0 : nns_loge
562 : ("Failed to start NNStreamer-edge. Please check server IP and port.");
563 0 : goto done;
564 : }
565 :
566 16 : ret = nns_edge_connect (self->edge_h, self->dest_host, self->dest_port);
567 15 : if (ret != NNS_EDGE_ERROR_NONE) {
568 4 : nns_loge ("Failed to connect to edge server!");
569 4 : goto done;
570 : }
571 :
572 11 : started = TRUE;
573 :
574 15 : done:
575 15 : if (!started) {
576 4 : nns_edge_release_handle (self->edge_h);
577 4 : self->edge_h = NULL;
578 : }
579 :
580 15 : return started;
581 : }
582 :
583 : /**
584 : * @brief This function handles sink event.
585 : */
586 : static gboolean
587 50 : gst_tensor_query_client_sink_event (GstPad * pad,
588 : GstObject * parent, GstEvent * event)
589 : {
590 50 : GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent);
591 :
592 50 : GST_DEBUG_OBJECT (self, "Received %s event: %" GST_PTR_FORMAT,
593 : GST_EVENT_TYPE_NAME (event), event);
594 :
595 50 : switch (GST_EVENT_TYPE (event)) {
596 16 : case GST_EVENT_CAPS:
597 : {
598 : GstCaps *caps;
599 : gboolean ret;
600 :
601 16 : gst_event_parse_caps (event, &caps);
602 16 : g_free (self->in_caps_str);
603 16 : self->in_caps_str = gst_caps_to_string (caps);
604 :
605 16 : ret = gst_tensor_query_client_create_edge_handle (self);
606 15 : if (!ret)
607 4 : nns_loge ("Failed to create edge handle, cannot start query client.");
608 :
609 15 : gst_event_unref (event);
610 15 : return ret;
611 : }
612 34 : default:
613 34 : break;
614 : }
615 :
616 34 : return gst_pad_event_default (pad, parent, event);
617 : }
618 :
619 : /**
620 : * @brief This function handles sink pad query.
621 : */
622 : static gboolean
623 114 : gst_tensor_query_client_sink_query (GstPad * pad,
624 : GstObject * parent, GstQuery * query)
625 : {
626 114 : GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent);
627 :
628 114 : GST_DEBUG_OBJECT (self, "Received %s query: %" GST_PTR_FORMAT,
629 : GST_QUERY_TYPE_NAME (query), query);
630 :
631 114 : switch (GST_QUERY_TYPE (query)) {
632 70 : case GST_QUERY_CAPS:
633 : {
634 : GstCaps *caps;
635 : GstCaps *filter;
636 :
637 70 : gst_query_parse_caps (query, &filter);
638 70 : caps = gst_tensor_query_client_query_caps (self, pad, filter);
639 :
640 70 : gst_query_set_caps_result (query, caps);
641 70 : gst_caps_unref (caps);
642 70 : return TRUE;
643 : }
644 44 : case GST_QUERY_ACCEPT_CAPS:
645 : {
646 : GstCaps *caps;
647 : GstCaps *template_caps;
648 44 : gboolean res = FALSE;
649 :
650 44 : gst_query_parse_accept_caps (query, &caps);
651 44 : silent_debug_caps (self, caps, "accept-caps");
652 :
653 44 : if (gst_caps_is_fixed (caps)) {
654 44 : template_caps = gst_pad_get_pad_template_caps (pad);
655 :
656 44 : res = gst_caps_can_intersect (template_caps, caps);
657 44 : gst_caps_unref (template_caps);
658 : }
659 :
660 44 : gst_query_set_accept_caps_result (query, res);
661 44 : return TRUE;
662 : }
663 0 : default:
664 0 : break;
665 : }
666 :
667 0 : return gst_pad_query_default (pad, parent, query);
668 : }
669 :
670 : /**
671 : * @brief Chain function, this function does the actual processing.
672 : */
673 : static GstFlowReturn
674 150 : gst_tensor_query_client_chain (GstPad * pad,
675 : GstObject * parent, GstBuffer * buf)
676 : {
677 150 : GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent);
678 150 : GstBuffer *out_buf = NULL;
679 150 : GstFlowReturn res = GST_FLOW_OK;
680 150 : nns_edge_data_h data_h = NULL;
681 150 : guint i, num_tensors = 0, num_data = 0;
682 150 : int ret = NNS_EDGE_ERROR_NONE;
683 : GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
684 : GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
685 : gchar *val;
686 : UNUSED (pad);
687 :
688 150 : if (self->max_request > 0 && self->requested_num > self->max_request) {
689 37 : nns_logi
690 : ("The processing speed of the query server is too slow. Drop the input buffer.");
691 37 : goto try_pop;
692 : }
693 :
694 113 : ret = nns_edge_data_create (&data_h);
695 113 : if (ret != NNS_EDGE_ERROR_NONE) {
696 0 : nns_loge ("Failed to create data handle in client chain.");
697 0 : goto try_pop;
698 : }
699 :
700 113 : num_tensors = gst_tensor_buffer_get_count (buf);
701 226 : for (i = 0; i < num_tensors; i++) {
702 113 : mem[i] = gst_tensor_buffer_get_nth_memory (buf, i);
703 113 : if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
704 0 : ml_loge ("Cannot map the %uth memory in gst-buffer.", i);
705 0 : gst_memory_unref (mem[i]);
706 0 : num_tensors = i;
707 0 : goto try_pop;
708 : }
709 113 : nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
710 : }
711 :
712 113 : nns_edge_get_info (self->edge_h, "client_id", &val);
713 113 : nns_edge_data_set_info (data_h, "client_id", val);
714 113 : g_free (val);
715 :
716 113 : ret = nns_edge_send (self->edge_h, data_h);
717 113 : if (ret == NNS_EDGE_ERROR_NONE) {
718 95 : self->requested_num++;
719 : } else {
720 18 : nns_loge ("Failed to publish to server node.");
721 : }
722 :
723 150 : try_pop:
724 150 : if (data_h)
725 113 : nns_edge_data_destroy (data_h);
726 :
727 300 : data_h = g_async_queue_timeout_pop (self->msg_queue,
728 150 : self->timeout * G_TIME_SPAN_MILLISECOND);
729 150 : if (data_h) {
730 81 : if (self->requested_num > 0)
731 81 : self->requested_num--;
732 81 : ret = nns_edge_data_get_count (data_h, &num_data);
733 :
734 81 : if (ret == NNS_EDGE_ERROR_NONE && num_data > 0) {
735 : GstMemory *new_mem;
736 : GstTensorInfo *_info;
737 :
738 81 : out_buf = gst_buffer_new ();
739 :
740 162 : for (i = 0; i < num_data; i++) {
741 81 : void *data = NULL;
742 : nns_size_t data_len;
743 : gpointer new_data;
744 :
745 81 : nns_edge_data_get (data_h, i, &data, &data_len);
746 81 : new_data = _g_memdup (data, data_len);
747 :
748 81 : new_mem = gst_memory_new_wrapped (0, new_data, data_len, 0, data_len,
749 : new_data, g_free);
750 :
751 81 : if (self->is_tensor) {
752 72 : _info = gst_tensors_info_get_nth_info (&self->config.info, i);
753 72 : gst_tensor_buffer_append_memory (out_buf, new_mem, _info);
754 : } else {
755 9 : gst_buffer_append_memory (out_buf, new_mem);
756 : }
757 : }
758 :
759 : /* metadata from incoming buffer */
760 81 : gst_buffer_copy_into (out_buf, buf, GST_BUFFER_COPY_METADATA, 0, -1);
761 :
762 81 : res = gst_pad_push (self->srcpad, out_buf);
763 : } else {
764 0 : nns_loge ("Failed to get the number of memories of the edge data.");
765 0 : res = GST_FLOW_ERROR;
766 : }
767 :
768 81 : nns_edge_data_destroy (data_h);
769 : }
770 :
771 263 : for (i = 0; i < num_tensors; i++) {
772 113 : gst_memory_unmap (mem[i], &map[i]);
773 113 : gst_memory_unref (mem[i]);
774 : }
775 :
776 150 : gst_buffer_unref (buf);
777 150 : return res;
778 : }
779 :
780 : /**
781 : * @brief Get pad caps for caps negotiation.
782 : */
783 : static GstCaps *
784 70 : gst_tensor_query_client_query_caps (GstTensorQueryClient * self, GstPad * pad,
785 : GstCaps * filter)
786 : {
787 : GstCaps *caps;
788 :
789 70 : caps = gst_pad_get_current_caps (pad);
790 70 : if (!caps) {
791 : /** pad don't have current caps. use the template caps */
792 70 : caps = gst_pad_get_pad_template_caps (pad);
793 : }
794 :
795 70 : silent_debug_caps (self, caps, "caps");
796 70 : silent_debug_caps (self, filter, "filter");
797 :
798 70 : if (filter) {
799 : GstCaps *intersection;
800 : intersection =
801 6 : gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
802 :
803 6 : gst_caps_unref (caps);
804 6 : caps = intersection;
805 : }
806 :
807 70 : silent_debug_caps (self, caps, "result");
808 70 : return caps;
809 : }
|