You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

downstream.go 9.8 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. /*
  2. Copyright 2021 The KubeEdge Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package globalmanager
  14. import (
  15. "context"
  16. "fmt"
  17. "time"
  18. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  19. "k8s.io/apimachinery/pkg/fields"
  20. "k8s.io/apimachinery/pkg/runtime"
  21. "k8s.io/apimachinery/pkg/watch"
  22. "k8s.io/client-go/kubernetes"
  23. "k8s.io/client-go/tools/cache"
  24. "k8s.io/klog/v2"
  25. sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
  26. clientset "github.com/kubeedge/sedna/pkg/client/clientset/versioned/typed/sedna/v1alpha1"
  27. "github.com/kubeedge/sedna/pkg/globalmanager/config"
  28. "github.com/kubeedge/sedna/pkg/globalmanager/messagelayer"
  29. "github.com/kubeedge/sedna/pkg/globalmanager/utils"
  30. )
  31. // DownstreamController watch kubernetes api server and send the controller resource change to edge
  32. type DownstreamController struct {
  33. // events from watch kubernetes api server
  34. events chan watch.Event
  35. cfg *config.ControllerConfig
  36. client *clientset.SednaV1alpha1Client
  37. kubeClient kubernetes.Interface
  38. messageLayer messagelayer.MessageLayer
  39. }
  40. func (dc *DownstreamController) injectSecret(obj CommonInterface, secretName string) error {
  41. if secretName == "" {
  42. return nil
  43. }
  44. secret, err := dc.kubeClient.CoreV1().Secrets(obj.GetNamespace()).Get(context.TODO(), secretName, metav1.GetOptions{})
  45. if err != nil {
  46. klog.Warningf("failed to get the secret %s: %+v",
  47. secretName, err)
  48. return err
  49. }
  50. InjectSecretObj(obj, secret)
  51. return err
  52. }
  53. // syncDataset syncs the dataset resources
  54. func (dc *DownstreamController) syncDataset(eventType watch.EventType, dataset *sednav1.Dataset) error {
  55. // Here only propagate to the nodes with non empty name
  56. nodeName := dataset.Spec.NodeName
  57. if len(nodeName) == 0 {
  58. return fmt.Errorf("empty node name")
  59. }
  60. dc.injectSecret(dataset, dataset.Spec.CredentialName)
  61. return dc.messageLayer.SendResourceObject(nodeName, eventType, dataset)
  62. }
  63. // syncJointInferenceService syncs the joint-inference-service resources
  64. func (dc *DownstreamController) syncJointInferenceService(eventType watch.EventType, joint *sednav1.JointInferenceService) error {
  65. // Here only propagate to the nodes with non empty name
  66. // FIXME: only the case that Spec.NodeName specified is support
  67. nodeName := joint.Spec.EdgeWorker.Template.Spec.NodeName
  68. if len(nodeName) == 0 {
  69. return fmt.Errorf("empty node name")
  70. }
  71. return dc.messageLayer.SendResourceObject(nodeName, eventType, joint)
  72. }
  73. // syncFederatedLearningJob syncs the federated resources
  74. func (dc *DownstreamController) syncFederatedLearningJob(eventType watch.EventType, job *sednav1.FederatedLearningJob) error {
  75. // broadcast to all nodes specified in spec
  76. nodeset := make(map[string]bool)
  77. for _, trainingWorker := range job.Spec.TrainingWorkers {
  78. // Here only propagate to the nodes with non empty name
  79. if len(trainingWorker.Template.Spec.NodeName) > 0 {
  80. nodeset[trainingWorker.Template.Spec.NodeName] = true
  81. }
  82. }
  83. for nodeName := range nodeset {
  84. dc.messageLayer.SendResourceObject(nodeName, eventType, job)
  85. }
  86. return nil
  87. }
  88. // syncModelWithName will sync the model to the specified node.
  89. // Now called when creating the incrementaljob.
  90. func (dc *DownstreamController) syncModelWithName(nodeName, modelName, namespace string) error {
  91. model, err := dc.client.Models(namespace).Get(context.TODO(), modelName, metav1.GetOptions{})
  92. if err != nil {
  93. // TODO: maybe use err.ErrStatus.Code == 404
  94. return fmt.Errorf("model(%s/%s) not found", namespace, modelName)
  95. }
  96. // Since model.Kind may be empty,
  97. // we need to fix the kind here if missing.
  98. // more details at https://github.com/kubernetes/kubernetes/issues/3030
  99. if len(model.Kind) == 0 {
  100. model.Kind = "Model"
  101. }
  102. dc.injectSecret(model, model.Spec.CredentialName)
  103. dc.messageLayer.SendResourceObject(nodeName, watch.Added, model)
  104. return nil
  105. }
  106. // syncIncrementalJob syncs the incremental learning jobs
  107. func (dc *DownstreamController) syncIncrementalJob(eventType watch.EventType, job *sednav1.IncrementalLearningJob) error {
  108. // Here only propagate to the nodes with non empty name
  109. // FIXME(llhuii): only the case that all workers having the same nodeName are support,
  110. // will support Spec.NodeSelector and differenect nodeName.
  111. nodeName := job.Spec.TrainSpec.Template.Spec.NodeName
  112. if len(nodeName) == 0 {
  113. return fmt.Errorf("empty node name")
  114. }
  115. dc.injectSecret(job, job.Spec.CredentialName)
  116. // Sync the model info to edgenode when the job is created
  117. if eventType == watch.Added {
  118. models := make(map[string]bool)
  119. for _, modelName := range []string{
  120. job.Spec.InitialModel.Name,
  121. job.Spec.DeploySpec.Model.Name,
  122. } {
  123. models[modelName] = true
  124. }
  125. for modelName := range models {
  126. err := dc.syncModelWithName(nodeName, modelName, job.Namespace)
  127. if err != nil {
  128. klog.Warningf("Error to sync model %s when sync incremental learning job %s to node %s: %v", modelName, job.Name, nodeName, err)
  129. }
  130. }
  131. } else if eventType == watch.Deleted {
  132. // noop
  133. }
  134. dc.messageLayer.SendResourceObject(nodeName, eventType, job)
  135. return nil
  136. }
  137. // sync defines the entrypoint of syncing all resources
  138. func (dc *DownstreamController) sync(stopCh <-chan struct{}) {
  139. for {
  140. select {
  141. case <-stopCh:
  142. klog.Info("Stop controller downstream loop")
  143. return
  144. case e := <-dc.events:
  145. var err error
  146. var kind, namespace, name string
  147. switch t := e.Object.(type) {
  148. case (*sednav1.Dataset):
  149. // Since t.Kind may be empty,
  150. // we need to fix the kind here if missing.
  151. // more details at https://github.com/kubernetes/kubernetes/issues/3030
  152. if len(t.Kind) == 0 {
  153. t.Kind = "Dataset"
  154. }
  155. kind = t.Kind
  156. namespace = t.Namespace
  157. name = t.Name
  158. err = dc.syncDataset(e.Type, t)
  159. case (*sednav1.JointInferenceService):
  160. // TODO: find a good way to avoid these duplicate codes
  161. if len(t.Kind) == 0 {
  162. t.Kind = "JointInferenceService"
  163. }
  164. kind = t.Kind
  165. namespace = t.Namespace
  166. name = t.Name
  167. err = dc.syncJointInferenceService(e.Type, t)
  168. case (*sednav1.FederatedLearningJob):
  169. if len(t.Kind) == 0 {
  170. t.Kind = "FederatedLearningJob"
  171. }
  172. kind = t.Kind
  173. namespace = t.Namespace
  174. name = t.Name
  175. err = dc.syncFederatedLearningJob(e.Type, t)
  176. case (*sednav1.IncrementalLearningJob):
  177. if len(t.Kind) == 0 {
  178. t.Kind = "IncrementalLearningJob"
  179. }
  180. kind = t.Kind
  181. namespace = t.Namespace
  182. name = t.Name
  183. err = dc.syncIncrementalJob(e.Type, t)
  184. default:
  185. klog.Warningf("object type: %T unsupported", e)
  186. continue
  187. }
  188. if err != nil {
  189. klog.Warningf("Error to sync %s(%s/%s), err: %+v", kind, namespace, name, err)
  190. } else {
  191. klog.V(2).Infof("synced %s(%s/%s)", kind, namespace, name)
  192. }
  193. }
  194. }
  195. }
  196. // watch function watches the crd resources which should by synced to nodes
  197. func (dc *DownstreamController) watch(stopCh <-chan struct{}) {
  198. rh := cache.ResourceEventHandlerFuncs{
  199. AddFunc: func(obj interface{}) {
  200. eventObj := obj.(runtime.Object)
  201. dc.events <- watch.Event{Type: watch.Added, Object: eventObj}
  202. },
  203. UpdateFunc: func(old, cur interface{}) {
  204. // Since we don't support the spec update operation currently,
  205. // so only status updates arrive here and NO propagation to edge.
  206. // Update:
  207. // We sync it to edge when using self-built websocket, and
  208. // this sync isn't needed when we switch out self-built websocket.
  209. dc.events <- watch.Event{Type: watch.Added, Object: cur.(runtime.Object)}
  210. },
  211. DeleteFunc: func(obj interface{}) {
  212. eventObj := obj.(runtime.Object)
  213. dc.events <- watch.Event{Type: watch.Deleted, Object: eventObj}
  214. },
  215. }
  216. client := dc.client.RESTClient()
  217. // make this option configurable
  218. resyncPeriod := time.Second * 60
  219. namespace := dc.cfg.Namespace
  220. // TODO: use the informer
  221. for resourceName, object := range map[string]runtime.Object{
  222. "datasets": &sednav1.Dataset{},
  223. "jointinferenceservices": &sednav1.JointInferenceService{},
  224. "federatedlearningjobs": &sednav1.FederatedLearningJob{},
  225. "incrementallearningjobs": &sednav1.IncrementalLearningJob{},
  226. } {
  227. lw := cache.NewListWatchFromClient(client, resourceName, namespace, fields.Everything())
  228. si := cache.NewSharedInformer(lw, object, resyncPeriod)
  229. si.AddEventHandler(rh)
  230. go si.Run(stopCh)
  231. }
  232. }
  233. // Start starts the controller
  234. func (dc *DownstreamController) Start() error {
  235. stopCh := dc.messageLayer.Done()
  236. // watch is an asynchronous call
  237. dc.watch(stopCh)
  238. // sync is a synchronous call
  239. go dc.sync(stopCh)
  240. return nil
  241. }
  242. // GetName returns the name of the downstream controller
  243. func (dc *DownstreamController) GetName() string {
  244. return "DownstreamController"
  245. }
  246. // NewDownstreamController creates a controller DownstreamController from config
  247. func NewDownstreamController(cfg *config.ControllerConfig) (FeatureControllerI, error) {
  248. // TODO: make bufferSize configurable
  249. bufferSize := 10
  250. events := make(chan watch.Event, bufferSize)
  251. crdclient, err := utils.NewCRDClient()
  252. if err != nil {
  253. return nil, fmt.Errorf("create crd client failed with error: %w", err)
  254. }
  255. kubeClient, err := utils.KubeClient()
  256. if err != nil {
  257. return nil, err
  258. }
  259. dc := &DownstreamController{
  260. cfg: cfg,
  261. events: events,
  262. client: crdclient,
  263. kubeClient: kubeClient,
  264. messageLayer: messagelayer.NewContextMessageLayer(),
  265. }
  266. return dc, nil
  267. }