From fb483c6c6e21d5b763991f1558cccd8f71791d99 Mon Sep 17 00:00:00 2001 From: llhuii Date: Thu, 22 Apr 2021 15:47:29 +0800 Subject: [PATCH] gm: add s3-like storage support with public access 1. add s3-like storage support 2. abstract storage-related(s3, hostpath) into storage_initializer_injector.go 3. refactor worker-related code into worker.go Signed-off-by: llhuii --- pkg/globalmanager/common.go | 163 +++------ pkg/globalmanager/federatedlearningjob.go | 106 +++--- pkg/globalmanager/incrementallearningjob.go | 187 +++++----- pkg/globalmanager/jointinferenceservice.go | 110 +++--- .../storage_initializer_injector.go | 319 ++++++++++++++++++ pkg/globalmanager/types.go | 14 - pkg/globalmanager/worker.go | 102 ++++++ 7 files changed, 649 insertions(+), 352 deletions(-) create mode 100644 pkg/globalmanager/storage_initializer_injector.go create mode 100644 pkg/globalmanager/worker.go diff --git a/pkg/globalmanager/common.go b/pkg/globalmanager/common.go index 52565304..480242f6 100644 --- a/pkg/globalmanager/common.go +++ b/pkg/globalmanager/common.go @@ -30,7 +30,6 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - k8scontroller "k8s.io/kubernetes/pkg/controller" ) const ( @@ -39,63 +38,10 @@ const ( // MaxBackOff is the max backoff period MaxBackOff = 360 * time.Second statusUpdateRetries = 3 - // setting some prefix for container path, include data and code prefix - dataPrefix = "/home/data" + bigModelPort int32 = 5000 ) -// CreateVolumeMap creates volumeMap for container and -// returns volumeMounts and volumes for stage of creating pod -func CreateVolumeMap(workerPara *WorkerPara) ([]v1.VolumeMount, []v1.Volume) { - var volumeMounts []v1.VolumeMount - var volumes []v1.Volume - volumetype := v1.HostPathDirectory - mountPathMap := make(map[string]bool) - duplicateIdx := make(map[int]bool) - for i, v := range workerPara.volumeMountList { - if mountPathMap[v] { - duplicateIdx[i] = true - continue - } - mountPathMap[v] = true - tempVolumeMount := v1.VolumeMount{ - MountPath: v, - Name: workerPara.volumeMapName[i], - } - volumeMounts = append(volumeMounts, tempVolumeMount) - } - for i, v := range workerPara.volumeList { - if duplicateIdx[i] { - continue - } - tempVolume := v1.Volume{ - Name: workerPara.volumeMapName[i], - VolumeSource: v1.VolumeSource{ - HostPath: &v1.HostPathVolumeSource{ - Path: v, - Type: &volumetype, - }, - }, - } - volumes = append(volumes, tempVolume) - } - return volumeMounts, volumes -} - -// CreateEnvVars creates EnvMap for container -// include EnvName and EnvValue map for stage of creating a pod -func CreateEnvVars(envMap map[string]string) []v1.EnvVar { - var envVars []v1.EnvVar - for envName, envValue := range envMap { - Env := v1.EnvVar{ - Name: envName, - Value: envValue, - } - envVars = append(envVars, Env) - } - return envVars -} - // GetNodeIPByName get node ip by node name func GetNodeIPByName(kubeClient kubernetes.Interface, name string) (string, error) { n, err := kubeClient.CoreV1().Nodes().Get(context.Background(), name, metav1.GetOptions{}) @@ -119,25 +65,6 @@ func GetNodeIPByName(kubeClient kubernetes.Interface, name string) (string, erro return "", fmt.Errorf("can't found node ip for node %s", name) } -// GenerateLabels generates labels for an object -func GenerateLabels(object CommonInterface) map[string]string { - kind := object.GroupVersionKind().Kind - group := object.GroupVersionKind().Group - name := object.GetName() - key := strings.ToLower(kind) + "." + group + "/name" - labels := make(map[string]string) - labels[key] = name - return labels -} - -// GenerateSelector generates selector for an object -func GenerateSelector(object CommonInterface) (labels.Selector, error) { - ls := &metav1.LabelSelector{ - MatchLabels: GenerateLabels(object), - } - return metav1.LabelSelectorAsSelector(ls) -} - // CreateKubernetesService creates a k8s service for an object given ip and port func CreateKubernetesService(kubeClient kubernetes.Interface, object CommonInterface, inputPort int32, inputIP string) (int32, error) { ctx := context.Background() @@ -213,56 +140,58 @@ func calcActivePodCount(pods []*v1.Pod) int32 { return result } -// injectWorkerPara modifies pod in-place -func injectWorkerPara(pod *v1.Pod, workerPara *WorkerPara, object CommonInterface) { - // inject our predefined volumes/envs - volumeMounts, volumes := CreateVolumeMap(workerPara) - envs := CreateEnvVars(workerPara.env) - pod.Spec.Volumes = append(pod.Spec.Volumes, volumes...) - for idx := range pod.Spec.Containers { - pod.Spec.Containers[idx].Env = append( - pod.Spec.Containers[idx].Env, envs..., - ) - pod.Spec.Containers[idx].VolumeMounts = append( - pod.Spec.Containers[idx].VolumeMounts, volumeMounts..., - ) - } - - // inject our labels - if pod.Labels == nil { - pod.Labels = make(map[string]string) - } - for k, v := range GenerateLabels(object) { - pod.Labels[k] = v - } +// GenerateLabels generates labels for an object +func GenerateLabels(object CommonInterface) map[string]string { + kind := object.GroupVersionKind().Kind + group := object.GroupVersionKind().Group - pod.GenerateName = object.GetName() + "-" + strings.ToLower(workerPara.workerType) + "-" + keyPrefix := strings.ToLower(kind + "." + group + "/") - pod.Namespace = object.GetNamespace() + labels := make(map[string]string) + labels[keyPrefix+"name"] = object.GetName() + labels[keyPrefix+"uid"] = string(object.GetUID()) + return labels +} - if workerPara.hostNetwork { - // FIXME - // force to set hostnetwork - pod.Spec.HostNetwork = true +// GenerateSelector generates the selector for an object +func GenerateSelector(object CommonInterface) (labels.Selector, error) { + ls := &metav1.LabelSelector{ + MatchLabels: GenerateLabels(object), } + return metav1.LabelSelectorAsSelector(ls) +} + +// ConvertK8SValidName converts to the k8s valid name +func ConvertK8SValidName(name string) string { + // the name(e.g. pod/volume name) should be a lowercase RFC 1123 label: + // [a-z0-9]([-a-z0-9]*[a-z0-9])? + // and no more than 63 characters + limitCount := 63 + var fixName []byte + for _, c := range []byte(strings.ToLower(name)) { + if ('a' <= c && c <= 'z') || + ('0' <= c && c <= '9') || + c == '-' { + fixName = append(fixName, c) + continue + } - if pod.Spec.RestartPolicy == "" { - pod.Spec.RestartPolicy = workerPara.restartPolicy + // the first char not '-' + // and no two consecutive '-' + if len(fixName) > 0 && fixName[len(fixName)-1] != '-' { + fixName = append(fixName, '-') + } } -} -// createPodWithTemplate creates and returns a pod object given a crd object, pod template, and workerPara -func createPodWithTemplate(client kubernetes.Interface, object CommonInterface, spec *v1.PodTemplateSpec, workerPara *WorkerPara) (*v1.Pod, error) { - objectKind := object.GroupVersionKind() - pod, _ := k8scontroller.GetPodFromTemplate(spec, object, metav1.NewControllerRef(object, objectKind)) - injectWorkerPara(pod, workerPara, object) + // fix limitCount + if len(fixName) > limitCount { + fixName = fixName[:limitCount] + } - createdPod, err := client.CoreV1().Pods(object.GetNamespace()).Create(context.TODO(), pod, metav1.CreateOptions{}) - objectName := object.GetNamespace() + "/" + object.GetName() - if err != nil { - klog.Warningf("failed to create pod(type=%s) for %s %s, err:%s", workerPara.workerType, objectKind, objectName, err) - return nil, err + // fix the end character + if len(fixName) > 0 && fixName[len(fixName)-1] == '-' { + fixName[len(fixName)-1] = 'z' } - klog.V(2).Infof("pod %s is created successfully for %s %s", createdPod.Name, objectKind, objectName) - return createdPod, nil + + return string(fixName) } diff --git a/pkg/globalmanager/federatedlearningjob.go b/pkg/globalmanager/federatedlearningjob.go index 1a1a06a3..74c242c2 100644 --- a/pkg/globalmanager/federatedlearningjob.go +++ b/pkg/globalmanager/federatedlearningjob.go @@ -18,9 +18,7 @@ package globalmanager import ( "context" - "encoding/json" "fmt" - "path/filepath" "strconv" "time" @@ -411,43 +409,38 @@ func (fc *FederatedController) createPod(job *sednav1.FederatedLearningJob) (act return active, fmt.Errorf("failed to get model %s: %w", modelName, err) } - modelPath := model.Spec.URL participantsCount := strconv.Itoa(len(job.Spec.TrainingWorkers)) - // convert crd to json, and put them into env of container - modeljson, _ := json.Marshal(model) - modelstring := string(modeljson) - // deliver pod for aggregation worker aggWorker := job.Spec.AggregationWorker - // Container VolumeMounts parameters - aggModelConPath := dataPrefix + modelPath - - // Env parameters for agg - aggModelURL := aggModelConPath - - // Configure container mounting and Env information by initial WorkerPara + // Configure container mounting and Env information by initial WorkerParam var aggPort int32 = 7363 - var aggWorkerPara *WorkerPara = new(WorkerPara) - aggWorkerPara.volumeMountList = []string{aggModelConPath} - aggWorkerPara.volumeList = []string{modelPath} - aggWorkerPara.volumeMapName = []string{"model"} - aggWorkerPara.env = map[string]string{ - "MODEL": modelstring, - "WORKER_NAME": "aggworker-" + utilrand.String(5), - "JOB_NAME": job.Name, - "PARTICIPANTS_COUNT": participantsCount, - "MODEL_URL": aggModelURL, - "NAMESPACE": job.Namespace, + var aggWorkerParam *WorkerParam = new(WorkerParam) + aggWorkerParam.env = map[string]string{ + "NAMESPACE": job.Namespace, + "WORKER_NAME": "aggworker-" + utilrand.String(5), + "JOB_NAME": job.Name, + "AGG_BIND_PORT": strconv.Itoa(int(aggPort)), + "PARTICIPANTS_COUNT": participantsCount, } - aggWorkerPara.workerType = FLJobStageAgg - aggWorkerPara.restartPolicy = v1.RestartPolicyOnFailure + aggWorkerParam.workerType = FLJobStageAgg + aggWorkerParam.restartPolicy = v1.RestartPolicyOnFailure + + aggWorkerParam.mounts = append(aggWorkerParam.mounts, + WorkerMount{ + URL: &MountURL{ + URL: model.Spec.URL, + Mode: workerMountWriteOnly, + }, + EnvName: "MODEL_URL", + }, + ) // create aggpod based on configured parameters - _, err = createPodWithTemplate(fc.kubeClient, job, &aggWorker.Template, aggWorkerPara) + _, err = createPodWithTemplate(fc.kubeClient, job, &aggWorker.Template, aggWorkerParam) if err != nil { return active, err } @@ -459,6 +452,7 @@ func (fc *FederatedController) createPod(job *sednav1.FederatedLearningJob) (act // FIXME(llhuii): only the case that Spec.NodeName specified is support, // will support Spec.NodeSelector. appIP, err = GetNodeIPByName(fc.kubeClient, job.Spec.AggregationWorker.Template.Spec.NodeName) + aggServicePort, err = CreateKubernetesService(fc.kubeClient, job, aggPort, appIP) if err != nil { return active, err @@ -468,34 +462,34 @@ func (fc *FederatedController) createPod(job *sednav1.FederatedLearningJob) (act // get dataseturl through parsing crd of dataset datasetName := trainingWorker.Dataset.Name dataset, err := fc.client.Datasets(job.Namespace).Get(ctx, datasetName, metav1.GetOptions{}) - datasetjson, _ := json.Marshal(dataset) - datasetstring := string(datasetjson) - var trainDatasetPath string if err != nil { return active, fmt.Errorf("failed to get dataset %s: %w", datasetName, err) } - trainDatasetPath = dataset.Spec.URL - datasetParent := filepath.Dir(trainDatasetPath) - // Container VolumeMounts parameters - trainDataConPath := dataPrefix + datasetParent - trainModelConPath := dataPrefix + modelPath - - // Env parameters for train - trainDatasetURL := dataPrefix + trainDatasetPath - trainModelURL := trainModelConPath - - // Configure container mounting and Env information by initial WorkerPara - var workerPara *WorkerPara = new(WorkerPara) - workerPara.volumeMountList = []string{trainDataConPath, trainModelConPath} - workerPara.volumeList = []string{datasetParent, modelPath} - workerPara.volumeMapName = []string{"data", "model"} - workerPara.env = map[string]string{ - "DATASET": datasetstring, - "AGG_PORT": strconv.Itoa(int(aggServicePort)), - "AGG_IP": appIP, - "MODEL_URL": trainModelURL, - "TRAIN_DATASET_URL": trainDatasetURL, + + // Configure container mounting and Env information by initial WorkerParam + var workerParam *WorkerParam = new(WorkerParam) + + workerParam.mounts = append(workerParam.mounts, + WorkerMount{ + URL: &MountURL{ + URL: model.Spec.URL, + }, + EnvName: "MODEL_URL", + }, + + WorkerMount{ + URL: &MountURL{ + URL: dataset.Spec.URL, + }, + EnvName: "TRAIN_DATASET_URL", + }, + ) + + workerParam.env = map[string]string{ + "AGG_PORT": strconv.Itoa(int(aggServicePort)), + "AGG_IP": appIP, + "WORKER_NAME": "trainworker-" + utilrand.String(5), "JOB_NAME": job.Name, "PARTICIPANTS_COUNT": participantsCount, @@ -504,11 +498,11 @@ func (fc *FederatedController) createPod(job *sednav1.FederatedLearningJob) (act "DATASET_NAME": datasetName, "LC_SERVER": fc.cfg.LC.Server, } - workerPara.workerType = "train" - workerPara.hostNetwork = true - workerPara.restartPolicy = v1.RestartPolicyOnFailure + workerParam.workerType = "train" + workerParam.hostNetwork = true + workerParam.restartPolicy = v1.RestartPolicyOnFailure // create train pod based on configured parameters - _, err = createPodWithTemplate(fc.kubeClient, job, &trainingWorker.Template, workerPara) + _, err = createPodWithTemplate(fc.kubeClient, job, &trainingWorker.Template, workerParam) if err != nil { return active, err } diff --git a/pkg/globalmanager/incrementallearningjob.go b/pkg/globalmanager/incrementallearningjob.go index 469e9689..a028b026 100644 --- a/pkg/globalmanager/incrementallearningjob.go +++ b/pkg/globalmanager/incrementallearningjob.go @@ -20,7 +20,6 @@ import ( "context" "encoding/json" "fmt" - "path/filepath" "strings" "time" @@ -503,34 +502,24 @@ func (jc *IncrementalJobController) createPod(job *sednav1.IncrementalLearningJo initialModelName := job.Spec.InitialModel.Name deployModelName := job.Spec.DeploySpec.Model.Name - // get basemodel URL, deploymodel, dataset URL - var basemodelPath string - var deploymodelPath string - var datasetPath string - - basemodel, err := jc.client.Models(job.Namespace).Get(ctx, initialModelName, metav1.GetOptions{}) + // check initial model name + _, err = jc.client.Models(job.Namespace).Get(ctx, initialModelName, metav1.GetOptions{}) if err != nil { return fmt.Errorf("failed to get initial model %s: %w", initialModelName, err) } - basemodelPath = filepath.Dir(basemodel.Spec.URL) - deploymodel, err := jc.client.Models(job.Namespace).Get(ctx, deployModelName, metav1.GetOptions{}) + _, err = jc.client.Models(job.Namespace).Get(ctx, deployModelName, metav1.GetOptions{}) if err != nil { return fmt.Errorf("failed to get deploy model %s: %w", deployModelName, err) } - deploymodelPath = filepath.Dir(deploymodel.Spec.URL) dataset, err := jc.client.Datasets(job.Namespace).Get(ctx, incrementalDatasetName, metav1.GetOptions{}) if err != nil { return fmt.Errorf("failed to get dataset %s: %w", incrementalDatasetName, err) } - datasetPath = dataset.Spec.URL - - outputDir := job.Spec.OutputDir - datasetParent := filepath.Dir(datasetPath) // get all url for train and eval from data in condition condDataStr := job.Status.Conditions[len(job.Status.Conditions)-1].Data @@ -542,84 +531,90 @@ func (jc *IncrementalJobController) createPod(job *sednav1.IncrementalLearningJo } dataURL := cond.Input.DataURL inputmodelURLs := cond.GetInputModelURLs() - outputmodelURL := cond.Input.OutputDir - - // convert user inputs into its form in the container - var inputmodelURLList []string - var inputmodelURLContain string - var outputmodelURLContain string - var dataURLContain string - - // process inputmodelURLs, add dataPrefix to ench inputmodelURL, return inputmodelURLList - for _, URL := range inputmodelURLs { - inputmodelURLList = append(inputmodelURLList, dataPrefix+URL) - } - - // three container Url for data, inputmodel, outputmodel - inputmodelURLContain = strings.Join(inputmodelURLList, ";") - outputmodelURLContain = dataPrefix + outputmodelURL - dataURLContain = dataPrefix + dataURL - - // Container VolumeMounts parameters - dataConPath := dataPrefix + datasetParent - basemodelConPath := dataPrefix + basemodelPath - deploymodelConPath := dataPrefix + deploymodelPath - outputConPath := dataPrefix + outputDir - originalDatasetPathInContainer := dataPrefix + datasetPath - var workerPara *WorkerPara = new(WorkerPara) + + var workerParam *WorkerParam = new(WorkerParam) if podtype == sednav1.ILJobTrain { - workerPara.workerType = "Train" + workerParam.workerType = "Train" podTemplate = &job.Spec.TrainSpec.Template // Env parameters for train - preModelURL := inputmodelURLContain // premodel savepath before increase - outputModelURL := outputmodelURLContain // outputmodel savepath after increase, should be under outputdir - trainDataURL := dataURLContain - - // Configure container mounting and Env information for train by initial WorkerPara - workerPara.volumeMountList = []string{dataConPath, basemodelConPath, deploymodelConPath, outputConPath} - workerPara.volumeList = []string{datasetParent, basemodelPath, deploymodelPath, outputDir} - workerPara.volumeMapName = []string{"data", "base-model", "deploy-model", "output-dir"} - workerPara.env = map[string]string{ - // see https://github.com/kubeedge/sedna/issues/35 - "ORIGINAL_DATASET_URL": originalDatasetPathInContainer, - "TRAIN_DATASET_URL": trainDataURL, - "MODEL_URL": outputModelURL, - "BASE_MODEL_URL": preModelURL, - "NAMESPACE": job.Namespace, - "JOB_NAME": job.Name, - "WORKER_NAME": "train-worker-" + utilrand.String(5), - "LC_SERVER": jc.cfg.LC.Server, + + workerParam.env = map[string]string{ + "NAMESPACE": job.Namespace, + "JOB_NAME": job.Name, + "WORKER_NAME": "train-worker-" + utilrand.String(5), + + "LC_SERVER": jc.cfg.LC.Server, } + + workerParam.mounts = append(workerParam.mounts, + WorkerMount{ + URL: &MountURL{URL: inputmodelURLs[0]}, + EnvName: "BASE_MODEL_URL", + }, + WorkerMount{ + URL: &MountURL{URL: cond.Input.OutputDir}, + EnvName: "MODEL_URL", + }, + + WorkerMount{ + URL: &MountURL{URL: dataURL}, + EnvName: "TEST_DATASET_URL", + }, + + // see https://github.com/kubeedge/sedna/issues/35 + WorkerMount{ + URL: &MountURL{URL: dataset.Spec.URL}, + EnvName: "ORIGINAL_DATASET_URL", + }, + ) } else { podTemplate = &job.Spec.EvalSpec.Template - workerPara.workerType = "Eval" - - // Env parameters for eval - evalDataURL := dataURLContain - modelForEval := inputmodelURLContain // can be single or multi models - - // Configure container mounting and Env information for eval by initial WorkerPara - workerPara.volumeMountList = []string{dataConPath, basemodelConPath, deploymodelConPath, outputConPath} - workerPara.volumeList = []string{datasetParent, basemodelPath, deploymodelPath, outputDir} - workerPara.volumeMapName = []string{"data", "base-model", "deploy-model", "output-dir"} - workerPara.env = map[string]string{ - "ORIGINAL_DATASET_URL": originalDatasetPathInContainer, - "TEST_DATASET_URL": evalDataURL, - "MODEL_URLS": modelForEval, - "NAMESPACE": job.Namespace, - "JOB_NAME": job.Name, - "WORKER_NAME": "eval-worker-" + utilrand.String(5), - "LC_SERVER": jc.cfg.LC.Server, + workerParam.workerType = "Eval" + + // Configure Env information for eval by initial WorkerParam + workerParam.env = map[string]string{ + "NAMESPACE": job.Namespace, + "JOB_NAME": job.Name, + "WORKER_NAME": "eval-worker-" + utilrand.String(5), + + "LC_SERVER": jc.cfg.LC.Server, + } + + var modelMountURLs []MountURL + for _, url := range inputmodelURLs { + modelMountURLs = append(modelMountURLs, MountURL{URL: url}) } + workerParam.mounts = append(workerParam.mounts, + WorkerMount{ + URLs: modelMountURLs, + Name: "models", + EnvName: "MODEL_URLS", + }, + + WorkerMount{ + URL: &MountURL{URL: dataURL}, + Name: "datasets", + EnvName: "TEST_DATASET_URL", + }, + + WorkerMount{ + // only need when URL is hostpath + URL: &MountURL{ + CheckHostPath: true, + URL: dataset.Spec.URL}, + Name: "origin-dataset", + EnvName: "ORIGINAL_DATASET_URL", + }, + ) } // set the default policy instead of Always policy - workerPara.restartPolicy = v1.RestartPolicyOnFailure - workerPara.hostNetwork = true + workerParam.restartPolicy = v1.RestartPolicyOnFailure + workerParam.hostNetwork = true // create pod based on podtype - _, err = createPodWithTemplate(jc.kubeClient, job, podTemplate, workerPara) + _, err = createPodWithTemplate(jc.kubeClient, job, podTemplate, workerParam) if err != nil { return err } @@ -633,31 +628,31 @@ func (jc *IncrementalJobController) createInferPod(job *sednav1.IncrementalLearn return fmt.Errorf("failed to get infer model %s: %w", infermodelName, err) } - inferModelPath := inferModel.Spec.URL - - // convert crd to JSON, and put them into env of container - inferModelParent := filepath.Dir(inferModelPath) - - // Container VolumeMounts parameters - inferModelConPath := dataPrefix + inferModelParent + inferModelURL := inferModel.Spec.URL // Env parameters for edge - inferModelURL := dataPrefix + inferModelPath HEMParameterJSON, _ := json.Marshal(job.Spec.DeploySpec.HardExampleMining.Parameters) HEMParameterString := string(HEMParameterJSON) - // Configure container mounting and Env information by initial WorkerPara - var workerParam *WorkerPara = new(WorkerPara) - workerParam.volumeMountList = []string{inferModelConPath} - workerParam.volumeList = []string{inferModelParent} - workerParam.volumeMapName = []string{"model"} + // Configure container mounting and Env information by initial WorkerParam + var workerParam *WorkerParam = new(WorkerParam) + workerParam.mounts = append(workerParam.mounts, + WorkerMount{ + URL: &MountURL{URL: inferModelURL}, + Name: "model", + EnvName: "MODEL_URL", + }, + ) + workerParam.env = map[string]string{ - "WORKER_NAME": "inferworker-" + utilrand.String(5), - "MODEL_URL": inferModelURL, - "NAMESPACE": job.Namespace, + "NAMESPACE": job.Namespace, + "JOB_NAME": job.Name, + "WORKER_NAME": "inferworker-" + utilrand.String(5), + "HEM_NAME": job.Spec.DeploySpec.HardExampleMining.Name, "HEM_PARAMETERS": HEMParameterString, - "LC_SERVER": jc.cfg.LC.Server, + + "LC_SERVER": jc.cfg.LC.Server, } workerParam.workerType = "inference" diff --git a/pkg/globalmanager/jointinferenceservice.go b/pkg/globalmanager/jointinferenceservice.go index 25ecfb7f..b7a90d98 100644 --- a/pkg/globalmanager/jointinferenceservice.go +++ b/pkg/globalmanager/jointinferenceservice.go @@ -20,7 +20,6 @@ import ( "context" "encoding/json" "fmt" - "path/filepath" "strconv" "time" @@ -397,8 +396,8 @@ func isJointinferenceserviceFinished(j *sednav1.JointInferenceService) bool { func (jc *JointInferenceServiceController) createPod(service *sednav1.JointInferenceService) (active int32, err error) { active = 0 - // create pod for cloudPod - err = jc.createCloudPod(service) + // create cloud worker + err = jc.createCloudWorker(service) if err != nil { return active, err } @@ -414,8 +413,8 @@ func (jc *JointInferenceServiceController) createPod(service *sednav1.JointInfer return active, err } - // create pod for edgePod - err = jc.createEdgePod(service, bigServicePort) + // create edge worker + err = jc.createEdgeWorker(service, bigServicePort) if err != nil { return active, err } @@ -424,58 +423,42 @@ func (jc *JointInferenceServiceController) createPod(service *sednav1.JointInfer return active, err } -func (jc *JointInferenceServiceController) createCloudPod(service *sednav1.JointInferenceService) error { +func (jc *JointInferenceServiceController) createCloudWorker(service *sednav1.JointInferenceService) error { // deliver pod for cloudworker - ctx := context.Background() - var cloudModelPath string cloudModelName := service.Spec.CloudWorker.Model.Name - cloudModel, err := jc.client.Models(service.Namespace).Get(ctx, cloudModelName, metav1.GetOptions{}) + cloudModel, err := jc.client.Models(service.Namespace).Get(context.Background(), cloudModelName, metav1.GetOptions{}) if err != nil { return fmt.Errorf("failed to get cloud model %s: %w", cloudModelName, err) } - cloudModelPath = cloudModel.Spec.URL - - // convert crd to json, and put them into env of container - cloudModelJSON, _ := json.Marshal(cloudModel) - cloudModelString := string(cloudModelJSON) - cloudModelParent := filepath.Dir(cloudModelPath) + var workerParam WorkerParam - // Container VolumeMounts parameters - cloudModelConPath := dataPrefix + cloudModelParent + workerParam.mounts = append(workerParam.mounts, WorkerMount{ + URL: &MountURL{URL: cloudModel.Spec.URL}, + Name: "model", + EnvName: "MODEL_URL", + }) - // Env parameters for cloud - cloudModelURL := dataPrefix + cloudModelPath + workerParam.env = map[string]string{ + "NAMESPACE": service.Namespace, + "SERVICE_NAME": service.Name, + "WORKER_NAME": "cloudworker-" + utilrand.String(5), - // Configure container mounting and Env information by initial WorkerPara - var cloudContainer *WorkerPara = new(WorkerPara) - cloudContainer.volumeMountList = []string{cloudModelConPath} - cloudContainer.volumeList = []string{cloudModelParent} - cloudContainer.volumeMapName = []string{"code", "model"} - cloudContainer.env = map[string]string{ - "MODEL": cloudModelString, - "WORKER_NAME": "cloudworker-" + utilrand.String(5), - "SERVICE_NAME": service.Name, - "MODEL_URL": cloudModelURL, - "NAMESPACE": service.Namespace, "BIG_MODEL_BIND_PORT": strconv.Itoa(int(bigModelPort)), } - cloudContainer.workerType = jointInferenceForCloud + workerParam.workerType = jointInferenceForCloud // create cloud pod _, err = createPodWithTemplate(jc.kubeClient, service, &service.Spec.CloudWorker.Template, - cloudContainer) - if err != nil { - return err - } - return nil + &workerParam) + return err } -func (jc *JointInferenceServiceController) createEdgePod(service *sednav1.JointInferenceService, bigServicePort int32) error { +func (jc *JointInferenceServiceController) createEdgeWorker(service *sednav1.JointInferenceService, bigServicePort int32) error { // deliver pod for edgeworker ctx := context.Background() edgeModelName := service.Spec.EdgeWorker.Model.Name @@ -484,7 +467,6 @@ func (jc *JointInferenceServiceController) createEdgePod(service *sednav1.JointI return fmt.Errorf("failed to get edge model %s: %w", edgeModelName, err) } - edgeModelPath := edgeModel.Spec.URL // FIXME(llhuii): only the case that Spec.NodeName specified is support, // will support Spec.NodeSelector. @@ -494,51 +476,41 @@ func (jc *JointInferenceServiceController) createEdgePod(service *sednav1.JointI return fmt.Errorf("failed to get node ip: %w", err) } - // convert crd to json, and put them into env of container - edgeModelJSON, _ := json.Marshal(edgeModel) - edgeModelString := string(edgeModelJSON) - edgeModelParent := filepath.Dir(edgeModelPath) - edgeWorker := service.Spec.EdgeWorker HEMParameterJSON, _ := json.Marshal(edgeWorker.HardExampleMining.Parameters) HEMParameterString := string(HEMParameterJSON) - // Container VolumeMounts parameters - edgeModelConPath := dataPrefix + edgeModelParent - - // Env parameters for edge - edgeModelURL := dataPrefix + edgeModelPath - - // Configure container mounting and Env information by initial WorkerPara - var edgeContainer *WorkerPara = new(WorkerPara) - edgeContainer.volumeMountList = []string{edgeModelConPath} - edgeContainer.volumeList = []string{edgeModelParent} - edgeContainer.volumeMapName = []string{"code", "model"} - edgeContainer.env = map[string]string{ - "MODEL": edgeModelString, - "WORKER_NAME": "edgeworker-" + utilrand.String(5), - "SERVICE_NAME": service.Name, + var workerParam WorkerParam + + workerParam.mounts = append(workerParam.mounts, WorkerMount{ + URL: &MountURL{URL: edgeModel.Spec.URL}, + Name: "model", + EnvName: "MODEL_URL", + }) + + workerParam.env = map[string]string{ + "NAMESPACE": service.Namespace, + "SERVICE_NAME": service.Name, + "WORKER_NAME": "edgeworker-" + utilrand.String(5), + "BIG_MODEL_IP": bigModelIP, "BIG_MODEL_PORT": strconv.Itoa(int(bigServicePort)), - "HEM_PARAMETERS": HEMParameterString, - "MODEL_URL": edgeModelURL, - "NAMESPACE": service.Namespace, + "HEM_NAME": edgeWorker.HardExampleMining.Name, - "LC_SERVER": jc.cfg.LC.Server, + "HEM_PARAMETERS": HEMParameterString, + + "LC_SERVER": jc.cfg.LC.Server, } - edgeContainer.workerType = jointInferenceForEdge - edgeContainer.hostNetwork = true + workerParam.workerType = jointInferenceForEdge + workerParam.hostNetwork = true // create edge pod _, err = createPodWithTemplate(jc.kubeClient, service, &service.Spec.EdgeWorker.Template, - edgeContainer) - if err != nil { - return err - } - return nil + &workerParam) + return err } // GetName returns the name of the joint inference controller diff --git a/pkg/globalmanager/storage_initializer_injector.go b/pkg/globalmanager/storage_initializer_injector.go new file mode 100644 index 00000000..ed5f82df --- /dev/null +++ b/pkg/globalmanager/storage_initializer_injector.go @@ -0,0 +1,319 @@ +/* +Copyright 2021 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package globalmanager + +import ( + "net/url" + "path/filepath" + "strings" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/klog/v2" +) + +const ( + downloadInitalizerContainerName = "storage-initializer" + downloadInitalizerImage = "kubeedge/sedna-storage-initializer:v0.1.0" + + downloadInitalizerPrefix = "/downloads" + downloadInitalizerVolumeName = "sedna-storage-initializer" + + hostPathPrefix = "/hostpath" + urlsFieldSep = ";" + volumeNamePrefix = "sedna-" +) + +var supportStorageInitializerURLSchemes = [...]string{ + // s3 compatible storage + "s3", + + // http server, only for downloading + "http", "https", +} + +var supportURLSchemes = [...]string{ + // s3 compatbile storage + "s3", + + // http server, only for downloading + "http", "https", + + // hostpath of node, for compatibility only + // "/opt/data/model.pb" + "", + + // the local path of worker-container + "file", +} + +type workerMountMode string + +const ( + workerMountReadOnly workerMountMode = "readonly" + workerMountWriteOnly workerMountMode = "writeonly" + +// no read-write support for mount url/directory now +) + +type MountURL struct { + // URL is the url of dataset/model + URL string + + // Mode indicates the url mode, default is workerMountReadOnly + Mode workerMountMode + + // IsDir indicates that url is directory + IsDir bool + + // if true, only mounts when url is hostpath + CheckHostPath bool + + // the container path + ContainerPath string + + // indicates the path this url will be mounted into container. + // can be containerPath or its parent dir + MountPath string + + // for host path, we just need to mount without downloading + HostPath string + + // for storage initializer + DownloadSrcURL string + DownloadDstDir string + + // if Disable, then no mount + Disable bool + + // parsed for the parent of url + u *url.URL +} + +func (m *MountURL) Parse() { + u, _ := url.Parse(m.URL) + + m.u = u + m.injectDownloadPath() + m.injectHostPath() +} + +func (m *MountURL) injectDownloadPath() { + if m.Mode == workerMountWriteOnly { + // no storage-initializer for write only + // leave the write operation to worker + return + } + + for _, scheme := range supportStorageInitializerURLSchemes { + if m.u.Scheme == scheme { + m.MountPath = downloadInitalizerPrefix + + // here use u.Host + u.Path to avoid conflict + m.ContainerPath = filepath.Join(m.MountPath, m.u.Host+m.u.Path) + + m.DownloadSrcURL = m.URL + m.DownloadDstDir, _ = filepath.Split(m.ContainerPath) + + break + } + } +} + +func (m *MountURL) injectHostPath() { + // for compatibility, hostpath of a node is supported. + // e.g. the url of a dataset: /datasets/d1/label.txt + if m.u.Scheme != "" { + if m.CheckHostPath { + m.Disable = true + } + return + } + + if m.IsDir { + m.HostPath = m.URL + m.MountPath = filepath.Join(hostPathPrefix, m.u.Path) + m.ContainerPath = m.MountPath + } else { + // if file, here mount its directory + m.HostPath, _ = filepath.Split(m.URL) + m.ContainerPath = filepath.Join(hostPathPrefix, m.u.Path) + m.MountPath, _ = filepath.Split(m.ContainerPath) + } +} + +func injectHostPathMount(pod *v1.Pod, workerParam *WorkerParam) { + var volumes []v1.Volume + var volumeMounts []v1.VolumeMount + + uniqVolumeName := make(map[string]bool) + + hostPathType := v1.HostPathDirectory + + for _, mount := range workerParam.mounts { + for _, m := range mount.URLs { + if m.HostPath != "" { + volumeName := ConvertK8SValidName(m.HostPath) + + if volumeName == "" { + klog.Warningf("failed to convert volume name from the url and skipped: %s", m.URL) + continue + } + + if _, ok := uniqVolumeName[volumeName]; !ok { + volumes = append(volumes, v1.Volume{ + Name: volumeName, + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: m.HostPath, + Type: &hostPathType, + }, + }, + }) + uniqVolumeName[volumeName] = true + } + + vm := v1.VolumeMount{ + MountPath: m.MountPath, + Name: volumeName, + } + volumeMounts = append(volumeMounts, vm) + } + } + } + injectVolume(pod, volumes, volumeMounts) +} + +func injectDownloadInitializer(pod *v1.Pod, workerParam *WorkerParam) { + var volumes []v1.Volume + var volumeMounts []v1.VolumeMount + + var downloadPairs []string + for _, mount := range workerParam.mounts { + for _, m := range mount.URLs { + if m.DownloadSrcURL != "" && m.DownloadDstDir != "" { + // srcURL dstDir + // need to add srcURL first + downloadPairs = append(downloadPairs, m.DownloadSrcURL, m.DownloadDstDir) + } + } + } + + // no need to download + if len(downloadPairs) == 0 { + return + } + + // use one empty directory + storageVolume := v1.Volume{ + Name: downloadInitalizerVolumeName, + VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{}, + }, + } + + storageVolumeMounts := v1.VolumeMount{ + Name: storageVolume.Name, + MountPath: downloadInitalizerPrefix, + ReadOnly: true, + } + volumes = append(volumes, storageVolume) + volumeMounts = append(volumeMounts, storageVolumeMounts) + + initVolumeMounts := []v1.VolumeMount{ + { + Name: storageVolume.Name, + MountPath: downloadInitalizerPrefix, + ReadOnly: false, + }, + } + + initContainer := v1.Container{ + Name: downloadInitalizerContainerName, + Image: downloadInitalizerImage, + ImagePullPolicy: v1.PullIfNotPresent, + Args: downloadPairs, + + TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError, + + Resources: v1.ResourceRequirements{ + Limits: map[v1.ResourceName]resource.Quantity{ + // limit one cpu + v1.ResourceCPU: resource.MustParse("1"), + // limit 1Gi memory + v1.ResourceMemory: resource.MustParse("1Gi"), + }, + }, + VolumeMounts: initVolumeMounts, + } + + pod.Spec.InitContainers = append(pod.Spec.InitContainers, initContainer) + injectVolume(pod, volumes, volumeMounts) +} + +// InjectStorageInitializer injects these storage related volumes and envs into pod in-place +func InjectStorageInitializer(pod *v1.Pod, workerParam *WorkerParam) { + var mounts []WorkerMount + // parse the mounts and environment key + for _, mount := range workerParam.mounts { + var envPaths []string + + if mount.URL != nil { + mount.URLs = append(mount.URLs, *mount.URL) + } + + var mountURLs []MountURL + for _, m := range mount.URLs { + m.Parse() + if m.Disable { + continue + } + mountURLs = append(mountURLs, m) + envPaths = append(envPaths, m.ContainerPath) + } + + if len(mountURLs) > 0 { + mount.URLs = mountURLs + mounts = append(mounts, mount) + } + if mount.EnvName != "" { + workerParam.env[mount.EnvName] = strings.Join( + envPaths, urlsFieldSep, + ) + } + } + + workerParam.mounts = mounts + + injectHostPathMount(pod, workerParam) + injectDownloadInitializer(pod, workerParam) +} + +func injectVolume(pod *v1.Pod, volumes []v1.Volume, volumeMounts []v1.VolumeMount) { + if len(volumes) > 0 { + pod.Spec.Volumes = append(pod.Spec.Volumes, volumes...) + } + + if len(volumeMounts) > 0 { + for idx := range pod.Spec.Containers { + // inject every containers + pod.Spec.Containers[idx].VolumeMounts = append( + pod.Spec.Containers[idx].VolumeMounts, volumeMounts..., + ) + } + } +} diff --git a/pkg/globalmanager/types.go b/pkg/globalmanager/types.go index dc0e428c..33e28b13 100644 --- a/pkg/globalmanager/types.go +++ b/pkg/globalmanager/types.go @@ -19,25 +19,11 @@ package globalmanager import ( "encoding/json" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" ) -// WorkerPara describes the system-defined parameters of worker -type WorkerPara struct { - volumeMountList []string - volumeList []string - volumeMapName []string - env map[string]string - workerType string - // if true, force to use hostNetwork - hostNetwork bool - - restartPolicy v1.RestartPolicy -} - // CommonInterface describes the commom interface of CRs type CommonInterface interface { metav1.Object diff --git a/pkg/globalmanager/worker.go b/pkg/globalmanager/worker.go new file mode 100644 index 00000000..dbb09bac --- /dev/null +++ b/pkg/globalmanager/worker.go @@ -0,0 +1,102 @@ +package globalmanager + +import ( + "context" + "strings" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + k8scontroller "k8s.io/kubernetes/pkg/controller" +) + +type WorkerMount struct { + Name string + // the url to be mounted + URL *MountURL + + // for some cases, there are more than one url to be mounted + URLs []MountURL + + // envName indicates the environment key of the mounts injected to the worker + EnvName string +} + +// WorkerParam describes the system-defined parameters of worker +type WorkerParam struct { + mounts []WorkerMount + + env map[string]string + workerType string + + // if true, force to use hostNetwork + hostNetwork bool + + restartPolicy v1.RestartPolicy +} + +// injectWorkerParam modifies pod in-place +func injectWorkerParam(pod *v1.Pod, workerParam *WorkerParam, object CommonInterface) { + InjectStorageInitializer(pod, workerParam) + + envs := createEnvVars(workerParam.env) + for idx := range pod.Spec.Containers { + pod.Spec.Containers[idx].Env = append( + pod.Spec.Containers[idx].Env, envs..., + ) + } + + // inject our labels + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + + for k, v := range GenerateLabels(object) { + pod.Labels[k] = v + } + + pod.GenerateName = object.GetName() + "-" + strings.ToLower(workerParam.workerType) + "-" + + pod.Namespace = object.GetNamespace() + + if workerParam.hostNetwork { + // FIXME + // force to set hostnetwork + pod.Spec.HostNetwork = true + } + + if pod.Spec.RestartPolicy == "" { + pod.Spec.RestartPolicy = workerParam.restartPolicy + } +} + +// createPodWithTemplate creates and returns a pod object given a crd object, pod template, and workerParam +func createPodWithTemplate(client kubernetes.Interface, object CommonInterface, spec *v1.PodTemplateSpec, workerParam *WorkerParam) (*v1.Pod, error) { + objectKind := object.GroupVersionKind() + pod, _ := k8scontroller.GetPodFromTemplate(spec, object, metav1.NewControllerRef(object, objectKind)) + injectWorkerParam(pod, workerParam, object) + + createdPod, err := client.CoreV1().Pods(object.GetNamespace()).Create(context.TODO(), pod, metav1.CreateOptions{}) + objectName := object.GetNamespace() + "/" + object.GetName() + if err != nil { + klog.Warningf("failed to create pod(type=%s) for %s %s, err:%s", workerParam.workerType, objectKind, objectName, err) + return nil, err + } + klog.V(2).Infof("pod %s is created successfully for %s %s", createdPod.Name, objectKind, objectName) + return createdPod, nil +} + +// createEnvVars creates EnvMap for container +// include EnvName and EnvValue map for stage of creating a pod +func createEnvVars(envMap map[string]string) []v1.EnvVar { + var envVars []v1.EnvVar + for envName, envValue := range envMap { + Env := v1.EnvVar{ + Name: envName, + Value: envValue, + } + envVars = append(envVars, Env) + } + return envVars +}