You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

lifelonglearningjob.go 29 kB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992
  1. /*
  2. Copyright 2021 The KubeEdge Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package lifelonglearning
  14. import (
  15. "bufio"
  16. "encoding/json"
  17. "fmt"
  18. "os"
  19. "path"
  20. "strconv"
  21. "strings"
  22. "sync"
  23. "time"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/klog/v2"
  26. "github.com/kubeedge/sedna/cmd/sedna-lc/app/options"
  27. sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
  28. gmtypes "github.com/kubeedge/sedna/pkg/globalmanager/controllers/lifelonglearning"
  29. "github.com/kubeedge/sedna/pkg/globalmanager/runtime"
  30. "github.com/kubeedge/sedna/pkg/localcontroller/db"
  31. clienttypes "github.com/kubeedge/sedna/pkg/localcontroller/gmclient"
  32. "github.com/kubeedge/sedna/pkg/localcontroller/managers/dataset"
  33. "github.com/kubeedge/sedna/pkg/localcontroller/storage"
  34. "github.com/kubeedge/sedna/pkg/localcontroller/trigger"
  35. "github.com/kubeedge/sedna/pkg/localcontroller/util"
  36. workertypes "github.com/kubeedge/sedna/pkg/localcontroller/worker"
  37. "github.com/microcosm-cc/bluemonday"
  38. )
const (
	// JobIterationIntervalSeconds is the period, in seconds, of the job loop
	// in startJob that dispatches the train/eval/deploy tasks.
	JobIterationIntervalSeconds = 10
	// DatasetHandlerIntervalSeconds is the period, in seconds, at which
	// handleData checks the dataset for new samples.
	DatasetHandlerIntervalSeconds = 10
	// EvalSamplesCapacity is the number of eval sample versions above which
	// forwardSamples drops the oldest version.
	EvalSamplesCapacity = 5
	// KindName is kind of lifelong-learning-job resource
	KindName = "lifelonglearningjob"
	// TriggerReadyStatus is the ready status about trigger
	TriggerReadyStatus = "ready"
	// TriggerCompletedStatus is the completed status about trigger
	TriggerCompletedStatus = "completed"
	// AnnotationsRoundsKey is the annotation key under which saveJobToDB
	// stores the job's round counter.
	AnnotationsRoundsKey = "sedna.io/rounds"
	// AnnotationsNumberOfSamplesKey is the annotation key under which
	// saveJobToDB stores the number of samples already consumed.
	AnnotationsNumberOfSamplesKey = "sedna.io/number-of-samples"
	// AnnotationsDataFileOfEvalKey is the annotation key under which
	// saveJobToDB stores the URL of the latest eval data file.
	AnnotationsDataFileOfEvalKey = "sedna.io/data-file-of-eval"
)
// Manager defines lifelong-learning-job Manager
type Manager struct {
	// Client is used to write status messages upstream.
	Client clienttypes.ClientI
	// WorkerMessageChannel is a buffered channel of worker messages; it is
	// filled by AddWorkerMessage and drained by monitorWorker.
	WorkerMessageChannel chan workertypes.MessageContent
	// DatasetManager is queried by loadDataset to resolve a job's dataset.
	DatasetManager *dataset.Manager
	// LifelongLearningJobMap holds the known jobs keyed by unique identifier.
	// NOTE(review): it is read and written from several goroutines without
	// a lock in this file — confirm that callers serialize access.
	LifelongLearningJobMap map[string]*Job
	// VolumeMountPrefix is added to / trimmed from local-storage paths.
	VolumeMountPrefix string
}
// Job wraps the lifelong-learning-job resource together with the runtime
// configuration the local controller keeps for it.
type Job struct {
	sednav1.LifelongLearningJob
	// JobConfig is nil until initJob has completed successfully.
	JobConfig *JobConfig
}
// JobConfig defines config for lifelong-learning-job
type JobConfig struct {
	// UniqueIdentifier is the key of the job in the manager's map and in the db.
	UniqueIdentifier string
	// Rounds counts triggered training rounds; it is incremented by
	// triggerTrainTask and persisted by saveJobToDB.
	Rounds int
	// TrainTrigger decides, from the number of train samples, whether a
	// training round should start.
	TrainTrigger trigger.Base
	// TriggerTime is set to the current time when a train message is built.
	TriggerTime time.Time
	// TrainTriggerStatus, EvalTriggerStatus and DeployTriggerStatus hold
	// TriggerReadyStatus or TriggerCompletedStatus for the respective stage.
	TrainTriggerStatus  string
	EvalTriggerStatus   string
	DeployTriggerStatus string
	// TrainDataURL and EvalDataURL are the data files last written by
	// writeSamples for the train and eval stages.
	TrainDataURL string
	EvalDataURL  string
	// OutputDir is the job output dir; for local storage it already carries
	// the volume mount prefix.
	OutputDir string
	// OutputConfig holds the sub-directories derived from OutputDir.
	OutputConfig *OutputConfig
	// DataSamples holds the samples collected by handleData.
	DataSamples *DataSamples
	// DeployModel describes the file the deploy task copies the model to.
	DeployModel *Model
	// Lock is taken by handleData and forwardSamples while they modify
	// the sample slices.
	Lock sync.Mutex
	// Dataset is set by loadDataset once the dataset manager knows it.
	Dataset *dataset.Dataset
	// Storage is the storage backend used for the job's output.
	Storage storage.Storage
	// Done is closed by Delete to stop the job's goroutines.
	Done chan struct{}
}
// Model is an alias of the gm client model type, so values can be passed
// to and from the client package without conversion.
type Model = clienttypes.Model
// OutputConfig defines config for job output
type OutputConfig struct {
	// SamplesOutput maps "train" and "eval" to the directories that receive
	// the sample files written by writeSamples.
	SamplesOutput map[string]string `json:"trainData"`
	// TrainOutput is the base directory of train results; a per-round
	// sub-directory is appended when the train task is triggered.
	TrainOutput string `json:"trainOutput"`
	// EvalOutput is the base directory of eval results; a per-round
	// sub-directory is appended when the eval task is triggered.
	EvalOutput string `json:"evalOutput"`
}
// DataSamples defines samples information
type DataSamples struct {
	// PreviousNumbers is the dataset sample count seen at the last update
	// in handleData; only samples beyond it are treated as new.
	PreviousNumbers int
	// TrainSamples accumulates train samples until forwardSamples resets it
	// after a train task has been triggered.
	TrainSamples []string
	// EvalVersionSamples holds one slice of eval samples per dataset update.
	EvalVersionSamples [][]string
	// EvalSamples is the flat list of eval samples written for the eval task.
	EvalSamples []string
}
  103. // New creates a lifelong-learning-job manager
  104. func New(client clienttypes.ClientI, datasetManager *dataset.Manager, options *options.LocalControllerOptions) *Manager {
  105. lm := Manager{
  106. Client: client,
  107. WorkerMessageChannel: make(chan workertypes.MessageContent, workertypes.MessageChannelCacheSize),
  108. DatasetManager: datasetManager,
  109. LifelongLearningJobMap: make(map[string]*Job),
  110. VolumeMountPrefix: options.VolumeMountPrefix,
  111. }
  112. return &lm
  113. }
  114. // Insert inserts lifelong-learning-job config to db
  115. func (lm *Manager) Insert(message *clienttypes.Message) error {
  116. p := bluemonday.NewPolicy()
  117. name := p.Sanitize(util.GetUniqueIdentifier(message.Header.Namespace,
  118. message.Header.ResourceName, message.Header.ResourceKind))
  119. first := false
  120. job, ok := lm.LifelongLearningJobMap[name]
  121. if !ok {
  122. job = &Job{}
  123. lm.LifelongLearningJobMap[name] = job
  124. first = true
  125. }
  126. if err := json.Unmarshal(message.Content, &job); err != nil {
  127. return err
  128. }
  129. if err := db.SaveResource(name, job.TypeMeta, job.ObjectMeta, job.Spec); err != nil {
  130. return err
  131. }
  132. if first {
  133. go lm.startJob(name)
  134. }
  135. return nil
  136. }
  137. // startJob starts a job
  138. func (lm *Manager) startJob(name string) {
  139. var err error
  140. job, ok := lm.LifelongLearningJobMap[name]
  141. if !ok {
  142. return
  143. }
  144. if err = lm.initJob(job, name); err != nil {
  145. klog.Errorf("failed to init job(%s): %+v", name)
  146. return
  147. }
  148. klog.Infof("lifelong learning job(%s) is started", name)
  149. defer klog.Infof("lifelong learning job(%s) is stopped", name)
  150. // handle data from dataset
  151. go lm.handleData(job)
  152. tick := time.NewTicker(JobIterationIntervalSeconds * time.Second)
  153. for {
  154. select {
  155. case <-job.JobConfig.Done:
  156. return
  157. case <-tick.C:
  158. cond := lm.getLatestCondition(job)
  159. jobStage := cond.Stage
  160. switch jobStage {
  161. case sednav1.LLJobTrain:
  162. err = lm.trainTask(job)
  163. case sednav1.LLJobEval:
  164. err = lm.evalTask(job)
  165. case sednav1.LLJobDeploy:
  166. err = lm.deployTask(job)
  167. default:
  168. klog.Errorf("invalid phase: %s", jobStage)
  169. continue
  170. }
  171. if err != nil {
  172. klog.Errorf("job(%s) failed to complete the %s task: %v", name, jobStage, err)
  173. }
  174. }
  175. }
  176. }
  177. // trainTask starts training task
  178. func (lm *Manager) trainTask(job *Job) error {
  179. jobConfig := job.JobConfig
  180. latestCond := lm.getLatestCondition(job)
  181. jobStage := latestCond.Stage
  182. currentType := latestCond.Type
  183. if currentType == sednav1.LLJobStageCondWaiting {
  184. err := lm.loadDataset(job)
  185. if err != nil || jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
  186. return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w",
  187. jobConfig.UniqueIdentifier, err)
  188. }
  189. if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
  190. return fmt.Errorf("job(%s)'s dataset not ready", jobConfig.UniqueIdentifier)
  191. }
  192. initTriggerStatus(jobConfig)
  193. if jobConfig.TrainTriggerStatus == TriggerReadyStatus {
  194. payload, ok, err := lm.triggerTrainTask(job)
  195. if !ok {
  196. return nil
  197. }
  198. if err != nil {
  199. klog.Errorf("job(%s) failed to complete the %sing phase triggering task: %v",
  200. jobConfig.UniqueIdentifier, jobStage, err)
  201. job.JobConfig.Rounds--
  202. return err
  203. }
  204. err = lm.Client.WriteMessage(payload, job.getHeader())
  205. if err != nil {
  206. klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err)
  207. job.JobConfig.Rounds--
  208. return err
  209. }
  210. forwardSamples(jobConfig, jobStage)
  211. err = lm.saveJobToDB(job)
  212. if err != nil {
  213. klog.Errorf("job(%s) failed to save job to db: %v",
  214. jobConfig.UniqueIdentifier, err)
  215. // continue anyway
  216. }
  217. jobConfig.TrainTriggerStatus = TriggerCompletedStatus
  218. klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
  219. jobConfig.UniqueIdentifier, jobStage)
  220. }
  221. }
  222. return nil
  223. }
  224. // evalTask starts eval task
  225. func (lm *Manager) evalTask(job *Job) error {
  226. jobConfig := job.JobConfig
  227. latestCond := lm.getLatestCondition(job)
  228. jobStage := latestCond.Stage
  229. currentType := latestCond.Type
  230. if currentType == sednav1.LLJobStageCondWaiting {
  231. err := lm.loadDataset(job)
  232. if err != nil || jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
  233. return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w",
  234. jobConfig.UniqueIdentifier, err)
  235. }
  236. if jobConfig.EvalTriggerStatus == TriggerReadyStatus {
  237. payload, err := lm.triggerEvalTask(job)
  238. if err != nil {
  239. klog.Errorf("job(%s) completed the %sing phase triggering task failed: %v",
  240. jobConfig.UniqueIdentifier, jobStage, err)
  241. return err
  242. }
  243. err = lm.Client.WriteMessage(payload, job.getHeader())
  244. if err != nil {
  245. klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err)
  246. return err
  247. }
  248. forwardSamples(jobConfig, jobStage)
  249. jobConfig.EvalTriggerStatus = TriggerCompletedStatus
  250. klog.Infof("job(%s) completed the %sing phase triggering task successfully",
  251. jobConfig.UniqueIdentifier, jobStage)
  252. }
  253. }
  254. return nil
  255. }
  256. // deployTask starts deploy task
  257. func (lm *Manager) deployTask(job *Job) error {
  258. if job.JobConfig.DeployTriggerStatus == TriggerReadyStatus {
  259. jobConfig := job.JobConfig
  260. var err error
  261. status := clienttypes.UpstreamMessage{Phase: string(sednav1.LLJobDeploy)}
  262. models := lm.getJobStageModel(job, sednav1.LLJobDeploy)
  263. if models != nil {
  264. err = lm.updateDeployModelFile(job, models[0].URL, jobConfig.DeployModel.URL)
  265. if err != nil {
  266. status.Status = string(sednav1.LLJobStageCondFailed)
  267. klog.Errorf("failed to update model for job(%s): %v", jobConfig.UniqueIdentifier, err)
  268. return err
  269. }
  270. status.Status = string(sednav1.LLJobStageCondReady)
  271. status.Input = &clienttypes.Input{Models: []Model{{Format: models[0].Format, URL: models[0].URL}}}
  272. } else {
  273. klog.Infof("job(%s) isn't need to deploy model", jobConfig.UniqueIdentifier)
  274. status.Status = string(sednav1.LLJobStageCondCompleted)
  275. }
  276. err = lm.Client.WriteMessage(status, job.getHeader())
  277. if err != nil {
  278. klog.Errorf("job(%s) completed the %s task failed: %v",
  279. jobConfig.UniqueIdentifier, sednav1.LLJobDeploy, err)
  280. return err
  281. }
  282. job.JobConfig.DeployTriggerStatus = TriggerCompletedStatus
  283. klog.Infof("job(%s) completed the %s task successfully", jobConfig.UniqueIdentifier, sednav1.LLJobDeploy)
  284. }
  285. return nil
  286. }
  287. // triggerTrainTask triggers the train task
  288. func (lm *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) {
  289. var err error
  290. jobConfig := job.JobConfig
  291. const numOfSamples = "num_of_samples"
  292. samples := map[string]interface{}{
  293. numOfSamples: len(jobConfig.DataSamples.TrainSamples),
  294. }
  295. isTrigger := jobConfig.TrainTrigger.Trigger(samples)
  296. if !isTrigger {
  297. return nil, false, nil
  298. }
  299. job.JobConfig.Rounds++
  300. rounds := jobConfig.Rounds
  301. var dataIndexURL string
  302. jobConfig.TrainDataURL, dataIndexURL, err = lm.writeSamples(job, jobConfig.DataSamples.TrainSamples,
  303. jobConfig.OutputConfig.SamplesOutput["train"], rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix)
  304. if err != nil {
  305. job.JobConfig.Rounds--
  306. klog.Errorf("job(%s) train phase: write samples to the file(%s) is failed: %v",
  307. jobConfig.UniqueIdentifier, jobConfig.TrainDataURL, err)
  308. return nil, false, err
  309. }
  310. dataURL := jobConfig.TrainDataURL
  311. outputDir := strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(rounds)}, "/")
  312. if jobConfig.Storage.IsLocalStorage {
  313. dataURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataURL)
  314. dataIndexURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataIndexURL)
  315. outputDir = util.TrimPrefixPath(lm.VolumeMountPrefix, outputDir)
  316. }
  317. input := clienttypes.Input{
  318. DataURL: dataURL,
  319. DataIndexURL: dataIndexURL,
  320. OutputDir: outputDir,
  321. }
  322. msg := clienttypes.UpstreamMessage{
  323. Phase: string(sednav1.LLJobTrain),
  324. Status: string(sednav1.LLJobStageCondReady),
  325. Input: &input,
  326. }
  327. jobConfig.TriggerTime = time.Now()
  328. return &msg, true, nil
  329. }
  330. // triggerEvalTask triggers the eval task
  331. func (lm *Manager) triggerEvalTask(job *Job) (*clienttypes.UpstreamMessage, error) {
  332. jobConfig := job.JobConfig
  333. var err error
  334. latestCondition := lm.getLatestCondition(job)
  335. ms := lm.getJobStageModel(job, latestCondition.Stage)
  336. if ms == nil {
  337. return nil, err
  338. }
  339. var dataIndexURL string
  340. jobConfig.EvalDataURL, dataIndexURL, err = lm.writeSamples(job, jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"],
  341. job.JobConfig.Rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix)
  342. if err != nil {
  343. klog.Errorf("job(%s) eval phase: write samples to the file(%s) is failed: %v",
  344. jobConfig.UniqueIdentifier, jobConfig.EvalDataURL, err)
  345. return nil, err
  346. }
  347. dataURL := jobConfig.EvalDataURL
  348. outputDir := strings.Join([]string{jobConfig.OutputConfig.EvalOutput, strconv.Itoa(jobConfig.Rounds)}, "/")
  349. if jobConfig.Storage.IsLocalStorage {
  350. dataURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataURL)
  351. dataIndexURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataIndexURL)
  352. outputDir = util.TrimPrefixPath(lm.VolumeMountPrefix, outputDir)
  353. }
  354. input := clienttypes.Input{
  355. Models: ms,
  356. DataURL: dataURL,
  357. DataIndexURL: dataIndexURL,
  358. OutputDir: outputDir,
  359. }
  360. msg := &clienttypes.UpstreamMessage{
  361. Phase: string(sednav1.LLJobEval),
  362. Status: string(sednav1.LLJobStageCondReady),
  363. Input: &input,
  364. }
  365. return msg, nil
  366. }
  367. // updateDeployModelFile updates deploy model file
  368. func (lm *Manager) updateDeployModelFile(job *Job, trainedModel string, deployModel string) error {
  369. if job.JobConfig.Storage.IsLocalStorage {
  370. trainedModel = util.AddPrefixPath(lm.VolumeMountPrefix, trainedModel)
  371. }
  372. if err := job.JobConfig.Storage.CopyFile(trainedModel, deployModel); err != nil {
  373. return fmt.Errorf("failed to copy trained model(url=%s) to the deploy model(url=%s): %w",
  374. trainedModel, deployModel, err)
  375. }
  376. klog.V(4).Infof("copy trained model(url=%s) to the deploy model(url=%s) successfully", trainedModel, deployModel)
  377. return nil
  378. }
  379. // createOutputDir creates the job output dir
  380. func (lm *Manager) createOutputDir(jobConfig *JobConfig) error {
  381. outputDir := jobConfig.OutputDir
  382. dirNames := []string{"data/train", "data/eval", "train", "eval"}
  383. if jobConfig.Storage.IsLocalStorage {
  384. if err := util.CreateFolder(outputDir); err != nil {
  385. return fmt.Errorf("failed to create folder %s: %v", outputDir, err)
  386. }
  387. for _, v := range dirNames {
  388. dir := path.Join(outputDir, v)
  389. if err := util.CreateFolder(dir); err != nil {
  390. return fmt.Errorf("failed to create folder %s: %v", dir, err)
  391. }
  392. }
  393. }
  394. outputConfig := OutputConfig{
  395. SamplesOutput: map[string]string{
  396. "train": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[0]}, "/"),
  397. "eval": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[1]}, "/"),
  398. },
  399. TrainOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[2]}, "/"),
  400. EvalOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[3]}, "/"),
  401. }
  402. jobConfig.OutputConfig = &outputConfig
  403. return nil
  404. }
  405. func (lm *Manager) getLatestCondition(job *Job) sednav1.LLJobCondition {
  406. jobConditions := job.Status.Conditions
  407. var latestCondition sednav1.LLJobCondition = sednav1.LLJobCondition{}
  408. if len(jobConditions) > 0 {
  409. // get latest pod and pod status
  410. latestCondition = jobConditions[len(jobConditions)-1]
  411. }
  412. return latestCondition
  413. }
  414. // createFile creates data file and data index file
  415. func createFile(dir string, format string, isLocalStorage bool) (string, string) {
  416. switch strings.ToLower(format) {
  417. case dataset.TXTFormat:
  418. if isLocalStorage {
  419. return path.Join(dir, "data.txt"), ""
  420. }
  421. return strings.Join([]string{dir, "data.txt"}, "/"), strings.Join([]string{dir, "dataIndex.txt"}, "/")
  422. case dataset.CSVFormat:
  423. return strings.Join([]string{dir, "data.csv"}, "/"), ""
  424. }
  425. return "", ""
  426. }
  427. // writeSamples writes samples information to a file
  428. func (lm *Manager) writeSamples(job *Job, samples []string, dir string, rounds int, format string, urlPrefix string) (string, string, error) {
  429. if samples == nil {
  430. return "", "", fmt.Errorf("not samples")
  431. }
  432. jobConfig := job.JobConfig
  433. subDir := strings.Join([]string{dir, strconv.Itoa(rounds)}, "/")
  434. fileURL, absURLFile := createFile(subDir, format, jobConfig.Dataset.Storage.IsLocalStorage)
  435. if jobConfig.Storage.IsLocalStorage {
  436. if err := util.CreateFolder(subDir); err != nil {
  437. return "", "", err
  438. }
  439. if err := job.writeByLine(samples, fileURL, format); err != nil {
  440. return "", "", err
  441. }
  442. if !jobConfig.Dataset.Storage.IsLocalStorage && absURLFile != "" {
  443. tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)
  444. if err := job.writeByLine(tempSamples, absURLFile, format); err != nil {
  445. return "", "", err
  446. }
  447. }
  448. return fileURL, absURLFile, nil
  449. }
  450. temporaryDir, err := util.CreateTemporaryDir()
  451. if err != nil {
  452. return "", "", err
  453. }
  454. localFileURL, localAbsURLFile := createFile(temporaryDir, format, jobConfig.Dataset.Storage.IsLocalStorage)
  455. if err := job.writeByLine(samples, localFileURL, format); err != nil {
  456. return "", "", err
  457. }
  458. if err := jobConfig.Storage.Upload(localFileURL, fileURL); err != nil {
  459. return "", "", err
  460. }
  461. if absURLFile != "" {
  462. tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)
  463. if err := job.writeByLine(tempSamples, localAbsURLFile, format); err != nil {
  464. return "", "", err
  465. }
  466. if err := jobConfig.Storage.Upload(localAbsURLFile, absURLFile); err != nil {
  467. return "", "", err
  468. }
  469. defer os.RemoveAll(localFileURL)
  470. }
  471. defer os.RemoveAll(localAbsURLFile)
  472. return fileURL, absURLFile, nil
  473. }
  474. // writeByLine writes file by line
  475. func (job *Job) writeByLine(samples []string, fileURL string, format string) error {
  476. file, err := os.Create(fileURL)
  477. if err != nil {
  478. klog.Errorf("create file(%s) failed", fileURL)
  479. return err
  480. }
  481. w := bufio.NewWriter(file)
  482. if format == "csv" {
  483. _, _ = fmt.Fprintln(w, job.JobConfig.Dataset.DataSource.Header)
  484. }
  485. for _, line := range samples {
  486. _, _ = fmt.Fprintln(w, line)
  487. }
  488. if err := w.Flush(); err != nil {
  489. klog.Errorf("write file(%s) failed", fileURL)
  490. return err
  491. }
  492. if err := file.Close(); err != nil {
  493. klog.Errorf("close file failed, error: %v", err)
  494. return err
  495. }
  496. return nil
  497. }
  498. // handleData updates samples information
  499. func (lm *Manager) handleData(job *Job) {
  500. tick := time.NewTicker(DatasetHandlerIntervalSeconds * time.Second)
  501. jobConfig := job.JobConfig
  502. iterCount := 0
  503. for {
  504. select {
  505. case <-jobConfig.Done:
  506. return
  507. default:
  508. }
  509. if iterCount%100 == 0 {
  510. klog.V(4).Infof("job(%s) is handling dataset", jobConfig.UniqueIdentifier)
  511. }
  512. iterCount++
  513. if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
  514. // already loaded dataset
  515. <-tick.C
  516. continue
  517. }
  518. dataset := jobConfig.Dataset
  519. currentNumberOfSamples := dataset.DataSource.NumberOfSamples
  520. previousNumberOfSamples := jobConfig.DataSamples.PreviousNumbers
  521. if dataset.DataSource != nil && currentNumberOfSamples > previousNumberOfSamples {
  522. samples := dataset.DataSource.TrainSamples
  523. newNumberOfSamples := currentNumberOfSamples - previousNumberOfSamples
  524. trainNum := int(job.Spec.Dataset.TrainProb * float64(newNumberOfSamples))
  525. jobConfig.Lock.Lock()
  526. jobConfig.DataSamples.TrainSamples = append(jobConfig.DataSamples.TrainSamples,
  527. samples[previousNumberOfSamples:previousNumberOfSamples+trainNum]...)
  528. klog.Infof("job(%s)'s current train samples nums is %d", jobConfig.UniqueIdentifier, trainNum)
  529. jobConfig.DataSamples.EvalVersionSamples = append(jobConfig.DataSamples.EvalVersionSamples,
  530. samples[previousNumberOfSamples+trainNum:])
  531. jobConfig.Lock.Unlock()
  532. for _, v := range jobConfig.DataSamples.EvalVersionSamples {
  533. jobConfig.DataSamples.EvalSamples = append(jobConfig.DataSamples.EvalSamples, v...)
  534. }
  535. klog.Infof("job(%s)'s current eval samples nums is %d", jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.EvalSamples))
  536. jobConfig.DataSamples.PreviousNumbers = currentNumberOfSamples
  537. }
  538. <-tick.C
  539. }
  540. }
  541. // loadDataset loads dataset information
  542. func (lm *Manager) loadDataset(job *Job) error {
  543. if job.JobConfig.Dataset != nil {
  544. // already loaded
  545. return nil
  546. }
  547. datasetName := util.GetUniqueIdentifier(job.Namespace, job.Spec.Dataset.Name, dataset.KindName)
  548. dataset, ok := lm.DatasetManager.GetDataset(datasetName)
  549. if !ok || dataset == nil {
  550. return fmt.Errorf("not exists dataset(name=%s)", datasetName)
  551. }
  552. job.JobConfig.Dataset = dataset
  553. return nil
  554. }
  555. // initJob inits the job object
  556. func (lm *Manager) initJob(job *Job, name string) error {
  557. var err error
  558. jobConfig := new(JobConfig)
  559. jobConfig.UniqueIdentifier = name
  560. jobConfig.Storage = storage.Storage{IsLocalStorage: false}
  561. credential := job.ObjectMeta.Annotations[runtime.SecretAnnotationKey]
  562. if credential != "" {
  563. if err := jobConfig.Storage.SetCredential(credential); err != nil {
  564. return fmt.Errorf("failed to set storage credential: %w", err)
  565. }
  566. }
  567. jobConfig.Done = make(chan struct{})
  568. jobConfig.Lock = sync.Mutex{}
  569. jobConfig.Rounds = 0
  570. jobConfig.DataSamples = &DataSamples{
  571. PreviousNumbers: 0,
  572. TrainSamples: make([]string, 0),
  573. EvalVersionSamples: make([][]string, 0),
  574. EvalSamples: make([]string, 0),
  575. }
  576. trainTrigger, err := newTrigger(job.Spec.TrainSpec.Trigger)
  577. if err != nil {
  578. return fmt.Errorf("failed to init train trigger: %+w", err)
  579. }
  580. jobConfig.TrainTrigger = trainTrigger
  581. outputDir := job.Spec.OutputDir
  582. isLocalURL, err := jobConfig.Storage.IsLocalURL(outputDir)
  583. if err != nil {
  584. return fmt.Errorf("job(%s)'s output dir(%s) is invalid: %+w", name, outputDir, err)
  585. }
  586. if isLocalURL {
  587. jobConfig.Storage.IsLocalStorage = true
  588. outputDir = util.AddPrefixPath(lm.VolumeMountPrefix, outputDir)
  589. }
  590. jobConfig.OutputDir = outputDir
  591. if err = lm.createOutputDir(jobConfig); err != nil {
  592. return err
  593. }
  594. jobConfig.DeployModel = &Model{
  595. Format: "pkl",
  596. URL: strings.Join([]string{strings.TrimRight(outputDir, "/"), "deploy/index.pkl"}, "/"),
  597. }
  598. initTriggerStatus(jobConfig)
  599. job.JobConfig = jobConfig
  600. if err = lm.updateJobFromDB(job); err != nil {
  601. return fmt.Errorf("failed to update job from db, error: %v", err)
  602. }
  603. return nil
  604. }
  605. func initTriggerStatus(jobConfig *JobConfig) {
  606. jobConfig.TrainTriggerStatus = TriggerReadyStatus
  607. jobConfig.EvalTriggerStatus = TriggerReadyStatus
  608. jobConfig.DeployTriggerStatus = TriggerReadyStatus
  609. }
  610. func newTrigger(t sednav1.LLTrigger) (trigger.Base, error) {
  611. // convert trigger to map
  612. triggerMap := make(map[string]interface{})
  613. c, err := json.Marshal(t)
  614. if err != nil {
  615. return nil, err
  616. }
  617. err = json.Unmarshal(c, &triggerMap)
  618. if err != nil {
  619. return nil, err
  620. }
  621. return trigger.NewTrigger(triggerMap)
  622. }
  623. // getModelsFromJobConditions gets models from job condition
  624. func (lm *Manager) getModelsFromJobConditions(jobConditions []sednav1.LLJobCondition, stage sednav1.LLJobStage, currentType sednav1.LLJobStageConditionType, dataType string) []Model {
  625. // TODO: runtime.type changes to common.type for gm and lc
  626. for i := len(jobConditions) - 1; i >= 0; i-- {
  627. var cond gmtypes.ConditionData
  628. jobCond := jobConditions[i]
  629. if jobCond.Stage == stage && jobCond.Type == currentType {
  630. if err := (&cond).Unmarshal([]byte(jobCond.Data)); err != nil {
  631. continue
  632. }
  633. if dataType == "input" {
  634. if cond.Input == nil {
  635. continue
  636. }
  637. return cond.Input.Models
  638. } else if dataType == "output" {
  639. if cond.Output == nil {
  640. continue
  641. }
  642. return cond.Output.Models
  643. }
  644. }
  645. }
  646. return nil
  647. }
  648. // getEvalResult gets eval result from job conditions
  649. func (lm *Manager) getEvalResult(job *Job) ([]map[string][]float64, error) {
  650. jobConditions := job.Status.Conditions
  651. models := lm.getModelsFromJobConditions(jobConditions, sednav1.LLJobEval, sednav1.LLJobStageCondCompleted, "output")
  652. var result []map[string][]float64
  653. var err error
  654. for _, m := range models {
  655. bytes, err := json.Marshal(m.Metrics)
  656. if err != nil {
  657. return nil, err
  658. }
  659. data := make(map[string][]float64)
  660. if err = json.Unmarshal(bytes, &data); err != nil {
  661. return nil, err
  662. }
  663. result = append(result, data)
  664. }
  665. return result, err
  666. }
  667. // getJobStageModel gets model from job conditions for eval/deploy
  668. func (lm *Manager) getJobStageModel(job *Job, jobStage sednav1.LLJobStage) (models []Model) {
  669. jobConditions := job.Status.Conditions
  670. switch jobStage {
  671. case sednav1.LLJobEval:
  672. models = lm.getModelsFromJobConditions(jobConditions, sednav1.LLJobTrain, sednav1.LLJobStageCondCompleted, "output")
  673. case sednav1.LLJobDeploy:
  674. models = lm.getModelsFromJobConditions(jobConditions, sednav1.LLJobEval, sednav1.LLJobStageCondCompleted, "output")
  675. }
  676. return models
  677. }
  678. // forwardSamples deletes the samples information in the memory
  679. func forwardSamples(jobConfig *JobConfig, jobStage sednav1.LLJobStage) {
  680. switch jobStage {
  681. case sednav1.LLJobTrain:
  682. jobConfig.Lock.Lock()
  683. jobConfig.DataSamples.TrainSamples = jobConfig.DataSamples.TrainSamples[:0]
  684. jobConfig.Lock.Unlock()
  685. case sednav1.LLJobEval:
  686. if len(jobConfig.DataSamples.EvalVersionSamples) > EvalSamplesCapacity {
  687. jobConfig.DataSamples.EvalVersionSamples = jobConfig.DataSamples.EvalVersionSamples[1:]
  688. }
  689. }
  690. }
  691. // Delete deletes lifelong-learning-job config in db
  692. func (lm *Manager) Delete(message *clienttypes.Message) error {
  693. name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
  694. if job, ok := lm.LifelongLearningJobMap[name]; ok && job.JobConfig.Done != nil {
  695. close(job.JobConfig.Done)
  696. }
  697. delete(lm.LifelongLearningJobMap, name)
  698. if err := db.DeleteResource(name); err != nil {
  699. return err
  700. }
  701. return nil
  702. }
  703. // updateJobFromDB updates job from db
  704. func (lm *Manager) updateJobFromDB(job *Job) error {
  705. var err error
  706. previousJob, err := db.GetResource(job.JobConfig.UniqueIdentifier)
  707. if err != nil {
  708. return err
  709. }
  710. m := metav1.ObjectMeta{}
  711. if err != json.Unmarshal([]byte(previousJob.ObjectMeta), &m) {
  712. return err
  713. }
  714. rounds, ok := m.Annotations[AnnotationsRoundsKey]
  715. if !ok {
  716. return nil
  717. }
  718. if job.JobConfig.Rounds, err = strconv.Atoi(rounds); err != nil {
  719. return err
  720. }
  721. numberOfSamples, ok := m.Annotations[AnnotationsNumberOfSamplesKey]
  722. if !ok {
  723. return nil
  724. }
  725. if job.JobConfig.DataSamples.PreviousNumbers, err = strconv.Atoi(numberOfSamples); err != nil {
  726. return err
  727. }
  728. dataFileOfEval, ok := m.Annotations[AnnotationsDataFileOfEvalKey]
  729. if !ok {
  730. return nil
  731. }
  732. localURL, err := job.JobConfig.Storage.Download(dataFileOfEval, "")
  733. if !job.JobConfig.Storage.IsLocalStorage {
  734. defer os.RemoveAll(localURL)
  735. }
  736. if err != nil {
  737. return err
  738. }
  739. samples, err := dataset.GetSamples(dataFileOfEval)
  740. if err != nil {
  741. klog.Errorf("read file %s failed: %v", dataFileOfEval, err)
  742. return err
  743. }
  744. job.JobConfig.DataSamples.EvalVersionSamples = append(job.JobConfig.DataSamples.EvalVersionSamples, samples)
  745. return nil
  746. }
  747. // saveJobToDB saves job info to db
  748. func (lm *Manager) saveJobToDB(job *Job) error {
  749. ann := job.ObjectMeta.Annotations
  750. if ann == nil {
  751. ann = make(map[string]string)
  752. }
  753. ann[AnnotationsRoundsKey] = strconv.Itoa(job.JobConfig.Rounds)
  754. ann[AnnotationsNumberOfSamplesKey] = strconv.Itoa(job.JobConfig.DataSamples.PreviousNumbers)
  755. ann[AnnotationsDataFileOfEvalKey] = job.JobConfig.EvalDataURL
  756. return db.SaveResource(job.JobConfig.UniqueIdentifier, job.TypeMeta, job.ObjectMeta, job.Spec)
  757. }
// Start starts lifelong-learning-job manager: it launches monitorWorker in
// its own goroutine and returns immediately. The returned error is always
// nil.
func (lm *Manager) Start() error {
	go lm.monitorWorker()
	return nil
}
  763. // monitorWorker monitors message from worker
  764. func (lm *Manager) monitorWorker() {
  765. for {
  766. workerMessageChannel := lm.WorkerMessageChannel
  767. workerMessage, ok := <-workerMessageChannel
  768. if !ok {
  769. break
  770. }
  771. klog.V(4).Infof("handling worker message %+v", workerMessage)
  772. name := util.GetUniqueIdentifier(workerMessage.Namespace, workerMessage.OwnerName, workerMessage.OwnerKind)
  773. job, ok := lm.LifelongLearningJobMap[name]
  774. if !ok {
  775. continue
  776. }
  777. // TODO: filter some worker messages out
  778. wo := clienttypes.Output{}
  779. wo.Models = workerMessage.Results
  780. wo.OwnerInfo = workerMessage.OwnerInfo
  781. msg := &clienttypes.UpstreamMessage{
  782. Phase: workerMessage.Kind,
  783. Status: workerMessage.Status,
  784. Output: &wo,
  785. }
  786. if err := lm.Client.WriteMessage(msg, job.getHeader()); err != nil {
  787. klog.Errorf("job(%s) failed to write message: %v", name, err)
  788. continue
  789. }
  790. }
  791. }
// AddWorkerMessage adds worker messages to the manager's worker message
// channel, from which monitorWorker picks them up. The send blocks while
// the channel's buffer is full.
func (lm *Manager) AddWorkerMessage(message workertypes.MessageContent) {
	lm.WorkerMessageChannel <- message
}
// GetName returns name of the manager, which is the resource kind
// KindName.
func (lm *Manager) GetName() string {
	return KindName
}
  800. func (job *Job) getHeader() clienttypes.MessageHeader {
  801. return clienttypes.MessageHeader{
  802. Namespace: job.Namespace,
  803. ResourceKind: job.Kind,
  804. ResourceName: job.Name,
  805. Operation: clienttypes.StatusOperation,
  806. }
  807. }