Browse Source

Add shared storage code for LC

Signed-off-by: JimmyYang20 <yangjin39@huawei.com>
tags/v0.2.0
JimmyYang20 4 years ago
parent
commit
f88f6b1d50
8 changed files with 595 additions and 63 deletions
  1. +1
    -0
      go.mod
  2. +22
    -2
      go.sum
  3. +48
    -15
      pkg/localcontroller/manager/dataset.go
  4. +134
    -42
      pkg/localcontroller/manager/incrementallearningjob.go
  5. +7
    -4
      pkg/localcontroller/manager/types.go
  6. +147
    -0
      pkg/localcontroller/storage/minio.go
  7. +212
    -0
      pkg/localcontroller/storage/storage.go
  8. +24
    -0
      pkg/localcontroller/util/util.go

+ 1
- 0
go.mod View File

@@ -5,6 +5,7 @@ go 1.14
require (
github.com/emicklei/go-restful/v3 v3.4.0
github.com/gorilla/websocket v1.4.0
github.com/minio/minio-go/v7 v7.0.10
github.com/onsi/ginkgo v1.11.0
github.com/onsi/gomega v1.7.0
github.com/spf13/cobra v1.0.0


+ 22
- 2
go.sum View File

@@ -284,6 +284,7 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/googleapis/gnostic v0.4.1 h1:DLJCy1n/vrD4HPjOvYcT8aYQXpPIzoRZONaYwyycI+I=
github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg=
github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
@@ -330,6 +331,7 @@ github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
@@ -338,6 +340,9 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s=
github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
@@ -381,7 +386,14 @@ github.com/mholt/certmagic v0.6.2-0.20190624175158-6a42ef9fe8c2/go.mod h1:g4cOPx
github.com/miekg/dns v1.1.3/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.4/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/mindprince/gonvml v0.0.0-20190828220739-9ebdce4bb989/go.mod h1:2eu9pRWp8mo84xCg6KswZ+USQHjwgRhNp06sozOdsTY=
github.com/minio/md5-simd v1.1.0 h1:QPfiOqlZH+Cj9teu0t9b1nTBfPbyTl16Of5MeuShdK4=
github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
github.com/minio/minio-go/v7 v7.0.10 h1:1oUKe4EOPUEhw2qnPQaPsJ0lmVTYLFu03SiItauXs94=
github.com/minio/minio-go/v7 v7.0.10/go.mod h1:td4gW1ldOsj1PbSNS+WYK43j+P1XVhX/8W8awaYlBFo=
github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU=
github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
@@ -465,6 +477,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uY
github.com/robfig/cron v1.1.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rubiojr/go-vhd v0.0.0-20200706105327-02e210299021/go.mod h1:DM5xW0nvfNNm2uytzsvhI3OnX8uzaRAg8UX/CnDqbto=
github.com/russross/blackfriday v0.0.0-20170610170232-067529f716f4/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
@@ -478,7 +492,9 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
@@ -558,8 +574,9 @@ golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190617133340-57b3e21c3d56/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899 h1:DZhuSZLsGlFL4CmhA8BcRA0mnthyA/nZ00AqCUo7vHg=
golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -663,8 +680,9 @@ golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200327173247-9dae0f8f5775/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200622214017-ed371f2e16b4 h1:5/PjkGUjvEU5Gl6BxmvKRPpqo2uNMv4rcHBMwzk/st8=
golang.org/x/sys v0.0.0-20200622214017-ed371f2e16b4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae h1:Ih9Yo4hSPImZOpfGuA4bR/ORKTAbhZo2AbWNRCnevdo=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
@@ -771,6 +789,8 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy
gopkg.in/gcfg.v1 v1.2.0/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/ini.v1 v1.57.0 h1:9unxIsFcTt4I55uWluz+UmL95q4kdJ0buvQ1ZIqVQww=
gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/mcuadros/go-syslog.v2 v2.2.1/go.mod h1:l5LPIyOOyIdQquNg+oU6Z3524YwrcqEm0aKH+5zpt2U=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=


+ 48
- 15
pkg/localcontroller/manager/dataset.go View File

@@ -22,6 +22,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"time"

"k8s.io/klog/v2"
@@ -30,6 +31,7 @@ import (
sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
"github.com/kubeedge/sedna/pkg/localcontroller/db"
"github.com/kubeedge/sedna/pkg/localcontroller/gmclient"
"github.com/kubeedge/sedna/pkg/localcontroller/storage"
"github.com/kubeedge/sedna/pkg/localcontroller/util"
)

@@ -50,8 +52,11 @@ type DatasetManager struct {
// Dataset defines config for dataset
type Dataset struct {
*sednav1.Dataset
DataSource *DataSource `json:"dataSource"`
Done chan struct{}
DataSource *DataSource `json:"dataSource"`
Done chan struct{}
URLPrefix string
Storage storage.Storage
IsLocalStorage bool
}

// DatasetSpec defines dataset spec
@@ -105,6 +110,14 @@ func (dm *DatasetManager) Insert(message *gmclient.Message) error {
return err
}

credential := dataset.ObjectMeta.Annotations[Credential]
if credential != "" {
dataset.Storage = storage.Storage{}
if err := dataset.Storage.SetCredential(credential); err != nil {
klog.Errorf("dataset(name=%s) sets storage credential failed, error: %+v", name, err)
}
}

if first {
go dm.monitorDataSources(name)
}
@@ -140,11 +153,20 @@ func (dm *DatasetManager) monitorDataSources(name string) {
return
}

if ds.Spec.URL == "" {
klog.Errorf("dataset(name=%s) empty data source url.", name)
dataURL := ds.Spec.URL
prefix, err := storage.CheckURL(dataURL)
if err != nil {
klog.Errorf("dataset(name=%) has the invalid url, error: %+v", name, err)
return
}

if prefix == storage.LocalPrefix {
dataURL = util.AddPrefixPath(dm.VolumeMountPrefix, dataURL)
ds.IsLocalStorage = true
}

ds.URLPrefix = strings.TrimRight(dataURL, filepath.Base(dataURL))
samplesNumber := 0
for {
select {
case <-ds.Done:
@@ -152,16 +174,16 @@ func (dm *DatasetManager) monitorDataSources(name string) {
default:
}

dataURL := util.AddPrefixPath(dm.VolumeMountPrefix, filepath.Join(ds.Spec.URL))
dataSource, err := dm.getDataSource(dataURL, ds.Spec.Format)
dataSource, err := ds.getDataSource(dataURL, ds.Spec.Format)
if err != nil {
klog.Errorf("dataset(name=%s) get samples from %s failed", name, dataURL)
klog.Errorf("dataset(name=%s) get samples from %s failed, error: %+v", name, dataURL, err)
} else {
ds.DataSource = dataSource

klog.Infof("dataset(name=%s) get samples from data source(url=%s) successfully. number of samples: %d",
name, dataURL, dataSource.NumberOfSamples)

if samplesNumber != dataSource.NumberOfSamples {
samplesNumber = dataSource.NumberOfSamples
klog.Infof("dataset(name=%s) get samples from data source(url=%s) successfully. number of samples: %d",
name, dataURL, dataSource.NumberOfSamples)
}
header := gmclient.MessageHeader{
Namespace: ds.Namespace,
ResourceKind: ds.Kind,
@@ -174,7 +196,7 @@ func (dm *DatasetManager) monitorDataSources(name string) {
}{
dataSource.NumberOfSamples,
}, header); err != nil {
klog.Errorf("dataset(name=%s) publish samples info failed", name)
klog.Errorf("dataset(name=%s) publish samples info failed, error: %+v", name, err)
}
}
<-time.After(MonitorDataSourceIntervalSeconds * time.Second)
@@ -182,16 +204,21 @@ func (dm *DatasetManager) monitorDataSources(name string) {
}

// getDataSource gets data source info
func (dm *DatasetManager) getDataSource(dataURL string, format string) (*DataSource, error) {
func (ds *Dataset) getDataSource(dataURL string, format string) (*DataSource, error) {
localURL, err := ds.Storage.Download(dataURL, "")
if err != nil {
return nil, err
}

switch format {
case "txt":
return dm.readByLine(dataURL)
return ds.readByLine(localURL)
}
return nil, fmt.Errorf("not vaild file format")
}

// readByLine reads file by line
func (dm *DatasetManager) readByLine(url string) (*DataSource, error) {
func (ds *Dataset) readByLine(url string) (*DataSource, error) {
samples, err := getSamples(url)
if err != nil {
klog.Errorf("read file %s failed, error: %v", url, err)
@@ -206,6 +233,12 @@ func (dm *DatasetManager) readByLine(url string) (*DataSource, error) {
NumberOfSamples: numberOfSamples,
}

if !ds.IsLocalStorage {
if err := os.Remove(url); err != nil {
return nil, err
}
}

return &dataSource, nil
}



+ 134
- 42
pkg/localcontroller/manager/incrementallearningjob.go View File

@@ -23,6 +23,7 @@ import (
"os"
"path"
"strconv"
"strings"
"sync"
"time"

@@ -32,6 +33,7 @@ import (
sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
"github.com/kubeedge/sedna/pkg/localcontroller/db"
"github.com/kubeedge/sedna/pkg/localcontroller/gmclient"
"github.com/kubeedge/sedna/pkg/localcontroller/storage"
"github.com/kubeedge/sedna/pkg/localcontroller/trigger"
"github.com/kubeedge/sedna/pkg/localcontroller/util"
)
@@ -42,6 +44,7 @@ type IncrementalLearningJob struct {
JobConfig *JobConfig
Dataset *Dataset
Done chan struct{}
Storage storage.Storage
}

// JobConfig defines config for incremental-learning-job
@@ -63,6 +66,7 @@ type JobConfig struct {
DeployModel *ModelInfo
EvalResult []*ModelInfo
Lock sync.Mutex
IsLocalStorage bool
}

// OutputConfig defines config for job output
@@ -348,6 +352,14 @@ func (im *IncrementalJobManager) Insert(message *gmclient.Message) error {
return err
}

credential := job.ObjectMeta.Annotations[Credential]
if credential != "" {
job.Storage = storage.Storage{}
if err := job.Storage.SetCredential(credential); err != nil {
klog.Errorf("job(name=%s) sets storage credential failed, error: %+v", name, err)
}
}

if first {
go im.startJob(name)
}
@@ -379,7 +391,6 @@ func (im *IncrementalJobManager) Delete(message *gmclient.Message) error {
// initJob inits the job object
func (im *IncrementalJobManager) initJob(job *IncrementalLearningJob) error {
jobConfig := job.JobConfig
jobConfig.OutputDir = util.AddPrefixPath(im.VolumeMountPrefix, job.Spec.OutputDir)
jobConfig.TrainModel = new(TrainModel)
jobConfig.TrainModel.OutputURL = jobConfig.OutputDir
jobConfig.DeployModel = new(ModelInfo)
@@ -400,6 +411,20 @@ func (im *IncrementalJobManager) initJob(job *IncrementalLearningJob) error {
jobConfig.TrainTrigger = trainTrigger
jobConfig.DeployTrigger = deployTrigger

outputDir := job.Spec.OutputDir

prefix, err := storage.CheckURL(outputDir)
if err != nil {
return fmt.Errorf("outout dir is unvalid, error: %+v", err)
}

jobConfig.IsLocalStorage = false
jobConfig.OutputDir = outputDir
if prefix == storage.LocalPrefix {
jobConfig.IsLocalStorage = true
jobConfig.OutputDir = util.AddPrefixPath(im.VolumeMountPrefix, outputDir)
}

if err := createOutputDir(jobConfig); err != nil {
return err
}
@@ -440,8 +465,9 @@ func (im *IncrementalJobManager) triggerTrainTask(job *IncrementalLearningJob) (

jobConfig.Version++

jobConfig.TrainDataURL, err = im.writeSamples(jobConfig.DataSamples.TrainSamples,
jobConfig.OutputConfig.SamplesOutput["train"], jobConfig.Version, job.Dataset.Spec.Format)
var dataIndexURL string
jobConfig.TrainDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.TrainSamples,
jobConfig.OutputConfig.SamplesOutput["train"], jobConfig.Version, job.Dataset.Spec.Format, job.Dataset.URLPrefix)
if err != nil {
klog.Errorf("train phase: write samples to the file(%s) is failed, error: %v", jobConfig.TrainDataURL, err)
return nil, false, err
@@ -452,11 +478,21 @@ func (im *IncrementalJobManager) triggerTrainTask(job *IncrementalLearningJob) (
Format: format,
URL: jobConfig.TrainModel.TrainedModel[format],
}

dataURL := jobConfig.TrainDataURL
if jobConfig.IsLocalStorage {
dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, jobConfig.TrainDataURL)
if !job.Dataset.IsLocalStorage {
dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL)
}
}

input := WorkerInput{
Models: []ModelInfo{m},
DataURL: util.TrimPrefixPath(im.VolumeMountPrefix, jobConfig.TrainDataURL),
Models: []ModelInfo{m},
DataURL: dataURL,
DataIndexURL: dataIndexURL,
OutputDir: util.TrimPrefixPath(im.VolumeMountPrefix,
path.Join(jobConfig.OutputConfig.TrainOutput, strconv.Itoa(jobConfig.Version))),
strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(jobConfig.Version)}, "/")),
}
msg := UpstreamMessage{
Phase: TrainPhase,
@@ -472,8 +508,9 @@ func (im *IncrementalJobManager) triggerEvalTask(job *IncrementalLearningJob) (*
jobConfig := job.JobConfig
var err error

(*jobConfig).EvalDataURL, err = im.writeSamples(jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"],
jobConfig.Version, job.Dataset.Spec.Format)
var dataIndexURL string
jobConfig.EvalDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"],
jobConfig.Version, job.Dataset.Spec.Format, job.Dataset.URLPrefix)
if err != nil {
klog.Errorf("job(name=%s) eval phase: write samples to the file(%s) is failed, error: %v",
jobConfig.UniqueIdentifier, jobConfig.EvalDataURL, err)
@@ -491,11 +528,20 @@ func (im *IncrementalJobManager) triggerEvalTask(job *IncrementalLearningJob) (*
URL: jobConfig.DeployModel.URL,
})

dataURL := jobConfig.EvalDataURL
if jobConfig.IsLocalStorage {
dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, jobConfig.EvalDataURL)
if !job.Dataset.IsLocalStorage {
dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL)
}
}

input := WorkerInput{
Models: models,
DataURL: util.TrimPrefixPath(im.VolumeMountPrefix, jobConfig.EvalDataURL),
Models: models,
DataURL: dataURL,
DataIndexURL: dataIndexURL,
OutputDir: util.TrimPrefixPath(im.VolumeMountPrefix,
path.Join(jobConfig.OutputConfig.EvalOutput, strconv.Itoa(jobConfig.Version))),
strings.Join([]string{jobConfig.OutputConfig.EvalOutput, strconv.Itoa(jobConfig.Version)}, "/")),
}
msg := &UpstreamMessage{
Phase: EvalPhase,
@@ -567,10 +613,8 @@ func (im *IncrementalJobManager) deployModel(job *IncrementalLearningJob) (*Mode
trainedModel := util.AddPrefixPath(im.VolumeMountPrefix, models[0].URL)
deployModel := util.AddPrefixPath(im.VolumeMountPrefix, models[1].URL)

if _, err = util.CopyFile(trainedModel, deployModel); err != nil {
return nil, fmt.Errorf("failed to copy the trained model file(url=%s) to the deployment model file(url=%s): %v",
trainedModel, deployModel,
err)
if err = job.updateDeployModel(deployModel, trainedModel); err != nil {
return nil, err
}

jobConfig.DeployModel.Format = models[1].Format
@@ -581,30 +625,43 @@ func (im *IncrementalJobManager) deployModel(job *IncrementalLearningJob) (*Mode
return &models[0], nil
}

func (job *IncrementalLearningJob) updateDeployModel(deployModel string, objectModel string) error {
if _, err := job.Storage.Download(objectModel, deployModel); err != nil {
return fmt.Errorf("copy model file(url=%s) to the deployment model file failed(url=%s): %v",
objectModel, deployModel, err)
}

return nil
}

// createOutputDir creates the job output dir
func createOutputDir(jobConfig *JobConfig) error {
if err := util.CreateFolder(jobConfig.OutputDir); err != nil {
klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, jobConfig.OutputDir)
return err
}
outputDir := jobConfig.OutputDir

dirNames := []string{"data/train", "data/eval", "train", "eval"}

for _, v := range dirNames {
dir := path.Join(jobConfig.OutputDir, v)
if err := util.CreateFolder(dir); err != nil {
klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, dir)
if jobConfig.IsLocalStorage {
if err := util.CreateFolder(outputDir); err != nil {
klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, outputDir)
return err
}

for _, v := range dirNames {
dir := path.Join(outputDir, v)
if err := util.CreateFolder(dir); err != nil {
klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, dir)
return err
}
}
}

outputConfig := OutputConfig{
SamplesOutput: map[string]string{
"train": path.Join(jobConfig.OutputDir, dirNames[0]),
"eval": path.Join(jobConfig.OutputDir, dirNames[1]),
"train": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[0]}, "/"),
"eval": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[1]}, "/"),
},
TrainOutput: path.Join(jobConfig.OutputDir, dirNames[2]),
EvalOutput: path.Join(jobConfig.OutputDir, dirNames[3]),
TrainOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[2]}, "/"),
EvalOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[3]}, "/"),
}
jobConfig.OutputConfig = &outputConfig

