LCOV - code coverage report
Current view: top level - nnstreamer-edge-0.2.6/src/libnnstreamer-edge - nnstreamer-edge-internal.c (source / functions) Coverage Total Hit
Test: NNStreamer-edge 0.2.6-1 nnstreamer/nnstreamer-edge#121619a22eefb07aef74ab765d1eb0ec59b1416e Lines: 77.0 % 1077 829
Test Date: 2025-06-06 05:20:40 Functions: 100.0 % 43 43

            Line data    Source code
       1              : /* SPDX-License-Identifier: Apache-2.0 */
       2              : /**
       3              :  * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
       4              :  *
       5              :  * @file   nnstreamer-edge-internal.c
       6              :  * @date   6 April 2022
       7              :  * @brief  Common library to support communication among devices.
       8              :  * @see    https://github.com/nnstreamer/nnstreamer
       9              :  * @author Gichan Jang <gichan2.jang@samsung.com>
      10              :  * @bug    No known bugs except for NYI items
      11              :  */
      12              : 
      13              : #include <arpa/inet.h>
      14              : #include <netdb.h>
      15              : #include <poll.h>
      16              : 
      17              : #include "nnstreamer-edge-data.h"
      18              : #include "nnstreamer-edge-event.h"
      19              : #include "nnstreamer-edge-log.h"
      20              : #include "nnstreamer-edge-util.h"
      21              : #include "nnstreamer-edge-queue.h"
      22              : #include "nnstreamer-edge-metadata.h"
      23              : #include "nnstreamer-edge-mqtt.h"
      24              : #include "nnstreamer-edge-custom-impl.h"
      25              : 
      26              : #ifndef MSG_NOSIGNAL
      27              : #define MSG_NOSIGNAL 0
      28              : #endif
      29              : 
      30              : /**
      31              :  * @brief The maximum length of pending connections to accept socket.
      32              :  */
      33              : #define N_BACKLOG 10
      34              : 
      35              : /**
      36              :  * @brief Data structure for edge handle. It groups the connection settings, the event callback, the list of connections and the listener/sender thread state of one edge node.
      37              :  */
      38              : typedef struct
      39              : {
      40              :   uint32_t magic; /**< NOTE(review): presumably the validity marker used by the nns_edge_handle_*_magic helpers - confirm */
      41              :   pthread_mutex_t lock; /**< NOTE(review): presumably the "handle lock" that the connection-list helpers expect to be held - confirm */
      42              :   pthread_cond_t cond;
      43              :   char *id;
      44              :   char *topic;
      45              :   nns_edge_connect_type_e connect_type;
      46              :   char *host; /**< host name or IP address */
      47              :   int port; /**< port number (0~65535, default 0 to get available port.) */
      48              :   char *dest_host; /**< destination IP address (broker or target device) */
      49              :   int dest_port; /**< destination port number (broker or target device) */
      50              :   nns_edge_node_type_e node_type;
      51              :   nns_edge_metadata_h metadata;
      52              :   bool is_started;
      53              : 
      54              :   /* Edge event callback and user data */
      55              :   nns_edge_event_cb event_cb;
      56              :   void *user_data;
      57              : 
      58              :   int64_t client_id;
      59              :   char *caps_str;
      60              : 
      61              :   /* list of connection data */
      62              :   void *connections; /**< head of a singly linked list of nns_edge_conn_data_s, see _nns_edge_get_connection() */
      63              : 
      64              :   /* socket listener */
      65              :   bool listening;
      66              :   int listener_fd;
      67              :   pthread_t listener_thread;
      68              : 
      69              :   /* thread and queue to send data */
      70              :   bool sending;
      71              :   nns_edge_queue_h send_queue;
      72              :   pthread_t send_thread;
      73              : 
      74              :   /* MQTT handle */
      75              :   void *broker_h;
      76              : 
      77              :   /* Data for custom connection */
      78              :   nns_edge_custom_connection_h custom_connection_h;
      79              : } nns_edge_handle_s;
      80              : 
      81              : /**
      82              :  * @brief enum for nnstreamer edge query commands. The value is stored in nns_edge_cmd_info_s::cmd.
      83              :  */
      84              : typedef enum
      85              : {
      86              :   _NNS_EDGE_CMD_ERROR = 0, /**< also sent by _nns_edge_close_connection() right before the socket is closed */
      87              :   _NNS_EDGE_CMD_TRANSFER_DATA, /**< command built by _nns_edge_transfer_data() to carry data and serialized metadata */
      88              :   _NNS_EDGE_CMD_HOST_INFO,
      89              :   _NNS_EDGE_CMD_CAPABILITY,
      90              :   _NNS_EDGE_CMD_END /**< sentinel, _nns_edge_cmd_is_valid() rejects any command value >= this */
      91              : } nns_edge_cmd_e;
      92              : 
      93              : /**
      94              :  * @brief Structure for edge command info. It should be fixed size. _nns_edge_cmd_send() and _nns_edge_cmd_receive() transfer this struct over the socket as raw bytes (sizeof (nns_edge_cmd_info_s)), so its layout acts as the message header.
      95              :  */
      96              : typedef struct
      97              : {
      98              :   uint32_t magic; /**< set to NNS_EDGE_MAGIC by _nns_edge_cmd_init() and to NNS_EDGE_MAGIC_DEAD by _nns_edge_cmd_clear() */
      99              :   uint32_t cmd; /**< enum for query commands, see nns_edge_cmd_e. */
     100              :   uint64_t version; /**< value of nns_edge_generate_version_key(), checked with nns_edge_parse_version_key() */
     101              :   int64_t client_id;
     102              : 
     103              :   /* memory info */
     104              :   uint32_t num; /**< number of memory chunks that follow the header, at most NNS_EDGE_DATA_LIMIT */
     105              :   nns_size_t mem_size[NNS_EDGE_DATA_LIMIT]; /**< byte size of each memory chunk, only the first num entries are used */
     106              :   nns_size_t meta_size; /**< byte size of the metadata that follows the memory chunks, 0 if there is none */
     107              : } nns_edge_cmd_info_s;
     108              : 
     109              : /**
     110              :  * @brief Structure for edge command and buffers. The info header describes how many entries of mem are used and how large mem and meta are.
     111              :  */
     112              : typedef struct
     113              : {
     114              :   nns_edge_cmd_info_s info; /**< fixed-size header, transferred first */
     115              :   void *mem[NNS_EDGE_DATA_LIMIT]; /**< mem[i] holds info.mem_size[i] bytes for i < info.num */
     116              :   void *meta; /**< info.meta_size bytes of metadata, released with SAFE_FREE() in _nns_edge_cmd_clear() */
     117              : } nns_edge_cmd_s;
     118              : 
     119              : /**
     120              :  * @brief Data structure for connection data.
     121              :  */
     122              : typedef struct _nns_edge_conn_data_s nns_edge_conn_data_s;
     123              : 
     124              : /**
     125              :  * @brief Data structure for edge connection (one socket and its optional message thread).
     126              :  */
     127              : typedef struct
     128              : {
     129              :   char *host; /**< peer host, passed to _fill_socket_addr() when connecting and freed by _nns_edge_close_connection() */
     130              :   int port; /**< peer port number */
     131              :   bool running; /**< cleared by _nns_edge_close_connection() before it joins msg_thread */
     132              :   pthread_t msg_thread; /**< joined on close when it is non-zero */
     133              :   int sockfd; /**< socket descriptor, a negative value means there is no open socket */
     134              : } nns_edge_conn_s;
     135              : 
     136              : /**
     137              :  * @brief Data structure for connection data. One node of the linked list kept in nns_edge_handle_s::connections.
     138              :  */
     139              : struct _nns_edge_conn_data_s
     140              : {
     141              :   nns_edge_conn_s *src_conn; /**< closed and freed by _nns_edge_release_connection_data() */
     142              :   nns_edge_conn_s *sink_conn; /**< closed and freed by _nns_edge_release_connection_data() */
     143              :   int64_t id; /**< client ID, the key compared by _nns_edge_get_connection() */
     144              :   nns_edge_conn_data_s *next; /**< next node in the list, NULL at the end */
     145              : };
     146              : 
     147              : /**
     148              :  * @brief Structures for thread data of message handling. _nns_edge_message_handler() receives a pointer to this struct as its thread argument.
     149              :  */
     150              : typedef struct
     151              : {
     152              :   nns_edge_handle_s *eh; /**< edge handle the message thread works for */
     153              :   int64_t client_id;
     154              :   nns_edge_conn_s *conn; /**< NOTE(review): presumably the connection the thread receives from - confirm in _nns_edge_message_handler() */
     155              : } nns_edge_thread_data_s;
     156              : 
     157              : /**
     158              :  * @brief Parse the message received from the MQTT broker and connect to the server directly.
     159              :  */
     160              : static int _mqtt_hybrid_direct_connection (nns_edge_handle_s * eh);
     161              : 
     162              : /**
     163              :  * @brief Set socket option. nnstreamer-edge handles TCP connection now.
     164              :  */
     165              : static void
     166           12 : _set_socket_option (int fd)
     167              : {
     168           12 :   int nodelay = 1;
     169              : 
     170              :   /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */
     171           12 :   if (setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof (int)) < 0)
     172            0 :     nns_edge_logw ("Failed to set TCP delay option.");
     173           12 : }
     174              : 
     175              : /**
     176              :  * @brief Fill socket address struct from host name and port number.
     177              :  */
     178              : static bool
     179           12 : _fill_socket_addr (struct sockaddr_in *saddr, const char *host, const int port)
     180              : {
     181              :   /** @todo handle protocol (ipv4 and ipv6) */
     182           12 :   saddr->sin_family = AF_INET;
     183           12 :   saddr->sin_port = htons (port);
     184              : 
     185           12 :   if ((saddr->sin_addr.s_addr = inet_addr (host)) == INADDR_NONE) {
     186              :     int ret;
     187            8 :     char *port_str = NULL;
     188              :     struct addrinfo hints;
     189            8 :     struct addrinfo *addrs = NULL;
     190              : 
     191            8 :     memset (&hints, 0, sizeof (hints));
     192            8 :     hints.ai_family = AF_INET;
     193            8 :     hints.ai_socktype = SOCK_STREAM;
     194              : 
     195            8 :     if (port > 0)
     196            8 :       port_str = nns_edge_strdup_printf ("%d", port);
     197            8 :     ret = getaddrinfo (host, port_str, &hints, &addrs);
     198            8 :     SAFE_FREE (port_str);
     199              : 
     200            8 :     if (ret != 0 || addrs == NULL)
     201            0 :       return false;
     202              : 
     203            8 :     memcpy (saddr, addrs->ai_addr, addrs->ai_addrlen);
     204            8 :     freeaddrinfo (addrs);
     205              :   }
     206              : 
     207           12 :   return true;
     208              : }
     209              : 
     210              : /**
     211              :  * @brief Send data to connected socket.
     212              :  */
     213              : static bool
     214          114 : _send_raw_data (nns_edge_conn_s * conn, void *data, nns_size_t size)
     215              : {
     216          114 :   nns_size_t sent = 0;
     217              :   nns_ssize_t rret;
     218              : 
     219          228 :   while (sent < size) {
     220          114 :     rret = send (conn->sockfd, (char *) data + sent, size - sent, MSG_NOSIGNAL);
     221              : 
     222          114 :     if (rret <= 0) {
     223            0 :       nns_edge_loge ("Failed to send raw data.");
     224            0 :       return false;
     225              :     }
     226              : 
     227          114 :     sent += rret;
     228              :   }
     229              : 
     230          114 :   return true;
     231              : }
     232              : 
     233              : /**
     234              :  * @brief Receive data from connected socket.
     235              :  */
     236              : static bool
     237          105 : _receive_raw_data (nns_edge_conn_s * conn, void *data, nns_size_t size)
     238              : {
     239          105 :   nns_size_t received = 0;
     240              :   nns_ssize_t rret;
     241              : 
     242          210 :   while (received < size) {
     243          105 :     rret = recv (conn->sockfd, (char *) data + received, size - received, 0);
     244              : 
     245          105 :     if (rret <= 0) {
     246            0 :       nns_edge_loge ("Failed to receive raw data.");
     247            0 :       return false;
     248              :     }
     249              : 
     250          105 :     received += rret;
     251              :   }
     252              : 
     253          105 :   return true;
     254              : }
     255              : 
     256              : /**
     257              :  * @brief Internal function to check connection.
     258              :  */
     259              : static bool
     260          117 : _nns_edge_check_connection (nns_edge_conn_s * conn)
     261              : {
     262              :   struct pollfd poll_fd;
     263              :   int n;
     264              : 
     265          117 :   if (!conn || conn->sockfd < 0)
     266          117 :     return false;
     267              : 
     268          117 :   poll_fd.fd = conn->sockfd;
     269          117 :   poll_fd.events = POLLIN | POLLOUT | POLLPRI | POLLERR | POLLHUP;
     270          117 :   poll_fd.revents = 0;
     271              : 
     272              :   /** Timeout zero means that the poll() is returned immediately. */
     273          117 :   n = poll (&poll_fd, 1, 0);
     274              :   /**
     275              :    * Return value zero indicates that the system call timed out.
     276              :    * let's skip the check `n == 0` because timeout is set to 0.
     277              :    */
     278          117 :   if (n < 0 || poll_fd.revents & (POLLERR | POLLHUP)) {
     279            0 :     nns_edge_logw ("Socket is not available, possibly closed.");
     280            0 :     return false;
     281              :   }
     282              : 
     283          117 :   return true;
     284              : }
     285              : 
     286              : /**
     287              :  * @brief initialize edge command.
     288              :  */
     289              : static void
     290           87 : _nns_edge_cmd_init (nns_edge_cmd_s * cmd, nns_edge_cmd_e c, int64_t cid)
     291              : {
     292           87 :   if (!cmd)
     293            0 :     return;
     294              : 
     295           87 :   memset (cmd, 0, sizeof (nns_edge_cmd_s));
     296           87 :   nns_edge_handle_set_magic (&cmd->info, NNS_EDGE_MAGIC);
     297           87 :   cmd->info.cmd = c;
     298           87 :   cmd->info.version = nns_edge_generate_version_key ();
     299           87 :   cmd->info.client_id = cid;
     300           87 :   cmd->info.num = 0;
     301           87 :   cmd->info.meta_size = 0;
     302              : }
     303              : 
     304              : /**
     305              :  * @brief Clear allocated memory in edge command.
     306              :  */
     307              : static void
     308           42 : _nns_edge_cmd_clear (nns_edge_cmd_s * cmd)
     309              : {
     310              :   unsigned int i;
     311              : 
     312           42 :   if (!cmd)
     313            0 :     return;
     314              : 
     315           42 :   nns_edge_handle_set_magic (&cmd->info, NNS_EDGE_MAGIC_DEAD);
     316              : 
     317           81 :   for (i = 0; i < cmd->info.num; i++) {
     318           39 :     SAFE_FREE (cmd->mem[i]);
     319           39 :     cmd->info.mem_size[i] = 0U;
     320              :   }
     321              : 
     322           42 :   SAFE_FREE (cmd->meta);
     323              : 
     324           42 :   cmd->info.cmd = _NNS_EDGE_CMD_ERROR;
     325           42 :   cmd->info.version = 0;
     326           42 :   cmd->info.client_id = 0;
     327           42 :   cmd->info.num = 0;
     328           42 :   cmd->info.meta_size = 0;
     329              : }
     330              : 
     331              : /**
     332              :  * @brief Validate edge command.
     333              :  */
     334              : static bool
     335           87 : _nns_edge_cmd_is_valid (nns_edge_cmd_s * cmd)
     336              : {
     337              :   int command;
     338              : 
     339           87 :   if (!cmd)
     340            0 :     return false;
     341              : 
     342           87 :   command = (int) cmd->info.cmd;
     343              : 
     344           87 :   if (!nns_edge_handle_is_valid (&cmd->info) ||
     345           87 :       (command < 0 || command >= _NNS_EDGE_CMD_END)) {
     346            0 :     return false;
     347              :   }
     348              : 
     349           87 :   if (!nns_edge_parse_version_key (cmd->info.version, NULL, NULL, NULL))
     350            0 :     return false;
     351              : 
     352              :   /**
     353              :    * @todo The number of memories in data.
     354              :    * Total number of memories in edge-data should be less than NNS_EDGE_DATA_LIMIT.
     355              :    * Fetch nns-edge version info and check allowed memories if NNS_EDGE_DATA_LIMIT is updated.
     356              :    */
     357           87 :   if (cmd->info.num > NNS_EDGE_DATA_LIMIT)
     358            0 :     return false;
     359              : 
     360           87 :   return true;
     361              : }
     362              : 
     363              : /**
     364              :  * @brief Send edge command to connected device.
     365              :  */
     366              : static int
     367           48 : _nns_edge_cmd_send (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd)
     368              : {
     369              :   unsigned int n;
     370              : 
     371           48 :   if (!conn) {
     372            0 :     nns_edge_loge ("Failed to send command, edge connection is null.");
     373            0 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     374              :   }
     375              : 
     376           48 :   if (!_nns_edge_cmd_is_valid (cmd)) {
     377            0 :     nns_edge_loge ("Failed to send command, invalid command.");
     378            0 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     379              :   }
     380              : 
     381           48 :   if (!_nns_edge_check_connection (conn)) {
     382            0 :     nns_edge_loge ("Failed to send command, socket has error.");
     383            0 :     return NNS_EDGE_ERROR_IO;
     384              :   }
     385              : 
     386           48 :   if (!_send_raw_data (conn, &cmd->info, sizeof (nns_edge_cmd_info_s))) {
     387            0 :     nns_edge_loge ("Failed to send command to socket.");
     388            0 :     return NNS_EDGE_ERROR_IO;
     389              :   }
     390              : 
     391           84 :   for (n = 0; n < cmd->info.num; n++) {
     392           36 :     if (!_send_raw_data (conn, cmd->mem[n], cmd->info.mem_size[n])) {
     393            0 :       nns_edge_loge ("Failed to send %uth memory to socket.", n);
     394            0 :       return NNS_EDGE_ERROR_IO;
     395              :     }
     396              :   }
     397              : 
     398           48 :   if (cmd->info.meta_size > 0) {
     399           30 :     if (!_send_raw_data (conn, cmd->meta, cmd->info.meta_size)) {
     400            0 :       nns_edge_loge ("Failed to send metadata to socket.");
     401            0 :       return NNS_EDGE_ERROR_IO;
     402              :     }
     403              :   }
     404              : 
     405           48 :   return NNS_EDGE_ERROR_NONE;
     406              : }
     407              : 
     408              : /**
     409              :  * @brief Receive edge command from connected device.
     410              :  * @note Before calling this function, you should initialize edge-cmd by using _nns_edge_cmd_init().
     411              :  */
     412              : static int
     413           39 : _nns_edge_cmd_receive (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd)
     414              : {
     415              :   unsigned int n;
     416           39 :   int ret = NNS_EDGE_ERROR_NONE;
     417              : 
     418           39 :   if (!conn || !cmd)
     419            0 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
     420              : 
     421           39 :   if (!_nns_edge_check_connection (conn)) {
     422            0 :     nns_edge_loge ("Failed to receive command, socket has error.");
     423            0 :     return NNS_EDGE_ERROR_IO;
     424              :   }
     425              : 
     426           39 :   if (!_receive_raw_data (conn, &cmd->info, sizeof (nns_edge_cmd_info_s))) {
     427            0 :     nns_edge_loge ("Failed to receive command from socket.");
     428            0 :     return NNS_EDGE_ERROR_IO;
     429              :   }
     430              : 
     431           39 :   if (!_nns_edge_cmd_is_valid (cmd)) {
     432            0 :     nns_edge_loge ("Failed to receive command, invalid command.");
     433            0 :     return NNS_EDGE_ERROR_IO;
     434              :   }
     435              : 
     436           39 :   nns_edge_logd ("Received command:%d (num:%u)", cmd->info.cmd, cmd->info.num);
     437           39 :   if (cmd->info.num >= NNS_EDGE_DATA_LIMIT) {
     438            0 :     nns_edge_loge ("Invalid request, the max memories for data transfer is %d.",
     439              :         NNS_EDGE_DATA_LIMIT);
     440            0 :     return NNS_EDGE_ERROR_IO;
     441              :   }
     442              : 
     443           75 :   for (n = 0; n < cmd->info.num; n++) {
     444           36 :     cmd->mem[n] = nns_edge_malloc (cmd->info.mem_size[n]);
     445           36 :     if (!cmd->mem[n]) {
     446            0 :       nns_edge_loge ("Failed to allocate memory to receive data from socket.");
     447            0 :       ret = NNS_EDGE_ERROR_OUT_OF_MEMORY;
     448            0 :       goto error;
     449              :     }
     450              : 
     451           36 :     if (!_receive_raw_data (conn, cmd->mem[n], cmd->info.mem_size[n])) {
     452            0 :       nns_edge_loge ("Failed to receive %uth memory from socket.", n++);
     453            0 :       ret = NNS_EDGE_ERROR_IO;
     454            0 :       goto error;
     455              :     }
     456              :   }
     457              : 
     458           39 :   if (cmd->info.meta_size > 0) {
     459           30 :     cmd->meta = nns_edge_malloc (cmd->info.meta_size);
     460           30 :     if (!cmd->meta) {
     461            0 :       nns_edge_loge ("Failed to allocate memory to receive meta from socket.");
     462            0 :       ret = NNS_EDGE_ERROR_OUT_OF_MEMORY;
     463            0 :       goto error;
     464              :     }
     465              : 
     466           30 :     if (!_receive_raw_data (conn, cmd->meta, cmd->info.meta_size)) {
     467            0 :       nns_edge_loge ("Failed to receive metadata from socket.");
     468            0 :       ret = NNS_EDGE_ERROR_IO;
     469            0 :       goto error;
     470              :     }
     471              :   }
     472              : 
     473           39 :   return NNS_EDGE_ERROR_NONE;
     474              : 
     475            0 : error:
     476            0 :   _nns_edge_cmd_clear (cmd);
     477            0 :   return ret;
     478              : }
     479              : 
     480              : /**
     481              :  * @brief Internal function to send edge data.
     482              :  */
     483              : static int
     484           30 : _nns_edge_transfer_data (nns_edge_conn_s * conn, nns_edge_data_h data_h,
     485              :     int64_t client_id)
     486              : {
     487              :   nns_edge_cmd_s cmd;
     488              :   unsigned int i;
     489              :   int ret;
     490              : 
     491           30 :   _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_TRANSFER_DATA, client_id);
     492              : 
     493           30 :   ret = nns_edge_data_get_count (data_h, &cmd.info.num);
     494           30 :   if (ret != NNS_EDGE_ERROR_NONE) {
     495            0 :     nns_edge_loge ("Failed to get data count");
     496           30 :     return ret;
     497              :   }
     498              : 
     499           60 :   for (i = 0; i < cmd.info.num; i++) {
     500           30 :     ret = nns_edge_data_get (data_h, i, &cmd.mem[i], &cmd.info.mem_size[i]);
     501           30 :     if (ret != NNS_EDGE_ERROR_NONE) {
     502            0 :       nns_edge_loge ("Failed to get data");
     503            0 :       return ret;
     504              :     }
     505              :   }
     506              : 
     507           30 :   ret = nns_edge_data_serialize_meta (data_h, &cmd.meta, &cmd.info.meta_size);
     508           30 :   if (ret != NNS_EDGE_ERROR_NONE) {
     509            0 :     nns_edge_loge ("Failed to serialize meta");
     510            0 :     return ret;
     511              :   }
     512              : 
     513           30 :   ret = _nns_edge_cmd_send (conn, &cmd);
     514           30 :   SAFE_FREE (cmd.meta);
     515              : 
     516           30 :   if (ret != NNS_EDGE_ERROR_NONE) {
     517            0 :     nns_edge_loge ("Failed to send edge data to destination (%s:%d).",
     518              :         conn->host, conn->port);
     519              :   }
     520              : 
     521           30 :   return ret;
     522              : }
     523              : 
     524              : /**
     525              :  * @brief Close connection
     526              :  */
     527              : static bool
     528           24 : _nns_edge_close_connection (nns_edge_conn_s * conn)
     529              : {
     530           24 :   if (!conn)
     531           12 :     return false;
     532              : 
     533              :   /* Stop and clear the message thread. */
     534           12 :   conn->running = false;
     535           12 :   if (conn->msg_thread) {
     536            6 :     pthread_join (conn->msg_thread, NULL);
     537            6 :     conn->msg_thread = 0;
     538              :   }
     539              : 
     540           12 :   if (conn->sockfd >= 0) {
     541              :     nns_edge_cmd_s cmd;
     542              : 
     543              :     /* Send error before closing the socket. */
     544           12 :     nns_edge_logd ("Send error cmd to close connection.");
     545           12 :     _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, 0);
     546           12 :     _nns_edge_cmd_send (conn, &cmd);
     547              : 
     548           12 :     if (close (conn->sockfd) < 0)
     549            0 :       nns_edge_logw ("Failed to close socket.");
     550           12 :     conn->sockfd = -1;
     551              :   }
     552              : 
     553           12 :   SAFE_FREE (conn->host);
     554           12 :   SAFE_FREE (conn);
     555           12 :   return true;
     556              : }
     557              : 
     558              : /**
     559              :  * @brief Release connection data and its resources.
     560              :  */
     561              : static void
     562            6 : _nns_edge_release_connection_data (nns_edge_conn_data_s * cdata)
     563              : {
     564            6 :   if (cdata) {
     565            6 :     _nns_edge_close_connection (cdata->src_conn);
     566            6 :     _nns_edge_close_connection (cdata->sink_conn);
     567            6 :     SAFE_FREE (cdata);
     568              :   }
     569            6 : }
     570              : 
     571              : /**
     572              :  * @brief Get nnstreamer-edge connection data.
     573              :  * @note This function should be called with handle lock.
     574              :  */
     575              : static nns_edge_conn_data_s *
     576           42 : _nns_edge_get_connection (nns_edge_handle_s * eh, int64_t client_id)
     577              : {
     578              :   nns_edge_conn_data_s *cdata;
     579              : 
     580           42 :   cdata = (nns_edge_conn_data_s *) eh->connections;
     581              : 
     582           48 :   while (cdata) {
     583           42 :     if (cdata->id == client_id)
     584           36 :       return cdata;
     585              : 
     586            6 :     cdata = cdata->next;
     587              :   }
     588              : 
     589            6 :   return NULL;
     590              : }
     591              : 
     592              : /**
     593              :  * @brief Get nnstreamer-edge connection data.
     594              :  * @note This function should be called with handle lock.
     595              :  */
     596              : static nns_edge_conn_data_s *
     597           12 : _nns_edge_add_connection (nns_edge_handle_s * eh, int64_t client_id)
     598              : {
     599              :   nns_edge_conn_data_s *cdata;
     600              : 
     601           12 :   cdata = _nns_edge_get_connection (eh, client_id);
     602              : 
     603           12 :   if (NULL == cdata) {
     604            6 :     cdata = (nns_edge_conn_data_s *) calloc (1, sizeof (nns_edge_conn_data_s));
     605            6 :     if (NULL == cdata) {
     606            0 :       nns_edge_loge ("Failed to allocate memory for connection data.");
     607            0 :       return NULL;
     608              :     }
     609              : 
     610              :     /* prepend connection data */
     611            6 :     cdata->id = client_id;
     612            6 :     cdata->next = eh->connections;
     613            6 :     eh->connections = cdata;
     614              :   }
     615              : 
     616           12 :   return cdata;
     617              : }
     618              : 
     619              : /**
     620              :  * @brief Remove nnstreamer-edge connection data.
     621              :  * @note This function should be called with handle lock.
     622              :  */
     623              : static void
     624            3 : _nns_edge_remove_connection (nns_edge_handle_s * eh, int64_t client_id)
     625              : {
     626              :   nns_edge_conn_data_s *cdata, *prev;
     627              : 
     628            3 :   cdata = (nns_edge_conn_data_s *) eh->connections;
     629            3 :   prev = NULL;
     630              : 
     631            3 :   while (cdata) {
     632            3 :     if (cdata->id == client_id) {
     633            3 :       if (prev)
     634            0 :         prev->next = cdata->next;
     635              :       else
     636            3 :         eh->connections = cdata->next;
     637              : 
     638            3 :       _nns_edge_release_connection_data (cdata);
     639            3 :       return;
     640              :     }
     641            0 :     prev = cdata;
     642            0 :     cdata = cdata->next;
     643              :   }
     644              : }
     645              : 
     646              : /**
     647              :  * @brief Remove all connection data.
     648              :  * @note This function should be called with handle lock.
     649              :  */
     650              : static void
     651           38 : _nns_edge_remove_all_connection (nns_edge_handle_s * eh)
     652              : {
     653              :   nns_edge_conn_data_s *cdata, *next;
     654              : 
     655           38 :   cdata = (nns_edge_conn_data_s *) eh->connections;
     656           38 :   eh->connections = NULL;
     657              : 
     658           41 :   while (cdata) {
     659            3 :     next = cdata->next;
     660              : 
     661            3 :     _nns_edge_release_connection_data (cdata);
     662              : 
     663            3 :     cdata = next;
     664              :   }
     665           38 : }
     666              : 
     667              : /**
     668              :  * @brief Connect to requested socket.
     669              :  */
     670              : static bool
     671            6 : _nns_edge_connect_socket (nns_edge_conn_s * conn)
     672              : {
     673            6 :   struct sockaddr_in saddr = { 0 };
     674            6 :   socklen_t saddr_len = sizeof (struct sockaddr_in);
     675              : 
     676            6 :   if (!_fill_socket_addr (&saddr, conn->host, conn->port)) {
     677            0 :     nns_edge_loge ("Failed to connect socket, invalid host %s.", conn->host);
     678            6 :     return false;
     679              :   }
     680              : 
     681            6 :   conn->sockfd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
     682            6 :   if (conn->sockfd < 0) {
     683            0 :     nns_edge_loge ("Failed to create new socket.");
     684            0 :     return false;
     685              :   }
     686              : 
     687            6 :   _set_socket_option (conn->sockfd);
     688              : 
     689            6 :   if (connect (conn->sockfd, (struct sockaddr *) &saddr, saddr_len) < 0) {
     690            0 :     nns_edge_loge ("Failed to connect host %s:%d.", conn->host, conn->port);
     691            0 :     return false;
     692              :   }
     693              : 
     694            6 :   return true;
     695              : }
     696              : 
     697              : /**
     698              :  * @brief Message thread, receive buffer from the client.
     699              :  */
     700              : static void *
     701            6 : _nns_edge_message_handler (void *thread_data)
     702              : {
     703            6 :   nns_edge_thread_data_s *_tdata = (nns_edge_thread_data_s *) thread_data;
     704              :   nns_edge_handle_s *eh;
     705              :   nns_edge_conn_s *conn;
     706            6 :   bool remove_connection = false;
     707              :   int64_t client_id;
     708              :   int ret;
     709              : 
     710            6 :   if (!_tdata) {
     711            0 :     nns_edge_loge ("Internal error, thread data is null.");
     712            0 :     return NULL;
     713              :   }
     714              : 
     715            6 :   eh = (nns_edge_handle_s *) _tdata->eh;
     716            6 :   conn = _tdata->conn;
     717            6 :   client_id = _tdata->client_id;
     718            6 :   SAFE_FREE (_tdata);
     719              : 
     720            6 :   conn->running = true;
     721         1491 :   while (conn->running) {
     722              :     struct pollfd poll_fd;
     723              : 
     724              :     /* Validate edge handle */
     725         1488 :     if (!nns_edge_handle_is_valid (eh)) {
     726            0 :       nns_edge_loge ("The edge handle is invalid, it would be expired.");
     727            3 :       break;
     728              :     }
     729              : 
     730         1488 :     poll_fd.fd = conn->sockfd;
     731         1488 :     poll_fd.events = POLLIN | POLLHUP | POLLERR;
     732         1488 :     poll_fd.revents = 0;
     733              : 
     734              :     /* 10 milliseconds */
     735         1488 :     if (poll (&poll_fd, 1, 10) > 0) {
     736              :       nns_edge_cmd_s cmd;
     737              :       nns_edge_data_h data_h;
     738              :       char *val;
     739              :       unsigned int i;
     740              : 
     741              :       /* Receive data from the client */
     742           33 :       _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
     743           33 :       ret = _nns_edge_cmd_receive (conn, &cmd);
     744           33 :       if (ret != NNS_EDGE_ERROR_NONE) {
     745            0 :         nns_edge_loge ("Failed to receive data from the connected node.");
     746            0 :         remove_connection = true;
     747            3 :         break;
     748              :       }
     749              : 
     750           33 :       if (cmd.info.cmd == _NNS_EDGE_CMD_ERROR) {
     751            3 :         nns_edge_loge ("Received error, stop msg thread.");
     752            3 :         _nns_edge_cmd_clear (&cmd);
     753            3 :         remove_connection = true;
     754            3 :         break;
     755              :       }
     756              : 
     757           30 :       if (cmd.info.cmd != _NNS_EDGE_CMD_TRANSFER_DATA) {
     758              :         /** @todo handle other cmd later */
     759            0 :         _nns_edge_cmd_clear (&cmd);
     760            0 :         continue;
     761              :       }
     762              : 
     763           30 :       ret = nns_edge_data_create (&data_h);
     764           30 :       if (ret != NNS_EDGE_ERROR_NONE) {
     765            0 :         nns_edge_loge ("Failed to create data handle in msg thread.");
     766            0 :         _nns_edge_cmd_clear (&cmd);
     767            0 :         continue;
     768              :       }
     769              : 
     770           60 :       for (i = 0; i < cmd.info.num; i++)
     771           30 :         nns_edge_data_add (data_h, cmd.mem[i], cmd.info.mem_size[i], NULL);
     772              : 
     773           30 :       if (cmd.info.meta_size > 0)
     774           30 :         nns_edge_data_deserialize_meta (data_h, cmd.meta, cmd.info.meta_size);
     775              : 
     776              :       /* Set client ID in edge data */
     777           30 :       val = nns_edge_strdup_printf ("%lld", (long long) client_id);
     778           30 :       nns_edge_data_set_info (data_h, "client_id", val);
     779           30 :       SAFE_FREE (val);
     780              : 
     781           30 :       ret = nns_edge_event_invoke_callback (eh->event_cb, eh->user_data,
     782              :           NNS_EDGE_EVENT_NEW_DATA_RECEIVED, data_h, sizeof (nns_edge_data_h),
     783              :           NULL);
     784           30 :       if (ret != NNS_EDGE_ERROR_NONE) {
     785              :         /* Try to get next request if server does not accept data from client. */
     786            0 :         nns_edge_logw ("The server does not accept data from client.");
     787              :       }
     788              : 
     789           30 :       nns_edge_data_destroy (data_h);
     790           30 :       _nns_edge_cmd_clear (&cmd);
     791              :     }
     792              :   }
     793            6 :   conn->running = false;
     794              : 
     795              :   /* Received error message from client, remove connection from table. */
     796            6 :   if (remove_connection) {
     797            3 :     nns_edge_loge
     798              :         ("Received error from client, remove connection of client (ID: %lld).",
     799              :         (long long) client_id);
     800            3 :     _nns_edge_remove_connection (eh, client_id);
     801            3 :     ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
     802              : 
     803            3 :     if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
     804            1 :       nns_edge_logi ("Connection lost! Reconnect to available node.");
     805            1 :       ret = _mqtt_hybrid_direct_connection (eh);
     806              :     }
     807              : 
     808            3 :     if (ret != NNS_EDGE_ERROR_NONE) {
     809            3 :       nns_edge_event_invoke_callback (eh->event_cb, eh->user_data,
     810              :           NNS_EDGE_EVENT_CONNECTION_CLOSED, NULL, 0, NULL);
     811              :     }
     812              :   }
     813              : 
     814            6 :   return NULL;
     815              : }
     816              : 
     817              : /**
     818              :  * @brief Create message handle thread.
     819              :  */
     820              : static int
     821            6 : _nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn,
     822              :     int64_t client_id)
     823              : {
     824              :   int status;
     825            6 :   nns_edge_thread_data_s *thread_data = NULL;
     826              : 
     827              :   thread_data =
     828            6 :       (nns_edge_thread_data_s *) calloc (1, sizeof (nns_edge_thread_data_s));
     829            6 :   if (!thread_data) {
     830            0 :     nns_edge_loge ("Failed to allocate edge thread data.");
     831            0 :     return NNS_EDGE_ERROR_OUT_OF_MEMORY;
     832              :   }
     833              : 
     834              :    /** Create message receiving thread */
     835            6 :   thread_data->eh = eh;
     836            6 :   thread_data->conn = conn;
     837            6 :   thread_data->client_id = client_id;
     838              : 
     839            6 :   status = pthread_create (&conn->msg_thread, NULL, _nns_edge_message_handler,
     840              :       thread_data);
     841              : 
     842            6 :   if (status != 0) {
     843            0 :     nns_edge_loge ("Failed to create message handler thread.");
     844            0 :     conn->running = false;
     845            0 :     conn->msg_thread = 0;
     846            0 :     SAFE_FREE (thread_data);
     847            0 :     return NNS_EDGE_ERROR_IO;
     848              :   }
     849              : 
     850            6 :   return NNS_EDGE_ERROR_NONE;
     851              : }
     852              : 
     853              : /**
     854              :  * @brief Thread to send data.
     855              :  */
     856              : static void *
     857            7 : _nns_edge_send_thread (void *thread_data)
     858              : {
     859            7 :   nns_edge_handle_s *eh = (nns_edge_handle_s *) thread_data;
     860              :   nns_edge_conn_data_s *conn_data;
     861              :   nns_edge_conn_s *conn;
     862              :   nns_edge_data_h data_h;
     863              :   nns_size_t data_size;
     864              :   int64_t client_id;
     865              :   char *val;
     866              :   int ret;
     867              : 
     868            7 :   nns_edge_lock (eh);
     869            7 :   eh->sending = true;
     870            7 :   nns_edge_cond_signal (eh);
     871            7 :   nns_edge_unlock (eh);
     872              : 
     873           43 :   while (eh->sending &&
     874           43 :       NNS_EDGE_ERROR_NONE == nns_edge_queue_wait_pop (eh->send_queue, 0U,
     875              :           &data_h, &data_size)) {
     876           36 :     if (!eh->sending) {
     877            0 :       nns_edge_data_destroy (data_h);
     878            0 :       break;
     879              :     }
     880              : 
     881              :     /* Send data to destination */
     882           36 :     switch (eh->connect_type) {
     883           30 :       case NNS_EDGE_CONNECT_TYPE_TCP:
     884              :       case NNS_EDGE_CONNECT_TYPE_HYBRID:
     885           30 :         ret = nns_edge_data_get_info (data_h, "client_id", &val);
     886           30 :         if (ret != NNS_EDGE_ERROR_NONE) {
     887            0 :           nns_edge_logd
     888              :               ("Cannot find client ID in edge data. Send to all connected nodes.");
     889              : 
     890            0 :           conn_data = (nns_edge_conn_data_s *) eh->connections;
     891            0 :           while (conn_data) {
     892            0 :             client_id = conn_data->id;
     893            0 :             conn = conn_data->sink_conn;
     894            0 :             ret = _nns_edge_transfer_data (conn, data_h, client_id);
     895            0 :             conn_data = conn_data->next;
     896              : 
     897            0 :             if (NNS_EDGE_ERROR_NONE != ret) {
     898            0 :               nns_edge_loge ("Failed to transfer data. Close the connection.");
     899            0 :               _nns_edge_remove_connection (eh, client_id);
     900              :             }
     901              :           }
     902              :         } else {
     903           30 :           client_id = (int64_t) strtoll (val, NULL, 10);
     904           30 :           SAFE_FREE (val);
     905              : 
     906           30 :           conn_data = _nns_edge_get_connection (eh, client_id);
     907           30 :           if (conn_data) {
     908           30 :             conn = conn_data->sink_conn;
     909           30 :             _nns_edge_transfer_data (conn, data_h, client_id);
     910              :           } else {
     911            0 :             nns_edge_loge
     912              :                 ("Cannot find connection, invalid client ID or connection closed.");
     913              :           }
     914              :         }
     915           30 :         break;
     916            5 :       case NNS_EDGE_CONNECT_TYPE_MQTT:
     917            5 :         ret = nns_edge_mqtt_publish_data (eh->broker_h, data_h);
     918            5 :         if (NNS_EDGE_ERROR_NONE != ret)
     919            0 :           nns_edge_loge ("Failed to send data via MQTT connection.");
     920            5 :         break;
     921            1 :       case NNS_EDGE_CONNECT_TYPE_CUSTOM:
     922            1 :         ret = nns_edge_custom_send_data (eh->custom_connection_h, data_h);
     923            1 :         if (NNS_EDGE_ERROR_NONE != ret)
     924            0 :           nns_edge_loge ("Failed to send data via custom connection.");
     925            1 :         break;
     926            0 :       default:
     927            0 :         break;
     928              :     }
     929           36 :     nns_edge_data_destroy (data_h);
     930              :   }
     931            7 :   eh->sending = false;
     932              : 
     933            7 :   return NULL;
     934              : }
     935              : 
     936              : /**
     937              :  * @brief Create thread to send data.
     938              :  * @note This should be called with lock.
     939              :  */
     940              : static int
     941            7 : _nns_edge_create_send_thread (nns_edge_handle_s * eh)
     942              : {
     943              :   int status;
     944              : 
     945            7 :   status = pthread_create (&eh->send_thread, NULL, _nns_edge_send_thread, eh);
     946              : 
     947            7 :   if (status != 0) {
     948            0 :     nns_edge_loge ("Failed to create sender thread.");
     949            0 :     eh->send_thread = 0;
     950            0 :     eh->sending = false;
     951            0 :     return NNS_EDGE_ERROR_IO;
     952              :   }
     953              : 
     954              :   /* Wait for starting thread. */
     955            7 :   nns_edge_cond_wait (eh);
     956              : 
     957            7 :   return NNS_EDGE_ERROR_NONE;
     958              : }
     959              : 
     960              : /**
     961              :  * @brief Connect to the destination node. (host:sender(sink) - dest:receiver(listener, src))
     962              :  */
     963              : static int
     964            6 : _nns_edge_connect_to (nns_edge_handle_s * eh, int64_t client_id,
     965              :     const char *host, int port)
     966              : {
     967            6 :   nns_edge_conn_s *conn = NULL;
     968              :   nns_edge_conn_data_s *conn_data;
     969              :   nns_edge_cmd_s cmd;
     970              :   char *host_str;
     971            6 :   bool done = false;
     972              :   int ret;
     973              : 
     974            6 :   conn = (nns_edge_conn_s *) calloc (1, sizeof (nns_edge_conn_s));
     975            6 :   if (!conn) {
     976            0 :     nns_edge_loge ("Failed to allocate client data.");
     977            0 :     goto error;
     978              :   }
     979              : 
     980            6 :   conn->host = nns_edge_strdup (host);
     981            6 :   conn->port = port;
     982            6 :   conn->sockfd = -1;
     983              : 
     984            6 :   if (!_nns_edge_connect_socket (conn)) {
     985            0 :     goto error;
     986              :   }
     987              : 
     988            6 :   if ((NNS_EDGE_NODE_TYPE_QUERY_CLIENT == eh->node_type)
     989            3 :       || (NNS_EDGE_NODE_TYPE_SUB == eh->node_type)) {
     990              :     /* Receive capability and client ID from server. */
     991            3 :     _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
     992            3 :     ret = _nns_edge_cmd_receive (conn, &cmd);
     993            3 :     if (ret != NNS_EDGE_ERROR_NONE) {
     994            0 :       nns_edge_loge ("Failed to receive capability.");
     995            0 :       goto error;
     996              :     }
     997              : 
     998            3 :     if (cmd.info.cmd != _NNS_EDGE_CMD_CAPABILITY) {
     999            0 :       nns_edge_loge ("Failed to get capability.");
    1000            0 :       _nns_edge_cmd_clear (&cmd);
    1001            0 :       goto error;
    1002              :     }
    1003              : 
    1004            3 :     client_id = eh->client_id = cmd.info.client_id;
    1005              : 
    1006              :     /* Check compatibility. */
    1007            3 :     ret = nns_edge_event_invoke_callback (eh->event_cb, eh->user_data,
    1008              :         NNS_EDGE_EVENT_CAPABILITY, cmd.mem[0], cmd.info.mem_size[0], NULL);
    1009            3 :     _nns_edge_cmd_clear (&cmd);
    1010              : 
    1011            3 :     if (ret != NNS_EDGE_ERROR_NONE) {
    1012            0 :       nns_edge_loge ("The event returns error, capability is not acceptable.");
    1013            0 :       _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
    1014              :     } else {
    1015              :       /* Send host and port to destination. */
    1016            3 :       _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id);
    1017              : 
    1018            3 :       host_str = nns_edge_get_host_string (eh->host, eh->port);
    1019            3 :       cmd.info.num = 1;
    1020            3 :       cmd.info.mem_size[0] = strlen (host_str) + 1;
    1021            3 :       cmd.mem[0] = host_str;
    1022              :     }
    1023              : 
    1024            3 :     ret = _nns_edge_cmd_send (conn, &cmd);
    1025            3 :     _nns_edge_cmd_clear (&cmd);
    1026              : 
    1027            3 :     if (ret != NNS_EDGE_ERROR_NONE) {
    1028            0 :       nns_edge_loge ("Failed to send host info.");
    1029            0 :       goto error;
    1030              :     }
    1031              :   }
    1032              : 
    1033            6 :   if (NNS_EDGE_NODE_TYPE_SUB == eh->node_type) {
    1034            0 :     ret = _nns_edge_create_message_thread (eh, conn, client_id);
    1035            0 :     if (ret != NNS_EDGE_ERROR_NONE) {
    1036            0 :       nns_edge_loge ("Failed to create message handle thread.");
    1037            0 :       goto error;
    1038              :     }
    1039              :   }
    1040              : 
    1041            6 :   conn_data = _nns_edge_add_connection (eh, client_id);
    1042            6 :   if (conn_data) {
    1043              :     /* Close old connection and set new one. */
    1044            6 :     _nns_edge_close_connection (conn_data->sink_conn);
    1045            6 :     conn_data->sink_conn = conn;
    1046            6 :     done = true;
    1047              :   }
    1048              : 
    1049            0 : error:
    1050            6 :   if (!done) {
    1051            0 :     _nns_edge_close_connection (conn);
    1052            6 :     return NNS_EDGE_ERROR_CONNECTION_FAILURE;
    1053              :   }
    1054              : 
    1055            6 :   return NNS_EDGE_ERROR_NONE;
    1056              : }
    1057              : 
    1058              : /**
    1059              :  * @brief Accept socket and create message thread in socket listener thread.
    1060              :  */
    1061              : static void
    1062            6 : _nns_edge_accept_socket (nns_edge_handle_s * eh)
    1063              : {
    1064            6 :   bool done = false;
    1065              :   nns_edge_conn_s *conn;
    1066              :   nns_edge_conn_data_s *conn_data;
    1067              :   nns_edge_cmd_s cmd;
    1068              :   int64_t client_id;
    1069            6 :   char *dest_host = NULL;
    1070              :   int dest_port, ret;
    1071              : 
    1072            6 :   conn = (nns_edge_conn_s *) calloc (1, sizeof (nns_edge_conn_s));
    1073            6 :   if (!conn) {
    1074            0 :     nns_edge_loge ("Failed to allocate edge connection.");
    1075            0 :     goto error;
    1076              :   }
    1077              : 
    1078            6 :   conn->sockfd = accept (eh->listener_fd, NULL, NULL);
    1079            6 :   if (conn->sockfd < 0) {
    1080            0 :     nns_edge_loge ("Failed to accept socket.");
    1081            0 :     goto error;
    1082              :   }
    1083              : 
    1084            6 :   _set_socket_option (conn->sockfd);
    1085              : 
    1086            6 :   if ((NNS_EDGE_NODE_TYPE_QUERY_SERVER == eh->node_type)
    1087            3 :       || (NNS_EDGE_NODE_TYPE_PUB == eh->node_type)) {
    1088            3 :     client_id = nns_edge_generate_id ();
    1089              :   } else {
    1090            3 :     client_id = eh->client_id;
    1091              :   }
    1092              : 
    1093              :   /* Send capability and info to check compatibility. */
    1094            6 :   if ((NNS_EDGE_NODE_TYPE_QUERY_SERVER == eh->node_type)
    1095            3 :       || (NNS_EDGE_NODE_TYPE_PUB == eh->node_type)) {
    1096            3 :     _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_CAPABILITY, client_id);
    1097            3 :     cmd.info.num = 1;
    1098            3 :     cmd.info.mem_size[0] = strlen (eh->caps_str) + 1;
    1099            3 :     cmd.mem[0] = eh->caps_str;
    1100              : 
    1101            3 :     ret = _nns_edge_cmd_send (conn, &cmd);
    1102            3 :     if (ret != NNS_EDGE_ERROR_NONE) {
    1103            0 :       nns_edge_loge ("Failed to send capability.");
    1104            0 :       goto error;
    1105              :     }
    1106              :   }
    1107              : 
    1108            6 :   if (NNS_EDGE_NODE_TYPE_QUERY_SERVER == eh->node_type) {
    1109              :     /* Receive host info from destination. */
    1110            3 :     _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
    1111            3 :     ret = _nns_edge_cmd_receive (conn, &cmd);
    1112            3 :     if (ret != NNS_EDGE_ERROR_NONE) {
    1113            0 :       nns_edge_loge ("Failed to receive node info.");
    1114            0 :       goto error;
    1115              :     }
    1116              : 
    1117            3 :     if (cmd.info.cmd != _NNS_EDGE_CMD_HOST_INFO) {
    1118            0 :       nns_edge_loge ("Failed to get host info.");
    1119            0 :       _nns_edge_cmd_clear (&cmd);
    1120            0 :       goto error;
    1121              :     }
    1122              : 
    1123            3 :     nns_edge_parse_host_string (cmd.mem[0], &dest_host, &dest_port);
    1124            3 :     _nns_edge_cmd_clear (&cmd);
    1125              : 
    1126              :     /* Connect to client listener. */
    1127            3 :     ret = _nns_edge_connect_to (eh, client_id, dest_host, dest_port);
    1128            3 :     if (ret != NNS_EDGE_ERROR_NONE) {
    1129            0 :       nns_edge_loge ("Failed to connect host %s:%d.", dest_host, dest_port);
    1130            0 :       goto error;
    1131              :     }
    1132              :   }
    1133              : 
    1134            6 :   conn_data = _nns_edge_add_connection (eh, client_id);
    1135            6 :   if (!conn_data) {
    1136            0 :     nns_edge_loge ("Failed to add client connection.");
    1137            0 :     goto error;
    1138              :   }
    1139              : 
    1140              :   /* Close old connection and set new one for each node type. */
    1141            6 :   if (eh->node_type == NNS_EDGE_NODE_TYPE_QUERY_CLIENT ||
    1142            3 :       eh->node_type == NNS_EDGE_NODE_TYPE_QUERY_SERVER) {
    1143            6 :     ret = _nns_edge_create_message_thread (eh, conn, client_id);
    1144            6 :     if (ret != NNS_EDGE_ERROR_NONE) {
    1145            0 :       nns_edge_loge ("Failed to create message handle thread.");
    1146            0 :       goto error;
    1147              :     }
    1148            6 :     _nns_edge_close_connection (conn_data->src_conn);
    1149            6 :     conn_data->src_conn = conn;
    1150              :   } else {
    1151            0 :     _nns_edge_close_connection (conn_data->sink_conn);
    1152            0 :     conn_data->sink_conn = conn;
    1153              :   }
    1154              : 
    1155            6 :   ret = nns_edge_event_invoke_callback (eh->event_cb, eh->user_data,
    1156              :       NNS_EDGE_EVENT_CONNECTION_COMPLETED, NULL, 0, NULL);
    1157            6 :   if (ret != NNS_EDGE_ERROR_NONE) {
    1158            0 :     nns_edge_loge ("Failed to send an event for new connection.");
    1159            0 :     goto error;
    1160              :   }
    1161            6 :   done = true;
    1162              : 
    1163            6 : error:
    1164            6 :   if (!done)
    1165            0 :     _nns_edge_close_connection (conn);
    1166              : 
    1167            6 :   SAFE_FREE (dest_host);
    1168            6 : }
    1169              : 
    1170              : /**
    1171              :  * @brief Socket listener thread.
    1172              :  */
    1173              : static void *
    1174            6 : _nns_edge_socket_listener_thread (void *thread_data)
    1175              : {
    1176            6 :   nns_edge_handle_s *eh = (nns_edge_handle_s *) thread_data;
    1177              : 
    1178            6 :   nns_edge_lock (eh);
    1179            6 :   eh->listening = true;
    1180            6 :   nns_edge_cond_signal (eh);
    1181            6 :   nns_edge_unlock (eh);
    1182              : 
    1183         1567 :   while (eh->listening) {
    1184              :     struct pollfd poll_fd;
    1185              : 
    1186         1561 :     poll_fd.fd = eh->listener_fd;
    1187         1561 :     poll_fd.events = POLLIN | POLLHUP | POLLERR;
    1188         1561 :     poll_fd.revents = 0;
    1189              : 
    1190              :     /* 10 milliseconds */
    1191         1561 :     if (poll (&poll_fd, 1, 10) > 0) {
    1192            6 :       if (!eh->listening)
    1193            0 :         break;
    1194              : 
    1195            6 :       if (poll_fd.revents & (POLLERR | POLLHUP)) {
    1196            0 :         nns_edge_loge ("Invalid state, possibly socket is closed in listener.");
    1197            0 :         break;
    1198              :       }
    1199              : 
    1200            6 :       if (poll_fd.revents & POLLIN)
    1201            6 :         _nns_edge_accept_socket (eh);
    1202              :     }
    1203              :   }
    1204            6 :   eh->listening = false;
    1205              : 
    1206            6 :   return NULL;
    1207              : }
    1208              : 
    1209              : /**
    1210              :  * @brief Create socket listener.
    1211              :  * @note This function should be called with handle lock.
    1212              :  */
    1213              : static bool
    1214            6 : _nns_edge_create_socket_listener (nns_edge_handle_s * eh)
    1215              : {
    1216            6 :   bool done = false;
    1217            6 :   struct sockaddr_in saddr = { 0 };
    1218            6 :   socklen_t saddr_len = sizeof (struct sockaddr_in);
    1219              :   int status;
    1220              : 
    1221            6 :   if (!_fill_socket_addr (&saddr, eh->host, eh->port)) {
    1222            0 :     nns_edge_loge ("Failed to create listener, invalid host: %s.", eh->host);
    1223            6 :     return false;
    1224              :   }
    1225              : 
    1226            6 :   eh->listener_fd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
    1227            6 :   if (eh->listener_fd < 0) {
    1228            0 :     nns_edge_loge ("Failed to create listener socket.");
    1229            0 :     return false;
    1230              :   }
    1231              : 
    1232            6 :   if (bind (eh->listener_fd, (struct sockaddr *) &saddr, saddr_len) < 0) {
    1233            0 :     nns_edge_loge ("Failed to create listener, cannot bind socket.");
    1234            0 :     goto error;
    1235              :   }
    1236              : 
    1237            6 :   if (listen (eh->listener_fd, N_BACKLOG) < 0) {
    1238            0 :     nns_edge_loge ("Failed to create listener, cannot listen socket.");
    1239            0 :     goto error;
    1240              :   }
    1241              : 
    1242            6 :   status = pthread_create (&eh->listener_thread, NULL,
    1243              :       _nns_edge_socket_listener_thread, eh);
    1244              : 
    1245            6 :   if (status != 0) {
    1246            0 :     nns_edge_loge ("Failed to create listener thread.");
    1247            0 :     eh->listening = false;
    1248            0 :     eh->listener_thread = 0;
    1249            0 :     goto error;
    1250              :   }
    1251              : 
    1252              :   /* Wait for the listener thread to be started */
    1253            6 :   nns_edge_cond_wait (eh);
    1254              : 
    1255            6 :   done = true;
    1256              : 
    1257            6 : error:
    1258            6 :   if (!done) {
    1259            0 :     close (eh->listener_fd);
    1260            0 :     eh->listener_fd = -1;
    1261              :   }
    1262              : 
    1263            6 :   return done;
    1264              : }
    1265              : 
    1266              : /**
    1267              :  * @brief Internal function to create edge handle.
    1268              :  */
    1269              : static int
    1270           36 : _nns_edge_create_handle (const char *id, nns_edge_node_type_e node_type,
    1271              :     nns_edge_h * edge_h)
    1272              : {
    1273           36 :   int ret = NNS_EDGE_ERROR_NONE;
    1274              :   nns_edge_handle_s *eh;
    1275              : 
    1276           36 :   eh = (nns_edge_handle_s *) calloc (1, sizeof (nns_edge_handle_s));
    1277           36 :   if (!eh) {
    1278            0 :     nns_edge_loge ("Failed to allocate memory for edge handle.");
    1279            0 :     return NNS_EDGE_ERROR_OUT_OF_MEMORY;
    1280              :   }
    1281              : 
    1282           36 :   nns_edge_lock_init (eh);
    1283           36 :   nns_edge_cond_init (eh);
    1284           36 :   nns_edge_handle_set_magic (eh, NNS_EDGE_MAGIC);
    1285           36 :   eh->id = STR_IS_VALID (id) ? nns_edge_strdup (id) :
    1286            0 :       nns_edge_strdup_printf ("%lld", (long long) nns_edge_generate_id ());
    1287           36 :   eh->host = nns_edge_strdup ("localhost");
    1288           36 :   eh->port = 0;
    1289           36 :   eh->dest_host = nns_edge_strdup ("localhost");
    1290           36 :   eh->dest_port = 0;
    1291           36 :   eh->node_type = node_type;
    1292           36 :   eh->is_started = false;
    1293           36 :   eh->broker_h = NULL;
    1294           36 :   eh->connections = NULL;
    1295           36 :   eh->listening = false;
    1296           36 :   eh->sending = false;
    1297           36 :   eh->listener_fd = -1;
    1298           36 :   eh->caps_str = nns_edge_strdup ("");
    1299           36 :   eh->custom_connection_h = NULL;
    1300              : 
    1301           36 :   ret = nns_edge_metadata_create (&eh->metadata);
    1302           36 :   if (ret != NNS_EDGE_ERROR_NONE) {
    1303            0 :     nns_edge_loge ("Failed to create edge metadata.");
    1304            0 :     goto error;
    1305              :   }
    1306              : 
    1307           36 :   ret = nns_edge_queue_create (&eh->send_queue);
    1308           36 :   if (NNS_EDGE_ERROR_NONE != ret) {
    1309            0 :     nns_edge_loge ("Failed to create edge queue.");
    1310            0 :     goto error;
    1311              :   }
    1312              : 
    1313           36 : error:
    1314           36 :   if (ret == NNS_EDGE_ERROR_NONE)
    1315           36 :     *edge_h = eh;
    1316              :   else
    1317            0 :     nns_edge_release_handle (eh);
    1318              : 
    1319           36 :   return ret;
    1320              : }
    1321              : 
    1322              : /**
    1323              :  * @brief Create edge custom handle.
    1324              :  */
    1325              : int
    1326            5 : nns_edge_custom_create_handle (const char *id, const char *lib_path,
    1327              :     nns_edge_node_type_e node_type, nns_edge_h * edge_h)
    1328              : {
    1329            5 :   int ret = NNS_EDGE_ERROR_NONE;
    1330              :   nns_edge_handle_s *eh;
    1331              : 
    1332            5 :   if (node_type < 0 || node_type >= NNS_EDGE_NODE_TYPE_UNKNOWN) {
    1333            1 :     nns_edge_loge ("Invalid param, set exact node type.");
    1334            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1335              :   }
    1336              : 
    1337            4 :   if (!STR_IS_VALID (lib_path)) {
    1338            1 :     nns_edge_loge ("Invalid param, given custom lib path is invalid.");
    1339            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1340              :   }
    1341              : 
    1342            3 :   if (!edge_h) {
    1343            1 :     nns_edge_loge ("Invalid param, edge_h should not be null.");
    1344            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1345              :   }
    1346              : 
    1347            2 :   ret = _nns_edge_create_handle (id, node_type, edge_h);
    1348            2 :   if (ret != NNS_EDGE_ERROR_NONE) {
    1349            0 :     nns_edge_loge ("Failed to create edge handle.");
    1350            0 :     return ret;
    1351              :   }
    1352              : 
    1353            2 :   eh = (nns_edge_handle_s *) (*edge_h);
    1354            2 :   eh->connect_type = NNS_EDGE_CONNECT_TYPE_CUSTOM;
    1355              : 
    1356            2 :   ret = nns_edge_custom_load (lib_path, &eh->custom_connection_h);
    1357            2 :   if (ret != NNS_EDGE_ERROR_NONE)
    1358            0 :     nns_edge_release_handle (eh);
    1359              : 
    1360            2 :   return ret;
    1361              : }
    1362              : 
    1363              : /**
    1364              :  * @brief Create edge handle.
    1365              :  */
    1366              : int
    1367           37 : nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type,
    1368              :     nns_edge_node_type_e node_type, nns_edge_h * edge_h)
    1369              : {
    1370           37 :   int ret = NNS_EDGE_ERROR_NONE;
    1371              :   nns_edge_handle_s *eh;
    1372              : 
    1373           37 :   if (connect_type < 0 || connect_type >= NNS_EDGE_CONNECT_TYPE_UNKNOWN ||
    1374              :       connect_type == NNS_EDGE_CONNECT_TYPE_CUSTOM) {
    1375            1 :     nns_edge_loge ("Invalid param, set valid connect type.");
    1376            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1377              :   }
    1378              : 
    1379              :   /**
    1380              :    * @todo handle flag (receive | send)
    1381              :    * e.g., send only case: listener is unnecessary.
    1382              :    */
    1383           36 :   if (node_type < 0 || node_type >= NNS_EDGE_NODE_TYPE_UNKNOWN) {
    1384            1 :     nns_edge_loge ("Invalid param, set exact node type.");
    1385            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1386              :   }
    1387              : 
    1388           35 :   if (!edge_h) {
    1389            1 :     nns_edge_loge ("Invalid param, edge_h should not be null.");
    1390            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1391              :   }
    1392              : 
    1393           34 :   ret = _nns_edge_create_handle (id, node_type, edge_h);
    1394           34 :   if (ret != NNS_EDGE_ERROR_NONE) {
    1395            0 :     nns_edge_loge ("Failed to create edge handle.");
    1396            0 :     return ret;
    1397              :   }
    1398              : 
    1399           34 :   eh = (nns_edge_handle_s *) (*edge_h);
    1400           34 :   eh->connect_type = connect_type;
    1401              : 
    1402           34 :   return NNS_EDGE_ERROR_NONE;
    1403              : }
    1404              : 
    1405              : /**
    1406              :  * @brief Start the nnstreamer edge.
    1407              :  */
    1408              : int
    1409           11 : nns_edge_start (nns_edge_h edge_h)
    1410              : {
    1411              :   nns_edge_handle_s *eh;
    1412           11 :   int ret = NNS_EDGE_ERROR_NONE;
    1413              : 
    1414           11 :   eh = (nns_edge_handle_s *) edge_h;
    1415           11 :   if (!eh) {
    1416            1 :     nns_edge_loge ("Invalid param, given edge handle is null.");
    1417            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1418              :   }
    1419              : 
    1420           10 :   if (!nns_edge_handle_is_valid (eh)) {
    1421            1 :     nns_edge_loge ("Invalid param, given edge handle is invalid.");
    1422            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1423              :   }
    1424              : 
    1425            9 :   nns_edge_lock (eh);
    1426              : 
    1427            9 :   if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
    1428            1 :     ret = nns_edge_custom_start (eh->custom_connection_h);
    1429            1 :     if (NNS_EDGE_ERROR_NONE == ret)
    1430            1 :       ret = _nns_edge_create_send_thread (eh);
    1431              : 
    1432            1 :     if (NNS_EDGE_ERROR_NONE != ret)
    1433            0 :       nns_edge_loge ("Failed to start edge custom connection.");
    1434            1 :     goto done;
    1435              :   }
    1436              : 
    1437            8 :   if (eh->port <= 0) {
    1438            6 :     eh->port = nns_edge_get_available_port ();
    1439            6 :     if (eh->port <= 0) {
    1440            0 :       nns_edge_loge ("Failed to start edge. Cannot get available port.");
    1441            0 :       nns_edge_unlock (eh);
    1442            0 :       return NNS_EDGE_ERROR_CONNECTION_FAILURE;
    1443              :     }
    1444              :   }
    1445              : 
    1446            8 :   if ((NNS_EDGE_NODE_TYPE_QUERY_SERVER == eh->node_type)
    1447            6 :       || (NNS_EDGE_NODE_TYPE_PUB == eh->node_type)) {
    1448            3 :     if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type
    1449            2 :         || NNS_EDGE_CONNECT_TYPE_MQTT == eh->connect_type) {
    1450              :       char *topic;
    1451              : 
    1452              :       /** @todo Set unique device name.
    1453              :        * Device name should be unique. Consider using MAC address later.
    1454              :        * Now, use ID received from the user.
    1455              :        */
    1456            2 :       topic = nns_edge_strdup_printf ("edge/inference/device-%s/%s/",
    1457              :           eh->id, eh->topic);
    1458              : 
    1459            4 :       ret = nns_edge_mqtt_connect (eh->id, topic, eh->dest_host, eh->dest_port,
    1460            2 :           &eh->broker_h);
    1461            2 :       SAFE_FREE (topic);
    1462              : 
    1463            2 :       if (NNS_EDGE_ERROR_NONE != ret) {
    1464            0 :         nns_edge_loge
    1465              :             ("Failed to start nnstreamer-edge, cannot connect to broker.");
    1466            0 :         goto done;
    1467              :       }
    1468              : 
    1469            2 :       if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
    1470              :         char *msg;
    1471            1 :         msg = nns_edge_get_host_string (eh->host, eh->port);
    1472              : 
    1473            1 :         ret = nns_edge_mqtt_publish (eh->broker_h, msg, strlen (msg) + 1);
    1474            1 :         SAFE_FREE (msg);
    1475              : 
    1476            1 :         if (NNS_EDGE_ERROR_NONE != ret) {
    1477            0 :           nns_edge_loge ("Failed to publish the message to broker.");
    1478            0 :           goto done;
    1479              :         }
    1480              :       } else {
    1481            1 :         ret = nns_edge_mqtt_set_event_callback (eh->broker_h, eh->event_cb,
    1482              :             eh->user_data);
    1483            1 :         if (NNS_EDGE_ERROR_NONE != ret) {
    1484            0 :           nns_edge_loge ("Failed to set event callback to MQTT broker.");
    1485            0 :           goto done;
    1486              :         }
    1487              :       }
    1488              :     }
    1489              :   }
    1490              : 
    1491            8 :   if ((NNS_EDGE_NODE_TYPE_QUERY_CLIENT == eh->node_type)
    1492            5 :       || (NNS_EDGE_NODE_TYPE_QUERY_SERVER == eh->node_type)
    1493            3 :       || (NNS_EDGE_NODE_TYPE_PUB == eh->node_type)) {
    1494              :     /* Start listener thread to accept socket. */
    1495            6 :     if (!_nns_edge_create_socket_listener (eh)) {
    1496            0 :       nns_edge_loge ("Failed to create socket listener.");
    1497            0 :       ret = NNS_EDGE_ERROR_IO;
    1498            0 :       goto done;
    1499              :     }
    1500              : 
    1501            6 :     ret = _nns_edge_create_send_thread (eh);
    1502              :   }
    1503              : 
    1504            2 : done:
    1505            9 :   eh->is_started = (ret == NNS_EDGE_ERROR_NONE);
    1506            9 :   nns_edge_unlock (eh);
    1507            9 :   return ret;
    1508              : }
    1509              : 
    1510              : /**
    1511              :  * @brief Stop the nnstreamer edge.
    1512              :  */
    1513              : int
    1514           37 : nns_edge_stop (nns_edge_h edge_h)
    1515              : {
    1516              :   nns_edge_handle_s *eh;
    1517           37 :   int ret = NNS_EDGE_ERROR_NONE;
    1518              : 
    1519           37 :   eh = (nns_edge_handle_s *) edge_h;
    1520           37 :   if (!eh) {
    1521            0 :     nns_edge_loge ("Invalid param, given edge handle is null.");
    1522            0 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1523              :   }
    1524              : 
    1525           37 :   if (!nns_edge_handle_is_valid (eh)) {
    1526            0 :     nns_edge_loge ("Invalid param, given edge handle is invalid.");
    1527            0 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1528              :   }
    1529              : 
    1530           37 :   nns_edge_lock (eh);
    1531           37 :   if (!eh->is_started) {
    1532           28 :     nns_edge_logi ("Edge is not started yet. Nothing to stop.");
    1533           28 :     goto done;
    1534              :   }
    1535              : 
    1536            9 :   if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
    1537            1 :     ret = nns_edge_custom_stop (eh->custom_connection_h);
    1538              :   }
    1539              : 
    1540            9 :   if (NNS_EDGE_ERROR_NONE == ret)
    1541            9 :     eh->is_started = FALSE;
    1542              : 
    1543            0 : done:
    1544           37 :   nns_edge_unlock (eh);
    1545           37 :   return ret;
    1546              : }
    1547              : 
    1548              : /**
    1549              :  * @brief Release the given handle.
    1550              :  */
    1551              : int
    1552           38 : nns_edge_release_handle (nns_edge_h edge_h)
    1553              : {
    1554              :   nns_edge_handle_s *eh;
    1555              : 
    1556           38 :   eh = (nns_edge_handle_s *) edge_h;
    1557           38 :   if (!eh) {
    1558            1 :     nns_edge_loge ("Invalid param, given edge handle is null.");
    1559            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1560              :   }
    1561              : 
    1562           37 :   if (!nns_edge_handle_is_valid (eh)) {
    1563            1 :     nns_edge_loge ("Invalid param, given edge handle is invalid.");
    1564            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1565              :   }
    1566              : 
    1567           36 :   nns_edge_stop (eh);
    1568              : 
    1569           36 :   nns_edge_lock (eh);
    1570              : 
    1571              :   /* Clear message queue and stop thread first */
    1572           36 :   nns_edge_queue_clear (eh->send_queue);
    1573              : 
    1574           36 :   eh->sending = false;
    1575           36 :   if (eh->send_thread) {
    1576            7 :     pthread_join (eh->send_thread, NULL);
    1577            7 :     eh->send_thread = 0;
    1578              :   }
    1579              : 
    1580           36 :   eh->listening = false;
    1581           36 :   if (eh->listener_thread) {
    1582            6 :     pthread_join (eh->listener_thread, NULL);
    1583            6 :     eh->listener_thread = 0;
    1584              :   }
    1585              : 
    1586           36 :   if (eh->listener_fd >= 0) {
    1587            6 :     close (eh->listener_fd);
    1588            6 :     eh->listener_fd = -1;
    1589              :   }
    1590              : 
    1591           36 :   _nns_edge_remove_all_connection (eh);
    1592              : 
    1593           36 :   switch (eh->connect_type) {
    1594            5 :     case NNS_EDGE_CONNECT_TYPE_HYBRID:
    1595              :     case NNS_EDGE_CONNECT_TYPE_MQTT:
    1596            5 :       if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh->broker_h)) {
    1597            0 :         nns_edge_logw ("Failed to close mqtt connection.");
    1598              :       }
    1599            5 :       break;
    1600            2 :     case NNS_EDGE_CONNECT_TYPE_CUSTOM:
    1601            2 :       if (nns_edge_custom_release (eh->custom_connection_h) !=
    1602              :           NNS_EDGE_ERROR_NONE) {
    1603            0 :         nns_edge_logw ("Failed to close custom connection.");
    1604              :       }
    1605            2 :       break;
    1606           29 :     default:
    1607           29 :       break;
    1608              :   }
    1609              : 
    1610              :   /* Clear event callback and handles */
    1611           36 :   nns_edge_handle_set_magic (eh, NNS_EDGE_MAGIC_DEAD);
    1612           36 :   eh->event_cb = NULL;
    1613           36 :   eh->user_data = NULL;
    1614           36 :   eh->broker_h = NULL;
    1615           36 :   eh->custom_connection_h = NULL;
    1616              : 
    1617           36 :   nns_edge_queue_destroy (eh->send_queue);
    1618           36 :   eh->send_queue = NULL;
    1619           36 :   nns_edge_metadata_destroy (eh->metadata);
    1620           36 :   eh->metadata = NULL;
    1621           36 :   SAFE_FREE (eh->id);
    1622           36 :   SAFE_FREE (eh->topic);
    1623           36 :   SAFE_FREE (eh->host);
    1624           36 :   SAFE_FREE (eh->dest_host);
    1625           36 :   SAFE_FREE (eh->caps_str);
    1626              : 
    1627           36 :   nns_edge_unlock (eh);
    1628           36 :   nns_edge_cond_destroy (eh);
    1629           36 :   nns_edge_lock_destroy (eh);
    1630           36 :   SAFE_FREE (eh);
    1631              : 
    1632           36 :   return NNS_EDGE_ERROR_NONE;
    1633              : }
    1634              : 
    1635              : /**
    1636              :  * @brief Set the event callback.
    1637              :  */
    1638              : int
    1639           17 : nns_edge_set_event_callback (nns_edge_h edge_h, nns_edge_event_cb cb,
    1640              :     void *user_data)
    1641              : {
    1642              :   nns_edge_handle_s *eh;
    1643              :   int ret;
    1644              : 
    1645           17 :   eh = (nns_edge_handle_s *) edge_h;
    1646           17 :   if (!eh) {
    1647            1 :     nns_edge_loge ("Invalid param, given edge handle is null.");
    1648            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1649              :   }
    1650              : 
    1651           16 :   if (!nns_edge_handle_is_valid (eh)) {
    1652            1 :     nns_edge_loge ("Invalid param, given edge handle is invalid.");
    1653            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1654              :   }
    1655              : 
    1656           15 :   nns_edge_lock (eh);
    1657              : 
    1658           15 :   ret = nns_edge_event_invoke_callback (eh->event_cb, eh->user_data,
    1659              :       NNS_EDGE_EVENT_CALLBACK_RELEASED, NULL, 0, NULL);
    1660           15 :   if (ret != NNS_EDGE_ERROR_NONE) {
    1661            0 :     nns_edge_loge ("Failed to set new event callback.");
    1662            0 :     goto error;
    1663              :   }
    1664              : 
    1665           15 :   if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
    1666            1 :     ret = nns_edge_custom_set_event_callback (eh->custom_connection_h,
    1667              :         cb, user_data);
    1668            1 :     if (NNS_EDGE_ERROR_NONE != ret) {
    1669            0 :       goto error;
    1670              :     }
    1671              :   }
    1672              : 
    1673           15 :   eh->event_cb = cb;
    1674           15 :   eh->user_data = user_data;
    1675              : 
    1676           15 : error:
    1677           15 :   nns_edge_unlock (eh);
    1678           15 :   return ret;
    1679              : }
    1680              : 
    1681              : /**
    1682              :  * @brief Parse the message received from the MQTT broker and connect to the server directly.
    1683              :  */
    1684              : static int
    1685            2 : _mqtt_hybrid_direct_connection (nns_edge_handle_s * eh)
    1686              : {
    1687              :   int ret;
    1688              : 
    1689            0 :   do {
    1690            2 :     char *msg = NULL;
    1691            2 :     char *server_ip = NULL;
    1692            2 :     int server_port = 0;
    1693            2 :     nns_size_t msg_len = 0;
    1694              : 
    1695              :     ret =
    1696            2 :         nns_edge_mqtt_get_message (eh->broker_h, (void **) &msg, &msg_len, 0U);
    1697            2 :     if (ret != NNS_EDGE_ERROR_NONE || !msg || msg_len == 0)
    1698              :       break;
    1699              : 
    1700            1 :     nns_edge_parse_host_string (msg, &server_ip, &server_port);
    1701            1 :     SAFE_FREE (msg);
    1702              : 
    1703            1 :     nns_edge_logd ("Parsed server info: Server [%s:%d] ", server_ip,
    1704              :         server_port);
    1705              : 
    1706            1 :     ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port);
    1707            1 :     SAFE_FREE (server_ip);
    1708              : 
    1709            1 :     if (NNS_EDGE_ERROR_NONE == ret)
    1710            1 :       break;
    1711              :   } while (TRUE);
    1712              : 
    1713            2 :   return ret;
    1714              : }
    1715              : 
    1716              : /**
    1717              :  * @brief Start subscription to MQTT message
    1718              :  */
    1719              : static int
    1720            3 : _nns_edge_start_mqtt_sub (nns_edge_handle_s * eh)
    1721              : {
    1722              :   char *topic;
    1723              :   int ret;
    1724              : 
    1725            3 :   if (!nns_edge_mqtt_is_connected (eh->broker_h)) {
    1726            3 :     topic = nns_edge_strdup_printf ("edge/inference/+/%s/#", eh->topic);
    1727              : 
    1728            6 :     ret = nns_edge_mqtt_connect (eh->id, topic, eh->dest_host, eh->dest_port,
    1729            3 :         &eh->broker_h);
    1730            3 :     SAFE_FREE (topic);
    1731              : 
    1732            3 :     if (NNS_EDGE_ERROR_NONE != ret) {
    1733            0 :       return NNS_EDGE_ERROR_CONNECTION_FAILURE;
    1734              :     }
    1735              : 
    1736            3 :     ret = nns_edge_mqtt_subscribe (eh->broker_h);
    1737            3 :     if (NNS_EDGE_ERROR_NONE != ret) {
    1738            0 :       nns_edge_loge ("Failed to subscribe to topic: %s.", eh->topic);
    1739            0 :       return ret;
    1740              :     }
    1741              :   }
    1742              : 
    1743            3 :   return NNS_EDGE_ERROR_NONE;
    1744              : }
    1745              : 
    1746              : /**
    1747              :  * @brief Connect to the destination node.
    1748              :  */
    1749              : int
    1750           15 : nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port)
    1751              : {
    1752              :   nns_edge_handle_s *eh;
    1753           15 :   int ret = NNS_EDGE_ERROR_NONE;
    1754              : 
    1755           15 :   eh = (nns_edge_handle_s *) edge_h;
    1756           15 :   if (!eh) {
    1757            1 :     nns_edge_loge ("Invalid param, given edge handle is null.");
    1758            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1759              :   }
    1760              : 
    1761           14 :   if (!STR_IS_VALID (dest_host)) {
    1762            2 :     nns_edge_loge ("Invalid param, given host is invalid.");
    1763            2 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1764              :   }
    1765              : 
    1766           12 :   if (!PORT_IS_VALID (dest_port)) {
    1767            3 :     nns_edge_loge ("Invalid port number %d.", dest_port);
    1768            3 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1769              :   }
    1770              : 
    1771            9 :   if (!nns_edge_handle_is_valid (eh)) {
    1772            1 :     nns_edge_loge ("Invalid param, given edge handle is invalid.");
    1773            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1774              :   }
    1775              : 
    1776            8 :   nns_edge_lock (eh);
    1777            8 :   if (!eh->is_started) {
    1778            1 :     nns_edge_loge ("Invalid state, the edge handle is not started.");
    1779            1 :     nns_edge_unlock (eh);
    1780            1 :     return NNS_EDGE_ERROR_IO;
    1781              :   }
    1782              : 
    1783            7 :   if (!eh->event_cb) {
    1784            0 :     nns_edge_loge ("NNStreamer-edge event callback is not registered.");
    1785            0 :     nns_edge_unlock (eh);
    1786            0 :     return NNS_EDGE_ERROR_CONNECTION_FAILURE;
    1787              :   }
    1788              : 
    1789            7 :   if (NNS_EDGE_ERROR_NONE == nns_edge_is_connected (eh)) {
    1790            0 :     nns_edge_logi ("NNStreamer-edge is already connected.");
    1791            0 :     nns_edge_unlock (eh);
    1792            0 :     return NNS_EDGE_ERROR_NONE;
    1793              :   }
    1794              : 
    1795            7 :   SAFE_FREE (eh->dest_host);
    1796            7 :   eh->dest_host = nns_edge_strdup (dest_host);
    1797            7 :   eh->dest_port = dest_port;
    1798              : 
    1799            7 :   if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type
    1800            6 :       || NNS_EDGE_CONNECT_TYPE_MQTT == eh->connect_type) {
    1801            3 :     if (NNS_EDGE_NODE_TYPE_PUB == eh->node_type)
    1802            0 :       goto done;
    1803            3 :     ret = _nns_edge_start_mqtt_sub (eh);
    1804            3 :     if (NNS_EDGE_ERROR_NONE != ret)
    1805            0 :       goto done;
    1806              : 
    1807            3 :     if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
    1808            1 :       ret = _mqtt_hybrid_direct_connection (eh);
    1809              :     } else {
    1810            2 :       ret = nns_edge_mqtt_set_event_callback (eh->broker_h, eh->event_cb,
    1811              :           eh->user_data);
    1812            2 :       if (NNS_EDGE_ERROR_NONE != ret) {
    1813            0 :         nns_edge_loge ("Failed to set event callback to MQTT broker.");
    1814            0 :         goto done;
    1815              :       }
    1816              :     }
    1817            4 :   } else if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
    1818            2 :     ret = nns_edge_custom_connect (eh->custom_connection_h);
    1819            2 :     if (ret != NNS_EDGE_ERROR_NONE) {
    1820            0 :       goto done;
    1821              :     }
    1822              :   } else {
    1823            2 :     if (NNS_EDGE_NODE_TYPE_PUB == eh->node_type)
    1824            0 :       goto done;
    1825            2 :     ret = _nns_edge_connect_to (eh, eh->client_id, dest_host, dest_port);
    1826            2 :     if (ret != NNS_EDGE_ERROR_NONE) {
    1827            0 :       nns_edge_loge ("Failed to connect to %s:%d", dest_host, dest_port);
    1828              :     }
    1829              :   }
    1830              : 
    1831            2 : done:
    1832            7 :   nns_edge_unlock (eh);
    1833            7 :   return ret;
    1834              : }
    1835              : 
    1836              : /**
    1837              :  * @brief Disconnect from the destination node.
    1838              :  */
    1839              : int
    1840            5 : nns_edge_disconnect (nns_edge_h edge_h)
    1841              : {
    1842              :   nns_edge_handle_s *eh;
    1843            5 :   int ret = NNS_EDGE_ERROR_NONE;
    1844              : 
    1845            5 :   eh = (nns_edge_handle_s *) edge_h;
    1846            5 :   if (!eh) {
    1847            1 :     nns_edge_loge ("Invalid param, given edge handle is null.");
    1848            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1849              :   }
    1850              : 
    1851            4 :   if (!nns_edge_handle_is_valid (eh)) {
    1852            1 :     nns_edge_loge ("Invalid param, given edge handle is invalid.");
    1853            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1854              :   }
    1855              : 
    1856            3 :   nns_edge_lock (eh);
    1857            3 :   if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
    1858            1 :     ret = nns_edge_custom_disconnect (eh->custom_connection_h);
    1859              :   } else {
    1860            2 :     _nns_edge_remove_all_connection (eh);
    1861              :   }
    1862            3 :   nns_edge_unlock (eh);
    1863              : 
    1864            3 :   return ret;
    1865              : }
    1866              : 
    1867              : /**
    1868              :  * @brief Check whether edge is connected or not.
    1869              :  */
    1870              : int
    1871           47 : nns_edge_is_connected (nns_edge_h edge_h)
    1872              : {
    1873           47 :   nns_edge_handle_s *eh = (nns_edge_handle_s *) edge_h;
    1874              :   nns_edge_conn_data_s *conn_data;
    1875              :   nns_edge_conn_s *conn;
    1876              : 
    1877           47 :   if (!eh) {
    1878            0 :     nns_edge_loge ("Invalid param, given edge handle is null.");
    1879            0 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1880              :   }
    1881              : 
    1882           47 :   if (!nns_edge_handle_is_valid (eh)) {
    1883            0 :     nns_edge_loge ("Invalid param, given edge handle is invalid.");
    1884            0 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1885              :   }
    1886              : 
    1887           47 :   if (NNS_EDGE_CONNECT_TYPE_MQTT == eh->connect_type &&
    1888            7 :       nns_edge_mqtt_is_connected (eh->broker_h))
    1889            5 :     return NNS_EDGE_ERROR_NONE;
    1890              : 
    1891           42 :   if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
    1892            7 :     return nns_edge_custom_is_connected (eh->custom_connection_h);
    1893              :   }
    1894              : 
    1895           35 :   conn_data = (nns_edge_conn_data_s *) eh->connections;
    1896           35 :   while (conn_data) {
    1897           30 :     conn = conn_data->sink_conn;
    1898           30 :     if (_nns_edge_check_connection (conn)) {
    1899           30 :       return NNS_EDGE_ERROR_NONE;
    1900              :     }
    1901            0 :     conn_data = conn_data->next;
    1902              :   }
    1903              : 
    1904            5 :   return NNS_EDGE_ERROR_CONNECTION_FAILURE;
    1905              : }
    1906              : 
    1907              : /**
    1908              :  * @brief Send data to destination (broker or connected node), asynchronously.
                      :  * @details The given data handle is copied and the copy is pushed into the
                      :  *          send-queue of the edge handle; this function itself does not
                      :  *          transmit the data.
                      :  * @param edge_h The edge handle.
                      :  * @param data_h The data handle to send. It is copied; the given handle is not queued.
                      :  * @return NNS_EDGE_ERROR_NONE if the copy was pushed into the send-queue.
                      :  *         NNS_EDGE_ERROR_INVALID_PARAMETER if a handle is null or invalid.
                      :  *         NNS_EDGE_ERROR_IO if there is no available connection or send_thread is not set.
                      :  *         Otherwise the error code returned from copying or queueing the data.
    1909              :  */
    1910              : int
    1911           39 : nns_edge_send (nns_edge_h edge_h, nns_edge_data_h data_h)
    1912              : {
    1913           39 :   int ret = NNS_EDGE_ERROR_NONE;
    1914              :   nns_edge_handle_s *eh;
    1915              :   nns_edge_data_h new_data_h;
    1916              : 
    1917           39 :   eh = (nns_edge_handle_s *) edge_h;
    1918           39 :   if (!eh) {
    1919            1 :     nns_edge_loge ("Invalid param, given edge handle is null.");
    1920           39 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1921              :   }
    1922              : 
    1923           38 :   if (nns_edge_data_is_valid (data_h) != NNS_EDGE_ERROR_NONE) {
    1924            1 :     nns_edge_loge ("Invalid param, given edge data is invalid.");
    1925            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1926              :   }
    1927              : 
    1928           37 :   if (!nns_edge_handle_is_valid (eh)) {
    1929            1 :     nns_edge_loge ("Invalid param, given edge handle is invalid.");
    1930            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1931              :   }
    1932              : 
    1933           36 :   nns_edge_lock (eh);
    1934              :   /* Everything below runs with the handle locked; each exit path unlocks it. */
    1935           36 :   if (NNS_EDGE_ERROR_NONE != nns_edge_is_connected (eh)) {
    1936            0 :     nns_edge_loge ("There is no available connection.");
    1937            0 :     nns_edge_unlock (eh);
    1938            0 :     return NNS_EDGE_ERROR_IO;
    1939              :   }
    1940              :   /* An unset send_thread is treated as "edge not started" (see the log message). */
    1941           36 :   if (!eh->send_thread) {
    1942            0 :     nns_edge_loge ("Invalid state, start edge before sending a data.");
    1943            0 :     nns_edge_unlock (eh);
    1944            0 :     return NNS_EDGE_ERROR_IO;
    1945              :   }
    1946              : 
    1947              :   /* Create new data handle and push it into send-queue. */
    1948           36 :   ret = nns_edge_data_copy (data_h, &new_data_h);
    1949           36 :   if (NNS_EDGE_ERROR_NONE != ret) {
    1950            0 :     nns_edge_loge ("Failed to send data, cannot copy data.");
    1951            0 :     nns_edge_unlock (eh);
    1952            0 :     return ret;
    1953              :   }
    1954              :   /* The release function is handed to the queue (presumably to free the item later); on push failure the copy is destroyed here. */
    1955           36 :   ret = nns_edge_queue_push (eh->send_queue, new_data_h,
    1956              :       sizeof (nns_edge_data_h), nns_edge_data_release_handle);
    1957           36 :   if (NNS_EDGE_ERROR_NONE != ret) {
    1958            0 :     nns_edge_loge ("Failed to send data, cannot push data into queue.");
    1959            0 :     nns_edge_data_destroy (new_data_h);
    1960              :   }
    1961              : 
    1962           36 :   nns_edge_unlock (eh);
    1963           36 :   return ret;
    1964              : }
    1965              : 
    1966              : /**
    1967              :  * @brief Set nnstreamer edge info.
                      :  * @details Keys are compared case-insensitively. Handled keys:
                      :  *          - CAPS, CAPABILITY: replaces the stored caps string.
                      :  *          - IP, HOST: replaces the stored host string.
                      :  *          - PORT: parsed with nns_edge_parse_port_number(); a negative result is an error.
                      :  *          - DEST_IP, DEST_HOST: replaces the stored destination host string.
                      :  *          - DEST_PORT: parsed like PORT and stored as destination port.
                      :  *          - TOPIC: replaces the stored topic string.
                      :  *          - ID, CLIENT_ID: rejected, these cannot be updated.
                      :  *          - QUEUE_SIZE: "<limit>" or "<limit>:<NEW|OLD>", applied to the send-queue.
                      :  *          - any other key: stored in the metadata of the handle.
                      :  *          If the value was accepted and the connect type is custom, the pair is
                      :  *          also passed to the custom connection; a failure there is only logged.
                      :  * @param edge_h The edge handle.
                      :  * @param key The name of the info to set. Checked with STR_IS_VALID.
                      :  * @param value The value to set. Checked with STR_IS_VALID.
                      :  * @return NNS_EDGE_ERROR_NONE on success.
                      :  *         NNS_EDGE_ERROR_INVALID_PARAMETER if the handle, key or value is invalid,
                      :  *         the port cannot be parsed, the key is not allowed, or the leaky option is unknown.
                      :  *         For other keys, the result of nns_edge_metadata_set().
    1968              :  */
    1969              : int
    1970           41 : nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value)
    1971              : {
    1972              :   nns_edge_handle_s *eh;
    1973           41 :   int ret = NNS_EDGE_ERROR_NONE;
    1974              : 
    1975           41 :   eh = (nns_edge_handle_s *) edge_h;
    1976           41 :   if (!eh) {
    1977            1 :     nns_edge_loge ("Invalid param, given edge handle is null.");
    1978            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1979              :   }
    1980              : 
    1981           40 :   if (!STR_IS_VALID (key)) {
    1982            2 :     nns_edge_loge ("Invalid param, given key is invalid.");
    1983            2 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1984              :   }
    1985              : 
    1986           38 :   if (!STR_IS_VALID (value)) {
    1987            2 :     nns_edge_loge ("Invalid param, given value is invalid.");
    1988            2 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1989              :   }
    1990              : 
    1991           36 :   if (!nns_edge_handle_is_valid (eh)) {
    1992            1 :     nns_edge_loge ("Invalid param, given edge handle is invalid.");
    1993            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    1994              :   }
    1995              : 
    1996           35 :   nns_edge_lock (eh);
    1997              :   /* String fields: free the previous copy, then keep a duplicate of the value. NOTE(review): the strdup result is not checked. */
    1998           35 :   if (0 == strcasecmp (key, "CAPS") || 0 == strcasecmp (key, "CAPABILITY")) {
    1999            6 :     SAFE_FREE (eh->caps_str);
    2000            6 :     eh->caps_str = nns_edge_strdup (value);
    2001           29 :   } else if (0 == strcasecmp (key, "IP") || 0 == strcasecmp (key, "HOST")) {
    2002            3 :     SAFE_FREE (eh->host);
    2003            3 :     eh->host = nns_edge_strdup (value);
    2004           26 :   } else if (0 == strcasecmp (key, "PORT")) {
    2005            5 :     int port = nns_edge_parse_port_number (value);
    2006              :     /* A negative port means the value was rejected; the stored port is left untouched. */
    2007            5 :     if (port < 0) {
    2008            2 :       ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
    2009              :     } else {
    2010            3 :       eh->port = port;
    2011              :     }
    2012           21 :   } else if (0 == strcasecmp (key, "DEST_IP")
    2013           19 :       || 0 == strcasecmp (key, "DEST_HOST")) {
    2014            3 :     SAFE_FREE (eh->dest_host);
    2015            3 :     eh->dest_host = nns_edge_strdup (value);
    2016           18 :   } else if (0 == strcasecmp (key, "DEST_PORT")) {
    2017            3 :     int port = nns_edge_parse_port_number (value);
    2018              : 
    2019            3 :     if (port < 0) {
    2020            0 :       ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
    2021              :     } else {
    2022            3 :       eh->dest_port = port;
    2023              :     }
    2024           15 :   } else if (0 == strcasecmp (key, "TOPIC")) {
    2025            6 :     SAFE_FREE (eh->topic);
    2026            6 :     eh->topic = nns_edge_strdup (value);
    2027            9 :   } else if (0 == strcasecmp (key, "ID") || 0 == strcasecmp (key, "CLIENT_ID")) {
    2028              :     /* Not allowed key */
    2029            2 :     nns_edge_loge ("Cannot update %s.", key);
    2030            2 :     ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
    2031            7 :   } else if (0 == strcasecmp (key, "QUEUE_SIZE")) {
    2032              :     char *s;
    2033              :     unsigned int limit;
    2034            3 :     nns_edge_queue_leak_e leaky = NNS_EDGE_QUEUE_LEAK_NEW;
    2035              :     /* Value is "<limit>" or "<limit>:<NEW|OLD>"; leaky stays NEW when the option is omitted. */
    2036            3 :     s = strstr (value, ":");
    2037            3 :     if (s) {
    2038            3 :       char *v = nns_edge_strndup (value, s - value);
    2039              :       /* NOTE(review): v is not NULL-checked and the strtoull result is narrowed to unsigned int without validation. */
    2040            3 :       limit = (unsigned int) strtoull (v, NULL, 10);
    2041            3 :       SAFE_FREE (v);
    2042              : 
    2043            3 :       if (strcasecmp (s + 1, "NEW") == 0) {
    2044            1 :         leaky = NNS_EDGE_QUEUE_LEAK_NEW;
    2045            2 :       } else if (strcasecmp (s + 1, "OLD") == 0) {
    2046            1 :         leaky = NNS_EDGE_QUEUE_LEAK_OLD;
    2047              :       } else {
    2048            1 :         nns_edge_loge ("Cannot set queue leaky option (%s).", s + 1);
    2049            1 :         ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
    2050              :       }
    2051              :     } else {
    2052            0 :       limit = (unsigned int) strtoull (value, NULL, 10);
    2053              :     }
    2054              :     /* Apply the limit only when the leaky option was accepted. */
    2055            3 :     if (ret == NNS_EDGE_ERROR_NONE)
    2056            2 :       nns_edge_queue_set_limit (eh->send_queue, limit, leaky);
    2057              :   } else {
    2058            4 :     ret = nns_edge_metadata_set (eh->metadata, key, value);
    2059              :   }
    2060              : 
    2061           35 :   if (ret == NNS_EDGE_ERROR_NONE &&
    2062           30 :       NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
    2063              :     /* Pass value to custom library and ignore error. */
    2064            1 :     if (nns_edge_custom_set_info (eh->custom_connection_h, key, value) !=
    2065              :         NNS_EDGE_ERROR_NONE) {
    2066            0 :       nns_edge_logw ("Failed to set info '%s' in custom connection.", key);
    2067              :     }
    2068              :   }
    2069              : 
    2070           35 :   nns_edge_unlock (eh);
    2071           35 :   return ret;
    2072              : }
    2073              : 
    2074              : /**
    2075              :  * @brief Get nnstreamer edge info.
                      :  * @details Keys are compared case-insensitively. CAPS/CAPABILITY, IP/HOST, TOPIC,
                      :  *          ID and DEST_IP/DEST_HOST return a duplicate of the stored string;
                      :  *          PORT, DEST_PORT and CLIENT_ID are formatted as decimal text.
                      :  *          CLIENT_ID is refused when the node type is query-server or pub.
                      :  *          Any other key is looked up in the metadata of the handle.
                      :  *          If the lookup succeeded and the connect type is custom, a value
                      :  *          returned by the custom connection replaces the one found here.
                      :  * @param edge_h The edge handle.
                      :  * @param key The name of the info to get. Checked with STR_IS_VALID.
                      :  * @param value Output string. It is set to NULL once the null checks pass, before the
                      :  *              handle validity check, and is NULL when an error is returned
                      :  *              after that point. Presumably the caller frees it - TODO confirm.
                      :  * @return NNS_EDGE_ERROR_NONE on success.
                      :  *         NNS_EDGE_ERROR_INVALID_PARAMETER if the handle or key is invalid, value is
                      :  *         null, or the client ID is requested on a server node.
                      :  *         For other keys, the result of nns_edge_metadata_get().
    2076              :  */
    2077              : int
    2078           20 : nns_edge_get_info (nns_edge_h edge_h, const char *key, char **value)
    2079              : {
    2080              :   nns_edge_handle_s *eh;
    2081           20 :   int ret = NNS_EDGE_ERROR_NONE;
    2082              : 
    2083           20 :   eh = (nns_edge_handle_s *) edge_h;
    2084           20 :   if (!eh) {
    2085            1 :     nns_edge_loge ("Invalid param, given edge handle is null.");
    2086            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    2087              :   }
    2088              : 
    2089           19 :   if (!STR_IS_VALID (key)) {
    2090            2 :     nns_edge_loge ("Invalid param, given key is invalid.");
    2091            2 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    2092              :   }
    2093              : 
    2094           17 :   if (!value) {
    2095            2 :     nns_edge_loge ("Invalid param, value should not be null.");
    2096            2 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    2097              :   }
    2098              : 
    2099              :   /* Init null */
    2100           15 :   *value = NULL;
    2101              : 
    2102           15 :   if (!nns_edge_handle_is_valid (eh)) {
    2103            1 :     nns_edge_loge ("Invalid param, given edge handle is invalid.");
    2104            1 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    2105              :   }
    2106              : 
    2107           14 :   nns_edge_lock (eh);
    2108              :   /* String fields are duplicated; numeric fields are formatted as decimal text. */
    2109           14 :   if (0 == strcasecmp (key, "CAPS") || 0 == strcasecmp (key, "CAPABILITY")) {
    2110            1 :     *value = nns_edge_strdup (eh->caps_str);
    2111           13 :   } else if (0 == strcasecmp (key, "IP") || 0 == strcasecmp (key, "HOST")) {
    2112            1 :     *value = nns_edge_strdup (eh->host);
    2113           12 :   } else if (0 == strcasecmp (key, "PORT")) {
    2114            1 :     *value = nns_edge_strdup_printf ("%d", eh->port);
    2115           11 :   } else if (0 == strcasecmp (key, "TOPIC")) {
    2116            1 :     *value = nns_edge_strdup (eh->topic);
    2117           10 :   } else if (0 == strcasecmp (key, "ID")) {
    2118            1 :     *value = nns_edge_strdup (eh->id);
    2119            9 :   } else if (0 == strcasecmp (key, "DEST_IP")
    2120            8 :       || 0 == strcasecmp (key, "DEST_HOST")) {
    2121            1 :     *value = nns_edge_strdup (eh->dest_host);
    2122            8 :   } else if (0 == strcasecmp (key, "DEST_PORT")) {
    2123            1 :     *value = nns_edge_strdup_printf ("%d", eh->dest_port);
    2124            7 :   } else if (0 == strcasecmp (key, "CLIENT_ID")) {
    2125            3 :     if ((NNS_EDGE_NODE_TYPE_QUERY_SERVER == eh->node_type)
    2126            3 :         || (NNS_EDGE_NODE_TYPE_PUB == eh->node_type)) {
    2127            0 :       nns_edge_loge ("Cannot get the client ID, it was started as a server.");
    2128            0 :       ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
    2129              :     } else {
    2130            3 :       *value = nns_edge_strdup_printf ("%lld", (long long) eh->client_id);
    2131              :     }
    2132              :   } else {
    2133            4 :     ret = nns_edge_metadata_get (eh->metadata, key, value);
    2134              :   }
    2135              :   /* For a custom connection, ask the custom library too; its answer wins when it succeeds. */
    2136           14 :   if (ret == NNS_EDGE_ERROR_NONE &&
    2137           14 :       NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
    2138            1 :     char *val = NULL;
    2139              : 
    2140            1 :     if (nns_edge_custom_get_info (eh->custom_connection_h, key, &val) ==
    2141              :         NNS_EDGE_ERROR_NONE) {
    2142              :       /* Replace value from custom library. */
    2143            1 :       SAFE_FREE (*value);
    2144            1 :       *value = val;
    2145              :     }
    2146              :   }
    2147              : 
    2148           14 :   nns_edge_unlock (eh);
    2149              :   /* Never hand back a string together with an error code. */
    2150           14 :   if (ret != NNS_EDGE_ERROR_NONE)
    2151            0 :     SAFE_FREE (*value);
    2152              : 
    2153           14 :   return ret;
    2154              : }
    2155              : 
    2156              : /**
    2157              :  * @brief Start discovering connectable devices within the network.
                      :  * @details An event callback must be registered on the handle. The request is
                      :  *          forwarded only when the connect type is custom; for any other
                      :  *          connect type nothing is done here.
                      :  * @param edge_h The edge handle.
                      :  * @return NNS_EDGE_ERROR_INVALID_PARAMETER if the handle is null or invalid.
                      :  *         NNS_EDGE_ERROR_CONNECTION_FAILURE if no event callback is registered.
                      :  *         Otherwise the result of nns_edge_custom_start_discovery() for a custom
                      :  *         connection, or NNS_EDGE_ERROR_NONE for other connect types.
    2158              :  */
    2159            1 : int nns_edge_start_discovery (nns_edge_h edge_h)
    2160              : {
    2161              :   nns_edge_handle_s *eh;
    2162            1 :   int ret = NNS_EDGE_ERROR_NONE;
    2163              : 
    2164            1 :   eh = (nns_edge_handle_s *) edge_h;
    2165            1 :   if (!eh) {
    2166            0 :     nns_edge_loge ("Invalid param, given edge handle is null.");
    2167            0 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    2168              :   }
    2169              : 
    2170            1 :   if (!nns_edge_handle_is_valid (eh)) {
    2171            0 :     nns_edge_loge ("Invalid param, given edge handle is invalid.");
    2172            0 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    2173              :   }
    2174              : 
    2175            1 :   nns_edge_lock (eh);
    2176            1 :   if (!eh->event_cb) {
    2177            0 :     nns_edge_loge ("NNStreamer-edge event callback is not registered.");
    2178            0 :     nns_edge_unlock (eh);
    2179            0 :     return NNS_EDGE_ERROR_CONNECTION_FAILURE;
    2180              :   }
    2181              :   /* Only the custom connect type is handled; other types keep ret at NNS_EDGE_ERROR_NONE. */
    2182            1 :   if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
    2183            1 :     ret = nns_edge_custom_start_discovery (eh->custom_connection_h);
    2184              :   }
    2185              : 
    2186            1 :   nns_edge_unlock (eh);
    2187              : 
    2188            1 :   return ret;
    2189              : }
    2190              : 
    2191              : /**
    2192              :  * @brief Stop discovering connectable devices within the network.
                      :  * @details The request is forwarded only when the connect type is custom; for
                      :  *          any other connect type nothing is done here.
                      :  * @param edge_h The edge handle.
                      :  * @return NNS_EDGE_ERROR_INVALID_PARAMETER if the handle is null or invalid.
                      :  *         Otherwise the result of nns_edge_custom_stop_discovery() for a custom
                      :  *         connection, or NNS_EDGE_ERROR_NONE for other connect types.
    2193              :  */
    2194            1 : int nns_edge_stop_discovery (nns_edge_h edge_h)
    2195              : {
    2196              :   nns_edge_handle_s *eh;
    2197            1 :   int ret = NNS_EDGE_ERROR_NONE;
    2198              : 
    2199            1 :   eh = (nns_edge_handle_s *) edge_h;
    2200            1 :   if (!eh) {
    2201            0 :     nns_edge_loge ("Invalid param, given edge handle is null.");
    2202            0 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    2203              :   }
    2204              : 
    2205            1 :   if (!nns_edge_handle_is_valid (eh)) {
    2206            0 :     nns_edge_loge ("Invalid param, given edge handle is invalid.");
    2207            0 :     return NNS_EDGE_ERROR_INVALID_PARAMETER;
    2208              :   }
    2209              : 
    2210            1 :   nns_edge_lock (eh);
    2211              :   /* Unlike starting discovery, no event callback is required here. */
    2212            1 :   if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
    2213            1 :     ret = nns_edge_custom_stop_discovery (eh->custom_connection_h);
    2214              :   }
    2215              : 
    2216            1 :   nns_edge_unlock (eh);
    2217              : 
    2218            1 :   return ret;
    2219              : }
    2220              : 
        

Generated by: LCOV version 2.0-1