You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

incrementallearningjob.go 36 kB


  1. /*
  2. Copyright 2021 The KubeEdge Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package incrementallearning
  14. import (
  15. "bufio"
  16. "encoding/json"
  17. "fmt"
  18. "io/ioutil"
  19. "os"
  20. "path"
  21. "path/filepath"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "time"
  26. "k8s.io/klog/v2"
  27. "github.com/kubeedge/sedna/cmd/sedna-lc/app/options"
  28. sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
  29. gmtypes "github.com/kubeedge/sedna/pkg/globalmanager/controllers/incrementallearning"
  30. "github.com/kubeedge/sedna/pkg/globalmanager/runtime"
  31. "github.com/kubeedge/sedna/pkg/localcontroller/db"
  32. clienttypes "github.com/kubeedge/sedna/pkg/localcontroller/gmclient"
  33. "github.com/kubeedge/sedna/pkg/localcontroller/managers/dataset"
  34. "github.com/kubeedge/sedna/pkg/localcontroller/managers/model"
  35. "github.com/kubeedge/sedna/pkg/localcontroller/storage"
  36. "github.com/kubeedge/sedna/pkg/localcontroller/trigger"
  37. "github.com/kubeedge/sedna/pkg/localcontroller/util"
  38. workertypes "github.com/kubeedge/sedna/pkg/localcontroller/worker"
  39. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  40. )
  41. // IncrementalLearningJob defines config for incremental-learning-job
  42. type Job struct {
  43. sednav1.IncrementalLearningJob
  44. JobConfig *JobConfig
  45. }
  46. // JobConfig defines config for incremental-learning-job
  47. type JobConfig struct {
  48. UniqueIdentifier string
  49. Rounds int
  50. TrainTrigger trigger.Base
  51. DeployTrigger trigger.Base
  52. TriggerTime time.Time
  53. TrainTriggerStatus string
  54. EvalTriggerStatus string
  55. DeployTriggerStatus string
  56. HotModelUpdateDeployTriggerStatus string
  57. TrainDataURL string
  58. EvalDataURL string
  59. OutputDir string
  60. OutputConfig *OutputConfig
  61. DataSamples *DataSamples
  62. TrainModel *Model
  63. DeployModel *Model
  64. EvalModels []Model
  65. Lock sync.Mutex
  66. Dataset *dataset.Dataset
  67. Storage storage.Storage
  68. Done chan struct{}
  69. }
  70. type Model = clienttypes.Model
  71. // OutputConfig defines config for job output
  72. type OutputConfig struct {
  73. SamplesOutput map[string]string `json:"trainData"`
  74. TrainOutput string `json:"trainOutput"`
  75. EvalOutput string `json:"evalOutput"`
  76. }
  77. // DataSamples defines samples information
  78. type DataSamples struct {
  79. PreviousNumbers int
  80. TrainSamples []string
  81. EvalVersionSamples [][]string
  82. EvalSamples []string
  83. }
  84. // IncrementalLearningJob defines incremental-learning-job manager
  85. type Manager struct {
  86. Client clienttypes.ClientI
  87. WorkerMessageChannel chan workertypes.MessageContent
  88. DatasetManager *dataset.Manager
  89. ModelManager *model.Manager
  90. IncrementalJobMap map[string]*Job
  91. VolumeMountPrefix string
  92. }
  93. const (
  94. // JobIterationIntervalSeconds is interval time of each iteration of job
  95. JobIterationIntervalSeconds = 10
  96. // DatasetHandlerIntervalSeconds is interval time of handling dataset
  97. DatasetHandlerIntervalSeconds = 10
  98. // EvalSamplesCapacity is capacity of eval samples
  99. EvalSamplesCapacity = 5
  100. //KindName is kind of incremental-learning-job resource
  101. KindName = "incrementallearningjob"
  102. // TriggerReadyStatus is the ready status about trigger
  103. TriggerReadyStatus = "ready"
  104. // TriggerCompletedStatus is the completed status about trigger
  105. TriggerCompletedStatus = "completed"
  106. AnnotationsRoundsKey = "sedna.io/rounds"
  107. AnnotationsNumberOfSamplesKey = "sedna.io/number-of-samples"
  108. AnnotationsDataFileOfEvalKey = "sedna.io/data-file-of-eval"
  109. )
  110. // New creates a incremental-learning-job manager
  111. func New(client clienttypes.ClientI, datasetManager *dataset.Manager,
  112. modelManager *model.Manager, options *options.LocalControllerOptions) *Manager {
  113. im := Manager{
  114. Client: client,
  115. WorkerMessageChannel: make(chan workertypes.MessageContent, workertypes.MessageChannelCacheSize),
  116. DatasetManager: datasetManager,
  117. ModelManager: modelManager,
  118. IncrementalJobMap: make(map[string]*Job),
  119. VolumeMountPrefix: options.VolumeMountPrefix,
  120. }
  121. return &im
  122. }
  123. // Start starts incremental-learning-job manager
  124. func (im *Manager) Start() error {
  125. go im.monitorWorker()
  126. return nil
  127. }
  128. // trainTask starts training task
  129. func (im *Manager) trainTask(job *Job) error {
  130. jobConfig := job.JobConfig
  131. latestCond := im.getLatestCondition(job)
  132. jobStage := latestCond.Stage
  133. currentType := latestCond.Type
  134. if currentType == sednav1.ILJobStageCondWaiting {
  135. var err error
  136. err = im.loadDataset(job)
  137. if err != nil || jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
  138. return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w",
  139. jobConfig.UniqueIdentifier, err)
  140. }
  141. if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
  142. return fmt.Errorf("job(%s)'s dataset not ready", jobConfig.UniqueIdentifier)
  143. }
  144. err = im.loadTrainModel(job)
  145. if err != nil {
  146. return fmt.Errorf("failed to sync train model, and waiting it: %w", err)
  147. }
  148. initTriggerStatus(jobConfig)
  149. if jobConfig.TrainTriggerStatus == TriggerReadyStatus {
  150. payload, ok, err := im.triggerTrainTask(job)
  151. if !ok {
  152. return nil
  153. }
  154. if err != nil {
  155. klog.Errorf("job(%s) failed to complete the %sing phase triggering task: %v",
  156. jobConfig.UniqueIdentifier, jobStage, err)
  157. job.JobConfig.Rounds--
  158. return err
  159. }
  160. err = im.Client.WriteMessage(payload, job.getHeader())
  161. if err != nil {
  162. klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err)
  163. job.JobConfig.Rounds--
  164. return err
  165. }
  166. forwardSamples(jobConfig, jobStage)
  167. err = im.saveJobToDB(job)
  168. if err != nil {
  169. klog.Errorf("job(%s) failed to save job to db: %v",
  170. jobConfig.UniqueIdentifier, err)
  171. // continue anyway
  172. }
  173. jobConfig.TrainTriggerStatus = TriggerCompletedStatus
  174. klog.Infof("job(%s) completed the %sing phase triggering task successfully",
  175. jobConfig.UniqueIdentifier, jobStage)
  176. }
  177. }
  178. return nil
  179. }
  180. // evalTask starts eval task
  181. func (im *Manager) evalTask(job *Job) error {
  182. jobConfig := job.JobConfig
  183. latestCond := im.getLatestCondition(job)
  184. jobStage := latestCond.Stage
  185. currentType := latestCond.Type
  186. if currentType == sednav1.ILJobStageCondWaiting {
  187. var err error
  188. err = im.loadDataset(job)
  189. if err != nil || jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
  190. return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w",
  191. jobConfig.UniqueIdentifier, err)
  192. }
  193. err = im.loadDeployModel(job)
  194. if err != nil {
  195. return fmt.Errorf("failed to sync deploy model, and waiting it: %w", err)
  196. }
  197. if jobConfig.EvalTriggerStatus == TriggerReadyStatus {
  198. payload, err := im.triggerEvalTask(job)
  199. if err != nil {
  200. klog.Errorf("job(%s) completed the %sing phase triggering task failed: %v",
  201. jobConfig.UniqueIdentifier, jobStage, err)
  202. return err
  203. }
  204. err = im.Client.WriteMessage(payload, job.getHeader())
  205. if err != nil {
  206. klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err)
  207. return err
  208. }
  209. forwardSamples(jobConfig, jobStage)
  210. jobConfig.EvalTriggerStatus = TriggerCompletedStatus
  211. klog.Infof("job(%s) completed the %sing phase triggering task successfully",
  212. jobConfig.UniqueIdentifier, jobStage)
  213. }
  214. }
  215. return nil
  216. }
  217. // hotModelUpdateDeployTask starts deploy task when job supports hot model update
  218. func (im *Manager) hotModelUpdateDeployTask(job *Job) error {
  219. if job.JobConfig.HotModelUpdateDeployTriggerStatus == TriggerReadyStatus {
  220. var localModelConfigFile string
  221. if v, ok := job.ObjectMeta.Annotations[runtime.ModelHotUpdateAnnotationsKey]; ok {
  222. localModelConfigFile = v
  223. } else {
  224. return nil
  225. }
  226. models := im.getJobStageModel(job, sednav1.ILJobDeploy)
  227. if models == nil {
  228. return nil
  229. }
  230. trainedModel := models[0]
  231. deployModel := models[1]
  232. trainedModelURL := trainedModel.URL
  233. modelName := filepath.Base(trainedModelURL)
  234. localHostDir := filepath.Dir(localModelConfigFile)
  235. localHostModelFile := filepath.Join(localHostDir, modelName)
  236. modelFile := util.AddPrefixPath(im.VolumeMountPrefix, localHostModelFile)
  237. if err := im.updateDeployModelFile(job, trainedModelURL, modelFile); err != nil {
  238. return err
  239. }
  240. deployModelURL := deployModel.URL
  241. if err := im.updateDeployModelFile(job, trainedModelURL, deployModelURL); err != nil {
  242. return err
  243. }
  244. config := map[string]map[string]string{
  245. "model_config": {
  246. "model_path": strings.Replace(localHostModelFile, localHostDir,
  247. runtime.ModelHotUpdateContainerPrefix, 1),
  248. "model_update_time": time.Now().String(),
  249. },
  250. }
  251. jsonConfig, err := json.MarshalIndent(config, "", " ")
  252. if err != nil {
  253. return err
  254. }
  255. modelConfigFile := util.AddPrefixPath(im.VolumeMountPrefix, localModelConfigFile)
  256. // overwrite file
  257. err = ioutil.WriteFile(modelConfigFile, jsonConfig, 0644)
  258. if err != nil {
  259. klog.Errorf("job(%s) write model config file(url=%s) failed in deploy phase: %v",
  260. job.JobConfig.UniqueIdentifier, modelConfigFile, err)
  261. return err
  262. }
  263. job.JobConfig.HotModelUpdateDeployTriggerStatus = TriggerCompletedStatus
  264. klog.V(4).Infof("job(%s) write model config file(url=%s) successfully in deploy phase",
  265. job.JobConfig.UniqueIdentifier, modelConfigFile)
  266. klog.Infof("job(%s) completed the %s task successfully", job.JobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
  267. }
  268. return nil
  269. }
  270. // deployTask starts deploy task
  271. func (im *Manager) deployTask(job *Job) error {
  272. if job.JobConfig.DeployTriggerStatus == TriggerReadyStatus {
  273. jobConfig := job.JobConfig
  274. var err error
  275. var neededDeploy bool
  276. neededDeploy, err = im.triggerDeployTask(job)
  277. status := clienttypes.UpstreamMessage{Phase: string(sednav1.ILJobDeploy)}
  278. models := im.getJobStageModel(job, sednav1.ILJobDeploy)
  279. if err == nil && neededDeploy && models != nil {
  280. if !job.Spec.DeploySpec.Model.HotUpdateEnabled {
  281. trainedModel := models[0]
  282. deployModel := models[1]
  283. err = im.updateDeployModelFile(job, trainedModel.URL, deployModel.URL)
  284. if err != nil {
  285. status.Status = string(sednav1.ILJobStageCondFailed)
  286. klog.Errorf("failed to update model for job(%s): %v", jobConfig.UniqueIdentifier, err)
  287. return err
  288. }
  289. status.Status = string(sednav1.ILJobStageCondReady)
  290. klog.Infof("job(%s) completed the %s task successfully", jobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
  291. } else {
  292. status.Status = string(sednav1.ILJobStageCondReady)
  293. }
  294. status.Input = &clienttypes.Input{
  295. Models: models,
  296. }
  297. klog.Infof("job(%s) completed the %sing phase triggering task successfully",
  298. jobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
  299. } else {
  300. // No need to deploy, just report completed status
  301. // TODO: instead of reporting deploy-completed, another more reasonable status
  302. klog.Infof("job(%s) isn't need to deploy model", jobConfig.UniqueIdentifier)
  303. status.Status = string(sednav1.ILJobStageCondCompleted)
  304. }
  305. err = im.Client.WriteMessage(status, job.getHeader())
  306. if err != nil {
  307. klog.Errorf("job(%s) completed the %s task failed: %v",
  308. jobConfig.UniqueIdentifier, sednav1.ILJobDeploy, err)
  309. return err
  310. }
  311. job.JobConfig.DeployTriggerStatus = TriggerCompletedStatus
  312. }
  313. return nil
  314. }
  315. // startJob starts a job
  316. func (im *Manager) startJob(name string) {
  317. var err error
  318. job := im.IncrementalJobMap[name]
  319. err = im.initJob(job, name)
  320. if err != nil {
  321. klog.Errorf("failed to init job (name=%s): %+v", name)
  322. return
  323. }
  324. klog.Infof("incremental job(%s) was started", name)
  325. defer klog.Infof("incremental learning job(%s) was stopped", name)
  326. // handle data from dataset
  327. go im.handleData(job)
  328. tick := time.NewTicker(JobIterationIntervalSeconds * time.Second)
  329. for {
  330. select {
  331. case <-job.JobConfig.Done:
  332. return
  333. default:
  334. }
  335. cond := im.getLatestCondition(job)
  336. jobStage := cond.Stage
  337. switch jobStage {
  338. case sednav1.ILJobTrain:
  339. err = im.trainTask(job)
  340. case sednav1.ILJobEval:
  341. err = im.evalTask(job)
  342. case sednav1.ILJobDeploy:
  343. if cond.Type == sednav1.ILJobStageCondWaiting {
  344. err = im.deployTask(job)
  345. } else if cond.Type == sednav1.ILJobStageCondRunning && job.Spec.DeploySpec.Model.HotUpdateEnabled {
  346. err = im.hotModelUpdateDeployTask(job)
  347. }
  348. default:
  349. klog.Errorf("invalid phase: %s", jobStage)
  350. continue
  351. }
  352. if err != nil {
  353. klog.Errorf("job(%s) failed to complete the %s task: %v", name, jobStage, err)
  354. }
  355. <-tick.C
  356. }
  357. }
  358. // Insert inserts incremental-learning-job config to db
  359. func (im *Manager) Insert(message *clienttypes.Message) error {
  360. name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
  361. first := false
  362. job, ok := im.IncrementalJobMap[name]
  363. if !ok {
  364. job = &Job{}
  365. im.IncrementalJobMap[name] = job
  366. first = true
  367. }
  368. if err := json.Unmarshal(message.Content, &job); err != nil {
  369. return err
  370. }
  371. if err := db.SaveResource(name, job.TypeMeta, job.ObjectMeta, job.Spec); err != nil {
  372. return err
  373. }
  374. if first {
  375. go im.startJob(name)
  376. }
  377. return nil
  378. }
  379. // deleteModelHotUpdateData deletes the local data of model hot update
  380. func (im *Manager) deleteModelHotUpdateData(job *Job) error {
  381. if configFile, ok := job.ObjectMeta.Annotations[runtime.ModelHotUpdateAnnotationsKey]; ok {
  382. localHostDir := filepath.Dir(configFile)
  383. dir := util.AddPrefixPath(im.VolumeMountPrefix, localHostDir)
  384. if err := os.RemoveAll(dir); err != nil {
  385. return fmt.Errorf("failed to delete the dir(%s): %w", dir, err)
  386. }
  387. }
  388. return nil
  389. }
  390. // Delete deletes incremental-learning-job config in db
  391. func (im *Manager) Delete(message *clienttypes.Message) error {
  392. name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
  393. if job, ok := im.IncrementalJobMap[name]; ok && job.JobConfig.Done != nil {
  394. close(job.JobConfig.Done)
  395. if err := im.deleteModelHotUpdateData(job); err != nil {
  396. klog.Errorf("job(%s) failed to delete data of model hot update: %v", name, err)
  397. // continue anyway
  398. }
  399. }
  400. delete(im.IncrementalJobMap, name)
  401. if err := db.DeleteResource(name); err != nil {
  402. return err
  403. }
  404. return nil
  405. }
  406. // updateJobFromDB updates job from db
  407. func (im *Manager) updateJobFromDB(job *Job) error {
  408. var err error
  409. previousJob, err := db.GetResource(job.JobConfig.UniqueIdentifier)
  410. if err != nil {
  411. return err
  412. }
  413. m := metav1.ObjectMeta{}
  414. if err != json.Unmarshal([]byte(previousJob.ObjectMeta), &m) {
  415. return err
  416. }
  417. rounds, ok := m.Annotations[AnnotationsRoundsKey]
  418. if !ok {
  419. return nil
  420. }
  421. if job.JobConfig.Rounds, err = strconv.Atoi(rounds); err != nil {
  422. return err
  423. }
  424. numberOfSamples, ok := m.Annotations[AnnotationsNumberOfSamplesKey]
  425. if !ok {
  426. return nil
  427. }
  428. if job.JobConfig.DataSamples.PreviousNumbers, err = strconv.Atoi(numberOfSamples); err != nil {
  429. return err
  430. }
  431. dataFileOfEval, ok := m.Annotations[AnnotationsDataFileOfEvalKey]
  432. if !ok {
  433. return nil
  434. }
  435. localURL, err := job.JobConfig.Storage.Download(dataFileOfEval, "")
  436. if !job.JobConfig.Storage.IsLocalStorage {
  437. defer os.RemoveAll(localURL)
  438. }
  439. if err != nil {
  440. return err
  441. }
  442. samples, err := dataset.GetSamples(dataFileOfEval)
  443. if err != nil {
  444. klog.Errorf("read file %s failed: %v", dataFileOfEval, err)
  445. return err
  446. }
  447. job.JobConfig.DataSamples.EvalVersionSamples = append(job.JobConfig.DataSamples.EvalVersionSamples, samples)
  448. return nil
  449. }
  450. // saveJobToDB saves job info to db
  451. func (im *Manager) saveJobToDB(job *Job) error {
  452. ann := job.ObjectMeta.Annotations
  453. if ann == nil {
  454. ann = make(map[string]string)
  455. }
  456. ann[AnnotationsRoundsKey] = strconv.Itoa(job.JobConfig.Rounds)
  457. ann[AnnotationsNumberOfSamplesKey] = strconv.Itoa(job.JobConfig.DataSamples.PreviousNumbers)
  458. ann[AnnotationsDataFileOfEvalKey] = job.JobConfig.EvalDataURL
  459. return db.SaveResource(job.JobConfig.UniqueIdentifier, job.TypeMeta, job.ObjectMeta, job.Spec)
  460. }
  461. // initJob inits the job object
  462. func (im *Manager) initJob(job *Job, name string) error {
  463. job.JobConfig = new(JobConfig)
  464. jobConfig := job.JobConfig
  465. jobConfig.UniqueIdentifier = name
  466. jobConfig.Storage = storage.Storage{IsLocalStorage: false}
  467. credential := job.ObjectMeta.Annotations[runtime.SecretAnnotationKey]
  468. if credential != "" {
  469. if err := job.JobConfig.Storage.SetCredential(credential); err != nil {
  470. return fmt.Errorf("failed to set storage credential: %w", err)
  471. }
  472. }
  473. jobConfig.Done = make(chan struct{})
  474. jobConfig.Lock = sync.Mutex{}
  475. jobConfig.Rounds = 0
  476. jobConfig.DataSamples = &DataSamples{
  477. PreviousNumbers: 0,
  478. TrainSamples: make([]string, 0),
  479. EvalVersionSamples: make([][]string, 0),
  480. EvalSamples: make([]string, 0),
  481. }
  482. trainTrigger, err := newTrigger(job.Spec.TrainSpec.Trigger)
  483. if err != nil {
  484. return fmt.Errorf("failed to init train trigger: %+w", err)
  485. }
  486. deployTrigger, err := newTrigger(job.Spec.DeploySpec.Trigger)
  487. if err != nil {
  488. return fmt.Errorf("failed to init deploy trigger: %+w", err)
  489. }
  490. jobConfig.TrainTrigger = trainTrigger
  491. jobConfig.DeployTrigger = deployTrigger
  492. outputDir := job.Spec.OutputDir
  493. isLocalURL, err := jobConfig.Storage.IsLocalURL(outputDir)
  494. if err != nil {
  495. return fmt.Errorf("job(%s)'s output dir(%s) is invalid: %+w", job.Name, outputDir, err)
  496. }
  497. if isLocalURL {
  498. jobConfig.Storage.IsLocalStorage = true
  499. outputDir = util.AddPrefixPath(im.VolumeMountPrefix, outputDir)
  500. }
  501. jobConfig.OutputDir = outputDir
  502. if err := job.createOutputDir(jobConfig); err != nil {
  503. return err
  504. }
  505. if err := im.updateJobFromDB(job); err != nil {
  506. klog.Errorf("job(%s) failed to update job from db: %v", name, err)
  507. }
  508. initTriggerStatus(jobConfig)
  509. return nil
  510. }
  511. func initTriggerStatus(jobConfig *JobConfig) {
  512. jobConfig.TrainTriggerStatus = TriggerReadyStatus
  513. jobConfig.EvalTriggerStatus = TriggerReadyStatus
  514. jobConfig.DeployTriggerStatus = TriggerReadyStatus
  515. jobConfig.HotModelUpdateDeployTriggerStatus = TriggerReadyStatus
  516. }
  517. func newTrigger(t sednav1.Trigger) (trigger.Base, error) {
  518. // convert trigger to map
  519. triggerMap := make(map[string]interface{})
  520. c, err := json.Marshal(t)
  521. if err != nil {
  522. return nil, err
  523. }
  524. err = json.Unmarshal(c, &triggerMap)
  525. if err != nil {
  526. return nil, err
  527. }
  528. return trigger.NewTrigger(triggerMap)
  529. }
  530. // getModelsFromJobConditions gets models from job condition
  531. func (im *Manager) getModelsFromJobConditions(jobConditions []sednav1.ILJobCondition, stage sednav1.ILJobStage, currentType sednav1.ILJobStageConditionType, dataType string) []Model {
  532. // TODO: runtime.type changes to common.type for gm and lc
  533. for i := len(jobConditions) - 1; i >= 0; i-- {
  534. var cond gmtypes.IncrementalCondData
  535. jobCond := jobConditions[i]
  536. if jobCond.Stage == stage && jobCond.Type == currentType {
  537. if err := (&cond).Unmarshal([]byte(jobCond.Data)); err != nil {
  538. continue
  539. }
  540. if dataType == "input" {
  541. if cond.Input == nil {
  542. continue
  543. }
  544. return cond.Input.Models
  545. } else if dataType == "output" {
  546. if cond.Output == nil {
  547. continue
  548. }
  549. return cond.Output.Models
  550. }
  551. }
  552. }
  553. return nil
  554. }
  555. // getEvalResult gets eval result from job conditions
  556. func (im *Manager) getEvalResult(job *Job) ([]map[string][]float64, error) {
  557. jobConditions := job.Status.Conditions
  558. models := im.getModelsFromJobConditions(jobConditions, sednav1.ILJobEval, sednav1.ILJobStageCondCompleted, "output")
  559. var result []map[string][]float64
  560. var err error
  561. for _, m := range models {
  562. bytes, err := json.Marshal(m.Metrics)
  563. if err != nil {
  564. return nil, err
  565. }
  566. data := make(map[string][]float64)
  567. if err = json.Unmarshal(bytes, &data); err != nil {
  568. return nil, err
  569. }
  570. result = append(result, data)
  571. }
  572. return result, err
  573. }
  574. // getJobStageModel gets model from job conditions for train/eval/deploy
  575. func (im *Manager) getJobStageModel(job *Job, jobStage sednav1.ILJobStage) []Model {
  576. jobConditions := job.Status.Conditions
  577. switch jobStage {
  578. case sednav1.ILJobTrain:
  579. // the second model is the pre-trained model of train stage.
  580. models := im.getModelsFromJobConditions(jobConditions, sednav1.ILJobTrain, sednav1.ILJobStageCondCompleted, "output")
  581. if models != nil {
  582. return []Model{{Format: models[1].Format, URL: models[1].URL}}
  583. }
  584. case sednav1.ILJobEval:
  585. // the first model is the output model of train stage.
  586. models := im.getModelsFromJobConditions(jobConditions, sednav1.ILJobTrain, sednav1.ILJobStageCondCompleted, "output")
  587. if models != nil {
  588. return []Model{{Format: models[0].Format, URL: models[0].URL}}
  589. }
  590. case sednav1.ILJobDeploy:
  591. // two models for deploy stage:
  592. // the first model is the output model of train stage, which was evaluated as better than the second model in eval stage.
  593. // the second model is the serving model used in the inference worker.
  594. var deployModels []Model
  595. models := im.getModelsFromJobConditions(jobConditions, sednav1.ILJobEval, sednav1.ILJobStageCondReady, "input")
  596. for _, m := range models {
  597. deployModels = append(deployModels, Model{Format: m.Format, URL: m.URL})
  598. }
  599. return deployModels
  600. }
  601. return nil
  602. }
  603. // triggerTrainTask triggers the train task
  604. func (im *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) {
  605. var err error
  606. jobConfig := job.JobConfig
  607. const numOfSamples = "num_of_samples"
  608. samples := map[string]interface{}{
  609. numOfSamples: len(jobConfig.DataSamples.TrainSamples),
  610. }
  611. isTrigger := jobConfig.TrainTrigger.Trigger(samples)
  612. if !isTrigger {
  613. return nil, false, nil
  614. }
  615. job.JobConfig.Rounds++
  616. var m *Model
  617. latestCondition := im.getLatestCondition(job)
  618. rounds := jobConfig.Rounds
  619. if rounds <= 1 {
  620. m = jobConfig.TrainModel
  621. } else {
  622. models := im.getJobStageModel(job, latestCondition.Stage)
  623. if models != nil {
  624. m = &models[0]
  625. }
  626. }
  627. var dataIndexURL string
  628. jobConfig.TrainDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.TrainSamples,
  629. jobConfig.OutputConfig.SamplesOutput["train"], rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix)
  630. if err != nil {
  631. job.JobConfig.Rounds--
  632. klog.Errorf("job(%s) train phase: write samples to the file(%s) is failed: %v",
  633. jobConfig.UniqueIdentifier, jobConfig.TrainDataURL, err)
  634. return nil, false, err
  635. }
  636. dataURL := jobConfig.TrainDataURL
  637. outputDir := strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(rounds)}, "/")
  638. if jobConfig.Storage.IsLocalStorage {
  639. dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataURL)
  640. dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL)
  641. outputDir = util.TrimPrefixPath(im.VolumeMountPrefix, outputDir)
  642. }
  643. input := clienttypes.Input{
  644. Models: []Model{*m},
  645. DataURL: dataURL,
  646. DataIndexURL: dataIndexURL,
  647. OutputDir: outputDir,
  648. }
  649. msg := clienttypes.UpstreamMessage{
  650. Phase: string(sednav1.ILJobTrain),
  651. Status: string(sednav1.ILJobStageCondReady),
  652. Input: &input,
  653. }
  654. jobConfig.TriggerTime = time.Now()
  655. return &msg, true, nil
  656. }
  657. // triggerEvalTask triggers the eval task
  658. func (im *Manager) triggerEvalTask(job *Job) (*clienttypes.UpstreamMessage, error) {
  659. jobConfig := job.JobConfig
  660. var err error
  661. latestCondition := im.getLatestCondition(job)
  662. ms := im.getJobStageModel(job, latestCondition.Stage)
  663. if ms == nil {
  664. return nil, err
  665. }
  666. models := []Model{ms[0], {
  667. Format: jobConfig.DeployModel.Format,
  668. URL: jobConfig.DeployModel.URL,
  669. }}
  670. // EvalModels has two models, first is trained model, second is deployed model
  671. jobConfig.EvalModels = models
  672. var dataIndexURL string
  673. jobConfig.EvalDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"],
  674. job.JobConfig.Rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix)
  675. if err != nil {
  676. klog.Errorf("job(%s) eval phase: write samples to the file(%s) is failed: %v",
  677. jobConfig.UniqueIdentifier, jobConfig.EvalDataURL, err)
  678. return nil, err
  679. }
  680. dataURL := jobConfig.EvalDataURL
  681. if jobConfig.Storage.IsLocalStorage {
  682. dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataURL)
  683. dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL)
  684. }
  685. input := clienttypes.Input{
  686. Models: models,
  687. DataURL: dataURL,
  688. DataIndexURL: dataIndexURL,
  689. }
  690. msg := &clienttypes.UpstreamMessage{
  691. Phase: string(sednav1.ILJobEval),
  692. Status: string(sednav1.ILJobStageCondReady),
  693. Input: &input,
  694. }
  695. return msg, nil
  696. }
  697. // triggerDeployTask triggers the deploy task
  698. func (im *Manager) triggerDeployTask(job *Job) (bool, error) {
  699. jobConfig := job.JobConfig
  700. evalResult, err := im.getEvalResult(job)
  701. if err != nil && len(evalResult) < 2 {
  702. klog.Errorf("job(name=%s failed to get eval result(%v): %+w", job.Name, evalResult, err)
  703. return false, err
  704. }
  705. newMetrics := evalResult[0]
  706. oldMetrics := evalResult[1]
  707. metricDelta := make(map[string]interface{})
  708. for metric := range newMetrics {
  709. // keep the full metrics
  710. metricDelta[metric] = newMetrics[metric]
  711. var l []float64
  712. for i := range newMetrics[metric] {
  713. l = append(l, newMetrics[metric][i]-oldMetrics[metric][i])
  714. }
  715. metricDelta[metric+"_delta"] = l
  716. }
  717. tt := job.Spec.DeploySpec.Trigger
  718. // convert tt to map
  719. triggerMap := make(map[string]interface{})
  720. c, err := json.Marshal(tt)
  721. if err != nil {
  722. return false, err
  723. }
  724. err = json.Unmarshal(c, &triggerMap)
  725. if err != nil {
  726. return false, err
  727. }
  728. return jobConfig.DeployTrigger.Trigger(metricDelta), nil
  729. }
  730. // updateDeployModelFile updates deploy model file
  731. func (im *Manager) updateDeployModelFile(job *Job, trainedModel string, deployModel string) error {
  732. if job.JobConfig.Storage.IsLocalStorage {
  733. trainedModel = util.AddPrefixPath(im.VolumeMountPrefix, trainedModel)
  734. }
  735. if err := job.JobConfig.Storage.CopyFile(trainedModel, deployModel); err != nil {
  736. return fmt.Errorf("failed to copy trained model(url=%s) to the deploy model(url=%s): %w",
  737. trainedModel, deployModel, err)
  738. }
  739. klog.V(4).Infof("copy trained model(url=%s) to the deploy model(url=%s) successfully", trainedModel, deployModel)
  740. return nil
  741. }
  742. // createOutputDir creates the job output dir
  743. func (job *Job) createOutputDir(jobConfig *JobConfig) error {
  744. outputDir := jobConfig.OutputDir
  745. dirNames := []string{"data/train", "data/eval", "train", "eval"}
  746. if job.JobConfig.Storage.IsLocalStorage {
  747. if err := util.CreateFolder(outputDir); err != nil {
  748. klog.Errorf("job(%s) failed to create folder %s: %v", jobConfig.UniqueIdentifier, outputDir, err)
  749. return err
  750. }
  751. for _, v := range dirNames {
  752. dir := path.Join(outputDir, v)
  753. if err := util.CreateFolder(dir); err != nil {
  754. klog.Errorf("job(%s) failed to create folder %s: %v", jobConfig.UniqueIdentifier, dir, err)
  755. return err
  756. }
  757. }
  758. }
  759. outputConfig := OutputConfig{
  760. SamplesOutput: map[string]string{
  761. "train": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[0]}, "/"),
  762. "eval": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[1]}, "/"),
  763. },
  764. TrainOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[2]}, "/"),
  765. EvalOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[3]}, "/"),
  766. }
  767. jobConfig.OutputConfig = &outputConfig
  768. return nil
  769. }
  770. func (im *Manager) getLatestCondition(job *Job) sednav1.ILJobCondition {
  771. jobConditions := job.Status.Conditions
  772. var latestCondition sednav1.ILJobCondition = sednav1.ILJobCondition{}
  773. if len(jobConditions) > 0 {
  774. // get latest pod and pod status
  775. latestCondition = jobConditions[len(jobConditions)-1]
  776. }
  777. return latestCondition
  778. }
  779. func (im *Manager) getModel(namespace string, name string) (sednav1.Model, error) {
  780. modelName := util.GetUniqueIdentifier(namespace, name, model.KindName)
  781. model, ok := im.ModelManager.GetModel(modelName)
  782. if !ok {
  783. return model, fmt.Errorf("not exists model(name=%s)", modelName)
  784. }
  785. return model, nil
  786. }
  787. // loadTrainModel loads initial model information for training.
  788. func (im *Manager) loadTrainModel(job *Job) error {
  789. jobConfig := job.JobConfig
  790. if jobConfig.TrainModel == nil {
  791. initialModel, err := im.getModel(job.Namespace, job.Spec.InitialModel.Name)
  792. if err != nil {
  793. return err
  794. }
  795. jobConfig.TrainModel = new(Model)
  796. format := initialModel.Spec.Format
  797. url := initialModel.Spec.URL
  798. jobConfig.TrainModel.Format = format
  799. jobConfig.TrainModel.URL = url
  800. }
  801. return nil
  802. }
  803. // loadDeployModel loads model information for deploying.
  804. func (im *Manager) loadDeployModel(job *Job) error {
  805. jobConfig := job.JobConfig
  806. if jobConfig.DeployModel == nil {
  807. evalModel, err := im.getModel(job.Namespace, job.Spec.DeploySpec.Model.Name)
  808. if err != nil {
  809. return err
  810. }
  811. jobConfig.DeployModel = new(Model)
  812. jobConfig.DeployModel.Format = evalModel.Spec.Format
  813. jobConfig.DeployModel.URL = evalModel.Spec.URL
  814. }
  815. return nil
  816. }
  817. // loadDataset loads dataset information
  818. func (im *Manager) loadDataset(job *Job) error {
  819. if job.JobConfig.Dataset != nil {
  820. // already loaded
  821. return nil
  822. }
  823. datasetName := util.GetUniqueIdentifier(job.Namespace, job.Spec.Dataset.Name, dataset.KindName)
  824. dataset, ok := im.DatasetManager.GetDataset(datasetName)
  825. if !ok || dataset == nil {
  826. return fmt.Errorf("not exists dataset(name=%s)", datasetName)
  827. }
  828. job.JobConfig.Dataset = dataset
  829. return nil
  830. }
  831. // handleData updates samples information
  832. func (im *Manager) handleData(job *Job) {
  833. tick := time.NewTicker(DatasetHandlerIntervalSeconds * time.Second)
  834. jobConfig := job.JobConfig
  835. iterCount := 0
  836. for {
  837. select {
  838. case <-jobConfig.Done:
  839. return
  840. default:
  841. }
  842. if iterCount%100 == 0 {
  843. klog.V(4).Infof("job(%s) is handling dataset", jobConfig.UniqueIdentifier)
  844. }
  845. iterCount++
  846. if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
  847. // already loaded dataset
  848. <-tick.C
  849. continue
  850. }
  851. dataset := jobConfig.Dataset
  852. currentNumberOfSamples := dataset.DataSource.NumberOfSamples
  853. previousNumberOfSamples := jobConfig.DataSamples.PreviousNumbers
  854. if dataset.DataSource != nil && currentNumberOfSamples > previousNumberOfSamples {
  855. samples := dataset.DataSource.TrainSamples
  856. newNumberOfSamples := currentNumberOfSamples - previousNumberOfSamples
  857. trainNum := int(job.Spec.Dataset.TrainProb * float64(newNumberOfSamples))
  858. jobConfig.Lock.Lock()
  859. jobConfig.DataSamples.TrainSamples = append(jobConfig.DataSamples.TrainSamples,
  860. samples[previousNumberOfSamples:previousNumberOfSamples+trainNum]...)
  861. klog.Infof("job(%s)'s current train samples nums is %d", jobConfig.UniqueIdentifier, trainNum)
  862. jobConfig.DataSamples.EvalVersionSamples = append(jobConfig.DataSamples.EvalVersionSamples,
  863. samples[previousNumberOfSamples+trainNum:])
  864. jobConfig.Lock.Unlock()
  865. for _, v := range jobConfig.DataSamples.EvalVersionSamples {
  866. jobConfig.DataSamples.EvalSamples = append(jobConfig.DataSamples.EvalSamples, v...)
  867. }
  868. klog.Infof("job(%s)'s current eval samples nums is %d", jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.EvalSamples))
  869. jobConfig.DataSamples.PreviousNumbers = currentNumberOfSamples
  870. }
  871. <-tick.C
  872. }
  873. }
  874. // createFile creates data file and data index file
  875. func createFile(dir string, format string, isLocalStorage bool) (string, string) {
  876. switch format {
  877. case dataset.TXTFormat:
  878. if isLocalStorage {
  879. return path.Join(dir, "data.txt"), ""
  880. }
  881. return strings.Join([]string{dir, "data.txt"}, "/"), strings.Join([]string{dir, "dataIndex.txt"}, "/")
  882. }
  883. return "", ""
  884. }
  885. // writeSamples writes samples information to a file
  886. func (im *Manager) writeSamples(job *Job, samples []string, dir string, rounds int, format string, urlPrefix string) (string, string, error) {
  887. if samples == nil {
  888. return "", "", fmt.Errorf("not samples")
  889. }
  890. jobConfig := job.JobConfig
  891. subDir := strings.Join([]string{dir, strconv.Itoa(rounds)}, "/")
  892. fileURL, absURLFile := createFile(subDir, format, jobConfig.Dataset.Storage.IsLocalStorage)
  893. if jobConfig.Storage.IsLocalStorage {
  894. if err := util.CreateFolder(subDir); err != nil {
  895. return "", "", err
  896. }
  897. if err := im.writeByLine(samples, fileURL); err != nil {
  898. return "", "", err
  899. }
  900. if !jobConfig.Dataset.Storage.IsLocalStorage {
  901. tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)
  902. if err := im.writeByLine(tempSamples, absURLFile); err != nil {
  903. return "", "", err
  904. }
  905. }
  906. return fileURL, absURLFile, nil
  907. }
  908. temporaryDir, err := util.CreateTemporaryDir()
  909. if err != nil {
  910. return "", "", err
  911. }
  912. localFileURL, localAbsURLFile := createFile(temporaryDir, format, jobConfig.Dataset.Storage.IsLocalStorage)
  913. if err := im.writeByLine(samples, localFileURL); err != nil {
  914. return "", "", err
  915. }
  916. if err := jobConfig.Storage.Upload(localFileURL, fileURL); err != nil {
  917. return "", "", err
  918. }
  919. tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)
  920. if err := im.writeByLine(tempSamples, localAbsURLFile); err != nil {
  921. return "", "", err
  922. }
  923. if err := jobConfig.Storage.Upload(localAbsURLFile, absURLFile); err != nil {
  924. return "", "", err
  925. }
  926. defer os.RemoveAll(localFileURL)
  927. defer os.RemoveAll(localAbsURLFile)
  928. return fileURL, absURLFile, nil
  929. }
  930. // writeByLine writes file by line
  931. func (im *Manager) writeByLine(samples []string, fileURL string) error {
  932. file, err := os.Create(fileURL)
  933. if err != nil {
  934. klog.Errorf("create file(%s) failed", fileURL)
  935. return err
  936. }
  937. w := bufio.NewWriter(file)
  938. for _, line := range samples {
  939. _, _ = fmt.Fprintln(w, line)
  940. }
  941. if err := w.Flush(); err != nil {
  942. klog.Errorf("failed to write file(%s): %v", fileURL, err)
  943. return err
  944. }
  945. if err := file.Close(); err != nil {
  946. klog.Errorf("failed to close file(%s): %v", fileURL, err)
  947. return err
  948. }
  949. return nil
  950. }
  951. // monitorWorker monitors message from worker
  952. func (im *Manager) monitorWorker() {
  953. for {
  954. workerMessageChannel := im.WorkerMessageChannel
  955. workerMessage, ok := <-workerMessageChannel
  956. if !ok {
  957. break
  958. }
  959. klog.V(4).Infof("handling worker message %+v", workerMessage)
  960. name := util.GetUniqueIdentifier(workerMessage.Namespace, workerMessage.OwnerName, workerMessage.OwnerKind)
  961. job, ok := im.IncrementalJobMap[name]
  962. if !ok {
  963. continue
  964. }
  965. // TODO: filter some worker messages out
  966. wo := clienttypes.Output{}
  967. wo.Models = workerMessage.Results
  968. wo.OwnerInfo = workerMessage.OwnerInfo
  969. msg := &clienttypes.UpstreamMessage{
  970. Phase: workerMessage.Kind,
  971. Status: workerMessage.Status,
  972. Output: &wo,
  973. }
  974. if err := im.Client.WriteMessage(msg, job.getHeader()); err != nil {
  975. klog.Errorf("job(%s) failed to write message: %v", name, err)
  976. continue
  977. }
  978. }
  979. }
  980. // forwardSamples deletes the samples information in the memory
  981. func forwardSamples(jobConfig *JobConfig, jobStage sednav1.ILJobStage) {
  982. switch jobStage {
  983. case sednav1.ILJobTrain:
  984. jobConfig.Lock.Lock()
  985. jobConfig.DataSamples.TrainSamples = jobConfig.DataSamples.TrainSamples[:0]
  986. jobConfig.Lock.Unlock()
  987. case sednav1.ILJobEval:
  988. if len(jobConfig.DataSamples.EvalVersionSamples) > EvalSamplesCapacity {
  989. jobConfig.DataSamples.EvalVersionSamples = jobConfig.DataSamples.EvalVersionSamples[1:]
  990. }
  991. }
  992. }
  993. // AddWorkerMessage adds worker messages
  994. func (im *Manager) AddWorkerMessage(message workertypes.MessageContent) {
  995. im.WorkerMessageChannel <- message
  996. }
  997. // GetName returns name of the manager
  998. func (im *Manager) GetName() string {
  999. return KindName
  1000. }
  1001. func (job *Job) getHeader() clienttypes.MessageHeader {
  1002. return clienttypes.MessageHeader{
  1003. Namespace: job.Namespace,
  1004. ResourceKind: job.Kind,
  1005. ResourceName: job.Name,
  1006. Operation: clienttypes.StatusOperation,
  1007. }
  1008. }