@@ -735,32 +792,67 @@ func (im *IncrementalJobManager) handleData(job *IncrementalLearningJob) {
}
}

// createFile creates a file
func createFile(dir string, format string) (string, bool) {
// createFile creates data file and data index file
func createFile(dir string, format string, isLocalStorage bool) (string, string) {
switch format {
case "txt":
return path.Join(dir, "data.txt"), true
if isLocalStorage {
return path.Join(dir, "data.txt"), ""
}
return strings.Join([]string{dir, "data.txt"}, "/"), strings.Join([]string{dir, "dataIndex.txt"}, "/")
}
return "", false
return "", ""
}

// writeSamples writes samples information to a file
func (im *IncrementalJobManager) writeSamples(samples []string, dir string, version int, format string) (string, error) {
subDir := path.Join(dir, strconv.Itoa(version))
if err := util.CreateFolder(subDir); err != nil {
return "", err
}
func (im *IncrementalJobManager) writeSamples(job *IncrementalLearningJob, samples []string, dir string, version int, format string, urlPrefix string) (string, string, error) {
subDir := strings.Join([]string{dir, strconv.Itoa(version)}, "/")
fileURL, absURLFile := createFile(subDir, format, job.Dataset.IsLocalStorage)

fileURL, isFile := createFile(subDir, format)
if isFile {
if job.JobConfig.IsLocalStorage {
if err := util.CreateFolder(subDir); err != nil {
return "", "", err
}
if err := im.writeByLine(samples, fileURL); err != nil {
return "", err
return "", "", err
}
} else {
return "", fmt.Errorf("create a %s format file in %s failed", format, subDir)

if !job.Dataset.IsLocalStorage {
tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)
if err := im.writeByLine(tempSamples, absURLFile); err != nil {
return "", "", err
}
}

return fileURL, absURLFile, nil
}

temporaryDir, err := util.CreateTemporaryDir()
if err != nil {
return "", "", err
}

localFileURL, localAbsURLFile := createFile(temporaryDir, format, job.Dataset.IsLocalStorage)

if err := im.writeByLine(samples, localFileURL); err != nil {
return "", "", err
}

if err := job.Storage.Upload(localFileURL, fileURL); err != nil {
return "", "", err
}

tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)

if err := im.writeByLine(tempSamples, localAbsURLFile); err != nil {
return "", "", err
}

if err := job.Storage.Upload(localAbsURLFile, absURLFile); err != nil {
return "", "", err
}

return fileURL, nil
return fileURL, absURLFile, nil
}

// writeByLine writes file by line


+ 7
- 4
pkg/localcontroller/manager/types.go View File

@@ -45,6 +45,9 @@ const (
TriggerReadyStatus = "ready"
// TriggerCompletedStatus is the completed status about trigger in incremental-learning-job
TriggerCompletedStatus = "completed"

// Credential is credential of the storage service
Credential = "sedna.io/credential"
)

// WorkerMessage defines message struct from worker
@@ -82,10 +85,10 @@ type UpstreamMessage struct {

type WorkerInput struct {
// Only one model cases
Models []ModelInfo `json:"models,omitempty"`
DataURL string `json:"dataURL,omitempty"`
OutputDir string `json:"outputDir,omitempty"`
Models []ModelInfo `json:"models,omitempty"`
DataURL string `json:"dataURL,omitempty"`
DataIndexURL string `json:"dataIndexURL,omitempty"`
OutputDir string `json:"outputDir,omitempty"`
}

// WorkerOutput defines output information of worker


+ 147
- 0
pkg/localcontroller/storage/minio.go View File

@@ -0,0 +1,147 @@
/*
Copyright 2021 The KubeEdge Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package storage

import (
"context"
"fmt"
"net/url"
"strings"
"time"

"k8s.io/klog/v2"

"github.com/kubeedge/sedna/pkg/localcontroller/util"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)

// MinioClient defines a minio client
type MinioClient struct {
Client *minio.Client
}

// MaxTimeOut is max deadline time of client working
const MaxTimeOut = 100 * time.Second

// createMinioClient creates client
func createMinioClient(endpoint string, useHTTPS string, accessKeyID string, secretAccessKey string) (*MinioClient, error) {
token := ""
useSSL := true
if useHTTPS == "0" {
useSSL = false
}

client, err := minio.New(endpoint, &minio.Options{
Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, token),
Secure: useSSL,
})
if err != nil {
return nil, fmt.Errorf("initialize minio client failed, endpoint = %+v, error: %+v", endpoint, err)
}

c := MinioClient{
Client: client,
}

return &c, nil
}

// uploadFile uploads file from local host to storage service
func (mc *MinioClient) uploadFile(localPath string, objectURL string) error {
bucket, absPath, err := mc.parseURL(objectURL)
if err != nil {
return err
}

if !util.IsExists(localPath) {
return fmt.Errorf("file(%s) in the local host is not exists", localPath)
}

ctx, cancel := context.WithTimeout(context.Background(), MaxTimeOut)
defer cancel()

if _, err = mc.Client.FPutObject(ctx, bucket, absPath, localPath, minio.PutObjectOptions{}); err != nil {
return fmt.Errorf("upload file from file path(%s) to file url(%s) failed, error: %+v", localPath, objectURL, err)
}

return nil
}

// copyFile copies file from the bucket to another bucket in storage service
func (mc *MinioClient) copyFile(srcURL string, objectURL string) error {
srcBucket, srcAbsPath, err := mc.parseURL(srcURL)
if err != nil {
return err
}
srcOptions := minio.CopySrcOptions{
Bucket: srcBucket,
Object: srcAbsPath,
}

objectBucket, objectAbsPath, err := mc.parseURL(objectURL)
if err != nil {
return err
}
objectOptions := minio.CopyDestOptions{
Bucket: objectBucket,
Object: objectAbsPath,
}

ctx, cancel := context.WithTimeout(context.Background(), MaxTimeOut)
defer cancel()
if _, err := mc.Client.CopyObject(ctx, objectOptions, srcOptions); err != nil {
return fmt.Errorf("copy file from file url(%s) to file url(%s) failed, error: %+v", srcURL, objectURL, err)
}

return nil
}

// DownloadFile downloads file from storage service to the local host
func (mc *MinioClient) downloadFile(objectURL string, localPath string) error {
bucket, absPath, err := mc.parseURL(objectURL)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), MaxTimeOut)
defer cancel()

if err = mc.Client.FGetObject(ctx, bucket, absPath, localPath, minio.GetObjectOptions{}); err != nil {
return fmt.Errorf("download file from file url(%s) to file path(%s) failed, error: %+v", objectURL, localPath, err)
}

return nil
}

// parseURL parses url
func (mc *MinioClient) parseURL(URL string) (string, string, error) {
u, err := url.Parse(URL)
if err != nil {
return "", "", fmt.Errorf("invalid url(%s)", URL)
}

scheme := u.Scheme
switch scheme {
case S3Prefix:
return u.Host, strings.TrimPrefix(u.Path, "/"), nil
default:
klog.Errorf("invalid scheme(%s)", scheme)
}

return "", "", fmt.Errorf("invalid url(%s)", URL)
}

+ 212
- 0
pkg/localcontroller/storage/storage.go View File

@@ -0,0 +1,212 @@
/*
Copyright 2021 The KubeEdge Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package storage

import (
"encoding/json"
"fmt"
"net/url"
"path"
"path/filepath"

"k8s.io/klog/v2"

"github.com/kubeedge/sedna/pkg/localcontroller/util"
)

const (
// S3Prefix defines that prefix of url is s3
S3Prefix = "s3"
// LocalPrefix defines that prefix of url is local host
LocalPrefix = ""
// S3EndPoint is s3 endpoint of the storage service
S3Endpoint = "s3-endpoint"
// S3UseHTTPS determines whether to use HTTPS protocol
S3UseHTTPS = "s3-usehttps"
// AccessKeyId is access key id of the storage service
AccessKeyID = "ACCESS_KEY_ID"
// SecretAccessKey is secret access key of the storage service
SecretAccessKey = "SECRET_ACCESS_KEY"
)

type Storage struct {
MinioClient *MinioClient
}

// Download downloads the file to the local host
func (s *Storage) Download(objectURL string, localPath string) (string, error) {
prefix, err := CheckURL(objectURL)
if err != nil {
return "", err
}

switch prefix {
case LocalPrefix:
return s.localCopy(objectURL, localPath)
case S3Prefix:
return s.downloadS3(objectURL, localPath)
default:
return "", fmt.Errorf("invalid url(%s)", objectURL)
}
}

// downloadLocal copies the local file to another in local host
func (s *Storage) localCopy(objectURL string, localPath string) (string, error) {
if !util.IsExists(objectURL) {
return "", fmt.Errorf("url(%s) is not exists", objectURL)
}

if localPath == "" {
return objectURL, nil
}

dir := path.Dir(localPath)
if !util.IsDir(dir) {
util.CreateFolder(dir)
}

util.CopyFile(objectURL, localPath)

return localPath, nil
}

// downloadS3 downloads the file from url of s3 to the local host
func (s *Storage) downloadS3(objectURL string, localPath string) (string, error) {
if localPath == "" {
temporaryDir, err := util.CreateTemporaryDir()
if err != nil {
return "", err
}

localPath = path.Join(temporaryDir, filepath.Base(objectURL))
}

dir := path.Dir(localPath)
if !util.IsDir(dir) {
util.CreateFolder(dir)
}

if err := s.MinioClient.downloadFile(objectURL, localPath); err != nil {
return "", err
}

return localPath, nil
}

// checkMapKey checks whether key exists in the dict
func checkMapKey(m map[string]string, key string) (string, error) {
v, ok := m[key]
if !ok {
return "", fmt.Errorf("%s is empty", key)
}
return v, nil
}

// SetCredential sets credential of the storage service
func (s *Storage) SetCredential(credential string) error {
c := credential
m := make(map[string]string)
if err := json.Unmarshal([]byte(c), &m); err != nil {
return err
}

endpoint, err := checkMapKey(m, S3Endpoint)
if err != nil {
return err
}

useHTTPS, err := checkMapKey(m, S3UseHTTPS)
if err != nil {
useHTTPS = "1"
}

ak, err := checkMapKey(m, AccessKeyID)
if err != nil {
return err
}

sk, err := checkMapKey(m, SecretAccessKey)
if err != nil {
return err
}

mc, err := createMinioClient(endpoint, useHTTPS, ak, sk)
if err != nil {
return err
}

s.MinioClient = mc

return nil
}

// Upload uploads the file in local host to another(e.g., "s3")
func (s *Storage) Upload(localPath string, objectURL string) error {
prefix, err := CheckURL(objectURL)
if err != nil {
return err
}

switch prefix {
case S3Prefix:
return s.uploadS3(localPath, objectURL)
default:
return fmt.Errorf("invalid url(%s)", objectURL)
}
}

// uploadS3 uploads the file in local host to the url of s3
func (s *Storage) uploadS3(srcURL string, objectURL string) error {
prefix, err := CheckURL(srcURL)
if err != nil {
return err
}
switch prefix {
case LocalPrefix:
if err := s.MinioClient.uploadFile(srcURL, objectURL); err != nil {
return err
}
case S3Prefix:
if err := s.MinioClient.copyFile(srcURL, objectURL); err != nil {
return err
}
}
return nil
}

// CheckURL checks prefix of the url
func CheckURL(objectURL string) (string, error) {
if objectURL == "" {
return "", fmt.Errorf("empty url")
}

u, err := url.Parse(objectURL)
if err != nil {
klog.Errorf("invalid url(%s), error: %v", objectURL, err)
return "", fmt.Errorf("invalid url(%s)", objectURL)
}

l := []string{LocalPrefix, S3Prefix}

for _, v := range l {
if u.Scheme == v {
return u.Scheme, nil
}
}

return "", fmt.Errorf("unvalid url(%s), not support prefix(%s)", objectURL, u.Scheme)
}

+ 24
- 0
pkg/localcontroller/util/util.go View File

@@ -19,9 +19,13 @@ package util
import (
"fmt"
"io"
"math/rand"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"k8s.io/klog/v2"
)
@@ -105,3 +109,23 @@ func AddPrefixPath(prefix string, path string) string {
func GetUniqueIdentifier(namespace string, name string, kind string) string {
return fmt.Sprintf("%s/%s/%s", namespace, kind, name)
}

// CreateTemporaryDir creates a temporary dir
func CreateTemporaryDir() (string, error) {
var src = rand.NewSource(time.Now().UnixNano())
dir := path.Join("/tmp/", strconv.FormatInt(src.Int63(), 10), "/")
err := CreateFolder(dir)
return dir, err
}

// ParsingDatasetIndex parses index file of dataset and adds the prefix to abs url of sample
func ParsingDatasetIndex(samples []string, prefix string) []string {
var l []string
l = append(l, prefix)
for _, v := range samples {
absURL := strings.Split(v, " ")[0]
l = append(l, absURL)
}

return l
}

Loading…
Cancel
Save