You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

dataset.go 7.6 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. /*
  2. Copyright 2021 The KubeEdge Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package manager
  14. import (
  15. "bufio"
  16. "encoding/json"
  17. "fmt"
  18. "os"
  19. "path"
  20. "path/filepath"
  21. "strings"
  22. "time"
  23. "k8s.io/klog/v2"
  24. "github.com/kubeedge/sedna/cmd/sedna-lc/app/options"
  25. sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
  26. "github.com/kubeedge/sedna/pkg/localcontroller/db"
  27. "github.com/kubeedge/sedna/pkg/localcontroller/gmclient"
  28. "github.com/kubeedge/sedna/pkg/localcontroller/storage"
  29. "github.com/kubeedge/sedna/pkg/localcontroller/util"
  30. )
const (
	// MonitorDataSourceIntervalSeconds is the interval, in seconds, that the
	// dataset monitor waits between two reads of a data source.
	MonitorDataSourceIntervalSeconds = 10
	// DatasetResourceKind is the resource kind name of datasets; it is the
	// value returned by DatasetManager.GetName.
	DatasetResourceKind = "dataset"
)
// DatasetManager defines dataset manager. It keeps the datasets received
// through Insert and monitors the data source of each of them.
type DatasetManager struct {
	// Client is used to publish dataset status messages.
	Client gmclient.ClientI
	// DatasetMap holds the managed datasets, keyed by the unique identifier
	// built from the message namespace, resource name and resource kind.
	DatasetMap map[string]*Dataset
	// VolumeMountPrefix is prepended to the data URL of datasets that use
	// local storage.
	VolumeMountPrefix string
}
// Dataset defines config for dataset. Insert fills it by unmarshalling the
// content of the received message into it.
type Dataset struct {
	// Dataset is the embedded dataset resource (type meta, object meta, spec).
	*sednav1.Dataset
	// DataSource is the result of the most recent successful read of the
	// data source; it is updated by the monitor loop.
	DataSource *DataSource `json:"dataSource"`
	// Done is closed by Delete to tell the monitor loop to stop.
	Done chan struct{}
	// URLPrefix is the data URL with its final path element removed; it is
	// set when monitoring of the dataset starts.
	URLPrefix string
	// Storage is used to validate the data URL, hold the credential and
	// download the dataset file.
	Storage storage.Storage
}
// DatasetSpec defines dataset spec.
// NOTE(review): this type is not referenced by the code visible in this
// file; confirm whether it is still used elsewhere.
type DatasetSpec struct {
	// Format is serialized under the JSON key "format".
	// NOTE(review): presumably the dataset file format (e.g. txt or csv) — confirm.
	Format string `json:"format"`
	// DataURL is serialized under the JSON key "url".
	DataURL string `json:"url"`
}
// DataSource defines config for data source, i.e. what was read from a
// dataset file.
type DataSource struct {
	// TrainSamples holds the lines read from the dataset file; for csv files
	// the header row is excluded.
	TrainSamples []string
	// ValidSamples is not populated by the code visible in this file.
	// NOTE(review): presumably validation samples filled in elsewhere — confirm.
	ValidSamples []string
	// NumberOfSamples is the number of samples read, i.e. len(TrainSamples)
	// when the value is produced by readByLine.
	NumberOfSamples int
	// Header is the first row of a csv dataset file; it is left empty for
	// txt files.
	Header string
}
  63. // NewDatasetManager creates a dataset manager
  64. func NewDatasetManager(client gmclient.ClientI, options *options.LocalControllerOptions) *DatasetManager {
  65. dm := DatasetManager{
  66. Client: client,
  67. DatasetMap: make(map[string]*Dataset),
  68. VolumeMountPrefix: options.VolumeMountPrefix,
  69. }
  70. return &dm
  71. }
// Start starts dataset manager. It currently performs no work and always
// returns nil; monitoring is started per dataset by Insert.
func (dm *DatasetManager) Start() error {
	return nil
}
  76. // GetDatasetChannel gets dataset
  77. func (dm *DatasetManager) GetDataset(name string) (*Dataset, bool) {
  78. d, ok := dm.DatasetMap[name]
  79. return d, ok
  80. }
  81. // Insert inserts dataset to db
  82. func (dm *DatasetManager) Insert(message *gmclient.Message) error {
  83. name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
  84. first := false
  85. dataset, ok := dm.DatasetMap[name]
  86. if !ok {
  87. dataset = &Dataset{}
  88. dataset.Storage = storage.Storage{IsLocalStorage: false}
  89. dataset.Done = make(chan struct{})
  90. dm.DatasetMap[name] = dataset
  91. first = true
  92. }
  93. if err := json.Unmarshal(message.Content, dataset); err != nil {
  94. return err
  95. }
  96. credential := dataset.ObjectMeta.Annotations[CredentialAnnotationKey]
  97. if credential != "" {
  98. if err := dataset.Storage.SetCredential(credential); err != nil {
  99. return fmt.Errorf("failed to set dataset(name=%s)'s storage credential, error: %+v", name, err)
  100. }
  101. }
  102. isLocalURL, err := dataset.Storage.IsLocalURL(dataset.Spec.URL)
  103. if err != nil {
  104. return fmt.Errorf("dataset(name=%s)'s url is invalid, error: %+v", name, err)
  105. }
  106. if isLocalURL {
  107. dataset.Storage.IsLocalStorage = true
  108. }
  109. if first {
  110. go dm.monitorDataSources(name)
  111. }
  112. if err := db.SaveResource(name, dataset.TypeMeta, dataset.ObjectMeta, dataset.Spec); err != nil {
  113. return err
  114. }
  115. return nil
  116. }
  117. // Delete deletes dataset config in db
  118. func (dm *DatasetManager) Delete(message *gmclient.Message) error {
  119. name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
  120. if ds, ok := dm.DatasetMap[name]; ok && ds.Done != nil {
  121. close(ds.Done)
  122. }
  123. delete(dm.DatasetMap, name)
  124. if err := db.DeleteResource(name); err != nil {
  125. return err
  126. }
  127. return nil
  128. }
  129. // monitorDataSources monitors the data url of specified dataset
  130. func (dm *DatasetManager) monitorDataSources(name string) {
  131. ds, ok := dm.DatasetMap[name]
  132. if !ok || ds == nil {
  133. return
  134. }
  135. dataURL := ds.Spec.URL
  136. if ds.Storage.IsLocalStorage {
  137. dataURL = util.AddPrefixPath(dm.VolumeMountPrefix, dataURL)
  138. }
  139. ds.URLPrefix = strings.TrimRight(dataURL, filepath.Base(dataURL))
  140. samplesNumber := 0
  141. for {
  142. select {
  143. case <-ds.Done:
  144. return
  145. default:
  146. }
  147. dataSource, err := ds.getDataSource(dataURL, ds.Spec.Format)
  148. if err != nil {
  149. klog.Errorf("dataset(name=%s) get samples from %s failed, error: %+v", name, dataURL, err)
  150. } else {
  151. ds.DataSource = dataSource
  152. if samplesNumber != dataSource.NumberOfSamples {
  153. samplesNumber = dataSource.NumberOfSamples
  154. klog.Infof("dataset(name=%s) get samples from data source(url=%s) successfully. number of samples: %d",
  155. name, dataURL, dataSource.NumberOfSamples)
  156. }
  157. header := gmclient.MessageHeader{
  158. Namespace: ds.Namespace,
  159. ResourceKind: ds.Kind,
  160. ResourceName: ds.Name,
  161. Operation: gmclient.StatusOperation,
  162. }
  163. if err := dm.Client.WriteMessage(struct {
  164. NumberOfSamples int `json:"numberOfSamples"`
  165. }{
  166. dataSource.NumberOfSamples,
  167. }, header); err != nil {
  168. klog.Errorf("dataset(name=%s) publish samples info failed, error: %+v", name, err)
  169. }
  170. }
  171. <-time.After(MonitorDataSourceIntervalSeconds * time.Second)
  172. }
  173. }
  174. // getDataSource gets data source info
  175. func (ds *Dataset) getDataSource(dataURL string, format string) (*DataSource, error) {
  176. if path.Ext(dataURL) != ("." + format) {
  177. return nil, fmt.Errorf("dataset file url(%s)'s suffix is different from format(%s)", dataURL, format)
  178. }
  179. localURL, err := ds.Storage.Download(dataURL, "")
  180. if !ds.Storage.IsLocalStorage {
  181. defer os.RemoveAll(localURL)
  182. }
  183. if err != nil {
  184. return nil, err
  185. }
  186. return ds.readByLine(localURL, format)
  187. }
  188. // readByLine reads file by line
  189. func (ds *Dataset) readByLine(url string, format string) (*DataSource, error) {
  190. samples, err := getSamples(url)
  191. if err != nil {
  192. klog.Errorf("read file %s failed, error: %v", url, err)
  193. return nil, err
  194. }
  195. numberOfSamples := 0
  196. dataSource := DataSource{}
  197. switch format {
  198. case DatasetFormatTXT:
  199. numberOfSamples += len(samples)
  200. case DatasetFormatCSV:
  201. // the first row of csv file is header
  202. if len(samples) == 0 {
  203. return nil, fmt.Errorf("file %s is empty", url)
  204. }
  205. dataSource.Header = samples[0]
  206. samples = samples[1:]
  207. numberOfSamples += len(samples)
  208. default:
  209. return nil, fmt.Errorf("invaild file format")
  210. }
  211. dataSource.TrainSamples = samples
  212. dataSource.NumberOfSamples = numberOfSamples
  213. return &dataSource, nil
  214. }
// GetName returns the name of this manager, which is the dataset resource
// kind (DatasetResourceKind).
func (dm *DatasetManager) GetName() string {
	return DatasetResourceKind
}
// AddWorkerMessage accepts a worker message and does nothing with it; the
// method body is empty.
func (dm *DatasetManager) AddWorkerMessage(message WorkerMessage) {
	// dummy: the dataset manager ignores worker messages
}
  221. // getSamples gets samples in a file
  222. func getSamples(url string) ([]string, error) {
  223. var samples = []string{}
  224. if !util.IsExists(url) {
  225. return nil, fmt.Errorf("url(%s) does not exist", url)
  226. }
  227. if !util.IsFile(url) {
  228. return nil, fmt.Errorf("url(%s) is not a file, not vaild", url)
  229. }
  230. file, err := os.Open(url)
  231. if err != nil {
  232. klog.Errorf("read %s failed, error: %v", url, err)
  233. return samples, err
  234. }
  235. fileScanner := bufio.NewScanner(file)
  236. for fileScanner.Scan() {
  237. samples = append(samples, fileScanner.Text())
  238. }
  239. if err = file.Close(); err != nil {
  240. klog.Errorf("close file(url=%s) failed, error: %v", url, err)
  241. return samples, err
  242. }
  243. return samples, nil
  244. }