You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

upstream.go 16 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519
  1. /*
  2. Copyright 2021 The KubeEdge Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package globalmanager
import (
	"context"
	"encoding/json"
	"fmt"
	"sort"
	"strings"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"

	sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
	clientset "github.com/kubeedge/sedna/pkg/client/clientset/versioned/typed/sedna/v1alpha1"
	"github.com/kubeedge/sedna/pkg/globalmanager/config"
	"github.com/kubeedge/sedna/pkg/globalmanager/messagelayer"
	"github.com/kubeedge/sedna/pkg/globalmanager/utils"
)
  28. // updateHandler handles the updates from LC(running at edge) to update the
  29. // corresponding resource
  30. type updateHandler func(namespace, name, operation string, content []byte) error
// UpstreamController subscribes the updates from edge and syncs to k8s api server
type UpstreamController struct {
	// client is the typed Sedna v1alpha1 client used to read resources and
	// write their status.
	client *clientset.SednaV1alpha1Client
	// messageLayer is the source of the resource updates and of the stop signal
	// consumed by syncEdgeUpdate.
	messageLayer messagelayer.MessageLayer
	// updateHandlers maps a resource kind to the handler that processes the
	// updates reported for that kind.
	updateHandlers map[string]updateHandler
}
  37. const upstreamStatusUpdateRetries = 3
  38. // retryUpdateStatus simply retries to call the status update func
  39. func retryUpdateStatus(name, namespace string, updateStatusFunc func() error) error {
  40. var err error
  41. for retry := 0; retry <= upstreamStatusUpdateRetries; retry++ {
  42. err = updateStatusFunc()
  43. if err == nil {
  44. return nil
  45. }
  46. klog.Warningf("Error to update %s/%s status, retried %d times: %+v", namespace, name, retry, err)
  47. }
  48. return err
  49. }
  50. func newUnmarshalError(namespace, name, operation string, content []byte) error {
  51. return fmt.Errorf("Unable to unmarshal content for (%s/%s) operation: '%s', content: '%+v'", namespace, name, operation, string(content))
  52. }
  53. func checkUpstreamOperation(operation string) error {
  54. // current only support the 'status' operation
  55. if operation != "status" {
  56. return fmt.Errorf("unknown operation %s", operation)
  57. }
  58. return nil
  59. }
  60. // updateDatasetStatus updates the dataset status
  61. func (uc *UpstreamController) updateDatasetStatus(name, namespace string, status sednav1.DatasetStatus) error {
  62. client := uc.client.Datasets(namespace)
  63. if status.UpdateTime == nil {
  64. now := metav1.Now()
  65. status.UpdateTime = &now
  66. }
  67. return retryUpdateStatus(name, namespace, func() error {
  68. dataset, err := client.Get(context.TODO(), name, metav1.GetOptions{})
  69. if err != nil {
  70. return err
  71. }
  72. dataset.Status = status
  73. _, err = client.UpdateStatus(context.TODO(), dataset, metav1.UpdateOptions{})
  74. return err
  75. })
  76. }
  77. // updateDatasetFromEdge syncs update from edge
  78. func (uc *UpstreamController) updateDatasetFromEdge(name, namespace, operation string, content []byte) error {
  79. err := checkUpstreamOperation(operation)
  80. if err != nil {
  81. return err
  82. }
  83. status := sednav1.DatasetStatus{}
  84. err = json.Unmarshal(content, &status)
  85. if err != nil {
  86. return newUnmarshalError(namespace, name, operation, content)
  87. }
  88. return uc.updateDatasetStatus(name, namespace, status)
  89. }
  90. // convertToMetrics converts the metrics from LCs to resource metrics
  91. func convertToMetrics(m map[string]interface{}) []sednav1.Metric {
  92. var l []sednav1.Metric
  93. for k, v := range m {
  94. var displayValue string
  95. switch t := v.(type) {
  96. case string:
  97. displayValue = t
  98. default:
  99. // ignore the json marshal error
  100. b, _ := json.Marshal(v)
  101. displayValue = string(b)
  102. }
  103. l = append(l, sednav1.Metric{Key: k, Value: displayValue})
  104. }
  105. return l
  106. }
  107. func (uc *UpstreamController) updateJointInferenceMetrics(name, namespace string, metrics []sednav1.Metric) error {
  108. client := uc.client.JointInferenceServices(namespace)
  109. return retryUpdateStatus(name, namespace, func() error {
  110. joint, err := client.Get(context.TODO(), name, metav1.GetOptions{})
  111. if err != nil {
  112. return err
  113. }
  114. joint.Status.Metrics = metrics
  115. _, err = client.UpdateStatus(context.TODO(), joint, metav1.UpdateOptions{})
  116. return err
  117. })
  118. }
  119. // updateJointInferenceFromEdge syncs the edge updates to k8s
  120. func (uc *UpstreamController) updateJointInferenceFromEdge(name, namespace, operation string, content []byte) error {
  121. err := checkUpstreamOperation(operation)
  122. if err != nil {
  123. return err
  124. }
  125. // Output defines owner output information
  126. type Output struct {
  127. ServiceInfo map[string]interface{} `json:"ownerInfo"`
  128. }
  129. var status struct {
  130. // Phase always should be "inference"
  131. Phase string `json:"phase"`
  132. Status string `json:"status"`
  133. Output *Output `json:"output"`
  134. }
  135. err = json.Unmarshal(content, &status)
  136. if err != nil {
  137. return newUnmarshalError(namespace, name, operation, content)
  138. }
  139. // TODO: propagate status.Status to k8s
  140. output := status.Output
  141. if output == nil || output.ServiceInfo == nil {
  142. // no output info
  143. klog.Warningf("empty status info for joint inference service %s/%s", namespace, name)
  144. return nil
  145. }
  146. info := output.ServiceInfo
  147. for _, ignoreTimeKey := range []string{
  148. "startTime",
  149. "updateTime",
  150. } {
  151. delete(info, ignoreTimeKey)
  152. }
  153. metrics := convertToMetrics(info)
  154. err = uc.updateJointInferenceMetrics(name, namespace, metrics)
  155. if err != nil {
  156. return fmt.Errorf("failed to update metrics, err:%+w", err)
  157. }
  158. return nil
  159. }
  160. func (uc *UpstreamController) updateModelMetrics(name, namespace string, metrics []sednav1.Metric) error {
  161. client := uc.client.Models(namespace)
  162. return retryUpdateStatus(name, namespace, (func() error {
  163. model, err := client.Get(context.TODO(), name, metav1.GetOptions{})
  164. if err != nil {
  165. return err
  166. }
  167. now := metav1.Now()
  168. model.Status.UpdateTime = &now
  169. model.Status.Metrics = metrics
  170. _, err = client.UpdateStatus(context.TODO(), model, metav1.UpdateOptions{})
  171. return err
  172. }))
  173. }
  174. func (uc *UpstreamController) updateModelMetricsByFederatedName(name, namespace string, metrics []sednav1.Metric) error {
  175. client := uc.client.FederatedLearningJobs(namespace)
  176. var err error
  177. federatedLearningJob, err := client.Get(context.TODO(), name, metav1.GetOptions{})
  178. if err != nil {
  179. // federated crd not found
  180. return err
  181. }
  182. modelName := federatedLearningJob.Spec.AggregationWorker.Model.Name
  183. return uc.updateModelMetrics(modelName, namespace, metrics)
  184. }
  185. func (uc *UpstreamController) appendFederatedLearningJobStatusCondition(name, namespace string, cond sednav1.FLJobCondition) error {
  186. client := uc.client.FederatedLearningJobs(namespace)
  187. return retryUpdateStatus(name, namespace, (func() error {
  188. job, err := client.Get(context.TODO(), name, metav1.GetOptions{})
  189. if err != nil {
  190. return err
  191. }
  192. job.Status.Conditions = append(job.Status.Conditions, cond)
  193. _, err = client.UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
  194. return err
  195. }))
  196. }
  197. // updateFederatedLearningJobFromEdge updates the federated job's status
  198. func (uc *UpstreamController) updateFederatedLearningJobFromEdge(name, namespace, operation string, content []byte) (err error) {
  199. err = checkUpstreamOperation(operation)
  200. if err != nil {
  201. return err
  202. }
  203. // JobInfo defines the job information
  204. type JobInfo struct {
  205. // Current training round
  206. CurrentRound int `json:"currentRound"`
  207. UpdateTime string `json:"updateTime"`
  208. }
  209. // Output defines job output information
  210. type Output struct {
  211. Models []Model `json:"models"`
  212. JobInfo *JobInfo `json:"ownerInfo"`
  213. }
  214. var status struct {
  215. Phase string `json:"phase"`
  216. Status string `json:"status"`
  217. Output *Output `json:"output"`
  218. }
  219. err = json.Unmarshal(content, &status)
  220. if err != nil {
  221. err = newUnmarshalError(namespace, name, operation, content)
  222. return
  223. }
  224. output := status.Output
  225. if output != nil {
  226. // Update the model's metrics
  227. if len(output.Models) > 0 {
  228. // only one model
  229. model := output.Models[0]
  230. metrics := convertToMetrics(model.Metrics)
  231. if len(metrics) > 0 {
  232. uc.updateModelMetricsByFederatedName(name, namespace, metrics)
  233. }
  234. }
  235. jobInfo := output.JobInfo
  236. // update job info if having any info
  237. if jobInfo != nil && jobInfo.CurrentRound > 0 {
  238. // Find a good place to save the progress info
  239. // TODO: more meaningful reason/message
  240. reason := "DoTraining"
  241. message := fmt.Sprintf("Round %v reaches at %s", jobInfo.CurrentRound, jobInfo.UpdateTime)
  242. cond := NewFLJobCondition(sednav1.FLJobCondTraining, reason, message)
  243. uc.appendFederatedLearningJobStatusCondition(name, namespace, cond)
  244. }
  245. }
  246. return nil
  247. }
  248. func (uc *UpstreamController) appendIncrementalLearningJobStatusCondition(name, namespace string, cond sednav1.ILJobCondition) error {
  249. client := uc.client.IncrementalLearningJobs(namespace)
  250. return retryUpdateStatus(name, namespace, (func() error {
  251. job, err := client.Get(context.TODO(), name, metav1.GetOptions{})
  252. if err != nil {
  253. return err
  254. }
  255. job.Status.Conditions = append(job.Status.Conditions, cond)
  256. _, err = client.UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
  257. return err
  258. }))
  259. }
  260. // updateIncrementalLearningFromEdge syncs the edge updates to k8s
  261. func (uc *UpstreamController) updateIncrementalLearningFromEdge(name, namespace, operation string, content []byte) error {
  262. err := checkUpstreamOperation(operation)
  263. if err != nil {
  264. return err
  265. }
  266. var jobStatus struct {
  267. Phase string `json:"phase"`
  268. Status string `json:"status"`
  269. }
  270. err = json.Unmarshal(content, &jobStatus)
  271. if err != nil {
  272. return newUnmarshalError(namespace, name, operation, content)
  273. }
  274. // Get the condition data.
  275. // Here unmarshal and marshal immediately to skip the unnecessary fields
  276. var condData IncrementalCondData
  277. err = json.Unmarshal(content, &condData)
  278. if err != nil {
  279. return newUnmarshalError(namespace, name, operation, content)
  280. }
  281. condDataBytes, _ := json.Marshal(&condData)
  282. cond := sednav1.ILJobCondition{
  283. Status: v1.ConditionTrue,
  284. LastHeartbeatTime: metav1.Now(),
  285. LastTransitionTime: metav1.Now(),
  286. Data: string(condDataBytes),
  287. Message: "reported by lc",
  288. }
  289. switch strings.ToLower(jobStatus.Phase) {
  290. case "train":
  291. cond.Stage = sednav1.ILJobTrain
  292. case "eval":
  293. cond.Stage = sednav1.ILJobEval
  294. case "deploy":
  295. cond.Stage = sednav1.ILJobDeploy
  296. default:
  297. return fmt.Errorf("invalid condition stage: %v", jobStatus.Phase)
  298. }
  299. switch strings.ToLower(jobStatus.Status) {
  300. case "ready":
  301. cond.Type = sednav1.ILJobStageCondReady
  302. case "completed":
  303. cond.Type = sednav1.ILJobStageCondCompleted
  304. case "failed":
  305. cond.Type = sednav1.ILJobStageCondFailed
  306. case "waiting":
  307. cond.Type = sednav1.ILJobStageCondWaiting
  308. default:
  309. return fmt.Errorf("invalid condition type: %v", jobStatus.Status)
  310. }
  311. err = uc.appendIncrementalLearningJobStatusCondition(name, namespace, cond)
  312. if err != nil {
  313. return fmt.Errorf("failed to append condition, err:%+w", err)
  314. }
  315. return nil
  316. }
  317. func (uc *UpstreamController) appendLifelongLearningJobStatusCondition(name, namespace string, cond sednav1.LLJobCondition) error {
  318. client := uc.client.LifelongLearningJobs(namespace)
  319. return retryUpdateStatus(name, namespace, func() error {
  320. job, err := client.Get(context.TODO(), name, metav1.GetOptions{})
  321. if err != nil {
  322. return err
  323. }
  324. job.Status.Conditions = append(job.Status.Conditions, cond)
  325. _, err = client.UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
  326. return err
  327. })
  328. }
  329. // updateLifelongLearningJobFromEdge syncs the edge updates to k8s
  330. func (uc *UpstreamController) updateLifelongLearningJobFromEdge(name, namespace, operation string, content []byte) error {
  331. err := checkUpstreamOperation(operation)
  332. if err != nil {
  333. return err
  334. }
  335. var jobStatus struct {
  336. Phase string `json:"phase"`
  337. Status string `json:"status"`
  338. }
  339. err = json.Unmarshal(content, &jobStatus)
  340. if err != nil {
  341. return newUnmarshalError(namespace, name, operation, content)
  342. }
  343. // Get the condition data.
  344. // Here unmarshal and marshal immediately to skip the unnecessary fields
  345. var condData LifelongLearningCondData
  346. err = json.Unmarshal(content, &condData)
  347. if err != nil {
  348. return newUnmarshalError(namespace, name, operation, content)
  349. }
  350. condDataBytes, _ := json.Marshal(&condData)
  351. cond := sednav1.LLJobCondition{
  352. Status: v1.ConditionTrue,
  353. LastHeartbeatTime: metav1.Now(),
  354. LastTransitionTime: metav1.Now(),
  355. Data: string(condDataBytes),
  356. Message: "reported by lc",
  357. }
  358. switch strings.ToLower(jobStatus.Phase) {
  359. case "train":
  360. cond.Stage = sednav1.LLJobTrain
  361. case "eval":
  362. cond.Stage = sednav1.LLJobEval
  363. case "deploy":
  364. cond.Stage = sednav1.LLJobDeploy
  365. default:
  366. return fmt.Errorf("invalid condition stage: %v", jobStatus.Phase)
  367. }
  368. switch strings.ToLower(jobStatus.Status) {
  369. case "ready":
  370. cond.Type = sednav1.LLJobStageCondReady
  371. case "completed":
  372. cond.Type = sednav1.LLJobStageCondCompleted
  373. case "failed":
  374. cond.Type = sednav1.LLJobStageCondFailed
  375. case "waiting":
  376. cond.Type = sednav1.LLJobStageCondWaiting
  377. default:
  378. return fmt.Errorf("invalid condition type: %v", jobStatus.Status)
  379. }
  380. err = uc.appendLifelongLearningJobStatusCondition(name, namespace, cond)
  381. if err != nil {
  382. return fmt.Errorf("failed to append condition, err:%+w", err)
  383. }
  384. return nil
  385. }
  386. // syncEdgeUpdate receives the updates from edge and syncs these to k8s.
  387. func (uc *UpstreamController) syncEdgeUpdate() {
  388. for {
  389. select {
  390. case <-uc.messageLayer.Done():
  391. klog.Info("Stop sedna upstream loop")
  392. return
  393. default:
  394. }
  395. update, err := uc.messageLayer.ReceiveResourceUpdate()
  396. if err != nil {
  397. klog.Warningf("Ignore update since this err: %+v", err)
  398. continue
  399. }
  400. kind := update.Kind
  401. namespace := update.Namespace
  402. name := update.Name
  403. operation := update.Operation
  404. handler, ok := uc.updateHandlers[kind]
  405. if ok {
  406. err := handler(name, namespace, operation, update.Content)
  407. if err != nil {
  408. klog.Errorf("Error to handle %s %s/%s operation(%s): %+v", kind, namespace, name, operation, err)
  409. }
  410. } else {
  411. klog.Warningf("No handler for resource kind %s", kind)
  412. }
  413. }
  414. }
  415. // Start the upstream controller
  416. func (uc *UpstreamController) Start() error {
  417. klog.Info("Start the sedna upstream controller")
  418. go uc.syncEdgeUpdate()
  419. return nil
  420. }
  421. // GetName returns the name of the upstream controller
  422. func (uc *UpstreamController) GetName() string {
  423. return "UpstreamController"
  424. }
  425. // NewUpstreamController creates a new Upstream controller from config
  426. func NewUpstreamController(cfg *config.ControllerConfig) (FeatureControllerI, error) {
  427. client, err := utils.NewCRDClient()
  428. if err != nil {
  429. return nil, fmt.Errorf("create crd client failed with error: %w", err)
  430. }
  431. uc := &UpstreamController{
  432. client: client,
  433. messageLayer: messagelayer.NewContextMessageLayer(),
  434. }
  435. // NOTE: current no direct model update from edge,
  436. // model update will be triggered by the corresponding training feature
  437. uc.updateHandlers = map[string]updateHandler{
  438. "dataset": uc.updateDatasetFromEdge,
  439. "jointinferenceservice": uc.updateJointInferenceFromEdge,
  440. "federatedlearningjob": uc.updateFederatedLearningJobFromEdge,
  441. "incrementallearningjob": uc.updateIncrementalLearningFromEdge,
  442. "lifelonglearningjob": uc.updateLifelongLearningJobFromEdge,
  443. }
  444. return uc, nil
  445. }