From f88f6b1d50204ca3a36a55c723e6aefb53cb0345 Mon Sep 17 00:00:00 2001 From: JimmyYang20 Date: Mon, 26 Apr 2021 16:00:15 +0800 Subject: [PATCH] Add shared storage code for LC Signed-off-by: JimmyYang20 --- go.mod | 1 + go.sum | 24 +- pkg/localcontroller/manager/dataset.go | 63 ++++-- .../manager/incrementallearningjob.go | 176 +++++++++++---- pkg/localcontroller/manager/types.go | 11 +- pkg/localcontroller/storage/minio.go | 147 ++++++++++++ pkg/localcontroller/storage/storage.go | 212 ++++++++++++++++++ pkg/localcontroller/util/util.go | 24 ++ 8 files changed, 595 insertions(+), 63 deletions(-) create mode 100644 pkg/localcontroller/storage/minio.go create mode 100644 pkg/localcontroller/storage/storage.go diff --git a/go.mod b/go.mod index c4da80ac..d9ee091d 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.14 require ( github.com/emicklei/go-restful/v3 v3.4.0 github.com/gorilla/websocket v1.4.0 + github.com/minio/minio-go/v7 v7.0.10 github.com/onsi/ginkgo v1.11.0 github.com/onsi/gomega v1.7.0 github.com/spf13/cobra v1.0.0 diff --git a/go.sum b/go.sum index 57fab893..81944275 100644 --- a/go.sum +++ b/go.sum @@ -284,6 +284,7 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/googleapis/gnostic v0.4.1 h1:DLJCy1n/vrD4HPjOvYcT8aYQXpPIzoRZONaYwyycI+I= github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg= github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= @@ -330,6 +331,7 @@ github.com/json-iterator/go v1.1.10 
h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= @@ -338,6 +340,9 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= +github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= +github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s= +github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8= @@ -381,7 +386,14 @@ github.com/mholt/certmagic v0.6.2-0.20190624175158-6a42ef9fe8c2/go.mod h1:g4cOPx github.com/miekg/dns v1.1.3/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/miekg/dns v1.1.4/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= 
github.com/mindprince/gonvml v0.0.0-20190828220739-9ebdce4bb989/go.mod h1:2eu9pRWp8mo84xCg6KswZ+USQHjwgRhNp06sozOdsTY= +github.com/minio/md5-simd v1.1.0 h1:QPfiOqlZH+Cj9teu0t9b1nTBfPbyTl16Of5MeuShdK4= +github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw= +github.com/minio/minio-go/v7 v7.0.10 h1:1oUKe4EOPUEhw2qnPQaPsJ0lmVTYLFu03SiItauXs94= +github.com/minio/minio-go/v7 v7.0.10/go.mod h1:td4gW1ldOsj1PbSNS+WYK43j+P1XVhX/8W8awaYlBFo= +github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU= +github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= @@ -465,6 +477,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uY github.com/robfig/cron v1.1.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= +github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rubiojr/go-vhd v0.0.0-20200706105327-02e210299021/go.mod h1:DM5xW0nvfNNm2uytzsvhI3OnX8uzaRAg8UX/CnDqbto= github.com/russross/blackfriday v0.0.0-20170610170232-067529f716f4/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday v1.5.2/go.mod 
h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= @@ -478,7 +492,9 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= @@ -558,8 +574,9 @@ golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190617133340-57b3e21c3d56/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899 h1:DZhuSZLsGlFL4CmhA8BcRA0mnthyA/nZ00AqCUo7vHg= +golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp 
v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -663,8 +680,9 @@ golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200327173247-9dae0f8f5775/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200622214017-ed371f2e16b4 h1:5/PjkGUjvEU5Gl6BxmvKRPpqo2uNMv4rcHBMwzk/st8= golang.org/x/sys v0.0.0-20200622214017-ed371f2e16b4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae h1:Ih9Yo4hSPImZOpfGuA4bR/ORKTAbhZo2AbWNRCnevdo= +golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -771,6 +789,8 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy gopkg.in/gcfg.v1 v1.2.0/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/ini.v1 v1.57.0 h1:9unxIsFcTt4I55uWluz+UmL95q4kdJ0buvQ1ZIqVQww= +gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/mcuadros/go-syslog.v2 v2.2.1/go.mod h1:l5LPIyOOyIdQquNg+oU6Z3524YwrcqEm0aKH+5zpt2U= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/resty.v1 
v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= diff --git a/pkg/localcontroller/manager/dataset.go b/pkg/localcontroller/manager/dataset.go index 87e21dc2..4cf55f94 100644 --- a/pkg/localcontroller/manager/dataset.go +++ b/pkg/localcontroller/manager/dataset.go @@ -22,6 +22,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "time" "k8s.io/klog/v2" @@ -30,6 +31,7 @@ import ( sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1" "github.com/kubeedge/sedna/pkg/localcontroller/db" "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" + "github.com/kubeedge/sedna/pkg/localcontroller/storage" "github.com/kubeedge/sedna/pkg/localcontroller/util" ) @@ -50,8 +52,11 @@ type DatasetManager struct { // Dataset defines config for dataset type Dataset struct { *sednav1.Dataset - DataSource *DataSource `json:"dataSource"` - Done chan struct{} + DataSource *DataSource `json:"dataSource"` + Done chan struct{} + URLPrefix string + Storage storage.Storage + IsLocalStorage bool } // DatasetSpec defines dataset spec @@ -105,6 +110,14 @@ func (dm *DatasetManager) Insert(message *gmclient.Message) error { return err } + credential := dataset.ObjectMeta.Annotations[Credential] + if credential != "" { + dataset.Storage = storage.Storage{} + if err := dataset.Storage.SetCredential(credential); err != nil { + klog.Errorf("dataset(name=%s) sets storage credential failed, error: %+v", name, err) + } + } + if first { go dm.monitorDataSources(name) } @@ -140,11 +153,20 @@ func (dm *DatasetManager) monitorDataSources(name string) { return } - if ds.Spec.URL == "" { - klog.Errorf("dataset(name=%s) empty data source url.", name) + dataURL := ds.Spec.URL + prefix, err := storage.CheckURL(dataURL) + if err != nil { + klog.Errorf("dataset(name=%) has the invalid url, error: %+v", name, err) return } + if prefix == storage.LocalPrefix { + dataURL = util.AddPrefixPath(dm.VolumeMountPrefix, dataURL) + ds.IsLocalStorage = true + } + + ds.URLPrefix = strings.TrimRight(dataURL, 
filepath.Base(dataURL)) + samplesNumber := 0 for { select { case <-ds.Done: @@ -152,16 +174,16 @@ func (dm *DatasetManager) monitorDataSources(name string) { default: } - dataURL := util.AddPrefixPath(dm.VolumeMountPrefix, filepath.Join(ds.Spec.URL)) - dataSource, err := dm.getDataSource(dataURL, ds.Spec.Format) + dataSource, err := ds.getDataSource(dataURL, ds.Spec.Format) if err != nil { - klog.Errorf("dataset(name=%s) get samples from %s failed", name, dataURL) + klog.Errorf("dataset(name=%s) get samples from %s failed, error: %+v", name, dataURL, err) } else { ds.DataSource = dataSource - - klog.Infof("dataset(name=%s) get samples from data source(url=%s) successfully. number of samples: %d", - name, dataURL, dataSource.NumberOfSamples) - + if samplesNumber != dataSource.NumberOfSamples { + samplesNumber = dataSource.NumberOfSamples + klog.Infof("dataset(name=%s) get samples from data source(url=%s) successfully. number of samples: %d", + name, dataURL, dataSource.NumberOfSamples) + } header := gmclient.MessageHeader{ Namespace: ds.Namespace, ResourceKind: ds.Kind, @@ -174,7 +196,7 @@ func (dm *DatasetManager) monitorDataSources(name string) { }{ dataSource.NumberOfSamples, }, header); err != nil { - klog.Errorf("dataset(name=%s) publish samples info failed", name) + klog.Errorf("dataset(name=%s) publish samples info failed, error: %+v", name, err) } } <-time.After(MonitorDataSourceIntervalSeconds * time.Second) @@ -182,16 +204,21 @@ func (dm *DatasetManager) monitorDataSources(name string) { } // getDataSource gets data source info -func (dm *DatasetManager) getDataSource(dataURL string, format string) (*DataSource, error) { +func (ds *Dataset) getDataSource(dataURL string, format string) (*DataSource, error) { + localURL, err := ds.Storage.Download(dataURL, "") + if err != nil { + return nil, err + } + switch format { case "txt": - return dm.readByLine(dataURL) + return ds.readByLine(localURL) } return nil, fmt.Errorf("not vaild file format") } // readByLine 
reads file by line -func (dm *DatasetManager) readByLine(url string) (*DataSource, error) { +func (ds *Dataset) readByLine(url string) (*DataSource, error) { samples, err := getSamples(url) if err != nil { klog.Errorf("read file %s failed, error: %v", url, err) @@ -206,6 +233,12 @@ func (dm *DatasetManager) readByLine(url string) (*DataSource, error) { NumberOfSamples: numberOfSamples, } + if !ds.IsLocalStorage { + if err := os.Remove(url); err != nil { + return nil, err + } + } + return &dataSource, nil } diff --git a/pkg/localcontroller/manager/incrementallearningjob.go b/pkg/localcontroller/manager/incrementallearningjob.go index 41f8854c..7faba736 100644 --- a/pkg/localcontroller/manager/incrementallearningjob.go +++ b/pkg/localcontroller/manager/incrementallearningjob.go @@ -23,6 +23,7 @@ import ( "os" "path" "strconv" + "strings" "sync" "time" @@ -32,6 +33,7 @@ import ( sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1" "github.com/kubeedge/sedna/pkg/localcontroller/db" "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" + "github.com/kubeedge/sedna/pkg/localcontroller/storage" "github.com/kubeedge/sedna/pkg/localcontroller/trigger" "github.com/kubeedge/sedna/pkg/localcontroller/util" ) @@ -42,6 +44,7 @@ type IncrementalLearningJob struct { JobConfig *JobConfig Dataset *Dataset Done chan struct{} + Storage storage.Storage } // JobConfig defines config for incremental-learning-job @@ -63,6 +66,7 @@ type JobConfig struct { DeployModel *ModelInfo EvalResult []*ModelInfo Lock sync.Mutex + IsLocalStorage bool } // OutputConfig defines config for job output @@ -348,6 +352,14 @@ func (im *IncrementalJobManager) Insert(message *gmclient.Message) error { return err } + credential := job.ObjectMeta.Annotations[Credential] + if credential != "" { + job.Storage = storage.Storage{} + if err := job.Storage.SetCredential(credential); err != nil { + klog.Errorf("job(name=%s) sets storage credential failed, error: %+v", name, err) + } + } + if first { go 
im.startJob(name) } @@ -379,7 +391,6 @@ func (im *IncrementalJobManager) Delete(message *gmclient.Message) error { // initJob inits the job object func (im *IncrementalJobManager) initJob(job *IncrementalLearningJob) error { jobConfig := job.JobConfig - jobConfig.OutputDir = util.AddPrefixPath(im.VolumeMountPrefix, job.Spec.OutputDir) jobConfig.TrainModel = new(TrainModel) jobConfig.TrainModel.OutputURL = jobConfig.OutputDir jobConfig.DeployModel = new(ModelInfo) @@ -400,6 +411,20 @@ func (im *IncrementalJobManager) initJob(job *IncrementalLearningJob) error { jobConfig.TrainTrigger = trainTrigger jobConfig.DeployTrigger = deployTrigger + outputDir := job.Spec.OutputDir + + prefix, err := storage.CheckURL(outputDir) + if err != nil { + return fmt.Errorf("outout dir is unvalid, error: %+v", err) + } + + jobConfig.IsLocalStorage = false + jobConfig.OutputDir = outputDir + if prefix == storage.LocalPrefix { + jobConfig.IsLocalStorage = true + jobConfig.OutputDir = util.AddPrefixPath(im.VolumeMountPrefix, outputDir) + } + if err := createOutputDir(jobConfig); err != nil { return err } @@ -440,8 +465,9 @@ func (im *IncrementalJobManager) triggerTrainTask(job *IncrementalLearningJob) ( jobConfig.Version++ - jobConfig.TrainDataURL, err = im.writeSamples(jobConfig.DataSamples.TrainSamples, - jobConfig.OutputConfig.SamplesOutput["train"], jobConfig.Version, job.Dataset.Spec.Format) + var dataIndexURL string + jobConfig.TrainDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.TrainSamples, + jobConfig.OutputConfig.SamplesOutput["train"], jobConfig.Version, job.Dataset.Spec.Format, job.Dataset.URLPrefix) if err != nil { klog.Errorf("train phase: write samples to the file(%s) is failed, error: %v", jobConfig.TrainDataURL, err) return nil, false, err @@ -452,11 +478,21 @@ func (im *IncrementalJobManager) triggerTrainTask(job *IncrementalLearningJob) ( Format: format, URL: jobConfig.TrainModel.TrainedModel[format], } + + dataURL := jobConfig.TrainDataURL + if 
jobConfig.IsLocalStorage { + dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, jobConfig.TrainDataURL) + if !job.Dataset.IsLocalStorage { + dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL) + } + } + input := WorkerInput{ - Models: []ModelInfo{m}, - DataURL: util.TrimPrefixPath(im.VolumeMountPrefix, jobConfig.TrainDataURL), + Models: []ModelInfo{m}, + DataURL: dataURL, + DataIndexURL: dataIndexURL, OutputDir: util.TrimPrefixPath(im.VolumeMountPrefix, - path.Join(jobConfig.OutputConfig.TrainOutput, strconv.Itoa(jobConfig.Version))), + strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(jobConfig.Version)}, "/")), } msg := UpstreamMessage{ Phase: TrainPhase, @@ -472,8 +508,9 @@ func (im *IncrementalJobManager) triggerEvalTask(job *IncrementalLearningJob) (* jobConfig := job.JobConfig var err error - (*jobConfig).EvalDataURL, err = im.writeSamples(jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"], - jobConfig.Version, job.Dataset.Spec.Format) + var dataIndexURL string + jobConfig.EvalDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"], + jobConfig.Version, job.Dataset.Spec.Format, job.Dataset.URLPrefix) if err != nil { klog.Errorf("job(name=%s) eval phase: write samples to the file(%s) is failed, error: %v", jobConfig.UniqueIdentifier, jobConfig.EvalDataURL, err) @@ -491,11 +528,20 @@ func (im *IncrementalJobManager) triggerEvalTask(job *IncrementalLearningJob) (* URL: jobConfig.DeployModel.URL, }) + dataURL := jobConfig.EvalDataURL + if jobConfig.IsLocalStorage { + dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, jobConfig.EvalDataURL) + if !job.Dataset.IsLocalStorage { + dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL) + } + } + input := WorkerInput{ - Models: models, - DataURL: util.TrimPrefixPath(im.VolumeMountPrefix, jobConfig.EvalDataURL), + Models: models, + DataURL: dataURL, + 
DataIndexURL: dataIndexURL, OutputDir: util.TrimPrefixPath(im.VolumeMountPrefix, - path.Join(jobConfig.OutputConfig.EvalOutput, strconv.Itoa(jobConfig.Version))), + strings.Join([]string{jobConfig.OutputConfig.EvalOutput, strconv.Itoa(jobConfig.Version)}, "/")), } msg := &UpstreamMessage{ Phase: EvalPhase, @@ -567,10 +613,8 @@ func (im *IncrementalJobManager) deployModel(job *IncrementalLearningJob) (*Mode trainedModel := util.AddPrefixPath(im.VolumeMountPrefix, models[0].URL) deployModel := util.AddPrefixPath(im.VolumeMountPrefix, models[1].URL) - if _, err = util.CopyFile(trainedModel, deployModel); err != nil { - return nil, fmt.Errorf("failed to copy the trained model file(url=%s) to the deployment model file(url=%s): %v", - trainedModel, deployModel, - err) + if err = job.updateDeployModel(deployModel, trainedModel); err != nil { + return nil, err } jobConfig.DeployModel.Format = models[1].Format @@ -581,30 +625,43 @@ func (im *IncrementalJobManager) deployModel(job *IncrementalLearningJob) (*Mode return &models[0], nil } +func (job *IncrementalLearningJob) updateDeployModel(deployModel string, objectModel string) error { + if _, err := job.Storage.Download(objectModel, deployModel); err != nil { + return fmt.Errorf("copy model file(url=%s) to the deployment model file failed(url=%s): %v", + objectModel, deployModel, err) + } + + return nil +} + // createOutputDir creates the job output dir func createOutputDir(jobConfig *JobConfig) error { - if err := util.CreateFolder(jobConfig.OutputDir); err != nil { - klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, jobConfig.OutputDir) - return err - } + outputDir := jobConfig.OutputDir dirNames := []string{"data/train", "data/eval", "train", "eval"} - for _, v := range dirNames { - dir := path.Join(jobConfig.OutputDir, v) - if err := util.CreateFolder(dir); err != nil { - klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, dir) + if jobConfig.IsLocalStorage { + if 
err := util.CreateFolder(outputDir); err != nil { + klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, outputDir) return err } + + for _, v := range dirNames { + dir := path.Join(outputDir, v) + if err := util.CreateFolder(dir); err != nil { + klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, dir) + return err + } + } } outputConfig := OutputConfig{ SamplesOutput: map[string]string{ - "train": path.Join(jobConfig.OutputDir, dirNames[0]), - "eval": path.Join(jobConfig.OutputDir, dirNames[1]), + "train": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[0]}, "/"), + "eval": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[1]}, "/"), }, - TrainOutput: path.Join(jobConfig.OutputDir, dirNames[2]), - EvalOutput: path.Join(jobConfig.OutputDir, dirNames[3]), + TrainOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[2]}, "/"), + EvalOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[3]}, "/"), } jobConfig.OutputConfig = &outputConfig @@ -735,32 +792,67 @@ func (im *IncrementalJobManager) handleData(job *IncrementalLearningJob) { } } -// createFile creates a file -func createFile(dir string, format string) (string, bool) { +// createFile creates data file and data index file +func createFile(dir string, format string, isLocalStorage bool) (string, string) { switch format { case "txt": - return path.Join(dir, "data.txt"), true + if isLocalStorage { + return path.Join(dir, "data.txt"), "" + } + return strings.Join([]string{dir, "data.txt"}, "/"), strings.Join([]string{dir, "dataIndex.txt"}, "/") } - return "", false + return "", "" } // writeSamples writes samples information to a file -func (im *IncrementalJobManager) writeSamples(samples []string, dir string, version int, format string) (string, error) { - subDir := path.Join(dir, strconv.Itoa(version)) - if err := util.CreateFolder(subDir); err != nil { - return "", err - } +func (im 
*IncrementalJobManager) writeSamples(job *IncrementalLearningJob, samples []string, dir string, version int, format string, urlPrefix string) (string, string, error) { + subDir := strings.Join([]string{dir, strconv.Itoa(version)}, "/") + fileURL, absURLFile := createFile(subDir, format, job.Dataset.IsLocalStorage) - fileURL, isFile := createFile(subDir, format) - if isFile { + if job.JobConfig.IsLocalStorage { + if err := util.CreateFolder(subDir); err != nil { + return "", "", err + } if err := im.writeByLine(samples, fileURL); err != nil { - return "", err + return "", "", err } - } else { - return "", fmt.Errorf("create a %s format file in %s failed", format, subDir) + + if !job.Dataset.IsLocalStorage { + tempSamples := util.ParsingDatasetIndex(samples, urlPrefix) + if err := im.writeByLine(tempSamples, absURLFile); err != nil { + return "", "", err + } + } + + return fileURL, absURLFile, nil + } + + temporaryDir, err := util.CreateTemporaryDir() + if err != nil { + return "", "", err + } + + localFileURL, localAbsURLFile := createFile(temporaryDir, format, job.Dataset.IsLocalStorage) + + if err := im.writeByLine(samples, localFileURL); err != nil { + return "", "", err + } + + if err := job.Storage.Upload(localFileURL, fileURL); err != nil { + return "", "", err + } + + tempSamples := util.ParsingDatasetIndex(samples, urlPrefix) + + if err := im.writeByLine(tempSamples, localAbsURLFile); err != nil { + return "", "", err + } + + if err := job.Storage.Upload(localAbsURLFile, absURLFile); err != nil { + return "", "", err } - return fileURL, nil + return fileURL, absURLFile, nil } // writeByLine writes file by line diff --git a/pkg/localcontroller/manager/types.go b/pkg/localcontroller/manager/types.go index 84b38d28..b3836568 100644 --- a/pkg/localcontroller/manager/types.go +++ b/pkg/localcontroller/manager/types.go @@ -45,6 +45,9 @@ const ( TriggerReadyStatus = "ready" // TriggerCompletedStatus is the completed status about trigger in incremental-learning-job 
TriggerCompletedStatus = "completed" + + // Credential is credential of the storage service + Credential = "sedna.io/credential" ) // WorkerMessage defines message struct from worker @@ -82,10 +85,10 @@ type UpstreamMessage struct { type WorkerInput struct { // Only one model cases - Models []ModelInfo `json:"models,omitempty"` - - DataURL string `json:"dataURL,omitempty"` - OutputDir string `json:"outputDir,omitempty"` + Models []ModelInfo `json:"models,omitempty"` + DataURL string `json:"dataURL,omitempty"` + DataIndexURL string `json:"dataIndexURL,omitempty"` + OutputDir string `json:"outputDir,omitempty"` } // WorkerOutput defines output information of worker diff --git a/pkg/localcontroller/storage/minio.go b/pkg/localcontroller/storage/minio.go new file mode 100644 index 00000000..a4453ff4 --- /dev/null +++ b/pkg/localcontroller/storage/minio.go @@ -0,0 +1,147 @@ +/* +Copyright 2021 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package storage + +import ( + "context" + "fmt" + "net/url" + "strings" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/sedna/pkg/localcontroller/util" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" +) + +// MinioClient defines a minio client +type MinioClient struct { + Client *minio.Client +} + +// MaxTimeOut is max deadline time of client working +const MaxTimeOut = 100 * time.Second + +// createMinioClient creates client +func createMinioClient(endpoint string, useHTTPS string, accessKeyID string, secretAccessKey string) (*MinioClient, error) { + token := "" + useSSL := true + if useHTTPS == "0" { + useSSL = false + } + + client, err := minio.New(endpoint, &minio.Options{ + Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, token), + Secure: useSSL, + }) + if err != nil { + return nil, fmt.Errorf("initialize minio client failed, endpoint = %+v, error: %+v", endpoint, err) + } + + c := MinioClient{ + Client: client, + } + + return &c, nil +} + +// uploadFile uploads file from local host to storage service +func (mc *MinioClient) uploadFile(localPath string, objectURL string) error { + bucket, absPath, err := mc.parseURL(objectURL) + if err != nil { + return err + } + + if !util.IsExists(localPath) { + return fmt.Errorf("file(%s) in the local host is not exists", localPath) + } + + ctx, cancel := context.WithTimeout(context.Background(), MaxTimeOut) + defer cancel() + + if _, err = mc.Client.FPutObject(ctx, bucket, absPath, localPath, minio.PutObjectOptions{}); err != nil { + return fmt.Errorf("upload file from file path(%s) to file url(%s) failed, error: %+v", localPath, objectURL, err) + } + + return nil +} + +// copyFile copies file from the bucket to another bucket in storage service +func (mc *MinioClient) copyFile(srcURL string, objectURL string) error { + srcBucket, srcAbsPath, err := mc.parseURL(srcURL) + if err != nil { + return err + } + srcOptions := minio.CopySrcOptions{ + Bucket: srcBucket, + 
+ Object: srcAbsPath, + } + + objectBucket, objectAbsPath, err := mc.parseURL(objectURL) + if err != nil { + return err + } + objectOptions := minio.CopyDestOptions{ + Bucket: objectBucket, + Object: objectAbsPath, + } + + ctx, cancel := context.WithTimeout(context.Background(), MaxTimeOut) + defer cancel() + if _, err := mc.Client.CopyObject(ctx, objectOptions, srcOptions); err != nil { + return fmt.Errorf("copy file from file url(%s) to file url(%s) failed, error: %+v", srcURL, objectURL, err) + } + + return nil +} + +// downloadFile downloads a file from the storage service to the local host +func (mc *MinioClient) downloadFile(objectURL string, localPath string) error { + bucket, absPath, err := mc.parseURL(objectURL) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), MaxTimeOut) + defer cancel() + + if err = mc.Client.FGetObject(ctx, bucket, absPath, localPath, minio.GetObjectOptions{}); err != nil { + return fmt.Errorf("download file from file url(%s) to file path(%s) failed, error: %+v", objectURL, localPath, err) + } + + return nil +} + +// parseURL parses an s3 url into its bucket name and object path +func (mc *MinioClient) parseURL(URL string) (string, string, error) { + u, err := url.Parse(URL) + if err != nil { + return "", "", fmt.Errorf("invalid url(%s)", URL) + } + + scheme := u.Scheme + switch scheme { + case S3Prefix: + return u.Host, strings.TrimPrefix(u.Path, "/"), nil + default: + klog.Errorf("invalid scheme(%s)", scheme) + } + + return "", "", fmt.Errorf("invalid url(%s)", URL) +} diff --git a/pkg/localcontroller/storage/storage.go b/pkg/localcontroller/storage/storage.go new file mode 100644 index 00000000..225d522f --- /dev/null +++ b/pkg/localcontroller/storage/storage.go @@ -0,0 +1,212 @@ +/* +Copyright 2021 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "encoding/json" + "fmt" + "net/url" + "path" + "path/filepath" + + "k8s.io/klog/v2" + + "github.com/kubeedge/sedna/pkg/localcontroller/util" +) + +const ( + // S3Prefix defines that prefix of url is s3 + S3Prefix = "s3" + // LocalPrefix defines that prefix of url is local host + LocalPrefix = "" + // S3Endpoint is the s3 endpoint of the storage service + S3Endpoint = "s3-endpoint" + // S3UseHTTPS determines whether to use HTTPS protocol + S3UseHTTPS = "s3-usehttps" + // AccessKeyID is the access key id of the storage service + AccessKeyID = "ACCESS_KEY_ID" + // SecretAccessKey is secret access key of the storage service + SecretAccessKey = "SECRET_ACCESS_KEY" +) + +type Storage struct { + MinioClient *MinioClient +} + +// Download downloads the file to the local host +func (s *Storage) Download(objectURL string, localPath string) (string, error) { + prefix, err := CheckURL(objectURL) + if err != nil { + return "", err + } + + switch prefix { + case LocalPrefix: + return s.localCopy(objectURL, localPath) + case S3Prefix: + return s.downloadS3(objectURL, localPath) + default: + return "", fmt.Errorf("invalid url(%s)", objectURL) + } +} + +// localCopy copies the local file to another path on the local host +func (s *Storage) localCopy(objectURL string, localPath string) (string, error) { + if !util.IsExists(objectURL) { + return "", fmt.Errorf("url(%s) is not exists", objectURL) + } + + if localPath == "" { + return objectURL, nil + } + + dir := path.Dir(localPath) + if !util.IsDir(dir) { + util.CreateFolder(dir) + } + + util.CopyFile(objectURL,
localPath) + + return localPath, nil +} + +// downloadS3 downloads the file from url of s3 to the local host +func (s *Storage) downloadS3(objectURL string, localPath string) (string, error) { + if localPath == "" { + temporaryDir, err := util.CreateTemporaryDir() + if err != nil { + return "", err + } + + localPath = path.Join(temporaryDir, filepath.Base(objectURL)) + } + + dir := path.Dir(localPath) + if !util.IsDir(dir) { + util.CreateFolder(dir) + } + + if err := s.MinioClient.downloadFile(objectURL, localPath); err != nil { + return "", err + } + + return localPath, nil +} + +// checkMapKey returns the value stored under key, or an error if key is absent from the map +func checkMapKey(m map[string]string, key string) (string, error) { + v, ok := m[key] + if !ok { + return "", fmt.Errorf("%s is empty", key) + } + return v, nil +} + +// SetCredential parses the JSON credential and creates the minio client of the storage service +func (s *Storage) SetCredential(credential string) error { + c := credential + m := make(map[string]string) + if err := json.Unmarshal([]byte(c), &m); err != nil { + return err + } + + endpoint, err := checkMapKey(m, S3Endpoint) + if err != nil { + return err + } + + useHTTPS, err := checkMapKey(m, S3UseHTTPS) + if err != nil { + useHTTPS = "1" + } + + ak, err := checkMapKey(m, AccessKeyID) + if err != nil { + return err + } + + sk, err := checkMapKey(m, SecretAccessKey) + if err != nil { + return err + } + + mc, err := createMinioClient(endpoint, useHTTPS, ak, sk) + if err != nil { + return err + } + + s.MinioClient = mc + + return nil +} + +// Upload uploads the local file to the storage service addressed by objectURL (only "s3" urls are supported) +func (s *Storage) Upload(localPath string, objectURL string) error { + prefix, err := CheckURL(objectURL) + if err != nil { + return err + } + + switch prefix { + case S3Prefix: + return s.uploadS3(localPath, objectURL) + default: + return fmt.Errorf("invalid url(%s)", objectURL) + } +} + +// uploadS3 uploads a local file, or copies an s3 object, to the url of s3 +func (s *Storage) uploadS3(srcURL string, objectURL string) error 
{ + prefix, err := CheckURL(srcURL) + if err != nil { + return err + } + switch prefix { + case LocalPrefix: + if err := s.MinioClient.uploadFile(srcURL, objectURL); err != nil { + return err + } + case S3Prefix: + if err := s.MinioClient.copyFile(srcURL, objectURL); err != nil { + return err + } + } + return nil +} + +// CheckURL returns the scheme of the url if it is a supported one (local "" or "s3"), otherwise an error +func CheckURL(objectURL string) (string, error) { + if objectURL == "" { + return "", fmt.Errorf("empty url") + } + + u, err := url.Parse(objectURL) + if err != nil { + klog.Errorf("invalid url(%s), error: %v", objectURL, err) + return "", fmt.Errorf("invalid url(%s)", objectURL) + } + + l := []string{LocalPrefix, S3Prefix} + + for _, v := range l { + if u.Scheme == v { + return u.Scheme, nil + } + } + + return "", fmt.Errorf("unvalid url(%s), not support prefix(%s)", objectURL, u.Scheme) +} diff --git a/pkg/localcontroller/util/util.go b/pkg/localcontroller/util/util.go index f79d48a0..e44fa40d 100644 --- a/pkg/localcontroller/util/util.go +++ b/pkg/localcontroller/util/util.go @@ -19,9 +19,13 @@ package util import ( "fmt" "io" + "math/rand" "os" + "path" "path/filepath" + "strconv" "strings" + "time" "k8s.io/klog/v2" ) @@ -105,3 +109,23 @@ func AddPrefixPath(prefix string, path string) string { func GetUniqueIdentifier(namespace string, name string, kind string) string { return fmt.Sprintf("%s/%s/%s", namespace, kind, name) } + +// CreateTemporaryDir creates a randomly named dir under /tmp and returns its path +func CreateTemporaryDir() (string, error) { + var src = rand.NewSource(time.Now().UnixNano()) + dir := path.Join("/tmp/", strconv.FormatInt(src.Int63(), 10), "/") + err := CreateFolder(dir) + return dir, err +} + +// ParsingDatasetIndex returns a list holding the prefix followed by the first space-separated field of each sample +func ParsingDatasetIndex(samples []string, prefix string) []string { + var l []string + l = append(l, prefix) + for _, v := range samples { + absURL := strings.Split(v, " ")[0] + l = append(l, absURL) + } + + return l +}