Browse Source

Merge pull request #69 from JimmyYang20/dev

GM&LC: add lifelonglearningjob
tags/v0.3.0
KubeEdge Bot GitHub 4 years ago
parent
commit
8d29c95885
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 22843 additions and 40 deletions
  1. +19890
    -0
      build/crds/sedna.io_lifelonglearningjobs.yaml
  2. +2
    -5
      build/gm/rbac/gm.yaml
  3. +3
    -1
      cmd/sedna-lc/app/server.go
  4. +3
    -0
      hack/local-up.sh
  5. +158
    -0
      pkg/apis/sedna/v1alpha1/lifelonglearningjob_types.go
  6. +2
    -0
      pkg/apis/sedna/v1alpha1/register.go
  7. +252
    -0
      pkg/apis/sedna/v1alpha1/zz_generated.deepcopy.go
  8. +142
    -0
      pkg/client/clientset/versioned/typed/sedna/v1alpha1/fake/fake_lifelonglearningjob.go
  9. +4
    -0
      pkg/client/clientset/versioned/typed/sedna/v1alpha1/fake/fake_sedna_client.go
  10. +2
    -0
      pkg/client/clientset/versioned/typed/sedna/v1alpha1/generated_expansion.go
  11. +195
    -0
      pkg/client/clientset/versioned/typed/sedna/v1alpha1/lifelonglearningjob.go
  12. +5
    -0
      pkg/client/clientset/versioned/typed/sedna/v1alpha1/sedna_client.go
  13. +2
    -0
      pkg/client/informers/externalversions/generic.go
  14. +7
    -0
      pkg/client/informers/externalversions/sedna/v1alpha1/interface.go
  15. +90
    -0
      pkg/client/informers/externalversions/sedna/v1alpha1/lifelonglearningjob.go
  16. +8
    -0
      pkg/client/listers/sedna/v1alpha1/expansion_generated.go
  17. +99
    -0
      pkg/client/listers/sedna/v1alpha1/lifelonglearningjob.go
  18. +13
    -0
      pkg/globalmanager/config/config.go
  19. +1
    -0
      pkg/globalmanager/controller.go
  20. +26
    -1
      pkg/globalmanager/downstream.go
  21. +1
    -1
      pkg/globalmanager/federatedlearningjob.go
  22. +5
    -5
      pkg/globalmanager/incrementallearningjob.go
  23. +770
    -0
      pkg/globalmanager/lifelonglearningjob.go
  24. +63
    -0
      pkg/globalmanager/types.go
  25. +78
    -0
      pkg/globalmanager/upstream.go
  26. +28
    -13
      pkg/localcontroller/manager/dataset.go
  27. +891
    -0
      pkg/localcontroller/manager/lifelonglearningjob.go
  28. +5
    -0
      pkg/localcontroller/manager/types.go
  29. +0
    -1
      pkg/localcontroller/server/server.go
  30. +1
    -1
      pkg/localcontroller/util/util.go
  31. +97
    -12
      scripts/installation/install.sh

+ 19890
- 0
build/crds/sedna.io_lifelonglearningjobs.yaml
File diff suppressed because it is too large
View File


+ 2
- 5
build/gm/rbac/gm.yaml View File

@@ -1,8 +1,3 @@
apiVersion: v1
kind: Namespace
metadata:
name: sedna
---
# cluster role
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
@@ -18,6 +13,7 @@ rules:
- jointinferenceservices
- federatedlearningjobs
- incrementallearningjobs
- lifelonglearningjobs
verbs:
- get
- list
@@ -32,6 +28,7 @@ rules:
- jointinferenceservices/status
- federatedlearningjobs/status
- incrementallearningjobs/status
- lifelonglearningjobs/status
verbs:
- get
- update


+ 3
- 1
cmd/sedna-lc/app/server.go View File

