Line data Source code
1 : /* SPDX-License-Identifier: Apache-2.0 */
2 : /**
3 : * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
4 : *
5 : * @file nnstreamer-edge-queue.c
6 : * @date 24 August 2022
7 : * @brief Thread-safe queue.
8 : * @see https://github.com/nnstreamer/nnstreamer-edge
9 : * @author Jaeyun Jung <jy1210.jung@samsung.com>
10 : * @bug No known bugs except for NYI items.
11 : */
12 :
13 : #include "nnstreamer-edge-log.h"
14 : #include "nnstreamer-edge-queue.h"
15 : #include "nnstreamer-edge-util.h"
16 :
17 : /**
18 : * @brief Internal structure for queue data.
19 : */
20 : typedef struct _nns_edge_queue_data_s nns_edge_queue_data_s;
21 :
22 : /**
23 : * @brief Internal structure for queue data.
24 : */
25 : struct _nns_edge_queue_data_s
26 : {
27 : nns_edge_raw_data_s data;
28 : nns_edge_queue_data_s *next;
29 : };
30 :
31 : /**
32 : * @brief Internal structure for queue.
33 : */
34 : typedef struct
35 : {
36 : uint32_t magic;
37 : pthread_mutex_t lock;
38 : pthread_cond_t cond;
39 :
40 : nns_edge_queue_leak_e leaky;
41 : unsigned int max_data; /**< Max data in queue (default 0 means unlimited) */
42 : unsigned int length;
43 : nns_edge_queue_data_s *head;
44 : nns_edge_queue_data_s *tail;
45 : } nns_edge_queue_s;
46 :
47 : /**
48 : * @brief Pop data from queue. If the param 'clear' is true, release old data and return null.
49 : * @note This function should be called with lock.
50 : */
51 : static bool
52 70 : _pop_data (nns_edge_queue_s * q, bool clear, void **data, nns_size_t * size)
53 : {
54 : nns_edge_queue_data_s *qdata;
55 70 : bool popped = false;
56 :
57 70 : qdata = q->head;
58 70 : if (qdata) {
59 60 : q->head = qdata->next;
60 60 : if ((--q->length) == 0U)
61 46 : q->head = q->tail = NULL;
62 :
63 60 : if (clear) {
64 11 : if (qdata->data.destroy_cb)
65 8 : qdata->data.destroy_cb (qdata->data.data);
66 : } else {
67 49 : if (data)
68 49 : *data = qdata->data.data;
69 49 : if (size)
70 49 : *size = qdata->data.data_len;
71 49 : popped = true;
72 : }
73 :
74 60 : SAFE_FREE (qdata);
75 : }
76 :
77 70 : return popped;
78 : }
79 :
80 : /**
81 : * @brief Create queue.
82 : */
83 : int
84 72 : nns_edge_queue_create (nns_edge_queue_h * handle)
85 : {
86 : nns_edge_queue_s *q;
87 :
88 72 : if (!handle) {
89 1 : nns_edge_loge ("[Queue] Invalid param, handle is null.");
90 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
91 : }
92 :
93 71 : q = calloc (1, sizeof (nns_edge_queue_s));
94 71 : if (!q) {
95 0 : nns_edge_loge ("[Queue] Failed to allocate new memory.");
96 0 : return NNS_EDGE_ERROR_OUT_OF_MEMORY;
97 : }
98 :
99 71 : nns_edge_lock_init (q);
100 71 : nns_edge_cond_init (q);
101 71 : nns_edge_handle_set_magic (q, NNS_EDGE_MAGIC);
102 71 : q->leaky = NNS_EDGE_QUEUE_LEAK_NEW;
103 :
104 71 : *handle = q;
105 71 : return NNS_EDGE_ERROR_NONE;
106 : }
107 :
108 : /**
109 : * @brief Destroy queue.
110 : */
111 : int
112 73 : nns_edge_queue_destroy (nns_edge_queue_h handle)
113 : {
114 73 : nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
115 :
116 73 : if (!nns_edge_handle_is_valid (q)) {
117 2 : nns_edge_loge ("[Queue] Invalid param, queue is invalid.");
118 2 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
119 : }
120 :
121 : /* Stop waiting and clear all data. */
122 71 : nns_edge_queue_clear (handle);
123 :
124 71 : nns_edge_handle_set_magic (q, NNS_EDGE_MAGIC_DEAD);
125 71 : nns_edge_cond_destroy (q);
126 71 : nns_edge_lock_destroy (q);
127 71 : SAFE_FREE (q);
128 :
129 71 : return NNS_EDGE_ERROR_NONE;
130 : }
131 :
132 : /**
133 : * @brief Get the number of data in the queue.
134 : */
135 : int
136 25 : nns_edge_queue_get_length (nns_edge_queue_h handle, unsigned int *length)
137 : {
138 25 : nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
139 :
140 25 : if (!nns_edge_handle_is_valid (q)) {
141 2 : nns_edge_loge ("[Queue] Invalid param, queue is invalid.");
142 2 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
143 : }
144 :
145 23 : if (!length) {
146 1 : nns_edge_loge ("[Queue] Invalid param, length is null.");
147 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
148 : }
149 :
150 22 : nns_edge_lock (q);
151 22 : *length = q->length;
152 22 : nns_edge_unlock (q);
153 :
154 22 : return NNS_EDGE_ERROR_NONE;
155 : }
156 :
157 : /**
158 : * @brief Set the max length of the queue.
159 : */
160 : int
161 7 : nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit,
162 : nns_edge_queue_leak_e leaky)
163 : {
164 7 : nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
165 :
166 7 : if (!nns_edge_handle_is_valid (q)) {
167 2 : nns_edge_loge ("[Queue] Invalid param, queue is invalid.");
168 2 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
169 : }
170 :
171 5 : nns_edge_lock (q);
172 5 : q->max_data = limit;
173 5 : q->leaky = leaky;
174 5 : nns_edge_unlock (q);
175 :
176 5 : return NNS_EDGE_ERROR_NONE;
177 : }
178 :
179 : /**
180 : * @brief Add new data into queue.
181 : */
182 : int
183 68 : nns_edge_queue_push (nns_edge_queue_h handle, void *data, nns_size_t size,
184 : nns_edge_data_destroy_cb destroy)
185 : {
186 68 : int ret = NNS_EDGE_ERROR_NONE;
187 68 : nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
188 : nns_edge_queue_data_s *qdata;
189 :
190 68 : if (!nns_edge_handle_is_valid (q)) {
191 2 : nns_edge_loge ("[Queue] Invalid param, queue is invalid.");
192 2 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
193 : }
194 :
195 66 : if (!data) {
196 1 : nns_edge_loge ("[Queue] Invalid param, data is null.");
197 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
198 : }
199 :
200 65 : if (size == 0U) {
201 1 : nns_edge_loge ("[Queue] Invalid param, size should be larger than zero.");
202 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
203 : }
204 :
205 64 : nns_edge_lock (q);
206 64 : if (q->max_data > 0U && q->length >= q->max_data) {
207 : /* Clear old data in queue if leaky option is 'old'. */
208 6 : if (q->leaky == NNS_EDGE_QUEUE_LEAK_OLD) {
209 2 : _pop_data (q, true, NULL, NULL);
210 : } else {
211 4 : nns_edge_logw ("[Queue] Cannot push new data, max data in queue is %u.",
212 : q->max_data);
213 4 : ret = NNS_EDGE_ERROR_IO;
214 4 : goto done;
215 : }
216 : }
217 :
218 60 : qdata = calloc (1, sizeof (nns_edge_queue_data_s));
219 60 : if (!qdata) {
220 0 : nns_edge_loge ("[Queue] Failed to allocate new memory for data.");
221 0 : ret = NNS_EDGE_ERROR_OUT_OF_MEMORY;
222 0 : goto done;
223 : }
224 :
225 60 : qdata->data.data = data;
226 60 : qdata->data.data_len = size;
227 60 : qdata->data.destroy_cb = destroy;
228 :
229 60 : if (!q->head)
230 46 : q->head = qdata;
231 60 : if (q->tail)
232 14 : q->tail->next = qdata;
233 60 : q->tail = qdata;
234 60 : q->length++;
235 :
236 64 : done:
237 64 : nns_edge_cond_signal (q);
238 64 : nns_edge_unlock (q);
239 :
240 64 : return ret;
241 : }
242 :
243 : /**
244 : * @brief Remove and return the first data in queue.
245 : */
246 : int
247 13 : nns_edge_queue_pop (nns_edge_queue_h handle, void **data, nns_size_t * size)
248 : {
249 13 : bool popped = false;
250 13 : nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
251 :
252 13 : if (!nns_edge_handle_is_valid (q)) {
253 2 : nns_edge_loge ("[Queue] Invalid param, queue is invalid.");
254 2 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
255 : }
256 :
257 11 : if (!data) {
258 1 : nns_edge_loge ("[Queue] Invalid param, data is null.");
259 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
260 : }
261 :
262 10 : if (!size) {
263 1 : nns_edge_loge ("[Queue] Invalid param, size is null.");
264 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
265 : }
266 :
267 : /* init data */
268 9 : *data = NULL;
269 9 : *size = 0U;
270 :
271 9 : nns_edge_lock (q);
272 9 : popped = _pop_data (q, false, data, size);
273 9 : nns_edge_unlock (q);
274 :
275 9 : return (popped && *data != NULL) ? NNS_EDGE_ERROR_NONE : NNS_EDGE_ERROR_IO;
276 : }
277 :
278 : /**
279 : * @brief Remove and return the first data in queue. If queue is empty, wait until new data is added in the queue.
280 : */
281 : int
282 55 : nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout,
283 : void **data, nns_size_t * size)
284 : {
285 55 : nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
286 55 : bool popped = false;
287 :
288 55 : if (!nns_edge_handle_is_valid (q)) {
289 2 : nns_edge_loge ("[Queue] Invalid param, queue is invalid.");
290 2 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
291 : }
292 :
293 53 : if (!data) {
294 1 : nns_edge_loge ("[Queue] Invalid param, data is null.");
295 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
296 : }
297 :
298 52 : if (!size) {
299 2 : nns_edge_loge ("[Queue] Invalid param, size is null.");
300 2 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
301 : }
302 :
303 : /* init data */
304 50 : *data = NULL;
305 50 : *size = 0U;
306 :
307 50 : nns_edge_lock (q);
308 50 : if (q->length == 0U)
309 50 : nns_edge_cond_wait_until (q, timeout);
310 :
311 50 : popped = _pop_data (q, false, data, size);
312 50 : nns_edge_unlock (q);
313 :
314 50 : return (popped && *data != NULL) ? NNS_EDGE_ERROR_NONE : NNS_EDGE_ERROR_IO;
315 : }
316 :
317 : /**
318 : * @brief Clear all data in the queue.
319 : * @note When this function is called, nns_edge_queue_wait_pop will stop the waiting.
320 : */
321 : int
322 107 : nns_edge_queue_clear (nns_edge_queue_h handle)
323 : {
324 107 : nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
325 :
326 107 : if (!nns_edge_handle_is_valid (q)) {
327 0 : nns_edge_loge ("[Queue] Invalid param, queue is invalid.");
328 0 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
329 : }
330 :
331 107 : nns_edge_lock (q);
332 107 : nns_edge_cond_signal (q);
333 :
334 116 : while (q->length > 0U)
335 9 : _pop_data (q, true, NULL, NULL);
336 :
337 107 : nns_edge_unlock (q);
338 107 : return NNS_EDGE_ERROR_NONE;
339 : }
|