LCOV - code coverage report
Current view: top level - capi-machine-learning-inference-1.8.6/c/src - ml-api-service-training-offloading.c (source / functions) Coverage Total Hit
Test: ML API 1.8.6-0 nnstreamer/api#7f8530c294f86ec880b29347a861499239d358a1 Lines: 87.6 % 427 374
Test Date: 2025-06-06 05:24:38 Functions: 100.0 % 21 21

            Line data    Source code
       1              : /* SPDX-License-Identifier: Apache-2.0 */
       2              : /**
       3              :  * Copyright (c) 2024 Samsung Electronics Co., Ltd. All Rights Reserved.
       4              :  *
       5              :  * @file ml-api-service-training-offloading.c
       6              :  * @date 5 Apr 2024
       7              :  * @brief ML training offloading service of NNStreamer/Service C-API
       8              :  * @see https://github.com/nnstreamer/api
       9              :  * @author Hyunil Park <hyunil46.park@samsung.com>
      10              :  * @bug No known bugs except for NYI items
      11              :  */
      12              : 
      13              : #include <glib.h>
      14              : #include <json-glib/json-glib.h>
      15              : #include <nnstreamer-edge.h>
      16              : 
      17              : #include "ml-api-internal.h"
      18              : #include "ml-api-service.h"
      19              : #include "ml-api-service-training-offloading.h"
      20              : 
