When a jointinferenceservice is created, a k8s service is created for the big worker, so the big worker's labels need to be unique. This is done by adding a new label key 'worker-type'. The same applies to federatedlearningjob. Signed-off-by: llhuii <liulinghui@huawei.com> tags/v0.3.0
| @@ -25,11 +25,8 @@ import ( | |||||
| v1 "k8s.io/api/core/v1" | v1 "k8s.io/api/core/v1" | ||||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| "k8s.io/apimachinery/pkg/labels" | |||||
| "k8s.io/apimachinery/pkg/util/intstr" | |||||
| "k8s.io/client-go/kubernetes" | "k8s.io/client-go/kubernetes" | ||||
| "k8s.io/client-go/util/workqueue" | "k8s.io/client-go/util/workqueue" | ||||
| "k8s.io/klog/v2" | |||||
| ) | ) | ||||
| const ( | const ( | ||||
| @@ -65,48 +62,6 @@ func GetNodeIPByName(kubeClient kubernetes.Interface, name string) (string, erro | |||||
| return "", fmt.Errorf("can't found node ip for node %s", name) | return "", fmt.Errorf("can't found node ip for node %s", name) | ||||
| } | } | ||||
| // CreateKubernetesService creates a k8s service for an object given ip and port | |||||
| func CreateKubernetesService(kubeClient kubernetes.Interface, object CommonInterface, inputPort int32, inputIP string) (int32, error) { | |||||
| ctx := context.Background() | |||||
| name := object.GetName() | |||||
| namespace := object.GetNamespace() | |||||
| kind := object.GroupVersionKind().Kind | |||||
| targePort := intstr.IntOrString{ | |||||
| IntVal: inputPort, | |||||
| } | |||||
| serviceSpec := &v1.Service{ | |||||
| ObjectMeta: metav1.ObjectMeta{ | |||||
| Namespace: object.GetNamespace(), | |||||
| GenerateName: name + "-" + "service" + "-", | |||||
| OwnerReferences: []metav1.OwnerReference{ | |||||
| *metav1.NewControllerRef(object, object.GroupVersionKind()), | |||||
| }, | |||||
| Labels: GenerateLabels(object), | |||||
| }, | |||||
| Spec: v1.ServiceSpec{ | |||||
| Selector: GenerateLabels(object), | |||||
| ExternalIPs: []string{ | |||||
| inputIP, | |||||
| }, | |||||
| Type: v1.ServiceTypeNodePort, | |||||
| Ports: []v1.ServicePort{ | |||||
| { | |||||
| Port: inputPort, | |||||
| TargetPort: targePort, | |||||
| }, | |||||
| }, | |||||
| }, | |||||
| } | |||||
| service, err := kubeClient.CoreV1().Services(namespace).Create(ctx, serviceSpec, metav1.CreateOptions{}) | |||||
| if err != nil { | |||||
| klog.Warningf("failed to create service for %v %v/%v, err:%s", kind, namespace, name, err) | |||||
| return 0, err | |||||
| } | |||||
| klog.V(2).Infof("Service %s is created successfully for %v %v/%v", service.Name, kind, namespace, name) | |||||
| return service.Spec.Ports[0].NodePort, nil | |||||
| } | |||||
| // getBackoff calc the next wait time for the key | // getBackoff calc the next wait time for the key | ||||
| func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Duration { | func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Duration { | ||||
| exp := queue.NumRequeues(key) | exp := queue.NumRequeues(key) | ||||
| @@ -140,27 +95,6 @@ func calcActivePodCount(pods []*v1.Pod) int32 { | |||||
| return result | return result | ||||
| } | } | ||||
| // GenerateLabels generates labels for an object | |||||
| func GenerateLabels(object CommonInterface) map[string]string { | |||||
| kind := object.GroupVersionKind().Kind | |||||
| group := object.GroupVersionKind().Group | |||||
| keyPrefix := strings.ToLower(kind + "." + group + "/") | |||||
| labels := make(map[string]string) | |||||
| labels[keyPrefix+"name"] = object.GetName() | |||||
| labels[keyPrefix+"uid"] = string(object.GetUID()) | |||||
| return labels | |||||
| } | |||||
| // GenerateSelector generates the selector for an object | |||||
| func GenerateSelector(object CommonInterface) (labels.Selector, error) { | |||||
| ls := &metav1.LabelSelector{ | |||||
| MatchLabels: GenerateLabels(object), | |||||
| } | |||||
| return metav1.LabelSelectorAsSelector(ls) | |||||
| } | |||||
| // ConvertK8SValidName converts to the k8s valid name | // ConvertK8SValidName converts to the k8s valid name | ||||
| func ConvertK8SValidName(name string) string { | func ConvertK8SValidName(name string) string { | ||||
| // the name(e.g. pod/volume name) should be a lowercase RFC 1123 label: | // the name(e.g. pod/volume name) should be a lowercase RFC 1123 label: | ||||
| @@ -460,7 +460,7 @@ func (fc *FederatedController) createPod(job *sednav1.FederatedLearningJob) (act | |||||
| // will support Spec.NodeSelector. | // will support Spec.NodeSelector. | ||||
| appIP, err = GetNodeIPByName(fc.kubeClient, job.Spec.AggregationWorker.Template.Spec.NodeName) | appIP, err = GetNodeIPByName(fc.kubeClient, job.Spec.AggregationWorker.Template.Spec.NodeName) | ||||
| aggServicePort, err = CreateKubernetesService(fc.kubeClient, job, aggPort, appIP) | |||||
| aggServicePort, err = CreateKubernetesService(fc.kubeClient, job, FLJobStageAgg, aggPort, appIP) | |||||
| if err != nil { | if err != nil { | ||||
| return active, err | return active, err | ||||
| } | } | ||||
| @@ -316,7 +316,7 @@ func (jc *JointInferenceServiceController) sync(key string) (bool, error) { | |||||
| jc.recorder.Event(&jointinferenceservice, v1.EventTypeWarning, reason, message) | jc.recorder.Event(&jointinferenceservice, v1.EventTypeWarning, reason, message) | ||||
| } else { | } else { | ||||
| if len(pods) == 0 { | if len(pods) == 0 { | ||||
| active, manageServiceErr = jc.createPod(&jointinferenceservice) | |||||
| active, manageServiceErr = jc.createWorkers(&jointinferenceservice) | |||||
| } | } | ||||
| if manageServiceErr != nil { | if manageServiceErr != nil { | ||||
| serviceFailed = true | serviceFailed = true | ||||
| @@ -393,7 +393,7 @@ func isJointinferenceserviceFinished(j *sednav1.JointInferenceService) bool { | |||||
| return false | return false | ||||
| } | } | ||||
| func (jc *JointInferenceServiceController) createPod(service *sednav1.JointInferenceService) (active int32, err error) { | |||||
| func (jc *JointInferenceServiceController) createWorkers(service *sednav1.JointInferenceService) (active int32, err error) { | |||||
| active = 0 | active = 0 | ||||
| // create cloud worker | // create cloud worker | ||||
| @@ -403,12 +403,11 @@ func (jc *JointInferenceServiceController) createPod(service *sednav1.JointInfer | |||||
| } | } | ||||
| active++ | active++ | ||||
| // create kubernetesService for cloudPod, and get bigServicePort for edgePod | |||||
| var bigServicePort int32 | |||||
| // create k8s service for cloudPod | |||||
| // FIXME(llhuii): only the case that Spec.NodeName specified is support, | // FIXME(llhuii): only the case that Spec.NodeName specified is support, | ||||
| // will support Spec.NodeSelector. | // will support Spec.NodeSelector. | ||||
| bigModelIP, err := GetNodeIPByName(jc.kubeClient, service.Spec.CloudWorker.Template.Spec.NodeName) | bigModelIP, err := GetNodeIPByName(jc.kubeClient, service.Spec.CloudWorker.Template.Spec.NodeName) | ||||
| bigServicePort, err = CreateKubernetesService(jc.kubeClient, service, bigModelPort, bigModelIP) | |||||
| bigServicePort, err := CreateKubernetesService(jc.kubeClient, service, jointInferenceForCloud, bigModelPort, bigModelIP) | |||||
| if err != nil { | if err != nil { | ||||
| return active, err | return active, err | ||||
| } | } | ||||
| @@ -6,6 +6,8 @@ import ( | |||||
| v1 "k8s.io/api/core/v1" | v1 "k8s.io/api/core/v1" | ||||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| "k8s.io/apimachinery/pkg/labels" | |||||
| "k8s.io/apimachinery/pkg/util/intstr" | |||||
| "k8s.io/client-go/kubernetes" | "k8s.io/client-go/kubernetes" | ||||
| "k8s.io/klog/v2" | "k8s.io/klog/v2" | ||||
| k8scontroller "k8s.io/kubernetes/pkg/controller" | k8scontroller "k8s.io/kubernetes/pkg/controller" | ||||
| @@ -36,6 +38,73 @@ type WorkerParam struct { | |||||
| restartPolicy v1.RestartPolicy | restartPolicy v1.RestartPolicy | ||||
| } | } | ||||
| // generateLabels generates labels for an object | |||||
| func generateLabels(object CommonInterface, workerType string) map[string]string { | |||||
| kind := object.GroupVersionKind().Kind | |||||
| group := object.GroupVersionKind().Group | |||||
| keyPrefix := strings.ToLower(kind + "." + group + "/") | |||||
| labels := make(map[string]string) | |||||
| labels[keyPrefix+"name"] = object.GetName() | |||||
| labels[keyPrefix+"uid"] = string(object.GetUID()) | |||||
| if workerType != "" { | |||||
| labels[keyPrefix+"worker-type"] = strings.ToLower(workerType) | |||||
| } | |||||
| return labels | |||||
| } | |||||
| // GenerateSelector generates the selector of an object for worker | |||||
| func GenerateSelector(object CommonInterface) (labels.Selector, error) { | |||||
| ls := &metav1.LabelSelector{ | |||||
| // select any type workers | |||||
| MatchLabels: generateLabels(object, ""), | |||||
| } | |||||
| return metav1.LabelSelectorAsSelector(ls) | |||||
| } | |||||
| // CreateKubernetesService creates a k8s service for an object given ip and port | |||||
| func CreateKubernetesService(kubeClient kubernetes.Interface, object CommonInterface, workerType string, inputPort int32, inputIP string) (int32, error) { | |||||
| ctx := context.Background() | |||||
| name := object.GetName() | |||||
| namespace := object.GetNamespace() | |||||
| kind := object.GroupVersionKind().Kind | |||||
| targePort := intstr.IntOrString{ | |||||
| IntVal: inputPort, | |||||
| } | |||||
| serviceSpec := &v1.Service{ | |||||
| ObjectMeta: metav1.ObjectMeta{ | |||||
| Namespace: object.GetNamespace(), | |||||
| GenerateName: name + "-" + "service" + "-", | |||||
| OwnerReferences: []metav1.OwnerReference{ | |||||
| *metav1.NewControllerRef(object, object.GroupVersionKind()), | |||||
| }, | |||||
| Labels: generateLabels(object, workerType), | |||||
| }, | |||||
| Spec: v1.ServiceSpec{ | |||||
| Selector: generateLabels(object, workerType), | |||||
| ExternalIPs: []string{ | |||||
| inputIP, | |||||
| }, | |||||
| Type: v1.ServiceTypeNodePort, | |||||
| Ports: []v1.ServicePort{ | |||||
| { | |||||
| Port: inputPort, | |||||
| TargetPort: targePort, | |||||
| }, | |||||
| }, | |||||
| }, | |||||
| } | |||||
| service, err := kubeClient.CoreV1().Services(namespace).Create(ctx, serviceSpec, metav1.CreateOptions{}) | |||||
| if err != nil { | |||||
| klog.Warningf("failed to create service for %v %v/%v, err:%s", kind, namespace, name, err) | |||||
| return 0, err | |||||
| } | |||||
| klog.V(2).Infof("Service %s is created successfully for %v %v/%v", service.Name, kind, namespace, name) | |||||
| return service.Spec.Ports[0].NodePort, nil | |||||
| } | |||||
| // injectWorkerParam modifies pod in-place | // injectWorkerParam modifies pod in-place | ||||
| func injectWorkerParam(pod *v1.Pod, workerParam *WorkerParam, object CommonInterface) { | func injectWorkerParam(pod *v1.Pod, workerParam *WorkerParam, object CommonInterface) { | ||||
| InjectStorageInitializer(pod, workerParam) | InjectStorageInitializer(pod, workerParam) | ||||
| @@ -52,7 +121,7 @@ func injectWorkerParam(pod *v1.Pod, workerParam *WorkerParam, object CommonInter | |||||
| pod.Labels = make(map[string]string) | pod.Labels = make(map[string]string) | ||||
| } | } | ||||
| for k, v := range GenerateLabels(object) { | |||||
| for k, v := range generateLabels(object, workerParam.workerType) { | |||||
| pod.Labels[k] = v | pod.Labels[k] = v | ||||
| } | } | ||||