diff --git a/pkg/globalmanager/controllers/incrementallearning/downstream.go b/pkg/globalmanager/controllers/incrementallearning/downstream.go index 85d6e937..84be0528 100644 --- a/pkg/globalmanager/controllers/incrementallearning/downstream.go +++ b/pkg/globalmanager/controllers/incrementallearning/downstream.go @@ -122,25 +122,25 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro runtime.InjectSecretAnnotations(c.kubeClient, job, job.Spec.CredentialName) + // isJobResidentNode checks whether nodeName is a job resident node + isJobResidentNode := func(nodeName string) bool { + // the node where LC monitors dataset and the node where inference worker is running are job resident node + if nodeName == dsNodeName || nodeName == deployNodeName { + return true + } + return false + } + doJobStageEvent := func(modelName string, nodeName string) { if currentType == sednav1.ILJobStageCondWaiting { - if jobStage != sednav1.ILJobDeploy { - syncJobWithNodeName(dsNodeName) + syncJobWithNodeName(dsNodeName) + if modelName != "" { syncModelWithName(modelName) } } else if currentType == sednav1.ILJobStageCondRunning { - if nodeName != "" { - syncJobWithNodeName(nodeName) - } - - if jobStage == sednav1.ILJobDeploy { - if evalNodeName != dsNodeName { - // delete LC's job from eval node that's different from dataset node when deploy worker's status is ready. - c.sendToEdgeFunc(evalNodeName, watch.Deleted, job) - } - } + syncJobWithNodeName(nodeName) } else if currentType == sednav1.ILJobStageCondCompleted || currentType == sednav1.ILJobStageCondFailed { - if nodeName != dsNodeName { + if !isJobResidentNode(nodeName) { // delete LC's job from nodeName that's different from dataset node when worker's status is completed or failed. c.sendToEdgeFunc(nodeName, watch.Deleted, job) } diff --git a/pkg/localcontroller/gmclient/types.go b/pkg/localcontroller/gmclient/types.go index 59e764db..876d9698 100644 --- a/pkg/localcontroller/gmclient/types.go +++ b/pkg/localcontroller/gmclient/types.go @@ -16,7 +16,10 @@ limitations under the License. package gmclient -import messagetypes "github.com/kubeedge/sedna/pkg/globalmanager/messagelayer/model" +import ( + messagetypes "github.com/kubeedge/sedna/pkg/globalmanager/messagelayer/model" + "github.com/kubeedge/sedna/pkg/globalmanager/runtime" +) const ( // InsertOperation is the insert value @@ -27,6 +30,8 @@ const ( StatusOperation = "status" ) +type Model = runtime.Model + // Message defines message between LC and GM type Message struct { Header MessageHeader `json:"header"` @@ -44,12 +49,6 @@ type UpstreamMessage struct { Output *Output `json:"output"` } -type Model struct { - Format string `json:"format"` - URL string `json:"url"` - Metrics map[string]interface{} `json:"metrics,omitempty"` -} - type Input struct { Models []Model `json:"models,omitempty"` DataURL string `json:"dataURL,omitempty"` diff --git a/pkg/localcontroller/managers/incrementallearning/incrementallearningjob.go b/pkg/localcontroller/managers/incrementallearning/incrementallearningjob.go index 877010e1..d9c62e8d 100644 --- a/pkg/localcontroller/managers/incrementallearning/incrementallearningjob.go +++ b/pkg/localcontroller/managers/incrementallearning/incrementallearningjob.go @@ -54,27 +54,27 @@ type Job struct { // JobConfig defines config for incremental-learning-job type JobConfig struct { - UniqueIdentifier string - Rounds int - TrainTrigger trigger.Base - DeployTrigger trigger.Base - TriggerTime time.Time - TrainTriggerStatus string - EvalTriggerStatus string - DeployTriggerStatus string - TrainDataURL string - EvalDataURL string - OutputDir string - OutputConfig *OutputConfig - DataSamples *DataSamples - TrainModel *Model - DeployModel *Model - EvalModels []Model - EvalResult []Model - Lock sync.Mutex - Dataset *dataset.Dataset - Storage storage.Storage - Done chan struct{} + UniqueIdentifier string + Rounds int + TrainTrigger trigger.Base + DeployTrigger trigger.Base + TriggerTime time.Time + TrainTriggerStatus string + EvalTriggerStatus string + DeployTriggerStatus string + HotModelUpdateDeployTriggerStatus string + TrainDataURL string + EvalDataURL string + OutputDir string + OutputConfig *OutputConfig + DataSamples *DataSamples + TrainModel *Model + DeployModel *Model + EvalModels []Model + Lock sync.Mutex + Dataset *dataset.Dataset + Storage storage.Storage + Done chan struct{} } type Model = clienttypes.Model @@ -261,18 +261,15 @@ func (im *Manager) evalTask(job *Job) error { // hotModelUpdateDeployTask starts deploy task when job supports hot model update func (im *Manager) hotModelUpdateDeployTask(job *Job) error { - var localModelConfigFile string - if v, ok := job.ObjectMeta.Annotations[runtime.ModelHotUpdateAnnotationsKey]; ok { - localModelConfigFile = v - } else { - return nil - } - - latestCond := im.getLatestCondition(job) - currentType := latestCond.Type + if job.JobConfig.HotModelUpdateDeployTriggerStatus == TriggerReadyStatus { + var localModelConfigFile string + if v, ok := job.ObjectMeta.Annotations[runtime.ModelHotUpdateAnnotationsKey]; ok { + localModelConfigFile = v + } else { + return nil + } - if currentType == sednav1.ILJobStageCondRunning && job.JobConfig.DeployTriggerStatus == TriggerReadyStatus { - models := im.getModelFromJobConditions(job, sednav1.ILJobDeploy) + models := im.getJobStageModel(job, sednav1.ILJobDeploy) if models == nil { return nil } @@ -316,60 +313,66 @@ func (im *Manager) hotModelUpdateDeployTask(job *Job) error { return err } - job.JobConfig.DeployTriggerStatus = TriggerCompletedStatus + job.JobConfig.HotModelUpdateDeployTriggerStatus = TriggerCompletedStatus klog.V(4).Infof("job(%s) write model config file(url=%s) successfully in deploy phase", job.JobConfig.UniqueIdentifier, modelConfigFile) + klog.Infof("job(%s) completed the %s task successfully", job.JobConfig.UniqueIdentifier, sednav1.ILJobDeploy) } return nil } // deployTask starts deploy task -func (im *Manager) deployTask(job *Job) { - jobConfig := job.JobConfig - var err error - var neededDeploy bool - - neededDeploy, err = im.triggerDeployTask(job) - status := clienttypes.UpstreamMessage{Phase: string(sednav1.ILJobDeploy)} - models := im.getModelFromJobConditions(job, sednav1.ILJobDeploy) +func (im *Manager) deployTask(job *Job) error { + if job.JobConfig.DeployTriggerStatus == TriggerReadyStatus { + jobConfig := job.JobConfig + var err error + var neededDeploy bool + + neededDeploy, err = im.triggerDeployTask(job) + status := clienttypes.UpstreamMessage{Phase: string(sednav1.ILJobDeploy)} + models := im.getJobStageModel(job, sednav1.ILJobDeploy) + + if err == nil && neededDeploy && models != nil { + if !job.Spec.DeploySpec.Model.HotUpdateEnabled { + trainedModel := models[0] + deployModel := models[1] + err = im.updateDeployModelFile(job, trainedModel.URL, deployModel.URL) + if err != nil { + status.Status = string(sednav1.ILJobStageCondFailed) + klog.Errorf("failed to update model for job(%s): %v", jobConfig.UniqueIdentifier, err) + return err + } - if err == nil && neededDeploy && models != nil { - if !job.Spec.DeploySpec.Model.HotUpdateEnabled { - trainedModel := models[0] - deployModel := models[1] - err = im.updateDeployModelFile(job, trainedModel.URL, deployModel.URL) - if err != nil { - status.Status = string(sednav1.ILJobStageCondFailed) - klog.Errorf("failed to update model for job(%s): %v", jobConfig.UniqueIdentifier, err) + status.Status = string(sednav1.ILJobStageCondReady) + klog.Infof("job(%s) completed the %s task successfully", jobConfig.UniqueIdentifier, sednav1.ILJobDeploy) } else { status.Status = string(sednav1.ILJobStageCondReady) - klog.Infof("update model for job(%s) successfully", jobConfig.UniqueIdentifier) } + + status.Input = &clienttypes.Input{ + Models: models, + } + + klog.Infof("job(%s) completed the %sing phase triggering task successfully", + jobConfig.UniqueIdentifier, sednav1.ILJobDeploy) } else { - status.Status = string(sednav1.ILJobStageCondReady) + // No need to deploy, just report completed status + // TODO: instead of reporting deploy-completed, another more reasonable status + klog.Infof("job(%s) isn't need to deploy model", jobConfig.UniqueIdentifier) + status.Status = string(sednav1.ILJobStageCondCompleted) } - status.Input = &clienttypes.Input{ - Models: models, + err = im.Client.WriteMessage(status, job.getHeader()) + if err != nil { + klog.Errorf("job(%s) completed the %s task failed: %v", + jobConfig.UniqueIdentifier, sednav1.ILJobDeploy, err) + return err } - klog.Infof("job(%s) completed the %sing phase triggering task successfully", - jobConfig.UniqueIdentifier, sednav1.ILJobDeploy) - } else { - // No need to deploy, just report completed status - // TODO: instead of reporting deploy-completed, another more reasonable status - klog.Infof("job(%s) isn't need to deploy model", jobConfig.UniqueIdentifier) - status.Status = string(sednav1.ILJobStageCondCompleted) - } - - err = im.Client.WriteMessage(status, job.getHeader()) - if err != nil { - klog.Errorf("job(%s) completed the %s task failed: %v", - jobConfig.UniqueIdentifier, sednav1.ILJobDeploy, err) + job.JobConfig.DeployTriggerStatus = TriggerCompletedStatus } - - klog.Infof("job(%s) completed the %s task successfully", jobConfig.UniqueIdentifier, sednav1.ILJobDeploy) + return nil } // startJob starts a job @@ -406,7 +409,11 @@ func (im *Manager) startJob(name string) { case sednav1.ILJobEval: err = im.evalTask(job) case sednav1.ILJobDeploy: - err = im.hotModelUpdateDeployTask(job) + if cond.Type == sednav1.ILJobStageCondWaiting { + err = im.deployTask(job) + } else if cond.Type == sednav1.ILJobStageCondRunning && job.Spec.DeploySpec.Model.HotUpdateEnabled { + err = im.hotModelUpdateDeployTask(job) + } default: klog.Errorf("invalid phase: %s", jobStage) continue @@ -436,14 +443,14 @@ func (im *Manager) Insert(message *clienttypes.Message) error { return err } - if first { - go im.startJob(name) - } - if err := db.SaveResource(name, job.TypeMeta, job.ObjectMeta, job.Spec); err != nil { return err } + if first { + go im.startJob(name) + } + return nil } @@ -622,6 +629,7 @@ func initTriggerStatus(jobConfig *JobConfig) { jobConfig.TrainTriggerStatus = TriggerReadyStatus jobConfig.EvalTriggerStatus = TriggerReadyStatus jobConfig.DeployTriggerStatus = TriggerReadyStatus + jobConfig.HotModelUpdateDeployTriggerStatus = TriggerReadyStatus } func newTrigger(t sednav1.Trigger) (trigger.Base, error) { @@ -639,49 +647,73 @@ func newTrigger(t sednav1.Trigger) (trigger.Base, error) { return trigger.NewTrigger(triggerMap) } -// getModelFromJobConditions gets model from job conditions for train/eval/deploy -func (im *Manager) getModelFromJobConditions(job *Job, jobStage sednav1.ILJobStage) []Model { - jobConditions := job.Status.Conditions +// getModelsFromJobConditions gets models from job condition +func (im *Manager) getModelsFromJobConditions(jobConditions []sednav1.ILJobCondition, stage sednav1.ILJobStage, currentType sednav1.ILJobStageConditionType, dataType string) []Model { + // TODO: runtime.type changes to common.type for gm and lc + for i := len(jobConditions) - 1; i >= 0; i-- { + var cond gmtypes.IncrementalCondData + jobCond := jobConditions[i] + if jobCond.Stage == stage && jobCond.Type == currentType { + if err := (&cond).Unmarshal([]byte(jobCond.Data)); err != nil { + continue + } - getModels := func(stage sednav1.ILJobStage, currentType sednav1.ILJobStageConditionType, dataType string) []runtime.Model { - // TODO: runtime.type changes to common.type for gm and lc - for i := len(jobConditions) - 1; i >= 0; i-- { - var cond gmtypes.IncrementalCondData - jobCond := jobConditions[i] - if jobCond.Stage == stage && jobCond.Type == currentType { - if err := (&cond).Unmarshal([]byte(jobCond.Data)); err != nil { + if dataType == "input" { + if cond.Input == nil { continue } - if dataType == "input" { - if cond.Input == nil { - continue - } - - return cond.Input.Models - } else if dataType == "output" { - if cond.Output == nil { - continue - } - - return cond.Output.Models + return cond.Input.Models + } else if dataType == "output" { + if cond.Output == nil { + continue } + + return cond.Output.Models } } + } - return nil + return nil +} + +// getEvalResult gets eval result from job conditions +func (im *Manager) getEvalResult(job *Job) ([]map[string][]float64, error) { + jobConditions := job.Status.Conditions + models := im.getModelsFromJobConditions(jobConditions, sednav1.ILJobEval, sednav1.ILJobStageCondCompleted, "output") + + var result []map[string][]float64 + var err error + for _, m := range models { + bytes, err := json.Marshal(m.Metrics) + if err != nil { + return nil, err + } + + data := make(map[string][]float64) + if err = json.Unmarshal(bytes, &data); err != nil { + return nil, err + } + + result = append(result, data) } + return result, err +} + +// getJobStageModel gets model from job conditions for train/eval/deploy +func (im *Manager) getJobStageModel(job *Job, jobStage sednav1.ILJobStage) []Model { + jobConditions := job.Status.Conditions switch jobStage { case sednav1.ILJobTrain: // the second model is the pre-trained model of train stage. - models := getModels(sednav1.ILJobTrain, sednav1.ILJobStageCondCompleted, "output") + models := im.getModelsFromJobConditions(jobConditions, sednav1.ILJobTrain, sednav1.ILJobStageCondCompleted, "output") if models != nil { return []Model{{Format: models[1].Format, URL: models[1].URL}} } case sednav1.ILJobEval: // the first model is the output model of train stage. - models := getModels(sednav1.ILJobTrain, sednav1.ILJobStageCondCompleted, "output") + models := im.getModelsFromJobConditions(jobConditions, sednav1.ILJobTrain, sednav1.ILJobStageCondCompleted, "output") if models != nil { return []Model{{Format: models[0].Format, URL: models[0].URL}} } @@ -690,7 +722,7 @@ func (im *Manager) getModelFromJobConditions(job *Job, jobStage sednav1.ILJobSta // the first model is the output model of train stage, which was evaluated as better than the second model in eval stage. // the second model is the serving model used in the inference worker. var deployModels []Model - models := getModels(sednav1.ILJobEval, sednav1.ILJobStageCondReady, "input") + models := im.getModelsFromJobConditions(jobConditions, sednav1.ILJobEval, sednav1.ILJobStageCondReady, "input") for _, m := range models { deployModels = append(deployModels, Model{Format: m.Format, URL: m.URL}) } @@ -726,7 +758,7 @@ func (im *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) { if rounds <= 1 { m = jobConfig.TrainModel } else { - models := im.getModelFromJobConditions(job, latestCondition.Stage) + models := im.getJobStageModel(job, latestCondition.Stage) if models != nil { m = &models[0] } @@ -773,7 +805,7 @@ func (im *Manager) triggerEvalTask(job *Job) (*clienttypes.UpstreamMessage, erro latestCondition := im.getLatestCondition(job) - ms := im.getModelFromJobConditions(job, latestCondition.Stage) + ms := im.getJobStageModel(job, latestCondition.Stage) if ms == nil { return nil, err } @@ -818,33 +850,14 @@ func (im *Manager) triggerEvalTask(job *Job) (*clienttypes.UpstreamMessage, erro func (im *Manager) triggerDeployTask(job *Job) (bool, error) { jobConfig := job.JobConfig - // EvalResult must has two models info, first is trained model, second is deployed model. - if len(jobConfig.EvalResult) != 2 { - return false, fmt.Errorf("expected 2 evaluation results, actual: %d", len(jobConfig.EvalResult)) - } - - getMetrics := func(metrics map[string]interface{}) (map[string][]float64, error) { - var err error - bytes, err := json.Marshal(metrics) - if err != nil { - return nil, err - } - - data := make(map[string][]float64) - if err := json.Unmarshal(bytes, &data); err != nil { - return nil, err - } - return data, err - } - - newMetrics, err := getMetrics(jobConfig.EvalResult[0].Metrics) - if err != nil { - return false, err - } - oldMetrics, err := getMetrics(jobConfig.EvalResult[1].Metrics) - if err != nil { + evalResult, err := im.getEvalResult(job) + if err != nil && len(evalResult) < 2 { + klog.Errorf("job(name=%s failed to get eval result(%v): %+w", job.Name, evalResult, err) return false, err } + + newMetrics := evalResult[0] + oldMetrics := evalResult[1] metricDelta := make(map[string]interface{}) for metric := range newMetrics { @@ -1177,59 +1190,6 @@ func (im *Manager) monitorWorker() { klog.Errorf("job(%s) failed to write message: %v", name, err) continue } - - im.handleWorkerMessage(job, workerMessage) - } -} - -// handleWorkerMessage handles message from worker -func (im *Manager) handleWorkerMessage(job *Job, workerMessage workertypes.MessageContent) { - latestCond := im.getLatestCondition(job) - jobStage := strings.ToLower(string(latestCond.Stage)) - workerKind := strings.ToLower(workerMessage.Kind) - - if jobStage != workerKind { - klog.Warningf("job(%s)'s %s phase get worker(kind=%s)", job.JobConfig.UniqueIdentifier, - jobStage, workerKind) - return - } - - var models []Model - for _, result := range workerMessage.Results { - metrics := make(map[string]interface{}) - if m, ok := result["metrics"]; ok { - bytes, err := json.Marshal(m) - if err != nil { - return - } - - err = json.Unmarshal(bytes, &metrics) - if err != nil { - klog.Warningf("failed to unmarshal the worker(name=%s) metrics %v, err: %v", - workerMessage.Name, - m, - err) - } - } - - model := Model{ - Format: result["format"].(string), - URL: result["url"].(string), - Metrics: metrics} - models = append(models, model) - } - - workerStatus := workerMessage.Status - jobName := job.JobConfig.UniqueIdentifier - - if workerStatus == workertypes.CompletedStatus { - klog.Infof("job(%s) completed the %s task successfully", jobName, jobStage) - switch latestCond.Stage { - case sednav1.ILJobEval: - job.JobConfig.EvalResult = models - // when eval worker is completed status, the deploy task will starts immediately without waiting for the notification of GM. - im.deployTask(job) - } } }