Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1-only */
2 : /**
3 : * Copyright (C) 2023 Samsung Electronics Co., Ltd.
4 : *
5 : * @file gstdatareposink.c
6 : * @date 30 March 2023
7 : * @brief GStreamer plugin that writes data from buffers to files in MLOps Data repository
8 : * @see https://github.com/nnstreamer/nnstreamer
9 : * @author Hyunil Park <hyunil46.park@samsung.com>
10 : * @bug No known bugs except for NYI items
11 : *
12 : * ## Example launch line
13 : * |[
14 : * gst-launch-1.0 videotestsrc ! datareposink location=filename json=video.json
15 : * gst-launch-1.0 videotestsrc ! pngenc ! datareposink location=image_%02d.png json=video.json
16 : * gst-launch-1.0 audiotestsrc samplesperbuffer=44100 ! audio/x-raw, format=S16LE, layout=interleaved, rate=44100, channels=1 ! \
17 : * datareposink location=filename json=audio.json
18 : * gst-launch-1.0 datareposrc location=file.dat json=file.json tensors-sequence=2,3 start-sample-index=0 stop-sample-index=199 epochs=1 ! \
19 : * other/tensors, format=static, num_tensors=2, framerate=0/1, dimensions=1:1:784:1.1:1:10:1, types=float32.float32 ! \
20 : * datareposink location=hyunil.dat json=file.json
21 : * ]|
22 : */
23 :
24 : #ifdef HAVE_CONFIG_H
25 : #include "config.h"
26 : #endif
27 : #include <gst/gst.h>
28 : #include <gst/video/video-info.h>
29 : #include <gst/audio/audio-info.h>
30 : #include <glib/gstdio.h>
31 : #include <sys/types.h>
32 : #include <fcntl.h>
33 : #include <unistd.h>
34 : #include <nnstreamer_plugin_api.h>
35 : #include <tensor_common.h>
36 : #include <nnstreamer_util.h>
37 : #include "gstdatareposink.h"
38 :
39 : /**
40 : * @brief Tensors caps
41 : */
42 : #define TENSOR_CAPS GST_TENSORS_CAP_MAKE ("{ static, flexible, sparse }")
43 : /**
44 : * @brief Video caps
45 : */
46 : #define SUPPORTED_VIDEO_FORMAT \
47 : "{RGB, BGR, RGBx, BGRx, xRGB, xBGR, RGBA, BGRA, ARGB, ABGR, GRAY8}"
48 : #define VIDEO_CAPS GST_VIDEO_CAPS_MAKE (SUPPORTED_VIDEO_FORMAT) "," \
49 : "interlace-mode = (string) progressive"
50 : /**
51 : * @brief Audio caps
52 : */
53 : #define SUPPORTED_AUDIO_FORMAT \
54 : "{S8, U8, S16LE, S16BE, U16LE, U16BE, S32LE, S32BE, U32LE, U32BE, F32LE, F32BE, F64LE, F64BE}"
55 : #define AUDIO_CAPS GST_AUDIO_CAPS_MAKE (SUPPORTED_AUDIO_FORMAT) "," \
56 : "layout = (string) interleaved"
57 : /**
58 : * @brief Text caps
59 : */
60 : #define TEXT_CAPS "text/x-raw, format = (string) utf8"
61 : /**
62 : * @brief Octet caps
63 : */
64 : #define OCTET_CAPS "application/octet-stream"
65 : /**
66 : * @brief Image caps
67 : */
68 : #define IMAGE_CAPS \
69 : "image/png, width = (int) [ 16, 1000000 ], height = (int) [ 16, 1000000 ], framerate = (fraction) [ 0/1, MAX];" \
70 : "image/jpeg, width = (int) [ 16, 65535 ], height = (int) [ 16, 65535 ], framerate = (fraction) [ 0/1, MAX], sof-marker = (int) { 0, 1, 2, 4, 9 };" \
71 : "image/tiff, endianness = (int) { BIG_ENDIAN, LITTLE_ENDIAN };" \
72 : "image/gif;" \
73 : "image/bmp"
74 :
75 : static GstStaticPadTemplate sinktemplate =
76 : GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, GST_PAD_ALWAYS,
77 : GST_STATIC_CAPS (TENSOR_CAPS ";" VIDEO_CAPS ";" AUDIO_CAPS ";" IMAGE_CAPS
78 : ";" TEXT_CAPS ";" OCTET_CAPS));
79 :
80 : /**
81 : * @brief datareposink properties.
82 : */
83 : enum
84 : {
85 : PROP_0,
86 : PROP_LOCATION,
87 : PROP_JSON
88 : };
89 :
90 : GST_DEBUG_CATEGORY_STATIC (gst_data_repo_sink_debug);
91 : #define GST_CAT_DEFAULT gst_data_repo_sink_debug
92 : #define _do_init \
93 : GST_DEBUG_CATEGORY_INIT (gst_data_repo_sink_debug, "datareposink", 0, "datareposink element");
94 : #define gst_data_repo_sink_parent_class parent_class
95 798 : G_DEFINE_TYPE_WITH_CODE (GstDataRepoSink, gst_data_repo_sink,
96 : GST_TYPE_BASE_SINK, _do_init);
97 :
98 : static void gst_data_repo_sink_set_property (GObject * object, guint prop_id,
99 : const GValue * value, GParamSpec * pspec);
100 : static void gst_data_repo_sink_get_property (GObject * object, guint prop_id,
101 : GValue * value, GParamSpec * pspec);
102 : static void gst_data_repo_sink_finalize (GObject * object);
103 : static gboolean gst_data_repo_sink_stop (GstBaseSink * basesink);
104 : static GstStateChangeReturn gst_data_repo_sink_change_state (GstElement *
105 : element, GstStateChange transition);
106 : static GstFlowReturn gst_data_repo_sink_render (GstBaseSink * bsink,
107 : GstBuffer * buffer);
108 : static GstCaps *gst_data_repo_sink_get_caps (GstBaseSink * bsink,
109 : GstCaps * filter);
110 : static gboolean gst_data_repo_sink_set_caps (GstBaseSink * bsink,
111 : GstCaps * caps);
112 : static gboolean gst_data_repo_sink_query (GstBaseSink * sink, GstQuery * query);
113 :
114 : /**
115 : * @brief Initialize datareposink class.
116 : */
117 : static void
118 2 : gst_data_repo_sink_class_init (GstDataRepoSinkClass * klass)
119 : {
120 : GObjectClass *gobject_class;
121 : GstElementClass *gstelement_class;
122 : GstBaseSinkClass *gstbasesink_class;
123 :
124 2 : gobject_class = G_OBJECT_CLASS (klass);
125 2 : gstelement_class = GST_ELEMENT_CLASS (klass);
126 2 : gstbasesink_class = GST_BASE_SINK_CLASS (klass);
127 :
128 2 : gobject_class->set_property = gst_data_repo_sink_set_property;
129 2 : gobject_class->get_property = gst_data_repo_sink_get_property;
130 2 : gobject_class->finalize = gst_data_repo_sink_finalize;
131 :
132 2 : g_object_class_install_property (gobject_class, PROP_LOCATION,
133 : g_param_spec_string ("location", "File Location",
134 : "Location to write files to MLOps Data Repository. "
135 : "if the files are images, use placeholder in indexes for filename"
136 : "(e.g., filenmae%04d.png).",
137 : NULL,
138 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
139 : GST_PARAM_MUTABLE_READY));
140 :
141 2 : g_object_class_install_property (gobject_class, PROP_JSON,
142 : g_param_spec_string ("json", "JSON file path",
143 : "JSON file path to write the meta information of a sample", NULL,
144 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
145 : GST_PARAM_MUTABLE_READY));
146 :
147 2 : gst_element_class_set_static_metadata (gstelement_class,
148 : "NNStreamer MLOps Data Repository Sink",
149 : "Sink/File",
150 : "Write files to MLOps Data Repository", "Samsung Electronics Co., Ltd.");
151 :
152 2 : gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
153 :
154 2 : gstelement_class->change_state =
155 2 : GST_DEBUG_FUNCPTR (gst_data_repo_sink_change_state);
156 2 : gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_data_repo_sink_render);
157 2 : gstbasesink_class->get_caps = GST_DEBUG_FUNCPTR (gst_data_repo_sink_get_caps);
158 2 : gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_data_repo_sink_set_caps);
159 2 : gstbasesink_class->query = GST_DEBUG_FUNCPTR (gst_data_repo_sink_query);
160 2 : gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_data_repo_sink_stop);
161 :
162 : /*A value of 8 typically indicates a 64-bit system. */
163 : if (sizeof (off_t) < 8) {
164 : GST_WARNING
165 : ("64-bit file support unavailable due to system limitations, sizeof (off_t) = %"
166 : G_GSIZE_FORMAT "!", sizeof (off_t));
167 : }
168 2 : }
169 :
170 : /**
171 : * @brief Initialize datareposink.
172 : */
173 : static void
174 26 : gst_data_repo_sink_init (GstDataRepoSink * sink)
175 : {
176 26 : sink->filename = NULL;
177 26 : sink->fd = 0;
178 26 : sink->fd_offset = 0;
179 26 : sink->data_type = GST_DATA_REPO_DATA_UNKNOWN;
180 26 : sink->is_static_tensors = FALSE;
181 26 : sink->fixed_caps = NULL;
182 26 : sink->json_object = NULL;
183 26 : sink->total_samples = 0;
184 26 : sink->cumulative_tensors = 0;
185 26 : sink->json_object = json_object_new ();
186 26 : sink->sample_offset_array = json_array_new ();
187 26 : sink->tensor_size_array = json_array_new ();
188 26 : sink->tensor_count_array = json_array_new ();
189 26 : }
190 :
191 : /**
192 : * @brief finalize datareposink.
193 : */
194 : static void
195 24 : gst_data_repo_sink_finalize (GObject * object)
196 : {
197 24 : GstDataRepoSink *sink = GST_DATA_REPO_SINK (object);
198 :
199 24 : g_free (sink->filename);
200 24 : g_free (sink->json_filename);
201 :
202 24 : if (sink->fd) {
203 0 : g_close (sink->fd, NULL);
204 0 : sink->fd = 0;
205 : }
206 :
207 24 : if (sink->fixed_caps)
208 22 : gst_caps_unref (sink->fixed_caps);
209 :
210 24 : if (sink->sample_offset_array)
211 14 : json_array_unref (sink->sample_offset_array);
212 24 : if (sink->tensor_size_array)
213 14 : json_array_unref (sink->tensor_size_array);
214 24 : if (sink->tensor_count_array)
215 14 : json_array_unref (sink->tensor_count_array);
216 24 : if (sink->json_object) {
217 2 : json_object_unref (sink->json_object);
218 2 : sink->json_object = NULL;
219 : }
220 24 : G_OBJECT_CLASS (parent_class)->finalize (object);
221 24 : }
222 :
223 : /**
224 : * @brief Setter for datareposink properties.
225 : */
226 : static void
227 52 : gst_data_repo_sink_set_property (GObject * object, guint prop_id,
228 : const GValue * value, GParamSpec * pspec)
229 : {
230 52 : GstDataRepoSink *sink = GST_DATA_REPO_SINK (object);
231 :
232 52 : switch (prop_id) {
233 26 : case PROP_LOCATION:
234 26 : sink->filename = g_value_dup_string (value);
235 26 : GST_INFO_OBJECT (sink, "filename: %s", sink->filename);
236 26 : break;
237 26 : case PROP_JSON:
238 26 : sink->json_filename = g_value_dup_string (value);
239 26 : GST_INFO_OBJECT (sink, "JSON filename: %s", sink->json_filename);
240 26 : break;
241 0 : default:
242 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
243 0 : break;
244 : }
245 52 : }
246 :
247 : /**
248 : * @brief Getter datareposink properties.
249 : */
250 : static void
251 2 : gst_data_repo_sink_get_property (GObject * object, guint prop_id,
252 : GValue * value, GParamSpec * pspec)
253 : {
254 : GstDataRepoSink *sink;
255 :
256 2 : sink = GST_DATA_REPO_SINK (object);
257 :
258 2 : switch (prop_id) {
259 1 : case PROP_LOCATION:
260 1 : g_value_set_string (value, sink->filename);
261 1 : break;
262 1 : case PROP_JSON:
263 1 : g_value_set_string (value, sink->json_filename);
264 1 : break;
265 0 : default:
266 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
267 0 : break;
268 : }
269 2 : }
270 :
271 : /**
272 : * @brief Function to write others media type (tensors(fixed), video, audio, octet and text)
273 : */
274 : static GstFlowReturn
275 34 : gst_data_repo_sink_write_others (GstDataRepoSink * sink, GstBuffer * buffer)
276 : {
277 34 : ssize_t write_size = 0;
278 : GstMapInfo info;
279 34 : GstFlowReturn ret = GST_FLOW_OK;
280 :
281 68 : g_return_val_if_fail (sink != NULL, GST_FLOW_ERROR);
282 34 : g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
283 34 : g_return_val_if_fail (sink->fd != 0, GST_FLOW_ERROR);
284 :
285 34 : if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
286 0 : GST_ERROR_OBJECT (sink, "Failed to map the incoming buffer.");
287 0 : return GST_FLOW_ERROR;
288 : }
289 :
290 34 : GST_OBJECT_LOCK (sink);
291 34 : sink->sample_size = info.size;
292 :
293 34 : GST_LOG_OBJECT (sink,
294 : "Writing %lld bytes at offset 0x%" G_GINT64_MODIFIER "x (%lld size)",
295 : (long long) info.size, sink->fd_offset, (long long) sink->fd_offset);
296 :
297 34 : write_size = write (sink->fd, info.data, info.size);
298 :
299 34 : if ((write_size == -1) || (write_size != (ssize_t) info.size)) {
300 0 : GST_ERROR_OBJECT (sink, "Error writing data to file");
301 0 : ret = GST_FLOW_ERROR;
302 : } else {
303 34 : sink->fd_offset += write_size;
304 34 : sink->total_samples++;
305 : }
306 :
307 34 : GST_OBJECT_UNLOCK (sink);
308 34 : gst_buffer_unmap (buffer, &info);
309 :
310 34 : return ret;
311 : }
312 :
313 : /**
314 : * @brief Function to write flexible tensors or sparse tensors
315 : */
316 : static GstFlowReturn
317 161 : gst_data_repo_sink_write_flexible_or_sparse_tensors (GstDataRepoSink * sink,
318 : GstBuffer * buffer)
319 : {
320 : guint num_tensors, i;
321 161 : gsize total_write = 0, tensor_size;
322 161 : ssize_t write_size = 0;
323 : GstMapInfo info;
324 161 : GstMemory *mem = NULL;
325 : GstTensorMetaInfo meta;
326 :
327 322 : g_return_val_if_fail (sink != NULL, GST_FLOW_ERROR);
328 161 : g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
329 161 : g_return_val_if_fail (sink->fd != 0, GST_FLOW_ERROR);
330 161 : g_return_val_if_fail (sink->json_object != NULL, GST_FLOW_ERROR);
331 161 : g_return_val_if_fail (sink->sample_offset_array != NULL, GST_FLOW_ERROR);
332 161 : g_return_val_if_fail (sink->tensor_size_array != NULL, GST_FLOW_ERROR);
333 161 : g_return_val_if_fail (sink->tensor_count_array != NULL, GST_FLOW_ERROR);
334 :
335 161 : GST_OBJECT_LOCK (sink);
336 :
337 161 : num_tensors = gst_tensor_buffer_get_count (buffer);
338 161 : GST_INFO_OBJECT (sink, "num_tensors: %u", num_tensors);
339 :
340 350 : for (i = 0; i < num_tensors; i++) {
341 191 : mem = gst_tensor_buffer_get_nth_memory (buffer, i);
342 191 : if (!gst_memory_map (mem, &info, GST_MAP_READ)) {
343 0 : GST_ERROR_OBJECT (sink, "Failed to map memory");
344 0 : goto mem_map_error;
345 : }
346 :
347 191 : if (!gst_tensor_meta_info_parse_header (&meta, info.data)) {
348 2 : GST_ERROR_OBJECT (sink,
349 : "Invalid format of tensors, the format is static.");
350 2 : goto error;
351 : }
352 189 : tensor_size = info.size;
353 :
354 189 : GST_LOG_OBJECT (sink, "tensor[%u] size: %zd", i, tensor_size);
355 189 : GST_LOG_OBJECT (sink,
356 : "Writing %lld bytes at offset 0x%" G_GINT64_MODIFIER "x (%lld size)",
357 : (long long) tensor_size, sink->fd_offset + total_write,
358 : (long long) sink->fd_offset + total_write);
359 :
360 189 : write_size = write (sink->fd, info.data, tensor_size);
361 189 : if ((write_size == -1) || (write_size != (ssize_t) tensor_size)) {
362 0 : GST_ERROR_OBJECT (sink, "Error writing data to file");
363 0 : goto error;
364 : }
365 :
366 189 : json_array_add_int_element (sink->tensor_size_array, tensor_size);
367 189 : total_write += (gsize) write_size;
368 :
369 189 : gst_memory_unmap (mem, &info);
370 189 : gst_memory_unref (mem);
371 : }
372 :
373 159 : json_array_add_int_element (sink->sample_offset_array, sink->fd_offset);
374 159 : sink->fd_offset += total_write;
375 :
376 159 : GST_LOG_OBJECT (sink, "cumulative_tensors: %u", sink->cumulative_tensors);
377 159 : json_array_add_int_element (sink->tensor_count_array,
378 159 : sink->cumulative_tensors);
379 159 : sink->cumulative_tensors += num_tensors;
380 :
381 159 : sink->total_samples++;
382 :
383 159 : GST_OBJECT_UNLOCK (sink);
384 :
385 159 : return GST_FLOW_OK;
386 :
387 2 : error:
388 2 : gst_memory_unmap (mem, &info);
389 2 : mem_map_error:
390 2 : gst_memory_unref (mem);
391 2 : GST_OBJECT_UNLOCK (sink);
392 :
393 2 : return GST_FLOW_ERROR;
394 : }
395 :
396 : /**
397 : * @brief Get image filename
398 : */
399 : static gchar *
400 20 : gst_data_repo_sink_get_image_filename (GstDataRepoSink * sink)
401 : {
402 20 : gchar *filename = NULL;
403 :
404 20 : g_return_val_if_fail (sink != NULL, NULL);
405 20 : g_return_val_if_fail (sink->data_type == GST_DATA_REPO_DATA_IMAGE, NULL);
406 20 : g_return_val_if_fail (sink->filename != NULL, NULL);
407 :
408 : #ifdef __GNUC__
409 : #pragma GCC diagnostic push
410 : #pragma GCC diagnostic ignored "-Wformat-nonliteral"
411 : #endif
412 20 : filename = g_strdup_printf (sink->filename, sink->total_samples);
413 : #ifdef __GNUC__
414 : #pragma GCC diagnostic pop
415 : #endif
416 :
417 20 : return filename;
418 : }
419 :
420 : /**
421 : * @brief Function to read multi image files
422 : */
423 : static GstFlowReturn
424 20 : gst_data_repo_sink_write_multi_images (GstDataRepoSink * sink,
425 : GstBuffer * buffer)
426 : {
427 20 : g_autofree gchar *filename = NULL;
428 20 : GstFlowReturn ret = GST_FLOW_OK;
429 20 : GError *error = NULL;
430 : GstMapInfo info;
431 :
432 20 : g_return_val_if_fail (sink != NULL, GST_FLOW_ERROR);
433 20 : g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
434 :
435 20 : if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
436 0 : GST_ERROR_OBJECT (sink, "Failed to map the incoming buffer.");
437 0 : return GST_FLOW_ERROR;
438 : }
439 :
440 20 : filename = gst_data_repo_sink_get_image_filename (sink);
441 :
442 20 : GST_OBJECT_LOCK (sink);
443 20 : sink->sample_size = info.size;
444 :
445 20 : GST_DEBUG_OBJECT (sink, "Writing to file \"%s\", size(%zd)", filename,
446 : info.size);
447 :
448 20 : if (!g_file_set_contents (filename, (char *) info.data, info.size, &error)) {
449 0 : GST_ERROR_OBJECT (sink, "Could not write data to file: %s",
450 : error ? error->message : "unknown error");
451 0 : g_clear_error (&error);
452 0 : ret = GST_FLOW_ERROR;
453 : } else {
454 20 : sink->total_samples++;
455 : }
456 :
457 20 : GST_OBJECT_UNLOCK (sink);
458 20 : gst_buffer_unmap (buffer, &info);
459 :
460 20 : return ret;
461 : }
462 :
463 : /**
464 : * @brief Called when a buffer should be presented or output.
465 : */
466 : static GstFlowReturn
467 215 : gst_data_repo_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
468 : {
469 215 : GstDataRepoSink *sink = GST_DATA_REPO_SINK_CAST (bsink);
470 :
471 215 : switch (sink->data_type) {
472 24 : case GST_DATA_REPO_DATA_VIDEO:
473 : case GST_DATA_REPO_DATA_AUDIO:
474 : case GST_DATA_REPO_DATA_TEXT:
475 : case GST_DATA_REPO_DATA_OCTET:
476 24 : return gst_data_repo_sink_write_others (sink, buffer);
477 171 : case GST_DATA_REPO_DATA_TENSOR:
478 : {
479 171 : if (sink->is_static_tensors)
480 10 : return gst_data_repo_sink_write_others (sink, buffer);
481 161 : return gst_data_repo_sink_write_flexible_or_sparse_tensors (sink, buffer);
482 : }
483 20 : case GST_DATA_REPO_DATA_IMAGE:
484 20 : return gst_data_repo_sink_write_multi_images (sink, buffer);
485 0 : default:
486 0 : return GST_FLOW_ERROR;
487 : }
488 : }
489 :
490 : /**
491 : * @brief Get caps of datareposink.
492 : */
493 : static GstCaps *
494 552 : gst_data_repo_sink_get_caps (GstBaseSink * bsink, GstCaps * filter)
495 : {
496 552 : GstDataRepoSink *sink = GST_DATA_REPO_SINK (bsink);
497 552 : GstCaps *caps = NULL;
498 :
499 552 : GST_OBJECT_LOCK (sink);
500 552 : caps = sink->fixed_caps;
501 :
502 552 : GST_INFO_OBJECT (sink, "Got caps %" GST_PTR_FORMAT, caps);
503 552 : if (caps) {
504 418 : if (filter)
505 0 : caps = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
506 : else
507 418 : caps = gst_caps_ref (caps);
508 : }
509 :
510 552 : GST_DEBUG_OBJECT (sink, "result get caps: %" GST_PTR_FORMAT, caps);
511 552 : GST_OBJECT_UNLOCK (sink);
512 :
513 552 : return caps;
514 : }
515 :
516 : /**
517 : * @brief Set whether the given pad caps are static or not.
518 : */
519 : static void
520 22 : gst_data_repo_sink_set_is_static_tensors (GstDataRepoSink * sink)
521 : {
522 : GstStructure *structure;
523 : GstTensorsConfig config;
524 :
525 33 : g_return_if_fail (sink != NULL);
526 22 : g_return_if_fail (sink->fixed_caps != NULL);
527 22 : g_return_if_fail (sink->data_type == GST_DATA_REPO_DATA_TENSOR);
528 :
529 11 : structure = gst_caps_get_structure (sink->fixed_caps, 0);
530 11 : gst_tensors_config_from_structure (&config, structure);
531 11 : sink->is_static_tensors = gst_tensors_config_is_static (&config);
532 11 : gst_tensors_config_free (&config);
533 : }
534 :
535 : /**
536 : * @brief Set caps of datareposink.
537 : */
538 : static gboolean
539 22 : gst_data_repo_sink_set_caps (GstBaseSink * bsink, GstCaps * caps)
540 : {
541 : GstDataRepoSink *sink;
542 :
543 22 : sink = GST_DATA_REPO_SINK (bsink);
544 22 : GST_INFO_OBJECT (sink, "set caps %" GST_PTR_FORMAT, caps);
545 :
546 22 : if (sink->fixed_caps) {
547 0 : gst_caps_unref (sink->fixed_caps);
548 0 : sink->fixed_caps = NULL;
549 : }
550 :
551 22 : sink->data_type = gst_data_repo_get_data_type_from_caps (caps);
552 22 : sink->fixed_caps = gst_caps_copy (caps);
553 :
554 22 : gst_data_repo_sink_set_is_static_tensors (sink);
555 :
556 22 : GST_DEBUG_OBJECT (sink, "data type: %d", sink->data_type);
557 22 : return (sink->data_type != GST_DATA_REPO_DATA_UNKNOWN);
558 : }
559 :
560 : /**
561 : * @brief Perform a GstQuery on datareposink.
562 : */
563 : static gboolean
564 567 : gst_data_repo_sink_query (GstBaseSink * bsink, GstQuery * query)
565 : {
566 : gboolean ret;
567 :
568 567 : switch (GST_QUERY_TYPE (query)) {
569 0 : case GST_QUERY_SEEKING:{
570 : GstFormat fmt;
571 :
572 : /* we don't supporting seeking */
573 0 : gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
574 0 : gst_query_set_seeking (query, fmt, FALSE, 0, -1);
575 0 : ret = TRUE;
576 0 : break;
577 : }
578 567 : default:
579 567 : ret = GST_BASE_SINK_CLASS (parent_class)->query (bsink, query);
580 567 : break;
581 : }
582 :
583 567 : return ret;
584 : }
585 :
586 : /**
587 : * @brief Function to open file
588 : */
589 : static gboolean
590 21 : gst_data_repo_sink_open_file (GstDataRepoSink * sink)
591 : {
592 21 : gchar *filename = NULL;
593 21 : int flags = O_CREAT | O_WRONLY;
594 :
595 21 : g_return_val_if_fail (sink != NULL, FALSE);
596 21 : g_return_val_if_fail (sink->data_type != GST_DATA_REPO_DATA_UNKNOWN, FALSE);
597 :
598 21 : if (sink->filename == NULL || sink->filename[0] == '\0')
599 0 : goto no_filename;
600 :
601 21 : if (sink->data_type == GST_DATA_REPO_DATA_IMAGE) {
602 4 : return TRUE;
603 : }
604 :
605 : /* need to get filename by media type */
606 17 : filename = g_strdup (sink->filename);
607 :
608 17 : GST_INFO_OBJECT (sink, "opening file %s", filename);
609 :
610 17 : flags |= O_TRUNC; /* "wb" */
611 17 : sink->fd = g_open (filename, flags, 0644);
612 :
613 17 : if (sink->fd < 0)
614 0 : goto open_failed;
615 :
616 17 : g_free (filename);
617 :
618 17 : return TRUE;
619 :
620 0 : no_filename:
621 : {
622 0 : GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND,
623 : (("No file name specified for writing.")), (NULL));
624 0 : goto error_exit;
625 : }
626 0 : open_failed:
627 : {
628 0 : switch (errno) {
629 0 : case ENOENT:
630 0 : GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND, (NULL),
631 : ("No such file \"%s\"", sink->filename));
632 0 : break;
633 0 : default:
634 0 : GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ,
635 : (("Could not open file \"%s\" for reading."), sink->filename),
636 : GST_ERROR_SYSTEM);
637 0 : break;
638 : }
639 0 : goto error_exit;
640 : }
641 :
642 0 : error_exit:
643 0 : g_free (filename);
644 :
645 0 : return FALSE;
646 : }
647 :
648 : /**
649 : * @brief Stop datareposink
650 : */
651 : static gboolean
652 22 : gst_data_repo_sink_stop (GstBaseSink * basesink)
653 : {
654 : GstDataRepoSink *sink;
655 :
656 22 : sink = GST_DATA_REPO_SINK_CAST (basesink);
657 :
658 22 : g_close (sink->fd, NULL);
659 22 : sink->fd = 0;
660 :
661 22 : return TRUE;
662 : }
663 :
664 : /**
665 : * @brief Write json to file
666 : */
667 : static gboolean
668 22 : __write_json (JsonObject * object, const gchar * filename)
669 : {
670 : JsonNode *root;
671 : JsonGenerator *generator;
672 22 : gboolean ret = TRUE;
673 :
674 22 : g_return_val_if_fail (object != NULL, FALSE);
675 22 : g_return_val_if_fail (filename != NULL, FALSE);
676 :
677 22 : root = json_node_init_object (json_node_alloc (), object);
678 22 : generator = json_generator_new ();
679 22 : json_generator_set_root (generator, root);
680 22 : json_generator_set_pretty (generator, TRUE);
681 22 : ret = json_generator_to_file (generator, filename, NULL);
682 22 : if (!ret) {
683 0 : GST_ERROR ("Failed to write JSON to file %s", filename);
684 : }
685 :
686 22 : g_object_unref (generator);
687 22 : json_node_free (root);
688 :
689 22 : return ret;
690 : }
691 :
692 : /**
693 : * @brief write the meta information to a JSON file
694 : */
695 : static gboolean
696 22 : gst_data_repo_sink_write_json_meta_file (GstDataRepoSink * sink)
697 : {
698 22 : gchar *caps_str = NULL;
699 22 : gboolean ret = TRUE;
700 :
701 22 : g_return_val_if_fail (sink != NULL, FALSE);
702 22 : g_return_val_if_fail (sink->json_filename != NULL, FALSE);
703 22 : g_return_val_if_fail (sink->data_type != GST_DATA_REPO_DATA_UNKNOWN, FALSE);
704 22 : g_return_val_if_fail (sink->fixed_caps != NULL, FALSE);
705 22 : g_return_val_if_fail (sink->json_object != NULL, FALSE);
706 22 : g_return_val_if_fail (sink->sample_offset_array != NULL, FALSE);
707 22 : g_return_val_if_fail (sink->tensor_size_array != NULL, FALSE);
708 22 : g_return_val_if_fail (sink->tensor_count_array != NULL, GST_FLOW_ERROR);
709 :
710 22 : caps_str = gst_caps_to_string (sink->fixed_caps);
711 22 : GST_DEBUG_OBJECT (sink, "caps string: %s", caps_str);
712 :
713 22 : json_object_set_string_member (sink->json_object, "gst_caps", caps_str);
714 :
715 22 : json_object_set_int_member (sink->json_object, "total_samples",
716 22 : sink->total_samples);
717 :
718 22 : if (sink->data_type == GST_DATA_REPO_DATA_TENSOR && !sink->is_static_tensors) {
719 10 : json_object_set_array_member (sink->json_object, "sample_offset",
720 : sink->sample_offset_array);
721 10 : json_object_set_array_member (sink->json_object, "tensor_size",
722 : sink->tensor_size_array);
723 10 : json_object_set_array_member (sink->json_object, "tensor_count",
724 : sink->tensor_count_array);
725 :
726 10 : sink->sample_offset_array = NULL;
727 10 : sink->tensor_size_array = NULL;
728 10 : sink->tensor_count_array = NULL;
729 : } else {
730 12 : json_object_set_int_member (sink->json_object, "sample_size",
731 12 : sink->sample_size);
732 : }
733 22 : ret = __write_json (sink->json_object, sink->json_filename);
734 22 : if (!ret) {
735 0 : GST_ERROR_OBJECT (sink, "Failed to write json meta file: %s",
736 : sink->json_filename);
737 : }
738 :
739 22 : json_object_unref (sink->json_object);
740 22 : g_free (caps_str);
741 22 : sink->json_object = NULL;
742 :
743 22 : return ret;
744 : }
745 :
746 : /**
747 : * @brief Change state of datareposink.
748 : */
749 : static GstStateChangeReturn
750 138 : gst_data_repo_sink_change_state (GstElement * element,
751 : GstStateChange transition)
752 : {
753 138 : GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
754 138 : GstDataRepoSink *sink = GST_DATA_REPO_SINK (element);
755 :
756 138 : switch (transition) {
757 26 : case GST_STATE_CHANGE_NULL_TO_READY:
758 26 : GST_INFO_OBJECT (sink, "NULL_TO_READY");
759 26 : if (sink->filename == NULL || sink->json_filename == NULL) {
760 2 : GST_ERROR_OBJECT (sink, "Set filenmae and json");
761 2 : goto state_change_failed;
762 : }
763 24 : break;
764 :
765 24 : case GST_STATE_CHANGE_READY_TO_PAUSED:
766 24 : GST_INFO_OBJECT (sink, "READY_TO_PAUSED");
767 24 : break;
768 :
769 21 : case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
770 21 : GST_INFO_OBJECT (sink, "PAUSED_TO_PLAYING");
771 21 : if (!gst_data_repo_sink_open_file (sink))
772 0 : goto state_change_failed;
773 21 : break;
774 :
775 67 : default:
776 67 : break;
777 : }
778 :
779 136 : ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
780 :
781 136 : switch (transition) {
782 21 : case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
783 21 : GST_INFO_OBJECT (sink, "PLAYING_TO_PAUSED");
784 21 : break;
785 :
786 22 : case GST_STATE_CHANGE_PAUSED_TO_READY:
787 22 : GST_INFO_OBJECT (sink, "PAUSED_TO_READY");
788 22 : break;
789 :
790 22 : case GST_STATE_CHANGE_READY_TO_NULL:
791 22 : GST_INFO_OBJECT (sink, "READY_TO_NULL");
792 22 : if (!gst_data_repo_sink_write_json_meta_file (sink))
793 0 : goto state_change_failed;
794 22 : break;
795 :
796 71 : default:
797 71 : break;
798 : }
799 136 : return ret;
800 :
801 2 : state_change_failed:
802 2 : GST_ERROR_OBJECT (sink, "state change failed");
803 :
804 2 : return GST_STATE_CHANGE_FAILURE;
805 : }
|