You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

incrementallearningjob.go 30 kB


  1. /*
  2. Copyright 2021 The KubeEdge Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package incrementallearning
  14. import (
  15. "bufio"
  16. "encoding/json"
  17. "fmt"
  18. "os"
  19. "path"
  20. "strconv"
  21. "strings"
  22. "sync"
  23. "time"
  24. "k8s.io/klog/v2"
  25. "github.com/kubeedge/sedna/cmd/sedna-lc/app/options"
  26. sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
  27. gmtypes "github.com/kubeedge/sedna/pkg/globalmanager/controllers/incrementallearning"
  28. "github.com/kubeedge/sedna/pkg/globalmanager/runtime"
  29. "github.com/kubeedge/sedna/pkg/localcontroller/db"
  30. clienttypes "github.com/kubeedge/sedna/pkg/localcontroller/gmclient"
  31. "github.com/kubeedge/sedna/pkg/localcontroller/managers/dataset"
  32. "github.com/kubeedge/sedna/pkg/localcontroller/managers/model"
  33. "github.com/kubeedge/sedna/pkg/localcontroller/storage"
  34. "github.com/kubeedge/sedna/pkg/localcontroller/trigger"
  35. "github.com/kubeedge/sedna/pkg/localcontroller/util"
  36. workertypes "github.com/kubeedge/sedna/pkg/localcontroller/worker"
  37. )
// Job describes one incremental-learning job tracked by the local controller:
// the job resource itself plus the runtime state attached to it.
type Job struct {
	sednav1.IncrementalLearningJob
	// JobConfig is the runtime configuration; it is allocated in startJob.
	JobConfig *JobConfig
	// Dataset stays nil until loadDataset finds the dataset in the dataset manager.
	Dataset *dataset.Dataset
	// Storage is used to copy models and upload sample files for this job.
	Storage storage.Storage
	// Done is closed by Manager.Delete to stop the goroutines serving this job.
	Done chan struct{}
}
// JobConfig defines the runtime config and state of an incremental-learning-job.
type JobConfig struct {
	// UniqueIdentifier is the unique job name; it is set in startJob.
	UniqueIdentifier string
	// Rounds is the training round counter: set to 1 by initJob and
	// incremented each time a train task has been triggered.
	Rounds int
	// TrainTrigger is built from Spec.TrainSpec.Trigger in initJob.
	TrainTrigger trigger.Base
	// DeployTrigger is built from Spec.DeploySpec.Trigger in initJob.
	DeployTrigger trigger.Base
	// TriggerTime records when the latest train task was triggered.
	TriggerTime time.Time
	// TrainTriggerStatus is TriggerReadyStatus or TriggerCompletedStatus.
	TrainTriggerStatus string
	// EvalTriggerStatus is TriggerReadyStatus or TriggerCompletedStatus.
	EvalTriggerStatus string
	// TrainDataURL is the sample file written for the latest train task.
	TrainDataURL string
	// EvalDataURL is the sample file written for the latest eval task.
	EvalDataURL string
	// OutputDir is the job output dir; for local storage it carries the
	// volume mount prefix.
	OutputDir string
	// OutputConfig holds the output locations derived from OutputDir by
	// createOutputDir.
	OutputConfig *OutputConfig
	// DataSamples holds the samples collected by handleData; it is nil until
	// loadDataset succeeds.
	DataSamples *DataSamples
	// TrainModel is the initial model, used as the training input in round 1.
	TrainModel *Model
	// DeployModel is the model named by Spec.DeploySpec.Model.
	DeployModel *Model
	// EvalModels holds the models of the latest eval task: the trained model
	// first, the deployed model second.
	EvalModels []Model
	// EvalResult holds the models reported by a completed eval worker.
	EvalResult []Model
	// Lock is held while handleData and forwardSamples update the train samples.
	Lock sync.Mutex
}
// Model is an alias of clienttypes.Model, the model description exchanged
// with the global manager.
type Model = clienttypes.Model
// OutputConfig defines config for job output
type OutputConfig struct {
	// SamplesOutput maps "train" and "eval" to the directories that receive
	// the sample files (filled in by createOutputDir).
	SamplesOutput map[string]string `json:"trainData"`
	// TrainOutput is the base directory of train outputs; the round number is
	// appended per train task.
	TrainOutput string `json:"trainOutput"`
	// EvalOutput is the base directory of eval outputs.
	EvalOutput string `json:"evalOutput"`
}
// DataSamples defines samples information
type DataSamples struct {
	// Numbers is how many dataset samples have already been processed by
	// handleData.
	Numbers int
	// TrainSamples are the samples waiting for the next train task; they are
	// cleared by forwardSamples once training has been triggered.
	TrainSamples []string
	// EvalVersionSamples keeps one batch of eval samples per dataset update.
	EvalVersionSamples [][]string
	// EvalSamples is the flat list of eval samples written for eval tasks.
	EvalSamples []string
}
// Manager defines the incremental-learning-job manager of the local controller.
type Manager struct {
	// Client is used to report job status messages upstream.
	Client clienttypes.ClientI
	// WorkerMessageChannel queues worker messages until monitorWorker handles them.
	WorkerMessageChannel chan workertypes.MessageContent
	// DatasetManager is used to look up the dataset a job refers to.
	DatasetManager *dataset.Manager
	// ModelManager is used to look up the models a job refers to.
	ModelManager *model.Manager
	// IncrementalJobMap holds the known jobs keyed by their unique identifier.
	IncrementalJobMap map[string]*Job
	// VolumeMountPrefix is added to / trimmed from local storage paths.
	VolumeMountPrefix string
}
const (
	// JobIterationIntervalSeconds is the interval, in seconds, between two
	// iterations of a job's main loop.
	JobIterationIntervalSeconds = 10
	// DatasetHandlerIntervalSeconds is the interval, in seconds, between two
	// rounds of dataset handling.
	DatasetHandlerIntervalSeconds = 10
	// EvalSamplesCapacity is the number of eval sample batches that may be
	// kept before forwardSamples starts dropping the oldest one.
	EvalSamplesCapacity = 5
	// KindName is the kind of the incremental-learning-job resource.
	KindName = "incrementallearningjob"
	// TriggerReadyStatus marks a trigger that may still fire.
	TriggerReadyStatus = "ready"
	// TriggerCompletedStatus marks a trigger that has already fired.
	TriggerCompletedStatus = "completed"
)
  103. // New creates a incremental-learning-job manager
  104. func New(client clienttypes.ClientI, datasetManager *dataset.Manager,
  105. modelManager *model.Manager, options *options.LocalControllerOptions) *Manager {
  106. im := Manager{
  107. Client: client,
  108. WorkerMessageChannel: make(chan workertypes.MessageContent, workertypes.MessageChannelCacheSize),
  109. DatasetManager: datasetManager,
  110. ModelManager: modelManager,
  111. IncrementalJobMap: make(map[string]*Job),
  112. VolumeMountPrefix: options.VolumeMountPrefix,
  113. }
  114. return &im
  115. }
// Start starts the incremental-learning-job manager by launching the
// goroutine that consumes worker messages. It always returns nil.
func (im *Manager) Start() error {
	go im.monitorWorker()
	return nil
}
  121. // trainTask starts training task
  122. func (im *Manager) trainTask(job *Job, currentRound int) error {
  123. jobConfig := job.JobConfig
  124. latestCond := im.getLatestCondition(job)
  125. jobStage := latestCond.Stage
  126. currentType := latestCond.Type
  127. if currentType == sednav1.ILJobStageCondWaiting {
  128. if job.Dataset == nil {
  129. return fmt.Errorf("job(name=%s) dataset not ready", jobConfig.UniqueIdentifier)
  130. }
  131. err := im.loadTrainModel(job)
  132. if err != nil {
  133. return fmt.Errorf("failed to sync train model, and waiting it: %v", err)
  134. }
  135. if currentRound < jobConfig.Rounds {
  136. currentRound = jobConfig.Rounds
  137. initTriggerStatus(jobConfig)
  138. }
  139. }
  140. if currentType == sednav1.ILJobStageCondWaiting && jobConfig.TrainTriggerStatus == TriggerReadyStatus {
  141. payload, ok, err := im.triggerTrainTask(job)
  142. if !ok {
  143. return nil
  144. }
  145. if err != nil {
  146. klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v",
  147. jobConfig.UniqueIdentifier, jobStage, err)
  148. return err
  149. }
  150. err = im.Client.WriteMessage(payload, job.getHeader())
  151. if err != nil {
  152. klog.Errorf("job(name=%s) failed to write message: %v",
  153. jobConfig.UniqueIdentifier, err)
  154. return err
  155. }
  156. jobConfig.TrainTriggerStatus = TriggerCompletedStatus
  157. jobConfig.Rounds++
  158. forwardSamples(jobConfig, jobStage)
  159. klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
  160. jobConfig.UniqueIdentifier, jobStage)
  161. }
  162. return nil
  163. }
  164. // evalTask starts eval task
  165. func (im *Manager) evalTask(job *Job) error {
  166. jobConfig := job.JobConfig
  167. latestCond := im.getLatestCondition(job)
  168. jobStage := latestCond.Stage
  169. currentType := latestCond.Type
  170. if currentType == sednav1.ILJobStageCondWaiting {
  171. err := im.loadDeployModel(job)
  172. if err != nil {
  173. return fmt.Errorf("failed to sync deploy model, and waiting it: %v", err)
  174. }
  175. }
  176. if currentType == sednav1.ILJobStageCondWaiting && jobConfig.EvalTriggerStatus == TriggerReadyStatus {
  177. payload, err := im.triggerEvalTask(job)
  178. if err != nil {
  179. klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v",
  180. jobConfig.UniqueIdentifier, jobStage, err)
  181. return err
  182. }
  183. err = im.Client.WriteMessage(payload, job.getHeader())
  184. if err != nil {
  185. return err
  186. }
  187. jobConfig.EvalTriggerStatus = TriggerCompletedStatus
  188. forwardSamples(jobConfig, jobStage)
  189. klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
  190. jobConfig.UniqueIdentifier, jobStage)
  191. }
  192. return nil
  193. }
  194. // deployTask starts deploy task
  195. func (im *Manager) deployTask(job *Job) {
  196. jobConfig := job.JobConfig
  197. var err error
  198. var neededDeploy bool
  199. neededDeploy, err = im.triggerDeployTask(job)
  200. status := clienttypes.UpstreamMessage{Phase: string(sednav1.ILJobDeploy)}
  201. if err == nil && neededDeploy {
  202. deployModel, err := im.deployModel(job)
  203. if err != nil {
  204. klog.Errorf("failed to deploy model for job(name=%s): %v", jobConfig.UniqueIdentifier, err)
  205. } else {
  206. klog.Infof("deployed model for job(name=%s) successfully", jobConfig.UniqueIdentifier)
  207. }
  208. if err != nil || deployModel == nil {
  209. status.Status = string(sednav1.ILJobStageCondFailed)
  210. } else {
  211. status.Status = string(sednav1.ILJobStageCondReady)
  212. status.Input = &clienttypes.Input{
  213. Models: []Model{
  214. *deployModel,
  215. },
  216. }
  217. }
  218. klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
  219. jobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
  220. } else {
  221. // No need to deploy, just report completed status
  222. // TODO: instead of reporting deploy-completed, another more reasonable status
  223. klog.Infof("no need to deploy model for job(name=%s)", jobConfig.UniqueIdentifier)
  224. status.Status = string(sednav1.ILJobStageCondCompleted)
  225. }
  226. err = im.Client.WriteMessage(status, job.getHeader())
  227. if err != nil {
  228. klog.Errorf("job(name=%s) complete the %s task failed, error: %v",
  229. jobConfig.UniqueIdentifier, sednav1.ILJobDeploy, err)
  230. }
  231. klog.Infof("job(name=%s) complete the %s task successfully", jobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
  232. }
  233. // startJob starts a job
  234. func (im *Manager) startJob(name string) {
  235. var err error
  236. job := im.IncrementalJobMap[name]
  237. job.JobConfig = new(JobConfig)
  238. jobConfig := job.JobConfig
  239. jobConfig.UniqueIdentifier = name
  240. err = im.initJob(job)
  241. if err != nil {
  242. klog.Errorf("failed to init job (name=%s): %+v", jobConfig.UniqueIdentifier)
  243. return
  244. }
  245. klog.Infof("incremental job(name=%s) is started", name)
  246. defer klog.Infof("incremental learning job(name=%s) is stopped", name)
  247. cond := im.getLatestCondition(job)
  248. currentType := cond.Type
  249. jobStage := cond.Stage
  250. if jobStage == sednav1.ILJobTrain && currentType == sednav1.ILJobStageCondWaiting {
  251. go im.handleData(job)
  252. }
  253. currentRound := jobConfig.Rounds
  254. tick := time.NewTicker(JobIterationIntervalSeconds * time.Second)
  255. for {
  256. select {
  257. case <-job.Done:
  258. return
  259. default:
  260. }
  261. latestCond := im.getLatestCondition(job)
  262. jobStage := latestCond.Stage
  263. switch jobStage {
  264. case sednav1.ILJobTrain:
  265. err = im.trainTask(job, currentRound)
  266. case sednav1.ILJobEval:
  267. err = im.evalTask(job)
  268. default:
  269. klog.Errorf("invalid phase: %s", jobStage)
  270. continue
  271. }
  272. if err != nil {
  273. klog.Errorf("job(name=%s) complete the %s task failed, error: %v",
  274. jobConfig.UniqueIdentifier, jobStage, err)
  275. }
  276. <-tick.C
  277. }
  278. }
  279. // Insert inserts incremental-learning-job config to db
  280. func (im *Manager) Insert(message *clienttypes.Message) error {
  281. name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
  282. first := false
  283. job, ok := im.IncrementalJobMap[name]
  284. if !ok {
  285. job = &Job{}
  286. job.Storage = storage.Storage{IsLocalStorage: false}
  287. job.Done = make(chan struct{})
  288. im.IncrementalJobMap[name] = job
  289. first = true
  290. }
  291. if err := json.Unmarshal(message.Content, &job); err != nil {
  292. return err
  293. }
  294. credential := job.ObjectMeta.Annotations[runtime.SecretAnnotationKey]
  295. if credential != "" {
  296. if err := job.Storage.SetCredential(credential); err != nil {
  297. return fmt.Errorf("failed to set job(name=%s)'s storage credential, error: %+v", name, err)
  298. }
  299. }
  300. if first {
  301. go im.startJob(name)
  302. }
  303. if err := db.SaveResource(name, job.TypeMeta, job.ObjectMeta, job.Spec); err != nil {
  304. return err
  305. }
  306. return nil
  307. }
  308. // Delete deletes incremental-learning-job config in db
  309. func (im *Manager) Delete(message *clienttypes.Message) error {
  310. name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
  311. if job, ok := im.IncrementalJobMap[name]; ok && job.Done != nil {
  312. close(job.Done)
  313. }
  314. delete(im.IncrementalJobMap, name)
  315. if err := db.DeleteResource(name); err != nil {
  316. return err
  317. }
  318. return nil
  319. }
  320. // initJob inits the job object
  321. func (im *Manager) initJob(job *Job) error {
  322. jobConfig := job.JobConfig
  323. jobConfig.Lock = sync.Mutex{}
  324. jobConfig.Rounds = 1
  325. initTriggerStatus(jobConfig)
  326. trainTrigger, err := newTrigger(job.Spec.TrainSpec.Trigger)
  327. if err != nil {
  328. return fmt.Errorf("failed to init train trigger: %+w", err)
  329. }
  330. deployTrigger, err := newTrigger(job.Spec.DeploySpec.Trigger)
  331. if err != nil {
  332. return fmt.Errorf("failed to init deploy trigger: %+w", err)
  333. }
  334. jobConfig.TrainTrigger = trainTrigger
  335. jobConfig.DeployTrigger = deployTrigger
  336. outputDir := job.Spec.OutputDir
  337. isLocalURL, err := job.Storage.IsLocalURL(outputDir)
  338. if err != nil {
  339. return fmt.Errorf("job(name=%s)'s output dir is invalid, error: %+v", job.Name, outputDir)
  340. }
  341. if isLocalURL {
  342. job.Storage.IsLocalStorage = true
  343. outputDir = util.AddPrefixPath(im.VolumeMountPrefix, outputDir)
  344. }
  345. jobConfig.OutputDir = outputDir
  346. if err := job.createOutputDir(jobConfig); err != nil {
  347. return err
  348. }
  349. return nil
  350. }
  351. func initTriggerStatus(jobConfig *JobConfig) {
  352. jobConfig.TrainTriggerStatus = TriggerReadyStatus
  353. jobConfig.EvalTriggerStatus = TriggerReadyStatus
  354. }
  355. func newTrigger(t sednav1.Trigger) (trigger.Base, error) {
  356. // convert trigger to map
  357. triggerMap := make(map[string]interface{})
  358. c, err := json.Marshal(t)
  359. if err != nil {
  360. return nil, err
  361. }
  362. err = json.Unmarshal(c, &triggerMap)
  363. if err != nil {
  364. return nil, err
  365. }
  366. return trigger.NewTrigger(triggerMap)
  367. }
  368. // getTrainOrEvalModel gets train model or eval model from job conditions
  369. func (im *Manager) getTrainOrEvalModel(job *Job, jobStage sednav1.ILJobStage) *Model {
  370. jobConditions := job.Status.Conditions
  371. // TODO: runtime.type changes to common.type for gm and lc
  372. var models []runtime.Model
  373. for i := len(jobConditions) - 1; i >= 0; i-- {
  374. var cond gmtypes.IncrementalCondData
  375. jobCond := jobConditions[i]
  376. if jobCond.Stage == sednav1.ILJobTrain && jobCond.Type == sednav1.ILJobStageCondCompleted {
  377. if err := (&cond).Unmarshal([]byte(jobCond.Data)); err != nil {
  378. continue
  379. }
  380. if cond.Output == nil {
  381. continue
  382. }
  383. // models list has two model, first is deploy model, second is trained model
  384. models = cond.Output.Models
  385. break
  386. }
  387. }
  388. // models must have two model file info which are output of train,
  389. // first model will be used for inference if it evaluated as excellent, second model will be used for retaining.
  390. if len(models) != 2 {
  391. return nil
  392. }
  393. switch jobStage {
  394. case sednav1.ILJobTrain:
  395. return &Model{Format: models[1].Format, URL: models[1].URL}
  396. case sednav1.ILJobEval:
  397. return &Model{Format: models[0].Format, URL: models[0].URL}
  398. }
  399. return nil
  400. }
  401. // triggerTrainTask triggers the train task
  402. func (im *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) {
  403. var err error
  404. jobConfig := job.JobConfig
  405. const numOfSamples = "num_of_samples"
  406. samples := map[string]interface{}{
  407. numOfSamples: len(jobConfig.DataSamples.TrainSamples),
  408. }
  409. isTrigger := jobConfig.TrainTrigger.Trigger(samples)
  410. if !isTrigger {
  411. return nil, false, nil
  412. }
  413. var m *Model
  414. latestCondition := im.getLatestCondition(job)
  415. rounds := jobConfig.Rounds
  416. if rounds <= 1 {
  417. m = jobConfig.TrainModel
  418. } else {
  419. m = im.getTrainOrEvalModel(job, latestCondition.Stage)
  420. if m == nil {
  421. return nil, false, err
  422. }
  423. }
  424. var dataIndexURL string
  425. jobConfig.TrainDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.TrainSamples,
  426. jobConfig.OutputConfig.SamplesOutput["train"], rounds, job.Dataset.Spec.Format, job.Dataset.URLPrefix)
  427. if err != nil {
  428. klog.Errorf("job(name=%s) train phase: write samples to the file(%s) is failed, error: %v",
  429. jobConfig.UniqueIdentifier, jobConfig.TrainDataURL, err)
  430. return nil, false, err
  431. }
  432. dataURL := jobConfig.TrainDataURL
  433. outputDir := strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(rounds)}, "/")
  434. if job.Storage.IsLocalStorage {
  435. dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataURL)
  436. dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL)
  437. outputDir = util.TrimPrefixPath(im.VolumeMountPrefix, outputDir)
  438. }
  439. input := clienttypes.Input{
  440. Models: []Model{*m},
  441. DataURL: dataURL,
  442. DataIndexURL: dataIndexURL,
  443. OutputDir: outputDir,
  444. }
  445. msg := clienttypes.UpstreamMessage{
  446. Phase: string(sednav1.ILJobTrain),
  447. Status: string(sednav1.ILJobStageCondReady),
  448. Input: &input,
  449. }
  450. jobConfig.TriggerTime = time.Now()
  451. return &msg, true, nil
  452. }
  453. // triggerEvalTask triggers the eval task
  454. func (im *Manager) triggerEvalTask(job *Job) (*clienttypes.UpstreamMessage, error) {
  455. jobConfig := job.JobConfig
  456. var err error
  457. latestCondition := im.getLatestCondition(job)
  458. m := im.getTrainOrEvalModel(job, latestCondition.Stage)
  459. if m == nil {
  460. return nil, err
  461. }
  462. models := []Model{*m, {
  463. Format: jobConfig.DeployModel.Format,
  464. URL: jobConfig.DeployModel.URL,
  465. }}
  466. // EvalModels has two models, first is trained model, second is deployed model
  467. jobConfig.EvalModels = models
  468. var dataIndexURL string
  469. rounds := jobConfig.Rounds
  470. jobConfig.EvalDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"],
  471. rounds, job.Dataset.Spec.Format, job.Dataset.URLPrefix)
  472. if err != nil {
  473. klog.Errorf("job(name=%s) eval phase: write samples to the file(%s) is failed, error: %v",
  474. jobConfig.UniqueIdentifier, jobConfig.EvalDataURL, err)
  475. return nil, err
  476. }
  477. dataURL := jobConfig.EvalDataURL
  478. if job.Storage.IsLocalStorage {
  479. dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataURL)
  480. dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL)
  481. }
  482. input := clienttypes.Input{
  483. Models: models,
  484. DataURL: dataURL,
  485. DataIndexURL: dataIndexURL,
  486. }
  487. msg := &clienttypes.UpstreamMessage{
  488. Phase: string(sednav1.ILJobEval),
  489. Status: string(sednav1.ILJobStageCondReady),
  490. Input: &input,
  491. }
  492. return msg, nil
  493. }
  494. // triggerDeployTask triggers the deploy task
  495. func (im *Manager) triggerDeployTask(job *Job) (bool, error) {
  496. jobConfig := job.JobConfig
  497. // EvalResult must has two models info, first is trained model, second is deployed model.
  498. if len(jobConfig.EvalResult) != 2 {
  499. return false, fmt.Errorf("expected 2 evaluation results, actual: %d", len(jobConfig.EvalResult))
  500. }
  501. getMetrics := func(metrics map[string]interface{}) (map[string][]float64, error) {
  502. var err error
  503. bytes, err := json.Marshal(metrics)
  504. if err != nil {
  505. return nil, err
  506. }
  507. data := make(map[string][]float64)
  508. if err := json.Unmarshal(bytes, &data); err != nil {
  509. return nil, err
  510. }
  511. return data, err
  512. }
  513. newMetrics, err := getMetrics(jobConfig.EvalResult[0].Metrics)
  514. if err != nil {
  515. return false, err
  516. }
  517. oldMetrics, err := getMetrics(jobConfig.EvalResult[1].Metrics)
  518. if err != nil {
  519. return false, err
  520. }
  521. metricDelta := make(map[string]interface{})
  522. for metric := range newMetrics {
  523. // keep the full metrics
  524. metricDelta[metric] = newMetrics[metric]
  525. var l []float64
  526. for i := range newMetrics[metric] {
  527. l = append(l, newMetrics[metric][i]-oldMetrics[metric][i])
  528. }
  529. metricDelta[metric+"_delta"] = l
  530. }
  531. tt := job.Spec.DeploySpec.Trigger
  532. // convert tt to map
  533. triggerMap := make(map[string]interface{})
  534. c, err := json.Marshal(tt)
  535. if err != nil {
  536. return false, err
  537. }
  538. err = json.Unmarshal(c, &triggerMap)
  539. if err != nil {
  540. return false, err
  541. }
  542. return jobConfig.DeployTrigger.Trigger(metricDelta), nil
  543. }
  544. // deployModel deploys model
  545. func (im *Manager) deployModel(job *Job) (*Model, error) {
  546. jobConfig := job.JobConfig
  547. trainedModel := jobConfig.EvalModels[0].URL
  548. deployModel := jobConfig.EvalModels[1].URL
  549. if job.Storage.IsLocalStorage {
  550. trainedModel = util.AddPrefixPath(im.VolumeMountPrefix, trainedModel)
  551. }
  552. if err := job.updateDeployModel(deployModel, trainedModel); err != nil {
  553. return nil, err
  554. }
  555. klog.Infof("job(name=%s) deploys model(url=%s) successfully", jobConfig.UniqueIdentifier, trainedModel)
  556. return &jobConfig.EvalModels[0], nil
  557. }
  558. func (job *Job) updateDeployModel(deployModel string, newModel string) error {
  559. if err := job.Storage.CopyFile(newModel, deployModel); err != nil {
  560. return fmt.Errorf("copy model(url=%s) to the deploy model(url=%s) failed, error: %+v",
  561. newModel, deployModel, err)
  562. }
  563. klog.Infof("copy model(url=%s) to the deploy model(url=%s) successfully", newModel, deployModel)
  564. return nil
  565. }
  566. // createOutputDir creates the job output dir
  567. func (job *Job) createOutputDir(jobConfig *JobConfig) error {
  568. outputDir := jobConfig.OutputDir
  569. dirNames := []string{"data/train", "data/eval", "train", "eval"}
  570. if job.Storage.IsLocalStorage {
  571. if err := util.CreateFolder(outputDir); err != nil {
  572. klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, outputDir)
  573. return err
  574. }
  575. for _, v := range dirNames {
  576. dir := path.Join(outputDir, v)
  577. if err := util.CreateFolder(dir); err != nil {
  578. klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, dir)
  579. return err
  580. }
  581. }
  582. }
  583. outputConfig := OutputConfig{
  584. SamplesOutput: map[string]string{
  585. "train": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[0]}, "/"),
  586. "eval": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[1]}, "/"),
  587. },
  588. TrainOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[2]}, "/"),
  589. EvalOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[3]}, "/"),
  590. }
  591. jobConfig.OutputConfig = &outputConfig
  592. return nil
  593. }
  594. func (im *Manager) getLatestCondition(job *Job) sednav1.ILJobCondition {
  595. jobConditions := job.Status.Conditions
  596. var latestCondition sednav1.ILJobCondition = sednav1.ILJobCondition{}
  597. if len(jobConditions) > 0 {
  598. // get latest pod and pod status
  599. latestCondition = jobConditions[len(jobConditions)-1]
  600. }
  601. return latestCondition
  602. }
  603. func (im *Manager) getModel(namespace string, name string) (sednav1.Model, error) {
  604. modelName := util.GetUniqueIdentifier(namespace, name, model.KindName)
  605. model, ok := im.ModelManager.GetModel(modelName)
  606. if !ok {
  607. return model, fmt.Errorf("not exists model(name=%s)", modelName)
  608. }
  609. return model, nil
  610. }
  611. // loadTrainModel loads initial model information for training.
  612. func (im *Manager) loadTrainModel(job *Job) error {
  613. jobConfig := job.JobConfig
  614. if jobConfig.TrainModel == nil {
  615. initialModel, err := im.getModel(job.Namespace, job.Spec.InitialModel.Name)
  616. if err != nil {
  617. return err
  618. }
  619. jobConfig.TrainModel = new(Model)
  620. format := initialModel.Spec.Format
  621. url := initialModel.Spec.URL
  622. jobConfig.TrainModel.Format = format
  623. jobConfig.TrainModel.URL = url
  624. }
  625. return nil
  626. }
  627. // loadDeployModel loads model information for deploying.
  628. func (im *Manager) loadDeployModel(job *Job) error {
  629. jobConfig := job.JobConfig
  630. if jobConfig.DeployModel == nil {
  631. evalModel, err := im.getModel(job.Namespace, job.Spec.DeploySpec.Model.Name)
  632. if err != nil {
  633. return err
  634. }
  635. jobConfig.DeployModel = new(Model)
  636. jobConfig.DeployModel.Format = evalModel.Spec.Format
  637. jobConfig.DeployModel.URL = evalModel.Spec.URL
  638. }
  639. return nil
  640. }
  641. // loadDataset loads dataset information
  642. func (im *Manager) loadDataset(job *Job) error {
  643. if job.Dataset != nil {
  644. // already loaded
  645. return nil
  646. }
  647. datasetName := util.GetUniqueIdentifier(job.Namespace, job.Spec.Dataset.Name, dataset.KindName)
  648. dataset, ok := im.DatasetManager.GetDataset(datasetName)
  649. if !ok || dataset == nil {
  650. return fmt.Errorf("not exists dataset(name=%s)", datasetName)
  651. }
  652. jobConfig := job.JobConfig
  653. jobConfig.DataSamples = &DataSamples{
  654. Numbers: 0,
  655. TrainSamples: make([]string, 0),
  656. EvalVersionSamples: make([][]string, 0),
  657. EvalSamples: make([]string, 0),
  658. }
  659. job.Dataset = dataset
  660. return nil
  661. }
  662. // handleData updates samples information
  663. func (im *Manager) handleData(job *Job) {
  664. tick := time.NewTicker(DatasetHandlerIntervalSeconds * time.Second)
  665. jobConfig := job.JobConfig
  666. iterCount := 0
  667. for {
  668. select {
  669. case <-job.Done:
  670. return
  671. default:
  672. }
  673. // in case dataset is not synced to LC before job synced to LC
  674. // here call loadDataset in each period
  675. err := im.loadDataset(job)
  676. if iterCount%100 == 0 {
  677. klog.Infof("job(name=%s) handling dataset", jobConfig.UniqueIdentifier)
  678. }
  679. iterCount++
  680. if err != nil {
  681. klog.Warningf("job(name=%s) failed to load dataset, and waiting it: %v",
  682. jobConfig.UniqueIdentifier,
  683. err)
  684. <-tick.C
  685. continue
  686. }
  687. dataset := job.Dataset
  688. if dataset.DataSource != nil && len(dataset.DataSource.TrainSamples) > jobConfig.DataSamples.Numbers {
  689. samples := dataset.DataSource.TrainSamples
  690. trainNum := int(job.Spec.Dataset.TrainProb * float64(len(samples)-jobConfig.DataSamples.Numbers))
  691. jobConfig.Lock.Lock()
  692. jobConfig.DataSamples.TrainSamples = append(jobConfig.DataSamples.TrainSamples,
  693. samples[(jobConfig.DataSamples.Numbers+1):(jobConfig.DataSamples.Numbers+trainNum+1)]...)
  694. klog.Infof("job(name=%s) current train samples nums is %d",
  695. jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.TrainSamples))
  696. jobConfig.DataSamples.EvalVersionSamples = append(jobConfig.DataSamples.EvalVersionSamples,
  697. samples[(jobConfig.DataSamples.Numbers+trainNum+1):])
  698. jobConfig.Lock.Unlock()
  699. for _, v := range jobConfig.DataSamples.EvalVersionSamples {
  700. jobConfig.DataSamples.EvalSamples = append(jobConfig.DataSamples.EvalSamples, v...)
  701. }
  702. klog.Infof("job(name=%s) current eval samples nums is %d",
  703. jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.EvalSamples))
  704. jobConfig.DataSamples.Numbers = len(samples)
  705. }
  706. <-tick.C
  707. }
  708. }
  709. // createFile creates data file and data index file
  710. func createFile(dir string, format string, isLocalStorage bool) (string, string) {
  711. switch format {
  712. case dataset.TXTFormat:
  713. if isLocalStorage {
  714. return path.Join(dir, "data.txt"), ""
  715. }
  716. return strings.Join([]string{dir, "data.txt"}, "/"), strings.Join([]string{dir, "dataIndex.txt"}, "/")
  717. }
  718. return "", ""
  719. }
  720. // writeSamples writes samples information to a file
  721. func (im *Manager) writeSamples(job *Job, samples []string, dir string, rounds int, format string, urlPrefix string) (string, string, error) {
  722. subDir := strings.Join([]string{dir, strconv.Itoa(rounds)}, "/")
  723. fileURL, absURLFile := createFile(subDir, format, job.Dataset.Storage.IsLocalStorage)
  724. if job.Storage.IsLocalStorage {
  725. if err := util.CreateFolder(subDir); err != nil {
  726. return "", "", err
  727. }
  728. if err := im.writeByLine(samples, fileURL); err != nil {
  729. return "", "", err
  730. }
  731. if !job.Dataset.Storage.IsLocalStorage {
  732. tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)
  733. if err := im.writeByLine(tempSamples, absURLFile); err != nil {
  734. return "", "", err
  735. }
  736. }
  737. return fileURL, absURLFile, nil
  738. }
  739. temporaryDir, err := util.CreateTemporaryDir()
  740. if err != nil {
  741. return "", "", err
  742. }
  743. localFileURL, localAbsURLFile := createFile(temporaryDir, format, job.Dataset.Storage.IsLocalStorage)
  744. if err := im.writeByLine(samples, localFileURL); err != nil {
  745. return "", "", err
  746. }
  747. if err := job.Storage.Upload(localFileURL, fileURL); err != nil {
  748. return "", "", err
  749. }
  750. tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)
  751. if err := im.writeByLine(tempSamples, localAbsURLFile); err != nil {
  752. return "", "", err
  753. }
  754. if err := job.Storage.Upload(localAbsURLFile, absURLFile); err != nil {
  755. return "", "", err
  756. }
  757. defer os.RemoveAll(localFileURL)
  758. defer os.RemoveAll(localAbsURLFile)
  759. return fileURL, absURLFile, nil
  760. }
  761. // writeByLine writes file by line
  762. func (im *Manager) writeByLine(samples []string, fileURL string) error {
  763. file, err := os.Create(fileURL)
  764. if err != nil {
  765. klog.Errorf("create file(%s) failed", fileURL)
  766. return err
  767. }
  768. w := bufio.NewWriter(file)
  769. for _, line := range samples {
  770. _, _ = fmt.Fprintln(w, line)
  771. }
  772. if err := w.Flush(); err != nil {
  773. klog.Errorf("write file(%s) failed", fileURL)
  774. return err
  775. }
  776. if err := file.Close(); err != nil {
  777. klog.Errorf("close file failed, error: %v", err)
  778. return err
  779. }
  780. return nil
  781. }
  782. // monitorWorker monitors message from worker
  783. func (im *Manager) monitorWorker() {
  784. for {
  785. workerMessageChannel := im.WorkerMessageChannel
  786. workerMessage, ok := <-workerMessageChannel
  787. if !ok {
  788. break
  789. }
  790. klog.V(4).Infof("handling worker message %+v", workerMessage)
  791. name := util.GetUniqueIdentifier(workerMessage.Namespace, workerMessage.OwnerName, workerMessage.OwnerKind)
  792. job, ok := im.IncrementalJobMap[name]
  793. if !ok {
  794. continue
  795. }
  796. // TODO: filter some worker messages out
  797. wo := clienttypes.Output{}
  798. wo.Models = workerMessage.Results
  799. wo.OwnerInfo = workerMessage.OwnerInfo
  800. msg := &clienttypes.UpstreamMessage{
  801. Phase: workerMessage.Kind,
  802. Status: workerMessage.Status,
  803. Output: &wo,
  804. }
  805. if err := im.Client.WriteMessage(msg, job.getHeader()); err != nil {
  806. klog.Errorf("job(name=%s) failed to write message: %v", name, err)
  807. continue
  808. }
  809. im.handleWorkerMessage(job, workerMessage)
  810. }
  811. }
  812. // handleWorkerMessage handles message from worker
  813. func (im *Manager) handleWorkerMessage(job *Job, workerMessage workertypes.MessageContent) {
  814. latestCond := im.getLatestCondition(job)
  815. jobStage := strings.ToLower(string(latestCond.Stage))
  816. workerKind := strings.ToLower(workerMessage.Kind)
  817. if jobStage != workerKind {
  818. klog.Warningf("job(name=%s) %s phase get worker(kind=%s)", job.JobConfig.UniqueIdentifier,
  819. jobStage, workerKind)
  820. return
  821. }
  822. var models []Model
  823. for _, result := range workerMessage.Results {
  824. metrics := make(map[string]interface{})
  825. if m, ok := result["metrics"]; ok {
  826. bytes, err := json.Marshal(m)
  827. if err != nil {
  828. return
  829. }
  830. err = json.Unmarshal(bytes, &metrics)
  831. if err != nil {
  832. klog.Warningf("failed to unmarshal the worker(name=%s) metrics %v, err: %v",
  833. workerMessage.Name,
  834. m,
  835. err)
  836. }
  837. }
  838. model := Model{
  839. Format: result["format"].(string),
  840. URL: result["url"].(string),
  841. Metrics: metrics}
  842. models = append(models, model)
  843. }
  844. workerStatus := workerMessage.Status
  845. jobName := job.JobConfig.UniqueIdentifier
  846. if workerStatus == workertypes.CompletedStatus {
  847. klog.Infof("job(name=%s) complete the %s task successfully", jobName, jobStage)
  848. switch latestCond.Stage {
  849. case sednav1.ILJobEval:
  850. job.JobConfig.EvalResult = models
  851. // when eval worker is complete, the deploy task starts immediately without waiting for the notification of GM.
  852. im.deployTask(job)
  853. }
  854. }
  855. }
  856. // forwardSamples deletes the samples information in the memory
  857. func forwardSamples(jobConfig *JobConfig, jobStage sednav1.ILJobStage) {
  858. switch jobStage {
  859. case sednav1.ILJobTrain:
  860. jobConfig.Lock.Lock()
  861. jobConfig.DataSamples.TrainSamples = jobConfig.DataSamples.TrainSamples[:0]
  862. jobConfig.Lock.Unlock()
  863. case sednav1.ILJobEval:
  864. if len(jobConfig.DataSamples.EvalVersionSamples) > EvalSamplesCapacity {
  865. jobConfig.DataSamples.EvalVersionSamples = jobConfig.DataSamples.EvalVersionSamples[1:]
  866. }
  867. }
  868. }
// AddWorkerMessage queues a worker message for monitorWorker. The send
// blocks while the worker message channel is full.
func (im *Manager) AddWorkerMessage(message workertypes.MessageContent) {
	im.WorkerMessageChannel <- message
}
// GetName returns the name of the manager, which is the resource kind
// (KindName) it handles.
func (im *Manager) GetName() string {
	return KindName
}
  877. func (job *Job) getHeader() clienttypes.MessageHeader {
  878. return clienttypes.MessageHeader{
  879. Namespace: job.Namespace,
  880. ResourceKind: job.Kind,
  881. ResourceName: job.Name,
  882. Operation: clienttypes.StatusOperation,
  883. }
  884. }