You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

incrementallearningjob.go 30 kB


  1. /*
  2. Copyright 2021 The KubeEdge Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package manager
  14. import (
  15. "bufio"
  16. "encoding/json"
  17. "fmt"
  18. "os"
  19. "path"
  20. "strconv"
  21. "strings"
  22. "sync"
  23. "time"
  24. "k8s.io/klog/v2"
  25. "github.com/kubeedge/sedna/cmd/sedna-lc/app/options"
  26. sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
  27. gmtypes "github.com/kubeedge/sedna/pkg/globalmanager/controllers/incrementallearning"
  28. "github.com/kubeedge/sedna/pkg/globalmanager/runtime"
  29. "github.com/kubeedge/sedna/pkg/localcontroller/db"
  30. "github.com/kubeedge/sedna/pkg/localcontroller/gmclient"
  31. "github.com/kubeedge/sedna/pkg/localcontroller/storage"
  32. "github.com/kubeedge/sedna/pkg/localcontroller/trigger"
  33. "github.com/kubeedge/sedna/pkg/localcontroller/util"
  34. )
// IncrementalLearningJob defines config for incremental-learning-job.
// It embeds the sednav1.IncrementalLearningJob API object and adds the
// runtime state this file keeps next to it.
type IncrementalLearningJob struct {
	sednav1.IncrementalLearningJob
	// JobConfig is the per-job runtime configuration; startJob allocates it.
	JobConfig *JobConfig
	// Dataset is the dataset bound to the job; it stays nil until loadDataset finds it.
	Dataset *Dataset
	// Done is closed by Delete; startJob and handleData return when it is closed.
	Done chan struct{}
	// Storage is used to copy and upload files and to tell local URLs from remote ones.
	Storage storage.Storage
}
  43. // JobConfig defines config for incremental-learning-job
  44. type JobConfig struct {
  45. UniqueIdentifier string
  46. Rounds int
  47. TrainTrigger trigger.Base
  48. DeployTrigger trigger.Base
  49. TriggerTime time.Time
  50. TrainTriggerStatus string
  51. EvalTriggerStatus string
  52. TrainDataURL string
  53. EvalDataURL string
  54. OutputDir string
  55. OutputConfig *OutputConfig
  56. DataSamples *DataSamples
  57. TrainModel *ModelInfo
  58. DeployModel *ModelInfo
  59. EvalModels []ModelInfo
  60. EvalResult []ModelInfo
  61. Lock sync.Mutex
  62. }
// OutputConfig defines config for job output.
// createOutputDir fills it with locations derived from the job's output dir.
type OutputConfig struct {
	// SamplesOutput maps "train" and "eval" to the directories that receive
	// the sample files. Note the JSON key is "trainData".
	SamplesOutput map[string]string `json:"trainData"`
	// TrainOutput is the directory under which each round's train output goes.
	TrainOutput string `json:"trainOutput"`
	// EvalOutput is the directory reserved for eval output.
	EvalOutput string `json:"evalOutput"`
}
// DataSamples defines samples information collected by handleData.
type DataSamples struct {
	// Numbers is how many dataset samples have already been consumed.
	Numbers int
	// TrainSamples are the samples waiting for the next training round;
	// forwardSamples empties the slice after training is triggered.
	TrainSamples []string
	// EvalVersionSamples keeps one slice of eval samples per dataset update.
	EvalVersionSamples [][]string
	// EvalSamples is built by handleData from EvalVersionSamples.
	EvalSamples []string
}
// IncrementalJobManager defines incremental-learning-job manager.
type IncrementalJobManager struct {
	// Client is used to write status messages upstream.
	Client gmclient.ClientI
	// WorkerMessageChannel queues worker messages consumed by monitorWorker.
	WorkerMessageChannel chan WorkerMessage
	// DatasetManager is used to look up the dataset a job refers to.
	DatasetManager *DatasetManager
	// ModelManager is used to look up the models a job refers to.
	ModelManager *ModelManager
	// IncrementalJobMap holds the known jobs keyed by their unique identifier.
	IncrementalJobMap map[string]*IncrementalLearningJob
	// VolumeMountPrefix is added to / trimmed from local paths.
	VolumeMountPrefix string
}
const (
	// JobIterationIntervalSeconds is interval time of each iteration of job
	// (the ticker period of the startJob loop, in seconds).
	JobIterationIntervalSeconds = 10
	// DatasetHandlerIntervalSeconds is interval time of handling dataset
	// (the ticker period of the handleData loop, in seconds).
	DatasetHandlerIntervalSeconds = 10
	// EvalSamplesCapacity is capacity of eval samples: forwardSamples drops
	// the oldest entry of EvalVersionSamples once its length exceeds this value.
	EvalSamplesCapacity = 5
	// IncrementalLearningJobKind is kind of incremental-learning-job resource
	IncrementalLearningJobKind = "incrementallearningjob"
)
  95. // NewIncrementalJobManager creates a incremental-learning-job manager
  96. func NewIncrementalJobManager(client gmclient.ClientI, datasetManager *DatasetManager,
  97. modelManager *ModelManager, options *options.LocalControllerOptions) *IncrementalJobManager {
  98. im := IncrementalJobManager{
  99. Client: client,
  100. WorkerMessageChannel: make(chan WorkerMessage, WorkerMessageChannelCacheSize),
  101. DatasetManager: datasetManager,
  102. ModelManager: modelManager,
  103. IncrementalJobMap: make(map[string]*IncrementalLearningJob),
  104. VolumeMountPrefix: options.VolumeMountPrefix,
  105. }
  106. return &im
  107. }
// Start starts incremental-learning-job manager.
// It launches monitorWorker in its own goroutine and always returns nil.
func (im *IncrementalJobManager) Start() error {
	go im.monitorWorker()
	return nil
}
  113. // trainTask starts training task
  114. func (im *IncrementalJobManager) trainTask(job *IncrementalLearningJob, currentRound int) error {
  115. jobConfig := job.JobConfig
  116. latestCond := im.getLatestCondition(job)
  117. jobStage := latestCond.Stage
  118. currentType := latestCond.Type
  119. if currentType == sednav1.ILJobStageCondWaiting {
  120. if job.Dataset == nil {
  121. return fmt.Errorf("job(name=%s) dataset not ready", jobConfig.UniqueIdentifier)
  122. }
  123. err := im.loadTrainModel(job)
  124. if err != nil {
  125. return fmt.Errorf("failed to sync train model, and waiting it: %v", err)
  126. }
  127. if currentRound < jobConfig.Rounds {
  128. currentRound = jobConfig.Rounds
  129. initTriggerStatus(jobConfig)
  130. }
  131. }
  132. if currentType == sednav1.ILJobStageCondWaiting && jobConfig.TrainTriggerStatus == TriggerReadyStatus {
  133. payload, ok, err := im.triggerTrainTask(job)
  134. if !ok {
  135. return nil
  136. }
  137. if err != nil {
  138. klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v",
  139. jobConfig.UniqueIdentifier, jobStage, err)
  140. return err
  141. }
  142. err = im.Client.WriteMessage(payload, job.getHeader())
  143. if err != nil {
  144. klog.Errorf("job(name=%s) failed to write message: %v",
  145. jobConfig.UniqueIdentifier, err)
  146. return err
  147. }
  148. jobConfig.TrainTriggerStatus = TriggerCompletedStatus
  149. jobConfig.Rounds++
  150. forwardSamples(jobConfig, jobStage)
  151. klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
  152. jobConfig.UniqueIdentifier, jobStage)
  153. }
  154. return nil
  155. }
  156. // evalTask starts eval task
  157. func (im *IncrementalJobManager) evalTask(job *IncrementalLearningJob) error {
  158. jobConfig := job.JobConfig
  159. latestCond := im.getLatestCondition(job)
  160. jobStage := latestCond.Stage
  161. currentType := latestCond.Type
  162. if currentType == sednav1.ILJobStageCondWaiting {
  163. err := im.loadDeployModel(job)
  164. if err != nil {
  165. return fmt.Errorf("failed to sync deploy model, and waiting it: %v", err)
  166. }
  167. }
  168. if currentType == sednav1.ILJobStageCondWaiting && jobConfig.EvalTriggerStatus == TriggerReadyStatus {
  169. payload, err := im.triggerEvalTask(job)
  170. if err != nil {
  171. klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v",
  172. jobConfig.UniqueIdentifier, jobStage, err)
  173. return err
  174. }
  175. err = im.Client.WriteMessage(payload, job.getHeader())
  176. if err != nil {
  177. return err
  178. }
  179. jobConfig.EvalTriggerStatus = TriggerCompletedStatus
  180. forwardSamples(jobConfig, jobStage)
  181. klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
  182. jobConfig.UniqueIdentifier, jobStage)
  183. }
  184. return nil
  185. }
  186. // deployTask starts deploy task
  187. func (im *IncrementalJobManager) deployTask(job *IncrementalLearningJob) {
  188. jobConfig := job.JobConfig
  189. var err error
  190. var neededDeploy bool
  191. neededDeploy, err = im.triggerDeployTask(job)
  192. status := UpstreamMessage{Phase: string(sednav1.ILJobDeploy)}
  193. if err == nil && neededDeploy {
  194. deployModel, err := im.deployModel(job)
  195. if err != nil {
  196. klog.Errorf("failed to deploy model for job(name=%s): %v", jobConfig.UniqueIdentifier, err)
  197. } else {
  198. klog.Infof("deployed model for job(name=%s) successfully", jobConfig.UniqueIdentifier)
  199. }
  200. if err != nil || deployModel == nil {
  201. status.Status = string(sednav1.ILJobStageCondFailed)
  202. } else {
  203. status.Status = string(sednav1.ILJobStageCondReady)
  204. status.Input = &WorkerInput{
  205. Models: []ModelInfo{
  206. *deployModel,
  207. },
  208. }
  209. }
  210. klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
  211. jobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
  212. } else {
  213. // No need to deploy, just report completed status
  214. // TODO: instead of reporting deploy-completed, another more reasonable status
  215. klog.Infof("no need to deploy model for job(name=%s)", jobConfig.UniqueIdentifier)
  216. status.Status = string(sednav1.ILJobStageCondCompleted)
  217. }
  218. err = im.Client.WriteMessage(status, job.getHeader())
  219. if err != nil {
  220. klog.Errorf("job(name=%s) complete the %s task failed, error: %v",
  221. jobConfig.UniqueIdentifier, sednav1.ILJobDeploy, err)
  222. }
  223. klog.Infof("job(name=%s) complete the %s task successfully", jobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
  224. }
  225. // startJob starts a job
  226. func (im *IncrementalJobManager) startJob(name string) {
  227. var err error
  228. job := im.IncrementalJobMap[name]
  229. job.JobConfig = new(JobConfig)
  230. jobConfig := job.JobConfig
  231. jobConfig.UniqueIdentifier = name
  232. err = im.initJob(job)
  233. if err != nil {
  234. klog.Errorf("failed to init job (name=%s): %+v", jobConfig.UniqueIdentifier)
  235. return
  236. }
  237. klog.Infof("incremental job(name=%s) is started", name)
  238. defer klog.Infof("incremental learning job(name=%s) is stopped", name)
  239. cond := im.getLatestCondition(job)
  240. currentType := cond.Type
  241. jobStage := cond.Stage
  242. if jobStage == sednav1.ILJobTrain && currentType == sednav1.ILJobStageCondWaiting {
  243. go im.handleData(job)
  244. }
  245. currentRound := jobConfig.Rounds
  246. tick := time.NewTicker(JobIterationIntervalSeconds * time.Second)
  247. for {
  248. select {
  249. case <-job.Done:
  250. return
  251. default:
  252. }
  253. latestCond := im.getLatestCondition(job)
  254. jobStage := latestCond.Stage
  255. switch jobStage {
  256. case sednav1.ILJobTrain:
  257. err = im.trainTask(job, currentRound)
  258. case sednav1.ILJobEval:
  259. err = im.evalTask(job)
  260. default:
  261. klog.Errorf("invalid phase: %s", jobStage)
  262. continue
  263. }
  264. if err != nil {
  265. klog.Errorf("job(name=%s) complete the %s task failed, error: %v",
  266. jobConfig.UniqueIdentifier, jobStage, err)
  267. }
  268. <-tick.C
  269. }
  270. }
  271. // Insert inserts incremental-learning-job config to db
  272. func (im *IncrementalJobManager) Insert(message *gmclient.Message) error {
  273. name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
  274. first := false
  275. job, ok := im.IncrementalJobMap[name]
  276. if !ok {
  277. job = &IncrementalLearningJob{}
  278. job.Storage = storage.Storage{IsLocalStorage: false}
  279. job.Done = make(chan struct{})
  280. im.IncrementalJobMap[name] = job
  281. first = true
  282. }
  283. if err := json.Unmarshal(message.Content, &job); err != nil {
  284. return err
  285. }
  286. credential := job.ObjectMeta.Annotations[CredentialAnnotationKey]
  287. if credential != "" {
  288. if err := job.Storage.SetCredential(credential); err != nil {
  289. return fmt.Errorf("failed to set job(name=%s)'s storage credential, error: %+v", name, err)
  290. }
  291. }
  292. if first {
  293. go im.startJob(name)
  294. }
  295. if err := db.SaveResource(name, job.TypeMeta, job.ObjectMeta, job.Spec); err != nil {
  296. return err
  297. }
  298. return nil
  299. }
  300. // Delete deletes incremental-learning-job config in db
  301. func (im *IncrementalJobManager) Delete(message *gmclient.Message) error {
  302. name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
  303. if job, ok := im.IncrementalJobMap[name]; ok && job.Done != nil {
  304. close(job.Done)
  305. }
  306. delete(im.IncrementalJobMap, name)
  307. if err := db.DeleteResource(name); err != nil {
  308. return err
  309. }
  310. return nil
  311. }
  312. // initJob inits the job object
  313. func (im *IncrementalJobManager) initJob(job *IncrementalLearningJob) error {
  314. jobConfig := job.JobConfig
  315. jobConfig.Lock = sync.Mutex{}
  316. jobConfig.Rounds = 1
  317. initTriggerStatus(jobConfig)
  318. trainTrigger, err := newTrigger(job.Spec.TrainSpec.Trigger)
  319. if err != nil {
  320. return fmt.Errorf("failed to init train trigger: %+w", err)
  321. }
  322. deployTrigger, err := newTrigger(job.Spec.DeploySpec.Trigger)
  323. if err != nil {
  324. return fmt.Errorf("failed to init deploy trigger: %+w", err)
  325. }
  326. jobConfig.TrainTrigger = trainTrigger
  327. jobConfig.DeployTrigger = deployTrigger
  328. outputDir := job.Spec.OutputDir
  329. isLocalURL, err := job.Storage.IsLocalURL(outputDir)
  330. if err != nil {
  331. return fmt.Errorf("job(name=%s)'s output dir is invalid, error: %+v", job.Name, outputDir)
  332. }
  333. if isLocalURL {
  334. job.Storage.IsLocalStorage = true
  335. outputDir = util.AddPrefixPath(im.VolumeMountPrefix, outputDir)
  336. }
  337. jobConfig.OutputDir = outputDir
  338. if err := job.createOutputDir(jobConfig); err != nil {
  339. return err
  340. }
  341. return nil
  342. }
  343. func initTriggerStatus(jobConfig *JobConfig) {
  344. jobConfig.TrainTriggerStatus = TriggerReadyStatus
  345. jobConfig.EvalTriggerStatus = TriggerReadyStatus
  346. }
  347. func newTrigger(t sednav1.Trigger) (trigger.Base, error) {
  348. // convert trigger to map
  349. triggerMap := make(map[string]interface{})
  350. c, err := json.Marshal(t)
  351. if err != nil {
  352. return nil, err
  353. }
  354. err = json.Unmarshal(c, &triggerMap)
  355. if err != nil {
  356. return nil, err
  357. }
  358. return trigger.NewTrigger(triggerMap)
  359. }
  360. // getTrainOrEvalModel gets train model or eval model from job conditions
  361. func (im *IncrementalJobManager) getTrainOrEvalModel(job *IncrementalLearningJob, jobStage sednav1.ILJobStage) *ModelInfo {
  362. jobConditions := job.Status.Conditions
  363. // TODO: runtime.type changes to common.type for gm and lc
  364. var models []runtime.Model
  365. for i := len(jobConditions) - 1; i >= 0; i-- {
  366. var cond gmtypes.IncrementalCondData
  367. jobCond := jobConditions[i]
  368. if jobCond.Stage == sednav1.ILJobTrain && jobCond.Type == sednav1.ILJobStageCondCompleted {
  369. if err := (&cond).Unmarshal([]byte(jobCond.Data)); err != nil {
  370. continue
  371. }
  372. if cond.Output == nil {
  373. continue
  374. }
  375. // models list has two model, first is deploy model, second is trained model
  376. models = cond.Output.Models
  377. break
  378. }
  379. }
  380. // models must have two model file info which are output of train,
  381. // first model will be used for inference if it evaluated as excellent, second model will be used for retaining.
  382. if len(models) != 2 {
  383. return nil
  384. }
  385. switch jobStage {
  386. case sednav1.ILJobTrain:
  387. return &ModelInfo{Format: models[1].Format, URL: models[1].URL}
  388. case sednav1.ILJobEval:
  389. return &ModelInfo{Format: models[0].Format, URL: models[0].URL}
  390. }
  391. return nil
  392. }
  393. // triggerTrainTask triggers the train task
  394. func (im *IncrementalJobManager) triggerTrainTask(job *IncrementalLearningJob) (interface{}, bool, error) {
  395. var err error
  396. jobConfig := job.JobConfig
  397. const numOfSamples = "num_of_samples"
  398. samples := map[string]interface{}{
  399. numOfSamples: len(jobConfig.DataSamples.TrainSamples),
  400. }
  401. isTrigger := jobConfig.TrainTrigger.Trigger(samples)
  402. if !isTrigger {
  403. return nil, false, nil
  404. }
  405. var m *ModelInfo
  406. latestCondition := im.getLatestCondition(job)
  407. rounds := jobConfig.Rounds
  408. if rounds <= 1 {
  409. m = jobConfig.TrainModel
  410. } else {
  411. m = im.getTrainOrEvalModel(job, latestCondition.Stage)
  412. if m == nil {
  413. return nil, false, err
  414. }
  415. }
  416. var dataIndexURL string
  417. jobConfig.TrainDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.TrainSamples,
  418. jobConfig.OutputConfig.SamplesOutput["train"], rounds, job.Dataset.Spec.Format, job.Dataset.URLPrefix)
  419. if err != nil {
  420. klog.Errorf("job(name=%s) train phase: write samples to the file(%s) is failed, error: %v",
  421. jobConfig.UniqueIdentifier, jobConfig.TrainDataURL, err)
  422. return nil, false, err
  423. }
  424. dataURL := jobConfig.TrainDataURL
  425. outputDir := strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(rounds)}, "/")
  426. if job.Storage.IsLocalStorage {
  427. dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataURL)
  428. dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL)
  429. outputDir = util.TrimPrefixPath(im.VolumeMountPrefix, outputDir)
  430. }
  431. input := WorkerInput{
  432. Models: []ModelInfo{*m},
  433. DataURL: dataURL,
  434. DataIndexURL: dataIndexURL,
  435. OutputDir: outputDir,
  436. }
  437. msg := UpstreamMessage{
  438. Phase: string(sednav1.ILJobTrain),
  439. Status: string(sednav1.ILJobStageCondReady),
  440. Input: &input,
  441. }
  442. jobConfig.TriggerTime = time.Now()
  443. return &msg, true, nil
  444. }
  445. // triggerEvalTask triggers the eval task
  446. func (im *IncrementalJobManager) triggerEvalTask(job *IncrementalLearningJob) (*UpstreamMessage, error) {
  447. jobConfig := job.JobConfig
  448. var err error
  449. latestCondition := im.getLatestCondition(job)
  450. m := im.getTrainOrEvalModel(job, latestCondition.Stage)
  451. if m == nil {
  452. return nil, err
  453. }
  454. models := []ModelInfo{*m, {
  455. Format: jobConfig.DeployModel.Format,
  456. URL: jobConfig.DeployModel.URL,
  457. }}
  458. // EvalModels has two models, first is trained model, second is deployed model
  459. jobConfig.EvalModels = models
  460. var dataIndexURL string
  461. rounds := jobConfig.Rounds
  462. jobConfig.EvalDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"],
  463. rounds, job.Dataset.Spec.Format, job.Dataset.URLPrefix)
  464. if err != nil {
  465. klog.Errorf("job(name=%s) eval phase: write samples to the file(%s) is failed, error: %v",
  466. jobConfig.UniqueIdentifier, jobConfig.EvalDataURL, err)
  467. return nil, err
  468. }
  469. dataURL := jobConfig.EvalDataURL
  470. if job.Storage.IsLocalStorage {
  471. dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataURL)
  472. dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL)
  473. }
  474. input := WorkerInput{
  475. Models: models,
  476. DataURL: dataURL,
  477. DataIndexURL: dataIndexURL,
  478. }
  479. msg := &UpstreamMessage{
  480. Phase: string(sednav1.ILJobEval),
  481. Status: string(sednav1.ILJobStageCondReady),
  482. Input: &input,
  483. }
  484. return msg, nil
  485. }
  486. // triggerDeployTask triggers the deploy task
  487. func (im *IncrementalJobManager) triggerDeployTask(job *IncrementalLearningJob) (bool, error) {
  488. jobConfig := job.JobConfig
  489. // EvalResult must has two models info, first is trained model, second is deployed model.
  490. if len(jobConfig.EvalResult) != 2 {
  491. return false, fmt.Errorf("expected 2 evaluation results, actual: %d", len(jobConfig.EvalResult))
  492. }
  493. newMetrics, oldMetrics := jobConfig.EvalResult[0].Metrics, jobConfig.EvalResult[1].Metrics
  494. metricDelta := make(map[string]interface{})
  495. for metric := range newMetrics {
  496. // keep the full metrics
  497. metricDelta[metric] = newMetrics[metric]
  498. var l []float64
  499. for i := range newMetrics[metric] {
  500. l = append(l, newMetrics[metric][i]-oldMetrics[metric][i])
  501. }
  502. metricDelta[metric+"_delta"] = l
  503. }
  504. tt := job.Spec.DeploySpec.Trigger
  505. // convert tt to map
  506. triggerMap := make(map[string]interface{})
  507. c, err := json.Marshal(tt)
  508. if err != nil {
  509. return false, err
  510. }
  511. err = json.Unmarshal(c, &triggerMap)
  512. if err != nil {
  513. return false, err
  514. }
  515. return jobConfig.DeployTrigger.Trigger(metricDelta), nil
  516. }
  517. // deployModel deploys model
  518. func (im *IncrementalJobManager) deployModel(job *IncrementalLearningJob) (*ModelInfo, error) {
  519. jobConfig := job.JobConfig
  520. trainedModel := jobConfig.EvalModels[0].URL
  521. deployModel := jobConfig.EvalModels[1].URL
  522. if job.Storage.IsLocalStorage {
  523. trainedModel = util.AddPrefixPath(im.VolumeMountPrefix, trainedModel)
  524. }
  525. if err := job.updateDeployModel(deployModel, trainedModel); err != nil {
  526. return nil, err
  527. }
  528. klog.Infof("job(name=%s) deploys model(url=%s) successfully", jobConfig.UniqueIdentifier, trainedModel)
  529. return &jobConfig.EvalModels[0], nil
  530. }
  531. func (job *IncrementalLearningJob) updateDeployModel(deployModel string, newModel string) error {
  532. if err := job.Storage.CopyFile(newModel, deployModel); err != nil {
  533. return fmt.Errorf("copy model(url=%s) to the deploy model(url=%s) failed, error: %+v",
  534. newModel, deployModel, err)
  535. }
  536. klog.Infof("copy model(url=%s) to the deploy model(url=%s) successfully", newModel, deployModel)
  537. return nil
  538. }
  539. // createOutputDir creates the job output dir
  540. func (job *IncrementalLearningJob) createOutputDir(jobConfig *JobConfig) error {
  541. outputDir := jobConfig.OutputDir
  542. dirNames := []string{"data/train", "data/eval", "train", "eval"}
  543. if job.Storage.IsLocalStorage {
  544. if err := util.CreateFolder(outputDir); err != nil {
  545. klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, outputDir)
  546. return err
  547. }
  548. for _, v := range dirNames {
  549. dir := path.Join(outputDir, v)
  550. if err := util.CreateFolder(dir); err != nil {
  551. klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, dir)
  552. return err
  553. }
  554. }
  555. }
  556. outputConfig := OutputConfig{
  557. SamplesOutput: map[string]string{
  558. "train": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[0]}, "/"),
  559. "eval": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[1]}, "/"),
  560. },
  561. TrainOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[2]}, "/"),
  562. EvalOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[3]}, "/"),
  563. }
  564. jobConfig.OutputConfig = &outputConfig
  565. return nil
  566. }
  567. func (im *IncrementalJobManager) getLatestCondition(job *IncrementalLearningJob) sednav1.ILJobCondition {
  568. jobConditions := job.Status.Conditions
  569. var latestCondition sednav1.ILJobCondition = sednav1.ILJobCondition{}
  570. if len(jobConditions) > 0 {
  571. // get latest pod and pod status
  572. latestCondition = jobConditions[len(jobConditions)-1]
  573. }
  574. return latestCondition
  575. }
  576. func (im *IncrementalJobManager) getModel(namespace string, name string) (sednav1.Model, error) {
  577. modelName := util.GetUniqueIdentifier(namespace, name, ModelResourceKind)
  578. model, ok := im.ModelManager.GetModel(modelName)
  579. if !ok {
  580. return model, fmt.Errorf("not exists model(name=%s)", modelName)
  581. }
  582. return model, nil
  583. }
  584. // loadTrainModel loads initial model information for training.
  585. func (im *IncrementalJobManager) loadTrainModel(job *IncrementalLearningJob) error {
  586. jobConfig := job.JobConfig
  587. if jobConfig.TrainModel == nil {
  588. initialModel, err := im.getModel(job.Namespace, job.Spec.InitialModel.Name)
  589. if err != nil {
  590. return err
  591. }
  592. jobConfig.TrainModel = new(ModelInfo)
  593. format := initialModel.Spec.Format
  594. url := initialModel.Spec.URL
  595. jobConfig.TrainModel.Format = format
  596. jobConfig.TrainModel.URL = url
  597. }
  598. return nil
  599. }
  600. // loadDeployModel loads model information for deploying.
  601. func (im *IncrementalJobManager) loadDeployModel(job *IncrementalLearningJob) error {
  602. jobConfig := job.JobConfig
  603. if jobConfig.DeployModel == nil {
  604. evalModel, err := im.getModel(job.Namespace, job.Spec.DeploySpec.Model.Name)
  605. if err != nil {
  606. return err
  607. }
  608. jobConfig.DeployModel = new(ModelInfo)
  609. jobConfig.DeployModel.Format = evalModel.Spec.Format
  610. jobConfig.DeployModel.URL = evalModel.Spec.URL
  611. }
  612. return nil
  613. }
  614. // loadDataset loads dataset information
  615. func (im *IncrementalJobManager) loadDataset(job *IncrementalLearningJob) error {
  616. if job.Dataset != nil {
  617. // already loaded
  618. return nil
  619. }
  620. datasetName := util.GetUniqueIdentifier(job.Namespace, job.Spec.Dataset.Name, DatasetResourceKind)
  621. dataset, ok := im.DatasetManager.GetDataset(datasetName)
  622. if !ok || dataset == nil {
  623. return fmt.Errorf("not exists dataset(name=%s)", datasetName)
  624. }
  625. jobConfig := job.JobConfig
  626. jobConfig.DataSamples = &DataSamples{
  627. Numbers: 0,
  628. TrainSamples: make([]string, 0),
  629. EvalVersionSamples: make([][]string, 0),
  630. EvalSamples: make([]string, 0),
  631. }
  632. job.Dataset = dataset
  633. return nil
  634. }
  635. // handleData updates samples information
  636. func (im *IncrementalJobManager) handleData(job *IncrementalLearningJob) {
  637. tick := time.NewTicker(DatasetHandlerIntervalSeconds * time.Second)
  638. jobConfig := job.JobConfig
  639. iterCount := 0
  640. for {
  641. select {
  642. case <-job.Done:
  643. return
  644. default:
  645. }
  646. // in case dataset is not synced to LC before job synced to LC
  647. // here call loadDataset in each period
  648. err := im.loadDataset(job)
  649. if iterCount%100 == 0 {
  650. klog.Infof("job(name=%s) handling dataset", jobConfig.UniqueIdentifier)
  651. }
  652. iterCount++
  653. if err != nil {
  654. klog.Warningf("job(name=%s) failed to load dataset, and waiting it: %v",
  655. jobConfig.UniqueIdentifier,
  656. err)
  657. <-tick.C
  658. continue
  659. }
  660. dataset := job.Dataset
  661. if dataset.DataSource != nil && len(dataset.DataSource.TrainSamples) > jobConfig.DataSamples.Numbers {
  662. samples := dataset.DataSource.TrainSamples
  663. trainNum := int(job.Spec.Dataset.TrainProb * float64(len(samples)-jobConfig.DataSamples.Numbers))
  664. jobConfig.Lock.Lock()
  665. jobConfig.DataSamples.TrainSamples = append(jobConfig.DataSamples.TrainSamples,
  666. samples[(jobConfig.DataSamples.Numbers+1):(jobConfig.DataSamples.Numbers+trainNum+1)]...)
  667. klog.Infof("job(name=%s) current train samples nums is %d",
  668. jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.TrainSamples))
  669. jobConfig.DataSamples.EvalVersionSamples = append(jobConfig.DataSamples.EvalVersionSamples,
  670. samples[(jobConfig.DataSamples.Numbers+trainNum+1):])
  671. jobConfig.Lock.Unlock()
  672. for _, v := range jobConfig.DataSamples.EvalVersionSamples {
  673. jobConfig.DataSamples.EvalSamples = append(jobConfig.DataSamples.EvalSamples, v...)
  674. }
  675. klog.Infof("job(name=%s) current eval samples nums is %d",
  676. jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.EvalSamples))
  677. jobConfig.DataSamples.Numbers = len(samples)
  678. }
  679. <-tick.C
  680. }
  681. }
  682. // createFile creates data file and data index file
  683. func createFile(dir string, format string, isLocalStorage bool) (string, string) {
  684. switch format {
  685. case "txt":
  686. if isLocalStorage {
  687. return path.Join(dir, "data.txt"), ""
  688. }
  689. return strings.Join([]string{dir, "data.txt"}, "/"), strings.Join([]string{dir, "dataIndex.txt"}, "/")
  690. }
  691. return "", ""
  692. }
  693. // writeSamples writes samples information to a file
  694. func (im *IncrementalJobManager) writeSamples(job *IncrementalLearningJob, samples []string, dir string, rounds int, format string, urlPrefix string) (string, string, error) {
  695. subDir := strings.Join([]string{dir, strconv.Itoa(rounds)}, "/")
  696. fileURL, absURLFile := createFile(subDir, format, job.Dataset.Storage.IsLocalStorage)
  697. if job.Storage.IsLocalStorage {
  698. if err := util.CreateFolder(subDir); err != nil {
  699. return "", "", err
  700. }
  701. if err := im.writeByLine(samples, fileURL); err != nil {
  702. return "", "", err
  703. }
  704. if !job.Dataset.Storage.IsLocalStorage {
  705. tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)
  706. if err := im.writeByLine(tempSamples, absURLFile); err != nil {
  707. return "", "", err
  708. }
  709. }
  710. return fileURL, absURLFile, nil
  711. }
  712. temporaryDir, err := util.CreateTemporaryDir()
  713. if err != nil {
  714. return "", "", err
  715. }
  716. localFileURL, localAbsURLFile := createFile(temporaryDir, format, job.Dataset.Storage.IsLocalStorage)
  717. if err := im.writeByLine(samples, localFileURL); err != nil {
  718. return "", "", err
  719. }
  720. if err := job.Storage.Upload(localFileURL, fileURL); err != nil {
  721. return "", "", err
  722. }
  723. tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)
  724. if err := im.writeByLine(tempSamples, localAbsURLFile); err != nil {
  725. return "", "", err
  726. }
  727. if err := job.Storage.Upload(localAbsURLFile, absURLFile); err != nil {
  728. return "", "", err
  729. }
  730. defer os.RemoveAll(localFileURL)
  731. defer os.RemoveAll(localAbsURLFile)
  732. return fileURL, absURLFile, nil
  733. }
  734. // writeByLine writes file by line
  735. func (im *IncrementalJobManager) writeByLine(samples []string, fileURL string) error {
  736. file, err := os.Create(fileURL)
  737. if err != nil {
  738. klog.Errorf("create file(%s) failed", fileURL)
  739. return err
  740. }
  741. w := bufio.NewWriter(file)
  742. for _, line := range samples {
  743. _, _ = fmt.Fprintln(w, line)
  744. }
  745. if err := w.Flush(); err != nil {
  746. klog.Errorf("write file(%s) failed", fileURL)
  747. return err
  748. }
  749. if err := file.Close(); err != nil {
  750. klog.Errorf("close file failed, error: %v", err)
  751. return err
  752. }
  753. return nil
  754. }
  755. // monitorWorker monitors message from worker
  756. func (im *IncrementalJobManager) monitorWorker() {
  757. for {
  758. workerMessageChannel := im.WorkerMessageChannel
  759. workerMessage, ok := <-workerMessageChannel
  760. if !ok {
  761. break
  762. }
  763. klog.V(4).Infof("handling worker message %+v", workerMessage)
  764. name := util.GetUniqueIdentifier(workerMessage.Namespace, workerMessage.OwnerName, workerMessage.OwnerKind)
  765. job, ok := im.IncrementalJobMap[name]
  766. if !ok {
  767. continue
  768. }
  769. // TODO: filter some worker messages out
  770. wo := WorkerOutput{}
  771. wo.Models = workerMessage.Results
  772. wo.OwnerInfo = workerMessage.OwnerInfo
  773. msg := &UpstreamMessage{
  774. Phase: workerMessage.Kind,
  775. Status: workerMessage.Status,
  776. Output: &wo,
  777. }
  778. im.Client.WriteMessage(msg, job.getHeader())
  779. im.handleWorkerMessage(job, workerMessage)
  780. }
  781. }
  782. // handleWorkerMessage handles message from worker
  783. func (im *IncrementalJobManager) handleWorkerMessage(job *IncrementalLearningJob, workerMessage WorkerMessage) {
  784. latestCond := im.getLatestCondition(job)
  785. jobStage := strings.ToLower(string(latestCond.Stage))
  786. workerKind := strings.ToLower(workerMessage.Kind)
  787. if jobStage != workerKind {
  788. klog.Warningf("job(name=%s) %s phase get worker(kind=%s)", job.JobConfig.UniqueIdentifier,
  789. jobStage, workerKind)
  790. return
  791. }
  792. var models []ModelInfo
  793. for _, result := range workerMessage.Results {
  794. metrics := map[string][]float64{}
  795. if m, ok := result["metrics"]; ok {
  796. bytes, err := json.Marshal(m)
  797. if err != nil {
  798. return
  799. }
  800. err = json.Unmarshal(bytes, &metrics)
  801. if err != nil {
  802. klog.Warningf("failed to unmarshal the worker(name=%s) metrics %v, err: %v",
  803. workerMessage.Name,
  804. m,
  805. err)
  806. }
  807. }
  808. model := ModelInfo{
  809. result["format"].(string),
  810. result["url"].(string),
  811. metrics}
  812. models = append(models, model)
  813. }
  814. workerStatus := workerMessage.Status
  815. jobName := job.JobConfig.UniqueIdentifier
  816. if workerStatus == WorkerCompletedStatus {
  817. klog.Infof("job(name=%s) complete the %s task successfully", jobName, jobStage)
  818. switch latestCond.Stage {
  819. case sednav1.ILJobEval:
  820. job.JobConfig.EvalResult = models
  821. // when eval worker is complete, the deploy task starts immediately without waiting for the notification of GM.
  822. im.deployTask(job)
  823. }
  824. }
  825. }
  826. // forwardSamples deletes the samples information in the memory
  827. func forwardSamples(jobConfig *JobConfig, jobStage sednav1.ILJobStage) {
  828. switch jobStage {
  829. case sednav1.ILJobTrain:
  830. jobConfig.Lock.Lock()
  831. jobConfig.DataSamples.TrainSamples = jobConfig.DataSamples.TrainSamples[:0]
  832. jobConfig.Lock.Unlock()
  833. case sednav1.ILJobEval:
  834. if len(jobConfig.DataSamples.EvalVersionSamples) > EvalSamplesCapacity {
  835. jobConfig.DataSamples.EvalVersionSamples = jobConfig.DataSamples.EvalVersionSamples[1:]
  836. }
  837. }
  838. }
// AddWorkerMessage adds worker messages.
// The message is sent on WorkerMessageChannel, from which monitorWorker
// receives it; the send blocks while the channel's buffer is full.
func (im *IncrementalJobManager) AddWorkerMessage(message WorkerMessage) {
	im.WorkerMessageChannel <- message
}
// GetName returns name of the manager, which is the
// IncrementalLearningJobKind constant.
func (im *IncrementalJobManager) GetName() string {
	return IncrementalLearningJobKind
}
  847. func (job *IncrementalLearningJob) getHeader() gmclient.MessageHeader {
  848. return gmclient.MessageHeader{
  849. Namespace: job.Namespace,
  850. ResourceKind: job.Kind,
  851. ResourceName: job.Name,
  852. Operation: gmclient.StatusOperation,
  853. }
  854. }