Line data Source code
1 : /* SPDX-License-Identifier: Apache-2.0 */
2 : /**
3 : * Copyright (c) 2024 Samsung Electronics Co., Ltd. All Rights Reserved.
4 : *
5 : * @file ml-api-service-training-offloading.c
6 : * @date 5 Apr 2024
7 : * @brief ML training offloading service of NNStreamer/Service C-API
8 : * @see https://github.com/nnstreamer/api
9 : * @author Hyunil Park <hyunil46.park@samsung.com>
10 : * @bug No known bugs except for NYI items
11 : */
12 :
13 : #include <glib.h>
14 : #include <json-glib/json-glib.h>
15 : #include <nnstreamer-edge.h>
16 :
17 : #include "ml-api-internal.h"
18 : #include "ml-api-service.h"
19 : #include "ml-api-service-training-offloading.h"
20 :
/**
 * Placeholders substituted at runtime in pipeline descriptions and in
 * "transfer-data" entries of the configuration:
 * - APP_RW_PATH: replaced with the readable/writable path set by the local app.
 * - REMOTE_APP_RW_PATH: left untouched by the sender; the receiver replaces it
 *   with its own app path.
 * - TRAINED_MODEL_FILE: replaced in the receiver pipeline with the trained
 *   model path taken from the configuration.
 */
#define APP_RW_PATH "@APP_RW_PATH@"
#define REMOTE_APP_RW_PATH "@REMOTE_APP_RW_PATH@"
#define TRAINED_MODEL_FILE "@TRAINED_MODEL_FILE@"

/** Default time limit, in seconds, for receiving the data needed for training. */
enum { DEFAULT_TIME_LIMIT = 10 };
29 :
/**
 * @brief Internal enumeration for ml-service training offloading types.
 *
 * Selected from the "node-type" member of the configuration.
 */
typedef enum
{
  ML_TRAINING_OFFLOADING_TYPE_UNKNOWN = 0, /**< Not configured yet. */
  ML_TRAINING_OFFLOADING_TYPE_SENDER = 1, /**< Sends training data, receives the trained model. */
  ML_TRAINING_OFFLOADING_TYPE_RECEIVER = 2, /**< Receives training data, replies with the trained model. */

  ML_TRAINING_OFFLOADING_TYPE_MAX = 3 /**< Number of types; not a valid value. */
} ml_training_offloaing_type_e;
41 :
/**
 * @brief Internal structure for ml-service training offloading handle.
 *
 * Allocated by _training_offloading_create(), attached to the ml-service
 * handle as its offloading private data, and released by
 * _ml_service_training_offloading_destroy().
 */
