LCOV - code coverage report
Current view: top level - nnstreamer-edge-0.2.6/src/libnnstreamer-edge - nnstreamer-edge-queue.c (source / functions) Coverage Total Hit
Test: NNStreamer-edge 0.2.6-1 nnstreamer/nnstreamer-edge#121619a22eefb07aef74ab765d1eb0ec59b1416e Lines: 95.3 % 150 143
Test Date: 2025-06-06 05:20:40 Functions: 100.0 % 9 9

            Line data    Source code
       1              : /* SPDX-License-Identifier: Apache-2.0 */
       2              : /**
       3              :  * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
       4              :  *
       5              :  * @file   nnstreamer-edge-queue.c
       6              :  * @date   24 August 2022
       7              :  * @brief  Thread-safe queue.
       8              :  * @see    https://github.com/nnstreamer/nnstreamer-edge
       9              :  * @author Jaeyun Jung <jy1210.jung@samsung.com>
      10              :  * @bug    No known bugs except for NYI items.
      11              :  */
      12              : 
      13              : #include "nnstreamer-edge-log.h"
      14              : #include "nnstreamer-edge-queue.h"
      15              : #include "nnstreamer-edge-util.h"
      16              : 
      17              : /**
      18              :  * @brief Internal structure for queue data.
      19              :  */
      20              : typedef struct _nns_edge_queue_data_s nns_edge_queue_data_s;
      21              : 
      22              : /**
      23              :  * @brief Internal structure for queue data.
      24              :  */
      25              : struct _nns_edge_queue_data_s
      26              : {
      27              :   nns_edge_raw_data_s data;
      28              :   nns_edge_queue_data_s *next;
      29              : };
      30              : 
      31              : /**
      32              :  * @brief Internal structure for queue.
      33              :  */
      34              : typedef struct
      35              : {
      36              :   uint32_t magic;
      37              :   pthread_mutex_t lock;
      38              :   pthread_cond_t cond;
      39              : 
      40              :   nns_edge_queue_leak_e leaky;
      41              :   unsigned int max_data; /**< Max data in queue (default 0 means unlimited) */
      42              :   unsigned int length;
      43              :   nns_edge_queue_data_s *head;
      44              :   nns_edge_queue_data_s *tail;
      45              : } nns_edge_queue_s;
      46              : 
      47              : /**
      48              :  * @brief Pop data from queue. If the param 'clear' is true, release old data and return null.
      49              :  * @note This function should be called with lock.
      50              :  */
      51              : static bool
      52           70 : _pop_data (nns_edge_queue_s * q, bool clear, void **data, nns_size_t * size)
      53              : {
      54              :   nns_edge_queue_data_s *qdata;
      55           70 :   bool popped = false;
      56              : 
      57           70 :   qdata = q->head;
      58           70 :   if (qdata) {
      59           60 :     q->head = qdata->next;
      60           60 :     if ((--q->length) == 0U)
      61           46 :       q->head = q->tail = NULL;
      62              : 
      63           60 :     if (clear) {
      64           11 :       if (qdata->data.destroy_cb)
      65            8 :         qdata->data.destroy_cb (qdata->data.data);
      66              :     } else {
      67           49 :       if (data)
      68           49 :         *data = qdata->data.data;
      69           49 :       if (size)
      70           49 :         *size = qdata->data.data_len;
      71           49 :       popped = true;
      72              :     }
      73              : 
      74           60 :     SAFE_FREE (qdata);
      75              :   }
      76              : 
      77           70 :   return popped;
      78              : }
      79              : 
      80              : /**
      81              :  * @brief Create queue.
      82              :  */
      83              : int
      84           72 : nns_edge_queue_create (nns_edge_queue_h * handle)
      85              : {
      86              :   nns_edge_queue_s *q;
      87              : 
      88           72 :   if (!handle) {
      89            1 :     nns_edge_loge ("[Queue] Invalid param, handle is null.");
      90            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
      91              :   }
      92              : 
      93           71 :   q = calloc (1, sizeof (nns_edge_queue_s));
      94           71 :   if (!q) {
      95            0 :     nns_edge_loge ("[Queue] Failed to allocate new memory.");
      96            0 :     return NNS_EDGE_ERROR_OUT_OF_MEMORY;
      97              :   }
      98              : 
      99           71 :   nns_edge_lock_init (q);
     100           71 :   nns_edge_cond_init (q);
     101           71 :   nns_edge_handle_set_magic (q, NNS_EDGE_MAGIC);
     102           71 :   q->leaky = NNS_EDGE_QUEUE_LEAK_NEW;
     103              : 
     104           71 :   *handle = q;
     105           71 :   return NNS_EDGE_ERROR_NONE;
     106              : }
     107              : 
     108              : /**
     109              :  * @brief Destroy queue.
     110              :  */
     111              : int
     112           73 : nns_edge_queue_destroy (nns_edge_queue_h handle)
     113              : {
     114           73 :   nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
     115              : 
     116           73 :   if (!nns_edge_handle_is_valid (q)) {
     117            2 :     nns_edge_loge ("[Queue] Invalid param, queue is invalid.");
     118            2 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     119              :   }
     120              : 
     121              :   /* Stop waiting and clear all data. */
     122           71 :   nns_edge_queue_clear (handle);
     123              : 
     124           71 :   nns_edge_handle_set_magic (q, NNS_EDGE_MAGIC_DEAD);
     125           71 :   nns_edge_cond_destroy (q);
     126           71 :   nns_edge_lock_destroy (q);
     127           71 :   SAFE_FREE (q);
     128              : 
     129           71 :   return NNS_EDGE_ERROR_NONE;
     130              : }
     131              : 
     132              : /**
     133              :  * @brief Get the number of data in the queue.
     134              :  */
     135              : int
     136           25 : nns_edge_queue_get_length (nns_edge_queue_h handle, unsigned int *length)
     137              : {
     138           25 :   nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
     139              : 
     140           25 :   if (!nns_edge_handle_is_valid (q)) {
     141            2 :     nns_edge_loge ("[Queue] Invalid param, queue is invalid.");
     142            2 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     143              :   }
     144              : 
     145           23 :   if (!length) {
     146            1 :     nns_edge_loge ("[Queue] Invalid param, length is null.");
     147            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     148              :   }
     149              : 
     150           22 :   nns_edge_lock (q);
     151           22 :   *length = q->length;
     152           22 :   nns_edge_unlock (q);
     153              : 
     154           22 :   return NNS_EDGE_ERROR_NONE;
     155              : }
     156              : 
     157              : /**
     158              :  * @brief Set the max length of the queue.
     159              :  */
     160              : int
     161            7 : nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit,
     162              :     nns_edge_queue_leak_e leaky)
     163              : {
     164            7 :   nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
     165              : 
     166            7 :   if (!nns_edge_handle_is_valid (q)) {
     167            2 :     nns_edge_loge ("[Queue] Invalid param, queue is invalid.");
     168            2 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     169              :   }
     170              : 
     171            5 :   nns_edge_lock (q);
     172            5 :   q->max_data = limit;
     173            5 :   q->leaky = leaky;
     174            5 :   nns_edge_unlock (q);
     175              : 
     176            5 :   return NNS_EDGE_ERROR_NONE;
     177              : }
     178              : 
     179              : /**
     180              :  * @brief Add new data into queue.
     181              :  */
     182              : int
     183           68 : nns_edge_queue_push (nns_edge_queue_h handle, void *data, nns_size_t size,
     184              :     nns_edge_data_destroy_cb destroy)
     185              : {
     186           68 :   int ret = NNS_EDGE_ERROR_NONE;
     187           68 :   nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
     188              :   nns_edge_queue_data_s *qdata;
     189              : 
     190           68 :   if (!nns_edge_handle_is_valid (q)) {
     191            2 :     nns_edge_loge ("[Queue] Invalid param, queue is invalid.");
     192            2 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     193              :   }
     194              : 
     195           66 :   if (!data) {
     196            1 :     nns_edge_loge ("[Queue] Invalid param, data is null.");
     197            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     198              :   }
     199              : 
     200           65 :   if (size == 0U) {
     201            1 :     nns_edge_loge ("[Queue] Invalid param, size should be larger than zero.");
     202            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     203              :   }
     204              : 
     205           64 :   nns_edge_lock (q);
     206           64 :   if (q->max_data > 0U && q->length >= q->max_data) {
     207              :     /* Clear old data in queue if leaky option is 'old'. */
     208            6 :     if (q->leaky == NNS_EDGE_QUEUE_LEAK_OLD) {
     209            2 :       _pop_data (q, true, NULL, NULL);
     210              :     } else {
     211            4 :       nns_edge_logw ("[Queue] Cannot push new data, max data in queue is %u.",
     212              :           q->max_data);
     213            4 :       ret = NNS_EDGE_ERROR_IO;
     214            4 :       goto done;
     215              :     }
     216              :   }
     217              : 
     218           60 :   qdata = calloc (1, sizeof (nns_edge_queue_data_s));
     219           60 :   if (!qdata) {
     220            0 :     nns_edge_loge ("[Queue] Failed to allocate new memory for data.");
     221            0 :     ret = NNS_EDGE_ERROR_OUT_OF_MEMORY;
     222            0 :     goto done;
     223              :   }
     224              : 
     225           60 :   qdata->data.data = data;
     226           60 :   qdata->data.data_len = size;
     227           60 :   qdata->data.destroy_cb = destroy;
     228              : 
     229           60 :   if (!q->head)
     230           46 :     q->head = qdata;
     231           60 :   if (q->tail)
     232           14 :     q->tail->next = qdata;
     233           60 :   q->tail = qdata;
     234           60 :   q->length++;
     235              : 
     236           64 : done:
     237           64 :   nns_edge_cond_signal (q);
     238           64 :   nns_edge_unlock (q);
     239              : 
     240           64 :   return ret;
     241              : }
     242              : 
     243              : /**
     244              :  * @brief Remove and return the first data in queue.
     245              :  */
     246              : int
     247           13 : nns_edge_queue_pop (nns_edge_queue_h handle, void **data, nns_size_t * size)
     248              : {
     249           13 :   bool popped = false;
     250           13 :   nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
     251              : 
     252           13 :   if (!nns_edge_handle_is_valid (q)) {
     253            2 :     nns_edge_loge ("[Queue] Invalid param, queue is invalid.");
     254            2 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     255              :   }
     256              : 
     257           11 :   if (!data) {
     258            1 :     nns_edge_loge ("[Queue] Invalid param, data is null.");
     259            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     260              :   }
     261              : 
     262           10 :   if (!size) {
     263            1 :     nns_edge_loge ("[Queue] Invalid param, size is null.");
     264            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     265              :   }
     266              : 
     267              :   /* init data */
     268            9 :   *data = NULL;
     269            9 :   *size = 0U;
     270              : 
     271            9 :   nns_edge_lock (q);
     272            9 :   popped = _pop_data (q, false, data, size);
     273            9 :   nns_edge_unlock (q);
     274              : 
     275            9 :   return (popped && *data != NULL) ? NNS_EDGE_ERROR_NONE : NNS_EDGE_ERROR_IO;
     276              : }
     277              : 
     278              : /**
     279              :  * @brief Remove and return the first data in queue. If queue is empty, wait until new data is added in the queue.
     280              :  */
     281              : int
     282           55 : nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout,
     283              :     void **data, nns_size_t * size)
     284              : {
     285           55 :   nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
     286           55 :   bool popped = false;
     287              : 
     288           55 :   if (!nns_edge_handle_is_valid (q)) {
     289            2 :     nns_edge_loge ("[Queue] Invalid param, queue is invalid.");
     290            2 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     291              :   }
     292              : 
     293           53 :   if (!data) {
     294            1 :     nns_edge_loge ("[Queue] Invalid param, data is null.");
     295            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     296              :   }
     297              : 
     298           52 :   if (!size) {
     299            2 :     nns_edge_loge ("[Queue] Invalid param, size is null.");
     300            2 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     301              :   }
     302              : 
     303              :   /* init data */
     304           50 :   *data = NULL;
     305           50 :   *size = 0U;
     306              : 
     307           50 :   nns_edge_lock (q);
     308           50 :   if (q->length == 0U)
     309           50 :     nns_edge_cond_wait_until (q, timeout);
     310              : 
     311           50 :   popped = _pop_data (q, false, data, size);
     312           50 :   nns_edge_unlock (q);
     313              : 
     314           50 :   return (popped && *data != NULL) ? NNS_EDGE_ERROR_NONE : NNS_EDGE_ERROR_IO;
     315              : }
     316              : 
     317              : /**
     318              :  * @brief Clear all data in the queue.
     319              :  * @note When this function is called, nns_edge_queue_wait_pop will stop the waiting.
     320              :  */
     321              : int
     322          107 : nns_edge_queue_clear (nns_edge_queue_h handle)
     323              : {
     324          107 :   nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
     325              : 
     326          107 :   if (!nns_edge_handle_is_valid (q)) {
     327            0 :     nns_edge_loge ("[Queue] Invalid param, queue is invalid.");
     328            0 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     329              :   }
     330              : 
     331          107 :   nns_edge_lock (q);
     332          107 :   nns_edge_cond_signal (q);
     333              : 
     334          116 :   while (q->length > 0U)
     335            9 :     _pop_data (q, true, NULL, NULL);
     336              : 
     337          107 :   nns_edge_unlock (q);
     338          107 :   return NNS_EDGE_ERROR_NONE;
     339              : }
        

Generated by: LCOV version 2.0-1