Line data Source code
1 : /**
2 : * NNStreamer Tensor Repo Header's Contents
3 : * Copyright (C) 2018 Jijoong Moon <jijoong.moon@samsung.com>
4 : *
5 : * This library is free software; you can redistribute it and/or
6 : * modify it under the terms of the GNU Library General Public
7 : * License as published by the Free Software Foundation;
8 : * version 2.1 of the License.
9 : *
10 : * This library is distributed in the hope that it will be useful,
11 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 : * Library General Public License for more details.
14 : *
15 : */
16 : /**
17 : * @file gsttensor_repo.c
18 : * @date 17 Nov 2018
19 : * @brief tensor repo file for NNStreamer, the GStreamer plugin for neural networks
20 : * @see https://github.com/nnstreamer/nnstreamer
21 : * @author Jijoong Moon <jijoong.moon@samsung.com>
22 : * @bug No known bugs except for NYI items
23 : *
24 : */
25 :
26 : #include "gsttensor_repo.h"
27 :
28 : #ifndef DBG
29 : #define DBG FALSE
30 : #endif
31 :
32 : /**
33 : * @brief tensor repo global variable with init.
34 : */
35 : static GstTensorRepo _repo = {.num_data = 0,.initialized = FALSE };
36 :
37 : /**
38 : * @brief Macro for Lock & Cond
39 : */
40 : #define GST_REPO_LOCK() (g_mutex_lock(&_repo.repo_lock))
41 : #define GST_REPO_UNLOCK() (g_mutex_unlock(&_repo.repo_lock))
42 : #define GST_REPO_WAIT() (g_cond_wait(&_repo.repo_cond, &_repo.repo_lock))
43 : #define GST_REPO_BROADCAST() (g_cond_broadcast (&_repo.repo_cond))
44 :
45 : /**
46 : * @brief Internal function to release repo data.
47 : */
48 : static void
49 13 : gst_tensor_repo_release_repodata (gpointer data)
50 : {
51 13 : GstTensorRepoData *_data = (GstTensorRepoData *) data;
52 13 : g_return_if_fail (_data != NULL);
53 :
54 13 : g_mutex_lock (&_data->lock);
55 13 : g_cond_signal (&_data->cond_pull);
56 13 : g_cond_signal (&_data->cond_push);
57 13 : if (_data->buffer)
58 3 : gst_buffer_unref (_data->buffer);
59 13 : if (_data->caps)
60 13 : gst_caps_unref (_data->caps);
61 13 : g_mutex_unlock (&_data->lock);
62 :
63 13 : g_mutex_clear (&_data->lock);
64 13 : g_cond_clear (&_data->cond_pull);
65 13 : g_cond_clear (&_data->cond_push);
66 :
67 13 : g_free (_data);
68 : }
69 :
70 : /**
71 : * @brief Getter to get nth GstTensorRepoData.
72 : */
73 : GstTensorRepoData *
74 335 : gst_tensor_repo_get_repodata (guint nth)
75 : {
76 : gpointer p;
77 :
78 335 : g_return_val_if_fail (_repo.initialized, NULL);
79 :
80 335 : GST_REPO_LOCK ();
81 335 : p = g_hash_table_lookup (_repo.hash, GINT_TO_POINTER (nth));
82 335 : GST_REPO_UNLOCK ();
83 :
84 335 : return (GstTensorRepoData *) p;
85 : }
86 :
87 : /**
88 : * @brief Set the changing status of repo.
89 : */
90 : gboolean
91 0 : gst_tensor_repo_set_changed (guint o_nth, guint nth, gboolean is_sink)
92 : {
93 : GstTensorRepoData *data;
94 :
95 0 : data = gst_tensor_repo_get_repodata (o_nth);
96 :
97 0 : if (data) {
98 0 : g_mutex_lock (&data->lock);
99 :
100 0 : if (is_sink) {
101 0 : data->sink_changed = TRUE;
102 0 : data->sink_id = nth;
103 : if (DBG)
104 : GST_DEBUG ("SET sink_changed! @id %d \n", o_nth);
105 :
106 : /* signal pull */
107 0 : g_cond_signal (&data->cond_pull);
108 : } else {
109 0 : data->src_changed = TRUE;
110 0 : data->src_id = nth;
111 : if (DBG)
112 : GST_DEBUG ("SET src_changed! @id %d\n", o_nth);
113 :
114 : /* signal push */
115 0 : g_cond_signal (&data->cond_push);
116 : }
117 :
118 0 : g_mutex_unlock (&data->lock);
119 0 : return TRUE;
120 : }
121 :
122 0 : return FALSE;
123 : }
124 :
125 : /**
126 : * @brief Add GstTensorRepoData into repo.
127 : */
128 : gboolean
129 28 : gst_tensor_repo_add_repodata (guint nth, gboolean is_sink)
130 : {
131 28 : gboolean ret = FALSE;
132 : GstTensorRepoData *data;
133 :
134 28 : data = gst_tensor_repo_get_repodata (nth);
135 :
136 28 : if (data != NULL) {
137 14 : g_mutex_lock (&data->lock);
138 :
139 14 : if (is_sink)
140 0 : data->sink_changed = FALSE;
141 : else
142 14 : data->src_changed = FALSE;
143 :
144 14 : data->pushed = FALSE;
145 :
146 14 : g_mutex_unlock (&data->lock);
147 :
148 : if (DBG)
149 : GST_DEBUG ("SET SINK & SRC Changed FALSE!! @%d\n", nth);
150 14 : return TRUE;
151 : }
152 :
153 14 : data = g_new0 (GstTensorRepoData, 1);
154 14 : if (data == NULL) {
155 0 : GST_ERROR ("Failed to allocate memory for repo data.");
156 0 : return FALSE;
157 : }
158 :
159 14 : g_cond_init (&data->cond_push);
160 14 : g_cond_init (&data->cond_pull);
161 14 : g_mutex_init (&data->lock);
162 :
163 14 : g_mutex_lock (&data->lock);
164 14 : data->eos = FALSE;
165 14 : data->buffer = NULL;
166 14 : data->caps = NULL;
167 14 : data->sink_changed = FALSE;
168 14 : data->src_changed = FALSE;
169 14 : data->pushed = FALSE;
170 14 : g_mutex_unlock (&data->lock);
171 :
172 14 : GST_REPO_LOCK ();
173 14 : ret = g_hash_table_insert (_repo.hash, GINT_TO_POINTER (nth), data);
174 :
175 14 : if (ret) {
176 14 : _repo.num_data++;
177 :
178 : if (DBG)
179 : GST_DEBUG ("Successfully added in hash table with key[%d]", nth);
180 : } else {
181 0 : gst_tensor_repo_release_repodata (data);
182 0 : ml_logf ("The key[%d] is duplicated. Cannot proceed.\n", nth);
183 : }
184 :
185 14 : GST_REPO_UNLOCK ();
186 14 : return ret;
187 : }
188 :
189 : /**
190 : * @brief Push GstBuffer into repo.
191 : */
192 : gboolean
193 119 : gst_tensor_repo_set_buffer (guint nth, GstBuffer * buffer, GstCaps * caps)
194 : {
195 : GstTensorRepoData *data;
196 :
197 119 : data = gst_tensor_repo_get_repodata (nth);
198 :
199 119 : g_return_val_if_fail (data != NULL, FALSE);
200 :
201 119 : g_mutex_lock (&data->lock);
202 :
203 192 : while (data->buffer != NULL && !data->eos) {
204 : /* wait pull */
205 73 : g_cond_wait (&data->cond_pull, &data->lock);
206 : }
207 :
208 119 : if (data->eos) {
209 3 : g_mutex_unlock (&data->lock);
210 3 : return FALSE;
211 : }
212 :
213 116 : data->buffer = gst_buffer_copy_deep (buffer);
214 116 : if (!data->caps || !gst_caps_is_equal (data->caps, caps)) {
215 14 : if (data->caps)
216 0 : gst_caps_unref (data->caps);
217 14 : data->caps = gst_caps_copy (caps);
218 : }
219 :
220 : if (DBG) {
221 : unsigned long size = gst_buffer_get_size (data->buffer);
222 : GST_DEBUG ("Pushed [%d] (size : %lu)\n", nth, size);
223 : }
224 :
225 : /* signal push */
226 116 : g_cond_signal (&data->cond_push);
227 :
228 116 : g_mutex_unlock (&data->lock);
229 116 : return TRUE;
230 : }
231 :
232 : /**
233 : * @brief Check EOS (End-of-Stream) of slot.
234 : */
235 : gboolean
236 20 : gst_tensor_repo_check_eos (guint nth)
237 : {
238 : GstTensorRepoData *data;
239 :
240 20 : data = gst_tensor_repo_get_repodata (nth);
241 :
242 20 : if (data) {
243 : if (DBG)
244 : GST_DEBUG ("check eos done [%s]\n", data->eos ? "TRUE" : "FALSE");
245 20 : return data->eos;
246 : }
247 :
248 0 : return FALSE;
249 : }
250 :
251 : /**
252 : * @brief Check repo data is changed.
253 : */
254 : gboolean
255 20 : gst_tensor_repo_check_changed (guint nth, guint * newid, gboolean is_sink)
256 : {
257 20 : gboolean ret = FALSE;
258 : GstTensorRepoData *data;
259 :
260 20 : data = gst_tensor_repo_get_repodata (nth);
261 :
262 20 : g_return_val_if_fail (data != NULL, FALSE);
263 :
264 : if (DBG)
265 : GST_DEBUG ("%dth RepoData : sink_changed %d, src_changed %d\n", nth,
266 : data->sink_changed, data->src_changed);
267 :
268 20 : if (is_sink) {
269 0 : if (data->sink_changed) {
270 0 : *newid = data->sink_id;
271 0 : ret = TRUE;
272 : }
273 : } else {
274 20 : if (data->src_changed) {
275 0 : *newid = data->src_id;
276 0 : ret = TRUE;
277 : }
278 : }
279 :
280 20 : return ret;
281 : }
282 :
283 : /**
284 : * @brief Set EOS (End-of-Stream) of slot.
285 : */
286 : gboolean
287 14 : gst_tensor_repo_set_eos (guint nth)
288 : {
289 : GstTensorRepoData *data;
290 :
291 14 : data = gst_tensor_repo_get_repodata (nth);
292 :
293 14 : g_return_val_if_fail (data != NULL, FALSE);
294 :
295 14 : g_mutex_lock (&data->lock);
296 :
297 14 : data->eos = TRUE;
298 14 : g_cond_signal (&data->cond_push);
299 14 : g_cond_signal (&data->cond_pull);
300 :
301 14 : g_mutex_unlock (&data->lock);
302 14 : return TRUE;
303 : }
304 :
305 : /**
306 : * @brief Get GstTensorRepoData from repo.
307 : */
308 : GstBuffer *
309 121 : gst_tensor_repo_get_buffer (guint nth, gboolean * eos, guint * newid,
310 : GstCaps ** caps)
311 : {
312 : GstTensorRepoData *data;
313 121 : GstBuffer *buf = NULL;
314 :
315 121 : data = gst_tensor_repo_get_repodata (nth);
316 :
317 121 : g_return_val_if_fail (data != NULL, NULL);
318 :
319 121 : g_mutex_lock (&data->lock);
320 :
321 133 : while (!data->buffer) {
322 20 : if (gst_tensor_repo_check_changed (nth, newid, FALSE)) {
323 0 : buf = NULL;
324 0 : goto done;
325 : }
326 :
327 20 : if (gst_tensor_repo_check_eos (nth)) {
328 8 : *eos = TRUE;
329 8 : buf = NULL;
330 8 : goto done;
331 : }
332 :
333 : /* wait push */
334 12 : g_cond_wait (&data->cond_push, &data->lock);
335 : }
336 :
337 : /* Current buffer will be wasted. */
338 113 : buf = data->buffer;
339 113 : *caps = gst_caps_ref (data->caps);
340 : if (DBG) {
341 : unsigned long size = gst_buffer_get_size (buf);
342 : GST_DEBUG ("Popped [ %d ] (size: %lu)\n", nth, size);
343 : }
344 :
345 113 : done:
346 121 : data->buffer = NULL;
347 : /* signal pull */
348 121 : g_cond_signal (&data->cond_pull);
349 121 : g_mutex_unlock (&data->lock);
350 121 : return buf;
351 : }
352 :
353 : /**
354 : * @brief Remove nth GstTensorRepoData from GstTensorRepo.
355 : */
356 : gboolean
357 13 : gst_tensor_repo_remove_repodata (guint nth)
358 : {
359 13 : gboolean ret = FALSE;
360 : GstTensorRepoData *data;
361 :
362 13 : g_return_val_if_fail (_repo.initialized, FALSE);
363 :
364 13 : data = gst_tensor_repo_get_repodata (nth);
365 :
366 13 : if (data) {
367 13 : GST_REPO_LOCK ();
368 13 : ret = g_hash_table_remove (_repo.hash, GINT_TO_POINTER (nth));
369 :
370 13 : if (ret) {
371 13 : _repo.num_data--;
372 : if (DBG)
373 : GST_DEBUG ("key[%d] is removed\n", nth);
374 : }
375 :
376 13 : GST_REPO_UNLOCK ();
377 : }
378 :
379 13 : return ret;
380 : }
381 :
382 : /**
383 : * @brief GstTensorRepo initialization.
384 : */
385 : void
386 14 : gst_tensor_repo_init (void)
387 : {
388 14 : if (_repo.initialized)
389 4 : return;
390 :
391 10 : g_mutex_init (&_repo.repo_lock);
392 10 : g_cond_init (&_repo.repo_cond);
393 10 : GST_REPO_LOCK ();
394 10 : _repo.num_data = 0;
395 10 : _repo.hash = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
396 : gst_tensor_repo_release_repodata);
397 10 : _repo.initialized = TRUE;
398 10 : GST_REPO_BROADCAST ();
399 10 : GST_REPO_UNLOCK ();
400 : }
401 :
402 : /**
403 : * @brief Wait for finish of initialization.
404 : */
405 : gboolean
406 135 : gst_tensor_repo_wait (void)
407 : {
408 135 : GST_REPO_LOCK ();
409 135 : while (!_repo.initialized)
410 0 : GST_REPO_WAIT ();
411 135 : GST_REPO_UNLOCK ();
412 135 : return TRUE;
413 : }
|