From babb8f9bde1dba45f52114f53972a654312d2148 Mon Sep 17 00:00:00 2001 From: JimmyYang20 Date: Thu, 3 Jun 2021 10:26:43 +0800 Subject: [PATCH] Fix the review comments Signed-off-by: JimmyYang20 --- build/gm/gm-config.yaml | 2 -- pkg/globalmanager/lifelonglearningjob.go | 19 +++++++------------ pkg/globalmanager/upstream.go | 2 +- pkg/localcontroller/manager/dataset.go | 3 +++ pkg/localcontroller/util/util.go | 2 +- 5 files changed, 12 insertions(+), 16 deletions(-) diff --git a/build/gm/gm-config.yaml b/build/gm/gm-config.yaml index c4749b9f..23e00593 100644 --- a/build/gm/gm-config.yaml +++ b/build/gm/gm-config.yaml @@ -6,5 +6,3 @@ websocket: port: 9000 localController: server: http://localhost:9100 -knowledgeBaseService: - server: http://localhost:9020 diff --git a/pkg/globalmanager/lifelonglearningjob.go b/pkg/globalmanager/lifelonglearningjob.go index 2122127d..f224aef4 100644 --- a/pkg/globalmanager/lifelonglearningjob.go +++ b/pkg/globalmanager/lifelonglearningjob.go @@ -57,7 +57,6 @@ var ljControllerKind = sednav1.SchemeGroupVersion.WithKind("LifelongLearningJob" type LifelongLearningJobController struct { kubeClient kubernetes.Interface client sednaclientset.SednaV1alpha1Interface - podControl k8scontroller.PodControlInterface // podStoreSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. @@ -273,7 +272,7 @@ func (jc *LifelongLearningJobController) sync(key string) (bool, error) { jobFailed := false needUpdated := false - // update conditions of incremental job + // update conditions of lifelonglearning job needUpdated, err = jc.updateLifelongLearningJobConditions(&lifelonglearningjob) if err != nil { klog.V(2).Infof("lifelonglearning job %v/%v faied to be updated, err:%s", lifelonglearningjob.Namespace, lifelonglearningjob.Name, err) @@ -599,6 +598,7 @@ func (jc *LifelongLearningJobController) createPod(job *sednav1.LifelongLearning "WORKER_NAME": "eval-worker-" + utilrand.String(5), "LC_SERVER": jc.cfg.LC.Server, + "KB_SERVER": jc.cfg.KB.Server, } var modelMountURLs []MountURL @@ -676,8 +676,8 @@ func (jc *LifelongLearningJobController) createInferPod(job *sednav1.LifelongLea URL: inferModelURL, Secret: jobSecret, }, - Name: "model", - EnvName: "MODEL_URL", + Name: "models", + EnvName: "MODEL_URLS", }, ) @@ -736,14 +736,9 @@ func NewLifelongLearningJobController(cfg *config.ControllerConfig) (FeatureCont jc := &LifelongLearningJobController{ kubeClient: kubeClient, client: crdclient.SednaV1alpha1(), - podControl: k8scontroller.RealPodControl{ - KubeClient: kubeClient, - Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "lifelonglearningjob-controller"}), - }, - - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultBackOff, MaxBackOff), "lifelonglearningjob"), - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "lifelonglearningjob-controller"}), - cfg: cfg, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultBackOff, MaxBackOff), "lifelonglearningjob"), + recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "lifelonglearningjob-controller"}), + cfg: cfg, } jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ diff --git a/pkg/globalmanager/upstream.go b/pkg/globalmanager/upstream.go index f28c94ea..13d64483 100644 --- a/pkg/globalmanager/upstream.go +++ b/pkg/globalmanager/upstream.go @@ -384,7 +384,7 @@ func (uc *UpstreamController) appendLifelongLearningJobStatusCondition(name, nam }) } -// updateIncrementalLearningFromEdge syncs the edge updates to k8s +// updateLifelongLearningJobFromEdge syncs the edge updates to k8s func (uc *UpstreamController) updateLifelongLearningJobFromEdge(name, namespace, operation string, content []byte) error { err := checkUpstreamOperation(operation) if err != nil { diff --git a/pkg/localcontroller/manager/dataset.go b/pkg/localcontroller/manager/dataset.go index c1b48cbd..77eafd8d 100644 --- a/pkg/localcontroller/manager/dataset.go +++ b/pkg/localcontroller/manager/dataset.go @@ -239,6 +239,9 @@ func (ds *Dataset) readByLine(url string, format string) (*DataSource, error) { numberOfSamples += len(samples) case DatasetFormatCSV: // the first row of csv file is header + if len(samples) == 0 { + return nil, fmt.Errorf("file %s is empty", url) + } dataSource.Header = samples[0] samples = samples[1:] numberOfSamples += len(samples) diff --git a/pkg/localcontroller/util/util.go b/pkg/localcontroller/util/util.go index 9e0a9935..e61652ba 100644 --- a/pkg/localcontroller/util/util.go +++ b/pkg/localcontroller/util/util.go @@ -54,7 +54,7 @@ func IsDir(path string) bool { } // CopyFile copies a file to other -func CopyFile(dstName, srcName string) (written int64, err error) { +func CopyFile(srcName, dstName string) (written int64, err error) { src, err := os.Open(srcName) if err != nil { klog.Errorf("open file %s failed: %v", srcName, err)