Line data Source code
1 : /* SPDX-License-Identifier: Apache-2.0 */
2 : /**
3 : * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
4 : *
5 : * @file nnstreamer-edge-internal.c
6 : * @date 6 April 2022
7 : * @brief Common library to support communication among devices.
8 : * @see https://github.com/nnstreamer/nnstreamer
9 : * @author Gichan Jang <gichan2.jang@samsung.com>
10 : * @bug No known bugs except for NYI items
11 : */
12 :
13 : #include <arpa/inet.h>
#include <errno.h>
14 : #include <netdb.h>
15 : #include <poll.h>
16 :
17 : #include "nnstreamer-edge-data.h"
18 : #include "nnstreamer-edge-event.h"
19 : #include "nnstreamer-edge-log.h"
20 : #include "nnstreamer-edge-util.h"
21 : #include "nnstreamer-edge-queue.h"
22 : #include "nnstreamer-edge-metadata.h"
23 : #include "nnstreamer-edge-mqtt.h"
24 : #include "nnstreamer-edge-custom-impl.h"
25 :
26 : #ifndef MSG_NOSIGNAL
27 : #define MSG_NOSIGNAL 0
28 : #endif
29 :
30 : /**
31 : * @brief The maximum length of pending connections to accept socket.
32 : */
33 : #define N_BACKLOG 10
34 :
35 : /**
36 : * @brief Data structure for edge handle.
37 : */
38 : typedef struct
39 : {
40 : uint32_t magic; /**< validity marker; presumably what nns_edge_handle_is_valid() inspects (the message thread checks the handle on every loop) */
41 : pthread_mutex_t lock; /**< handle lock; presumably taken by nns_edge_lock() / nns_edge_unlock() */
42 : pthread_cond_t cond; /**< presumably the condition used by nns_edge_cond_wait() / nns_edge_cond_signal() (send-thread start-up handshake) */
43 : char *id;
44 : char *topic;
45 : nns_edge_connect_type_e connect_type; /**< selects the transport used by the send thread (TCP, hybrid, MQTT or custom) */
46 : char *host; /**< host name or IP address */
47 : int port; /**< port number (0~65535, default 0 to get available port.) */
48 : char *dest_host; /**< destination IP address (broker or target device) */
49 : int dest_port; /**< destination port number (broker or target device) */
50 : nns_edge_node_type_e node_type; /**< decides which side of the capability / host-info handshake this node plays */
51 : nns_edge_metadata_h metadata;
52 : bool is_started;
53 :
54 : /* Edge event callback and user data, both passed to nns_edge_event_invoke_callback() */
55 : nns_edge_event_cb event_cb;
56 : void *user_data;
57 :
58 : int64_t client_id; /**< overwritten with the ID carried by the capability command in _nns_edge_connect_to() */
59 : char *caps_str; /**< sent (including its null terminator) as the payload of the capability command */
60 :
61 : /* list of connection data: head of a singly linked list of nns_edge_conn_data_s, new entries are prepended */
62 : void *connections;
63 :
64 : /* socket listener */
65 : bool listening;
66 : int listener_fd; /**< descriptor handed to accept() in _nns_edge_accept_socket() */
67 : pthread_t listener_thread;
68 :
69 : /* thread and queue to send data */
70 : bool sending; /**< set to true by the send thread when it starts and to false when it leaves its loop */
71 : nns_edge_queue_h send_queue; /**< queue the send thread pops edge data from */
72 : pthread_t send_thread;
73 :
74 : /* MQTT handle, passed to nns_edge_mqtt_publish_data() by the send thread */
75 : void *broker_h;
76 :
77 : /* Data for custom connection, passed to nns_edge_custom_send_data() by the send thread */
78 : nns_edge_custom_connection_h custom_connection_h;
79 : } nns_edge_handle_s;
80 :
81 : /**
82 : * @brief enum for nnstreamer edge query commands.
83 : */
84 : typedef enum
85 : {
86 : _NNS_EDGE_CMD_ERROR = 0, /**< sent before a socket is closed; a message thread that receives it stops and drops the connection */
87 : _NNS_EDGE_CMD_TRANSFER_DATA, /**< carries edge data memories plus serialized metadata */
88 : _NNS_EDGE_CMD_HOST_INFO, /**< carries the sender's host string (built with nns_edge_get_host_string()) */
89 : _NNS_EDGE_CMD_CAPABILITY, /**< carries the capability string and the client ID assigned to the peer */
90 : _NNS_EDGE_CMD_END /**< upper bound (exclusive) used when validating a command; not a real command */
91 : } nns_edge_cmd_e;
92 :
93 : /**
94 : * @brief Structure for edge command info. It should be fixed size.
95 : */
96 : typedef struct
97 : {
/* NOTE: this struct is sent and received as raw bytes (sizeof (nns_edge_cmd_info_s)), so its member order and sizes are part of the exchanged format. */
98 : uint32_t magic; /**< set to NNS_EDGE_MAGIC on init and NNS_EDGE_MAGIC_DEAD on clear */
99 : uint32_t cmd; /**< enum for query commands, see nns_edge_cmd_e. */
100 : uint64_t version; /**< filled by nns_edge_generate_version_key(), checked with nns_edge_parse_version_key() */
101 : int64_t client_id; /**< ID of the client this command is about */
102 :
103 : /* memory info */
104 : uint32_t num; /**< number of memories that follow the header; must not exceed NNS_EDGE_DATA_LIMIT */
105 : nns_size_t mem_size[NNS_EDGE_DATA_LIMIT]; /**< byte size of each memory, only the first num entries are used */
106 : nns_size_t meta_size; /**< byte size of the metadata blob sent after the memories, 0 if there is none */
107 : } nns_edge_cmd_info_s;
108 :
109 : /**
110 : * @brief Structure for edge command and buffers.
111 : */
112 : typedef struct
113 : {
114 : nns_edge_cmd_info_s info; /**< fixed-size header, transferred first */
115 : void *mem[NNS_EDGE_DATA_LIMIT]; /**< payload buffers; _nns_edge_cmd_receive() allocates them and _nns_edge_cmd_clear() frees the first info.num entries */
116 : void *meta; /**< metadata blob of info.meta_size bytes; freed by _nns_edge_cmd_clear() */
117 : } nns_edge_cmd_s;
118 :
119 : /**
120 : * @brief Forward declaration of the connection data node.
 * @details Needed because the struct refers to itself through its `next` member.
121 : */
122 : typedef struct _nns_edge_conn_data_s nns_edge_conn_data_s;
123 :
124 : /**
125 : * @brief Data structure for edge connection.
126 : */
127 : typedef struct
128 : {
129 : char *host; /**< peer host, copied with nns_edge_strdup() in _nns_edge_connect_to(); freed when the connection is closed */
130 : int port; /**< peer port */
131 : bool running; /**< loop flag of the message thread; cleared by _nns_edge_close_connection() to stop it */
132 : pthread_t msg_thread; /**< message thread of this connection, 0 when there is none */
133 : int sockfd; /**< socket descriptor, -1 when not open */
134 : } nns_edge_conn_s;
135 :
136 : /**
137 : * @brief Data structure for connection data.
138 : */
139 : struct _nns_edge_conn_data_s
140 : {
141 : nns_edge_conn_s *src_conn; /**< closed together with sink_conn by _nns_edge_release_connection_data() */
142 : nns_edge_conn_s *sink_conn; /**< connection the send thread transfers data through; set by _nns_edge_connect_to() */
143 : int64_t id; /**< client ID, the lookup key used by _nns_edge_get_connection() */
144 : nns_edge_conn_data_s *next; /**< next node of the handle's connection list, NULL at the tail */
145 : };
146 :
147 : /**
148 : * @brief Structures for thread data of message handling.
149 : */
150 : typedef struct
151 : {
/* Allocated by _nns_edge_create_message_thread() and freed by _nns_edge_message_handler() once its members are copied. */
152 : nns_edge_handle_s *eh; /**< edge handle the message thread works for */
153 : int64_t client_id; /**< client ID stored into every received edge data */
154 : nns_edge_conn_s *conn; /**< connection the message thread reads from */
155 : } nns_edge_thread_data_s;
156 :
157 : /**
158 : * @brief Parse the message received from the MQTT broker and connect to the server directly.
 * @note Forward declaration only; the definition is not in this part of the file.
 *       _nns_edge_message_handler() calls it to reconnect when a hybrid-type connection is lost.
159 : */
160 : static int _mqtt_hybrid_direct_connection (nns_edge_handle_s * eh);
161 :
/**
 * @brief Configure a TCP socket used by nnstreamer-edge.
 * @details Turns on TCP_NODELAY so that small packets are not held back
 *          and batched (Nagle's algorithm). A failure only produces a
 *          warning; the socket stays usable.
 * @param fd The socket descriptor to configure.
 */
static void
_set_socket_option (int fd)
{
  int enable = 1;
  int status;

  status = setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof (int));
  if (status < 0)
    nns_edge_logw ("Failed to set TCP delay option.");
}
174 :
/**
 * @brief Fill socket address struct from host name and port number.
 * @details The host is first parsed as a dotted IPv4 address. If that fails,
 *          it is resolved with getaddrinfo() (IPv4, stream sockets) and the
 *          first result is copied into @a saddr.
 * @param saddr The address struct to fill.
 * @param host Host name or IPv4 address string.
 * @param port Port number. A value of 0 or less means no service string is passed to the resolver.
 * @return true on success, false if an argument is null or the host cannot be resolved.
 */
