You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

downstream.go 6.5 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. /*
  2. Copyright 2021 The KubeEdge Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package incrementallearning
  14. import (
  15. "context"
  16. "fmt"
  17. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  18. "k8s.io/apimachinery/pkg/util/sets"
  19. "k8s.io/apimachinery/pkg/watch"
  20. "k8s.io/klog/v2"
  21. sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
  22. "github.com/kubeedge/sedna/pkg/globalmanager/runtime"
  23. )
  24. // syncModelWithName will sync the model to the specified node.
  25. // Now called when creating the incrementaljob.
  26. func (c *Controller) syncModelWithName(nodeName, modelName, namespace string) error {
  27. model, err := c.client.Models(namespace).Get(context.TODO(), modelName, metav1.GetOptions{})
  28. if err != nil {
  29. // TODO: maybe use err.ErrStatus.Code == 404
  30. return fmt.Errorf("model(%s/%s) not found", namespace, modelName)
  31. }
  32. // Since model.Kind may be empty,
  33. // we need to fix the kind here if missing.
  34. // more details at https://github.com/kubernetes/kubernetes/issues/3030
  35. if len(model.Kind) == 0 {
  36. model.Kind = "Model"
  37. }
  38. runtime.InjectSecretAnnotations(c.kubeClient, model, model.Spec.CredentialName)
  39. c.sendToEdgeFunc(nodeName, watch.Added, model)
  40. return nil
  41. }
  42. func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) error {
  43. job, ok := obj.(*sednav1.IncrementalLearningJob)
  44. if !ok {
  45. return nil
  46. }
  47. // Since Kind may be empty,
  48. // we need to fix the kind here if missing.
  49. // more details at https://github.com/kubernetes/kubernetes/issues/3030
  50. job.Kind = KindName
  51. jobConditions := job.Status.Conditions
  52. if len(jobConditions) == 0 {
  53. return nil
  54. }
  55. dataName := job.Spec.Dataset.Name
  56. ds, err := c.client.Datasets(job.Namespace).Get(context.TODO(), dataName, metav1.GetOptions{})
  57. if err != nil {
  58. return fmt.Errorf("dataset(%s/%s) not found", job.Namespace, dataName)
  59. }
  60. // LC has dataset object on this node that may call dataset node
  61. dsNodeName := ds.Spec.NodeName
  62. var trainNodeName string
  63. var evalNodeName string
  64. var deployNodeName string
  65. getAnnotationsNodeName := func(nodeName sednav1.ILJobStage) string {
  66. return runtime.AnnotationsKeyPrefix + string(nodeName)
  67. }
  68. ann := job.GetAnnotations()
  69. if ann != nil {
  70. trainNodeName = ann[getAnnotationsNodeName(sednav1.ILJobTrain)]
  71. evalNodeName = ann[getAnnotationsNodeName(sednav1.ILJobEval)]
  72. if _, ok := ann[runtime.ModelHotUpdateAnnotationsKey]; ok {
  73. deployNodeName = ann[getAnnotationsNodeName(sednav1.ILJobDeploy)]
  74. }
  75. }
  76. if eventType == watch.Deleted {
  77. // delete jobs from all LCs
  78. nodes := sets.NewString(dsNodeName, trainNodeName, evalNodeName, deployNodeName)
  79. for node := range nodes {
  80. c.sendToEdgeFunc(node, eventType, job)
  81. }
  82. return nil
  83. }
  84. latestCondition := jobConditions[len(jobConditions)-1]
  85. currentType := latestCondition.Type
  86. jobStage := latestCondition.Stage
  87. syncModelWithName := func(modelName string, nodeName string) {
  88. if err := c.syncModelWithName(nodeName, modelName, job.Namespace); err != nil {
  89. klog.Warningf("Error to sync model %s when sync incremental learning job %s to node %s: %v",
  90. modelName, job.Name, nodeName, err)
  91. }
  92. }
  93. syncJobWithNodeName := func(nodeName string) {
  94. if err := c.sendToEdgeFunc(nodeName, eventType, job); err != nil {
  95. klog.Warningf("Error to sync incremental learning job %s to node %s in stage %s: %v",
  96. job.Name, nodeName, jobStage, err)
  97. }
  98. }
  99. runtime.InjectSecretAnnotations(c.kubeClient, job, job.Spec.CredentialName)
  100. // isJobResidentNode checks whether nodeName is a job resident node
  101. isJobResidentNode := func(nodeName string) bool {
  102. // the node where LC monitors dataset and the node where inference worker is running are job resident node
  103. if nodeName == dsNodeName || nodeName == deployNodeName {
  104. return true
  105. }
  106. return false
  107. }
  108. // delete job
  109. deleteJob := func(nodeName string) {
  110. if !isJobResidentNode(nodeName) {
  111. // delete LC's job from nodeName that's different from dataset node when worker's status
  112. // is completed or failed.
  113. c.sendToEdgeFunc(nodeName, watch.Deleted, job)
  114. }
  115. }
  116. switch currentType {
  117. case sednav1.ILJobStageCondWaiting:
  118. switch jobStage {
  119. case sednav1.ILJobTrain:
  120. syncModelWithName(job.Spec.InitialModel.Name, dsNodeName)
  121. syncJobWithNodeName(dsNodeName)
  122. case sednav1.ILJobEval:
  123. syncModelWithName(job.Spec.DeploySpec.Model.Name, dsNodeName)
  124. if job.Spec.EvalSpec.InitialModel != nil {
  125. syncModelWithName(job.Spec.EvalSpec.InitialModel.Name, dsNodeName)
  126. }
  127. syncJobWithNodeName(dsNodeName)
  128. case sednav1.ILJobDeploy:
  129. deployNodeName = evalNodeName
  130. syncModelWithName(job.Spec.DeploySpec.Model.Name, evalNodeName)
  131. if job.Spec.EvalSpec.InitialModel != nil && !job.Spec.DeploySpec.Model.HotUpdateEnabled {
  132. syncModelWithName(job.Spec.EvalSpec.InitialModel.Name, deployNodeName)
  133. }
  134. syncJobWithNodeName(deployNodeName)
  135. }
  136. case sednav1.ILJobStageCondRunning:
  137. switch jobStage {
  138. case sednav1.ILJobTrain:
  139. syncJobWithNodeName(trainNodeName)
  140. case sednav1.ILJobEval:
  141. if trainNodeName != evalNodeName && trainNodeName != dsNodeName {
  142. c.sendToEdgeFunc(trainNodeName, watch.Deleted, job)
  143. }
  144. syncJobWithNodeName(evalNodeName)
  145. case sednav1.ILJobDeploy:
  146. if evalNodeName != deployNodeName && evalNodeName != dsNodeName {
  147. c.sendToEdgeFunc(evalNodeName, watch.Deleted, job)
  148. }
  149. if job.Spec.EvalSpec.InitialModel != nil {
  150. syncModelWithName(job.Spec.EvalSpec.InitialModel.Name, deployNodeName)
  151. }
  152. syncModelWithName(job.Spec.DeploySpec.Model.Name, deployNodeName)
  153. syncJobWithNodeName(deployNodeName)
  154. }
  155. case sednav1.ILJobStageCondCompleted, sednav1.ILJobStageCondFailed:
  156. if !job.Spec.DeploySpec.Model.HotUpdateEnabled {
  157. deployNodeName = evalNodeName
  158. }
  159. switch jobStage {
  160. case sednav1.ILJobTrain:
  161. deleteJob(trainNodeName)
  162. case sednav1.ILJobEval:
  163. deleteJob(evalNodeName)
  164. case sednav1.ILJobDeploy:
  165. deleteJob(deployNodeName)
  166. }
  167. }
  168. return nil
  169. }
  170. func (c *Controller) SetDownstreamSendFunc(f runtime.DownstreamSendFunc) error {
  171. c.sendToEdgeFunc = f
  172. return nil
  173. }