You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

dataset.go 8.0 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. /*
  2. Copyright 2021 The KubeEdge Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package dataset
  14. import (
  15. "bufio"
  16. "encoding/json"
  17. "fmt"
  18. "os"
  19. "path/filepath"
  20. "strings"
  21. "time"
  22. "k8s.io/klog/v2"
  23. "github.com/kubeedge/sedna/cmd/sedna-lc/app/options"
  24. sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
  25. "github.com/kubeedge/sedna/pkg/globalmanager/runtime"
  26. "github.com/kubeedge/sedna/pkg/localcontroller/db"
  27. clienttypes "github.com/kubeedge/sedna/pkg/localcontroller/gmclient"
  28. "github.com/kubeedge/sedna/pkg/localcontroller/storage"
  29. "github.com/kubeedge/sedna/pkg/localcontroller/util"
  30. workertypes "github.com/kubeedge/sedna/pkg/localcontroller/worker"
  31. )
  // Monitoring interval and resource kind of the dataset manager.
  const (
  	// MonitorDataSourceIntervalSeconds is the pause, in seconds, between two
  	// consecutive reads of a dataset's data source.
  	MonitorDataSourceIntervalSeconds = 60
  	// KindName is the kind name of the dataset resource.
  	KindName = "dataset"
  )

  // Supported data source file formats.
  const (
  	// CSVFormat is the comma-separated values format whose first row is a header.
  	// It can be used in structured data scenarios.
  	CSVFormat = "csv"
  	// TXTFormat is the line-separated format: every line is one sample.
  	// It can be used in unstructured data scenarios.
  	TXTFormat = "txt"
  )
  44. // DatasetManager defines dataset manager
  45. type Manager struct {
  46. Client clienttypes.ClientI
  47. DatasetMap map[string]*Dataset
  48. VolumeMountPrefix string
  49. }
  50. // Dataset defines config for dataset
  51. type Dataset struct {
  52. *sednav1.Dataset
  53. DataSource *DataSource `json:"dataSource"`
  54. Done chan struct{}
  55. URLPrefix string
  56. Storage storage.Storage
  57. }
  // DataSource describes the samples read from a dataset's data file.
  type DataSource struct {
  	// TrainSamples holds one entry per sample line (the csv header is excluded).
  	TrainSamples []string
  	// NumberOfSamples is the number of entries in TrainSamples.
  	NumberOfSamples int
  	// Header is the first row of a csv file; it is empty for txt files.
  	Header string
  }
  64. // New creates a dataset manager
  65. func New(client clienttypes.ClientI, options *options.LocalControllerOptions) *Manager {
  66. dm := Manager{
  67. Client: client,
  68. DatasetMap: make(map[string]*Dataset),
  69. VolumeMountPrefix: options.VolumeMountPrefix,
  70. }
  71. return &dm
  72. }
  73. // Start starts dataset manager
  74. func (dm *Manager) Start() error {
  75. return nil
  76. }
  77. // GetDatasetChannel gets dataset
  78. func (dm *Manager) GetDataset(name string) (*Dataset, bool) {
  79. d, ok := dm.DatasetMap[name]
  80. return d, ok
  81. }
  82. // Insert inserts dataset to db
  83. func (dm *Manager) Insert(message *clienttypes.Message) error {
  84. name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
  85. first := false
  86. dataset, ok := dm.DatasetMap[name]
  87. if !ok {
  88. dataset = &Dataset{}
  89. dataset.Storage = storage.Storage{IsLocalStorage: false}
  90. dataset.Done = make(chan struct{})
  91. dm.DatasetMap[name] = dataset
  92. first = true
  93. }
  94. if err := json.Unmarshal(message.Content, dataset); err != nil {
  95. return err
  96. }
  97. credential := dataset.ObjectMeta.Annotations[runtime.SecretAnnotationKey]
  98. if credential != "" {
  99. if err := dataset.Storage.SetCredential(credential); err != nil {
  100. return fmt.Errorf("failed to set dataset(name=%s)'s storage credential, error: %+v", name, err)
  101. }
  102. }
  103. isLocalURL, err := dataset.Storage.IsLocalURL(dataset.Spec.URL)
  104. if err != nil {
  105. return fmt.Errorf("dataset(name=%s)'s url is invalid, error: %+v", name, err)
  106. }
  107. if isLocalURL {
  108. dataset.Storage.IsLocalStorage = true
  109. }
  110. if first {
  111. go dm.monitorDataSources(name)
  112. }
  113. if err := db.SaveResource(name, dataset.TypeMeta, dataset.ObjectMeta, dataset.Spec); err != nil {
  114. return err
  115. }
  116. return nil
  117. }
  118. // Delete deletes dataset config in db
  119. func (dm *Manager) Delete(message *clienttypes.Message) error {
  120. name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
  121. if ds, ok := dm.DatasetMap[name]; ok && ds.Done != nil {
  122. close(ds.Done)
  123. }
  124. delete(dm.DatasetMap, name)
  125. if err := db.DeleteResource(name); err != nil {
  126. return err
  127. }
  128. return nil
  129. }
  130. // monitorDataSources monitors the data url of specified dataset
  131. func (dm *Manager) monitorDataSources(name string) {
  132. ds, ok := dm.DatasetMap[name]
  133. if !ok || ds == nil {
  134. return
  135. }
  136. dataURL := ds.Spec.URL
  137. if ds.Storage.IsLocalStorage {
  138. dataURL = util.AddPrefixPath(dm.VolumeMountPrefix, dataURL)
  139. }
  140. ds.URLPrefix = strings.TrimRight(dataURL, filepath.Base(dataURL))
  141. samplesNumber := 0
  142. for {
  143. select {
  144. case <-ds.Done:
  145. return
  146. default:
  147. }
  148. dataSource, err := ds.getDataSource(dataURL, ds.Spec.Format)
  149. if err != nil {
  150. klog.Errorf("dataset(name=%s) get samples from %s failed, error: %+v", name, dataURL, err)
  151. } else {
  152. ds.DataSource = dataSource
  153. if samplesNumber != dataSource.NumberOfSamples {
  154. samplesNumber = dataSource.NumberOfSamples
  155. klog.Infof("dataset(name=%s) get samples from data source(url=%s) successfully. number of samples: %d",
  156. name, dataURL, dataSource.NumberOfSamples)
  157. header := clienttypes.MessageHeader{
  158. Namespace: ds.Namespace,
  159. ResourceKind: ds.Kind,
  160. ResourceName: ds.Name,
  161. Operation: clienttypes.StatusOperation,
  162. }
  163. if err := dm.Client.WriteMessage(struct {
  164. NumberOfSamples int `json:"numberOfSamples"`
  165. }{
  166. dataSource.NumberOfSamples,
  167. }, header); err != nil {
  168. klog.Errorf("dataset(name=%s) publish samples info failed, error: %+v", name, err)
  169. }
  170. }
  171. }
  172. <-time.After(MonitorDataSourceIntervalSeconds * time.Second)
  173. }
  174. }
  175. // getDataSource gets data source info
  176. func (ds *Dataset) getDataSource(dataURL string, format string) (*DataSource, error) {
  177. if err := ds.validFormat(format); err != nil {
  178. return nil, err
  179. }
  180. localURL, err := ds.Storage.Download(dataURL, "")
  181. if !ds.Storage.IsLocalStorage {
  182. defer os.RemoveAll(localURL)
  183. }
  184. if err != nil {
  185. return nil, err
  186. }
  187. return ds.readByLine(localURL, format)
  188. }
  189. // readByLine reads file by line
  190. func (ds *Dataset) readByLine(url string, format string) (*DataSource, error) {
  191. samples, err := GetSamples(url)
  192. if err != nil {
  193. klog.Errorf("read file %s failed, error: %v", url, err)
  194. return nil, err
  195. }
  196. numberOfSamples := 0
  197. dataSource := DataSource{}
  198. switch strings.ToLower(format) {
  199. case TXTFormat:
  200. numberOfSamples += len(samples)
  201. case CSVFormat:
  202. // the first row of csv file is header
  203. if len(samples) == 0 {
  204. return nil, fmt.Errorf("file %s is empty", url)
  205. }
  206. dataSource.Header = samples[0]
  207. samples = samples[1:]
  208. numberOfSamples += len(samples)
  209. default:
  210. return nil, fmt.Errorf("invaild file format")
  211. }
  212. dataSource.TrainSamples = samples
  213. dataSource.NumberOfSamples = numberOfSamples
  214. return &dataSource, nil
  215. }
  216. func (dm *Manager) GetName() string {
  217. return KindName
  218. }
  219. func (dm *Manager) AddWorkerMessage(message workertypes.MessageContent) {
  220. // dummy
  221. }
  222. // GetSamples gets samples in a file
  223. func GetSamples(url string) ([]string, error) {
  224. var samples = []string{}
  225. if !util.IsExists(url) {
  226. return nil, fmt.Errorf("url(%s) does not exist", url)
  227. }
  228. if !util.IsFile(url) {
  229. return nil, fmt.Errorf("url(%s) is not a file, not vaild", url)
  230. }
  231. file, err := os.Open(url)
  232. if err != nil {
  233. klog.Errorf("read %s failed, error: %v", url, err)
  234. return samples, err
  235. }
  236. fileScanner := bufio.NewScanner(file)
  237. for fileScanner.Scan() {
  238. samples = append(samples, fileScanner.Text())
  239. }
  240. if err = file.Close(); err != nil {
  241. klog.Errorf("close file(url=%s) failed, error: %v", url, err)
  242. return samples, err
  243. }
  244. return samples, nil
  245. }
  246. // validFormat checks data format is valid
  247. func (ds *Dataset) validFormat(format string) error {
  248. for _, v := range []string{TXTFormat, CSVFormat} {
  249. if strings.ToLower(format) == v {
  250. return nil
  251. }
  252. }
  253. return fmt.Errorf("dataset format(%s) is invalid", format)
  254. }