static bool
_fill_socket_addr (struct sockaddr_in *saddr, const char *host, const int port)
{
  struct addrinfo hints;
  struct addrinfo *addrs = NULL;
  char *port_str = NULL;
  int ret;

  /* The host may originate from a peer-supplied string; never hand null to inet_addr(). */
  if (!saddr || !host)
    return false;

  /** @todo handle protocol (ipv4 and ipv6) */
  saddr->sin_family = AF_INET;
  saddr->sin_port = htons (port);

  saddr->sin_addr.s_addr = inet_addr (host);
  if (saddr->sin_addr.s_addr != INADDR_NONE)
    return true;

  /* Not a dotted IPv4 address, resolve it as a host name. */
  memset (&hints, 0, sizeof (hints));
  hints.ai_family = AF_INET;
  hints.ai_socktype = SOCK_STREAM;

  if (port > 0)
    port_str = nns_edge_strdup_printf ("%d", port);
  ret = getaddrinfo (host, port_str, &hints, &addrs);
  SAFE_FREE (port_str);

  if (ret != 0 || addrs == NULL)
    return false;

  /* Copy the resolved address only when it fits into the caller's struct. */
  if (addrs->ai_addr == NULL || addrs->ai_addrlen > sizeof (*saddr)) {
    freeaddrinfo (addrs);
    return false;
  }

  memcpy (saddr, addrs->ai_addr, addrs->ai_addrlen);
  freeaddrinfo (addrs);

  return true;
}
209 :
210 : /**
211 : * @brief Send data to connected socket.
212 : */
213 : static bool
214 114 : _send_raw_data (nns_edge_conn_s * conn, void *data, nns_size_t size)
215 : {
216 114 : nns_size_t sent = 0;
217 : nns_ssize_t rret;
218 :
219 228 : while (sent < size) {
220 114 : rret = send (conn->sockfd, (char *) data + sent, size - sent, MSG_NOSIGNAL);
221 :
222 114 : if (rret <= 0) {
223 0 : nns_edge_loge ("Failed to send raw data.");
224 0 : return false;
225 : }
226 :
227 114 : sent += rret;
228 : }
229 :
230 114 : return true;
231 : }
232 :
233 : /**
234 : * @brief Receive data from connected socket.
235 : */
236 : static bool
237 105 : _receive_raw_data (nns_edge_conn_s * conn, void *data, nns_size_t size)
238 : {
239 105 : nns_size_t received = 0;
240 : nns_ssize_t rret;
241 :
242 210 : while (received < size) {
243 105 : rret = recv (conn->sockfd, (char *) data + received, size - received, 0);
244 :
245 105 : if (rret <= 0) {
246 0 : nns_edge_loge ("Failed to receive raw data.");
247 0 : return false;
248 : }
249 :
250 105 : received += rret;
251 : }
252 :
253 105 : return true;
254 : }
255 :
256 : /**
257 : * @brief Internal function to check connection.
258 : */
259 : static bool
260 117 : _nns_edge_check_connection (nns_edge_conn_s * conn)
261 : {
262 : struct pollfd poll_fd;
263 : int n;
264 :
265 117 : if (!conn || conn->sockfd < 0)
266 117 : return false;
267 :
268 117 : poll_fd.fd = conn->sockfd;
269 117 : poll_fd.events = POLLIN | POLLOUT | POLLPRI | POLLERR | POLLHUP;
270 117 : poll_fd.revents = 0;
271 :
272 : /** Timeout zero means that the poll() is returned immediately. */
273 117 : n = poll (&poll_fd, 1, 0);
274 : /**
275 : * Return value zero indicates that the system call timed out.
276 : * let's skip the check `n == 0` because timeout is set to 0.
277 : */
278 117 : if (n < 0 || poll_fd.revents & (POLLERR | POLLHUP)) {
279 0 : nns_edge_logw ("Socket is not available, possibly closed.");
280 0 : return false;
281 : }
282 :
283 117 : return true;
284 : }
285 :
286 : /**
287 : * @brief initialize edge command.
288 : */
289 : static void
290 87 : _nns_edge_cmd_init (nns_edge_cmd_s * cmd, nns_edge_cmd_e c, int64_t cid)
291 : {
292 87 : if (!cmd)
293 0 : return;
294 :
295 87 : memset (cmd, 0, sizeof (nns_edge_cmd_s));
296 87 : nns_edge_handle_set_magic (&cmd->info, NNS_EDGE_MAGIC);
297 87 : cmd->info.cmd = c;
298 87 : cmd->info.version = nns_edge_generate_version_key ();
299 87 : cmd->info.client_id = cid;
300 87 : cmd->info.num = 0;
301 87 : cmd->info.meta_size = 0;
302 : }
303 :
304 : /**
305 : * @brief Clear allocated memory in edge command.
306 : */
307 : static void
308 42 : _nns_edge_cmd_clear (nns_edge_cmd_s * cmd)
309 : {
310 : unsigned int i;
311 :
312 42 : if (!cmd)
313 0 : return;
314 :
315 42 : nns_edge_handle_set_magic (&cmd->info, NNS_EDGE_MAGIC_DEAD);
316 :
317 81 : for (i = 0; i < cmd->info.num; i++) {
318 39 : SAFE_FREE (cmd->mem[i]);
319 39 : cmd->info.mem_size[i] = 0U;
320 : }
321 :
322 42 : SAFE_FREE (cmd->meta);
323 :
324 42 : cmd->info.cmd = _NNS_EDGE_CMD_ERROR;
325 42 : cmd->info.version = 0;
326 42 : cmd->info.client_id = 0;
327 42 : cmd->info.num = 0;
328 42 : cmd->info.meta_size = 0;
329 : }
330 :
331 : /**
332 : * @brief Validate edge command.
333 : */
334 : static bool
335 87 : _nns_edge_cmd_is_valid (nns_edge_cmd_s * cmd)
336 : {
337 : int command;
338 :
339 87 : if (!cmd)
340 0 : return false;
341 :
342 87 : command = (int) cmd->info.cmd;
343 :
344 87 : if (!nns_edge_handle_is_valid (&cmd->info) ||
345 87 : (command < 0 || command >= _NNS_EDGE_CMD_END)) {
346 0 : return false;
347 : }
348 :
349 87 : if (!nns_edge_parse_version_key (cmd->info.version, NULL, NULL, NULL))
350 0 : return false;
351 :
352 : /**
353 : * @todo The number of memories in data.
354 : * Total number of memories in edge-data should be less than NNS_EDGE_DATA_LIMIT.
355 : * Fetch nns-edge version info and check allowed memories if NNS_EDGE_DATA_LIMIT is updated.
356 : */
357 87 : if (cmd->info.num > NNS_EDGE_DATA_LIMIT)
358 0 : return false;
359 :
360 87 : return true;
361 : }
362 :
363 : /**
364 : * @brief Send edge command to connected device.
365 : */
366 : static int
367 48 : _nns_edge_cmd_send (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd)
368 : {
369 : unsigned int n;
370 :
371 48 : if (!conn) {
372 0 : nns_edge_loge ("Failed to send command, edge connection is null.");
373 0 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
374 : }
375 :
376 48 : if (!_nns_edge_cmd_is_valid (cmd)) {
377 0 : nns_edge_loge ("Failed to send command, invalid command.");
378 0 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
379 : }
380 :
381 48 : if (!_nns_edge_check_connection (conn)) {
382 0 : nns_edge_loge ("Failed to send command, socket has error.");
383 0 : return NNS_EDGE_ERROR_IO;
384 : }
385 :
386 48 : if (!_send_raw_data (conn, &cmd->info, sizeof (nns_edge_cmd_info_s))) {
387 0 : nns_edge_loge ("Failed to send command to socket.");
388 0 : return NNS_EDGE_ERROR_IO;
389 : }
390 :
391 84 : for (n = 0; n < cmd->info.num; n++) {
392 36 : if (!_send_raw_data (conn, cmd->mem[n], cmd->info.mem_size[n])) {
393 0 : nns_edge_loge ("Failed to send %uth memory to socket.", n);
394 0 : return NNS_EDGE_ERROR_IO;
395 : }
396 : }
397 :
398 48 : if (cmd->info.meta_size > 0) {
399 30 : if (!_send_raw_data (conn, cmd->meta, cmd->info.meta_size)) {
400 0 : nns_edge_loge ("Failed to send metadata to socket.");
401 0 : return NNS_EDGE_ERROR_IO;
402 : }
403 : }
404 :
405 48 : return NNS_EDGE_ERROR_NONE;
406 : }
407 :
408 : /**
409 : * @brief Receive edge command from connected device.
410 : * @note Before calling this function, you should initialize edge-cmd by using _nns_edge_cmd_init().
411 : */
412 : static int
413 39 : _nns_edge_cmd_receive (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd)
414 : {
415 : unsigned int n;
416 39 : int ret = NNS_EDGE_ERROR_NONE;
417 :
418 39 : if (!conn || !cmd)
419 0 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
420 :
421 39 : if (!_nns_edge_check_connection (conn)) {
422 0 : nns_edge_loge ("Failed to receive command, socket has error.");
423 0 : return NNS_EDGE_ERROR_IO;
424 : }
425 :
426 39 : if (!_receive_raw_data (conn, &cmd->info, sizeof (nns_edge_cmd_info_s))) {
427 0 : nns_edge_loge ("Failed to receive command from socket.");
428 0 : return NNS_EDGE_ERROR_IO;
429 : }
430 :
431 39 : if (!_nns_edge_cmd_is_valid (cmd)) {
432 0 : nns_edge_loge ("Failed to receive command, invalid command.");
433 0 : return NNS_EDGE_ERROR_IO;
434 : }
435 :
436 39 : nns_edge_logd ("Received command:%d (num:%u)", cmd->info.cmd, cmd->info.num);
437 39 : if (cmd->info.num >= NNS_EDGE_DATA_LIMIT) {
438 0 : nns_edge_loge ("Invalid request, the max memories for data transfer is %d.",
439 : NNS_EDGE_DATA_LIMIT);
440 0 : return NNS_EDGE_ERROR_IO;
441 : }
442 :
443 75 : for (n = 0; n < cmd->info.num; n++) {
444 36 : cmd->mem[n] = nns_edge_malloc (cmd->info.mem_size[n]);
445 36 : if (!cmd->mem[n]) {
446 0 : nns_edge_loge ("Failed to allocate memory to receive data from socket.");
447 0 : ret = NNS_EDGE_ERROR_OUT_OF_MEMORY;
448 0 : goto error;
449 : }
450 :
451 36 : if (!_receive_raw_data (conn, cmd->mem[n], cmd->info.mem_size[n])) {
452 0 : nns_edge_loge ("Failed to receive %uth memory from socket.", n++);
453 0 : ret = NNS_EDGE_ERROR_IO;
454 0 : goto error;
455 : }
456 : }
457 :
458 39 : if (cmd->info.meta_size > 0) {
459 30 : cmd->meta = nns_edge_malloc (cmd->info.meta_size);
460 30 : if (!cmd->meta) {
461 0 : nns_edge_loge ("Failed to allocate memory to receive meta from socket.");
462 0 : ret = NNS_EDGE_ERROR_OUT_OF_MEMORY;
463 0 : goto error;
464 : }
465 :
466 30 : if (!_receive_raw_data (conn, cmd->meta, cmd->info.meta_size)) {
467 0 : nns_edge_loge ("Failed to receive metadata from socket.");
468 0 : ret = NNS_EDGE_ERROR_IO;
469 0 : goto error;
470 : }
471 : }
472 :
473 39 : return NNS_EDGE_ERROR_NONE;
474 :
475 0 : error:
476 0 : _nns_edge_cmd_clear (cmd);
477 0 : return ret;
478 : }
479 :
480 : /**
481 : * @brief Internal function to send edge data.
482 : */
483 : static int
484 30 : _nns_edge_transfer_data (nns_edge_conn_s * conn, nns_edge_data_h data_h,
485 : int64_t client_id)
486 : {
487 : nns_edge_cmd_s cmd;
488 : unsigned int i;
489 : int ret;
490 :
491 30 : _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_TRANSFER_DATA, client_id);
492 :
493 30 : ret = nns_edge_data_get_count (data_h, &cmd.info.num);
494 30 : if (ret != NNS_EDGE_ERROR_NONE) {
495 0 : nns_edge_loge ("Failed to get data count");
496 30 : return ret;
497 : }
498 :
499 60 : for (i = 0; i < cmd.info.num; i++) {
500 30 : ret = nns_edge_data_get (data_h, i, &cmd.mem[i], &cmd.info.mem_size[i]);
501 30 : if (ret != NNS_EDGE_ERROR_NONE) {
502 0 : nns_edge_loge ("Failed to get data");
503 0 : return ret;
504 : }
505 : }
506 :
507 30 : ret = nns_edge_data_serialize_meta (data_h, &cmd.meta, &cmd.info.meta_size);
508 30 : if (ret != NNS_EDGE_ERROR_NONE) {
509 0 : nns_edge_loge ("Failed to serialize meta");
510 0 : return ret;
511 : }
512 :
513 30 : ret = _nns_edge_cmd_send (conn, &cmd);
514 30 : SAFE_FREE (cmd.meta);
515 :
516 30 : if (ret != NNS_EDGE_ERROR_NONE) {
517 0 : nns_edge_loge ("Failed to send edge data to destination (%s:%d).",
518 : conn->host, conn->port);
519 : }
520 :
521 30 : return ret;
522 : }
523 :
524 : /**
525 : * @brief Close connection
526 : */
527 : static bool
528 24 : _nns_edge_close_connection (nns_edge_conn_s * conn)
529 : {
530 24 : if (!conn)
531 12 : return false;
532 :
533 : /* Stop and clear the message thread. */
534 12 : conn->running = false;
535 12 : if (conn->msg_thread) {
536 6 : pthread_join (conn->msg_thread, NULL);
537 6 : conn->msg_thread = 0;
538 : }
539 :
540 12 : if (conn->sockfd >= 0) {
541 : nns_edge_cmd_s cmd;
542 :
543 : /* Send error before closing the socket. */
544 12 : nns_edge_logd ("Send error cmd to close connection.");
545 12 : _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, 0);
546 12 : _nns_edge_cmd_send (conn, &cmd);
547 :
548 12 : if (close (conn->sockfd) < 0)
549 0 : nns_edge_logw ("Failed to close socket.");
550 12 : conn->sockfd = -1;
551 : }
552 :
553 12 : SAFE_FREE (conn->host);
554 12 : SAFE_FREE (conn);
555 12 : return true;
556 : }
557 :
558 : /**
559 : * @brief Release connection data and its resources.
560 : */
561 : static void
562 6 : _nns_edge_release_connection_data (nns_edge_conn_data_s * cdata)
563 : {
564 6 : if (cdata) {
565 6 : _nns_edge_close_connection (cdata->src_conn);
566 6 : _nns_edge_close_connection (cdata->sink_conn);
567 6 : SAFE_FREE (cdata);
568 : }
569 6 : }
570 :
571 : /**
572 : * @brief Get nnstreamer-edge connection data.
573 : * @note This function should be called with handle lock.
574 : */
575 : static nns_edge_conn_data_s *
576 42 : _nns_edge_get_connection (nns_edge_handle_s * eh, int64_t client_id)
577 : {
578 : nns_edge_conn_data_s *cdata;
579 :
580 42 : cdata = (nns_edge_conn_data_s *) eh->connections;
581 :
582 48 : while (cdata) {
583 42 : if (cdata->id == client_id)
584 36 : return cdata;
585 :
586 6 : cdata = cdata->next;
587 : }
588 :
589 6 : return NULL;
590 : }
591 :
592 : /**
593 : * @brief Get nnstreamer-edge connection data.
594 : * @note This function should be called with handle lock.
595 : */
596 : static nns_edge_conn_data_s *
597 12 : _nns_edge_add_connection (nns_edge_handle_s * eh, int64_t client_id)
598 : {
599 : nns_edge_conn_data_s *cdata;
600 :
601 12 : cdata = _nns_edge_get_connection (eh, client_id);
602 :
603 12 : if (NULL == cdata) {
604 6 : cdata = (nns_edge_conn_data_s *) calloc (1, sizeof (nns_edge_conn_data_s));
605 6 : if (NULL == cdata) {
606 0 : nns_edge_loge ("Failed to allocate memory for connection data.");
607 0 : return NULL;
608 : }
609 :
610 : /* prepend connection data */
611 6 : cdata->id = client_id;
612 6 : cdata->next = eh->connections;
613 6 : eh->connections = cdata;
614 : }
615 :
616 12 : return cdata;
617 : }
618 :
619 : /**
620 : * @brief Remove nnstreamer-edge connection data.
621 : * @note This function should be called with handle lock.
622 : */
623 : static void
624 3 : _nns_edge_remove_connection (nns_edge_handle_s * eh, int64_t client_id)
625 : {
626 : nns_edge_conn_data_s *cdata, *prev;
627 :
628 3 : cdata = (nns_edge_conn_data_s *) eh->connections;
629 3 : prev = NULL;
630 :
631 3 : while (cdata) {
632 3 : if (cdata->id == client_id) {
633 3 : if (prev)
634 0 : prev->next = cdata->next;
635 : else
636 3 : eh->connections = cdata->next;
637 :
638 3 : _nns_edge_release_connection_data (cdata);
639 3 : return;
640 : }
641 0 : prev = cdata;
642 0 : cdata = cdata->next;
643 : }
644 : }
645 :
646 : /**
647 : * @brief Remove all connection data.
648 : * @note This function should be called with handle lock.
649 : */
650 : static void
651 38 : _nns_edge_remove_all_connection (nns_edge_handle_s * eh)
652 : {
653 : nns_edge_conn_data_s *cdata, *next;
654 :
655 38 : cdata = (nns_edge_conn_data_s *) eh->connections;
656 38 : eh->connections = NULL;
657 :
658 41 : while (cdata) {
659 3 : next = cdata->next;
660 :
661 3 : _nns_edge_release_connection_data (cdata);
662 :
663 3 : cdata = next;
664 : }
665 38 : }
666 :
667 : /**
668 : * @brief Connect to requested socket.
669 : */
670 : static bool
671 6 : _nns_edge_connect_socket (nns_edge_conn_s * conn)
672 : {
673 6 : struct sockaddr_in saddr = { 0 };
674 6 : socklen_t saddr_len = sizeof (struct sockaddr_in);
675 :
676 6 : if (!_fill_socket_addr (&saddr, conn->host, conn->port)) {
677 0 : nns_edge_loge ("Failed to connect socket, invalid host %s.", conn->host);
678 6 : return false;
679 : }
680 :
681 6 : conn->sockfd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
682 6 : if (conn->sockfd < 0) {
683 0 : nns_edge_loge ("Failed to create new socket.");
684 0 : return false;
685 : }
686 :
687 6 : _set_socket_option (conn->sockfd);
688 :
689 6 : if (connect (conn->sockfd, (struct sockaddr *) &saddr, saddr_len) < 0) {
690 0 : nns_edge_loge ("Failed to connect host %s:%d.", conn->host, conn->port);
691 0 : return false;
692 : }
693 :
694 6 : return true;
695 : }
696 :
697 : /**
698 : * @brief Message thread, receive buffer from the client.
 * @details Polls the socket of one connection every 10 milliseconds. Each
 *          received _NNS_EDGE_CMD_TRANSFER_DATA command is wrapped into an
 *          edge data handle, tagged with the client ID and passed to the
 *          event callback as NNS_EDGE_EVENT_NEW_DATA_RECEIVED. A receive
 *          failure or a _NNS_EDGE_CMD_ERROR command ends the loop and removes
 *          the connection of the client.
 * @param thread_data A heap-allocated nns_edge_thread_data_s. It is freed here
 *                    as soon as its members are copied.
 * @return Always NULL.
699 : */
700 : static void *
701 6 : _nns_edge_message_handler (void *thread_data)
702 : {
703 6 : nns_edge_thread_data_s *_tdata = (nns_edge_thread_data_s *) thread_data;
704 : nns_edge_handle_s *eh;
705 : nns_edge_conn_s *conn;
706 6 : bool remove_connection = false;
707 : int64_t client_id;
708 : int ret;
709 :
710 6 : if (!_tdata) {
711 0 : nns_edge_loge ("Internal error, thread data is null.");
712 0 : return NULL;
713 : }
714 :
/* Take over the values and release the carrier struct. */
715 6 : eh = (nns_edge_handle_s *) _tdata->eh;
716 6 : conn = _tdata->conn;
717 6 : client_id = _tdata->client_id;
718 6 : SAFE_FREE (_tdata);
719 :
/* The flag is cleared by _nns_edge_close_connection() to make this loop stop. */
720 6 : conn->running = true;
721 1491 : while (conn->running) {
722 : struct pollfd poll_fd;
723 :
724 : /* Validate edge handle */
725 1488 : if (!nns_edge_handle_is_valid (eh)) {
726 0 : nns_edge_loge ("The edge handle is invalid, it would be expired.");
727 3 : break;
728 : }
729 :
730 1488 : poll_fd.fd = conn->sockfd;
731 1488 : poll_fd.events = POLLIN | POLLHUP | POLLERR;
732 1488 : poll_fd.revents = 0;
733 :
734 : /* 10 milliseconds */
735 1488 : if (poll (&poll_fd, 1, 10) > 0) {
736 : nns_edge_cmd_s cmd;
737 : nns_edge_data_h data_h;
738 : char *val;
739 : unsigned int i;
740 :
741 : /* Receive data from the client */
742 33 : _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
743 33 : ret = _nns_edge_cmd_receive (conn, &cmd);
744 33 : if (ret != NNS_EDGE_ERROR_NONE) {
745 0 : nns_edge_loge ("Failed to receive data from the connected node.");
746 0 : remove_connection = true;
747 3 : break;
748 : }
749 :
/* The peer sends an error command before it closes its socket. */
750 33 : if (cmd.info.cmd == _NNS_EDGE_CMD_ERROR) {
751 3 : nns_edge_loge ("Received error, stop msg thread.");
752 3 : _nns_edge_cmd_clear (&cmd);
753 3 : remove_connection = true;
754 3 : break;
755 : }
756 :
757 30 : if (cmd.info.cmd != _NNS_EDGE_CMD_TRANSFER_DATA) {
758 : /** @todo handle other cmd later */
759 0 : _nns_edge_cmd_clear (&cmd);
760 0 : continue;
761 : }
762 :
763 30 : ret = nns_edge_data_create (&data_h);
764 30 : if (ret != NNS_EDGE_ERROR_NONE) {
765 0 : nns_edge_loge ("Failed to create data handle in msg thread.");
766 0 : _nns_edge_cmd_clear (&cmd);
767 0 : continue;
768 : }
769 :
/* NOTE(review): NULL is passed as the last argument, presumably meaning no destroy callback; the buffers are then freed by _nns_edge_cmd_clear() below - confirm. */
770 60 : for (i = 0; i < cmd.info.num; i++)
771 30 : nns_edge_data_add (data_h, cmd.mem[i], cmd.info.mem_size[i], NULL);
772 :
773 30 : if (cmd.info.meta_size > 0)
774 30 : nns_edge_data_deserialize_meta (data_h, cmd.meta, cmd.info.meta_size);
775 :
776 : /* Set client ID in edge data */
777 30 : val = nns_edge_strdup_printf ("%lld", (long long) client_id);
778 30 : nns_edge_data_set_info (data_h, "client_id", val);
779 30 : SAFE_FREE (val);
780 :
781 30 : ret = nns_edge_event_invoke_callback (eh->event_cb, eh->user_data,
782 : NNS_EDGE_EVENT_NEW_DATA_RECEIVED, data_h, sizeof (nns_edge_data_h),
783 : NULL);
784 30 : if (ret != NNS_EDGE_ERROR_NONE) {
785 : /* Try to get next request if server does not accept data from client. */
786 0 : nns_edge_logw ("The server does not accept data from client.");
787 : }
788 :
789 30 : nns_edge_data_destroy (data_h);
790 30 : _nns_edge_cmd_clear (&cmd);
791 : }
792 : }
793 6 : conn->running = false;
794 :
795 : /* Received error message from client, remove connection from table. */
/* This branch is also taken when receiving the command failed (see the loop above). */
796 6 : if (remove_connection) {
797 3 : nns_edge_loge
798 : ("Received error from client, remove connection of client (ID: %lld).",
799 : (long long) client_id);
/*
 * NOTE(review): _nns_edge_remove_connection() is documented as requiring the handle lock,
 * but no lock is taken here - confirm whether this is intended.
 * The removal closes and frees the connections of this client; conn is expected to be one
 * of them, so it must not be used after this call.
 */
800 3 : _nns_edge_remove_connection (eh, client_id);
801 3 : ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
802 :
803 3 : if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
804 1 : nns_edge_logi ("Connection lost! Reconnect to available node.");
805 1 : ret = _mqtt_hybrid_direct_connection (eh);
806 : }
807 :
/* Report the closed connection unless the hybrid reconnection succeeded. */
808 3 : if (ret != NNS_EDGE_ERROR_NONE) {
809 3 : nns_edge_event_invoke_callback (eh->event_cb, eh->user_data,
810 : NNS_EDGE_EVENT_CONNECTION_CLOSED, NULL, 0, NULL);
811 : }
812 : }
813 :
814 6 : return NULL;
815 : }
816 :
817 : /**
818 : * @brief Create message handle thread.
819 : */
820 : static int
821 6 : _nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn,
822 : int64_t client_id)
823 : {
824 : int status;
825 6 : nns_edge_thread_data_s *thread_data = NULL;
826 :
827 : thread_data =
828 6 : (nns_edge_thread_data_s *) calloc (1, sizeof (nns_edge_thread_data_s));
829 6 : if (!thread_data) {
830 0 : nns_edge_loge ("Failed to allocate edge thread data.");
831 0 : return NNS_EDGE_ERROR_OUT_OF_MEMORY;
832 : }
833 :
834 : /** Create message receiving thread */
835 6 : thread_data->eh = eh;
836 6 : thread_data->conn = conn;
837 6 : thread_data->client_id = client_id;
838 :
839 6 : status = pthread_create (&conn->msg_thread, NULL, _nns_edge_message_handler,
840 : thread_data);
841 :
842 6 : if (status != 0) {
843 0 : nns_edge_loge ("Failed to create message handler thread.");
844 0 : conn->running = false;
845 0 : conn->msg_thread = 0;
846 0 : SAFE_FREE (thread_data);
847 0 : return NNS_EDGE_ERROR_IO;
848 : }
849 :
850 6 : return NNS_EDGE_ERROR_NONE;
851 : }
852 :
853 : /**
854 : * @brief Thread to send data.
855 : */
856 : static void *
857 7 : _nns_edge_send_thread (void *thread_data)
858 : {
859 7 : nns_edge_handle_s *eh = (nns_edge_handle_s *) thread_data;
860 : nns_edge_conn_data_s *conn_data;
861 : nns_edge_conn_s *conn;
862 : nns_edge_data_h data_h;
863 : nns_size_t data_size;
864 : int64_t client_id;
865 : char *val;
866 : int ret;
867 :
868 7 : nns_edge_lock (eh);
869 7 : eh->sending = true;
870 7 : nns_edge_cond_signal (eh);
871 7 : nns_edge_unlock (eh);
872 :
873 43 : while (eh->sending &&
874 43 : NNS_EDGE_ERROR_NONE == nns_edge_queue_wait_pop (eh->send_queue, 0U,
875 : &data_h, &data_size)) {
876 36 : if (!eh->sending) {
877 0 : nns_edge_data_destroy (data_h);
878 0 : break;
879 : }
880 :
881 : /* Send data to destination */
882 36 : switch (eh->connect_type) {
883 30 : case NNS_EDGE_CONNECT_TYPE_TCP:
884 : case NNS_EDGE_CONNECT_TYPE_HYBRID:
885 30 : ret = nns_edge_data_get_info (data_h, "client_id", &val);
886 30 : if (ret != NNS_EDGE_ERROR_NONE) {
887 0 : nns_edge_logd
888 : ("Cannot find client ID in edge data. Send to all connected nodes.");
889 :
890 0 : conn_data = (nns_edge_conn_data_s *) eh->connections;
891 0 : while (conn_data) {
892 0 : client_id = conn_data->id;
893 0 : conn = conn_data->sink_conn;
894 0 : ret = _nns_edge_transfer_data (conn, data_h, client_id);
895 0 : conn_data = conn_data->next;
896 :
897 0 : if (NNS_EDGE_ERROR_NONE != ret) {
898 0 : nns_edge_loge ("Failed to transfer data. Close the connection.");
899 0 : _nns_edge_remove_connection (eh, client_id);
900 : }
901 : }
902 : } else {
903 30 : client_id = (int64_t) strtoll (val, NULL, 10);
904 30 : SAFE_FREE (val);
905 :
906 30 : conn_data = _nns_edge_get_connection (eh, client_id);
907 30 : if (conn_data) {
908 30 : conn = conn_data->sink_conn;
909 30 : _nns_edge_transfer_data (conn, data_h, client_id);
910 : } else {
911 0 : nns_edge_loge
912 : ("Cannot find connection, invalid client ID or connection closed.");
913 : }
914 : }
915 30 : break;
916 5 : case NNS_EDGE_CONNECT_TYPE_MQTT:
917 5 : ret = nns_edge_mqtt_publish_data (eh->broker_h, data_h);
918 5 : if (NNS_EDGE_ERROR_NONE != ret)
919 0 : nns_edge_loge ("Failed to send data via MQTT connection.");
920 5 : break;
921 1 : case NNS_EDGE_CONNECT_TYPE_CUSTOM:
922 1 : ret = nns_edge_custom_send_data (eh->custom_connection_h, data_h);
923 1 : if (NNS_EDGE_ERROR_NONE != ret)
924 0 : nns_edge_loge ("Failed to send data via custom connection.");
925 1 : break;
926 0 : default:
927 0 : break;
928 : }
929 36 : nns_edge_data_destroy (data_h);
930 : }
931 7 : eh->sending = false;
932 :
933 7 : return NULL;
934 : }
935 :
936 : /**
937 : * @brief Create thread to send data.
938 : * @note This should be called with lock.
939 : */
940 : static int
941 7 : _nns_edge_create_send_thread (nns_edge_handle_s * eh)
942 : {
943 : int status;
944 :
945 7 : status = pthread_create (&eh->send_thread, NULL, _nns_edge_send_thread, eh);
946 :
947 7 : if (status != 0) {
948 0 : nns_edge_loge ("Failed to create sender thread.");
949 0 : eh->send_thread = 0;
950 0 : eh->sending = false;
951 0 : return NNS_EDGE_ERROR_IO;
952 : }
953 :
954 : /* Wait for starting thread. */
955 7 : nns_edge_cond_wait (eh);
956 :
957 7 : return NNS_EDGE_ERROR_NONE;
958 : }
959 :
960 : /**
961 : * @brief Connect to the destination node. (host:sender(sink) - dest:receiver(listener, src))
962 : */
963 : static int
964 6 : _nns_edge_connect_to (nns_edge_handle_s * eh, int64_t client_id,
965 : const char *host, int port)
966 : {
967 6 : nns_edge_conn_s *conn = NULL;
968 : nns_edge_conn_data_s *conn_data;
969 : nns_edge_cmd_s cmd;
970 : char *host_str;
971 6 : bool done = false;
972 : int ret;
973 :
974 6 : conn = (nns_edge_conn_s *) calloc (1, sizeof (nns_edge_conn_s));
975 6 : if (!conn) {
976 0 : nns_edge_loge ("Failed to allocate client data.");
977 0 : goto error;
978 : }
979 :
980 6 : conn->host = nns_edge_strdup (host);
981 6 : conn->port = port;
982 6 : conn->sockfd = -1;
983 :
984 6 : if (!_nns_edge_connect_socket (conn)) {
985 0 : goto error;
986 : }
987 :
988 6 : if ((NNS_EDGE_NODE_TYPE_QUERY_CLIENT == eh->node_type)
989 3 : || (NNS_EDGE_NODE_TYPE_SUB == eh->node_type)) {
990 : /* Receive capability and client ID from server. */
991 3 : _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
992 3 : ret = _nns_edge_cmd_receive (conn, &cmd);
993 3 : if (ret != NNS_EDGE_ERROR_NONE) {
994 0 : nns_edge_loge ("Failed to receive capability.");
995 0 : goto error;
996 : }
997 :
998 3 : if (cmd.info.cmd != _NNS_EDGE_CMD_CAPABILITY) {
999 0 : nns_edge_loge ("Failed to get capability.");
1000 0 : _nns_edge_cmd_clear (&cmd);
1001 0 : goto error;
1002 : }
1003 :
1004 3 : client_id = eh->client_id = cmd.info.client_id;
1005 :
1006 : /* Check compatibility. */
1007 3 : ret = nns_edge_event_invoke_callback (eh->event_cb, eh->user_data,
1008 : NNS_EDGE_EVENT_CAPABILITY, cmd.mem[0], cmd.info.mem_size[0], NULL);
1009 3 : _nns_edge_cmd_clear (&cmd);
1010 :
1011 3 : if (ret != NNS_EDGE_ERROR_NONE) {
1012 0 : nns_edge_loge ("The event returns error, capability is not acceptable.");
1013 0 : _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
1014 : } else {
1015 : /* Send host and port to destination. */
1016 3 : _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id);
1017 :
1018 3 : host_str = nns_edge_get_host_string (eh->host, eh->port);
1019 3 : cmd.info.num = 1;
1020 3 : cmd.info.mem_size[0] = strlen (host_str) + 1;
1021 3 : cmd.mem[0] = host_str;
1022 : }
1023 :
1024 3 : ret = _nns_edge_cmd_send (conn, &cmd);
1025 3 : _nns_edge_cmd_clear (&cmd);
1026 :
1027 3 : if (ret != NNS_EDGE_ERROR_NONE) {
1028 0 : nns_edge_loge ("Failed to send host info.");
1029 0 : goto error;
1030 : }
1031 : }
1032 :
1033 6 : if (NNS_EDGE_NODE_TYPE_SUB == eh->node_type) {
1034 0 : ret = _nns_edge_create_message_thread (eh, conn, client_id);
1035 0 : if (ret != NNS_EDGE_ERROR_NONE) {
1036 0 : nns_edge_loge ("Failed to create message handle thread.");
1037 0 : goto error;
1038 : }
1039 : }
1040 :
1041 6 : conn_data = _nns_edge_add_connection (eh, client_id);
1042 6 : if (conn_data) {
1043 : /* Close old connection and set new one. */
1044 6 : _nns_edge_close_connection (conn_data->sink_conn);
1045 6 : conn_data->sink_conn = conn;
1046 6 : done = true;
1047 : }
1048 :
1049 0 : error:
1050 6 : if (!done) {
1051 0 : _nns_edge_close_connection (conn);
1052 6 : return NNS_EDGE_ERROR_CONNECTION_FAILURE;
1053 : }
1054 :
1055 6 : return NNS_EDGE_ERROR_NONE;
1056 : }
1057 :
1058 : /**
1059 : * @brief Accept socket and create message thread in socket listener thread.
1060 : */
1061 : static void
1062 6 : _nns_edge_accept_socket (nns_edge_handle_s * eh)
1063 : {
1064 6 : bool done = false;
1065 : nns_edge_conn_s *conn;
1066 : nns_edge_conn_data_s *conn_data;
1067 : nns_edge_cmd_s cmd;
1068 : int64_t client_id;
1069 6 : char *dest_host = NULL;
1070 : int dest_port, ret;
1071 :
1072 6 : conn = (nns_edge_conn_s *) calloc (1, sizeof (nns_edge_conn_s));
1073 6 : if (!conn) {
1074 0 : nns_edge_loge ("Failed to allocate edge connection.");
1075 0 : goto error;
1076 : }
1077 :
1078 6 : conn->sockfd = accept (eh->listener_fd, NULL, NULL);
1079 6 : if (conn->sockfd < 0) {
1080 0 : nns_edge_loge ("Failed to accept socket.");
1081 0 : goto error;
1082 : }
1083 :
1084 6 : _set_socket_option (conn->sockfd);
1085 :
1086 6 : if ((NNS_EDGE_NODE_TYPE_QUERY_SERVER == eh->node_type)
1087 3 : || (NNS_EDGE_NODE_TYPE_PUB == eh->node_type)) {
1088 3 : client_id = nns_edge_generate_id ();
1089 : } else {
1090 3 : client_id = eh->client_id;
1091 : }
1092 :
1093 : /* Send capability and info to check compatibility. */
1094 6 : if ((NNS_EDGE_NODE_TYPE_QUERY_SERVER == eh->node_type)
1095 3 : || (NNS_EDGE_NODE_TYPE_PUB == eh->node_type)) {
1096 3 : _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_CAPABILITY, client_id);
1097 3 : cmd.info.num = 1;
1098 3 : cmd.info.mem_size[0] = strlen (eh->caps_str) + 1;
1099 3 : cmd.mem[0] = eh->caps_str;
1100 :
1101 3 : ret = _nns_edge_cmd_send (conn, &cmd);
1102 3 : if (ret != NNS_EDGE_ERROR_NONE) {
1103 0 : nns_edge_loge ("Failed to send capability.");
1104 0 : goto error;
1105 : }
1106 : }
1107 :
1108 6 : if (NNS_EDGE_NODE_TYPE_QUERY_SERVER == eh->node_type) {
1109 : /* Receive host info from destination. */
1110 3 : _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
1111 3 : ret = _nns_edge_cmd_receive (conn, &cmd);
1112 3 : if (ret != NNS_EDGE_ERROR_NONE) {
1113 0 : nns_edge_loge ("Failed to receive node info.");
1114 0 : goto error;
1115 : }
1116 :
1117 3 : if (cmd.info.cmd != _NNS_EDGE_CMD_HOST_INFO) {
1118 0 : nns_edge_loge ("Failed to get host info.");
1119 0 : _nns_edge_cmd_clear (&cmd);
1120 0 : goto error;
1121 : }
1122 :
1123 3 : nns_edge_parse_host_string (cmd.mem[0], &dest_host, &dest_port);
1124 3 : _nns_edge_cmd_clear (&cmd);
1125 :
1126 : /* Connect to client listener. */
1127 3 : ret = _nns_edge_connect_to (eh, client_id, dest_host, dest_port);
1128 3 : if (ret != NNS_EDGE_ERROR_NONE) {
1129 0 : nns_edge_loge ("Failed to connect host %s:%d.", dest_host, dest_port);
1130 0 : goto error;
1131 : }
1132 : }
1133 :
1134 6 : conn_data = _nns_edge_add_connection (eh, client_id);
1135 6 : if (!conn_data) {
1136 0 : nns_edge_loge ("Failed to add client connection.");
1137 0 : goto error;
1138 : }
1139 :
1140 : /* Close old connection and set new one for each node type. */
1141 6 : if (eh->node_type == NNS_EDGE_NODE_TYPE_QUERY_CLIENT ||
1142 3 : eh->node_type == NNS_EDGE_NODE_TYPE_QUERY_SERVER) {
1143 6 : ret = _nns_edge_create_message_thread (eh, conn, client_id);
1144 6 : if (ret != NNS_EDGE_ERROR_NONE) {
1145 0 : nns_edge_loge ("Failed to create message handle thread.");
1146 0 : goto error;
1147 : }
1148 6 : _nns_edge_close_connection (conn_data->src_conn);
1149 6 : conn_data->src_conn = conn;
1150 : } else {
1151 0 : _nns_edge_close_connection (conn_data->sink_conn);
1152 0 : conn_data->sink_conn = conn;
1153 : }
1154 :
1155 6 : ret = nns_edge_event_invoke_callback (eh->event_cb, eh->user_data,
1156 : NNS_EDGE_EVENT_CONNECTION_COMPLETED, NULL, 0, NULL);
1157 6 : if (ret != NNS_EDGE_ERROR_NONE) {
1158 0 : nns_edge_loge ("Failed to send an event for new connection.");
1159 0 : goto error;
1160 : }
1161 6 : done = true;
1162 :
1163 6 : error:
1164 6 : if (!done)
1165 0 : _nns_edge_close_connection (conn);
1166 :
1167 6 : SAFE_FREE (dest_host);
1168 6 : }
1169 :
1170 : /**
1171 : * @brief Socket listener thread.
1172 : */
1173 : static void *
1174 6 : _nns_edge_socket_listener_thread (void *thread_data)
1175 : {
1176 6 : nns_edge_handle_s *eh = (nns_edge_handle_s *) thread_data;
1177 :
1178 6 : nns_edge_lock (eh);
1179 6 : eh->listening = true;
1180 6 : nns_edge_cond_signal (eh);
1181 6 : nns_edge_unlock (eh);
1182 :
1183 1567 : while (eh->listening) {
1184 : struct pollfd poll_fd;
1185 :
1186 1561 : poll_fd.fd = eh->listener_fd;
1187 1561 : poll_fd.events = POLLIN | POLLHUP | POLLERR;
1188 1561 : poll_fd.revents = 0;
1189 :
1190 : /* 10 milliseconds */
1191 1561 : if (poll (&poll_fd, 1, 10) > 0) {
1192 6 : if (!eh->listening)
1193 0 : break;
1194 :
1195 6 : if (poll_fd.revents & (POLLERR | POLLHUP)) {
1196 0 : nns_edge_loge ("Invalid state, possibly socket is closed in listener.");
1197 0 : break;
1198 : }
1199 :
1200 6 : if (poll_fd.revents & POLLIN)
1201 6 : _nns_edge_accept_socket (eh);
1202 : }
1203 : }
1204 6 : eh->listening = false;
1205 :
1206 6 : return NULL;
1207 : }
1208 :
1209 : /**
1210 : * @brief Create socket listener.
1211 : * @note This function should be called with handle lock.
1212 : */
1213 : static bool
1214 6 : _nns_edge_create_socket_listener (nns_edge_handle_s * eh)
1215 : {
1216 6 : bool done = false;
1217 6 : struct sockaddr_in saddr = { 0 };
1218 6 : socklen_t saddr_len = sizeof (struct sockaddr_in);
1219 : int status;
1220 :
1221 6 : if (!_fill_socket_addr (&saddr, eh->host, eh->port)) {
1222 0 : nns_edge_loge ("Failed to create listener, invalid host: %s.", eh->host);
1223 6 : return false;
1224 : }
1225 :
1226 6 : eh->listener_fd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
1227 6 : if (eh->listener_fd < 0) {
1228 0 : nns_edge_loge ("Failed to create listener socket.");
1229 0 : return false;
1230 : }
1231 :
1232 6 : if (bind (eh->listener_fd, (struct sockaddr *) &saddr, saddr_len) < 0) {
1233 0 : nns_edge_loge ("Failed to create listener, cannot bind socket.");
1234 0 : goto error;
1235 : }
1236 :
1237 6 : if (listen (eh->listener_fd, N_BACKLOG) < 0) {
1238 0 : nns_edge_loge ("Failed to create listener, cannot listen socket.");
1239 0 : goto error;
1240 : }
1241 :
1242 6 : status = pthread_create (&eh->listener_thread, NULL,
1243 : _nns_edge_socket_listener_thread, eh);
1244 :
1245 6 : if (status != 0) {
1246 0 : nns_edge_loge ("Failed to create listener thread.");
1247 0 : eh->listening = false;
1248 0 : eh->listener_thread = 0;
1249 0 : goto error;
1250 : }
1251 :
1252 : /* Wait for the listener thread to be started */
1253 6 : nns_edge_cond_wait (eh);
1254 :
1255 6 : done = true;
1256 :
1257 6 : error:
1258 6 : if (!done) {
1259 0 : close (eh->listener_fd);
1260 0 : eh->listener_fd = -1;
1261 : }
1262 :
1263 6 : return done;
1264 : }
1265 :
1266 : /**
1267 : * @brief Internal function to create edge handle.
1268 : */
1269 : static int
1270 36 : _nns_edge_create_handle (const char *id, nns_edge_node_type_e node_type,
1271 : nns_edge_h * edge_h)
1272 : {
1273 36 : int ret = NNS_EDGE_ERROR_NONE;
1274 : nns_edge_handle_s *eh;
1275 :
1276 36 : eh = (nns_edge_handle_s *) calloc (1, sizeof (nns_edge_handle_s));
1277 36 : if (!eh) {
1278 0 : nns_edge_loge ("Failed to allocate memory for edge handle.");
1279 0 : return NNS_EDGE_ERROR_OUT_OF_MEMORY;
1280 : }
1281 :
1282 36 : nns_edge_lock_init (eh);
1283 36 : nns_edge_cond_init (eh);
1284 36 : nns_edge_handle_set_magic (eh, NNS_EDGE_MAGIC);
1285 36 : eh->id = STR_IS_VALID (id) ? nns_edge_strdup (id) :
1286 0 : nns_edge_strdup_printf ("%lld", (long long) nns_edge_generate_id ());
1287 36 : eh->host = nns_edge_strdup ("localhost");
1288 36 : eh->port = 0;
1289 36 : eh->dest_host = nns_edge_strdup ("localhost");
1290 36 : eh->dest_port = 0;
1291 36 : eh->node_type = node_type;
1292 36 : eh->is_started = false;
1293 36 : eh->broker_h = NULL;
1294 36 : eh->connections = NULL;
1295 36 : eh->listening = false;
1296 36 : eh->sending = false;
1297 36 : eh->listener_fd = -1;
1298 36 : eh->caps_str = nns_edge_strdup ("");
1299 36 : eh->custom_connection_h = NULL;
1300 :
1301 36 : ret = nns_edge_metadata_create (&eh->metadata);
1302 36 : if (ret != NNS_EDGE_ERROR_NONE) {
1303 0 : nns_edge_loge ("Failed to create edge metadata.");
1304 0 : goto error;
1305 : }
1306 :
1307 36 : ret = nns_edge_queue_create (&eh->send_queue);
1308 36 : if (NNS_EDGE_ERROR_NONE != ret) {
1309 0 : nns_edge_loge ("Failed to create edge queue.");
1310 0 : goto error;
1311 : }
1312 :
1313 36 : error:
1314 36 : if (ret == NNS_EDGE_ERROR_NONE)
1315 36 : *edge_h = eh;
1316 : else
1317 0 : nns_edge_release_handle (eh);
1318 :
1319 36 : return ret;
1320 : }
1321 :
1322 : /**
1323 : * @brief Create edge custom handle.
1324 : */
1325 : int
1326 5 : nns_edge_custom_create_handle (const char *id, const char *lib_path,
1327 : nns_edge_node_type_e node_type, nns_edge_h * edge_h)
1328 : {
1329 5 : int ret = NNS_EDGE_ERROR_NONE;
1330 : nns_edge_handle_s *eh;
1331 :
1332 5 : if (node_type < 0 || node_type >= NNS_EDGE_NODE_TYPE_UNKNOWN) {
1333 1 : nns_edge_loge ("Invalid param, set exact node type.");
1334 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1335 : }
1336 :
1337 4 : if (!STR_IS_VALID (lib_path)) {
1338 1 : nns_edge_loge ("Invalid param, given custom lib path is invalid.");
1339 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1340 : }
1341 :
1342 3 : if (!edge_h) {
1343 1 : nns_edge_loge ("Invalid param, edge_h should not be null.");
1344 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1345 : }
1346 :
1347 2 : ret = _nns_edge_create_handle (id, node_type, edge_h);
1348 2 : if (ret != NNS_EDGE_ERROR_NONE) {
1349 0 : nns_edge_loge ("Failed to create edge handle.");
1350 0 : return ret;
1351 : }
1352 :
1353 2 : eh = (nns_edge_handle_s *) (*edge_h);
1354 2 : eh->connect_type = NNS_EDGE_CONNECT_TYPE_CUSTOM;
1355 :
1356 2 : ret = nns_edge_custom_load (lib_path, &eh->custom_connection_h);
1357 2 : if (ret != NNS_EDGE_ERROR_NONE)
1358 0 : nns_edge_release_handle (eh);
1359 :
1360 2 : return ret;
1361 : }
1362 :
1363 : /**
1364 : * @brief Create edge handle.
1365 : */
1366 : int
1367 37 : nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type,
1368 : nns_edge_node_type_e node_type, nns_edge_h * edge_h)
1369 : {
1370 37 : int ret = NNS_EDGE_ERROR_NONE;
1371 : nns_edge_handle_s *eh;
1372 :
1373 37 : if (connect_type < 0 || connect_type >= NNS_EDGE_CONNECT_TYPE_UNKNOWN ||
1374 : connect_type == NNS_EDGE_CONNECT_TYPE_CUSTOM) {
1375 1 : nns_edge_loge ("Invalid param, set valid connect type.");
1376 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1377 : }
1378 :
1379 : /**
1380 : * @todo handle flag (receive | send)
1381 : * e.g., send only case: listener is unnecessary.
1382 : */
1383 36 : if (node_type < 0 || node_type >= NNS_EDGE_NODE_TYPE_UNKNOWN) {
1384 1 : nns_edge_loge ("Invalid param, set exact node type.");
1385 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1386 : }
1387 :
1388 35 : if (!edge_h) {
1389 1 : nns_edge_loge ("Invalid param, edge_h should not be null.");
1390 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1391 : }
1392 :
1393 34 : ret = _nns_edge_create_handle (id, node_type, edge_h);
1394 34 : if (ret != NNS_EDGE_ERROR_NONE) {
1395 0 : nns_edge_loge ("Failed to create edge handle.");
1396 0 : return ret;
1397 : }
1398 :
1399 34 : eh = (nns_edge_handle_s *) (*edge_h);
1400 34 : eh->connect_type = connect_type;
1401 :
1402 34 : return NNS_EDGE_ERROR_NONE;
1403 : }
1404 :
1405 : /**
1406 : * @brief Start the nnstreamer edge.
1407 : */
1408 : int
1409 11 : nns_edge_start (nns_edge_h edge_h)
1410 : {
1411 : nns_edge_handle_s *eh;
1412 11 : int ret = NNS_EDGE_ERROR_NONE;
1413 :
1414 11 : eh = (nns_edge_handle_s *) edge_h;
1415 11 : if (!eh) {
1416 1 : nns_edge_loge ("Invalid param, given edge handle is null.");
1417 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1418 : }
1419 :
1420 10 : if (!nns_edge_handle_is_valid (eh)) {
1421 1 : nns_edge_loge ("Invalid param, given edge handle is invalid.");
1422 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1423 : }
1424 :
1425 9 : nns_edge_lock (eh);
1426 :
1427 9 : if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
1428 1 : ret = nns_edge_custom_start (eh->custom_connection_h);
1429 1 : if (NNS_EDGE_ERROR_NONE == ret)
1430 1 : ret = _nns_edge_create_send_thread (eh);
1431 :
1432 1 : if (NNS_EDGE_ERROR_NONE != ret)
1433 0 : nns_edge_loge ("Failed to start edge custom connection.");
1434 1 : goto done;
1435 : }
1436 :
1437 8 : if (eh->port <= 0) {
1438 6 : eh->port = nns_edge_get_available_port ();
1439 6 : if (eh->port <= 0) {
1440 0 : nns_edge_loge ("Failed to start edge. Cannot get available port.");
1441 0 : nns_edge_unlock (eh);
1442 0 : return NNS_EDGE_ERROR_CONNECTION_FAILURE;
1443 : }
1444 : }
1445 :
1446 8 : if ((NNS_EDGE_NODE_TYPE_QUERY_SERVER == eh->node_type)
1447 6 : || (NNS_EDGE_NODE_TYPE_PUB == eh->node_type)) {
1448 3 : if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type
1449 2 : || NNS_EDGE_CONNECT_TYPE_MQTT == eh->connect_type) {
1450 : char *topic;
1451 :
1452 : /** @todo Set unique device name.
1453 : * Device name should be unique. Consider using MAC address later.
1454 : * Now, use ID received from the user.
1455 : */
1456 2 : topic = nns_edge_strdup_printf ("edge/inference/device-%s/%s/",
1457 : eh->id, eh->topic);
1458 :
1459 4 : ret = nns_edge_mqtt_connect (eh->id, topic, eh->dest_host, eh->dest_port,
1460 2 : &eh->broker_h);
1461 2 : SAFE_FREE (topic);
1462 :
1463 2 : if (NNS_EDGE_ERROR_NONE != ret) {
1464 0 : nns_edge_loge
1465 : ("Failed to start nnstreamer-edge, cannot connect to broker.");
1466 0 : goto done;
1467 : }
1468 :
1469 2 : if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
1470 : char *msg;
1471 1 : msg = nns_edge_get_host_string (eh->host, eh->port);
1472 :
1473 1 : ret = nns_edge_mqtt_publish (eh->broker_h, msg, strlen (msg) + 1);
1474 1 : SAFE_FREE (msg);
1475 :
1476 1 : if (NNS_EDGE_ERROR_NONE != ret) {
1477 0 : nns_edge_loge ("Failed to publish the message to broker.");
1478 0 : goto done;
1479 : }
1480 : } else {
1481 1 : ret = nns_edge_mqtt_set_event_callback (eh->broker_h, eh->event_cb,
1482 : eh->user_data);
1483 1 : if (NNS_EDGE_ERROR_NONE != ret) {
1484 0 : nns_edge_loge ("Failed to set event callback to MQTT broker.");
1485 0 : goto done;
1486 : }
1487 : }
1488 : }
1489 : }
1490 :
1491 8 : if ((NNS_EDGE_NODE_TYPE_QUERY_CLIENT == eh->node_type)
1492 5 : || (NNS_EDGE_NODE_TYPE_QUERY_SERVER == eh->node_type)
1493 3 : || (NNS_EDGE_NODE_TYPE_PUB == eh->node_type)) {
1494 : /* Start listener thread to accept socket. */
1495 6 : if (!_nns_edge_create_socket_listener (eh)) {
1496 0 : nns_edge_loge ("Failed to create socket listener.");
1497 0 : ret = NNS_EDGE_ERROR_IO;
1498 0 : goto done;
1499 : }
1500 :
1501 6 : ret = _nns_edge_create_send_thread (eh);
1502 : }
1503 :
1504 2 : done:
1505 9 : eh->is_started = (ret == NNS_EDGE_ERROR_NONE);
1506 9 : nns_edge_unlock (eh);
1507 9 : return ret;
1508 : }
1509 :
1510 : /**
1511 : * @brief Stop the nnstreamer edge.
1512 : */
1513 : int
1514 37 : nns_edge_stop (nns_edge_h edge_h)
1515 : {
1516 : nns_edge_handle_s *eh;
1517 37 : int ret = NNS_EDGE_ERROR_NONE;
1518 :
1519 37 : eh = (nns_edge_handle_s *) edge_h;
1520 37 : if (!eh) {
1521 0 : nns_edge_loge ("Invalid param, given edge handle is null.");
1522 0 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1523 : }
1524 :
1525 37 : if (!nns_edge_handle_is_valid (eh)) {
1526 0 : nns_edge_loge ("Invalid param, given edge handle is invalid.");
1527 0 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1528 : }
1529 :
1530 37 : nns_edge_lock (eh);
1531 37 : if (!eh->is_started) {
1532 28 : nns_edge_logi ("Edge is not started yet. Nothing to stop.");
1533 28 : goto done;
1534 : }
1535 :
1536 9 : if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
1537 1 : ret = nns_edge_custom_stop (eh->custom_connection_h);
1538 : }
1539 :
1540 9 : if (NNS_EDGE_ERROR_NONE == ret)
1541 9 : eh->is_started = FALSE;
1542 :
1543 0 : done:
1544 37 : nns_edge_unlock (eh);
1545 37 : return ret;
1546 : }
1547 :
1548 : /**
1549 : * @brief Release the given handle.
1550 : */
1551 : int
1552 38 : nns_edge_release_handle (nns_edge_h edge_h)
1553 : {
1554 : nns_edge_handle_s *eh;
1555 :
1556 38 : eh = (nns_edge_handle_s *) edge_h;
1557 38 : if (!eh) {
1558 1 : nns_edge_loge ("Invalid param, given edge handle is null.");
1559 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1560 : }
1561 :
1562 37 : if (!nns_edge_handle_is_valid (eh)) {
1563 1 : nns_edge_loge ("Invalid param, given edge handle is invalid.");
1564 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1565 : }
1566 :
1567 36 : nns_edge_stop (eh);
1568 :
1569 36 : nns_edge_lock (eh);
1570 :
1571 : /* Clear message queue and stop thread first */
1572 36 : nns_edge_queue_clear (eh->send_queue);
1573 :
1574 36 : eh->sending = false;
1575 36 : if (eh->send_thread) {
1576 7 : pthread_join (eh->send_thread, NULL);
1577 7 : eh->send_thread = 0;
1578 : }
1579 :
1580 36 : eh->listening = false;
1581 36 : if (eh->listener_thread) {
1582 6 : pthread_join (eh->listener_thread, NULL);
1583 6 : eh->listener_thread = 0;
1584 : }
1585 :
1586 36 : if (eh->listener_fd >= 0) {
1587 6 : close (eh->listener_fd);
1588 6 : eh->listener_fd = -1;
1589 : }
1590 :
1591 36 : _nns_edge_remove_all_connection (eh);
1592 :
1593 36 : switch (eh->connect_type) {
1594 5 : case NNS_EDGE_CONNECT_TYPE_HYBRID:
1595 : case NNS_EDGE_CONNECT_TYPE_MQTT:
1596 5 : if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh->broker_h)) {
1597 0 : nns_edge_logw ("Failed to close mqtt connection.");
1598 : }
1599 5 : break;
1600 2 : case NNS_EDGE_CONNECT_TYPE_CUSTOM:
1601 2 : if (nns_edge_custom_release (eh->custom_connection_h) !=
1602 : NNS_EDGE_ERROR_NONE) {
1603 0 : nns_edge_logw ("Failed to close custom connection.");
1604 : }
1605 2 : break;
1606 29 : default:
1607 29 : break;
1608 : }
1609 :
1610 : /* Clear event callback and handles */
1611 36 : nns_edge_handle_set_magic (eh, NNS_EDGE_MAGIC_DEAD);
1612 36 : eh->event_cb = NULL;
1613 36 : eh->user_data = NULL;
1614 36 : eh->broker_h = NULL;
1615 36 : eh->custom_connection_h = NULL;
1616 :
1617 36 : nns_edge_queue_destroy (eh->send_queue);
1618 36 : eh->send_queue = NULL;
1619 36 : nns_edge_metadata_destroy (eh->metadata);
1620 36 : eh->metadata = NULL;
1621 36 : SAFE_FREE (eh->id);
1622 36 : SAFE_FREE (eh->topic);
1623 36 : SAFE_FREE (eh->host);
1624 36 : SAFE_FREE (eh->dest_host);
1625 36 : SAFE_FREE (eh->caps_str);
1626 :
1627 36 : nns_edge_unlock (eh);
1628 36 : nns_edge_cond_destroy (eh);
1629 36 : nns_edge_lock_destroy (eh);
1630 36 : SAFE_FREE (eh);
1631 :
1632 36 : return NNS_EDGE_ERROR_NONE;
1633 : }
1634 :
1635 : /**
1636 : * @brief Set the event callback.
1637 : */
1638 : int
1639 17 : nns_edge_set_event_callback (nns_edge_h edge_h, nns_edge_event_cb cb,
1640 : void *user_data)
1641 : {
1642 : nns_edge_handle_s *eh;
1643 : int ret;
1644 :
1645 17 : eh = (nns_edge_handle_s *) edge_h;
1646 17 : if (!eh) {
1647 1 : nns_edge_loge ("Invalid param, given edge handle is null.");
1648 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1649 : }
1650 :
1651 16 : if (!nns_edge_handle_is_valid (eh)) {
1652 1 : nns_edge_loge ("Invalid param, given edge handle is invalid.");
1653 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1654 : }
1655 :
1656 15 : nns_edge_lock (eh);
1657 :
1658 15 : ret = nns_edge_event_invoke_callback (eh->event_cb, eh->user_data,
1659 : NNS_EDGE_EVENT_CALLBACK_RELEASED, NULL, 0, NULL);
1660 15 : if (ret != NNS_EDGE_ERROR_NONE) {
1661 0 : nns_edge_loge ("Failed to set new event callback.");
1662 0 : goto error;
1663 : }
1664 :
1665 15 : if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
1666 1 : ret = nns_edge_custom_set_event_callback (eh->custom_connection_h,
1667 : cb, user_data);
1668 1 : if (NNS_EDGE_ERROR_NONE != ret) {
1669 0 : goto error;
1670 : }
1671 : }
1672 :
1673 15 : eh->event_cb = cb;
1674 15 : eh->user_data = user_data;
1675 :
1676 15 : error:
1677 15 : nns_edge_unlock (eh);
1678 15 : return ret;
1679 : }
1680 :
1681 : /**
1682 : * @brief Parse the message received from the MQTT broker and connect to the server directly.
1683 : */
1684 : static int
1685 2 : _mqtt_hybrid_direct_connection (nns_edge_handle_s * eh)
1686 : {
1687 : int ret;
1688 :
1689 0 : do {
1690 2 : char *msg = NULL;
1691 2 : char *server_ip = NULL;
1692 2 : int server_port = 0;
1693 2 : nns_size_t msg_len = 0;
1694 :
1695 : ret =
1696 2 : nns_edge_mqtt_get_message (eh->broker_h, (void **) &msg, &msg_len, 0U);
1697 2 : if (ret != NNS_EDGE_ERROR_NONE || !msg || msg_len == 0)
1698 : break;
1699 :
1700 1 : nns_edge_parse_host_string (msg, &server_ip, &server_port);
1701 1 : SAFE_FREE (msg);
1702 :
1703 1 : nns_edge_logd ("Parsed server info: Server [%s:%d] ", server_ip,
1704 : server_port);
1705 :
1706 1 : ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port);
1707 1 : SAFE_FREE (server_ip);
1708 :
1709 1 : if (NNS_EDGE_ERROR_NONE == ret)
1710 1 : break;
1711 : } while (TRUE);
1712 :
1713 2 : return ret;
1714 : }
1715 :
1716 : /**
1717 : * @brief Start subscription to MQTT message
1718 : */
1719 : static int
1720 3 : _nns_edge_start_mqtt_sub (nns_edge_handle_s * eh)
1721 : {
1722 : char *topic;
1723 : int ret;
1724 :
1725 3 : if (!nns_edge_mqtt_is_connected (eh->broker_h)) {
1726 3 : topic = nns_edge_strdup_printf ("edge/inference/+/%s/#", eh->topic);
1727 :
1728 6 : ret = nns_edge_mqtt_connect (eh->id, topic, eh->dest_host, eh->dest_port,
1729 3 : &eh->broker_h);
1730 3 : SAFE_FREE (topic);
1731 :
1732 3 : if (NNS_EDGE_ERROR_NONE != ret) {
1733 0 : return NNS_EDGE_ERROR_CONNECTION_FAILURE;
1734 : }
1735 :
1736 3 : ret = nns_edge_mqtt_subscribe (eh->broker_h);
1737 3 : if (NNS_EDGE_ERROR_NONE != ret) {
1738 0 : nns_edge_loge ("Failed to subscribe to topic: %s.", eh->topic);
1739 0 : return ret;
1740 : }
1741 : }
1742 :
1743 3 : return NNS_EDGE_ERROR_NONE;
1744 : }
1745 :
1746 : /**
1747 : * @brief Connect to the destination node.
1748 : */
1749 : int
1750 15 : nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port)
1751 : {
1752 : nns_edge_handle_s *eh;
1753 15 : int ret = NNS_EDGE_ERROR_NONE;
1754 :
1755 15 : eh = (nns_edge_handle_s *) edge_h;
1756 15 : if (!eh) {
1757 1 : nns_edge_loge ("Invalid param, given edge handle is null.");
1758 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1759 : }
1760 :
1761 14 : if (!STR_IS_VALID (dest_host)) {
1762 2 : nns_edge_loge ("Invalid param, given host is invalid.");
1763 2 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1764 : }
1765 :
1766 12 : if (!PORT_IS_VALID (dest_port)) {
1767 3 : nns_edge_loge ("Invalid port number %d.", dest_port);
1768 3 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1769 : }
1770 :
1771 9 : if (!nns_edge_handle_is_valid (eh)) {
1772 1 : nns_edge_loge ("Invalid param, given edge handle is invalid.");
1773 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1774 : }
1775 :
1776 8 : nns_edge_lock (eh);
1777 8 : if (!eh->is_started) {
1778 1 : nns_edge_loge ("Invalid state, the edge handle is not started.");
1779 1 : nns_edge_unlock (eh);
1780 1 : return NNS_EDGE_ERROR_IO;
1781 : }
1782 :
1783 7 : if (!eh->event_cb) {
1784 0 : nns_edge_loge ("NNStreamer-edge event callback is not registered.");
1785 0 : nns_edge_unlock (eh);
1786 0 : return NNS_EDGE_ERROR_CONNECTION_FAILURE;
1787 : }
1788 :
1789 7 : if (NNS_EDGE_ERROR_NONE == nns_edge_is_connected (eh)) {
1790 0 : nns_edge_logi ("NNStreamer-edge is already connected.");
1791 0 : nns_edge_unlock (eh);
1792 0 : return NNS_EDGE_ERROR_NONE;
1793 : }
1794 :
1795 7 : SAFE_FREE (eh->dest_host);
1796 7 : eh->dest_host = nns_edge_strdup (dest_host);
1797 7 : eh->dest_port = dest_port;
1798 :
1799 7 : if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type
1800 6 : || NNS_EDGE_CONNECT_TYPE_MQTT == eh->connect_type) {
1801 3 : if (NNS_EDGE_NODE_TYPE_PUB == eh->node_type)
1802 0 : goto done;
1803 3 : ret = _nns_edge_start_mqtt_sub (eh);
1804 3 : if (NNS_EDGE_ERROR_NONE != ret)
1805 0 : goto done;
1806 :
1807 3 : if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
1808 1 : ret = _mqtt_hybrid_direct_connection (eh);
1809 : } else {
1810 2 : ret = nns_edge_mqtt_set_event_callback (eh->broker_h, eh->event_cb,
1811 : eh->user_data);
1812 2 : if (NNS_EDGE_ERROR_NONE != ret) {
1813 0 : nns_edge_loge ("Failed to set event callback to MQTT broker.");
1814 0 : goto done;
1815 : }
1816 : }
1817 4 : } else if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
1818 2 : ret = nns_edge_custom_connect (eh->custom_connection_h);
1819 2 : if (ret != NNS_EDGE_ERROR_NONE) {
1820 0 : goto done;
1821 : }
1822 : } else {
1823 2 : if (NNS_EDGE_NODE_TYPE_PUB == eh->node_type)
1824 0 : goto done;
1825 2 : ret = _nns_edge_connect_to (eh, eh->client_id, dest_host, dest_port);
1826 2 : if (ret != NNS_EDGE_ERROR_NONE) {
1827 0 : nns_edge_loge ("Failed to connect to %s:%d", dest_host, dest_port);
1828 : }
1829 : }
1830 :
1831 2 : done:
1832 7 : nns_edge_unlock (eh);
1833 7 : return ret;
1834 : }
1835 :
1836 : /**
1837 : * @brief Disconnect from the destination node.
1838 : */
1839 : int
1840 5 : nns_edge_disconnect (nns_edge_h edge_h)
1841 : {
1842 : nns_edge_handle_s *eh;
1843 5 : int ret = NNS_EDGE_ERROR_NONE;
1844 :
1845 5 : eh = (nns_edge_handle_s *) edge_h;
1846 5 : if (!eh) {
1847 1 : nns_edge_loge ("Invalid param, given edge handle is null.");
1848 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1849 : }
1850 :
1851 4 : if (!nns_edge_handle_is_valid (eh)) {
1852 1 : nns_edge_loge ("Invalid param, given edge handle is invalid.");
1853 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1854 : }
1855 :
1856 3 : nns_edge_lock (eh);
1857 3 : if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
1858 1 : ret = nns_edge_custom_disconnect (eh->custom_connection_h);
1859 : } else {
1860 2 : _nns_edge_remove_all_connection (eh);
1861 : }
1862 3 : nns_edge_unlock (eh);
1863 :
1864 3 : return ret;
1865 : }
1866 :
1867 : /**
1868 : * @brief Check whether edge is connected or not.
1869 : */
1870 : int
1871 47 : nns_edge_is_connected (nns_edge_h edge_h)
1872 : {
1873 47 : nns_edge_handle_s *eh = (nns_edge_handle_s *) edge_h;
1874 : nns_edge_conn_data_s *conn_data;
1875 : nns_edge_conn_s *conn;
1876 :
1877 47 : if (!eh) {
1878 0 : nns_edge_loge ("Invalid param, given edge handle is null.");
1879 0 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1880 : }
1881 :
1882 47 : if (!nns_edge_handle_is_valid (eh)) {
1883 0 : nns_edge_loge ("Invalid param, given edge handle is invalid.");
1884 0 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1885 : }
1886 :
1887 47 : if (NNS_EDGE_CONNECT_TYPE_MQTT == eh->connect_type &&
1888 7 : nns_edge_mqtt_is_connected (eh->broker_h))
1889 5 : return NNS_EDGE_ERROR_NONE;
1890 :
1891 42 : if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
1892 7 : return nns_edge_custom_is_connected (eh->custom_connection_h);
1893 : }
1894 :
1895 35 : conn_data = (nns_edge_conn_data_s *) eh->connections;
1896 35 : while (conn_data) {
1897 30 : conn = conn_data->sink_conn;
1898 30 : if (_nns_edge_check_connection (conn)) {
1899 30 : return NNS_EDGE_ERROR_NONE;
1900 : }
1901 0 : conn_data = conn_data->next;
1902 : }
1903 :
1904 5 : return NNS_EDGE_ERROR_CONNECTION_FAILURE;
1905 : }
1906 :
1907 : /**
1908 : * @brief Send data to destination (broker or connected node), asynchronously.
1909 : */
1910 : int
1911 39 : nns_edge_send (nns_edge_h edge_h, nns_edge_data_h data_h)
1912 : {
1913 39 : int ret = NNS_EDGE_ERROR_NONE;
1914 : nns_edge_handle_s *eh;
1915 : nns_edge_data_h new_data_h;
1916 :
1917 39 : eh = (nns_edge_handle_s *) edge_h;
1918 39 : if (!eh) {
1919 1 : nns_edge_loge ("Invalid param, given edge handle is null.");
1920 39 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1921 : }
1922 :
1923 38 : if (nns_edge_data_is_valid (data_h) != NNS_EDGE_ERROR_NONE) {
1924 1 : nns_edge_loge ("Invalid param, given edge data is invalid.");
1925 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1926 : }
1927 :
1928 37 : if (!nns_edge_handle_is_valid (eh)) {
1929 1 : nns_edge_loge ("Invalid param, given edge handle is invalid.");
1930 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1931 : }
1932 :
1933 36 : nns_edge_lock (eh);
1934 :
1935 36 : if (NNS_EDGE_ERROR_NONE != nns_edge_is_connected (eh)) {
1936 0 : nns_edge_loge ("There is no available connection.");
1937 0 : nns_edge_unlock (eh);
1938 0 : return NNS_EDGE_ERROR_IO;
1939 : }
1940 :
1941 36 : if (!eh->send_thread) {
1942 0 : nns_edge_loge ("Invalid state, start edge before sending a data.");
1943 0 : nns_edge_unlock (eh);
1944 0 : return NNS_EDGE_ERROR_IO;
1945 : }
1946 :
1947 : /* Create new data handle and push it into send-queue. */
1948 36 : ret = nns_edge_data_copy (data_h, &new_data_h);
1949 36 : if (NNS_EDGE_ERROR_NONE != ret) {
1950 0 : nns_edge_loge ("Failed to send data, cannot copy data.");
1951 0 : nns_edge_unlock (eh);
1952 0 : return ret;
1953 : }
1954 :
1955 36 : ret = nns_edge_queue_push (eh->send_queue, new_data_h,
1956 : sizeof (nns_edge_data_h), nns_edge_data_release_handle);
1957 36 : if (NNS_EDGE_ERROR_NONE != ret) {
1958 0 : nns_edge_loge ("Failed to send data, cannot push data into queue.");
1959 0 : nns_edge_data_destroy (new_data_h);
1960 : }
1961 :
1962 36 : nns_edge_unlock (eh);
1963 36 : return ret;
1964 : }
1965 :
1966 : /**
1967 : * @brief Set nnstreamer edge info.
1968 : */
1969 : int
1970 41 : nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value)
1971 : {
1972 : nns_edge_handle_s *eh;
1973 41 : int ret = NNS_EDGE_ERROR_NONE;
1974 :
1975 41 : eh = (nns_edge_handle_s *) edge_h;
1976 41 : if (!eh) {
1977 1 : nns_edge_loge ("Invalid param, given edge handle is null.");
1978 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1979 : }
1980 :
1981 40 : if (!STR_IS_VALID (key)) {
1982 2 : nns_edge_loge ("Invalid param, given key is invalid.");
1983 2 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1984 : }
1985 :
1986 38 : if (!STR_IS_VALID (value)) {
1987 2 : nns_edge_loge ("Invalid param, given value is invalid.");
1988 2 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1989 : }
1990 :
1991 36 : if (!nns_edge_handle_is_valid (eh)) {
1992 1 : nns_edge_loge ("Invalid param, given edge handle is invalid.");
1993 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
1994 : }
1995 :
1996 35 : nns_edge_lock (eh);
1997 :
1998 35 : if (0 == strcasecmp (key, "CAPS") || 0 == strcasecmp (key, "CAPABILITY")) {
1999 6 : SAFE_FREE (eh->caps_str);
2000 6 : eh->caps_str = nns_edge_strdup (value);
2001 29 : } else if (0 == strcasecmp (key, "IP") || 0 == strcasecmp (key, "HOST")) {
2002 3 : SAFE_FREE (eh->host);
2003 3 : eh->host = nns_edge_strdup (value);
2004 26 : } else if (0 == strcasecmp (key, "PORT")) {
2005 5 : int port = nns_edge_parse_port_number (value);
2006 :
2007 5 : if (port < 0) {
2008 2 : ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
2009 : } else {
2010 3 : eh->port = port;
2011 : }
2012 21 : } else if (0 == strcasecmp (key, "DEST_IP")
2013 19 : || 0 == strcasecmp (key, "DEST_HOST")) {
2014 3 : SAFE_FREE (eh->dest_host);
2015 3 : eh->dest_host = nns_edge_strdup (value);
2016 18 : } else if (0 == strcasecmp (key, "DEST_PORT")) {
2017 3 : int port = nns_edge_parse_port_number (value);
2018 :
2019 3 : if (port < 0) {
2020 0 : ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
2021 : } else {
2022 3 : eh->dest_port = port;
2023 : }
2024 15 : } else if (0 == strcasecmp (key, "TOPIC")) {
2025 6 : SAFE_FREE (eh->topic);
2026 6 : eh->topic = nns_edge_strdup (value);
2027 9 : } else if (0 == strcasecmp (key, "ID") || 0 == strcasecmp (key, "CLIENT_ID")) {
2028 : /* Not allowed key */
2029 2 : nns_edge_loge ("Cannot update %s.", key);
2030 2 : ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
2031 7 : } else if (0 == strcasecmp (key, "QUEUE_SIZE")) {
2032 : char *s;
2033 : unsigned int limit;
2034 3 : nns_edge_queue_leak_e leaky = NNS_EDGE_QUEUE_LEAK_NEW;
2035 :
2036 3 : s = strstr (value, ":");
2037 3 : if (s) {
2038 3 : char *v = nns_edge_strndup (value, s - value);
2039 :
2040 3 : limit = (unsigned int) strtoull (v, NULL, 10);
2041 3 : SAFE_FREE (v);
2042 :
2043 3 : if (strcasecmp (s + 1, "NEW") == 0) {
2044 1 : leaky = NNS_EDGE_QUEUE_LEAK_NEW;
2045 2 : } else if (strcasecmp (s + 1, "OLD") == 0) {
2046 1 : leaky = NNS_EDGE_QUEUE_LEAK_OLD;
2047 : } else {
2048 1 : nns_edge_loge ("Cannot set queue leaky option (%s).", s + 1);
2049 1 : ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
2050 : }
2051 : } else {
2052 0 : limit = (unsigned int) strtoull (value, NULL, 10);
2053 : }
2054 :
2055 3 : if (ret == NNS_EDGE_ERROR_NONE)
2056 2 : nns_edge_queue_set_limit (eh->send_queue, limit, leaky);
2057 : } else {
2058 4 : ret = nns_edge_metadata_set (eh->metadata, key, value);
2059 : }
2060 :
2061 35 : if (ret == NNS_EDGE_ERROR_NONE &&
2062 30 : NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
2063 : /* Pass value to custom library and ignore error. */
2064 1 : if (nns_edge_custom_set_info (eh->custom_connection_h, key, value) !=
2065 : NNS_EDGE_ERROR_NONE) {
2066 0 : nns_edge_logw ("Failed to set info '%s' in custom connection.", key);
2067 : }
2068 : }
2069 :
2070 35 : nns_edge_unlock (eh);
2071 35 : return ret;
2072 : }
2073 :
2074 : /**
2075 : * @brief Get nnstreamer edge info.
2076 : */
2077 : int
2078 20 : nns_edge_get_info (nns_edge_h edge_h, const char *key, char **value)
2079 : {
2080 : nns_edge_handle_s *eh;
2081 20 : int ret = NNS_EDGE_ERROR_NONE;
2082 :
2083 20 : eh = (nns_edge_handle_s *) edge_h;
2084 20 : if (!eh) {
2085 1 : nns_edge_loge ("Invalid param, given edge handle is null.");
2086 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
2087 : }
2088 :
2089 19 : if (!STR_IS_VALID (key)) {
2090 2 : nns_edge_loge ("Invalid param, given key is invalid.");
2091 2 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
2092 : }
2093 :
2094 17 : if (!value) {
2095 2 : nns_edge_loge ("Invalid param, value should not be null.");
2096 2 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
2097 : }
2098 :
2099 : /* Init null */
2100 15 : *value = NULL;
2101 :
2102 15 : if (!nns_edge_handle_is_valid (eh)) {
2103 1 : nns_edge_loge ("Invalid param, given edge handle is invalid.");
2104 1 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
2105 : }
2106 :
2107 14 : nns_edge_lock (eh);
2108 :
2109 14 : if (0 == strcasecmp (key, "CAPS") || 0 == strcasecmp (key, "CAPABILITY")) {
2110 1 : *value = nns_edge_strdup (eh->caps_str);
2111 13 : } else if (0 == strcasecmp (key, "IP") || 0 == strcasecmp (key, "HOST")) {
2112 1 : *value = nns_edge_strdup (eh->host);
2113 12 : } else if (0 == strcasecmp (key, "PORT")) {
2114 1 : *value = nns_edge_strdup_printf ("%d", eh->port);
2115 11 : } else if (0 == strcasecmp (key, "TOPIC")) {
2116 1 : *value = nns_edge_strdup (eh->topic);
2117 10 : } else if (0 == strcasecmp (key, "ID")) {
2118 1 : *value = nns_edge_strdup (eh->id);
2119 9 : } else if (0 == strcasecmp (key, "DEST_IP")
2120 8 : || 0 == strcasecmp (key, "DEST_HOST")) {
2121 1 : *value = nns_edge_strdup (eh->dest_host);
2122 8 : } else if (0 == strcasecmp (key, "DEST_PORT")) {
2123 1 : *value = nns_edge_strdup_printf ("%d", eh->dest_port);
2124 7 : } else if (0 == strcasecmp (key, "CLIENT_ID")) {
2125 3 : if ((NNS_EDGE_NODE_TYPE_QUERY_SERVER == eh->node_type)
2126 3 : || (NNS_EDGE_NODE_TYPE_PUB == eh->node_type)) {
2127 0 : nns_edge_loge ("Cannot get the client ID, it was started as a server.");
2128 0 : ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
2129 : } else {
2130 3 : *value = nns_edge_strdup_printf ("%lld", (long long) eh->client_id);
2131 : }
2132 : } else {
2133 4 : ret = nns_edge_metadata_get (eh->metadata, key, value);
2134 : }
2135 :
2136 14 : if (ret == NNS_EDGE_ERROR_NONE &&
2137 14 : NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
2138 1 : char *val = NULL;
2139 :
2140 1 : if (nns_edge_custom_get_info (eh->custom_connection_h, key, &val) ==
2141 : NNS_EDGE_ERROR_NONE) {
2142 : /* Replace value from custom library. */
2143 1 : SAFE_FREE (*value);
2144 1 : *value = val;
2145 : }
2146 : }
2147 :
2148 14 : nns_edge_unlock (eh);
2149 :
2150 14 : if (ret != NNS_EDGE_ERROR_NONE)
2151 0 : SAFE_FREE (*value);
2152 :
2153 14 : return ret;
2154 : }
2155 :
2156 : /**
2157 : * @brief Start discovery connectable devices within the network.
2158 : */
2159 1 : int nns_edge_start_discovery (nns_edge_h edge_h)
2160 : {
2161 : nns_edge_handle_s *eh;
2162 1 : int ret = NNS_EDGE_ERROR_NONE;
2163 :
2164 1 : eh = (nns_edge_handle_s *) edge_h;
2165 1 : if (!eh) {
2166 0 : nns_edge_loge ("Invalid param, given edge handle is null.");
2167 0 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
2168 : }
2169 :
2170 1 : if (!nns_edge_handle_is_valid (eh)) {
2171 0 : nns_edge_loge ("Invalid param, given edge handle is invalid.");
2172 0 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
2173 : }
2174 :
2175 1 : nns_edge_lock (eh);
2176 1 : if (!eh->event_cb) {
2177 0 : nns_edge_loge ("NNStreamer-edge event callback is not registered.");
2178 0 : nns_edge_unlock (eh);
2179 0 : return NNS_EDGE_ERROR_CONNECTION_FAILURE;
2180 : }
2181 :
2182 1 : if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
2183 1 : ret = nns_edge_custom_start_discovery (eh->custom_connection_h);
2184 : }
2185 :
2186 1 : nns_edge_unlock (eh);
2187 :
2188 1 : return ret;
2189 : }
2190 :
2191 : /**
2192 : * @brief Stop discovery connectable devices within the network.
2193 : */
2194 1 : int nns_edge_stop_discovery (nns_edge_h edge_h)
2195 : {
2196 : nns_edge_handle_s *eh;
2197 1 : int ret = NNS_EDGE_ERROR_NONE;
2198 :
2199 1 : eh = (nns_edge_handle_s *) edge_h;
2200 1 : if (!eh) {
2201 0 : nns_edge_loge ("Invalid param, given edge handle is null.");
2202 0 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
2203 : }
2204 :
2205 1 : if (!nns_edge_handle_is_valid (eh)) {
2206 0 : nns_edge_loge ("Invalid param, given edge handle is invalid.");
2207 0 : return NNS_EDGE_ERROR_INVALID_PARAMETER;
2208 : }
2209 :
2210 1 : nns_edge_lock (eh);
2211 :
2212 1 : if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
2213 1 : ret = nns_edge_custom_stop_discovery (eh->custom_connection_h);
2214 : }
2215 :
2216 1 : nns_edge_unlock (eh);
2217 :
2218 1 : return ret;
2219 : }
2220 :
|