Line data Source code
1 : /* SPDX-License-Identifier: Apache-2.0 */
2 : /**
3 : * Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
4 : *
5 : * @file ml-api-service-query.c
6 : * @date 30 Aug 2022
7 : * @brief Query client implementation of NNStreamer/Service C-API
8 : * @see https://github.com/nnstreamer/nnstreamer
9 : * @author Yongjoo Ahn <yongjoo1.ahn@samsung.com>
10 : * @bug No known bugs except for NYI items
11 : */
12 :
13 : #include <glib.h>
14 : #include <gst/gst.h>
15 : #include <gst/gstbuffer.h>
16 : #include <gst/app/app.h>
17 : #include <string.h>
18 :
19 : #include "ml-api-internal.h"
20 : #include "ml-api-service-query.h"
21 :
/**
 * @brief Structure for ml_service_query
 * @details Private data of a query client service. An instance is stored in
 *          the 'priv' field of the service handle (ml_service_s).
 */
typedef struct
{
  ml_pipeline_h pipe_h;         /**< Pipeline constructed for the query client */
  ml_pipeline_src_h src_h;      /**< Handle of the 'srcx' element, used to push input data */
  ml_pipeline_sink_h sink_h;    /**< Handle of the callback registered on the 'sinkx' element */

  guint timeout;                /**< Time to wait for an output, in ms unit */
  GAsyncQueue *out_data_queue;  /**< Tensors data cloned by the sink callback, popped on request */
} _ml_service_query_s;
34 :
35 : /**
36 : * @brief Sink callback for query_client
37 : */
38 : static void
39 10 : _sink_callback_for_query_client (const ml_tensors_data_h data,
40 : const ml_tensors_info_h info, void *user_data)
41 : {
42 10 : _ml_service_query_s *query = (_ml_service_query_s *) user_data;
43 : ml_tensors_data_h copied;
44 : int status;
45 :
46 10 : status = ml_tensors_data_clone (data, &copied);
47 10 : if (ML_ERROR_NONE != status) {
48 0 : _ml_error_report_continue
49 : ("Failed to create a new tensors data for query_client.");
50 0 : return;
51 : }
52 :
53 10 : g_async_queue_push (query->out_data_queue, copied);
54 : }
55 :
56 : /**
57 : * @brief Internal function to release ml-service query data.
58 : */
59 : int
60 4 : _ml_service_query_release_internal (ml_service_s * mls)
61 : {
62 4 : _ml_service_query_s *query = (_ml_service_query_s *) mls->priv;
63 : ml_tensors_data_h data_h;
64 :
65 : /* Supposed internal function call to release handle. */
66 4 : if (!query)
67 0 : return ML_ERROR_NONE;
68 :
69 4 : if (query->pipe_h) {
70 2 : if (ml_pipeline_destroy (query->pipe_h))
71 0 : _ml_error_report ("Failed to destroy pipeline");
72 : }
73 :
74 4 : if (query->out_data_queue) {
75 2 : while ((data_h = g_async_queue_try_pop (query->out_data_queue))) {
76 0 : ml_tensors_data_destroy (data_h);
77 : }
78 :
79 2 : g_async_queue_unref (query->out_data_queue);
80 : }
81 :
82 4 : g_free (query);
83 4 : mls->priv = NULL;
84 :
85 4 : return ML_ERROR_NONE;
86 : }
87 :
88 : /**
89 : * @brief Internal function to create query client service handle with given ml-option handle.
90 : */
91 : int
92 4 : _ml_service_query_create (ml_service_s * mls, ml_option_h option)
93 : {
94 4 : int status = ML_ERROR_NONE;
95 :
96 4 : g_autofree gchar *description = NULL;
97 : void *value;
98 :
99 : GString *tensor_query_client_prop;
100 4 : g_autofree gchar *prop = NULL;
101 :
102 : _ml_service_query_s *query_s;
103 : ml_pipeline_h pipe_h;
104 : ml_pipeline_src_h src_h;
105 : ml_pipeline_sink_h sink_h;
106 4 : g_autofree gchar *caps = NULL;
107 4 : guint timeout = 1000U; /* default 1s timeout */
108 :
109 4 : g_return_val_if_fail (mls && option, ML_ERROR_INVALID_PARAMETER);
110 :
111 4 : mls->priv = query_s = g_try_new0 (_ml_service_query_s, 1);
112 4 : if (query_s == NULL) {
113 0 : _ml_error_report_return (ML_ERROR_OUT_OF_MEMORY,
114 : "Failed to allocate memory for the service handle's private data. Out of memory?");
115 : }
116 :
117 4 : tensor_query_client_prop = g_string_new (NULL);
118 :
119 4 : if (ML_ERROR_NONE == ml_option_get (option, "host", &value))
120 1 : g_string_append_printf (tensor_query_client_prop, " host=%s ",
121 : (gchar *) value);
122 :
123 4 : if (ML_ERROR_NONE == ml_option_get (option, "port", &value))
124 2 : g_string_append_printf (tensor_query_client_prop, " port=%u ",
125 2 : *((guint *) value));
126 :
127 4 : if (ML_ERROR_NONE == ml_option_get (option, "dest-host", &value))
128 1 : g_string_append_printf (tensor_query_client_prop, " dest-host=%s ",
129 : (gchar *) value);
130 :
131 4 : if (ML_ERROR_NONE == ml_option_get (option, "dest-port", &value))
132 2 : g_string_append_printf (tensor_query_client_prop, " dest-port=%u ",
133 2 : *((guint *) value));
134 :
135 4 : if (ML_ERROR_NONE == ml_option_get (option, "connect-type", &value))
136 1 : g_string_append_printf (tensor_query_client_prop, " connect-type=%s ",
137 : (gchar *) value);
138 :
139 4 : if (ML_ERROR_NONE == ml_option_get (option, "topic", &value))
140 1 : g_string_append_printf (tensor_query_client_prop, " topic=%s ",
141 : (gchar *) value);
142 :
143 4 : if (ML_ERROR_NONE == ml_option_get (option, "timeout", &value))
144 2 : g_string_append_printf (tensor_query_client_prop, " timeout=%u ",
145 2 : *((guint *) value));
146 :
147 4 : if (ML_ERROR_NONE != ml_option_get (option, "caps", &value)) {
148 1 : g_string_free (tensor_query_client_prop, TRUE);
149 1 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
150 : "The option 'caps' must be set before call ml_service_query_create.");
151 : }
152 3 : caps = g_strdup ((gchar *) value);
153 :
154 3 : prop = g_string_free (tensor_query_client_prop, FALSE);
155 3 : description =
156 3 : g_strdup_printf
157 : ("appsrc name=srcx ! %s ! tensor_query_client %s name=qcx ! tensor_sink name=sinkx async=false sync=false",
158 : caps, prop);
159 :
160 3 : status = ml_pipeline_construct (description, NULL, NULL, &pipe_h);
161 3 : if (ML_ERROR_NONE != status) {
162 1 : _ml_error_report_return (status, "Failed to construct pipeline");
163 : }
164 :
165 2 : status = ml_pipeline_start (pipe_h);
166 2 : if (ML_ERROR_NONE != status) {
167 0 : ml_pipeline_destroy (pipe_h);
168 0 : _ml_error_report_return (status, "Failed to start pipeline");
169 : }
170 :
171 2 : status = ml_pipeline_src_get_handle (pipe_h, "srcx", &src_h);
172 2 : if (ML_ERROR_NONE != status) {
173 0 : ml_pipeline_destroy (pipe_h);
174 0 : _ml_error_report_return (status, "Failed to get src handle");
175 : }
176 :
177 2 : status = ml_pipeline_sink_register (pipe_h, "sinkx",
178 : _sink_callback_for_query_client, query_s, &sink_h);
179 2 : if (ML_ERROR_NONE != status) {
180 0 : ml_pipeline_destroy (pipe_h);
181 0 : _ml_error_report_return (status, "Failed to register sink handle");
182 : }
183 :
184 2 : query_s->timeout = timeout;
185 2 : query_s->pipe_h = pipe_h;
186 2 : query_s->src_h = src_h;
187 2 : query_s->sink_h = sink_h;
188 2 : query_s->out_data_queue = g_async_queue_new ();
189 :
190 2 : return ML_ERROR_NONE;
191 : }
192 :
193 : /**
194 : * @brief Internal function to request an output to query client service with given input data.
195 : */
196 : int
197 10 : _ml_service_query_request (ml_service_s * mls,
198 : const ml_tensors_data_h input, ml_tensors_data_h * output)
199 : {
200 10 : int status = ML_ERROR_NONE;
201 : _ml_service_query_s *query;
202 :
203 10 : g_return_val_if_fail (mls && input && output, ML_ERROR_INVALID_PARAMETER);
204 :
205 10 : query = (_ml_service_query_s *) mls->priv;
206 :
207 10 : status = ml_pipeline_src_input_data (query->src_h, input,
208 : ML_PIPELINE_BUF_POLICY_DO_NOT_FREE);
209 10 : if (ML_ERROR_NONE != status) {
210 0 : _ml_error_report_return (status, "Failed to input data");
211 : }
212 :
213 20 : *output = g_async_queue_timeout_pop (query->out_data_queue,
214 10 : query->timeout * G_TIME_SPAN_MILLISECOND);
215 10 : if (NULL == *output) {
216 0 : _ml_error_report_return (ML_ERROR_TIMED_OUT, "timeout!");
217 : }
218 :
219 10 : return ML_ERROR_NONE;
220 : }
|