typedef struct
{
  ml_training_offloaing_type_e type; /**< Sender or receiver, from "node-type" in the conf. */
  ml_pipeline_h pipeline_h; /**< Pipeline built from sender_pipe (sender) or receiver_pipe (receiver). */

  gchar *receiver_pipe_json_str; /**< Pipeline JSON string received from the remote sender (receiver only). @TRAINED_MODEL_FILE@ and @REMOTE_APP_RW_PATH@ in it are replaced with the local trained model path and app path before use. */
  gchar *receiver_pipe; /**< "description" of the received pipeline JSON (receiver only). */
  gchar *sender_pipe; /**< "sender-pipeline" from the conf; @APP_RW_PATH@ is replaced before use. */
  gchar *trained_model_path; /**< Receiver: model file replied to the remote sender. Sender: path of the model received in the reply. */
  gchar *path; /**< Readable and writable path set by the app. */

  gboolean is_received; /**< Verdict of the checker thread: TRUE once the receiver pipeline JSON has arrived. */
  gint time_limit; /**< Seconds to wait for receiving the data necessary for training. */
  GMutex received_lock; /**< Taken around reads/writes of is_received. */
  GCond received_cond; /**< Signalled by the checker thread once is_received is decided. */
  GThread *received_thread; /**< Thread polling for the receiver pipeline JSON string. */

  GHashTable *transfer_data_table; /**< "transfer-data" entries from the conf: name -> data string (both owned). */
  GHashTable *node_table; /**< Pipeline node name -> ml_service_node_info_s (both owned). */
} ml_training_services_s;
65 :
66 : /**
67 : * @brief Internal function to check offloading mode and get private data for training.
68 : */
69 : static int
70 42 : _training_offloading_get_priv (ml_service_s * mls,
71 : ml_training_services_s ** training_s)
72 : {
73 42 : ml_service_offloading_mode_e mode = ML_SERVICE_OFFLOADING_MODE_NONE;
74 : int ret;
75 :
76 42 : ret = _ml_service_offloading_get_mode (mls, &mode, (void **) training_s);
77 42 : if (ret != ML_ERROR_NONE) {
78 42 : _ml_error_report_return (ret,
79 : "Failed to get offloading mode and private data.");
80 : }
81 :
82 40 : if (mode != ML_SERVICE_OFFLOADING_MODE_TRAINING || *training_s == NULL) {
83 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
84 : "The ml service is not training mode.");
85 : }
86 :
87 40 : return ML_ERROR_NONE;
88 : }
89 :
90 : /**
91 : * @brief Internal function to release pipeline node info.
92 : */
93 : static void
94 2 : _training_offloading_node_info_free (gpointer data)
95 : {
96 2 : ml_service_node_info_s *node_info = (ml_service_node_info_s *) data;
97 :
98 2 : if (!node_info)
99 0 : return;
100 :
101 2 : g_free (node_info->name);
102 2 : g_free (node_info);
103 : }
104 :
105 : /**
106 : * @brief Internal function to create node info in pipeline.
107 : */
108 : static ml_service_node_info_s *
109 2 : _training_offloading_node_info_new (ml_service_s * mls,
110 : const gchar * name, ml_service_node_type_e type)
111 : {
112 : ml_service_node_info_s *node_info;
113 2 : ml_training_services_s *training_s = NULL;
114 : int ret;
115 :
116 4 : g_return_val_if_fail (name != NULL, NULL);
117 :
118 2 : ret = _training_offloading_get_priv (mls, &training_s);
119 2 : g_return_val_if_fail (ret == ML_ERROR_NONE, NULL);
120 :
121 2 : if (g_hash_table_lookup (training_s->node_table, name)) {
122 0 : _ml_error_report_return (NULL,
123 : "Cannot add duplicated node '%s' in ml-service pipeline.", name);
124 : }
125 :
126 2 : node_info = g_try_new0 (ml_service_node_info_s, 1);
127 2 : if (!node_info) {
128 0 : _ml_error_report_return (NULL,
129 : "Failed to allocate new memory for node info in ml-service pipeline. Out of memory?");
130 : }
131 :
132 2 : node_info->name = g_strdup (name);
133 2 : node_info->type = type;
134 2 : node_info->mls = mls;
135 :
136 2 : g_hash_table_insert (training_s->node_table, g_strdup (name), node_info);
137 :
138 2 : return node_info;
139 : }
140 :
141 : /**
142 : * @brief Internal function to parse configuration file.
143 : */
144 : static int
145 7 : _training_offloading_conf_parse_json (ml_service_s * mls, JsonObject * object)
146 : {
147 7 : ml_training_services_s *training_s = NULL;
148 : JsonObject *training_obj, *data_obj;
149 : JsonNode *training_node, *data_node, *pipeline_node;
150 : const gchar *key, *val;
151 7 : gchar *transfer_data = NULL;
152 : GList *list, *iter;
153 : int ret;
154 :
155 14 : g_return_val_if_fail (object != NULL, ML_ERROR_INVALID_PARAMETER);
156 :
157 7 : ret = _training_offloading_get_priv (mls, &training_s);
158 7 : g_return_val_if_fail (ret == ML_ERROR_NONE, ret);
159 :
160 7 : val = _ml_service_get_json_string_member (object, "node-type");
161 :
162 7 : if (g_ascii_strcasecmp (val, "sender") == 0) {
163 1 : training_s->type = ML_TRAINING_OFFLOADING_TYPE_SENDER;
164 6 : } else if (g_ascii_strcasecmp (val, "receiver") == 0) {
165 6 : training_s->type = ML_TRAINING_OFFLOADING_TYPE_RECEIVER;
166 : } else {
167 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
168 : "The given param, \"node-type\" is invalid.");
169 : }
170 :
171 7 : training_node = json_object_get_member (object, "training");
172 7 : training_obj = json_node_get_object (training_node);
173 :
174 7 : if (json_object_has_member (training_obj, "time-limit")) {
175 6 : training_s->time_limit =
176 6 : (gint) json_object_get_int_member (training_obj, "time-limit");
177 : } else {
178 1 : _ml_logw
179 : ("The default time-limit(10 sec) is set because `time-limit` is not set.");
180 : }
181 :
182 7 : val = _ml_service_get_json_string_member (training_obj, "sender-pipeline");
183 7 : training_s->sender_pipe = g_strdup (val);
184 :
185 7 : if (!json_object_has_member (training_obj, "transfer-data")) {
186 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
187 : "The given param, \"transfer-data\" is invalid.");
188 : }
189 :
190 7 : data_node = json_object_get_member (training_obj, "transfer-data");
191 7 : data_obj = json_node_get_object (data_node);
192 7 : list = json_object_get_members (data_obj);
193 7 : if (!list) {
194 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
195 : "Failed to get transfer data table");
196 : }
197 :
198 16 : for (iter = list; iter != NULL; iter = g_list_next (iter)) {
199 9 : key = iter->data;
200 :
201 9 : if (!STR_IS_VALID (key)) {
202 0 : _ml_error_report
203 : ("The parameter, 'key' is invalid. It should be a valid string.");
204 0 : ret = ML_ERROR_INVALID_PARAMETER;
205 0 : goto error;
206 : }
207 :
208 9 : val = _ml_service_get_json_string_member (data_obj, key);
209 :
210 9 : if (STR_IS_VALID (val)) {
211 8 : transfer_data = g_strdup (val);
212 : } else {
213 : /* pipeline is a JSON string */
214 1 : pipeline_node = json_object_get_member (data_obj, key);
215 1 : transfer_data = json_to_string (pipeline_node, TRUE);
216 :
217 1 : if (!g_strstr_len (transfer_data, -1, "pipeline")) {
218 0 : g_free (transfer_data);
219 :
220 0 : _ml_error_report
221 : ("The parameter, 'val' is invalid. It should be a valid string.");
222 0 : ret = ML_ERROR_INVALID_PARAMETER;
223 0 : goto error;
224 : }
225 : }
226 :
227 9 : g_hash_table_insert (training_s->transfer_data_table, g_strdup (key),
228 : transfer_data);
229 : }
230 :
231 7 : error:
232 7 : g_list_free (list);
233 :
234 7 : if (ret == ML_ERROR_NONE) {
235 : /* Since we are only sending the trained model now, there is only 1 item in the list. */
236 7 : if (training_s->type == ML_TRAINING_OFFLOADING_TYPE_RECEIVER)
237 12 : training_s->trained_model_path = g_strdup (transfer_data);
238 : }
239 :
240 7 : return ret;
241 : }
242 :
243 : /**
244 : * @brief Internal function to parse the node info in pipeline.
245 : */
246 : static int
247 2 : _training_offloading_conf_parse_pipeline_node (ml_service_s * mls,
248 : JsonNode * node, ml_service_node_type_e type)
249 : {
250 2 : int ret = ML_ERROR_NONE;
251 2 : guint i, array_len = 1;
252 : const gchar *name;
253 2 : JsonArray *array = NULL;
254 : JsonObject *node_object;
255 2 : ml_service_node_info_s *node_info = NULL;
256 2 : ml_training_services_s *training_s = NULL;
257 :
258 4 : g_return_val_if_fail (node != NULL, ML_ERROR_INVALID_PARAMETER);
259 :
260 2 : ret = _training_offloading_get_priv (mls, &training_s);
261 2 : g_return_val_if_fail (ret == ML_ERROR_NONE, ret);
262 :
263 2 : if (JSON_NODE_HOLDS_ARRAY (node)) {
264 2 : array = json_node_get_array (node);
265 2 : array_len = json_array_get_length (array);
266 : }
267 :
268 4 : for (i = 0; i < array_len; i++) {
269 2 : if (array)
270 2 : node_object = json_array_get_object_element (array, i);
271 : else
272 0 : node_object = json_node_get_object (node);
273 :
274 2 : if (!json_object_has_member (node_object, "name")) {
275 0 : _ml_error_report_return (ret,
276 : "Failed to parse configuration file, cannot get the name for pipeline node.");
277 : }
278 :
279 2 : name = json_object_get_string_member (node_object, "name");
280 :
281 2 : node_info = _training_offloading_node_info_new (mls, name, type);
282 2 : if (!node_info) {
283 0 : _ml_error_report_return_continue (ML_ERROR_INVALID_PARAMETER,
284 : "Failed to parse configuration file, cannot add new node information.");
285 : }
286 :
287 2 : switch (type) {
288 1 : case ML_SERVICE_NODE_TYPE_TRAINING:
289 2 : ret = ml_pipeline_element_get_handle (training_s->pipeline_h, name,
290 1 : &node_info->handle);
291 1 : break;
292 1 : case ML_SERVICE_NODE_TYPE_OUTPUT:
293 2 : ret = ml_pipeline_sink_register (training_s->pipeline_h, name,
294 1 : _ml_service_pipeline_sink_cb, node_info, &node_info->handle);
295 1 : break;
296 0 : default:
297 0 : ret = ML_ERROR_INVALID_PARAMETER;
298 0 : break;
299 : }
300 :
301 2 : if (ret != ML_ERROR_NONE) {
302 0 : _ml_error_report_return (ret,
303 : "Failed to parse configuration file, cannot get the handle for pipeline node.");
304 : }
305 : }
306 :
307 2 : return ret;
308 : }
309 :
310 : /**
311 : * @brief Internal function to parse the pipeline in the configuration file.
312 : */
313 : static int
314 1 : _training_offloading_conf_parse_pipeline (ml_service_s * mls, JsonObject * pipe)
315 : {
316 1 : int ret = ML_ERROR_NONE;
317 : JsonNode *node;
318 :
319 1 : g_return_val_if_fail (mls != NULL, ML_ERROR_INVALID_PARAMETER);
320 1 : g_return_val_if_fail (pipe != NULL, ML_ERROR_INVALID_PARAMETER);
321 :
322 1 : if (json_object_has_member (pipe, "output_node")) {
323 1 : node = json_object_get_member (pipe, "output_node");
324 1 : ret = _training_offloading_conf_parse_pipeline_node (mls, node,
325 : ML_SERVICE_NODE_TYPE_OUTPUT);
326 1 : if (ret != ML_ERROR_NONE) {
327 0 : _ml_error_report_return (ret,
328 : "Failed to parse configuration file, cannot get the input node.");
329 : }
330 : }
331 :
332 1 : if (json_object_has_member (pipe, "training_node")) {
333 1 : node = json_object_get_member (pipe, "training_node");
334 1 : ret = _training_offloading_conf_parse_pipeline_node (mls, node,
335 : ML_SERVICE_NODE_TYPE_TRAINING);
336 1 : if (ret != ML_ERROR_NONE) {
337 0 : _ml_error_report_return (ret,
338 : "Failed to parse configuration file, cannot get the training node.");
339 : }
340 : }
341 :
342 1 : return ret;
343 : }
344 :
345 : /**
346 : * @brief Internal function to create ml-service training offloading handle.
347 : */
348 : static int
349 7 : _training_offloading_create (ml_service_s * mls)
350 : {
351 7 : ml_training_services_s *training_s = NULL;
352 :
353 7 : g_return_val_if_fail (mls != NULL, ML_ERROR_INVALID_PARAMETER);
354 :
355 7 : training_s = g_try_new0 (ml_training_services_s, 1);
356 7 : if (training_s == NULL) {
357 0 : _ml_error_report_return (ML_ERROR_OUT_OF_MEMORY,
358 : "Failed to allocate memory for the service handle's private data. Out of memory?");
359 : }
360 :
361 7 : g_cond_init (&training_s->received_cond);
362 7 : g_mutex_init (&training_s->received_lock);
363 :
364 7 : training_s->type = ML_TRAINING_OFFLOADING_TYPE_UNKNOWN;
365 7 : training_s->time_limit = DEFAULT_TIME_LIMIT;
366 :
367 7 : _ml_service_offloading_set_mode (mls,
368 : ML_SERVICE_OFFLOADING_MODE_TRAINING, training_s);
369 :
370 7 : training_s->transfer_data_table =
371 7 : g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free);
372 7 : if (!training_s->transfer_data_table) {
373 0 : _ml_service_training_offloading_destroy (mls);
374 0 : _ml_error_report_return (ML_ERROR_OUT_OF_MEMORY,
375 : "Failed to allocate memory for the data table. Out of memory?");
376 : }
377 :
378 7 : training_s->node_table =
379 7 : g_hash_table_new_full (g_str_hash, g_str_equal, g_free,
380 : _training_offloading_node_info_free);
381 7 : if (!training_s->node_table) {
382 0 : _ml_service_training_offloading_destroy (mls);
383 0 : _ml_error_report_return (ML_ERROR_OUT_OF_MEMORY,
384 : "Failed to allocate memory for the node table. Out of memory?");
385 : }
386 :
387 7 : return ML_ERROR_NONE;
388 : }
389 :
390 : /**
391 : * @brief Internal function to create ml-service training offloading handle.
392 : */
393 : int
394 9 : _ml_service_training_offloading_create (ml_service_s * mls,
395 : JsonObject * offloading)
396 : {
397 9 : int ret = ML_ERROR_NONE;
398 :
399 9 : g_return_val_if_fail (mls != NULL, ML_ERROR_INVALID_PARAMETER);
400 8 : g_return_val_if_fail (offloading != NULL, ML_ERROR_INVALID_PARAMETER);
401 :
402 7 : ret = _training_offloading_create (mls);
403 7 : if (ret != ML_ERROR_NONE) {
404 0 : _ml_error_report_return_continue (ret,
405 : "Failed to create ml-service for training offloading.");
406 : }
407 :
408 7 : ret = _training_offloading_conf_parse_json (mls, offloading);
409 7 : if (ret != ML_ERROR_NONE) {
410 0 : _ml_service_training_offloading_destroy (mls);
411 0 : _ml_error_report_return (ret,
412 : "Failed to parse the configuration file for training offloading.");
413 : }
414 :
415 7 : return ML_ERROR_NONE;
416 : }
417 :
418 : /**
419 : * @brief Request service to ml-service-offloading.
420 : */
421 : static int
422 4 : _request_offloading_service (ml_service_s * mls,
423 : const gchar * service_name, void *data, size_t len)
424 : {
425 4 : int ret = ML_ERROR_NONE;
426 :
427 4 : g_return_val_if_fail (mls != NULL, ML_ERROR_INVALID_PARAMETER);
428 4 : g_return_val_if_fail (service_name != NULL, ML_ERROR_INVALID_PARAMETER);
429 4 : g_return_val_if_fail (data != NULL, ML_ERROR_INVALID_PARAMETER);
430 4 : g_return_val_if_fail (len > 0, ML_ERROR_INVALID_PARAMETER);
431 :
432 4 : ret = _ml_service_offloading_request_raw (mls, service_name, data, len);
433 4 : if (ret != ML_ERROR_NONE) {
434 0 : _ml_error_report ("Failed to request service '%s'.)", service_name);
435 : }
436 :
437 4 : return ret;
438 : }
439 :
440 : /**
441 : * @brief Request all services to ml-service offloading.
442 : */
443 : static int
444 1 : _training_offloading_services_request (ml_service_s * mls)
445 : {
446 1 : ml_training_services_s *training_s = NULL;
447 1 : int ret = ML_ERROR_NONE;
448 : GList *list, *iter;
449 1 : gchar *transfer_data = NULL, *service_name = NULL;
450 1 : gchar *contents = NULL, *pipeline = NULL;
451 1 : gsize len = 0;
452 :
453 1 : ret = _training_offloading_get_priv (mls, &training_s);
454 2 : g_return_val_if_fail (ret == ML_ERROR_NONE, ret);
455 :
456 1 : _ml_logd ("path set by app:%s ", training_s->path);
457 :
458 1 : list = g_hash_table_get_keys (training_s->transfer_data_table);
459 1 : if (!list) {
460 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
461 : "Failed to get transfer data table");
462 : }
463 :
464 4 : for (iter = list; iter != NULL; iter = g_list_next (iter)) {
465 3 : const gchar *name = iter->data;
466 :
467 : transfer_data =
468 3 : g_strdup (g_hash_table_lookup (training_s->transfer_data_table, name));
469 :
470 3 : if (g_strstr_len (transfer_data, -1, APP_RW_PATH)) {
471 4 : transfer_data = _ml_replace_string (transfer_data, APP_RW_PATH,
472 2 : training_s->path, NULL, NULL);
473 :
474 2 : _ml_logd ("transfer_data:%s", transfer_data);
475 :
476 2 : if (!g_file_get_contents (transfer_data, &contents, &len, NULL)) {
477 0 : _ml_error_report ("Failed to read file:%s", transfer_data);
478 0 : goto error;
479 : }
480 :
481 2 : ret = _request_offloading_service (mls, name, contents, len);
482 2 : if (ret != ML_ERROR_NONE) {
483 0 : _ml_error_report ("Failed to request service '%s'.", name);
484 0 : goto error;
485 : }
486 :
487 2 : g_free (transfer_data);
488 2 : g_free (contents);
489 2 : transfer_data = NULL;
490 2 : contents = NULL;
491 1 : } else if (g_strstr_len (transfer_data, -1, "pipeline")) {
492 2 : service_name = g_strdup (iter->data);
493 1 : pipeline = g_strdup (transfer_data);
494 1 : transfer_data = NULL;
495 : }
496 : }
497 :
498 1 : if (pipeline) {
499 : /**
500 : * The remote sender sends the last in the pipeline.
501 : * When the pipeline arrives, the remote receiver determines that the sender has sent all the necessary files specified in the pipeline.
502 : * pipeline description must be sent last.
503 : */
504 1 : _ml_logd
505 : ("In case of pipeline, @REMOTE_APP_RW_PATH@ will be replaced at the remote receiver.\n transfer_data:pipeline(%s),",
506 : pipeline);
507 2 : ret = _request_offloading_service (mls, service_name, pipeline,
508 1 : strlen (pipeline) + 1);
509 1 : if (ret != ML_ERROR_NONE) {
510 0 : _ml_error_report ("Failed to request service(%s)", service_name);
511 : }
512 : }
513 :
514 1 : error:
515 1 : g_free (service_name);
516 1 : g_free (transfer_data);
517 1 : g_free (contents);
518 1 : g_list_free (list);
519 :
520 1 : return ret;
521 : }
522 :
523 : /**
524 : * @brief Thread for checking receive data.
525 : */
526 : static gpointer
527 2 : _check_received_data_thread (gpointer data)
528 : {
529 2 : ml_training_services_s *training_s = (ml_training_services_s *) data;
530 : gint usec;
531 :
532 2 : g_return_val_if_fail (training_s != NULL, NULL);
533 :
534 2 : usec = training_s->time_limit * 1000000;
535 102 : while (usec > 0) {
536 101 : g_usleep (100000);
537 101 : if (training_s->receiver_pipe_json_str != NULL) {
538 1 : _ml_logd
539 : ("Lock to receive pipeline JSON string required for model training.");
540 1 : g_mutex_lock (&training_s->received_lock);
541 1 : training_s->is_received = TRUE;
542 1 : _ml_logd ("receive_pipe:%s", training_s->receiver_pipe_json_str);
543 1 : _ml_logd
544 : ("Now pipeline has arrived, The remote sender send the pipeline last, so probably received all the data."
545 : "If there are no files required for the pipeline, a runtime error occurs.");
546 1 : g_cond_signal (&training_s->received_cond);
547 1 : g_mutex_unlock (&training_s->received_lock);
548 1 : return NULL;
549 : }
550 100 : usec -= 100000;
551 : }
552 :
553 1 : _ml_loge ("Required data is null, receive_pipe:%s",
554 : training_s->receiver_pipe_json_str);
555 1 : g_mutex_lock (&training_s->received_lock);
556 1 : training_s->is_received = FALSE;
557 1 : g_cond_signal (&training_s->received_cond);
558 1 : g_mutex_unlock (&training_s->received_lock);
559 1 : return NULL;
560 : }
561 :
562 : /**
563 : * @brief Check if all necessary data is received.
564 : */
565 : static gboolean
566 2 : _training_offloading_check_received_data (ml_training_services_s * training_s)
567 : {
568 2 : gboolean is_received = FALSE;
569 :
570 2 : g_return_val_if_fail (training_s != NULL, FALSE);
571 :
572 2 : training_s->received_thread = g_thread_new ("check_received_file",
573 : _check_received_data_thread, training_s);
574 :
575 2 : g_mutex_lock (&training_s->received_lock);
576 :
577 3 : while (!training_s->is_received) {
578 2 : _ml_logd ("Wait to receive all data needed for model training.");
579 2 : g_cond_wait (&training_s->received_cond, &training_s->received_lock);
580 2 : if (training_s->is_received == FALSE)
581 1 : break;
582 : }
583 :
584 2 : is_received = training_s->is_received;
585 2 : g_mutex_unlock (&training_s->received_lock);
586 2 : _ml_logd ("unlock, receive all data");
587 :
588 2 : return is_received;
589 : }
590 :
591 : /**
592 : * @brief replace path.
593 : */
594 : static void
595 2 : _training_offloading_replace_pipeline_data_path (ml_service_s * mls)
596 : {
597 2 : ml_training_services_s *training_s = NULL;
598 : int ret;
599 :
600 2 : ret = _training_offloading_get_priv (mls, &training_s);
601 2 : g_return_if_fail (ret == ML_ERROR_NONE);
602 :
603 2 : if (training_s->type == ML_TRAINING_OFFLOADING_TYPE_SENDER) {
604 1 : if (training_s->sender_pipe) {
605 2 : training_s->sender_pipe =
606 2 : _ml_replace_string (training_s->sender_pipe, APP_RW_PATH,
607 1 : training_s->path, NULL, NULL);
608 1 : _ml_logd ("@APP_RW_PATH@ is replaced, sender_pipe:%s",
609 : training_s->sender_pipe);
610 : }
611 : } else {
612 1 : if (training_s->receiver_pipe_json_str) {
613 2 : training_s->trained_model_path =
614 2 : _ml_replace_string (training_s->trained_model_path, APP_RW_PATH,
615 1 : training_s->path, NULL, NULL);
616 2 : training_s->receiver_pipe_json_str =
617 2 : _ml_replace_string (training_s->receiver_pipe_json_str,
618 1 : REMOTE_APP_RW_PATH, training_s->path, NULL, NULL);
619 2 : training_s->receiver_pipe_json_str =
620 2 : _ml_replace_string (training_s->receiver_pipe_json_str,
621 1 : TRAINED_MODEL_FILE, training_s->trained_model_path, NULL, NULL);
622 1 : _ml_logd
623 : ("@REMOTE_APP_RW_PATH@ and @TRAINED_MODEL_FILE@ are replaced, receiver_pipe JSON string: %s",
624 : training_s->receiver_pipe_json_str);
625 : }
626 : }
627 : }
628 :
629 : /**
630 : * @brief Set path in ml-service training offloading handle.
631 : */
632 : int
633 6 : _ml_service_training_offloading_set_path (ml_service_s * mls,
634 : const gchar * path)
635 : {
636 6 : int ret = ML_ERROR_NONE;
637 6 : ml_training_services_s *training_s = NULL;
638 :
639 11 : g_return_val_if_fail (path != NULL, ML_ERROR_INVALID_PARAMETER);
640 :
641 5 : ret = _training_offloading_get_priv (mls, &training_s);
642 5 : g_return_val_if_fail (ret == ML_ERROR_NONE, ret);
643 :
644 5 : g_free (training_s->path);
645 5 : training_s->path = g_strdup (path);
646 :
647 5 : return ret;
648 : }
649 :
650 : /**
651 : * @brief Prepare ml training offloading service as sender.
652 : */
653 : static int
654 1 : _ml_service_training_offloading_prepare_sender (ml_service_s * mls,
655 : ml_training_services_s * training_s)
656 : {
657 1 : int ret = ML_ERROR_NONE;
658 :
659 1 : ret = _training_offloading_services_request (mls);
660 1 : if (ret != ML_ERROR_NONE) {
661 0 : _ml_error_report_return (ret, "Failed to request service.");
662 : }
663 1 : _training_offloading_replace_pipeline_data_path (mls);
664 :
665 1 : ret = ml_pipeline_construct (training_s->sender_pipe, NULL, NULL,
666 : &training_s->pipeline_h);
667 1 : if (ML_ERROR_NONE != ret) {
668 0 : _ml_error_report_return (ret, "Failed to construct pipeline.");
669 : }
670 :
671 1 : return ret;
672 : }
673 :
674 : /**
675 : * @brief Prepare ml training offloading service as receiver.
676 : */
677 : static int
678 2 : _ml_service_training_offloading_prepare_receiver (ml_service_s * mls,
679 : ml_training_services_s * training_s)
680 : {
681 2 : int ret = ML_ERROR_NONE;
682 2 : g_autoptr (JsonNode) pipeline_node = NULL;
683 : JsonObject *pipeline_obj;
684 : JsonObject *pipe;
685 :
686 : /* checking if all required files are received */
687 2 : if (!_training_offloading_check_received_data (training_s)) {
688 1 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
689 : "Failed to receive the required data");
690 : }
691 1 : _training_offloading_replace_pipeline_data_path (mls);
692 :
693 1 : pipeline_node = json_from_string (training_s->receiver_pipe_json_str, NULL);
694 1 : if (!pipeline_node) {
695 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
696 : "Failed to parse the json string, %s.",
697 : training_s->receiver_pipe_json_str);
698 : }
699 :
700 1 : pipeline_obj = json_node_get_object (pipeline_node);
701 1 : if (!pipeline_obj) {
702 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
703 : "Failed to get the json object from the json node.");
704 : }
705 1 : if (!json_object_has_member (pipeline_obj, "pipeline")) {
706 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
707 : "Failed to parse configuration file, cannot get the pipeline JSON object.");
708 : }
709 :
710 1 : pipe = json_object_get_object_member (pipeline_obj, "pipeline");
711 1 : if (json_object_has_member (pipe, "description")) {
712 1 : training_s->receiver_pipe =
713 2 : g_strdup (_ml_service_get_json_string_member (pipe, "description"));
714 : } else {
715 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
716 : "Failed to parse configuration file, cannot get the pipeline description.");
717 : }
718 :
719 1 : ret = ml_pipeline_construct (training_s->receiver_pipe, NULL, NULL,
720 : &training_s->pipeline_h);
721 1 : if (ML_ERROR_NONE != ret) {
722 0 : _ml_error_report_return (ret, "Failed to construct pipeline.");
723 : }
724 :
725 1 : ret = _training_offloading_conf_parse_pipeline (mls, pipe);
726 1 : if (ret != ML_ERROR_NONE) {
727 0 : return ret;
728 : }
729 :
730 1 : return ret;
731 : }
732 :
733 : /**
734 : * @brief Start ml training offloading service.
735 : */
736 : int
737 4 : _ml_service_training_offloading_start (ml_service_s * mls)
738 : {
739 4 : int ret = ML_ERROR_NONE;
740 4 : ml_training_services_s *training_s = NULL;
741 :
742 4 : ret = _training_offloading_get_priv (mls, &training_s);
743 7 : g_return_val_if_fail (ret == ML_ERROR_NONE, ret);
744 :
745 3 : if (training_s->type == ML_TRAINING_OFFLOADING_TYPE_SENDER) {
746 1 : ret = _ml_service_training_offloading_prepare_sender (mls, training_s);
747 2 : } else if (training_s->type == ML_TRAINING_OFFLOADING_TYPE_RECEIVER) {
748 2 : ret = _ml_service_training_offloading_prepare_receiver (mls, training_s);
749 : } else {
750 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
751 : "The node type information in JSON is incorrect.");
752 : }
753 :
754 3 : if (ret != ML_ERROR_NONE)
755 1 : return ret;
756 :
757 2 : ret = ml_pipeline_start (training_s->pipeline_h);
758 2 : if (ret != ML_ERROR_NONE) {
759 0 : _ml_error_report_return (ret, "Failed to start ml pipeline.");
760 : }
761 :
762 2 : return ret;
763 : }
764 :
765 : /**
766 : * @brief Stop ml training offloading service.
767 : */
768 : int
769 3 : _ml_service_training_offloading_stop (ml_service_s * mls)
770 : {
771 3 : int ret = ML_ERROR_NONE;
772 3 : ml_training_services_s *training_s = NULL;
773 :
774 3 : ret = _training_offloading_get_priv (mls, &training_s);
775 6 : g_return_val_if_fail (ret == ML_ERROR_NONE, ret);
776 :
777 3 : if (!training_s->pipeline_h) {
778 1 : _ml_error_report_return (ML_ERROR_STREAMS_PIPE,
779 : "Pipeline is not constructed.");
780 : }
781 :
782 2 : ret = ml_pipeline_stop (training_s->pipeline_h);
783 2 : if (ML_ERROR_NONE != ret) {
784 0 : _ml_error_report_return (ret, "Failed to stop pipeline.");
785 : }
786 :
787 2 : return ret;
788 : }
789 :
790 : /**
791 : * @brief Save receiver pipeline description.
792 : */
793 : int
794 4 : _ml_service_training_offloading_process_received_data (ml_service_s * mls,
795 : void *data_h, const gchar * dir_path, const gchar * data, int service_type)
796 : {
797 4 : g_autofree gchar *name = NULL;
798 4 : ml_training_services_s *training_s = NULL;
799 : int ret;
800 :
801 4 : g_return_val_if_fail (data_h != NULL, ML_ERROR_INVALID_PARAMETER);
802 4 : g_return_val_if_fail (dir_path != NULL, ML_ERROR_INVALID_PARAMETER);
803 4 : g_return_val_if_fail (data != NULL, ML_ERROR_INVALID_PARAMETER);
804 :
805 4 : ret = _training_offloading_get_priv (mls, &training_s);
806 4 : g_return_val_if_fail (ret == ML_ERROR_NONE, ret);
807 :
808 4 : _ml_logd ("Received data, service_type:%d", service_type);
809 :
810 4 : if (training_s->type == ML_TRAINING_OFFLOADING_TYPE_RECEIVER) {
811 3 : if (service_type == ML_SERVICE_OFFLOADING_TYPE_PIPELINE_RAW) {
812 1 : training_s->receiver_pipe_json_str = g_strdup (data);
813 1 : _ml_logd ("Received JSON string pipeline:%s",
814 : training_s->receiver_pipe_json_str);
815 : }
816 : } else {
817 : /* receive trained model from remote */
818 1 : if (service_type == ML_SERVICE_OFFLOADING_TYPE_REPLY) {
819 1 : ret = nns_edge_data_get_info (data_h, "name", &name);
820 1 : if (NNS_EDGE_ERROR_NONE != ret) {
821 0 : _ml_error_report_return (ret,
822 : "Failed to get name while processing the ml-offloading service.");
823 : }
824 2 : training_s->trained_model_path =
825 1 : g_build_path (G_DIR_SEPARATOR_S, dir_path, name, NULL);
826 1 : _ml_logd ("Reply: name:%s, received trained_model:%s", name,
827 : training_s->trained_model_path);
828 : }
829 : }
830 :
831 4 : return ML_ERROR_NONE;
832 : }
833 :
834 : /**
835 : * @brief Send trained model
836 : */
837 : static void
838 5 : _training_offloading_send_trained_model (ml_service_s * mls)
839 : {
840 5 : ml_training_services_s *training_s = NULL;
841 : GList *list, *iter;
842 5 : gchar *contents = NULL;
843 5 : gsize len = 0;
844 : int ret;
845 :
846 5 : ret = _training_offloading_get_priv (mls, &training_s);
847 10 : g_return_if_fail (ret == ML_ERROR_NONE);
848 :
849 5 : if (training_s->trained_model_path == NULL)
850 0 : return;
851 :
852 5 : if (!g_file_get_contents (training_s->trained_model_path, &contents, &len,
853 : NULL)) {
854 4 : _ml_error_report ("Failed to read file:%s", training_s->trained_model_path);
855 4 : return;
856 : }
857 :
858 1 : list = g_hash_table_get_keys (training_s->transfer_data_table);
859 :
860 1 : if (list) {
861 1 : _ml_logd ("Send trained model");
862 2 : for (iter = list; iter != NULL; iter = g_list_next (iter)) {
863 1 : _request_offloading_service (mls, iter->data, contents, len);
864 : }
865 :
866 1 : g_list_free (list);
867 : } else {
868 0 : _ml_error_report ("Failed to get transfer data table.");
869 : }
870 :
871 1 : g_free (contents);
872 1 : return;
873 : }
874 :
875 : /**
876 : * @brief Internal function to destroy ml-service training offloading data.
877 : */
878 : int
879 7 : _ml_service_training_offloading_destroy (ml_service_s * mls)
880 : {
881 7 : int ret = ML_ERROR_NONE;
882 7 : ml_training_services_s *training_s = NULL;
883 :
884 7 : ret = _training_offloading_get_priv (mls, &training_s);
885 13 : g_return_val_if_fail (ret == ML_ERROR_NONE, ret);
886 :
887 6 : if (training_s->type == ML_TRAINING_OFFLOADING_TYPE_RECEIVER) {
888 : /* reply to remote sender */
889 5 : _training_offloading_send_trained_model (mls);
890 : }
891 :
892 6 : g_cond_clear (&training_s->received_cond);
893 6 : g_mutex_clear (&training_s->received_lock);
894 :
895 6 : if (training_s->received_thread) {
896 2 : g_thread_join (training_s->received_thread);
897 2 : training_s->received_thread = NULL;
898 : }
899 :
900 6 : if (training_s->transfer_data_table) {
901 6 : g_hash_table_destroy (training_s->transfer_data_table);
902 6 : training_s->transfer_data_table = NULL;
903 : }
904 :
905 6 : if (training_s->node_table) {
906 6 : g_hash_table_destroy (training_s->node_table);
907 6 : training_s->node_table = NULL;
908 : }
909 :
910 6 : if (training_s->pipeline_h) {
911 2 : ret = ml_pipeline_destroy (training_s->pipeline_h);
912 2 : if (ret != ML_ERROR_NONE) {
913 0 : _ml_error_report ("Failed to destroy ml pipeline, clear handle anyway.");
914 : }
915 :
916 2 : training_s->pipeline_h = NULL;
917 : }
918 :
919 6 : g_free (training_s->path);
920 6 : training_s->path = NULL;
921 :
922 6 : g_free (training_s->trained_model_path);
923 6 : training_s->trained_model_path = NULL;
924 :
925 6 : g_free (training_s->receiver_pipe_json_str);
926 6 : training_s->receiver_pipe_json_str = NULL;
927 :
928 6 : g_free (training_s->receiver_pipe);
929 6 : training_s->receiver_pipe = NULL;
930 :
931 6 : g_free (training_s->sender_pipe);
932 6 : training_s->sender_pipe = NULL;
933 :
934 6 : g_free (training_s);
935 :
936 6 : _ml_service_offloading_set_mode (mls, ML_SERVICE_OFFLOADING_MODE_NONE, NULL);
937 6 : return ret;
938 : }
|