Browse Source

Add pod template for joint inference

Signed-off-by: llhuii <liulinghui@huawei.com>
tags/v0.1.0
llhuii 4 years ago
parent
commit
1f010862f1
7 changed files with 13194 additions and 142 deletions
  1. +13132
    -66
      build/crds/sedna.io_jointinferenceservices.yaml
  2. +2
    -1
      lib/sedna/context.py
  3. +5
    -7
      pkg/apis/sedna/v1alpha1/jointinferenceservice_types.go
  4. +2
    -2
      pkg/apis/sedna/v1alpha1/zz_generated.deepcopy.go
  5. +23
    -0
      pkg/globalmanager/common.go
  6. +3
    -2
      pkg/globalmanager/downstream.go
  7. +27
    -64
      pkg/globalmanager/jointinferenceservice.go

+ 13132
- 66
build/crds/sedna.io_jointinferenceservices.yaml
File diff suppressed because it is too large
View File


+ 2
- 1
lib/sedna/context.py View File

@@ -14,6 +14,7 @@

import json
import logging
import os

from sedna.common.config import BaseConfig

@@ -45,7 +46,7 @@ class Context:
`PARAMETERS` and `HEM_PARAMETERS` field"""

def __init__(self):
self.parameters = parse_parameters(BaseConfig.parameters)
self.parameters = os.environ
self.hem_parameters = parse_parameters(BaseConfig.hem_parameters)

def get_context(self):


+ 5
- 7
pkg/apis/sedna/v1alpha1/jointinferenceservice_types.go View File

@@ -44,17 +44,15 @@ type JointInferenceServiceSpec struct {

// EdgeWorker describes the data an edge worker should have
type EdgeWorker struct {
Model SmallModel `json:"model"`
NodeName string `json:"nodeName"`
HardExampleMining HardExampleMining `json:"hardExampleMining"`
WorkerSpec CommonWorkerSpec `json:"workerSpec"`
Model SmallModel `json:"model"`
HardExampleMining HardExampleMining `json:"hardExampleMining"`
Template v1.PodTemplateSpec `json:"template"`
}

// CloudWorker describes the data a cloud worker should have
type CloudWorker struct {
Model BigModel `json:"model"`
NodeName string `json:"nodeName"`
WorkerSpec CommonWorkerSpec `json:"workerSpec"`
Model BigModel `json:"model"`
Template v1.PodTemplateSpec `json:"template"`
}

// SmallModel describes the small model


+ 2
- 2
pkg/apis/sedna/v1alpha1/zz_generated.deepcopy.go View File

@@ -79,7 +79,7 @@ func (in *BigModel) DeepCopy() *BigModel {
func (in *CloudWorker) DeepCopyInto(out *CloudWorker) {
*out = *in
out.Model = in.Model
in.WorkerSpec.DeepCopyInto(&out.WorkerSpec)
in.Template.DeepCopyInto(&out.Template)
return
}

@@ -268,7 +268,7 @@ func (in *EdgeWorker) DeepCopyInto(out *EdgeWorker) {
*out = *in
out.Model = in.Model
in.HardExampleMining.DeepCopyInto(&out.HardExampleMining)
in.WorkerSpec.DeepCopyInto(&out.WorkerSpec)
in.Template.DeepCopyInto(&out.Template)
return
}



+ 23
- 0
pkg/globalmanager/common.go View File

@@ -223,3 +223,26 @@ func calcActivePodCount(pods []*v1.Pod) int32 {
}
return result
}

// InjectContainerPara augments pod in place with the volumes, volume mounts
// and environment variables built from containerPara, and with the labels
// generated for object.
//
// The volumes are appended to the pod spec; the environment variables and
// volume mounts are appended to every container listed in
// pod.Spec.Containers. Generated labels are written into pod.Labels,
// overwriting any existing entries that share a key.
func InjectContainerPara(pod *v1.Pod, containerPara *ContainerPara, object CommonInterface) {
	// Build the predefined mounts, volumes and env vars once, then fan
	// them out to the pod and each of its containers.
	mounts, vols := CreateVolumeMap(containerPara)
	envVars := CreateEnvVars(containerPara.env)

	spec := &pod.Spec
	spec.Volumes = append(spec.Volumes, vols...)
	for i := range spec.Containers {
		c := &spec.Containers[i]
		c.Env = append(c.Env, envVars...)
		c.VolumeMounts = append(c.VolumeMounts, mounts...)
	}

	// Writing to a nil map panics, so make sure the label map exists first.
	if pod.Labels == nil {
		pod.Labels = make(map[string]string)
	}
	for key, value := range GenerateLabels(object) {
		pod.Labels[key] = value
	}
}

+ 3
- 2
pkg/globalmanager/downstream.go View File

@@ -60,7 +60,8 @@ func (dc *DownstreamController) syncDataset(eventType watch.EventType, dataset *
// syncJointInferenceService syncs the joint-inference-service resources
func (dc *DownstreamController) syncJointInferenceService(eventType watch.EventType, joint *sednav1.JointInferenceService) error {
// Here only propagate to the nodes with non empty name
nodeName := joint.Spec.EdgeWorker.NodeName
// FIXME: only the case where Spec.NodeName is specified is supported
nodeName := joint.Spec.EdgeWorker.Template.Spec.NodeName
if len(nodeName) == 0 {
return fmt.Errorf("empty node name")
}
@@ -229,7 +230,7 @@ func (dc *DownstreamController) watch(stopCh <-chan struct{}) {
client := dc.client.RESTClient()

// make this option configurable
resyncPeriod := time.Second * 60
resyncPeriod := time.Second * 600
namespace := dc.cfg.Namespace

// TODO: use the informer


+ 27
- 64
pkg/globalmanager/jointinferenceservice.go View File

@@ -409,7 +409,9 @@ func (jc *JointInferenceServiceController) createPod(service *sednav1.JointInfer

// create kubernetesService for cloudPod, and get bigServicePort for edgePod
var bigServicePort int32
bigModelIP, err := GetNodeIPByName(jc.kubeClient, service.Spec.CloudWorker.NodeName)
// FIXME(llhuii): only the case where Spec.NodeName is specified is supported;
// support for Spec.NodeSelector will be added later.
bigModelIP, err := GetNodeIPByName(jc.kubeClient, service.Spec.CloudWorker.Template.Spec.NodeName)
bigServicePort, err = CreateKubernetesService(jc.kubeClient, service, bigModelPort, bigModelIP)
if err != nil {
return active, err
@@ -443,13 +445,7 @@ func (jc *JointInferenceServiceController) createCloudPod(service *sednav1.Joint
cloudModelString := string(cloudModelJSON)
cloudModelParent := filepath.Dir(cloudModelPath)

cloudWorker := service.Spec.CloudWorker
cloudCodePath := cloudWorker.WorkerSpec.ScriptDir
cloudParameterJSON, _ := json.Marshal(cloudWorker.WorkerSpec.Parameters)
cloudParameterString := string(cloudParameterJSON)

// Container VolumeMounts parameters
cloudCodeConPath := codePrefix
cloudModelConPath := dataPrefix + cloudModelParent

// Env parameters for cloud
@@ -457,14 +453,13 @@ func (jc *JointInferenceServiceController) createCloudPod(service *sednav1.Joint

// Configure container mounting and Env information by initial ContainerPara
var cloudContainer *ContainerPara = new(ContainerPara)
cloudContainer.volumeMountList = []string{cloudCodeConPath, cloudModelConPath}
cloudContainer.volumeList = []string{cloudCodePath, cloudModelParent}
cloudContainer.volumeMountList = []string{cloudModelConPath}
cloudContainer.volumeList = []string{cloudModelParent}
cloudContainer.volumeMapName = []string{"code", "model"}
cloudContainer.env = map[string]string{
"MODEL": cloudModelString,
"WORKER_NAME": "cloudworker-" + utilrand.String(5),
"SERVICE_NAME": service.Name,
"PARAMETERS": cloudParameterString,
"MODEL_URL": cloudModelURL,
"NAMESPACE": service.Namespace,
"BIG_MODEL_BIND_PORT": strconv.Itoa(int(bigModelPort)),
@@ -489,8 +484,10 @@ func (jc *JointInferenceServiceController) createEdgePod(service *sednav1.JointI
}
edgeModelPath := edgeModel.Spec.URL

// FIXME(llhuii): only the case where Spec.NodeName is specified is supported;
// support for Spec.NodeSelector will be added later.
// get bigModelIP from nodeName in cloudWorker
bigModelIP, err := GetNodeIPByName(jc.kubeClient, service.Spec.CloudWorker.NodeName)
bigModelIP, err := GetNodeIPByName(jc.kubeClient, service.Spec.CloudWorker.Template.Spec.NodeName)
if err != nil {
return fmt.Errorf("failed to get node ip: %w", err)
}
@@ -501,14 +498,10 @@ func (jc *JointInferenceServiceController) createEdgePod(service *sednav1.JointI
edgeModelParent := filepath.Dir(edgeModelPath)

edgeWorker := service.Spec.EdgeWorker
edgeCodePath := edgeWorker.WorkerSpec.ScriptDir
edgeParameterJSON, _ := json.Marshal(edgeWorker.WorkerSpec.Parameters)
edgeParameterString := string(edgeParameterJSON)
HEMParameterJSON, _ := json.Marshal(edgeWorker.HardExampleMining.Parameters)
HEMParameterString := string(HEMParameterJSON)

// Container VolumeMounts parameters
edgeCodeConPath := codePrefix
edgeModelConPath := dataPrefix + edgeModelParent

// Env parameters for edge
@@ -516,8 +509,8 @@ func (jc *JointInferenceServiceController) createEdgePod(service *sednav1.JointI

// Configure container mounting and Env information by initial ContainerPara
var edgeContainer *ContainerPara = new(ContainerPara)
edgeContainer.volumeMountList = []string{edgeCodeConPath, edgeModelConPath}
edgeContainer.volumeList = []string{edgeCodePath, edgeModelParent}
edgeContainer.volumeMountList = []string{edgeModelConPath}
edgeContainer.volumeList = []string{edgeModelParent}
edgeContainer.volumeMapName = []string{"code", "model"}
edgeContainer.env = map[string]string{
"MODEL": edgeModelString,
@@ -525,7 +518,6 @@ func (jc *JointInferenceServiceController) createEdgePod(service *sednav1.JointI
"SERVICE_NAME": service.Name,
"BIG_MODEL_IP": bigModelIP,
"BIG_MODEL_PORT": strconv.Itoa(int(bigServicePort)),
"PARAMETERS": edgeParameterString,
"HEM_PARAMETERS": HEMParameterString,
"MODEL_URL": edgeModelURL,
"NAMESPACE": service.Namespace,
@@ -543,60 +535,31 @@ func (jc *JointInferenceServiceController) createEdgePod(service *sednav1.JointI

func (jc *JointInferenceServiceController) generatedPod(service *sednav1.JointInferenceService, podtype jointInferenceType,
containerPara *ContainerPara, hostNetwork bool) error {
var workerSpec sednav1.CommonWorkerSpec
var volumeMounts []v1.VolumeMount
var volumes []v1.Volume
var envs []v1.EnvVar
var nodeName string

var podTemplate *v1.PodTemplateSpec
ctx := context.Background()
if podtype == jointInferenceForEdge {
workerSpec = service.Spec.EdgeWorker.WorkerSpec
nodeName = service.Spec.EdgeWorker.NodeName
podTemplate = &service.Spec.EdgeWorker.Template
} else {
workerSpec = service.Spec.CloudWorker.WorkerSpec
nodeName = service.Spec.CloudWorker.NodeName
}
// get baseImgURL from imageHub based on user's configuration in job CRD
frameName := workerSpec.FrameworkType
frameVersion := workerSpec.FrameworkVersion
baseImgURL, err := MatchContainerBaseImage(jc.cfg.ImageHub, frameName, frameVersion)
// TODO: if matched image is empty, the pod creation process will not proceed, return error directly.
if err != nil {
klog.Warningf("jointinference service %v/%v %v worker matching container base image occurs error:%v", service.Namespace, service.Name, podtype, err)
return fmt.Errorf("%s pod occurs error: %w",
podtype, err)
}
volumeMounts, volumes = CreateVolumeMap(containerPara)
envs = CreateEnvVars(containerPara.env)
podSpec := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: service.Namespace,
GenerateName: service.Name + "-" + strings.ToLower(string(podtype)) + "-",
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(service, jointServiceControllerKind),
},
Labels: GenerateLabels(service),
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
NodeName: nodeName,
Containers: []v1.Container{
{Name: "container-" + service.Name + "-" + strings.ToLower(string(podtype)) + "-" + utilrand.String(5),
Image: baseImgURL,
Args: []string{workerSpec.ScriptBootFile},
Env: envs,
VolumeMounts: volumeMounts,
}},
Volumes: volumes,
HostNetwork: hostNetwork,
},
podTemplate = &service.Spec.CloudWorker.Template
}

pod, _ := k8scontroller.GetPodFromTemplate(podTemplate, service, metav1.NewControllerRef(service, jointServiceControllerKind))
InjectContainerPara(pod, containerPara, service)

pod.Namespace = service.Namespace
pod.GenerateName = service.Name + "-" + strings.ToLower(string(podtype)) + "-"

if hostNetwork {
// force to set hostnetwork
pod.Spec.HostNetwork = true
}
pod, err := jc.kubeClient.CoreV1().Pods(service.Namespace).Create(ctx, podSpec, metav1.CreateOptions{})
createdPod, err := jc.kubeClient.CoreV1().Pods(service.Namespace).Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
klog.Warningf("failed to create %s pod %s for jointinference service %v/%v, err:%s", string(podtype), pod.Name, service.Namespace, service.Name, err)
return err
}
klog.V(2).Infof("%s pod %s is created successfully for jointinference service %v/%v", string(podtype), pod.Name, service.Namespace, service.Name)
klog.V(2).Infof("%s pod %s is created successfully for jointinference service %v/%v", string(podtype), createdPod.Name, service.Namespace, service.Name)
return nil
}



Loading…
Cancel
Save