/** Keywords wrapped in '@' (e.g. @APP_RW_PATH@) are placeholders replaced with the path set by the app. */
      22              : #define APP_RW_PATH "@APP_RW_PATH@"
      23              : #define REMOTE_APP_RW_PATH "@REMOTE_APP_RW_PATH@"
      24              : /** combined with trained model file name set in conf */
      25              : #define TRAINED_MODEL_FILE "@TRAINED_MODEL_FILE@"
      26              : 
      27              : /** default receive time limit (second) */
      28              : #define DEFAULT_TIME_LIMIT 10
      29              : 
      30              : /**
      31              :  * @brief Internal enumeration for ml-service training offloading types.
      32              :  */
      33              : typedef enum
      34              : {
      35              :   ML_TRAINING_OFFLOADING_TYPE_UNKNOWN = 0,
      36              :   ML_TRAINING_OFFLOADING_TYPE_SENDER,
      37              :   ML_TRAINING_OFFLOADING_TYPE_RECEIVER,
      38              : 
      39              :   ML_TRAINING_OFFLOADING_TYPE_MAX,
      40              : } ml_training_offloaing_type_e;
      41              : 
      42              : /**
      43              :  * @brief Internal structure for ml-service training offloading handle.
      44              :  */
      45              : typedef struct
      46              : {
      47              :   ml_training_offloaing_type_e type;
      48              :   ml_pipeline_h pipeline_h;
      49              : 
      50              :   gchar *receiver_pipe_json_str;   /** @TRAINED_MODEL_FILE@ and @REMOTE_APP_RW_PATH@ in the receiver pipeline is converted to model_config_path, model_path, and data_path. */
      51              :   gchar *receiver_pipe;
      52              :   gchar *sender_pipe;
      53              :   gchar *trained_model_path;    /* reply to remote sender */
      54              :   gchar *path;                  /* Readable and writable path set by the app */
      55              : 
      56              :   gboolean is_received;
      57              :   gint time_limit;              /* second, For receiving the data necessary for training */
      58              :   GMutex received_lock;
      59              :   GCond received_cond;
      60              :   GThread *received_thread;
      61              : 
      62              :   GHashTable *transfer_data_table;
      63              :   GHashTable *node_table;
      64              : } ml_training_services_s;
      65              : 
      66              : /**
      67              :  * @brief Internal function to check offloading mode and get private data for training.
      68              :  */
      69              : static int
      70           42 : _training_offloading_get_priv (ml_service_s * mls,
      71              :     ml_training_services_s ** training_s)
      72              : {
      73           42 :   ml_service_offloading_mode_e mode = ML_SERVICE_OFFLOADING_MODE_NONE;
      74              :   int ret;
      75              : 
      76           42 :   ret = _ml_service_offloading_get_mode (mls, &mode, (void **) training_s);
      77           42 :   if (ret != ML_ERROR_NONE) {
      78           42 :     _ml_error_report_return (ret,
      79              :         "Failed to get offloading mode and private data.");
      80              :   }
      81              : 
      82           40 :   if (mode != ML_SERVICE_OFFLOADING_MODE_TRAINING || *training_s == NULL) {
      83            0 :     _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
      84              :         "The ml service is not training mode.");
      85              :   }
      86              : 
      87           40 :   return ML_ERROR_NONE;
      88              : }
      89              : 
      90              : /**
      91              :  * @brief Internal function to release pipeline node info.
      92              :  */
      93              : static void
      94            2 : _training_offloading_node_info_free (gpointer data)
      95              : {
      96            2 :   ml_service_node_info_s *node_info = (ml_service_node_info_s *) data;
      97              : 
      98            2 :   if (!node_info)
      99            0 :     return;
     100              : 
     101            2 :   g_free (node_info->name);
     102            2 :   g_free (node_info);
     103              : }
     104              : 
     105              : /**
     106              :  * @brief Internal function to create node info in pipeline.
     107              :  */
     108              : static ml_service_node_info_s *
     109            2 : _training_offloading_node_info_new (ml_service_s * mls,
     110              :     const gchar * name, ml_service_node_type_e type)
     111              : {
     112              :   ml_service_node_info_s *node_info;
     113            2 :   ml_training_services_s *training_s = NULL;
     114              :   int ret;
     115              : 
     116            4 :   g_return_val_if_fail (name != NULL, NULL);
     117              : 
     118            2 :   ret = _training_offloading_get_priv (mls, &training_s);
     119            2 :   g_return_val_if_fail (ret == ML_ERROR_NONE, NULL);
     120              : 
     121            2 :   if (g_hash_table_lookup (training_s->node_table, name)) {
     122            0 :     _ml_error_report_return (NULL,
     123              :         "Cannot add duplicated node '%s' in ml-service pipeline.", name);
     124              :   }
     125              : 
     126            2 :   node_info = g_try_new0 (ml_service_node_info_s, 1);
     127            2 :   if (!node_info) {
     128            0 :     _ml_error_report_return (NULL,
     129              :         "Failed to allocate new memory for node info in ml-service pipeline. Out of memory?");
     130              :   }
     131              : 
     132            2 :   node_info->name = g_strdup (name);
     133            2 :   node_info->type = type;
     134            2 :   node_info->mls = mls;
     135              : 
     136            2 :   g_hash_table_insert (training_s->node_table, g_strdup (name), node_info);
     137              : 
     138            2 :   return node_info;
     139              : }
     140              : 
     141              : /**
     142              :  * @brief Internal function to parse configuration file.
     143              :  */
     144              : static int
     145            7 : _training_offloading_conf_parse_json (ml_service_s * mls, JsonObject * object)
     146              : {
     147            7 :   ml_training_services_s *training_s = NULL;
     148              :   JsonObject *training_obj, *data_obj;
     149              :   JsonNode *training_node, *data_node, *pipeline_node;
     150              :   const gchar *key, *val;
     151            7 :   gchar *transfer_data = NULL;
     152              :   GList *list, *iter;
     153              :   int ret;
     154              : 
     155           14 :   g_return_val_if_fail (object != NULL, ML_ERROR_INVALID_PARAMETER);
     156              : 
     157            7 :   ret = _training_offloading_get_priv (mls, &training_s);
     158            7 :   g_return_val_if_fail (ret == ML_ERROR_NONE, ret);
     159              : 
     160            7 :   val = _ml_service_get_json_string_member (object, "node-type");
     161              : 
     162            7 :   if (g_ascii_strcasecmp (val, "sender") == 0) {
     163            1 :     training_s->type = ML_TRAINING_OFFLOADING_TYPE_SENDER;
     164            6 :   } else if (g_ascii_strcasecmp (val, "receiver") == 0) {
     165            6 :     training_s->type = ML_TRAINING_OFFLOADING_TYPE_RECEIVER;
     166              :   } else {
     167            0 :     _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
     168              :         "The given param, \"node-type\" is invalid.");
     169              :   }
     170              : 
     171            7 :   training_node = json_object_get_member (object, "training");
     172            7 :   training_obj = json_node_get_object (training_node);
     173              : 
     174            7 :   if (json_object_has_member (training_obj, "time-limit")) {
     175            6 :     training_s->time_limit =
     176            6 :         (gint) json_object_get_int_member (training_obj, "time-limit");
     177              :   } else {
     178            1 :     _ml_logw
     179              :         ("The default time-limit(10 sec) is set because `time-limit` is not set.");
     180              :   }
     181              : 
     182            7 :   val = _ml_service_get_json_string_member (training_obj, "sender-pipeline");
     183            7 :   training_s->sender_pipe = g_strdup (val);
     184              : 
     185            7 :   if (!json_object_has_member (training_obj, "transfer-data")) {
     186            0 :     _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
     187              :         "The given param, \"transfer-data\" is invalid.");
     188              :   }
     189              : 
     190            7 :   data_node = json_object_get_member (training_obj, "transfer-data");
     191            7 :   data_obj = json_node_get_object (data_node);
     192            7 :   list = json_object_get_members (data_obj);
     193            7 :   if (!list) {
     194            0 :     _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
     195              :         "Failed to get transfer data table");
     196              :   }
     197              : 
     198           16 :   for (iter = list; iter != NULL; iter = g_list_next (iter)) {
     199            9 :     key = iter->data;
     200              : 
     201            9 :     if (!STR_IS_VALID (key)) {
     202            0 :       _ml_error_report
     203              :           ("The parameter, 'key' is invalid. It should be a valid string.");
     204            0 :       ret = ML_ERROR_INVALID_PARAMETER;
     205            0 :       goto error;
     206              :     }
     207              : 
     208            9 :     val = _ml_service_get_json_string_member (data_obj, key);
     209              : 
     210            9 :     if (STR_IS_VALID (val)) {
     211            8 :       transfer_data = g_strdup (val);
     212              :     } else {
     213              :       /* pipeline is a JSON string */
     214            1 :       pipeline_node = json_object_get_member (data_obj, key);
     215            1 :       transfer_data = json_to_string (pipeline_node, TRUE);
     216              : 
     217            1 :       if (!g_strstr_len (transfer_data, -1, "pipeline")) {
     218            0 :         g_free (transfer_data);
     219              : 
     220            0 :         _ml_error_report
     221              :             ("The parameter, 'val' is invalid. It should be a valid string.");
     222            0 :         ret = ML_ERROR_INVALID_PARAMETER;
     223            0 :         goto error;
     224              :       }
     225              :     }
     226              : 
     227            9 :     g_hash_table_insert (training_s->transfer_data_table, g_strdup (key),
     228              :         transfer_data);
     229              :   }
     230              : 
     231            7 : error:
     232            7 :   g_list_free (list);
     233              : 
     234            7 :   if (ret == ML_ERROR_NONE) {
     235              :     /* Since we are only sending the trained model now, there is only 1 item in the list. */
     236            7 :     if (training_s->type == ML_TRAINING_OFFLOADING_TYPE_RECEIVER)
     237           12 :       training_s->trained_model_path = g_strdup (transfer_data);
     238              :   }
     239              : 
     240            7 :   return ret;
     241              : }
     242              : 
     243              : /**
     244              :  * @brief Internal function to parse the node info in pipeline.
     245              :  */
     246              : static int
     247            2 : _training_offloading_conf_parse_pipeline_node (ml_service_s * mls,
     248              :     JsonNode * node, ml_service_node_type_e type)
     249              : {
     250            2 :   int ret = ML_ERROR_NONE;
     251            2 :   guint i, array_len = 1;
     252              :   const gchar *name;
     253            2 :   JsonArray *array = NULL;
     254              :   JsonObject *node_object;
     255            2 :   ml_service_node_info_s *node_info = NULL;
     256            2 :   ml_training_services_s *training_s = NULL;
     257              : 
     258            4 :   g_return_val_if_fail (node != NULL, ML_ERROR_INVALID_PARAMETER);
     259              : 
     260            2 :   ret = _training_offloading_get_priv (mls, &training_s);
     261            2 :   g_return_val_if_fail (ret == ML_ERROR_NONE, ret);
     262              : 
     263            2 :   if (JSON_NODE_HOLDS_ARRAY (node)) {
     264            2 :     array = json_node_get_array (node);
     265            2 :     array_len = json_array_get_length (array);
     266              :   }
     267              : 
     268            4 :   for (i = 0; i < array_len; i++) {
     269            2 :     if (array)
     270            2 :       node_object = json_array_get_object_element (array, i);
     271              :     else
     272            0 :       node_object = json_node_get_object (node);
     273              : 
     274            2 :     if (!json_object_has_member (node_object, "name")) {
     275            0 :       _ml_error_report_return (ret,
     276              :           "Failed to parse configuration file, cannot get the name for pipeline node.");
     277              :     }
     278              : 
     279            2 :     name = json_object_get_string_member (node_object, "name");
     280              : 
     281            2 :     node_info = _training_offloading_node_info_new (mls, name, type);
     282            2 :     if (!node_info) {
     283            0 :       _ml_error_report_return_continue (ML_ERROR_INVALID_PARAMETER,
     284              :           "Failed to parse configuration file, cannot add new node information.");
     285              :     }
     286              : 
     287            2 :     switch (type) {
     288            1 :       case ML_SERVICE_NODE_TYPE_TRAINING:
     289            2 :         ret = ml_pipeline_element_get_handle (training_s->pipeline_h, name,
     290            1 :             &node_info->handle);
     291            1 :         break;
     292            1 :       case ML_SERVICE_NODE_TYPE_OUTPUT:
     293            2 :         ret = ml_pipeline_sink_register (training_s->pipeline_h, name,
     294            1 :             _ml_service_pipeline_sink_cb, node_info, &node_info->handle);
     295            1 :         break;
     296            0 :       default:
     297            0 :         ret = ML_ERROR_INVALID_PARAMETER;
     298            0 :         break;
     299              :     }
     300              : 
     301            2 :     if (ret != ML_ERROR_NONE) {
     302            0 :       _ml_error_report_return (ret,
     303              :           "Failed to parse configuration file, cannot get the handle for pipeline node.");
     304              :     }
     305              :   }
     306              : 
     307            2 :   return ret;
     308              : }
     309              : 
     310              : /**
     311              :  * @brief Internal function to parse the pipeline in the configuration file.
     312              :  */
     313              : static int
     314            1 : _training_offloading_conf_parse_pipeline (ml_service_s * mls, JsonObject * pipe)
     315              : {
     316            1 :   int ret = ML_ERROR_NONE;
     317              :   JsonNode *node;
     318              : 
     319            1 :   g_return_val_if_fail (mls != NULL, ML_ERROR_INVALID_PARAMETER);
     320            1 :   g_return_val_if_fail (pipe != NULL, ML_ERROR_INVALID_PARAMETER);
     321              : 
     322            1 :   if (json_object_has_member (pipe, "output_node")) {
     323            1 :     node = json_object_get_member (pipe, "output_node");
     324            1 :     ret = _training_offloading_conf_parse_pipeline_node (mls, node,
     325              :         ML_SERVICE_NODE_TYPE_OUTPUT);
     326            1 :     if (ret != ML_ERROR_NONE) {
     327            0 :       _ml_error_report_return (ret,
     328              :           "Failed to parse configuration file, cannot get the input node.");
     329              :     }
     330              :   }
     331              : 
     332            1 :   if (json_object_has_member (pipe, "training_node")) {
     333            1 :     node = json_object_get_member (pipe, "training_node");
     334            1 :     ret = _training_offloading_conf_parse_pipeline_node (mls, node,
     335              :         ML_SERVICE_NODE_TYPE_TRAINING);
     336            1 :     if (ret != ML_ERROR_NONE) {
     337            0 :       _ml_error_report_return (ret,
     338              :           "Failed to parse configuration file, cannot get the training node.");
     339              :     }
     340              :   }
     341              : 
     342            1 :   return ret;
     343              : }
     344              : 
     345              : /**
     346              :  * @brief Internal function to create ml-service training offloading handle.
     347              :  */
     348              : static int
     349            7 : _training_offloading_create (ml_service_s * mls)
     350              : {
     351            7 :   ml_training_services_s *training_s = NULL;
     352              : 
     353            7 :   g_return_val_if_fail (mls != NULL, ML_ERROR_INVALID_PARAMETER);
     354              : 
     355            7 :   training_s = g_try_new0 (ml_training_services_s, 1);
     356            7 :   if (training_s == NULL) {
     357            0 :     _ml_error_report_return (ML_ERROR_OUT_OF_MEMORY,
     358              :         "Failed to allocate memory for the service handle's private data. Out of memory?");
     359              :   }
     360              : 
     361            7 :   g_cond_init (&training_s->received_cond);
     362            7 :   g_mutex_init (&training_s->received_lock);
     363              : 
     364            7 :   training_s->type = ML_TRAINING_OFFLOADING_TYPE_UNKNOWN;
     365            7 :   training_s->time_limit = DEFAULT_TIME_LIMIT;
     366              : 
     367            7 :   _ml_service_offloading_set_mode (mls,
     368              :       ML_SERVICE_OFFLOADING_MODE_TRAINING, training_s);
     369              : 
     370            7 :   training_s->transfer_data_table =
     371            7 :       g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free);
     372            7 :   if (!training_s->transfer_data_table) {
     373            0 :     _ml_service_training_offloading_destroy (mls);
     374            0 :     _ml_error_report_return (ML_ERROR_OUT_OF_MEMORY,
     375              :         "Failed to allocate memory for the data table. Out of memory?");
     376              :   }
     377              : 
     378            7 :   training_s->node_table =
     379            7 :       g_hash_table_new_full (g_str_hash, g_str_equal, g_free,
     380              :       _training_offloading_node_info_free);
     381            7 :   if (!training_s->node_table) {
     382            0 :     _ml_service_training_offloading_destroy (mls);
     383            0 :     _ml_error_report_return (ML_ERROR_OUT_OF_MEMORY,
     384              :         "Failed to allocate memory for the node table. Out of memory?");
     385              :   }
     386              : 
     387            7 :   return ML_ERROR_NONE;
     388              : }
     389              : 
     390              : /**
     391              :  * @brief Internal function to create ml-service training offloading handle.
     392              :  */
     393              : int
     394            9 : _ml_service_training_offloading_create (ml_service_s * mls,
     395              :     JsonObject * offloading)
     396              : {
     397            9 :   int ret = ML_ERROR_NONE;
     398              : 
     399            9 :   g_return_val_if_fail (mls != NULL, ML_ERROR_INVALID_PARAMETER);
     400            8 :   g_return_val_if_fail (offloading != NULL, ML_ERROR_INVALID_PARAMETER);
     401              : 
     402            7 :   ret = _training_offloading_create (mls);
     403            7 :   if (ret != ML_ERROR_NONE) {
     404            0 :     _ml_error_report_return_continue (ret,
     405              :         "Failed to create ml-service for training offloading.");
     406              :   }
     407              : 
     408            7 :   ret = _training_offloading_conf_parse_json (mls, offloading);
     409            7 :   if (ret != ML_ERROR_NONE) {
     410            0 :     _ml_service_training_offloading_destroy (mls);
     411            0 :     _ml_error_report_return (ret,
     412              :         "Failed to parse the configuration file for training offloading.");
     413              :   }
     414              : 
     415            7 :   return ML_ERROR_NONE;
     416              : }
     417              : 
     418              : /**
     419              :  * @brief Request service to ml-service-offloading.
     420              :  */
     421              : static int
     422            4 : _request_offloading_service (ml_service_s * mls,
     423              :     const gchar * service_name, void *data, size_t len)
     424              : {
     425            4 :   int ret = ML_ERROR_NONE;
     426              : 
     427            4 :   g_return_val_if_fail (mls != NULL, ML_ERROR_INVALID_PARAMETER);
     428            4 :   g_return_val_if_fail (service_name != NULL, ML_ERROR_INVALID_PARAMETER);
     429            4 :   g_return_val_if_fail (data != NULL, ML_ERROR_INVALID_PARAMETER);
     430            4 :   g_return_val_if_fail (len > 0, ML_ERROR_INVALID_PARAMETER);
     431              : 
     432            4 :   ret = _ml_service_offloading_request_raw (mls, service_name, data, len);
     433            4 :   if (ret != ML_ERROR_NONE) {
     434            0 :     _ml_error_report ("Failed to request service '%s'.)", service_name);
     435              :   }
     436              : 
     437            4 :   return ret;
     438              : }
     439              : 
     440              : /**
     441              :  * @brief Request all services to ml-service offloading.
     442              :  */
     443              : static int
     444            1 : _training_offloading_services_request (ml_service_s * mls)
     445              : {
     446            1 :   ml_training_services_s *training_s = NULL;
     447            1 :   int ret = ML_ERROR_NONE;
     448              :   GList *list, *iter;
     449            1 :   gchar *transfer_data = NULL, *service_name = NULL;
     450            1 :   gchar *contents = NULL, *pipeline = NULL;
     451            1 :   gsize len = 0;
     452              : 
     453            1 :   ret = _training_offloading_get_priv (mls, &training_s);
     454            2 :   g_return_val_if_fail (ret == ML_ERROR_NONE, ret);
     455              : 
     456            1 :   _ml_logd ("path set by app:%s ", training_s->path);
     457              : 
     458            1 :   list = g_hash_table_get_keys (training_s->transfer_data_table);
     459            1 :   if (!list) {
     460            0 :     _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
     461              :         "Failed to get transfer data table");
     462              :   }
     463              : 
     464            4 :   for (iter = list; iter != NULL; iter = g_list_next (iter)) {
     465            3 :     const gchar *name = iter->data;
     466              : 
     467              :     transfer_data =
     468            3 :         g_strdup (g_hash_table_lookup (training_s->transfer_data_table, name));
     469              : 
     470            3 :     if (g_strstr_len (transfer_data, -1, APP_RW_PATH)) {
     471            4 :       transfer_data = _ml_replace_string (transfer_data, APP_RW_PATH,
     472            2 :           training_s->path, NULL, NULL);
     473              : 
     474            2 :       _ml_logd ("transfer_data:%s", transfer_data);
     475              : 
     476            2 :       if (!g_file_get_contents (transfer_data, &contents, &len, NULL)) {
     477            0 :         _ml_error_report ("Failed to read file:%s", transfer_data);
     478            0 :         goto error;
     479              :       }
     480              : 
     481            2 :       ret = _request_offloading_service (mls, name, contents, len);
     482            2 :       if (ret != ML_ERROR_NONE) {
     483            0 :         _ml_error_report ("Failed to request service '%s'.", name);
     484            0 :         goto error;
     485              :       }
     486              : 
     487            2 :       g_free (transfer_data);
     488            2 :       g_free (contents);
     489            2 :       transfer_data = NULL;
     490            2 :       contents = NULL;
     491            1 :     } else if (g_strstr_len (transfer_data, -1, "pipeline")) {
     492            2 :       service_name = g_strdup (iter->data);
     493            1 :       pipeline = g_strdup (transfer_data);
     494            1 :       transfer_data = NULL;
     495              :     }
     496              :   }
     497              : 
     498            1 :   if (pipeline) {
     499              :     /**
     500              :      * The remote sender sends the last in the pipeline.
     501              :      * When the pipeline arrives, the remote receiver determines that the sender has sent all the necessary files specified in the pipeline.
     502              :      * pipeline description must be sent last.
     503              :      */
     504            1 :     _ml_logd
     505              :         ("In case of pipeline, @REMOTE_APP_RW_PATH@ will be replaced at the remote receiver.\n transfer_data:pipeline(%s),",
     506              :         pipeline);
     507            2 :     ret = _request_offloading_service (mls, service_name, pipeline,
     508            1 :         strlen (pipeline) + 1);
     509            1 :     if (ret != ML_ERROR_NONE) {
     510            0 :       _ml_error_report ("Failed to request service(%s)", service_name);
     511              :     }
     512              :   }
     513              : 
     514            1 : error:
     515            1 :   g_free (service_name);
     516            1 :   g_free (transfer_data);
     517            1 :   g_free (contents);
     518            1 :   g_list_free (list);
     519              : 
     520            1 :   return ret;
     521              : }
     522              : 
     523              : /**
     524              :  * @brief Thread for checking receive data.
     525              :  */
     526              : static gpointer
     527            2 : _check_received_data_thread (gpointer data)
     528              : {
     529            2 :   ml_training_services_s *training_s = (ml_training_services_s *) data;
     530              :   gint usec;
     531              : 
     532            2 :   g_return_val_if_fail (training_s != NULL, NULL);
     533              : 
     534            2 :   usec = training_s->time_limit * 1000000;
     535          102 :   while (usec > 0) {
     536          101 :     g_usleep (100000);
     537          101 :     if (training_s->receiver_pipe_json_str != NULL) {
     538            1 :       _ml_logd
     539              :           ("Lock to receive pipeline JSON string required for model training.");
     540            1 :       g_mutex_lock (&training_s->received_lock);
     541            1 :       training_s->is_received = TRUE;
     542            1 :       _ml_logd ("receive_pipe:%s", training_s->receiver_pipe_json_str);
     543            1 :       _ml_logd
     544              :           ("Now pipeline has arrived, The remote sender send the pipeline last, so probably received all the data."
     545              :           "If there are no files required for the pipeline, a runtime error occurs.");
     546            1 :       g_cond_signal (&training_s->received_cond);
     547            1 :       g_mutex_unlock (&training_s->received_lock);
     548            1 :       return NULL;
     549              :     }
     550          100 :     usec -= 100000;
     551              :   }
     552              : 
     553            1 :   _ml_loge ("Required data is null, receive_pipe:%s",
     554              :       training_s->receiver_pipe_json_str);
     555            1 :   g_mutex_lock (&training_s->received_lock);
     556            1 :   training_s->is_received = FALSE;
     557            1 :   g_cond_signal (&training_s->received_cond);
     558            1 :   g_mutex_unlock (&training_s->received_lock);
     559            1 :   return NULL;
     560              : }
     561              : 
     562              : /**
     563              :  * @brief Check if all necessary data is received.
     564              :  */
     565              : static gboolean
     566            2 : _training_offloading_check_received_data (ml_training_services_s * training_s)
     567              : {
     568            2 :   gboolean is_received = FALSE;
     569              : 
     570            2 :   g_return_val_if_fail (training_s != NULL, FALSE);
     571              : 
     572            2 :   training_s->received_thread = g_thread_new ("check_received_file",
     573              :       _check_received_data_thread, training_s);
     574              : 
     575            2 :   g_mutex_lock (&training_s->received_lock);
     576              : 
     577            3 :   while (!training_s->is_received) {
     578            2 :     _ml_logd ("Wait to receive all data needed for model training.");
     579            2 :     g_cond_wait (&training_s->received_cond, &training_s->received_lock);
     580            2 :     if (training_s->is_received == FALSE)
     581            1 :       break;
     582              :   }
     583              : 
     584            2 :   is_received = training_s->is_received;
     585            2 :   g_mutex_unlock (&training_s->received_lock);
     586            2 :   _ml_logd ("unlock, receive all data");
     587              : 
     588            2 :   return is_received;
     589              : }
     590              : 
     591              : /**
     592              :  * @brief replace path.
     593              :  */
     594              : static void
     595            2 : _training_offloading_replace_pipeline_data_path (ml_service_s * mls)
     596              : {
     597            2 :   ml_training_services_s *training_s = NULL;
     598              :   int ret;
     599              : 
     600            2 :   ret = _training_offloading_get_priv (mls, &training_s);
     601            2 :   g_return_if_fail (ret == ML_ERROR_NONE);
     602              : 
     603            2 :   if (training_s->type == ML_TRAINING_OFFLOADING_TYPE_SENDER) {
     604            1 :     if (training_s->sender_pipe) {
     605            2 :       training_s->sender_pipe =
     606            2 :           _ml_replace_string (training_s->sender_pipe, APP_RW_PATH,
     607            1 :           training_s->path, NULL, NULL);
     608            1 :       _ml_logd ("@APP_RW_PATH@ is replaced, sender_pipe:%s",
     609              :           training_s->sender_pipe);
     610              :     }
     611              :   } else {
     612            1 :     if (training_s->receiver_pipe_json_str) {
     613            2 :       training_s->trained_model_path =
     614            2 :           _ml_replace_string (training_s->trained_model_path, APP_RW_PATH,
     615            1 :           training_s->path, NULL, NULL);
     616            2 :       training_s->receiver_pipe_json_str =
     617            2 :           _ml_replace_string (training_s->receiver_pipe_json_str,
     618            1 :           REMOTE_APP_RW_PATH, training_s->path, NULL, NULL);
     619            2 :       training_s->receiver_pipe_json_str =
     620            2 :           _ml_replace_string (training_s->receiver_pipe_json_str,
     621            1 :           TRAINED_MODEL_FILE, training_s->trained_model_path, NULL, NULL);
     622            1 :       _ml_logd
     623              :           ("@REMOTE_APP_RW_PATH@ and @TRAINED_MODEL_FILE@ are replaced, receiver_pipe JSON string: %s",
     624              :           training_s->receiver_pipe_json_str);
     625              :     }
     626              :   }
     627              : }
     628              : 
     629              : /**
     630              :  * @brief Set path in ml-service training offloading handle.
     631              :  */
     632              : int
     633            6 : _ml_service_training_offloading_set_path (ml_service_s * mls,
     634              :     const gchar * path)
     635              : {
     636            6 :   int ret = ML_ERROR_NONE;
     637            6 :   ml_training_services_s *training_s = NULL;
     638              : 
     639           11 :   g_return_val_if_fail (path != NULL, ML_ERROR_INVALID_PARAMETER);
     640              : 
     641            5 :   ret = _training_offloading_get_priv (mls, &training_s);
     642            5 :   g_return_val_if_fail (ret == ML_ERROR_NONE, ret);
     643              : 
     644            5 :   g_free (training_s->path);
     645            5 :   training_s->path = g_strdup (path);
     646              : 
     647            5 :   return ret;
     648              : }
     649              : 
     650              : /**
     651              :  * @brief Prepare ml training offloading service as sender.
     652              :  */
     653              : static int
     654            1 : _ml_service_training_offloading_prepare_sender (ml_service_s * mls,
     655              :     ml_training_services_s * training_s)
     656              : {
     657            1 :   int ret = ML_ERROR_NONE;
     658              : 
     659            1 :   ret = _training_offloading_services_request (mls);
     660            1 :   if (ret != ML_ERROR_NONE) {
     661            0 :     _ml_error_report_return (ret, "Failed to request service.");
     662              :   }
     663            1 :   _training_offloading_replace_pipeline_data_path (mls);
     664              : 
     665            1 :   ret = ml_pipeline_construct (training_s->sender_pipe, NULL, NULL,
     666              :       &training_s->pipeline_h);
     667            1 :   if (ML_ERROR_NONE != ret) {
     668            0 :     _ml_error_report_return (ret, "Failed to construct pipeline.");
     669              :   }
     670              : 
     671            1 :   return ret;
     672              : }
     673              : 
     674              : /**
     675              :  * @brief Prepare ml training offloading service as receiver.
     676              :  */
     677              : static int
     678            2 : _ml_service_training_offloading_prepare_receiver (ml_service_s * mls,
     679              :     ml_training_services_s * training_s)
     680              : {
     681            2 :   int ret = ML_ERROR_NONE;
     682            2 :   g_autoptr (JsonNode) pipeline_node = NULL;
     683              :   JsonObject *pipeline_obj;
     684              :   JsonObject *pipe;
     685              : 
     686              :   /* checking if all required files are received */
     687            2 :   if (!_training_offloading_check_received_data (training_s)) {
     688            1 :     _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
     689              :         "Failed to receive the required data");
     690              :   }
     691            1 :   _training_offloading_replace_pipeline_data_path (mls);
     692              : 
     693            1 :   pipeline_node = json_from_string (training_s->receiver_pipe_json_str, NULL);
     694            1 :   if (!pipeline_node) {
     695            0 :     _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
     696              :         "Failed to parse the json string, %s.",
     697              :         training_s->receiver_pipe_json_str);
     698              :   }
     699              : 
     700            1 :   pipeline_obj = json_node_get_object (pipeline_node);
     701            1 :   if (!pipeline_obj) {
     702            0 :     _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
     703              :         "Failed to get the json object from the json node.");
     704              :   }
     705            1 :   if (!json_object_has_member (pipeline_obj, "pipeline")) {
     706            0 :     _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
     707              :         "Failed to parse configuration file, cannot get the pipeline JSON object.");
     708              :   }
     709              : 
     710            1 :   pipe = json_object_get_object_member (pipeline_obj, "pipeline");
     711            1 :   if (json_object_has_member (pipe, "description")) {
     712            1 :     training_s->receiver_pipe =
     713            2 :         g_strdup (_ml_service_get_json_string_member (pipe, "description"));
     714              :   } else {
     715            0 :     _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
     716              :         "Failed to parse configuration file, cannot get the pipeline description.");
     717              :   }
     718              : 
     719            1 :   ret = ml_pipeline_construct (training_s->receiver_pipe, NULL, NULL,
     720              :       &training_s->pipeline_h);
     721            1 :   if (ML_ERROR_NONE != ret) {
     722            0 :     _ml_error_report_return (ret, "Failed to construct pipeline.");
     723              :   }
     724              : 
     725            1 :   ret = _training_offloading_conf_parse_pipeline (mls, pipe);
     726            1 :   if (ret != ML_ERROR_NONE) {
     727            0 :     return ret;
     728              :   }
     729              : 
     730            1 :   return ret;
     731              : }
     732              : 
     733              : /**
     734              :  * @brief Start ml training offloading service.
     735              :  */
     736              : int
     737            4 : _ml_service_training_offloading_start (ml_service_s * mls)
     738              : {
     739            4 :   int ret = ML_ERROR_NONE;
     740            4 :   ml_training_services_s *training_s = NULL;
     741              : 
     742            4 :   ret = _training_offloading_get_priv (mls, &training_s);
     743            7 :   g_return_val_if_fail (ret == ML_ERROR_NONE, ret);
     744              : 
     745            3 :   if (training_s->type == ML_TRAINING_OFFLOADING_TYPE_SENDER) {
     746            1 :     ret = _ml_service_training_offloading_prepare_sender (mls, training_s);
     747            2 :   } else if (training_s->type == ML_TRAINING_OFFLOADING_TYPE_RECEIVER) {
     748            2 :     ret = _ml_service_training_offloading_prepare_receiver (mls, training_s);
     749              :   } else {
     750            0 :     _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
     751              :         "The node type information in JSON is incorrect.");
     752              :   }
     753              : 
     754            3 :   if (ret != ML_ERROR_NONE)
     755            1 :     return ret;
     756              : 
     757            2 :   ret = ml_pipeline_start (training_s->pipeline_h);
     758            2 :   if (ret != ML_ERROR_NONE) {
     759            0 :     _ml_error_report_return (ret, "Failed to start ml pipeline.");
     760              :   }
     761              : 
     762            2 :   return ret;
     763              : }
     764              : 
     765              : /**
     766              :  * @brief Stop ml training offloading service.
     767              :  */
     768              : int
     769            3 : _ml_service_training_offloading_stop (ml_service_s * mls)
     770              : {
     771            3 :   int ret = ML_ERROR_NONE;
     772            3 :   ml_training_services_s *training_s = NULL;
     773              : 
     774            3 :   ret = _training_offloading_get_priv (mls, &training_s);
     775            6 :   g_return_val_if_fail (ret == ML_ERROR_NONE, ret);
     776              : 
     777            3 :   if (!training_s->pipeline_h) {
     778            1 :     _ml_error_report_return (ML_ERROR_STREAMS_PIPE,
     779              :         "Pipeline is not constructed.");
     780              :   }
     781              : 
     782            2 :   ret = ml_pipeline_stop (training_s->pipeline_h);
     783            2 :   if (ML_ERROR_NONE != ret) {
     784            0 :     _ml_error_report_return (ret, "Failed to stop pipeline.");
     785              :   }
     786              : 
     787            2 :   return ret;
     788              : }
     789              : 
     790              : /**
     791              :  * @brief Save receiver pipeline description.
     792              :  */
     793              : int
     794            4 : _ml_service_training_offloading_process_received_data (ml_service_s * mls,
     795              :     void *data_h, const gchar * dir_path, const gchar * data, int service_type)
     796              : {
     797            4 :   g_autofree gchar *name = NULL;
     798            4 :   ml_training_services_s *training_s = NULL;
     799              :   int ret;
     800              : 
     801            4 :   g_return_val_if_fail (data_h != NULL, ML_ERROR_INVALID_PARAMETER);
     802            4 :   g_return_val_if_fail (dir_path != NULL, ML_ERROR_INVALID_PARAMETER);
     803            4 :   g_return_val_if_fail (data != NULL, ML_ERROR_INVALID_PARAMETER);
     804              : 
     805            4 :   ret = _training_offloading_get_priv (mls, &training_s);
     806            4 :   g_return_val_if_fail (ret == ML_ERROR_NONE, ret);
     807              : 
     808            4 :   _ml_logd ("Received data, service_type:%d", service_type);
     809              : 
     810            4 :   if (training_s->type == ML_TRAINING_OFFLOADING_TYPE_RECEIVER) {
     811            3 :     if (service_type == ML_SERVICE_OFFLOADING_TYPE_PIPELINE_RAW) {
     812            1 :       training_s->receiver_pipe_json_str = g_strdup (data);
     813            1 :       _ml_logd ("Received JSON string pipeline:%s",
     814              :           training_s->receiver_pipe_json_str);
     815              :     }
     816              :   } else {
     817              :     /* receive trained model from remote */
     818            1 :     if (service_type == ML_SERVICE_OFFLOADING_TYPE_REPLY) {
     819            1 :       ret = nns_edge_data_get_info (data_h, "name", &name);
     820            1 :       if (NNS_EDGE_ERROR_NONE != ret) {
     821            0 :         _ml_error_report_return (ret,
     822              :             "Failed to get name while processing the ml-offloading service.");
     823              :       }
     824            2 :       training_s->trained_model_path =
     825            1 :           g_build_path (G_DIR_SEPARATOR_S, dir_path, name, NULL);
     826            1 :       _ml_logd ("Reply: name:%s, received trained_model:%s", name,
     827              :           training_s->trained_model_path);
     828              :     }
     829              :   }
     830              : 
     831            4 :   return ML_ERROR_NONE;
     832              : }
     833              : 
     834              : /**
     835              :  * @brief Send trained model
     836              :  */
     837              : static void
     838            5 : _training_offloading_send_trained_model (ml_service_s * mls)
     839              : {
     840            5 :   ml_training_services_s *training_s = NULL;
     841              :   GList *list, *iter;
     842            5 :   gchar *contents = NULL;
     843            5 :   gsize len = 0;
     844              :   int ret;
     845              : 
     846            5 :   ret = _training_offloading_get_priv (mls, &training_s);
     847           10 :   g_return_if_fail (ret == ML_ERROR_NONE);
     848              : 
     849            5 :   if (training_s->trained_model_path == NULL)
     850            0 :     return;
     851              : 
     852            5 :   if (!g_file_get_contents (training_s->trained_model_path, &contents, &len,
     853              :           NULL)) {
     854            4 :     _ml_error_report ("Failed to read file:%s", training_s->trained_model_path);
     855            4 :     return;
     856              :   }
     857              : 
     858            1 :   list = g_hash_table_get_keys (training_s->transfer_data_table);
     859              : 
     860            1 :   if (list) {
     861            1 :     _ml_logd ("Send trained model");
     862            2 :     for (iter = list; iter != NULL; iter = g_list_next (iter)) {
     863            1 :       _request_offloading_service (mls, iter->data, contents, len);
     864              :     }
     865              : 
     866            1 :     g_list_free (list);
     867              :   } else {
     868            0 :     _ml_error_report ("Failed to get transfer data table.");
     869              :   }
     870              : 
     871            1 :   g_free (contents);
     872            1 :   return;
     873              : }
     874              : 
     875              : /**
     876              :  * @brief Internal function to destroy ml-service training offloading data.
     877              :  */
     878              : int
     879            7 : _ml_service_training_offloading_destroy (ml_service_s * mls)
     880              : {
     881            7 :   int ret = ML_ERROR_NONE;
     882            7 :   ml_training_services_s *training_s = NULL;
     883              : 
     884            7 :   ret = _training_offloading_get_priv (mls, &training_s);
     885           13 :   g_return_val_if_fail (ret == ML_ERROR_NONE, ret);
     886              : 
     887            6 :   if (training_s->type == ML_TRAINING_OFFLOADING_TYPE_RECEIVER) {
     888              :     /* reply to remote sender */
     889            5 :     _training_offloading_send_trained_model (mls);
     890              :   }
     891              : 
     892            6 :   g_cond_clear (&training_s->received_cond);
     893            6 :   g_mutex_clear (&training_s->received_lock);
     894              : 
     895            6 :   if (training_s->received_thread) {
     896            2 :     g_thread_join (training_s->received_thread);
     897            2 :     training_s->received_thread = NULL;
     898              :   }
     899              : 
     900            6 :   if (training_s->transfer_data_table) {
     901            6 :     g_hash_table_destroy (training_s->transfer_data_table);
     902            6 :     training_s->transfer_data_table = NULL;
     903              :   }
     904              : 
     905            6 :   if (training_s->node_table) {
     906            6 :     g_hash_table_destroy (training_s->node_table);
     907            6 :     training_s->node_table = NULL;
     908              :   }
     909              : 
     910            6 :   if (training_s->pipeline_h) {
     911            2 :     ret = ml_pipeline_destroy (training_s->pipeline_h);
     912            2 :     if (ret != ML_ERROR_NONE) {
     913            0 :       _ml_error_report ("Failed to destroy ml pipeline, clear handle anyway.");
     914              :     }
     915              : 
     916            2 :     training_s->pipeline_h = NULL;
     917              :   }
     918              : 
     919            6 :   g_free (training_s->path);
     920            6 :   training_s->path = NULL;
     921              : 
     922            6 :   g_free (training_s->trained_model_path);
     923            6 :   training_s->trained_model_path = NULL;
     924              : 
     925            6 :   g_free (training_s->receiver_pipe_json_str);
     926            6 :   training_s->receiver_pipe_json_str = NULL;
     927              : 
     928            6 :   g_free (training_s->receiver_pipe);
     929            6 :   training_s->receiver_pipe = NULL;
     930              : 
     931            6 :   g_free (training_s->sender_pipe);
     932            6 :   training_s->sender_pipe = NULL;
     933              : 
     934            6 :   g_free (training_s);
     935              : 
     936            6 :   _ml_service_offloading_set_mode (mls, ML_SERVICE_OFFLOADING_MODE_NONE, NULL);
     937            6 :   return ret;
     938              : }
        

Generated by: LCOV version 2.0-1