Sedna leverages KubeEdge's edge-cloud collaboration capabilities to enable cross-edge-cloud collaborative training and inference. Joint inference and federated learning are essential applications of this edge-cloud collaboration. However, the current Sedna project has several functional deficiencies in its joint inference and federated learning controllers, most notably the lack of cascade deletion and of task self-healing.
This proposal aims to optimize the joint inference and federated learning controllers to improve task reconstruction and cascade deletion, thereby enhancing the controllers' functionality and application efficiency.
The goals of optimizing Sedna's joint inference and federated learning controllers are listed below:
The optimization only modifies the Joint Inference and Federated Learning controller code. The two controllers are decoupled from each other, and shared functions that were changed have also been updated in the other controllers that use them. Since no changes were made to the CRDs, the deployment of existing applications will not be impacted.
The main changes are in the files /pkg/globalmanager/controllers/jointinference/jointinferenceservice.go and /pkg/globalmanager/controllers/federatedlearning/federatedlearningjob.go.
Additionally, in the file /pkg/globalmanager/runtime/worker.go, to support creating a Deployment in joint inference, we modified the CreateDeploymentWithTemplate and injectDeploymentParam functions. The necessary updates were also made in the files /pkg/globalmanager/controllers/objectsearch/objectsearchservice.go and /pkg/globalmanager/controllers/featureextraction/featureextractionservice.go, where these functions are used. We also added a new function, UpdateDeploymentWithTemplate, to handle deployment updates.
The test files are /pkg/globalmanager/controllers/jointinference/jointinferenceservice_test.go and /pkg/globalmanager/controllers/federatedlearning/federatedlearningjob_test.go.
The test files introduce the fake client package, which brings in some additional dependency code and increases the pull request size.
This project focuses on optimizing the logic of joint inference and federated learning task controllers, with no impact on other modules.
Cascade Deletion of Joint Inference and Federated Learning Tasks
Self-Healing Capability of Joint Inference and Federated Learning Tasks
Testing and Verification
Documentation Updates
Users of Sedna's joint inference and federated learning applications.
In Kubernetes, the Owner Reference provides information about the relationships between objects, allowing the control plane and other API clients to clean up associated resources when an object is deleted. The garbage collector can implement cascade deletion functionality based on this relationship.
Each resource's metadata includes an ownerReferences field, an array indicating its owners. When an owner resource is deleted, it is removed from this array, and the garbage collector will reclaim the resource once all owners are deleted.
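To make this concrete, the following is a minimal Go sketch of how a controller attaches an OwnerReference to a child object using client-go's standard helper; buildOwnedPod is a hypothetical name for illustration, not Sedna code.

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// buildOwnedPod stamps the owner's OwnerReference onto a pod so that
// deleting the owner cascades to the pod via the garbage collector.
func buildOwnedPod(owner metav1.Object, gvk schema.GroupVersionKind, pod *v1.Pod) *v1.Pod {
	// NewControllerRef sets Controller=true and BlockOwnerDeletion=true,
	// marking this owner as the managing controller of the pod.
	controllerRef := metav1.NewControllerRef(owner, gvk)
	pod.OwnerReferences = append(pod.OwnerReferences, *controllerRef)
	return pod
}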
The Owner Reference relationships for JointInferenceService are depicted in the following diagram:
The Owner Reference relationships for FederatedLearningJob are depicted in the following diagram:
Taking the Joint Inference Controller as an example, the Owner Reference setting process involves several steps:
In pkg/globalmanager/controllers/jointinference/jointinferenceservice.go, the controller name and CR kind name are defined:
const (
	// Name is this controller name
	Name = "JointInference"

	// KindName is the kind name of CR this controller controls
	KindName = "JointInferenceService"
)
The GroupVersionKind for the controller type is defined as follows:
// Kind contains the schema.GroupVersionKind for this controller type.
var Kind = sednav1.SchemeGroupVersion.WithKind(Name)
The Run function starts the controller and launches the configured number of worker goroutines:
for i := 0; i < workers; i++ {
	go wait.Until(c.worker, time.Second, stopCh)
}
In the worker function, tasks are dequeued, processed, and marked as done:
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the sync is never invoked concurrently with the same key.
func (c *Controller) worker() {
for c.processNextWorkItem() {
}
}
The sync function parses the key to get the namespace and name:
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return false, err
}
if len(ns) == 0 || len(name) == 0 {
return false, fmt.Errorf("invalid jointinference service key %q: either namespace or name is missing", key)
}
It then retrieves the JointInferenceService object from the lister:
sharedService, err := c.serviceLister.JointInferenceServices(ns).Get(name)
if err != nil {
if errors.IsNotFound(err) {
klog.V(4).Infof("JointInferenceService has been deleted: %v", key)
return true, nil
}
return false, err
}
service := *sharedService
The GroupVersionKind is set:
service.SetGroupVersionKind(Kind)
A selector is generated, and associated pods are listed:
selector, _ := runtime.GenerateSelector(&service)
pods, err := c.podStore.Pods(service.Namespace).List(selector)
if err != nil {
return false, err
}
klog.V(4).Infof("list jointinference service %v/%v, %v pods: %v", service.Namespace, service.Name, len(pods), pods)
When no worker failures occur, and if no associated pods exist, the createWorkers function is called to create pods:
} else {
	if len(pods) == 0 {
		active, manageServiceErr = c.createWorkers(&service)
	}
}
In the createWorkers function, createCloudWorker and createEdgeWorker are called to create cloud and edge worker pods. The runtime.CreatePodWithTemplate function creates pods, with the OwnerReference set as follows:
if controllerRef != nil {
pod.OwnerReferences = append(pod.OwnerReferences, *controllerRef)
}
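The controllerRef passed in is typically built from the owning service with client-go's metav1.NewControllerRef helper. The line below is a sketch using the Kind variable defined earlier, not necessarily the verbatim Sedna call site:

// Build the owner reference from the service before creating its worker pods.
controllerRef := metav1.NewControllerRef(&service, Kind)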
Pod Reconstruction
The automatic restart of a failed pod is governed by its RestartPolicy. In the JointInferenceService, the RestartPolicy is not explicitly set, so it defaults to Always. During a joint inference task, if an issue arises, such as EdgeMesh being misconfigured so that the edge cannot reach the cloud's port 5000 for cloud-based inference, the edge pod will restart continuously. In the FederatedLearningJob, the RestartPolicy is set to OnFailure.
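For illustration, the restart policy is a single field on the pod template's spec; podTemplate below is a hypothetical *v1.PodTemplateSpec, not a Sedna variable.

// Left unset, the policy defaults to v1.RestartPolicyAlways (the
// JointInferenceService case); FederatedLearningJob sets it explicitly:
podTemplate.Spec.RestartPolicy = v1.RestartPolicyOnFailure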
K8S Informer Mechanism
Kubernetes controllers do not access the API Server directly for every operation; instead, all controller operations go through an Informer. The Informer uses a ListAndWatch mechanism: when it starts for the first time, it calls the LIST API to retrieve the latest versions of all resource objects, and then uses the WATCH API to monitor changes to those objects. The event information is maintained in a read-only cache queue, which improves query efficiency and reduces the load on the API Server.
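The sketch below shows the standard client-go wiring behind this mechanism; it is generic usage for context, not Sedna-specific code.

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
)

// newPodInformerFactory builds a shared informer factory: the factory
// performs the initial LIST, then WATCHes for changes and maintains the
// local cache that listers read from.
func newPodInformerFactory(client kubernetes.Interface) informers.SharedInformerFactory {
	// The resync period (30s here) periodically replays cached objects to
	// the registered handlers; it does not re-list from the API server.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	factory.Core().V1().Pods().Informer() // instantiate the pod informer
	return factory
}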
According to the diagram, let's explain the roles of some components in the Informer:
Controller: the core of the Informer. It creates the Reflector and drives the processLoop, which pops data from the DeltaFIFO queue, first hands it to the Indexer for caching and indexing, and then distributes it to the processor for handling.
Reflector: The Informer does not directly access the k8s API server but uses a Reflector to do so. The Reflector monitors specific Kubernetes resources through ListAndWatch. When a resource changes, such as an Added event, the resource object is stored in the local DeltaFIFO cache.
DeltaFIFO: A first-in-first-out cache queue used to store various events returned by the Watch API, such as Added, Updated, and Deleted.
LocalStore: The cache of the Informer, which stores objects from the API server (some of which may still be in DeltaFIFO). Users query the cache directly, reducing the API server's load. LocalStore is accessed by the Lister's List/Get methods.
WorkQueue: after DeltaFIFO receives an event, it updates the data stored in the Store and then pops the event into the WorkQueue. The Controller triggers the corresponding callback function based on the event type received from the WorkQueue.
Informer first lists/watches the API server. The Reflector handles the connection, listing all instances of the resource and monitoring changes through the watch method. If the connection is interrupted, it resumes from the latest resourceVersion. When instances are created, deleted, or updated, the Reflector receives event notifications, which are placed into DeltaFIFO.
Informer reads these deltas from DeltaFIFO, determines the event type, and updates the local cache accordingly.
For Added events, the Informer saves the object to the local cache via the Indexer and creates an index. For Deleted events, it removes the object from the cache.
DeltaFIFO then pops the event into the Controller, which calls the ResourceEventHandler callback for handling.
ResourceEventHandler filters and places the relevant objects into the WorkQueue.
The Controller retrieves objects from the WorkQueue, launches a worker, and executes business logic. This typically involves comparing the cluster's current state with the desired state and instructing the API server to make adjustments, like creating new pods for a deployment.
Workers can use the Lister to get resources from the cache, avoiding frequent access to the API server.
There are three types of ResourceEventHandler functions in Informer:
// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}
The logic for these three functions is user-defined. After registering the ResourceEventHandler during controller initialization, the corresponding ResourceEventHandler will be triggered whenever an instance of the object is created, deleted, or updated.
Taking jointinferenceservice.go as an example, the New() function creates a new JointInferenceService controller to keep the relevant pods in sync with the corresponding JointInferenceService objects. In the New() function, the informer is initialized.
podInformer := cc.KubeInformerFactory.Core().V1().Pods()
serviceInformer := cc.SednaInformerFactory.Sedna().V1alpha1().JointInferenceServices()
The service informer uses a custom handler:
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
jc.enqueueController(obj, true)
jc.syncToEdge(watch.Added, obj)
},
UpdateFunc: func(old, cur interface{}) {
jc.enqueueController(cur, true)
jc.syncToEdge(watch.Added, cur)
},
DeleteFunc: func(obj interface{}) {
jc.enqueueController(obj, true)
jc.syncToEdge(watch.Deleted, obj)
},
})
The pod informer uses a custom handler:
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jc.addPod,
UpdateFunc: jc.updatePod,
DeleteFunc: jc.deletePod,
})
The EventHandler functions (addPod, updatePod, deletePod) here simply add the relevant objects to the queue without performing any other processing.
podInformer.Lister() creates a Lister() to retrieve Pod resources, and podInformer.Informer().HasSynced checks whether the Informer's cache has been synchronized.
jc.serviceLister = serviceInformer.Lister()
jc.serviceStoreSynced = serviceInformer.Informer().HasSynced
//...
jc.podStore = podInformer.Lister()
jc.podStoreSynced = podInformer.Informer().HasSynced
The process of synchronizing resources from the API server is carried out by the jointinferenceservice controller in the Run() function. The Run() function starts the main goroutine responsible for the watch and sync services.
if !cache.WaitForNamedCacheSync(Name, stopCh, c.podStoreSynced, c.serviceStoreSynced) {
klog.Errorf("failed to wait for %s caches to sync", Name)
return
}
After starting the Informer, it waits for the local cache to sync before launching the workers. When a change event is received, the changed object is retrieved from the event, an object key (in the form of namespace/name) is generated, and the key is placed into the worker queue. The worker() function then calls the c.processNextWorkItem() function.
func (c *Controller) worker() {
for c.processNextWorkItem() {
}
}
The processNextWorkItem function retrieves the key from the worker queue and calls sync(). In the sync() function, it fetches the actual object from the local cache using the lister and performs the relevant synchronization operations.
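For reference, processNextWorkItem usually follows the standard client-go workqueue pattern. The sketch below assumes a queue field of type workqueue.RateLimitingInterface and the sync signature shown earlier; Sedna's implementation may differ in details.

// Fetch a key from the queue, sync it, and either forget it on success
// or re-queue it with rate limiting on failure.
func (c *Controller) processNextWorkItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	forget, err := c.sync(key.(string))
	if err == nil {
		if forget {
			c.queue.Forget(key)
		}
		return true
	}

	klog.Warningf("error syncing %q: %v, requeuing", key, err)
	c.queue.AddRateLimited(key)
	return true
}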
Listen for delete events: when the Informer detects a delete event for a pod whose OwnerReference is set to a FederatedLearningJob, it triggers the deletePod function, which reconstructs the pod. The newly created pod is almost identical to the original, retaining its configuration and specification, but its resource version and UID are reset to allow regeneration.
The deletePod function proceeds as follows:
1. Check that the deleted pod's OwnerReference refers to a FederatedLearningJob.
2. Call c.recreatedPods.Load(pod.Name) to verify whether this pod has already been recreated. If it has, do not attempt to recreate it again.
3. Call pod.DeepCopy() to create a deep copy of the pod.
4. Reset the fields that are unique to each object (ResourceVersion and UID) and clear the status fields.
5. Create the new pod with c.kubeClient.CoreV1().Pods(pod.Namespace).Create.

// Create a deep copy of the old pod
newPod := pod.DeepCopy()
// Reset the resource version and UID as they are unique to each object
newPod.ResourceVersion = ""
newPod.UID = ""
// Clear the status
newPod.Status = v1.PodStatus{}
// Remove the deletion timestamp
newPod.DeletionTimestamp = nil
// Remove the deletion grace period seconds
newPod.DeletionGracePeriodSeconds = nil
_, err := c.kubeClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), newPod, metav1.CreateOptions{})
if err != nil {
return
}
6. Call c.recreatedPods.Store(pod.Name, true) to mark that this pod has been recreated.
7. Set a timer to delete the record from the map after a while (c.recreatedPods.Delete(pod.Name)), allowing the reconstruction logic to be triggered again if the pod is deleted once more.

// mark the pod as recreated
c.recreatedPods.Store(newPod.Name, true)
// set a timer to delete the record from the map after a while
go func() {
time.Sleep(5 * time.Second)
c.recreatedPods.Delete(pod.Name)
}()
We add a sync.Map field called recreatedPods to the Controller's data structure to prevent a pod from being recreated multiple times during a single delete event. When a pod is manually deleted and successfully recreated, its name is added to recreatedPods. If the deletePod function is called again during the same delete event, the pod's name will already exist in recreatedPods, so the pod will not be recreated a second time. A timer clears the mark after 5 seconds, allowing the reconstruction logic to be triggered again if the pod is manually deleted later.
type Controller struct {
	//...
	// recreatedPods records pods that deletePod has just recreated
	recreatedPods sync.Map
	// preventRecreation suppresses recreation while a CRD update is in progress
	preventRecreation bool
	//...
}
Inference tasks are stateless workloads, so Kubernetes' native component Deployment can be leveraged for pod self-healing. A complete process for monitoring and handling resource changes needs to be built. The Deployment Informer can replace the Pod Informer to manage worker control.
The Deployment Informer is wired up as follows (see the sketch after this list):
1. Obtain the Deployment Informer resource from the informer factory.
2. Register the event handler functions (addDeployment, updateDeployment, deleteDeployment) in the informer.
3. When a Deployment resource in the cluster changes, the Informer triggers the corresponding event handler.

In addition, to ensure that the deployment resource is not accidentally deleted, we adopt the same method used in Federated Learning for recreating pods to recreate the deployment.
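A minimal sketch of this wiring, assuming the same informer factory (cc.KubeInformerFactory) used for the pod informer earlier; the deploymentsLister and deploymentsSynced field names are illustrative:

deploymentInformer := cc.KubeInformerFactory.Apps().V1().Deployments()
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    jc.addDeployment,
	UpdateFunc: jc.updateDeployment,
	DeleteFunc: jc.deleteDeployment,
})
// Lister and cache-sync check, mirroring the pod informer setup above.
jc.deploymentsLister = deploymentInformer.Lister()
jc.deploymentsSynced = deploymentInformer.Informer().HasSynced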
Flowchart for updating pod by modifying the CRD in the Federated Learning controller:
Listen for update events: When the Informer detects an update event related to the FederatedLearningJob, if there are changes in the CRD, the original pod is deleted, and a new pod is created based on the updated parameters in the CRD.
The updateJob function is triggered, and it first checks whether an update is necessary.
1. If old and cur cannot be converted into *sednav1.FederatedLearningJob objects, the function returns immediately.
2. If the ResourceVersion of oldJob and curJob are the same, there are no changes, so the function returns without processing the update.
3. Set the preventRecreation flag to true to prevent pod recreation during the update process.
4. Compare oldJob and curJob via their old.Generation and cur.Generation fields. Every CRD object carries a Generation field in its metadata: it starts at 1 when the object is created and is incremented by 1 each time the Spec is modified, while Status updates leave it unchanged. This field can therefore be used to determine whether the Spec changed; if the two values are not equal, the parameters of the FederatedLearningJob have changed.
5. Use the updated curJob.Spec to recreate the AggWorker and TrainWorker.
6. Reset the preventRecreation flag to false to allow future pod self-healing without interference.

Flowchart for updating pod by modifying the CRD in the Joint Inference controller:
When the Informer detects an update event for the JointInferenceService and there are changes to the CRD, the deployment is updated according to the new parameters in the CRD.
The logic for updating pods by modifying the CRD in Joint Inference is largely the same as in Federated Learning.
The relevant fields are compared; if they are not equal, the parameters of the JointInferenceService have changed, and the cloudWorker and edgeWorker are updated using the new curService.Spec.
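Sketched below is the shape of that check. The generation comparison follows from the Generation semantics described above; updateCloudWorker and updateEdgeWorker are illustrative helper names, not confirmed Sedna APIs.

// The API server bumps metadata.generation only when the Spec changes,
// so comparing generations ignores pure Status updates.
if oldService.Generation != curService.Generation {
	// Re-apply the updated spec to both workers.
	c.updateCloudWorker(curService)
	c.updateEdgeWorker(curService)
}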
Unit tests are used instead of end-to-end tests, focusing on the modified functions: deletePod(), updateJob(), and updateService().
Test_deletePod()
1. A fake Kubernetes client is created with fake.NewSimpleClientset().
2. A test controller is constructed and the fakeclient is registered with it.
3. A test pod is deleted through the controller.deletePod() function, and fakeClient.CoreV1().Pods("default").Get(context.TODO(), "test-pod", metav1.GetOptions{}) is used to check whether the pod has been recreated; if it has not been recreated, the test fails.
4. The test also calls controller.deletePod() to delete a non-existent pod.

Test_updateJob()
1. An old job and a new job are created with a changed parameter (e.g., batch_size in the TrainingWorker changed from 32 to 16).
2. The test calls the updateJob function to update the old job to the new job, simulating the job update process in a real environment.

Test_UpdateService()
1. sendToEdgeFunc is set to an empty function (not performing actual edge communication).
2. An old service and a new service are created with a parameter changed from value1 to value2, while also incrementing the service's Generation value.
3. The test calls the updateService function to trigger an update to the joint inference service.
4. The test verifies that the parameter changed from value1 to value2, ensuring that the service update logic executes correctly.
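To illustrate the fake-client pattern these tests rely on, here is a condensed sketch; the setup is simplified relative to the real test files, and the real controller constructor wires more dependencies.

import (
	"context"
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func Test_deletePod_sketch(t *testing.T) {
	// The pod object carries an OwnerReference to a FederatedLearningJob,
	// so the deletePod handler's owner check passes.
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name:      "test-pod",
		Namespace: "default",
		OwnerReferences: []metav1.OwnerReference{{
			APIVersion: "sedna.io/v1alpha1",
			Kind:       "FederatedLearningJob",
			Name:       "test-job",
		}},
	}}

	// Start with an empty fake clientset: the pod has already been
	// deleted from the cluster when the delete event fires.
	fakeClient := fake.NewSimpleClientset()

	// Wire a controller to the fake client (constructor simplified here).
	c := &Controller{kubeClient: fakeClient}
	c.deletePod(pod)

	// The handler should have recreated the pod through the fake client.
	if _, err := fakeClient.CoreV1().Pods("default").Get(context.TODO(), "test-pod", metav1.GetOptions{}); err != nil {
		t.Fatalf("pod was not recreated: %v", err)
	}
}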