@@ -96,10 +96,12 @@ func runServer() {

im := manager.NewIncrementalJobManager(c, dm, mm, Options)

lm := manager.NewLifelongLearningJobManager(c, dm, mm, Options)

s := server.New(Options)

for _, m := range []manager.FeatureManager{
dm, mm, jm, fm, im,
dm, mm, jm, fm, im, lm,
} {
s.AddFeatureManager(m)
c.Subscribe(m)


+ 3
- 0
hack/local-up.sh View File

@@ -195,9 +195,12 @@ prepare_k8s_env() {
kind get kubeconfig --name $CLUSTER_NAME > $TMP_DIR/kubeconfig
export KUBECONFIG=$(realpath $TMP_DIR/kubeconfig)
# prepare our k8s environment

# create these crds including dataset, model, joint-inference etc.
kubectl create -f build/crds/

kubectl create namespace $NAMESPACE

# create the cluster role for gm
kubectl create -f build/gm/rbac/



+ 158
- 0
pkg/apis/sedna/v1alpha1/lifelonglearningjob_types.go View File

@@ -0,0 +1,158 @@
/*
Copyright 2021 The KubeEdge Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:resource:shortName=ll
// +kubebuilder:subresource:status

type LifelongLearningJob struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata"`
Spec LLJobSpec `json:"spec"`
Status LLJobStatus `json:"status,omitempty"`
}

type LLJobSpec struct {
Dataset LLDataset `json:"dataset"`
TrainSpec LLTrainSpec `json:"trainSpec"`
EvalSpec LLEvalSpec `json:"evalSpec"`
DeploySpec LLDeploySpec `json:"deploySpec"`

// the credential referer for OutputDir
CredentialName string `json:"credentialName,omitempty"`
OutputDir string `json:"outputDir"`
}

type LLDataset struct {
Name string `json:"name"`
TrainProb float64 `json:"trainProb"`
}

// LLTrainSpec describes the data an train worker should have
type LLTrainSpec struct {
Template v1.PodTemplateSpec `json:"template"`
Trigger LLTrigger `json:"trigger"`
}

type LLTrigger struct {
CheckPeriodSeconds int `json:"checkPeriodSeconds,omitempty"`
Timer *LLTimer `json:"timer,omitempty"`
Condition LLCondition `json:"condition"`
}

type LLTimer struct {
Start string `json:"start"`
End string `json:"end"`
}

type LLCondition struct {
Operator string `json:"operator"`
Threshold float64 `json:"threshold"`
Metric string `json:"metric"`
}

// LLEvalSpec describes the data an eval worker should have
type LLEvalSpec struct {
Template v1.PodTemplateSpec `json:"template"`
}

// LLDeploySpec describes the deploy model to be updated
type LLDeploySpec struct {
Template v1.PodTemplateSpec `json:"template"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// LifelongLearningJobList is a list of LifelongLearningJobs.
type LifelongLearningJobList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
Items []LifelongLearningJob `json:"items"`
}

// LLJobStatus represents the current state of a lifelonglearning job
type LLJobStatus struct {
// The latest available observations of a lifelonglearning job's current state.
// +optional
Conditions []LLJobCondition `json:"conditions,omitempty"`

// Represents time when the job was acknowledged by the job controller.
// It is not guaranteed to be set in happens-before order across separate operations.
// It is represented in RFC3339 form and is in UTC.
// +optional
StartTime *metav1.Time `json:"startTime,omitempty"`

// Represents time when the job was completed. It is not guaranteed to
// be set in happens-before order across separate operations.
// It is represented in RFC3339 form and is in UTC.
// +optional
CompletionTime *metav1.Time `json:"completionTime,omitempty"`
}

type LLJobStageConditionType string

// These are valid stage conditions of a job.
const (
LLJobStageCondWaiting LLJobStageConditionType = "Waiting"
LLJobStageCondReady LLJobStageConditionType = "Ready"
LLJobStageCondStarting LLJobStageConditionType = "Starting"
LLJobStageCondRunning LLJobStageConditionType = "Running"
LLJobStageCondCompleted LLJobStageConditionType = "Completed"
LLJobStageCondFailed LLJobStageConditionType = "Failed"
)

// LLJobCondition describes current state of a job.
// see https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#typical-status-properties for details.
type LLJobCondition struct {
// Type of job condition, Complete or Failed.
Type LLJobStageConditionType `json:"type"`
// Status of the condition, one of True, False, Unknown.
Status v1.ConditionStatus `json:"status"`
// Stage of the condition
Stage LLJobStage `json:"stage"`
// last time we got an update on a given condition
// +optional
LastHeartbeatTime metav1.Time `json:"lastHeartbeatTime,omitempty"`
// Last time the condition transit from one status to another.
// +optional
LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
// (brief) reason for the condition's last transition.
// +optional
Reason string `json:"reason,omitempty"`
// Human readable message indicating details about last transition.
// +optional
Message string `json:"message,omitempty"`
// The json data related to this condition
// +optional
Data string `json:"data,omitempty"`
}

// LLJobStage is a label for the stage of a job at the current time.
type LLJobStage string

const (
LLJobTrain LLJobStage = "Train"
LLJobEval LLJobStage = "Eval"
LLJobDeploy LLJobStage = "Deploy"
)

+ 2
- 0
pkg/apis/sedna/v1alpha1/register.go View File

@@ -57,6 +57,8 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&FederatedLearningJobList{},
&IncrementalLearningJob{},
&IncrementalLearningJobList{},
&LifelongLearningJob{},
&LifelongLearningJobList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil


+ 252
- 0
pkg/apis/sedna/v1alpha1/zz_generated.deepcopy.go View File

@@ -708,6 +708,258 @@ func (in *JointInferenceServiceStatus) DeepCopy() *JointInferenceServiceStatus {
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LLCondition) DeepCopyInto(out *LLCondition) {
*out = *in
return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LLCondition.
func (in *LLCondition) DeepCopy() *LLCondition {
if in == nil {
return nil
}
out := new(LLCondition)
in.DeepCopyInto(out)
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LLDataset) DeepCopyInto(out *LLDataset) {
*out = *in
return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LLDataset.
func (in *LLDataset) DeepCopy() *LLDataset {
if in == nil {
return nil
}
out := new(LLDataset)
in.DeepCopyInto(out)
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LLDeploySpec) DeepCopyInto(out *LLDeploySpec) {
*out = *in
in.Template.DeepCopyInto(&out.Template)
return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LLDeploySpec.
func (in *LLDeploySpec) DeepCopy() *LLDeploySpec {
if in == nil {
return nil
}
out := new(LLDeploySpec)
in.DeepCopyInto(out)
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LLEvalSpec) DeepCopyInto(out *LLEvalSpec) {
*out = *in
in.Template.DeepCopyInto(&out.Template)
return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LLEvalSpec.
func (in *LLEvalSpec) DeepCopy() *LLEvalSpec {
if in == nil {
return nil
}
out := new(LLEvalSpec)
in.DeepCopyInto(out)
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LLJobCondition) DeepCopyInto(out *LLJobCondition) {
*out = *in
in.LastHeartbeatTime.DeepCopyInto(&out.LastHeartbeatTime)
in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime)
return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LLJobCondition.
func (in *LLJobCondition) DeepCopy() *LLJobCondition {
if in == nil {
return nil
}
out := new(LLJobCondition)
in.DeepCopyInto(out)
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LLJobSpec) DeepCopyInto(out *LLJobSpec) {
*out = *in
out.Dataset = in.Dataset
in.TrainSpec.DeepCopyInto(&out.TrainSpec)
in.EvalSpec.DeepCopyInto(&out.EvalSpec)
in.DeploySpec.DeepCopyInto(&out.DeploySpec)
return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LLJobSpec.
func (in *LLJobSpec) DeepCopy() *LLJobSpec {
if in == nil {
return nil
}
out := new(LLJobSpec)
in.DeepCopyInto(out)
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LLJobStatus) DeepCopyInto(out *LLJobStatus) {
*out = *in
if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions
*out = make([]LLJobCondition, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.StartTime != nil {
in, out := &in.StartTime, &out.StartTime
*out = (*in).DeepCopy()
}
if in.CompletionTime != nil {
in, out := &in.CompletionTime, &out.CompletionTime
*out = (*in).DeepCopy()
}
return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LLJobStatus.
func (in *LLJobStatus) DeepCopy() *LLJobStatus {
if in == nil {
return nil
}
out := new(LLJobStatus)
in.DeepCopyInto(out)
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LLTimer) DeepCopyInto(out *LLTimer) {
*out = *in
return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LLTimer.
func (in *LLTimer) DeepCopy() *LLTimer {
if in == nil {
return nil
}
out := new(LLTimer)
in.DeepCopyInto(out)
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LLTrainSpec) DeepCopyInto(out *LLTrainSpec) {
*out = *in
in.Template.DeepCopyInto(&out.Template)
in.Trigger.DeepCopyInto(&out.Trigger)
return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LLTrainSpec.
func (in *LLTrainSpec) DeepCopy() *LLTrainSpec {
if in == nil {
return nil
}
out := new(LLTrainSpec)
in.DeepCopyInto(out)
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LLTrigger) DeepCopyInto(out *LLTrigger) {
*out = *in
if in.Timer != nil {
in, out := &in.Timer, &out.Timer
*out = new(LLTimer)
**out = **in
}
out.Condition = in.Condition
return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LLTrigger.
func (in *LLTrigger) DeepCopy() *LLTrigger {
if in == nil {
return nil
}
out := new(LLTrigger)
in.DeepCopyInto(out)
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LifelongLearningJob) DeepCopyInto(out *LifelongLearningJob) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
in.Spec.DeepCopyInto(&out.Spec)
in.Status.DeepCopyInto(&out.Status)
return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LifelongLearningJob.
func (in *LifelongLearningJob) DeepCopy() *LifelongLearningJob {
if in == nil {
return nil
}
out := new(LifelongLearningJob)
in.DeepCopyInto(out)
return out
}

// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *LifelongLearningJob) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LifelongLearningJobList) DeepCopyInto(out *LifelongLearningJobList) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ListMeta.DeepCopyInto(&out.ListMeta)
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]LifelongLearningJob, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LifelongLearningJobList.
func (in *LifelongLearningJobList) DeepCopy() *LifelongLearningJobList {
if in == nil {
return nil
}
out := new(LifelongLearningJobList)
in.DeepCopyInto(out)
return out
}

// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *LifelongLearningJobList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Metric) DeepCopyInto(out *Metric) {
*out = *in


+ 142
- 0
pkg/client/clientset/versioned/typed/sedna/v1alpha1/fake/fake_lifelonglearningjob.go View File

@@ -0,0 +1,142 @@
/*
Copyright The KubeEdge Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Code generated by client-gen. DO NOT EDIT.

package fake

import (
"context"

v1alpha1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
labels "k8s.io/apimachinery/pkg/labels"
schema "k8s.io/apimachinery/pkg/runtime/schema"
types "k8s.io/apimachinery/pkg/types"
watch "k8s.io/apimachinery/pkg/watch"
testing "k8s.io/client-go/testing"
)

// FakeLifelongLearningJobs implements LifelongLearningJobInterface
type FakeLifelongLearningJobs struct {
Fake *FakeSednaV1alpha1
ns string
}

var lifelonglearningjobsResource = schema.GroupVersionResource{Group: "sedna.io", Version: "v1alpha1", Resource: "lifelonglearningjobs"}

var lifelonglearningjobsKind = schema.GroupVersionKind{Group: "sedna.io", Version: "v1alpha1", Kind: "LifelongLearningJob"}

// Get takes name of the lifelongLearningJob, and returns the corresponding lifelongLearningJob object, and an error if there is any.
func (c *FakeLifelongLearningJobs) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.LifelongLearningJob, err error) {
obj, err := c.Fake.
Invokes(testing.NewGetAction(lifelonglearningjobsResource, c.ns, name), &v1alpha1.LifelongLearningJob{})

if obj == nil {
return nil, err
}
return obj.(*v1alpha1.LifelongLearningJob), err
}

// List takes label and field selectors, and returns the list of LifelongLearningJobs that match those selectors.
func (c *FakeLifelongLearningJobs) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.LifelongLearningJobList, err error) {
obj, err := c.Fake.
Invokes(testing.NewListAction(lifelonglearningjobsResource, lifelonglearningjobsKind, c.ns, opts), &v1alpha1.LifelongLearningJobList{})

if obj == nil {
return nil, err
}

label, _, _ := testing.ExtractFromListOptions(opts)
if label == nil {
label = labels.Everything()
}
list := &v1alpha1.LifelongLearningJobList{ListMeta: obj.(*v1alpha1.LifelongLearningJobList).ListMeta}
for _, item := range obj.(*v1alpha1.LifelongLearningJobList).Items {
if label.Matches(labels.Set(item.Labels)) {
list.Items = append(list.Items, item)
}
}
return list, err
}

// Watch returns a watch.Interface that watches the requested lifelongLearningJobs.
func (c *FakeLifelongLearningJobs) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) {
return c.Fake.
InvokesWatch(testing.NewWatchAction(lifelonglearningjobsResource, c.ns, opts))

}

// Create takes the representation of a lifelongLearningJob and creates it. Returns the server's representation of the lifelongLearningJob, and an error, if there is any.
func (c *FakeLifelongLearningJobs) Create(ctx context.Context, lifelongLearningJob *v1alpha1.LifelongLearningJob, opts v1.CreateOptions) (result *v1alpha1.LifelongLearningJob, err error) {
obj, err := c.Fake.
Invokes(testing.NewCreateAction(lifelonglearningjobsResource, c.ns, lifelongLearningJob), &v1alpha1.LifelongLearningJob{})

if obj == nil {
return nil, err
}
return obj.(*v1alpha1.LifelongLearningJob), err
}

// Update takes the representation of a lifelongLearningJob and updates it. Returns the server's representation of the lifelongLearningJob, and an error, if there is any.
func (c *FakeLifelongLearningJobs) Update(ctx context.Context, lifelongLearningJob *v1alpha1.LifelongLearningJob, opts v1.UpdateOptions) (result *v1alpha1.LifelongLearningJob, err error) {
obj, err := c.Fake.
Invokes(testing.NewUpdateAction(lifelonglearningjobsResource, c.ns, lifelongLearningJob), &v1alpha1.LifelongLearningJob{})

if obj == nil {
return nil, err
}
return obj.(*v1alpha1.LifelongLearningJob), err
}

// UpdateStatus was generated because the type contains a Status member.
// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus().
func (c *FakeLifelongLearningJobs) UpdateStatus(ctx context.Context, lifelongLearningJob *v1alpha1.LifelongLearningJob, opts v1.UpdateOptions) (*v1alpha1.LifelongLearningJob, error) {
obj, err := c.Fake.
Invokes(testing.NewUpdateSubresourceAction(lifelonglearningjobsResource, "status", c.ns, lifelongLearningJob), &v1alpha1.LifelongLearningJob{})

if obj == nil {
return nil, err
}
return obj.(*v1alpha1.LifelongLearningJob), err
}

// Delete takes name of the lifelongLearningJob and deletes it. Returns an error if one occurs.
func (c *FakeLifelongLearningJobs) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error {
_, err := c.Fake.
Invokes(testing.NewDeleteAction(lifelonglearningjobsResource, c.ns, name), &v1alpha1.LifelongLearningJob{})

return err
}

// DeleteCollection deletes a collection of objects.
func (c *FakeLifelongLearningJobs) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error {
action := testing.NewDeleteCollectionAction(lifelonglearningjobsResource, c.ns, listOpts)

_, err := c.Fake.Invokes(action, &v1alpha1.LifelongLearningJobList{})
return err
}

// Patch applies the patch and returns the patched lifelongLearningJob.
func (c *FakeLifelongLearningJobs) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.LifelongLearningJob, err error) {
obj, err := c.Fake.
Invokes(testing.NewPatchSubresourceAction(lifelonglearningjobsResource, c.ns, name, pt, data, subresources...), &v1alpha1.LifelongLearningJob{})

if obj == nil {
return nil, err
}
return obj.(*v1alpha1.LifelongLearningJob), err
}

+ 4
- 0
pkg/client/clientset/versioned/typed/sedna/v1alpha1/fake/fake_sedna_client.go View File

@@ -44,6 +44,10 @@ func (c *FakeSednaV1alpha1) JointInferenceServices(namespace string) v1alpha1.Jo
return &FakeJointInferenceServices{c, namespace}
}

func (c *FakeSednaV1alpha1) LifelongLearningJobs(namespace string) v1alpha1.LifelongLearningJobInterface {
return &FakeLifelongLearningJobs{c, namespace}
}

func (c *FakeSednaV1alpha1) Models(namespace string) v1alpha1.ModelInterface {
return &FakeModels{c, namespace}
}


+ 2
- 0
pkg/client/clientset/versioned/typed/sedna/v1alpha1/generated_expansion.go View File

@@ -26,4 +26,6 @@ type IncrementalLearningJobExpansion interface{}

type JointInferenceServiceExpansion interface{}

type LifelongLearningJobExpansion interface{}

type ModelExpansion interface{}

+ 195
- 0
pkg/client/clientset/versioned/typed/sedna/v1alpha1/lifelonglearningjob.go View File

@@ -0,0 +1,195 @@
/*
Copyright The KubeEdge Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Code generated by client-gen. DO NOT EDIT.

package v1alpha1

import (
"context"
"time"

v1alpha1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
scheme "github.com/kubeedge/sedna/pkg/client/clientset/versioned/scheme"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
types "k8s.io/apimachinery/pkg/types"
watch "k8s.io/apimachinery/pkg/watch"
rest "k8s.io/client-go/rest"
)

// LifelongLearningJobsGetter has a method to return a LifelongLearningJobInterface.
// A group's client should implement this interface.
type LifelongLearningJobsGetter interface {
LifelongLearningJobs(namespace string) LifelongLearningJobInterface
}

// LifelongLearningJobInterface has methods to work with LifelongLearningJob resources.
type LifelongLearningJobInterface interface {
Create(ctx context.Context, lifelongLearningJob *v1alpha1.LifelongLearningJob, opts v1.CreateOptions) (*v1alpha1.LifelongLearningJob, error)
Update(ctx context.Context, lifelongLearningJob *v1alpha1.LifelongLearningJob, opts v1.UpdateOptions) (*v1alpha1.LifelongLearningJob, error)
UpdateStatus(ctx context.Context, lifelongLearningJob *v1alpha1.LifelongLearningJob, opts v1.UpdateOptions) (*v1alpha1.LifelongLearningJob, error)
Delete(ctx context.Context, name string, opts v1.DeleteOptions) error
DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error
Get(ctx context.Context, name string, opts v1.GetOptions) (*v1alpha1.LifelongLearningJob, error)
List(ctx context.Context, opts v1.ListOptions) (*v1alpha1.LifelongLearningJobList, error)
Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.LifelongLearningJob, err error)
LifelongLearningJobExpansion
}

// lifelongLearningJobs implements LifelongLearningJobInterface
type lifelongLearningJobs struct {
client rest.Interface
ns string
}

// newLifelongLearningJobs returns a LifelongLearningJobs
func newLifelongLearningJobs(c *SednaV1alpha1Client, namespace string) *lifelongLearningJobs {
return &lifelongLearningJobs{
client: c.RESTClient(),
ns: namespace,
}
}

// Get takes name of the lifelongLearningJob, and returns the corresponding lifelongLearningJob object, and an error if there is any.
func (c *lifelongLearningJobs) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.LifelongLearningJob, err error) {
result = &v1alpha1.LifelongLearningJob{}
err = c.client.Get().
Namespace(c.ns).
Resource("lifelonglearningjobs").
Name(name).
VersionedParams(&options, scheme.ParameterCodec).
Do(ctx).
Into(result)
return
}

// List takes label and field selectors, and returns the list of LifelongLearningJobs that match those selectors.
func (c *lifelongLearningJobs) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.LifelongLearningJobList, err error) {
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
result = &v1alpha1.LifelongLearningJobList{}
err = c.client.Get().
Namespace(c.ns).
Resource("lifelonglearningjobs").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Do(ctx).
Into(result)
return
}

// Watch returns a watch.Interface that watches the requested lifelongLearningJobs.
func (c *lifelongLearningJobs) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) {
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
opts.Watch = true
return c.client.Get().
Namespace(c.ns).
Resource("lifelonglearningjobs").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Watch(ctx)
}

// Create takes the representation of a lifelongLearningJob and creates it. Returns the server's representation of the lifelongLearningJob, and an error, if there is any.
func (c *lifelongLearningJobs) Create(ctx context.Context, lifelongLearningJob *v1alpha1.LifelongLearningJob, opts v1.CreateOptions) (result *v1alpha1.LifelongLearningJob, err error) {
result = &v1alpha1.LifelongLearningJob{}
err = c.client.Post().
Namespace(c.ns).
Resource("lifelonglearningjobs").
VersionedParams(&opts, scheme.ParameterCodec).
Body(lifelongLearningJob).
Do(ctx).
Into(result)
return
}

// Update takes the representation of a lifelongLearningJob and updates it. Returns the server's representation of the lifelongLearningJob, and an error, if there is any.
func (c *lifelongLearningJobs) Update(ctx context.Context, lifelongLearningJob *v1alpha1.LifelongLearningJob, opts v1.UpdateOptions) (result *v1alpha1.LifelongLearningJob, err error) {
result = &v1alpha1.LifelongLearningJob{}
err = c.client.Put().
Namespace(c.ns).
Resource("lifelonglearningjobs").
Name(lifelongLearningJob.Name).
VersionedParams(&opts, scheme.ParameterCodec).
Body(lifelongLearningJob).
Do(ctx).
Into(result)
return
}

// UpdateStatus was generated because the type contains a Status member.
// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus().
func (c *lifelongLearningJobs) UpdateStatus(ctx context.Context, lifelongLearningJob *v1alpha1.LifelongLearningJob, opts v1.UpdateOptions) (result *v1alpha1.LifelongLearningJob, err error) {
result = &v1alpha1.LifelongLearningJob{}
err = c.client.Put().
Namespace(c.ns).
Resource("lifelonglearningjobs").
Name(lifelongLearningJob.Name).
SubResource("status").
VersionedParams(&opts, scheme.ParameterCodec).
Body(lifelongLearningJob).
Do(ctx).
Into(result)
return
}

// Delete takes name of the lifelongLearningJob and deletes it. Returns an error if one occurs.
func (c *lifelongLearningJobs) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error {
return c.client.Delete().
Namespace(c.ns).
Resource("lifelonglearningjobs").
Name(name).
Body(&opts).
Do(ctx).
Error()
}

// DeleteCollection deletes a collection of objects.
func (c *lifelongLearningJobs) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error {
var timeout time.Duration
if listOpts.TimeoutSeconds != nil {
timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second
}
return c.client.Delete().
Namespace(c.ns).
Resource("lifelonglearningjobs").
VersionedParams(&listOpts, scheme.ParameterCodec).
Timeout(timeout).
Body(&opts).
Do(ctx).
Error()
}

// Patch applies the patch and returns the patched lifelongLearningJob.
func (c *lifelongLearningJobs) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.LifelongLearningJob, err error) {
result = &v1alpha1.LifelongLearningJob{}
err = c.client.Patch(pt).
Namespace(c.ns).
Resource("lifelonglearningjobs").
Name(name).
SubResource(subresources...).
VersionedParams(&opts, scheme.ParameterCodec).
Body(data).
Do(ctx).
Into(result)
return
}

+ 5
- 0
pkg/client/clientset/versioned/typed/sedna/v1alpha1/sedna_client.go View File

@@ -30,6 +30,7 @@ type SednaV1alpha1Interface interface {
FederatedLearningJobsGetter
IncrementalLearningJobsGetter
JointInferenceServicesGetter
LifelongLearningJobsGetter
ModelsGetter
}

@@ -54,6 +55,10 @@ func (c *SednaV1alpha1Client) JointInferenceServices(namespace string) JointInfe
return newJointInferenceServices(c, namespace)
}

func (c *SednaV1alpha1Client) LifelongLearningJobs(namespace string) LifelongLearningJobInterface {
return newLifelongLearningJobs(c, namespace)
}

func (c *SednaV1alpha1Client) Models(namespace string) ModelInterface {
return newModels(c, namespace)
}


+ 2
- 0
pkg/client/informers/externalversions/generic.go View File

@@ -61,6 +61,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource
return &genericInformer{resource: resource.GroupResource(), informer: f.Sedna().V1alpha1().IncrementalLearningJobs().Informer()}, nil
case v1alpha1.SchemeGroupVersion.WithResource("jointinferenceservices"):
return &genericInformer{resource: resource.GroupResource(), informer: f.Sedna().V1alpha1().JointInferenceServices().Informer()}, nil
case v1alpha1.SchemeGroupVersion.WithResource("lifelonglearningjobs"):
return &genericInformer{resource: resource.GroupResource(), informer: f.Sedna().V1alpha1().LifelongLearningJobs().Informer()}, nil
case v1alpha1.SchemeGroupVersion.WithResource("models"):
return &genericInformer{resource: resource.GroupResource(), informer: f.Sedna().V1alpha1().Models().Informer()}, nil



+ 7
- 0
pkg/client/informers/externalversions/sedna/v1alpha1/interface.go View File

@@ -32,6 +32,8 @@ type Interface interface {
IncrementalLearningJobs() IncrementalLearningJobInformer
// JointInferenceServices returns a JointInferenceServiceInformer.
JointInferenceServices() JointInferenceServiceInformer
// LifelongLearningJobs returns a LifelongLearningJobInformer.
LifelongLearningJobs() LifelongLearningJobInformer
// Models returns a ModelInformer.
Models() ModelInformer
}
@@ -67,6 +69,11 @@ func (v *version) JointInferenceServices() JointInferenceServiceInformer {
return &jointInferenceServiceInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

// LifelongLearningJobs returns a LifelongLearningJobInformer.
func (v *version) LifelongLearningJobs() LifelongLearningJobInformer {
return &lifelongLearningJobInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

// Models returns a ModelInformer.
func (v *version) Models() ModelInformer {
return &modelInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}


+ 90
- 0
pkg/client/informers/externalversions/sedna/v1alpha1/lifelonglearningjob.go View File

@@ -0,0 +1,90 @@
/*
Copyright The KubeEdge Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Code generated by informer-gen. DO NOT EDIT.

package v1alpha1

import (
"context"
time "time"

sednav1alpha1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
versioned "github.com/kubeedge/sedna/pkg/client/clientset/versioned"
internalinterfaces "github.com/kubeedge/sedna/pkg/client/informers/externalversions/internalinterfaces"
v1alpha1 "github.com/kubeedge/sedna/pkg/client/listers/sedna/v1alpha1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
watch "k8s.io/apimachinery/pkg/watch"
cache "k8s.io/client-go/tools/cache"
)

// LifelongLearningJobInformer provides access to a shared informer and lister for
// LifelongLearningJobs.
type LifelongLearningJobInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1alpha1.LifelongLearningJobLister
}

type lifelongLearningJobInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}

// NewLifelongLearningJobInformer constructs a new informer for LifelongLearningJob type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewLifelongLearningJobInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
return NewFilteredLifelongLearningJobInformer(client, namespace, resyncPeriod, indexers, nil)
}

// NewFilteredLifelongLearningJobInformer constructs a new informer for LifelongLearningJob type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredLifelongLearningJobInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.SednaV1alpha1().LifelongLearningJobs(namespace).List(context.TODO(), options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.SednaV1alpha1().LifelongLearningJobs(namespace).Watch(context.TODO(), options)
},
},
&sednav1alpha1.LifelongLearningJob{},
resyncPeriod,
indexers,
)
}

func (f *lifelongLearningJobInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredLifelongLearningJobInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func (f *lifelongLearningJobInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&sednav1alpha1.LifelongLearningJob{}, f.defaultInformer)
}

func (f *lifelongLearningJobInformer) Lister() v1alpha1.LifelongLearningJobLister {
return v1alpha1.NewLifelongLearningJobLister(f.Informer().GetIndexer())
}

+ 8
- 0
pkg/client/listers/sedna/v1alpha1/expansion_generated.go View File

@@ -50,6 +50,14 @@ type JointInferenceServiceListerExpansion interface{}
// JointInferenceServiceNamespaceLister.
type JointInferenceServiceNamespaceListerExpansion interface{}

// LifelongLearningJobListerExpansion allows custom methods to be added to
// LifelongLearningJobLister.
type LifelongLearningJobListerExpansion interface{}

// LifelongLearningJobNamespaceListerExpansion allows custom methods to be added to
// LifelongLearningJobNamespaceLister.
type LifelongLearningJobNamespaceListerExpansion interface{}

// ModelListerExpansion allows custom methods to be added to
// ModelLister.
type ModelListerExpansion interface{}


+ 99
- 0
pkg/client/listers/sedna/v1alpha1/lifelonglearningjob.go View File

@@ -0,0 +1,99 @@
/*
Copyright The KubeEdge Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Code generated by lister-gen. DO NOT EDIT.

package v1alpha1

import (
v1alpha1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
)

// LifelongLearningJobLister helps list LifelongLearningJobs.
// All objects returned here must be treated as read-only.
type LifelongLearningJobLister interface {
// List lists all LifelongLearningJobs in the indexer.
// Objects returned here must be treated as read-only.
List(selector labels.Selector) (ret []*v1alpha1.LifelongLearningJob, err error)
// LifelongLearningJobs returns an object that can list and get LifelongLearningJobs.
LifelongLearningJobs(namespace string) LifelongLearningJobNamespaceLister
LifelongLearningJobListerExpansion
}

// lifelongLearningJobLister implements the LifelongLearningJobLister interface.
type lifelongLearningJobLister struct {
indexer cache.Indexer
}

// NewLifelongLearningJobLister returns a new LifelongLearningJobLister.
func NewLifelongLearningJobLister(indexer cache.Indexer) LifelongLearningJobLister {
return &lifelongLearningJobLister{indexer: indexer}
}

// List lists all LifelongLearningJobs in the indexer.
func (s *lifelongLearningJobLister) List(selector labels.Selector) (ret []*v1alpha1.LifelongLearningJob, err error) {
err = cache.ListAll(s.indexer, selector, func(m interface{}) {
ret = append(ret, m.(*v1alpha1.LifelongLearningJob))
})
return ret, err
}

// LifelongLearningJobs returns an object that can list and get LifelongLearningJobs.
func (s *lifelongLearningJobLister) LifelongLearningJobs(namespace string) LifelongLearningJobNamespaceLister {
return lifelongLearningJobNamespaceLister{indexer: s.indexer, namespace: namespace}
}

// LifelongLearningJobNamespaceLister helps list and get LifelongLearningJobs.
// All objects returned here must be treated as read-only.
type LifelongLearningJobNamespaceLister interface {
// List lists all LifelongLearningJobs in the indexer for a given namespace.
// Objects returned here must be treated as read-only.
List(selector labels.Selector) (ret []*v1alpha1.LifelongLearningJob, err error)
// Get retrieves the LifelongLearningJob from the indexer for a given namespace and name.
// Objects returned here must be treated as read-only.
Get(name string) (*v1alpha1.LifelongLearningJob, error)
LifelongLearningJobNamespaceListerExpansion
}

// lifelongLearningJobNamespaceLister implements the LifelongLearningJobNamespaceLister
// interface.
type lifelongLearningJobNamespaceLister struct {
indexer cache.Indexer
namespace string
}

// List lists all LifelongLearningJobs in the indexer for a given namespace.
func (s lifelongLearningJobNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.LifelongLearningJob, err error) {
err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
ret = append(ret, m.(*v1alpha1.LifelongLearningJob))
})
return ret, err
}

// Get retrieves the LifelongLearningJob from the indexer for a given namespace and name.
func (s lifelongLearningJobNamespaceLister) Get(name string) (*v1alpha1.LifelongLearningJob, error) {
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(v1alpha1.Resource("lifelonglearningjob"), name)
}
return obj.(*v1alpha1.LifelongLearningJob), nil
}

+ 13
- 0
pkg/globalmanager/config/config.go View File

@@ -33,6 +33,7 @@ const (
defaultWebsocketAddress = "0.0.0.0"
defaultWebsocketPort = 9000
defaultLCServer = "http://localhost:9100"
defaultKBServer = "http://localhost:9020"
)

// ControllerConfig indicates the config of controller
@@ -54,6 +55,9 @@ type ControllerConfig struct {

// lc config to info the worker
LC LCConfig `json:"localController,omitempty"`

// kb config to info the worker
KB KBConfig `json:"knowledgeBaseServer,omitempty"`
}

// WebSocket describes GM of websocket config
@@ -70,6 +74,12 @@ type LCConfig struct {
Server string `json:"server"`
}

// KBConfig describes KB config to inject the worker
type KBConfig struct {
// default defaultKBServer
Server string `json:"server"`
}

// Parse parses from filename
func (c *ControllerConfig) Parse(filename string) error {
data, err := ioutil.ReadFile(filename)
@@ -107,6 +117,9 @@ func NewDefaultControllerConfig() *ControllerConfig {
LC: LCConfig{
Server: defaultLCServer,
},
KB: KBConfig{
Server: defaultKBServer,
},
}
}



+ 1
- 0
pkg/globalmanager/controller.go View File

@@ -49,6 +49,7 @@ func (c *MainController) Start() {
NewFederatedController,
NewJointController,
NewIncrementalJobController,
NewLifelongLearningJobController,
} {
f, _ := featureFunc(c.Config)
err := f.Start()


+ 26
- 1
pkg/globalmanager/downstream.go View File

@@ -163,6 +163,23 @@ func (dc *DownstreamController) syncIncrementalJob(eventType watch.EventType, jo
return nil
}

// syncLifelongLearningJob syncs the lifelonglearning jobs
func (dc *DownstreamController) syncLifelongLearningJob(eventType watch.EventType, job *sednav1.LifelongLearningJob) error {
// Here only propagate to the nodes with non empty name

// FIXME(llhuii): only the case that all workers having the same nodeName are support,
// will support Spec.NodeSelector and differenect nodeName.
nodeName := job.Spec.TrainSpec.Template.Spec.NodeName
if len(nodeName) == 0 {
return fmt.Errorf("empty node name")
}

dc.injectSecret(job, job.Spec.CredentialName)
dc.messageLayer.SendResourceObject(nodeName, eventType, job)

return nil
}

// sync defines the entrypoint of syncing all resources
func (dc *DownstreamController) sync(stopCh <-chan struct{}) {
for {
@@ -215,7 +232,14 @@ func (dc *DownstreamController) sync(stopCh <-chan struct{}) {
namespace = t.Namespace
name = t.Name
err = dc.syncIncrementalJob(e.Type, t)

case (*sednav1.LifelongLearningJob):
if len(t.Kind) == 0 {
t.Kind = "LifelongLearningJob"
}
kind = t.Kind
namespace = t.Namespace
name = t.Name
err = dc.syncLifelongLearningJob(e.Type, t)
default:
klog.Warningf("object type: %T unsupported", e)
continue
@@ -264,6 +288,7 @@ func (dc *DownstreamController) watch(stopCh <-chan struct{}) {
"jointinferenceservices": &sednav1.JointInferenceService{},
"federatedlearningjobs": &sednav1.FederatedLearningJob{},
"incrementallearningjobs": &sednav1.IncrementalLearningJob{},
"lifelonglearningjobs": &sednav1.LifelongLearningJob{},
} {
lw := cache.NewListWatchFromClient(client, resourceName, namespace, fields.Everything())
si := cache.NewSharedInformer(lw, object, resyncPeriod)


+ 1
- 1
pkg/globalmanager/federatedlearningjob.go View File

@@ -513,7 +513,7 @@ func (fc *FederatedController) createPod(job *sednav1.FederatedLearningJob) (act
"DATASET_NAME": datasetName,
"LC_SERVER": fc.cfg.LC.Server,
}
workerParam.workerType = "train"
workerParam.workerType = TrainPodType
workerParam.hostNetwork = true
workerParam.restartPolicy = v1.RestartPolicyOnFailure
// create train pod based on configured parameters


+ 5
- 5
pkg/globalmanager/incrementallearningjob.go View File

@@ -261,7 +261,7 @@ func (jc *IncrementalJobController) sync(key string) (bool, error) {
if incrementaljob.Status.StartTime == nil {
now := metav1.Now()
incrementaljob.Status.StartTime = &now
pod := jc.getSpecifiedPods(&incrementaljob, "inference")
pod := jc.getSpecifiedPods(&incrementaljob, InferencePodType)
if pod == nil {
err = jc.createInferPod(&incrementaljob)
} else {
@@ -430,7 +430,7 @@ func (jc *IncrementalJobController) generatePodName(jobName string, workerType s

func (jc *IncrementalJobController) getSpecifiedPods(job *sednav1.IncrementalLearningJob, podType string) *v1.Pod {
if podType == "Deploy" {
podType = "inference"
podType = InferencePodType
}
var latestPod *v1.Pod
selector, _ := GenerateSelector(job)
@@ -455,7 +455,7 @@ func (jc *IncrementalJobController) getSpecifiedPods(job *sednav1.IncrementalLea
}

func (jc *IncrementalJobController) restartInferPod(job *sednav1.IncrementalLearningJob) error {
inferPod := jc.getSpecifiedPods(job, "inference")
inferPod := jc.getSpecifiedPods(job, InferencePodType)
if inferPod == nil {
klog.V(2).Infof("No inferpod is running in incrementallearning job %v/%v", job.Namespace, job.Name)
err := jc.createInferPod(job)
@@ -572,7 +572,7 @@ func (jc *IncrementalJobController) createPod(job *sednav1.IncrementalLearningJo

var workerParam *WorkerParam = new(WorkerParam)
if podtype == sednav1.ILJobTrain {
workerParam.workerType = "Train"
workerParam.workerType = TrainPodType

podTemplate = &job.Spec.TrainSpec.Template
// Env parameters for train
@@ -752,7 +752,7 @@ func (jc *IncrementalJobController) createInferPod(job *sednav1.IncrementalLearn
"LC_SERVER": jc.cfg.LC.Server,
}

workerParam.workerType = "inference"
workerParam.workerType = InferencePodType
workerParam.hostNetwork = true

// create edge pod


+ 770
- 0
pkg/globalmanager/lifelonglearningjob.go View File

@@ -0,0 +1,770 @@
/*
Copyright 2021 The KubeEdge Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package globalmanager

import (
"context"
"fmt"
"strings"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilrand "k8s.io/apimachinery/pkg/util/rand"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
k8scontroller "k8s.io/kubernetes/pkg/controller"

sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
clientset "github.com/kubeedge/sedna/pkg/client/clientset/versioned"
sednaclientset "github.com/kubeedge/sedna/pkg/client/clientset/versioned/typed/sedna/v1alpha1"
informers "github.com/kubeedge/sedna/pkg/client/informers/externalversions"
sednav1listers "github.com/kubeedge/sedna/pkg/client/listers/sedna/v1alpha1"
"github.com/kubeedge/sedna/pkg/globalmanager/config"
messageContext "github.com/kubeedge/sedna/pkg/globalmanager/messagelayer/ws"
"github.com/kubeedge/sedna/pkg/globalmanager/utils"
)

// ljControllerKind contains the schema.GroupVersionKind for this controller type.
var ljControllerKind = sednav1.SchemeGroupVersion.WithKind("LifelongLearningJob")

// LifelongLearningJobController ensures that all LifelongLearningJob objects have corresponding pods to
// run their configured workload.
type LifelongLearningJobController struct {
kubeClient kubernetes.Interface
client sednaclientset.SednaV1alpha1Interface

// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced cache.InformerSynced
// jobStoreSynced returns true if the lifelonglearningjob store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
jobStoreSynced cache.InformerSynced

// A store of jobs
jobLister sednav1listers.LifelongLearningJobLister

// A store of pods, populated by the podController
podStore corelisters.PodLister

// LifelongLearningJobs that need to be updated
queue workqueue.RateLimitingInterface

recorder record.EventRecorder

cfg *config.ControllerConfig
}

// Run the main goroutine responsible for watching and syncing jobs.
func (jc *LifelongLearningJobController) Start() error {
workers := 1
stopCh := messageContext.Done()

go func() {
defer utilruntime.HandleCrash()
defer jc.queue.ShutDown()
klog.Infof("Starting lifelonglearning job controller")
defer klog.Infof("Shutting down lifelonglearning job controller")

if !cache.WaitForNamedCacheSync("lifelonglearningjob", stopCh, jc.podStoreSynced, jc.jobStoreSynced) {
klog.Errorf("failed to wait for caches to sync")

return
}
klog.Infof("Starting lifelonglearning job workers")
for i := 0; i < workers; i++ {
go wait.Until(jc.worker, time.Second, stopCh)
}

<-stopCh
}()
return nil
}

// enqueueByPod enqueues the lifelonglearningjob object of the specified pod.
func (jc *LifelongLearningJobController) enqueueByPod(pod *v1.Pod, immediate bool) {
controllerRef := metav1.GetControllerOf(pod)

if controllerRef == nil {
return
}

if controllerRef.Kind != ljControllerKind.Kind {
return
}

service, err := jc.jobLister.LifelongLearningJobs(pod.Namespace).Get(controllerRef.Name)
if err != nil {
return
}

if service.UID != controllerRef.UID {
return
}

jc.enqueueController(service, immediate)
}

// When a pod is created, enqueue the controller that manages it and update it's expectations.
func (jc *LifelongLearningJobController) addPod(obj interface{}) {
pod := obj.(*v1.Pod)
if pod.DeletionTimestamp != nil {
// on a restart of the controller, it's possible a new pod shows up in a state that
// is already pending deletion. Prevent the pod from being a creation observation.
jc.deletePod(pod)
return
}

// backoff to queue when PodFailed
immediate := pod.Status.Phase != v1.PodFailed

jc.enqueueByPod(pod, immediate)
}

// When a pod is updated, figure out what lifelonglearning job manage it and wake them up.
func (jc *LifelongLearningJobController) updatePod(old, cur interface{}) {
curPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)

// no pod update, no queue
if curPod.ResourceVersion == oldPod.ResourceVersion {
return
}

jc.addPod(curPod)
}

// deletePod enqueues the lifelonglearningjob obj When a pod is deleted
func (jc *LifelongLearningJobController) deletePod(obj interface{}) {
pod, ok := obj.(*v1.Pod)

// comment from https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/job/job_controller.go

// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale. If the pod
// changed labels the new lifelonglearningjob will not be woken up till the periodic resync.
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.Warningf("couldn't get object from tombstone %+v", obj)
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
klog.Warningf("tombstone contained object that is not a pod %+v", obj)
return
}
}
jc.enqueueByPod(pod, true)
}

// obj could be an *sedna.LifelongLearningJob, or a DeletionFinalStateUnknown marker item,
// immediate tells the controller to update the status right away, and should
// happen ONLY when there was a successful pod run.
func (jc *LifelongLearningJobController) enqueueController(obj interface{}, immediate bool) {
key, err := k8scontroller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}

backoff := time.Duration(0)
if !immediate {
backoff = getBackoff(jc.queue, key)
}

jc.queue.AddAfter(key, backoff)
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (jc *LifelongLearningJobController) worker() {
for jc.processNextWorkItem() {
}
}

func (jc *LifelongLearningJobController) processNextWorkItem() bool {
key, quit := jc.queue.Get()
if quit {
return false
}
defer jc.queue.Done(key)

forget, err := jc.sync(key.(string))
if err == nil {
if forget {
jc.queue.Forget(key)
}
return true
}

utilruntime.HandleError(fmt.Errorf("Error syncing lifelonglearning job: %v", err))
jc.queue.AddRateLimited(key)

return true
}

// sync will sync the lifelonglearning job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (jc *LifelongLearningJobController) sync(key string) (bool, error) {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing lifelonglearning job %q (%v)", key, time.Since(startTime))
}()

ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return false, err
}
if len(ns) == 0 || len(name) == 0 {
return false, fmt.Errorf("invalid lifelonglearning job key %q: either namespace or name is missing", key)
}
sharedLifelongLearningJob, err := jc.jobLister.LifelongLearningJobs(ns).Get(name)
if err != nil {
if errors.IsNotFound(err) {
klog.V(4).Infof("lifelonglearning job has been deleted: %v", key)
return true, nil
}
return false, err
}
lifelonglearningjob := *sharedLifelongLearningJob
// set kind for lifelonglearningjob in case that the kind is None
lifelonglearningjob.SetGroupVersionKind(sednav1.SchemeGroupVersion.WithKind("LifelongLearningJob"))

// lifelonglearningjob first start
if lifelonglearningjob.Status.StartTime == nil {
now := metav1.Now()
lifelonglearningjob.Status.StartTime = &now
}

// if lifelonglearningjob was finished previously, we don't want to redo the termination
if IsLifelongLearningJobFinished(&lifelonglearningjob) {
return true, nil
}

forget := false
jobFailed := false
needUpdated := false

// update conditions of lifelonglearning job
needUpdated, err = jc.updateLifelongLearningJobConditions(&lifelonglearningjob)
if err != nil {
klog.V(2).Infof("lifelonglearning job %v/%v faied to be updated, err:%s", lifelonglearningjob.Namespace, lifelonglearningjob.Name, err)
}

if needUpdated {
if err := jc.updateLifelongLearningJobStatus(&lifelonglearningjob); err != nil {
return forget, err
}

if jobFailed && !IsLifelongLearningJobFinished(&lifelonglearningjob) {
// returning an error will re-enqueue LifelongLearningJob after the backoff period
return forget, fmt.Errorf("failed pod(s) detected for lifelonglearningjob key %q", key)
}

forget = true
}

return forget, err
}

// updateLifelongLearningJobConditions ensures that conditions of lifelonglearning job can be changed by podstatus
func (jc *LifelongLearningJobController) updateLifelongLearningJobConditions(lifelonglearningjob *sednav1.LifelongLearningJob) (bool, error) {
var initialType sednav1.LLJobStageConditionType
var latestCondition sednav1.LLJobCondition = sednav1.LLJobCondition{
Stage: sednav1.LLJobTrain,
Type: initialType,
}
var newConditionType sednav1.LLJobStageConditionType
latestCondition.Stage = sednav1.LLJobTrain
var needUpdated = false
jobConditions := lifelonglearningjob.Status.Conditions
var podStatus v1.PodPhase = v1.PodUnknown
if len(jobConditions) > 0 {
// get latest pod and pod status
latestCondition = (jobConditions)[len(jobConditions)-1]
klog.V(2).Infof("lifelonglearning job %v/%v latest stage %v:", lifelonglearningjob.Namespace, lifelonglearningjob.Name,
latestCondition.Stage)
pod := jc.getSpecifiedPods(lifelonglearningjob, string(latestCondition.Stage))

if pod != nil {
podStatus = pod.Status.Phase
}
}
jobStage := latestCondition.Stage
currentType := latestCondition.Type
newConditionType = currentType

switch currentType {
case initialType:
newConditionType = sednav1.LLJobStageCondWaiting

case sednav1.LLJobStageCondWaiting:
// do nothing, waiting for LC to set type from waiting to ready

case sednav1.LLJobStageCondReady:
// create a pod, and set type from ready to starting
// include train, eval, deploy pod
var err error
if jobStage == sednav1.LLJobDeploy {
err = jc.restartInferPod(lifelonglearningjob)
if err != nil {
klog.V(2).Infof("lifelonglearning job %v/%v inference pod failed to restart, err:%s", lifelonglearningjob.Namespace, lifelonglearningjob.Name, err)
} else {
klog.V(2).Infof("lifelonglearning job %v/%v inference pod restarts successfully", lifelonglearningjob.Namespace, lifelonglearningjob.Name)
}
} else if podStatus != v1.PodPending && podStatus != v1.PodRunning {
err = jc.createPod(lifelonglearningjob, jobStage)
}
if err != nil {
return needUpdated, err
}
newConditionType = sednav1.LLJobStageCondStarting

case sednav1.LLJobStageCondStarting, sednav1.LLJobStageCondRunning:
if podStatus == v1.PodRunning {
if jobStage == sednav1.LLJobDeploy {
newConditionType = sednav1.LLJobStageCondCompleted
} else {
// watch pod status, if pod running, set type running
newConditionType = sednav1.LLJobStageCondRunning
}
} else if podStatus == v1.PodSucceeded {
// watch pod status, if pod completed, set type completed
newConditionType = sednav1.LLJobStageCondCompleted
klog.V(2).Infof("lifelonglearning job %v/%v %v stage completed!", lifelonglearningjob.Namespace, lifelonglearningjob.Name, jobStage)
} else if podStatus == v1.PodFailed {
newConditionType = sednav1.LLJobStageCondFailed
klog.V(2).Infof("lifelonglearning job %v/%v %v stage failed!", lifelonglearningjob.Namespace, lifelonglearningjob.Name, jobStage)
}
case sednav1.LLJobStageCondCompleted:
jobStage = jc.getNextStage(jobStage)
newConditionType = sednav1.LLJobStageCondWaiting

case sednav1.LLJobStageCondFailed:
jobStage = sednav1.LLJobTrain
newConditionType = sednav1.LLJobStageCondWaiting

default:
// do nothing when given other type out of cases
}
klog.V(2).Infof("lifelonglearning job %v/%v, conditions: %v", lifelonglearningjob.Namespace, lifelonglearningjob.Name, jobConditions)
if latestCondition.Type != newConditionType {
lifelonglearningjob.Status.Conditions = append(lifelonglearningjob.Status.Conditions, NewLifelongLearningJobCondition(newConditionType, jobStage))
needUpdated = true
return needUpdated, nil
}
return needUpdated, nil
}

// updateLifelongLearningJobStatus ensures that jobstatus can be updated rightly
func (jc *LifelongLearningJobController) updateLifelongLearningJobStatus(lifelonglearningjob *sednav1.LifelongLearningJob) error {
jobClient := jc.client.LifelongLearningJobs(lifelonglearningjob.Namespace)
var err error
for i := 0; i <= statusUpdateRetries; i = i + 1 {
var newLifelongLearningJob *sednav1.LifelongLearningJob
newLifelongLearningJob, err = jobClient.Get(context.TODO(), lifelonglearningjob.Name, metav1.GetOptions{})
if err != nil {
break
}
newLifelongLearningJob.Status = lifelonglearningjob.Status
if _, err = jobClient.UpdateStatus(context.TODO(), newLifelongLearningJob, metav1.UpdateOptions{}); err == nil {
break
}
}
return err
}

func NewLifelongLearningJobCondition(conditionType sednav1.LLJobStageConditionType, jobStage sednav1.LLJobStage) sednav1.LLJobCondition {
return sednav1.LLJobCondition{
Type: conditionType,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: "",
Message: "",
Stage: jobStage,
}
}

func (jc *LifelongLearningJobController) generatePodName(jobName string, workerType string) string {
return jobName + "-" + strings.ToLower(workerType) + "-" + utilrand.String(5)
}

func (jc *LifelongLearningJobController) getSpecifiedPods(job *sednav1.LifelongLearningJob, podType string) *v1.Pod {
if podType == "Deploy" {
podType = InferencePodType
}
var latestPod *v1.Pod
selector, _ := GenerateSelector(job)
pods, err := jc.podStore.Pods(job.Namespace).List(selector)
if len(pods) == 0 || err != nil {
return nil
}
var matchTag = false
latestPod = pods[0]
for _, pod := range pods {
s := strings.Split(pod.Name, "-")
CurrentPodType := s[len(s)-2]
if (latestPod.CreationTimestamp.Before(&pod.CreationTimestamp) || latestPod.CreationTimestamp.Equal(&pod.CreationTimestamp)) && CurrentPodType == strings.ToLower(podType) {
latestPod = pod
matchTag = true
}
}
if !matchTag {
return nil
}
return latestPod
}

func (jc *LifelongLearningJobController) restartInferPod(job *sednav1.LifelongLearningJob) error {
inferPod := jc.getSpecifiedPods(job, InferencePodType)
if inferPod == nil {
klog.V(2).Infof("No inferpod is running in lifelonglearning job %v/%v", job.Namespace, job.Name)
err := jc.createInferPod(job)
return err
}
ctx := context.Background()
err := jc.kubeClient.CoreV1().Pods(job.Namespace).Delete(ctx, inferPod.Name, metav1.DeleteOptions{})
if err != nil {
klog.Warningf("failed to delete inference pod %s for lifelonglearning job %v/%v, err:%s", inferPod.Name, job.Namespace, job.Name, err)
return err
}
err = jc.createInferPod(job)
if err != nil {
klog.Warningf("failed to create inference pod %s for lifelonglearning job %v/%v, err:%s", inferPod.Name, job.Namespace, job.Name, err)
return err
}
return nil
}

func (jc *LifelongLearningJobController) getNextStage(currentStage sednav1.LLJobStage) sednav1.LLJobStage {
switch currentStage {
case sednav1.LLJobTrain:
return sednav1.LLJobEval
case sednav1.LLJobEval:
return sednav1.LLJobDeploy
case sednav1.LLJobDeploy:
return sednav1.LLJobTrain
default:
return sednav1.LLJobTrain
}
}

func (jc *LifelongLearningJobController) getSecret(namespace, name string, ownerStr string) (secret *v1.Secret, err error) {
if name != "" {
secret, err = jc.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
err = fmt.Errorf("failed to get the secret %s for %s: %w",
name,
ownerStr, err)
}
}
return
}

func IsLifelongLearningJobFinished(j *sednav1.LifelongLearningJob) bool {
// TODO
return false
}

func (jc *LifelongLearningJobController) createPod(job *sednav1.LifelongLearningJob, podtype sednav1.LLJobStage) (err error) {
ctx := context.Background()
var podTemplate *v1.PodTemplateSpec

LLDatasetName := job.Spec.Dataset.Name

dataset, err := jc.client.Datasets(job.Namespace).Get(ctx, LLDatasetName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get dataset %s: %w", LLDatasetName, err)
}

datasetSecret, err := jc.getSecret(
job.Namespace,
dataset.Spec.CredentialName,
fmt.Sprintf("dataset %s", dataset.Name),
)
if err != nil {
return err
}

jobSecret, err := jc.getSecret(
job.Namespace,
job.Spec.CredentialName,
fmt.Sprintf("lifelonglearning job %s", job.Name),
)
if err != nil {
return err
}

// get all url for train and eval from data in condition
condDataStr := job.Status.Conditions[len(job.Status.Conditions)-1].Data
klog.V(2).Infof("lifelonglearning job %v/%v data condition:%s", job.Namespace, job.Name, condDataStr)
var cond LifelongLearningCondData
(&cond).Unmarshal([]byte(condDataStr))
if cond.Input == nil {
return fmt.Errorf("empty input from condData")
}
dataURL := cond.Input.DataURL
inputmodelURLs := cond.GetInputModelURLs()

var originalDataURLOrIndex string
if cond.Input.DataIndexURL != "" {
// this guarantee dataset.Spec.URL is not in host filesystem by LC,
// but cond.Input.DataIndexURL could be in host filesystem.
originalDataURLOrIndex = cond.Input.DataIndexURL
} else {
originalDataURLOrIndex = dataset.Spec.URL
}

var workerParam *WorkerParam = new(WorkerParam)
if podtype == sednav1.LLJobTrain {
workerParam.workerType = "Train"

podTemplate = &job.Spec.TrainSpec.Template
// Env parameters for train

workerParam.env = map[string]string{
"NAMESPACE": job.Namespace,
"JOB_NAME": job.Name,
"WORKER_NAME": "train-worker-" + utilrand.String(5),

"LC_SERVER": jc.cfg.LC.Server,
"KB_SERVER": jc.cfg.KB.Server,
}

workerParam.mounts = append(workerParam.mounts,
WorkerMount{
URL: &MountURL{
URL: cond.Input.OutputDir,
Secret: jobSecret,
Mode: workerMountWriteOnly,
},
EnvName: "OUTPUT_URL",
},

WorkerMount{
URL: &MountURL{
URL: dataURL,
Secret: jobSecret,
},
EnvName: "TRAIN_DATASET_URL",
},

// see https://github.com/kubeedge/sedna/issues/35
WorkerMount{
URL: &MountURL{
Secret: datasetSecret,
URL: originalDataURLOrIndex,
Indirect: dataset.Spec.URL != originalDataURLOrIndex,
},
EnvName: "ORIGINAL_DATASET_URL",
},
)
} else {
podTemplate = &job.Spec.EvalSpec.Template
workerParam.workerType = "Eval"

// Configure Env information for eval by initial WorkerParam
workerParam.env = map[string]string{
"NAMESPACE": job.Namespace,
"JOB_NAME": job.Name,
"WORKER_NAME": "eval-worker-" + utilrand.String(5),

"LC_SERVER": jc.cfg.LC.Server,
"KB_SERVER": jc.cfg.KB.Server,
}

var modelMountURLs []MountURL
for _, url := range inputmodelURLs {
modelMountURLs = append(modelMountURLs, MountURL{
URL: url,
Secret: jobSecret,
})
}
workerParam.mounts = append(workerParam.mounts,
WorkerMount{
URLs: modelMountURLs,
Name: "models",
EnvName: "MODEL_URLS",
},

WorkerMount{
URL: &MountURL{
URL: cond.Input.OutputDir,
Secret: jobSecret,
Mode: workerMountWriteOnly,
},
EnvName: "OUTPUT_URL",
},

WorkerMount{
URL: &MountURL{
URL: dataURL,
Secret: datasetSecret,
},
Name: "datasets",
EnvName: "TEST_DATASET_URL",
},

WorkerMount{
URL: &MountURL{
Secret: datasetSecret,
URL: originalDataURLOrIndex,
Indirect: dataset.Spec.URL != originalDataURLOrIndex,
},
Name: "origin-dataset",
EnvName: "ORIGINAL_DATASET_URL",
},
)
}

// set the default policy instead of Always policy
workerParam.restartPolicy = v1.RestartPolicyOnFailure
workerParam.hostNetwork = true

// create pod based on podtype
_, err = createPodWithTemplate(jc.kubeClient, job, podTemplate, workerParam)
if err != nil {
return err
}
return
}

func (jc *LifelongLearningJobController) createInferPod(job *sednav1.LifelongLearningJob) error {
inferModelURL := strings.Join([]string{strings.TrimRight(job.Spec.OutputDir, "/"), "deploy/index.pkl"}, "/")

jobSecret, err := jc.getSecret(
job.Namespace,
job.Spec.CredentialName,
fmt.Sprintf("lifelonglearning job %s", job.Name),
)
if err != nil {
return err
}

var workerParam *WorkerParam = new(WorkerParam)
workerParam.mounts = append(workerParam.mounts,
WorkerMount{
URL: &MountURL{
URL: inferModelURL,
Secret: jobSecret,
},
Name: "models",
EnvName: "MODEL_URLS",
},
)

workerParam.env = map[string]string{
"NAMESPACE": job.Namespace,
"JOB_NAME": job.Name,
"WORKER_NAME": "inferworker-" + utilrand.String(5),

"LC_SERVER": jc.cfg.LC.Server,
}

workerParam.workerType = InferencePodType
workerParam.hostNetwork = true

// create edge pod
_, err = createPodWithTemplate(jc.kubeClient, job, &job.Spec.DeploySpec.Template, workerParam)
return err
}

// GetName returns the name of the lifelonglearning job controller
func (jc *LifelongLearningJobController) GetName() string {
return "LifelongLearningJobController"
}

// NewLifelongLearningJobController creates a new LifelongLearningJob controller that keeps the relevant pods
// in sync with their corresponding LifelongLearningJob objects.
func NewLifelongLearningJobController(cfg *config.ControllerConfig) (FeatureControllerI, error) {
namespace := cfg.Namespace
if namespace == "" {
namespace = metav1.NamespaceAll
}
kubeClient, err := utils.KubeClient()
if err != nil {
return nil, err
}

kubecfg, err := utils.KubeConfig()
if err != nil {
return nil, err
}
crdclient, err := clientset.NewForConfig(kubecfg)
if err != nil {
return nil, err
}

kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, time.Second*30, kubeinformers.WithNamespace(namespace))

podInformer := kubeInformerFactory.Core().V1().Pods()

jobInformerFactory := informers.NewSharedInformerFactoryWithOptions(crdclient, time.Second*30, informers.WithNamespace(namespace))
jobInformer := jobInformerFactory.Sedna().V1alpha1().LifelongLearningJobs()

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

jc := &LifelongLearningJobController{
kubeClient: kubeClient,
client: crdclient.SednaV1alpha1(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultBackOff, MaxBackOff), "lifelonglearningjob"),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "lifelonglearningjob-controller"}),
cfg: cfg,
}

jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
jc.enqueueController(obj, true)
},
UpdateFunc: func(old, cur interface{}) {
jc.enqueueController(cur, true)
},
DeleteFunc: func(obj interface{}) {
jc.enqueueController(obj, true)
},
})
jc.jobLister = jobInformer.Lister()
jc.jobStoreSynced = jobInformer.Informer().HasSynced

podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jc.addPod,
UpdateFunc: jc.updatePod,
DeleteFunc: jc.deletePod,
})
jc.podStore = podInformer.Lister()
jc.podStoreSynced = podInformer.Informer().HasSynced

stopCh := make(chan struct{})
kubeInformerFactory.Start(stopCh)
jobInformerFactory.Start(stopCh)
return jc, err
}

+ 63
- 0
pkg/globalmanager/types.go View File

@@ -69,6 +69,15 @@ type IncrementalCondData struct {
} `json:"output,omitempty"`
}

const (
// TrainPodType is type of train pod
TrainPodType = "train"
// EvalPodType is type of eval pod
EvalPodType = "eval"
// InferencePodType is type of inference pod
InferencePodType = "inference"
)

func (m *Model) GetURL() string {
return m.URL
}
@@ -100,3 +109,57 @@ func (cd *IncrementalCondData) Unmarshal(data []byte) error {
func (cd IncrementalCondData) Marshal() ([]byte, error) {
return json.Marshal(cd)
}

// the data of this condition including the input/output to do the next step
type LifelongLearningCondData struct {
Input *struct {
// Only one model cases
Model *Model `json:"model,omitempty"`
Models []Model `json:"models,omitempty"`

DataURL string `json:"dataURL,omitempty"`

// the data samples reference will be stored into this URL.
// The content of this url would be:
// # the first uncomment line means the directory
// s3://dataset/
// mnist/0.jpg
// mnist/1.jpg
DataIndexURL string `json:"dataIndexURL,omitempty"`

OutputDir string `json:"outputDir,omitempty"`
} `json:"input,omitempty"`

Output *struct {
Model *Model `json:"model,omitempty"`
Models []Model `json:"models,omitempty"`
} `json:"output,omitempty"`
}

func (cd *LifelongLearningCondData) joinModelURLs(model *Model, models []Model) []string {
var modelURLs []string
if model != nil {
modelURLs = append(modelURLs, model.GetURL())
} else {
for _, m := range models {
modelURLs = append(modelURLs, m.GetURL())
}
}
return modelURLs
}

func (cd *LifelongLearningCondData) Unmarshal(data []byte) error {
return json.Unmarshal(data, cd)
}

func (cd LifelongLearningCondData) Marshal() ([]byte, error) {
return json.Marshal(cd)
}

func (cd *LifelongLearningCondData) GetInputModelURLs() []string {
return cd.joinModelURLs(cd.Input.Model, cd.Input.Models)
}

func (cd *LifelongLearningCondData) GetOutputModelURLs() []string {
return cd.joinModelURLs(cd.Output.Model, cd.Output.Models)
}

+ 78
- 0
pkg/globalmanager/upstream.go View File

@@ -371,6 +371,83 @@ func (uc *UpstreamController) updateIncrementalLearningFromEdge(name, namespace,
return nil
}

func (uc *UpstreamController) appendLifelongLearningJobStatusCondition(name, namespace string, cond sednav1.LLJobCondition) error {
client := uc.client.LifelongLearningJobs(namespace)
return retryUpdateStatus(name, namespace, func() error {
job, err := client.Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return err
}
job.Status.Conditions = append(job.Status.Conditions, cond)
_, err = client.UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
return err
})
}

// updateLifelongLearningJobFromEdge syncs the edge updates to k8s
func (uc *UpstreamController) updateLifelongLearningJobFromEdge(name, namespace, operation string, content []byte) error {
err := checkUpstreamOperation(operation)
if err != nil {
return err
}
var jobStatus struct {
Phase string `json:"phase"`
Status string `json:"status"`
}

err = json.Unmarshal(content, &jobStatus)
if err != nil {
return newUnmarshalError(namespace, name, operation, content)
}

// Get the condition data.
// Here unmarshal and marshal immediately to skip the unnecessary fields
var condData LifelongLearningCondData
err = json.Unmarshal(content, &condData)
if err != nil {
return newUnmarshalError(namespace, name, operation, content)
}
condDataBytes, _ := json.Marshal(&condData)

cond := sednav1.LLJobCondition{
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Data: string(condDataBytes),
Message: "reported by lc",
}

switch strings.ToLower(jobStatus.Phase) {
case "train":
cond.Stage = sednav1.LLJobTrain
case "eval":
cond.Stage = sednav1.LLJobEval
case "deploy":
cond.Stage = sednav1.LLJobDeploy
default:
return fmt.Errorf("invalid condition stage: %v", jobStatus.Phase)
}

switch strings.ToLower(jobStatus.Status) {
case "ready":
cond.Type = sednav1.LLJobStageCondReady
case "completed":
cond.Type = sednav1.LLJobStageCondCompleted
case "failed":
cond.Type = sednav1.LLJobStageCondFailed
case "waiting":
cond.Type = sednav1.LLJobStageCondWaiting
default:
return fmt.Errorf("invalid condition type: %v", jobStatus.Status)
}

err = uc.appendLifelongLearningJobStatusCondition(name, namespace, cond)
if err != nil {
return fmt.Errorf("failed to append condition, err:%+w", err)
}
return nil
}

// syncEdgeUpdate receives the updates from edge and syncs these to k8s.
func (uc *UpstreamController) syncEdgeUpdate() {
for {
@@ -435,6 +512,7 @@ func NewUpstreamController(cfg *config.ControllerConfig) (FeatureControllerI, er
"jointinferenceservice": uc.updateJointInferenceFromEdge,
"federatedlearningjob": uc.updateFederatedLearningJobFromEdge,
"incrementallearningjob": uc.updateIncrementalLearningFromEdge,
"lifelonglearningjob": uc.updateLifelongLearningJobFromEdge,
}

return uc, nil


+ 28
- 13
pkg/localcontroller/manager/dataset.go View File

@@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"strings"
"time"
@@ -66,9 +67,10 @@ type DatasetSpec struct {

// DataSource defines config for data source
type DataSource struct {
TrainSamples []string `json:"trainSamples"`
ValidSamples []string `json:"validSamples"`
NumberOfSamples int `json:"numberOfSamples"`
TrainSamples []string
ValidSamples []string
NumberOfSamples int
Header string
}

// NewDatasetManager creates a dataset manager
@@ -205,6 +207,10 @@ func (dm *DatasetManager) monitorDataSources(name string) {

// getDataSource gets data source info
func (ds *Dataset) getDataSource(dataURL string, format string) (*DataSource, error) {
if path.Ext(dataURL) != ("." + format) {
return nil, fmt.Errorf("dataset file url(%s)'s suffix is different from format(%s)", dataURL, format)
}

localURL, err := ds.Storage.Download(dataURL, "")

if !ds.Storage.IsLocalStorage {
@@ -215,15 +221,11 @@ func (ds *Dataset) getDataSource(dataURL string, format string) (*DataSource, er
return nil, err
}

switch format {
case "txt":
return ds.readByLine(localURL)
}
return nil, fmt.Errorf("not vaild file format")
return ds.readByLine(localURL, format)
}

// readByLine reads file by line
func (ds *Dataset) readByLine(url string) (*DataSource, error) {
func (ds *Dataset) readByLine(url string, format string) (*DataSource, error) {
samples, err := getSamples(url)
if err != nil {
klog.Errorf("read file %s failed, error: %v", url, err)
@@ -231,13 +233,26 @@ func (ds *Dataset) readByLine(url string) (*DataSource, error) {
}

numberOfSamples := 0
numberOfSamples += len(samples)
dataSource := DataSource{}
switch format {
case DatasetFormatTXT:
numberOfSamples += len(samples)
case DatasetFormatCSV:
// the first row of csv file is header
if len(samples) == 0 {
return nil, fmt.Errorf("file %s is empty", url)
}
dataSource.Header = samples[0]
samples = samples[1:]
numberOfSamples += len(samples)

dataSource := DataSource{
TrainSamples: samples,
NumberOfSamples: numberOfSamples,
default:
return nil, fmt.Errorf("invaild file format")
}

dataSource.TrainSamples = samples
dataSource.NumberOfSamples = numberOfSamples

return &dataSource, nil
}



+ 891
- 0
pkg/localcontroller/manager/lifelonglearningjob.go View File

@@ -0,0 +1,891 @@
/*
Copyright 2021 The KubeEdge Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package manager

import (
"bufio"
"encoding/json"
"fmt"
"os"
"path"
"strconv"
"strings"
"sync"
"time"

"k8s.io/klog/v2"

"github.com/kubeedge/sedna/cmd/sedna-lc/app/options"
sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
"github.com/kubeedge/sedna/pkg/localcontroller/db"
"github.com/kubeedge/sedna/pkg/localcontroller/gmclient"
"github.com/kubeedge/sedna/pkg/localcontroller/storage"
"github.com/kubeedge/sedna/pkg/localcontroller/trigger"
"github.com/kubeedge/sedna/pkg/localcontroller/util"
)

const (
//LifelongLearningJobKind is kind of lifelong-learning-job resource
LifelongLearningJobKind = "lifelonglearningjob"
)

// LifelongLearningJobManager defines lifelong-learning-job Manager
type LifelongLearningJobManager struct {
Client gmclient.ClientI
WorkerMessageChannel chan WorkerMessage
DatasetManager *DatasetManager
LifelongLearningJobMap map[string]*LifelongLearningJob
VolumeMountPrefix string
}

// LifelongLearningJob defines config for lifelong-learning-job
type LifelongLearningJob struct {
sednav1.LifelongLearningJob
Dataset *Dataset
Done chan struct{}
Storage storage.Storage
JobConfig *LLJobConfig
}

// LLJobConfig defines config for lifelong-learning-job
type LLJobConfig struct {
UniqueIdentifier string
Version int
Phase string
WorkerStatus string
TrainTrigger trigger.Base
TriggerStatus string
TriggerTime time.Time
TrainDataURL string
EvalDataURL string
OutputDir string
OutputConfig *LLOutputConfig
DataSamples *LLDataSamples
TrainModel *ModelInfo
DeployModel *ModelInfo
EvalResult *ModelInfo
Lock sync.Mutex
}

// LLOutputConfig defines config for job output
type LLOutputConfig struct {
SamplesOutput map[string]string
TrainOutput string
EvalOutput string
}

// LLDataSamples defines samples information
type LLDataSamples struct {
Numbers int
TrainSamples []string
EvalVersionSamples [][]string
EvalSamples []string
}

const (
// LLJobIterationIntervalSeconds is interval time of each iteration of job
LLJobIterationIntervalSeconds = 10
// LLHandlerDataIntervalSeconds is interval time of handling dataset
LLHandlerDataIntervalSeconds = 10
// LLLLEvalSamplesCapacity is capacity of eval samples
LLEvalSamplesCapacity = 5
)

// NewLifelongLearningJobManager creates a lifelong-learning-job manager
func NewLifelongLearningJobManager(client gmclient.ClientI, datasetManager *DatasetManager,
modelManager *ModelManager, options *options.LocalControllerOptions) *LifelongLearningJobManager {
lm := LifelongLearningJobManager{
Client: client,
WorkerMessageChannel: make(chan WorkerMessage, WorkerMessageChannelCacheSize),
DatasetManager: datasetManager,
LifelongLearningJobMap: make(map[string]*LifelongLearningJob),
VolumeMountPrefix: options.VolumeMountPrefix,
}

return &lm
}

// Insert inserts lifelong-learning-job config to db
func (lm *LifelongLearningJobManager) Insert(message *gmclient.Message) error {
name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)

first := false
job, ok := lm.LifelongLearningJobMap[name]
if !ok {
job = &LifelongLearningJob{}
job.Storage = storage.Storage{IsLocalStorage: false}
job.Done = make(chan struct{})
lm.LifelongLearningJobMap[name] = job
first = true
}

if err := json.Unmarshal(message.Content, &job); err != nil {
return err
}

credential := job.ObjectMeta.Annotations[CredentialAnnotationKey]
if credential != "" {
if err := job.Storage.SetCredential(credential); err != nil {
return fmt.Errorf("failed to set job(name=%s)'s storage credential, error: %+v", name, err)
}
}

if first {
go lm.startJob(name)
}

if err := db.SaveResource(name, job.TypeMeta, job.ObjectMeta, job.Spec); err != nil {
return err
}

return nil
}

// startJob starts a job
func (lm *LifelongLearningJobManager) startJob(name string) {
var err error
job, ok := lm.LifelongLearningJobMap[name]
if !ok {
return
}

job.JobConfig = new(LLJobConfig)
jobConfig := job.JobConfig
jobConfig.UniqueIdentifier = name

err = lm.initJob(job)
if err != nil {
klog.Errorf("failed to init job (name=%s): %+v", jobConfig.UniqueIdentifier)
return
}

klog.Infof("lifelong learning job(name=%s) is started", name)
defer klog.Infof("lifelong learning job(name=%s) is stopped", name)
go lm.handleData(job)

tick := time.NewTicker(LLJobIterationIntervalSeconds * time.Second)
for {
select {
case <-job.Done:
return
default:
}

if job.Dataset == nil {
klog.V(3).Infof("job(name=%s) dataset not ready",
jobConfig.UniqueIdentifier)

<-tick.C
continue
}

switch jobConfig.Phase {
case TrainPhase:
err = lm.trainTask(job)
case EvalPhase:
err = lm.evalTask(job)
case DeployPhase:
err = lm.deployTask(job)
default:
klog.Errorf("invalid phase: %s", jobConfig.Phase)
continue
}

if err != nil {
klog.Errorf("job(name=%s) complete the %s task failed, error: %v",
jobConfig.UniqueIdentifier, jobConfig.Phase, err)
}

<-tick.C
}
}

// trainTask starts training task
func (lm *LifelongLearningJobManager) trainTask(job *LifelongLearningJob) error {
jobConfig := job.JobConfig

if jobConfig.WorkerStatus == WorkerReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus {
payload, ok, err := lm.triggerTrainTask(job)
if !ok {
return nil
}

if err != nil {
klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v",
jobConfig.UniqueIdentifier, jobConfig.Phase, err)
return err
}

err = lm.Client.WriteMessage(payload, job.getHeader())
if err != nil {
klog.Errorf("job(name=%s) failed to write message: %v",
jobConfig.UniqueIdentifier, err)
return err
}

jobConfig.TriggerStatus = TriggerCompletedStatus

klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
jobConfig.UniqueIdentifier, jobConfig.Phase)
}

if jobConfig.WorkerStatus == WorkerFailedStatus {
klog.Warningf("found the %sing phase worker that ran failed, "+
"back the training phase triggering task", jobConfig.Phase)
backLLTaskStatus(jobConfig)
}

if jobConfig.WorkerStatus == WorkerCompletedStatus {
klog.Infof("job(name=%s) complete the %s task successfully", jobConfig.UniqueIdentifier, jobConfig.Phase)
nextLLTask(jobConfig)
}

return nil
}

// evalTask starts eval task
func (lm *LifelongLearningJobManager) evalTask(job *LifelongLearningJob) error {
jobConfig := job.JobConfig

if jobConfig.WorkerStatus == WorkerReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus {
payload, err := lm.triggerEvalTask(job)
if err != nil {
klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v",
jobConfig.UniqueIdentifier, jobConfig.Phase, err)
return err
}

err = lm.Client.WriteMessage(payload, job.getHeader())
if err != nil {
return err
}

jobConfig.TriggerStatus = TriggerCompletedStatus

klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
jobConfig.UniqueIdentifier, jobConfig.Phase)
}

if jobConfig.WorkerStatus == WorkerFailedStatus {
msg := fmt.Sprintf("job(name=%s) found the %sing phase worker that ran failed, "+
"back the training phase triggering task", jobConfig.UniqueIdentifier, jobConfig.Phase)
klog.Errorf(msg)
return fmt.Errorf(msg)
}

if jobConfig.WorkerStatus == WorkerCompletedStatus {
klog.Infof("job(name=%s) complete the %s task successfully", jobConfig.UniqueIdentifier, jobConfig.Phase)
nextLLTask(jobConfig)
}

return nil
}

// deployTask starts deploy task
func (lm *LifelongLearningJobManager) deployTask(job *LifelongLearningJob) error {
jobConfig := job.JobConfig

if jobConfig.WorkerStatus == WorkerReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus {
status := UpstreamMessage{}
status.Phase = DeployPhase
deployModel, err := lm.deployModel(job)
if err != nil {
klog.Errorf("failed to deploy model for job(name=%s): %v", jobConfig.UniqueIdentifier, err)
} else {
klog.Infof("deployed model for job(name=%s) successfully", jobConfig.UniqueIdentifier)
}
if err != nil || deployModel == nil {
status.Status = WorkerFailedStatus
} else {
status.Status = WorkerReadyStatus
status.Input = &WorkerInput{
Models: []ModelInfo{
*deployModel,
},
}
}

if err = lm.Client.WriteMessage(status, job.getHeader()); err != nil {
return err
}

jobConfig.TriggerStatus = TriggerCompletedStatus
}

nextLLTask(jobConfig)

klog.Infof("job(name=%s) complete the deploy task successfully", jobConfig.UniqueIdentifier)

return nil
}

// triggerTrainTask triggers the train task
func (lm *LifelongLearningJobManager) triggerTrainTask(job *LifelongLearningJob) (interface{}, bool, error) {
var err error
jobConfig := job.JobConfig

const numOfSamples = "num_of_samples"
samples := map[string]interface{}{
numOfSamples: len(jobConfig.DataSamples.TrainSamples),
}

isTrigger := jobConfig.TrainTrigger.Trigger(samples)

if !isTrigger {
return nil, false, nil
}

jobConfig.Version++

var dataIndexURL string
jobConfig.TrainDataURL, dataIndexURL, err = job.writeLLJSamples(jobConfig.DataSamples.TrainSamples,
jobConfig.OutputConfig.SamplesOutput["train"])
if err != nil {
klog.Errorf("train phase: write samples to the file(%s) is failed, error: %v", jobConfig.TrainDataURL, err)
return nil, false, err
}

dataURL := jobConfig.TrainDataURL
outputDir := strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(jobConfig.Version)}, "/")
if job.Storage.IsLocalStorage {
dataURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataURL)
dataIndexURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataIndexURL)
outputDir = util.TrimPrefixPath(lm.VolumeMountPrefix, outputDir)
}

input := WorkerInput{
DataURL: dataURL,
DataIndexURL: dataIndexURL,
OutputDir: outputDir,
}
msg := UpstreamMessage{
Phase: TrainPhase,
Status: WorkerReadyStatus,
Input: &input,
}
jobConfig.TriggerTime = time.Now()
return &msg, true, nil
}

// triggerEvalTask triggers the eval task
func (lm *LifelongLearningJobManager) triggerEvalTask(job *LifelongLearningJob) (*UpstreamMessage, error) {
jobConfig := job.JobConfig
var err error

var dataIndexURL string
jobConfig.EvalDataURL, dataIndexURL, err = job.writeLLJSamples(jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"])
if err != nil {
klog.Errorf("job(name=%s) eval phase: write samples to the file(%s) is failed, error: %v",
jobConfig.UniqueIdentifier, jobConfig.EvalDataURL, err)
return nil, err
}

var models []ModelInfo
models = append(models, ModelInfo{
Format: jobConfig.TrainModel.Format,
URL: jobConfig.TrainModel.URL,
})

dataURL := jobConfig.EvalDataURL
outputDir := strings.Join([]string{jobConfig.OutputConfig.EvalOutput, strconv.Itoa(jobConfig.Version)}, "/")
if job.Storage.IsLocalStorage {
dataURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataURL)
dataIndexURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataIndexURL)
outputDir = util.TrimPrefixPath(lm.VolumeMountPrefix, outputDir)
}

input := WorkerInput{
Models: models,
DataURL: dataURL,
DataIndexURL: dataIndexURL,
OutputDir: outputDir,
}
msg := &UpstreamMessage{
Phase: EvalPhase,
Status: WorkerReadyStatus,
Input: &input,
}

return msg, nil
}

// deployModel deploys model
func (lm *LifelongLearningJobManager) deployModel(job *LifelongLearningJob) (*ModelInfo, error) {
jobConfig := job.JobConfig

model := &ModelInfo{}
model = jobConfig.EvalResult

if job.Storage.IsLocalStorage {
model.URL = util.AddPrefixPath(lm.VolumeMountPrefix, model.URL)
}

deployModelURL := jobConfig.DeployModel.URL
if err := job.Storage.CopyFile(model.URL, deployModelURL); err != nil {
return nil, fmt.Errorf("copy model(url=%s) to the deploy model(url=%s) failed, error: %+v",
model.URL, deployModelURL, err)
}
klog.V(4).Infof("copy model(url=%s) to the deploy model(url=%s) successfully", model.URL, deployModelURL)

klog.Infof("job(name=%s) deploys model(url=%s) successfully", jobConfig.UniqueIdentifier, model.URL)

return model, nil
}

// createOutputDir creates the job output dir
func (job *LifelongLearningJob) createOutputDir(jobConfig *LLJobConfig) error {
outputDir := jobConfig.OutputDir

dirNames := []string{"data/train", "data/eval", "train", "eval"}
// lifelong_kb_index.pkl

if job.Storage.IsLocalStorage {
if err := util.CreateFolder(outputDir); err != nil {
klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, outputDir)
return err
}

for _, v := range dirNames {
dir := path.Join(outputDir, v)
if err := util.CreateFolder(dir); err != nil {
klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, dir)
return err
}
}
}

outputConfig := LLOutputConfig{
SamplesOutput: map[string]string{
"train": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[0]}, "/"),
"eval": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[1]}, "/"),
},
TrainOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[2]}, "/"),
EvalOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[3]}, "/"),
}
jobConfig.OutputConfig = &outputConfig

return nil
}

// createFile creates data file and data index file
func (job *LifelongLearningJob) createFile(dir string, format string, isLocalStorage bool) (string, string) {
switch strings.ToLower(format) {
case DatasetFormatTXT:
if isLocalStorage {
return path.Join(dir, "data.txt"), ""
}
return strings.Join([]string{dir, "data.txt"}, "/"), strings.Join([]string{dir, "dataIndex.txt"}, "/")
case DatasetFormatCSV:
return strings.Join([]string{dir, "data.csv"}, "/"), ""
}

return "", ""
}

// writeLLJSamples writes samples information to a file
func (job *LifelongLearningJob) writeLLJSamples(samples []string, dir string) (string, string, error) {
version := job.JobConfig.Version
format := job.Dataset.Spec.Format
urlPrefix := job.Dataset.URLPrefix

subDir := strings.Join([]string{dir, strconv.Itoa(version)}, "/")
fileURL, absURLFile := job.createFile(subDir, format, job.Dataset.Storage.IsLocalStorage)

if job.Storage.IsLocalStorage {
if err := util.CreateFolder(subDir); err != nil {
return "", "", err
}
if err := job.writeByLine(samples, fileURL, format); err != nil {
return "", "", err
}

if !job.Dataset.Storage.IsLocalStorage && absURLFile != "" {
tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)
if err := job.writeByLine(tempSamples, absURLFile, format); err != nil {
return "", "", err
}
}

return fileURL, absURLFile, nil
}

temporaryDir, err := util.CreateTemporaryDir()
if err != nil {
return "", "", err
}

localFileURL, localAbsURLFile := job.createFile(temporaryDir, format, job.Dataset.Storage.IsLocalStorage)

if err := job.writeByLine(samples, localFileURL, format); err != nil {
return "", "", err
}

if err := job.Storage.Upload(localFileURL, fileURL); err != nil {
return "", "", err
}

if absURLFile != "" {
tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)

if err := job.writeByLine(tempSamples, localAbsURLFile, format); err != nil {
return "", "", err
}

if err := job.Storage.Upload(localAbsURLFile, absURLFile); err != nil {
return "", "", err
}

defer os.RemoveAll(localFileURL)
}

defer os.RemoveAll(localAbsURLFile)

return fileURL, absURLFile, nil
}

// writeByLine writes file by line
func (job *LifelongLearningJob) writeByLine(samples []string, fileURL string, format string) error {
file, err := os.Create(fileURL)
if err != nil {
klog.Errorf("create file(%s) failed", fileURL)
return err
}

w := bufio.NewWriter(file)

if format == "csv" {
_, _ = fmt.Fprintln(w, job.Dataset.DataSource.Header)
}

for _, line := range samples {
_, _ = fmt.Fprintln(w, line)
}
if err := w.Flush(); err != nil {
klog.Errorf("write file(%s) failed", fileURL)
return err
}

if err := file.Close(); err != nil {
klog.Errorf("close file failed, error: %v", err)
return err
}

return nil
}

// handleData updates samples information
func (lm *LifelongLearningJobManager) handleData(job *LifelongLearningJob) {
tick := time.NewTicker(LLHandlerDataIntervalSeconds * time.Second)

jobConfig := job.JobConfig
iterCount := 0
for {
select {
case <-job.Done:
return
default:
}

// in case dataset is not synced to LC before job synced to LC
// here call loadDataset in each period
err := lm.loadDataset(job)
if iterCount%100 == 0 {
klog.Infof("job(name=%s) handling dataset", jobConfig.UniqueIdentifier)
}
iterCount++
if err != nil {
klog.Warningf("job(name=%s) failed to load dataset, and waiting it: %v",
jobConfig.UniqueIdentifier,
err)
<-tick.C
continue
}

dataset := job.Dataset

if dataset.DataSource != nil && len(dataset.DataSource.TrainSamples) > jobConfig.DataSamples.Numbers {
samples := dataset.DataSource.TrainSamples
trainNum := int(job.Spec.Dataset.TrainProb * float64(len(samples)-jobConfig.DataSamples.Numbers))

jobConfig.Lock.Lock()
jobConfig.DataSamples.TrainSamples = append(jobConfig.DataSamples.TrainSamples,
samples[(jobConfig.DataSamples.Numbers+1):(jobConfig.DataSamples.Numbers+trainNum+1)]...)
klog.Infof("job(name=%s) current train samples nums is %d",
jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.TrainSamples))

jobConfig.DataSamples.EvalVersionSamples = append(jobConfig.DataSamples.EvalVersionSamples,
samples[(jobConfig.DataSamples.Numbers+trainNum+1):])
jobConfig.Lock.Unlock()

for _, v := range jobConfig.DataSamples.EvalVersionSamples {
jobConfig.DataSamples.EvalSamples = append(jobConfig.DataSamples.EvalSamples, v...)
}
klog.Infof("job(name=%s) current eval samples nums is %d",
jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.EvalSamples))

jobConfig.DataSamples.Numbers = len(samples)
}
<-tick.C
}
}

func (lm *LifelongLearningJobManager) loadDataset(job *LifelongLearningJob) error {
if job.Dataset != nil {
// already loaded
return nil
}

datasetName := util.GetUniqueIdentifier(job.Namespace, job.Spec.Dataset.Name, DatasetResourceKind)
dataset, ok := lm.DatasetManager.GetDataset(datasetName)
if !ok || dataset == nil {
return fmt.Errorf("not exists dataset(name=%s)", datasetName)
}

jobConfig := job.JobConfig
jobConfig.DataSamples = &LLDataSamples{
Numbers: 0,
TrainSamples: make([]string, 0),
EvalVersionSamples: make([][]string, 0),
EvalSamples: make([]string, 0),
}

job.Dataset = dataset
return nil
}

// initJob inits the job object
func (lm *LifelongLearningJobManager) initJob(job *LifelongLearningJob) error {
jobConfig := job.JobConfig
jobConfig.TrainModel = new(ModelInfo)
jobConfig.EvalResult = new(ModelInfo)
jobConfig.Lock = sync.Mutex{}

jobConfig.Version = 0
jobConfig.Phase = TrainPhase
jobConfig.WorkerStatus = WorkerReadyStatus
jobConfig.TriggerStatus = TriggerReadyStatus
trainTrigger, err := newLLTrigger(job.Spec.TrainSpec.Trigger)
if err != nil {
return fmt.Errorf("failed to init train trigger: %+w", err)
}
jobConfig.TrainTrigger = trainTrigger

outputDir := job.Spec.OutputDir

isLocalURL, err := job.Storage.IsLocalURL(outputDir)
if err != nil {
return fmt.Errorf("job(name=%s)'s output dir is invalid, error: %+v", job.Name, outputDir)
}

if isLocalURL {
job.Storage.IsLocalStorage = true
outputDir = util.AddPrefixPath(lm.VolumeMountPrefix, outputDir)
}

jobConfig.OutputDir = outputDir

if err := job.createOutputDir(jobConfig); err != nil {
return err
}

jobConfig.DeployModel = &ModelInfo{
Format: "pkl",
URL: strings.Join([]string{strings.TrimRight(outputDir, "/"), "deploy/index.pkl"}, "/"),
}

return nil
}

func newLLTrigger(t sednav1.LLTrigger) (trigger.Base, error) {
// convert trigger to map
triggerMap := make(map[string]interface{})
c, err := json.Marshal(t)
if err != nil {
return nil, err
}

err = json.Unmarshal(c, &triggerMap)
if err != nil {
return nil, err
}
return trigger.NewTrigger(triggerMap)
}

// forwardSamplesLL deletes the samples information in the memory
func forwardSamplesLL(jobConfig *LLJobConfig) {
switch jobConfig.Phase {
case TrainPhase:
{
jobConfig.Lock.Lock()
jobConfig.DataSamples.TrainSamples = jobConfig.DataSamples.TrainSamples[:0]
jobConfig.Lock.Unlock()
}
case EvalPhase:
{
if len(jobConfig.DataSamples.EvalVersionSamples) > LLEvalSamplesCapacity {
jobConfig.DataSamples.EvalVersionSamples = jobConfig.DataSamples.EvalVersionSamples[1:]
}
}
}
}

// backLLTaskStatus backs train task status
func backLLTaskStatus(jobConfig *LLJobConfig) {
jobConfig.Phase = TrainPhase
initLLTaskStatus(jobConfig)
}

// initLLTaskStatus inits task status
func initLLTaskStatus(jobConfig *LLJobConfig) {
jobConfig.WorkerStatus = WorkerReadyStatus
jobConfig.TriggerStatus = TriggerReadyStatus
}

// nextLLTask converts next task status
func nextLLTask(jobConfig *LLJobConfig) {
switch jobConfig.Phase {
case TrainPhase:
{
forwardSamplesLL(jobConfig)
initLLTaskStatus(jobConfig)
jobConfig.Phase = EvalPhase
}

case EvalPhase:
{
forwardSamplesLL(jobConfig)
initLLTaskStatus(jobConfig)
jobConfig.Phase = DeployPhase
}
case DeployPhase:
{
backLLTaskStatus(jobConfig)
}
}
}

// Delete deletes lifelong-learning-job config in db
func (lm *LifelongLearningJobManager) Delete(message *gmclient.Message) error {
name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)

if job, ok := lm.LifelongLearningJobMap[name]; ok && job.Done != nil {
close(job.Done)
}

delete(lm.LifelongLearningJobMap, name)

if err := db.DeleteResource(name); err != nil {
return err
}

return nil
}

// Start starts LifelongLearningJob manager
func (lm *LifelongLearningJobManager) Start() error {
go lm.monitorWorker()

return nil
}

// monitorWorker monitors message from worker
func (lm *LifelongLearningJobManager) monitorWorker() {
for {
workerMessageChannel := lm.WorkerMessageChannel
workerMessage, ok := <-workerMessageChannel
if !ok {
break
}
klog.V(4).Infof("handling worker message %+v", workerMessage)

name := util.GetUniqueIdentifier(workerMessage.Namespace, workerMessage.OwnerName, workerMessage.OwnerKind)

job, ok := lm.LifelongLearningJobMap[name]
if !ok {
continue
}

// TODO: filter some worker messages out
wo := WorkerOutput{}
wo.Models = workerMessage.Results
wo.OwnerInfo = workerMessage.OwnerInfo

msg := &UpstreamMessage{
Phase: workerMessage.Kind,
Status: workerMessage.Status,
Output: &wo,
}
lm.Client.WriteMessage(msg, job.getHeader())

lm.handleWorkerMessage(job, workerMessage)
}
}

// handleWorkerMessage handles message from worker
func (lm *LifelongLearningJobManager) handleWorkerMessage(job *LifelongLearningJob, workerMessage WorkerMessage) {
jobPhase := job.JobConfig.Phase
workerKind := workerMessage.Kind
if jobPhase != workerKind {
klog.Warningf("job(name=%s) %s phase get worker(kind=%s)", job.JobConfig.UniqueIdentifier,
jobPhase, workerKind)
return
}

var models []*ModelInfo
for _, result := range workerMessage.Results {
model := ModelInfo{
Format: result["format"].(string),
URL: result["url"].(string)}
models = append(models, &model)
}

model := &ModelInfo{}
if len(models) != 1 {
return
}
model = models[0]

job.JobConfig.WorkerStatus = workerMessage.Status

if job.JobConfig.WorkerStatus == WorkerCompletedStatus {
switch job.JobConfig.Phase {
case TrainPhase:
job.JobConfig.TrainModel = model
case EvalPhase:
job.JobConfig.EvalResult = model
}
}
}

// AddWorkerMessage adds worker messages
func (lm *LifelongLearningJobManager) AddWorkerMessage(message WorkerMessage) {
lm.WorkerMessageChannel <- message
}

// GetName returns name of the manager
func (lm *LifelongLearningJobManager) GetName() string {
return LifelongLearningJobKind
}

func (job *LifelongLearningJob) getHeader() gmclient.MessageHeader {
return gmclient.MessageHeader{
Namespace: job.Namespace,
ResourceKind: job.Kind,
ResourceName: job.Name,
Operation: gmclient.StatusOperation,
}
}

+ 5
- 0
pkg/localcontroller/manager/types.go View File

@@ -48,6 +48,11 @@ const (

// CredentialAnnotationKey is credential of the storage service
CredentialAnnotationKey = "sedna.io/credential"

// DatasetFormatCSV is csv format of dataset
DatasetFormatCSV = "csv"
// DatasetFormatTXT is txt format of dataset
DatasetFormatTXT = "txt"
)

// WorkerMessage defines message struct from worker


+ 0
- 1
pkg/localcontroller/server/server.go View File

@@ -100,7 +100,6 @@ func (s *Server) messageHandler(request *restful.Request, response *restful.Resp
err = request.ReadEntity(&workerMessage)
if workerMessage.Name != workerName || err != nil {
var msg string

if workerMessage.Name != workerName {
msg = fmt.Sprintf("worker name(name=%s) in the api is different from that(name=%s) in the message body",
workerName, workerMessage.Name)


+ 1
- 1
pkg/localcontroller/util/util.go View File

@@ -54,7 +54,7 @@ func IsDir(path string) bool {
}

// CopyFile copies a file to other
func CopyFile(dstName, srcName string) (written int64, err error) {
func CopyFile(srcName, dstName string) (written int64, err error) {
src, err := os.Open(srcName)
if err != nil {
klog.Errorf("open file %s failed: %v", srcName, err)


+ 97
- 12
scripts/installation/install.sh View File

@@ -21,6 +21,10 @@ set -o pipefail
TMP_DIR=$(mktemp -d --suffix=.sedna)
SEDNA_ROOT=${SEDNA_ROOT:-$TMP_DIR}

GM_NODE_NAME=${SEDNA_GM_NODE:-}
KB_NODE_NAME=${SEDNA_GM_NODE:-}


trap "rm -rf '$TMP_DIR'" EXIT

_download_yamls() {
@@ -50,6 +54,7 @@ download_yamls() {
sedna.io_federatedlearningjobs.yaml
sedna.io_incrementallearningjobs.yaml
sedna.io_jointinferenceservices.yaml
sedna.io_lifelonglearningjobs.yaml
sedna.io_models.yaml
)
_download_yamls build/crds
@@ -59,14 +64,26 @@ download_yamls() {
_download_yamls build/gm/rbac
}

prepare_install(){
# need to create a namespace
kubectl create ns sedna

kubectl label node/$GM_NODE_NAME sedna=control-plane --overwrite
}

prepare() {
mkdir -p ${SEDNA_ROOT}

# we only need build directory
# here don't use git clone because of large vendor directory
download_yamls
}

cleanup(){
kubectl label node/$SEDNA_GM_NODE sedna- | sed 's/labeled$/un&/' || true
kubectl delete ns sedna
}

create_crds() {
cd ${SEDNA_ROOT}
kubectl create -f build/crds
@@ -77,7 +94,77 @@ delete_crds() {
kubectl delete -f build/crds --timeout=90s
}

create_kb(){
cd ${SEDNA_ROOT}

kubectl $action -f - <<EOF
apiVersion: v1
kind: Service
metadata:
name: kb
namespace: sedna
spec:
selector:
sedna: kb
type: NodePort
ports:
- protocol: TCP
port: 9020
targetPort: 9020
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: kb
labels:
sedna: kb
namespace: sedna
spec:
replicas: 1
selector:
matchLabels:
sedna: kb
template:
metadata:
labels:
sedna: kb
spec:
nodeSelector:
sedna: control-plane
serviceAccountName: sedna
containers:
- name: kb
imagePullPolicy: IfNotPresent
image: kubeedge/sedna-kb:v0.3.0
env:
- name: KB_URL
value: "sqlite:///db/kb.sqlite3"
volumeMounts:
- name: kb-url
mountPath: /db
resources:
requests:
memory: 256Mi
cpu: 100m
limits:
memory: 512Mi
volumes:
- name: kb-url
hostPath:
path: /opt/kb-data
type: DirectoryOrCreate
EOF
}

prepare_gm_config_map() {
kb_node_port=$(kubectl -n sedna get svc kb -ojsonpath='{.spec.ports[0].nodePort}')

# here try to get node ip by kubectl
kb_node_ip=$(kubectl get node $KB_NODE_NAME -o jsonpath='{ .status.addresses[?(@.type=="ExternalIP")].address }')
kb_node_internal_ip=$(kubectl get node $KB_NODE_NAME -o jsonpath='{ .status.addresses[?(@.type=="InternalIP")].address }')

KB_ADDRESS=${kb_node_ip:-$kb_node_internal_ip}:$kb_node_port

cm_name=${1:-gm-config}
config_file=${TMP_DIR}/${2:-gm.yaml}

@@ -93,6 +180,8 @@ websocket:
port: 9000
localController:
server: http://localhost:${SEDNA_LC_BIND_PORT:-9100}
knowledgeBaseServer:
server: http://$KB_ADDRESS
EOF
fi

@@ -103,9 +192,7 @@ create_gm() {

cd ${SEDNA_ROOT}

kubectl apply -f build/gm/rbac/

kubectl label node/$GM_NODE_NAME sedna=gm --overwrite
kubectl create -f build/gm/rbac/

cm_name=gm-config
config_file_name=gm.yaml
@@ -145,7 +232,7 @@ spec:
sedna: gm
spec:
nodeSelector:
sedna: gm
sedna: control-plane
serviceAccountName: sedna
containers:
- name: gm
@@ -170,10 +257,8 @@ EOF
delete_gm() {
cd ${SEDNA_ROOT}

# sedna namespace would be deleted in here
kubectl delete -f build/gm/rbac/

kubectl label node/$GM_NODE_NAME sedna- | sed 's/labeled$/un&/' || true

# no need to clean gm deployment alone
}
@@ -269,9 +354,7 @@ check_action() {
}

check_gm_node() {
GM_NODE_NAME=${SEDNA_GM_NODE:-}

check_node() {
if [ -z "$GM_NODE_NAME" ] || ! kubectl get node $GM_NODE_NAME; then
echo "ERROR: $(red_text GM node name \`$GM_NODE_NAME\` does not exist in k8s cluster)!" >&2
echo "You need to specify it by setting $(red_text SEDNA_GM_NODE) environment variable when running this script!" >&2
@@ -282,7 +365,7 @@ check_gm_node() {
do_check() {
check_kubectl
check_action
check_gm_node
check_node
}

show_debug_infos() {
@@ -308,10 +391,11 @@ red_text() {
do_check

prepare

case "$action" in
create)
prepare_install
create_crds
create_kb
create_gm
create_lc
wait_ok
@@ -323,6 +407,7 @@ case "$action" in
delete_gm
delete_lc
delete_crds
cleanup
echo "$(green_text Sedna is uninstalled successfully)"
;;
esac

Loading…
Cancel
Save