
Add globalmanager

Signed-off-by: llhuii <liulinghui@huawei.com>
tags/v0.1.0
llhuii 4 years ago
commit 2236187005
14 changed files with 1877 additions and 0 deletions
  1. pkg/globalmanager/common.go (+194, -0)
  2. pkg/globalmanager/config/config.go (+99, -0)
  3. pkg/globalmanager/controller.go (+51, -0)
  4. pkg/globalmanager/downstream.go (+177, -0)
  5. pkg/globalmanager/jointinferenceservice.go (+647, -0)
  6. pkg/globalmanager/messagelayer/context.go (+108, -0)
  7. pkg/globalmanager/messagelayer/model/model.go (+17, -0)
  8. pkg/globalmanager/messagelayer/ws/context.go (+170, -0)
  9. pkg/globalmanager/messagelayer/ws/server.go (+98, -0)
  10. pkg/globalmanager/types.go (+28, -0)
  11. pkg/globalmanager/upstream.go (+233, -0)
  12. pkg/globalmanager/utils/crdclient.go (+19, -0)
  13. pkg/globalmanager/utils/kubeclient.go (+16, -0)
  14. pkg/globalmanager/utils/kubeconfig.go (+20, -0)

pkg/globalmanager/common.go (+194, -0)

@@ -0,0 +1,194 @@
package globalmanager

import (
"context"
"fmt"
"math"
"strings"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)

const (
// DefaultBackOff is the default backoff period
DefaultBackOff = 10 * time.Second
// MaxBackOff is the max backoff period
MaxBackOff = 360 * time.Second
statusUpdateRetries = 3
// prefixes for container paths, covering the code and data directories
codePrefix = "/home/work"
dataPrefix = "/home/data"
bigModelPort int32 = 5000
)

// CreateVolumeMap creates the volume map for a container.
// It returns the volumeMounts and volumes used when creating the pod.
func CreateVolumeMap(containerPara *ContainerPara) ([]v1.VolumeMount, []v1.Volume) {
var volumeMounts []v1.VolumeMount
var volumes []v1.Volume
volumetype := v1.HostPathDirectory
for i, v := range containerPara.volumeMountList {
tempVolumeMount := v1.VolumeMount{
MountPath: v,
Name: containerPara.volumeMapName[i],
}
volumeMounts = append(volumeMounts, tempVolumeMount)
}
for i, v := range containerPara.volumeList {
tempVolume := v1.Volume{
Name: containerPara.volumeMapName[i],
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: v,
Type: &volumetype,
},
},
}
volumes = append(volumes, tempVolume)
}
return volumeMounts, volumes
}

// CreateEnvVars creates the env vars for a container
// from the name/value map used when creating the pod.
func CreateEnvVars(envMap map[string]string) []v1.EnvVar {
var envVars []v1.EnvVar
for envName, envValue := range envMap {
Env := v1.EnvVar{
Name: envName,
Value: envValue,
}
envVars = append(envVars, Env)
}
return envVars
}

func MatchContainerBaseImage(imageHub map[string]string, frameName string, frameVersion string) (string, error) {
inputImageName := frameName + ":" + frameVersion
for imageName, imageURL := range imageHub {
if inputImageName == imageName {
return imageURL, nil
}
}
return "", fmt.Errorf("image %v not exists in imagehub", inputImageName)
}

func GetNodeIPByName(kubeClient kubernetes.Interface, name string) (string, error) {
n, err := kubeClient.CoreV1().Nodes().Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return "", err
}

typeToAddress := make(map[v1.NodeAddressType]string)
for _, addr := range n.Status.Addresses {
typeToAddress[addr.Type] = addr.Address
}
address, found := typeToAddress[v1.NodeExternalIP]
if found {
return address, nil
}

address, found = typeToAddress[v1.NodeInternalIP]
if found {
return address, nil
}
return "", fmt.Errorf("can't found node ip for node %s", name)
}

func GenerateLabels(object CommonInterface) map[string]string {
kind := object.GroupVersionKind().Kind
group := object.GroupVersionKind().Group
name := object.GetName()
key := strings.ToLower(kind) + "." + group + "/name"
labels := make(map[string]string)
labels[key] = name
return labels
}

func GenerateSelector(object CommonInterface) (labels.Selector, error) {
ls := &metav1.LabelSelector{
MatchLabels: GenerateLabels(object),
}
return metav1.LabelSelectorAsSelector(ls)
}

func CreateKubernetesService(kubeClient kubernetes.Interface, object CommonInterface, inputPort int32, inputIP string) (int32, error) {
ctx := context.Background()
name := object.GetName()
namespace := object.GetNamespace()
kind := object.GroupVersionKind().Kind
targetPort := intstr.IntOrString{
IntVal: inputPort,
}
serviceSpec := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: object.GetNamespace(),
GenerateName: name + "-" + "service" + "-",
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(object, object.GroupVersionKind()),
},
Labels: GenerateLabels(object),
},
Spec: v1.ServiceSpec{
Selector: GenerateLabels(object),
ExternalIPs: []string{
inputIP,
},
Type: v1.ServiceTypeNodePort,
Ports: []v1.ServicePort{
{
Port: inputPort,
TargetPort: targetPort,
},
},
},
}
service, err := kubeClient.CoreV1().Services(namespace).Create(ctx, serviceSpec, metav1.CreateOptions{})
if err != nil {
klog.Warningf("failed to create service for %v %v/%v, err:%s", kind, namespace, name, err)
return 0, err
}

klog.V(2).Infof("Service %s is created successfully for %v %v/%v", service.Name, kind, namespace, name)
return service.Spec.Ports[0].NodePort, nil
}

// getBackoff calculates the next wait time for the key
func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Duration {
exp := queue.NumRequeues(key)

if exp <= 0 {
return time.Duration(0)
}

// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(DefaultBackOff.Nanoseconds()) * math.Pow(2, float64(exp-1))
if backoff > math.MaxInt64 {
return MaxBackOff
}

calculated := time.Duration(backoff)
if calculated > MaxBackOff {
return MaxBackOff
}
return calculated
}

func calcActivePodCount(pods []*v1.Pod) int32 {
var result int32 = 0
for _, p := range pods {
if v1.PodSucceeded != p.Status.Phase &&
v1.PodFailed != p.Status.Phase &&
p.DeletionTimestamp == nil {
result++
}
}
return result
}
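
As a usage sketch (not part of this commit): GenerateLabels keys everything on a single "<kind>.<group>/name" label and GenerateSelector builds the matching selector. The names below are illustrative, and kubeClient is assumed to be a kubernetes.Interface.

    // hypothetical usage inside this package; "demo" is an illustrative service name
    svc := &neptunev1.JointInferenceService{}
    svc.SetName("demo")
    svc.SetNamespace("default")
    svc.SetGroupVersionKind(neptunev1.SchemeGroupVersion.WithKind("JointInferenceService"))

    lbls := GenerateLabels(svc)          // e.g. {"jointinferenceservice.<group>/name": "demo"}
    selector, _ := GenerateSelector(svc) // selector matching exactly these labels

    // assuming kubeClient is a kubernetes.Interface, the owned pods can then be listed with:
    pods, _ := kubeClient.CoreV1().Pods(svc.GetNamespace()).List(
        context.Background(), metav1.ListOptions{LabelSelector: selector.String()})
    _, _ = lbls, pods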

pkg/globalmanager/config/config.go (+99, -0)

@@ -0,0 +1,99 @@
package config

import (
"io/ioutil"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"

"github.com/edgeai-neptune/neptune/pkg/util"
)

const (
DefaultKubeConfig = ""
DefaultNamespace = v1.NamespaceAll
DefaultWebsocketAddress = "0.0.0.0"
DefaultWebsocketPort = 9000
DefaultLCServer = "http://localhost:9100"
)

// ControllerConfig indicates the config of controller
type ControllerConfig struct {
// KubeConfig indicates the path to the kubeconfig of the kubernetes cluster which the controller will connect to
KubeConfig string `json:"kubeConfig,omitempty"`

// Master indicates the address of the Kubernetes API server. Overrides any value in KubeConfig.
// such as https://127.0.0.1:8443
// default ""
Master string `json:"master"`
// Namespace indicates which namespace the controller listening to.
// default ""
Namespace string `json:"namespace,omitempty"`
// ImageHub maps each framework/version pair to its worker base image
// +Required
ImageHub map[string]string `json:"imageHub,omitempty"`

// websocket server config
// Due to the current limitation of kubeedge (1.5), GM needs to build its own
// websocket channel for the communication between GM and LCs.
WebSocket WebSocket `json:"websocket,omitempty"`

// LC config to be passed to the workers
LC LCConfig `json:"localController,omitempty"`
}

type WebSocket struct {
// default DefaultWebsocketAddress
Address string `json:"address,omitempty"`
// default DefaultWebsocketPort
Port int64 `json:"port,omitempty"`
}

type LCConfig struct {
// default DefaultLCServer
Server string `json:"server"`
}

func (c *ControllerConfig) Parse(filename string) error {
data, err := ioutil.ReadFile(filename)
if err != nil {
klog.Errorf("Failed to read configfile %s: %v", filename, err)
return err
}
err = yaml.Unmarshal(data, c)
if err != nil {
klog.Errorf("Failed to unmarshal configfile %s: %v", filename, err)
return err
}
return nil
}

func (c *ControllerConfig) Validate() field.ErrorList {
allErrs := field.ErrorList{}
if c.KubeConfig != "" && !util.FileIsExist(c.KubeConfig) {
allErrs = append(allErrs, field.Invalid(field.NewPath("kubeconfig"), c.KubeConfig, "kubeconfig not exist"))
}
return allErrs
}

func NewDefaultControllerConfig() *ControllerConfig {
return &ControllerConfig{
KubeConfig: DefaultKubeConfig,
Master: "",
Namespace: DefaultNamespace,
WebSocket: WebSocket{
Address: DefaultWebsocketAddress,
Port: DefaultWebsocketPort,
},
LC: LCConfig{
Server: DefaultLCServer,
},
}
}

var Config ControllerConfig

func InitConfigure(cc *ControllerConfig) {
Config = *cc
}
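
For reference, a minimal sketch of a GM config and the loading sequence it goes through; the field names follow the json tags above, while the kubeconfig path, config file path and image URL are illustrative assumptions (klog and this config package are assumed to be imported).

    // hypothetical gm-config.yaml:
    //
    //   kubeConfig: /root/.kube/config
    //   namespace: ""
    //   imageHub:
    //     "tensorflow:1.15": docker.io/example/neptune-tensorflow-worker:1.15
    //   websocket:
    //     address: 0.0.0.0
    //     port: 9000
    //   localController:
    //     server: http://localhost:9100

    cc := config.NewDefaultControllerConfig()
    if err := cc.Parse("/etc/neptune/gm-config.yaml"); err != nil { // hypothetical path
        klog.Fatalf("failed to parse config: %v", err)
    }
    if errs := cc.Validate(); len(errs) > 0 {
        klog.Fatalf("invalid config: %v", errs)
    }
    config.InitConfigure(cc)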

pkg/globalmanager/controller.go (+51, -0)

@@ -0,0 +1,51 @@
package globalmanager

import (
"fmt"
"os"

"k8s.io/klog/v2"

"github.com/edgeai-neptune/neptune/pkg/globalmanager/config"
websocket "github.com/edgeai-neptune/neptune/pkg/globalmanager/messagelayer/ws"
)

// NeptuneController defines the Neptune global manager controller
type NeptuneController struct {
Config *config.ControllerConfig
}

func NewController(cc *config.ControllerConfig) *NeptuneController {
config.InitConfigure(cc)
return &NeptuneController{
Config: cc,
}
}

// Start controller
func (c *NeptuneController) Start() {
type newFunc func(cfg *config.ControllerConfig) (FeatureControllerI, error)

for _, featureFunc := range []newFunc{
NewUpstreamController,
NewDownstreamController,
NewJointController,
} {
f, err := featureFunc(c.Config)
if err != nil {
klog.Warningf("failed to create controller: %+v", err)
continue
}
if err = f.Start(); err != nil {
klog.Warningf("failed to start controller %s: %+v", f.GetName(), err)
} else {
klog.Infof("started controller %s", f.GetName())
}
}

addr := fmt.Sprintf("%s:%d", c.Config.WebSocket.Address, c.Config.WebSocket.Port)

ws := websocket.NewServer(addr)
if err := ws.ListenAndServe(); err != nil {
klog.Fatalf("failed to listen websocket at %s: %v", addr, err)
}
}

pkg/globalmanager/downstream.go (+177, -0)

@@ -0,0 +1,177 @@
package globalmanager

import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

neptunev1 "github.com/edgeai-neptune/neptune/pkg/apis/neptune/v1alpha1"
clientset "github.com/edgeai-neptune/neptune/pkg/client/clientset/versioned/typed/neptune/v1alpha1"
"github.com/edgeai-neptune/neptune/pkg/globalmanager/config"
"github.com/edgeai-neptune/neptune/pkg/globalmanager/messagelayer"
"github.com/edgeai-neptune/neptune/pkg/globalmanager/utils"
)

// DownstreamController watches the kubernetes api server and sends the controlled resource changes to the edge
type DownstreamController struct {
// events from watch kubernetes api server
events chan watch.Event

cfg *config.ControllerConfig

client *clientset.NeptuneV1alpha1Client
messageLayer messagelayer.MessageLayer
}

// syncDataset syncs the dataset resources
func (dc *DownstreamController) syncDataset(eventType watch.EventType, dataset *neptunev1.Dataset) error {
// Only propagate to the nodes with a non-empty name
nodeName := dataset.Spec.NodeName
if len(nodeName) == 0 {
return fmt.Errorf("empty node name")
}

return dc.messageLayer.SendResourceObject(nodeName, eventType, dataset)
}

// syncJointInferenceService syncs the joint-inference-service resources
func (dc *DownstreamController) syncJointInferenceService(eventType watch.EventType, joint *neptunev1.JointInferenceService) error {
// Only propagate to the nodes with a non-empty name
nodeName := joint.Spec.EdgeWorker.NodeName
if len(nodeName) == 0 {
return fmt.Errorf("empty node name")
}

return dc.messageLayer.SendResourceObject(nodeName, eventType, joint)
}

// sync defines the entrypoint of syncing all resources
func (dc *DownstreamController) sync(stopCh <-chan struct{}) {
for {
select {
case <-stopCh:
klog.Info("Stop controller downstream loop")
return

case e := <-dc.events:

var err error
var kind, namespace, name string
switch t := e.Object.(type) {
case (*neptunev1.Dataset):
// Since t.Kind may be empty,
// we need to fix the kind here if missing.
// more details at https://github.com/kubernetes/kubernetes/issues/3030
if len(t.Kind) == 0 {
t.Kind = "Dataset"
}
kind = t.Kind
namespace = t.Namespace
name = t.Name
err = dc.syncDataset(e.Type, t)

case (*neptunev1.JointInferenceService):
// TODO: find a good way to avoid these duplicate codes
if len(t.Kind) == 0 {
t.Kind = "JointInferenceService"
}
kind = t.Kind
namespace = t.Namespace
name = t.Name
err = dc.syncJointInferenceService(e.Type, t)

default:
klog.Warningf("object type: %T unsupported", e)
continue
}

if err != nil {
klog.Warningf("Error to sync %s(%s/%s), err: %+v", kind, namespace, name, err)
} else {
klog.V(2).Infof("synced %s(%s/%s)", kind, namespace, name)
}
}
}
}

// watch watches the crd resources which should be synced to the nodes
func (dc *DownstreamController) watch(stopCh <-chan struct{}) {
rh := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
eventObj := obj.(runtime.Object)
dc.events <- watch.Event{Type: watch.Added, Object: eventObj}
},
UpdateFunc: func(old, cur interface{}) {
// Since we don't support the spec update operation currently,
// only status updates arrive here and there is NO propagation to the edge.

// Update:
// We do sync it to the edge while using the self-built websocket;
// this sync won't be needed once we move off the self-built websocket.
dc.events <- watch.Event{Type: watch.Added, Object: cur.(runtime.Object)}
},
DeleteFunc: func(obj interface{}) {
eventObj := obj.(runtime.Object)
dc.events <- watch.Event{Type: watch.Deleted, Object: eventObj}
},
}

client := dc.client.RESTClient()

// make this option configurable
resyncPeriod := time.Second * 60
namespace := dc.cfg.Namespace

for resourceName, object := range map[string]runtime.Object{
"datasets": &neptunev1.Dataset{},
"jointinferenceservices": &neptunev1.JointInferenceService{},
} {
lw := cache.NewListWatchFromClient(client, resourceName, namespace, fields.Everything())
si := cache.NewSharedInformer(lw, object, resyncPeriod)
si.AddEventHandler(rh)
go si.Run(stopCh)
}
}

// Start starts the controller
func (dc *DownstreamController) Start() error {
stopCh := dc.messageLayer.Done()

// watch is an asynchronous call
dc.watch(stopCh)

// sync is a synchronous call
go dc.sync(stopCh)

return nil
}

func (dc *DownstreamController) GetName() string {
return "DownstreamController"
}

// NewDownstreamController creates a controller DownstreamController from config
func NewDownstreamController(cfg *config.ControllerConfig) (FeatureControllerI, error) {
// TODO: make bufferSize configurable
bufferSize := 10
events := make(chan watch.Event, bufferSize)

crdclient, err := utils.NewCRDClient()
if err != nil {
return nil, fmt.Errorf("create crd client failed with error: %w", err)
}

dc := &DownstreamController{
cfg: cfg,
events: events,
client: crdclient,
messageLayer: messagelayer.NewContextMessageLayer(),
}

return dc, nil
}

pkg/globalmanager/jointinferenceservice.go (+647, -0)

@@ -0,0 +1,647 @@
package globalmanager

import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"strconv"
"strings"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilrand "k8s.io/apimachinery/pkg/util/rand"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
k8scontroller "k8s.io/kubernetes/pkg/controller"

neptunev1 "github.com/edgeai-neptune/neptune/pkg/apis/neptune/v1alpha1"
clientset "github.com/edgeai-neptune/neptune/pkg/client/clientset/versioned"
neptuneclientset "github.com/edgeai-neptune/neptune/pkg/client/clientset/versioned/typed/neptune/v1alpha1"
informers "github.com/edgeai-neptune/neptune/pkg/client/informers/externalversions"
neptunev1listers "github.com/edgeai-neptune/neptune/pkg/client/listers/neptune/v1alpha1"
"github.com/edgeai-neptune/neptune/pkg/globalmanager/config"
messageContext "github.com/edgeai-neptune/neptune/pkg/globalmanager/messagelayer/ws"
"github.com/edgeai-neptune/neptune/pkg/globalmanager/utils"
)

type JointInferenceType string

const (
JointInferenceForEdge JointInferenceType = "Edge"
JointInferenceForCloud JointInferenceType = "Cloud"
)

// controllerKind contains the schema.GroupVersionKind for this controller type.
var jointServiceControllerKind = neptunev1.SchemeGroupVersion.WithKind("JointInferenceService")

// JointInferenceServiceController ensures that all JointInferenceService objects
// have corresponding pods to run their configured workload.
type JointInferenceServiceController struct {
kubeClient kubernetes.Interface
client neptuneclientset.NeptuneV1alpha1Interface

// podStoreSynced returns true if the pod store has been synced at least once.
podStoreSynced cache.InformerSynced
// A store of pods
podStore corelisters.PodLister

// serviceStoreSynced returns true if the jointinferenceservice store has been synced at least once.
serviceStoreSynced cache.InformerSynced
// A store of service
serviceLister neptunev1listers.JointInferenceServiceLister

// JointInferenceServices that need to be updated
queue workqueue.RateLimitingInterface

recorder record.EventRecorder

cfg *config.ControllerConfig
}

// Start runs the main goroutine responsible for watching and syncing services.
func (jc *JointInferenceServiceController) Start() error {
workers := 1
stopCh := messageContext.Done()

go func() {
defer utilruntime.HandleCrash()
defer jc.queue.ShutDown()
klog.Infof("Starting joint inference service controller")
defer klog.Infof("Shutting down joint inference service controller")

if !cache.WaitForNamedCacheSync("jointinferenceservice", stopCh, jc.podStoreSynced, jc.serviceStoreSynced) {
klog.Errorf("failed to wait for joint inferce service caches to sync")

return
}

klog.Infof("Starting joint inference service workers")
for i := 0; i < workers; i++ {
go wait.Until(jc.worker, time.Second, stopCh)
}

<-stopCh
}()
return nil
}

// enqueueByPod enqueues the jointInferenceService object of the specified pod.
func (jc *JointInferenceServiceController) enqueueByPod(pod *v1.Pod, immediate bool) {
controllerRef := metav1.GetControllerOf(pod)

if controllerRef == nil {
return
}

if controllerRef.Kind != jointServiceControllerKind.Kind {
return
}

service, err := jc.serviceLister.JointInferenceServices(pod.Namespace).Get(controllerRef.Name)
if err != nil {
return
}

if service.UID != controllerRef.UID {
return
}

jc.enqueueController(service, immediate)
}

// When a pod is created, enqueue the controller that manages it and update its expectations.
func (jc *JointInferenceServiceController) addPod(obj interface{}) {
pod := obj.(*v1.Pod)
if pod.DeletionTimestamp != nil {
// on a restart of the controller, it's possible a new pod shows up in a state that
// is already pending deletion. Prevent the pod from being a creation observation.
jc.deletePod(pod)
return
}

// backoff to queue when PodFailed
immediate := pod.Status.Phase != v1.PodFailed

jc.enqueueByPod(pod, immediate)
}

// When a pod is updated, figure out which joint inference service manages it and wake it up.
func (jc *JointInferenceServiceController) updatePod(old, cur interface{}) {
curPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)

// no pod update, no queue
if curPod.ResourceVersion == oldPod.ResourceVersion {
return
}

jc.addPod(curPod)
}

// deletePod enqueues the jointinferenceservice object when a pod is deleted
func (jc *JointInferenceServiceController) deletePod(obj interface{}) {
pod, ok := obj.(*v1.Pod)

// comment from https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/job/job_controller.go

// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale. If the pod
// changed labels the new jointinferenceservice will not be woken up till the periodic resync.
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.Warningf("couldn't get object from tombstone %+v", obj)
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
klog.Warningf("tombstone contained object that is not a pod %+v", obj)
return
}
}
jc.enqueueByPod(pod, true)
}

// obj could be a *neptunev1.JointInferenceService, or a DeletionFinalStateUnknown marker item;
// immediate tells the controller to update the status right away, and should
// happen ONLY when there was a successful pod run.
func (jc *JointInferenceServiceController) enqueueController(obj interface{}, immediate bool) {
key, err := k8scontroller.KeyFunc(obj)
if err != nil {
klog.Warningf("Couldn't get key for object %+v: %v", obj, err)
return
}

backoff := time.Duration(0)
if !immediate {
backoff = getBackoff(jc.queue, key)
}
jc.queue.AddAfter(key, backoff)
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the sync is never invoked concurrently with the same key.
func (jc *JointInferenceServiceController) worker() {
for jc.processNextWorkItem() {
}
}

func (jc *JointInferenceServiceController) processNextWorkItem() bool {
key, quit := jc.queue.Get()
if quit {
return false
}
defer jc.queue.Done(key)

forget, err := jc.sync(key.(string))
if err == nil {
if forget {
jc.queue.Forget(key)
}
return true
}

klog.Warningf("Error syncing jointinference service: %v", err)
jc.queue.AddRateLimited(key)

return true
}

// sync will sync the jointinferenceservice with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (jc *JointInferenceServiceController) sync(key string) (bool, error) {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing jointinference service %q (%v)", key, time.Since(startTime))
}()

ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return false, err
}
if len(ns) == 0 || len(name) == 0 {
return false, fmt.Errorf("invalid jointinference service key %q: either namespace or name is missing", key)
}
sharedJointinferenceservice, err := jc.serviceLister.JointInferenceServices(ns).Get(name)
if err != nil {
if errors.IsNotFound(err) {
klog.V(4).Infof("JointInferenceService has been deleted: %v", key)
return true, nil
}
return false, err
}

jointinferenceservice := *sharedJointinferenceservice

// if jointinferenceservice was finished previously, we don't want to redo the termination
if IsJointinferenceserviceFinished(&jointinferenceservice) {
return true, nil
}

// set the kind for jointinferenceservice in case it is empty
// more details at https://github.com/kubernetes/kubernetes/issues/3030
jointinferenceservice.SetGroupVersionKind(jointServiceControllerKind)

selector, _ := GenerateSelector(&jointinferenceservice)
pods, err := jc.podStore.Pods(jointinferenceservice.Namespace).List(selector)

if err != nil {
return false, err
}

klog.V(4).Infof("list jointinference service %v/%v, %v pods: %v", jointinferenceservice.Namespace, jointinferenceservice.Name, len(pods), pods)

latestConditionLen := len(jointinferenceservice.Status.Conditions)

active := calcActivePodCount(pods)
var failed int32 = 0
// neededCounts means that currently two pods should be created successfully in a jointinference service:
// the edge pod and the cloud pod
var neededCounts int32 = 2
// jointinferenceservice first start
if jointinferenceservice.Status.StartTime == nil {
now := metav1.Now()
jointinferenceservice.Status.StartTime = &now
} else {
failed = neededCounts - active
}

var manageServiceErr error
serviceFailed := false

var latestConditionType neptunev1.JointInferenceServiceConditionType = ""

// get the latest condition type,
// relying on the fact that updated conditions are appended, not inserted.
jobConditions := jointinferenceservice.Status.Conditions
if len(jobConditions) > 0 {
latestConditionType = (jobConditions)[len(jobConditions)-1].Type
}

var newConditionType neptunev1.JointInferenceServiceConditionType
var reason string
var message string

if failed > 0 {
serviceFailed = true
// TODO: identify which worker failed, the edge inference worker or the cloud inference worker
reason = "workerFailed"
message = "the worker of Jointinferenceservice failed"
newConditionType = neptunev1.JointInferenceServiceCondFailed
jc.recorder.Event(&jointinferenceservice, v1.EventTypeWarning, reason, message)
} else {
if len(pods) == 0 {
active, manageServiceErr = jc.createPod(&jointinferenceservice)
}
if manageServiceErr != nil {
serviceFailed = true
message = manageServiceErr.Error()
newConditionType = neptunev1.JointInferenceServiceCondFailed
failed = neededCounts - active
} else {
// TODO: handle the case that the pod phase is PodSucceeded
newConditionType = neptunev1.JointInferenceServiceCondRunning
}
}

// append a new condition only when the condition type changes
if newConditionType != latestConditionType {
jointinferenceservice.Status.Conditions = append(jointinferenceservice.Status.Conditions, NewJointInferenceServiceCondition(newConditionType, reason, message))
}
forget := false

// no need to update the jointinferenceservice if the status hasn't changed since last time
if jointinferenceservice.Status.Active != active || jointinferenceservice.Status.Failed != failed || len(jointinferenceservice.Status.Conditions) != latestConditionLen {
jointinferenceservice.Status.Active = active
jointinferenceservice.Status.Failed = failed

if err := jc.updateStatus(&jointinferenceservice); err != nil {
return forget, err
}

if serviceFailed && !IsJointinferenceserviceFinished(&jointinferenceservice) {
// returning an error will re-enqueue jointinferenceservice after the backoff period
return forget, fmt.Errorf("failed pod(s) detected for jointinference service key %q", key)
}

forget = true
}

return forget, manageServiceErr
}

func NewJointInferenceServiceCondition(conditionType neptunev1.JointInferenceServiceConditionType, reason, message string) neptunev1.JointInferenceServiceCondition {
return neptunev1.JointInferenceServiceCondition{
Type: conditionType,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: reason,
Message: message,
}
}

func (jc *JointInferenceServiceController) updateStatus(jointinferenceservice *neptunev1.JointInferenceService) error {
serviceClient := jc.client.JointInferenceServices(jointinferenceservice.Namespace)
var err error
for i := 0; i <= statusUpdateRetries; i++ {
var newJointinferenceservice *neptunev1.JointInferenceService
newJointinferenceservice, err = serviceClient.Get(context.TODO(), jointinferenceservice.Name, metav1.GetOptions{})
if err != nil {
break
}
newJointinferenceservice.Status = jointinferenceservice.Status
if _, err = serviceClient.UpdateStatus(context.TODO(), newJointinferenceservice, metav1.UpdateOptions{}); err == nil {
break
}
}
return err
}

func IsJointinferenceserviceFinished(j *neptunev1.JointInferenceService) bool {
for _, c := range j.Status.Conditions {
if (c.Type == neptunev1.JointInferenceServiceCondFailed) && c.Status == v1.ConditionTrue {
return true
}
}
return false
}

func (jc *JointInferenceServiceController) createPod(service *neptunev1.JointInferenceService) (active int32, err error) {
active = 0

// create pod for cloudPod
err = jc.createCloudPod(service)
if err != nil {
return active, err
}
active++

// create the kubernetes service for the cloud pod, and get bigServicePort for the edge pod
var bigServicePort int32
bigModelIP, err := GetNodeIPByName(jc.kubeClient, service.Spec.CloudWorker.NodeName)
if err != nil {
return active, err
}
bigServicePort, err = CreateKubernetesService(jc.kubeClient, service, bigModelPort, bigModelIP)
if err != nil {
return active, err
}

// create pod for edgePod
err = jc.createEdgePod(service, bigServicePort)
if err != nil {
return active, err
}
active++

return active, err
}

func (jc *JointInferenceServiceController) createCloudPod(service *neptunev1.JointInferenceService) error {
// deliver pod for cloudworker
ctx := context.Background()
var cloudModelPath string
cloudModelName := service.Spec.CloudWorker.Model.Name
cloudModel, err := jc.client.Models(service.Namespace).Get(ctx, cloudModelName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get cloud model %s: %w",
cloudModelName, err)
}

cloudModelPath = cloudModel.Spec.ModelURL

// convert crd to json, and put them into env of container
cloudModelJSON, _ := json.Marshal(cloudModel)
cloudModelString := string(cloudModelJSON)
cloudModelParent := filepath.Dir(cloudModelPath)

cloudWorker := service.Spec.CloudWorker
cloudCodePath := cloudWorker.WorkerSpec.ScriptDir
cloudParameterJSON, _ := json.Marshal(cloudWorker.WorkerSpec.Parameters)
cloudParameterString := string(cloudParameterJSON)

// Container VolumeMounts parameters
cloudCodeConPath := codePrefix
cloudModelConPath := dataPrefix + cloudModelParent

// Env parameters for cloud
cloudModelURL := dataPrefix + cloudModelPath

// Configure the container mounts and env by initializing ContainerPara
var cloudContainer *ContainerPara = new(ContainerPara)
cloudContainer.volumeMountList = []string{cloudCodeConPath, cloudModelConPath}
cloudContainer.volumeList = []string{cloudCodePath, cloudModelParent}
cloudContainer.volumeMapName = []string{"code", "model"}
cloudContainer.env = map[string]string{
"MODEL": cloudModelString,
"WORKER_NAME": "cloudworker-" + utilrand.String(5),
"SERVICE_NAME": service.Name,
"PARAMETERS": cloudParameterString,
"MODEL_URL": cloudModelURL,
"NAMESPACE": service.Namespace,
"BIG_MODEL_BIND_PORT": strconv.Itoa(int(bigModelPort)),
}

// create cloud pod
err = jc.generatedPod(service, JointInferenceForCloud, cloudContainer, false)
if err != nil {
return err
}
return nil
}

func (jc *JointInferenceServiceController) createEdgePod(service *neptunev1.JointInferenceService, bigServicePort int32) error {
// deliver pod for edgeworker
ctx := context.Background()
edgeModelName := service.Spec.EdgeWorker.Model.Name
edgeModel, err := jc.client.Models(service.Namespace).Get(ctx, edgeModelName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get edge model %s: %w",
edgeModelName, err)
}
edgeModelPath := edgeModel.Spec.ModelURL

// get bigModelIP from the nodeName in cloudWorker
bigModelIP, err := GetNodeIPByName(jc.kubeClient, service.Spec.CloudWorker.NodeName)
if err != nil {
return fmt.Errorf("failed to get node ip for node %s: %w", service.Spec.CloudWorker.NodeName, err)
}

// convert crd to json, and put them into env of container
edgeModelJSON, _ := json.Marshal(edgeModel)
edgeModelString := string(edgeModelJSON)
edgeModelParent := filepath.Dir(edgeModelPath)

edgeWorker := service.Spec.EdgeWorker
edgeCodePath := edgeWorker.WorkerSpec.ScriptDir
edgeParameterJSON, _ := json.Marshal(edgeWorker.WorkerSpec.Parameters)
edgeParameterString := string(edgeParameterJSON)
HSAParameterJSON, _ := json.Marshal(edgeWorker.HardExampleAlgorithm.Parameters)
HSAParameterString := string(HSAParameterJSON)

// Container VolumeMounts parameters
edgeCodeConPath := codePrefix
edgeModelConPath := dataPrefix + edgeModelParent

// Env parameters for edge
edgeModelURL := dataPrefix + edgeModelPath

// Configure the container mounts and env by initializing ContainerPara
var edgeContainer *ContainerPara = new(ContainerPara)
edgeContainer.volumeMountList = []string{edgeCodeConPath, edgeModelConPath}
edgeContainer.volumeList = []string{edgeCodePath, edgeModelParent}
edgeContainer.volumeMapName = []string{"code", "model"}
edgeContainer.env = map[string]string{
"MODEL": edgeModelString,
"WORKER_NAME": "edgeworker-" + utilrand.String(5),
"SERVICE_NAME": service.Name,
"BIG_MODEL_IP": bigModelIP,
"BIG_MODEL_PORT": strconv.Itoa(int(bigServicePort)),
"PARAMETERS": edgeParameterString,
"HSA_PARAMETERS": HSAParameterString,
"MODEL_URL": edgeModelURL,
"NAMESPACE": service.Namespace,
"HARD_SAMPLE_ALGORITHM": edgeWorker.HardExampleAlgorithm.Name,
"LC_SERVER": jc.cfg.LC.Server,
}

// create edge pod
err = jc.generatedPod(service, JointInferenceForEdge, edgeContainer, true)
if err != nil {
return err
}
return nil
}

func (jc *JointInferenceServiceController) generatedPod(service *neptunev1.JointInferenceService, podtype JointInferenceType,
containerPara *ContainerPara, hostNetwork bool) error {
var workerSpec neptunev1.CommonWorkerSpec
var volumeMounts []v1.VolumeMount
var volumes []v1.Volume
var envs []v1.EnvVar
var nodeName string
ctx := context.Background()
if podtype == JointInferenceForEdge {
workerSpec = service.Spec.EdgeWorker.WorkerSpec
nodeName = service.Spec.EdgeWorker.NodeName
} else {
workerSpec = service.Spec.CloudWorker.WorkerSpec
nodeName = service.Spec.CloudWorker.NodeName
}
// get baseImgURL from imageHub based on user's configuration in job CRD
frameName := workerSpec.FrameworkType
frameVersion := workerSpec.FrameworkVersion
baseImgURL, err := MatchContainerBaseImage(jc.cfg.ImageHub, frameName, frameVersion)
// if no image matches, the pod creation does not proceed and an error is returned directly
if err != nil {
klog.Warningf("failed to match the container base image for the %v worker of jointinference service %v/%v: %v", podtype, service.Namespace, service.Name, err)
return fmt.Errorf("failed to create %s pod: %w",
podtype, err)
}
volumeMounts, volumes = CreateVolumeMap(containerPara)
envs = CreateEnvVars(containerPara.env)
podSpec := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: service.Namespace,
GenerateName: service.Name + "-" + strings.ToLower(string(podtype)) + "-",
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(service, jointServiceControllerKind),
},
Labels: GenerateLabels(service),
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
NodeName: nodeName,
Containers: []v1.Container{
{Name: "container-" + service.Name + "-" + strings.ToLower(string(podtype)) + "-" + utilrand.String(5),
Image: baseImgURL,
Args: []string{workerSpec.ScriptBootFile},
Env: envs,
VolumeMounts: volumeMounts,
}},
Volumes: volumes,
HostNetwork: hostNetwork,
},
}
pod, err := jc.kubeClient.CoreV1().Pods(service.Namespace).Create(ctx, podSpec, metav1.CreateOptions{})
if err != nil {
klog.Warningf("failed to create %s pod %s for jointinference service %v/%v, err:%s", string(podtype), pod.Name, service.Namespace, service.Name, err)
return err
}
klog.V(2).Infof("%s pod %s is created successfully for jointinference service %v/%v", string(podtype), pod.Name, service.Namespace, service.Name)
return nil
}

func (jc *JointInferenceServiceController) GetName() string {
return "JointInferenceServiceController"
}

// NewJointController creates a new JointInferenceService controller that keeps the relevant pods
// in sync with their corresponding JointInferenceService objects.
func NewJointController(cfg *config.ControllerConfig) (FeatureControllerI, error) {
namespace := cfg.Namespace
if namespace == "" {
namespace = metav1.NamespaceAll
}

kubeClient, err := utils.KubeClient()
if err != nil {
return nil, err
}
kubecfg, _ := utils.KubeConfig()
crdclient, err := clientset.NewForConfig(kubecfg)
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, time.Second*30, kubeinformers.WithNamespace(namespace))

podInformer := kubeInformerFactory.Core().V1().Pods()

serviceInformerFactory := informers.NewSharedInformerFactoryWithOptions(crdclient, time.Second*30, informers.WithNamespace(namespace))
serviceInformer := serviceInformerFactory.Neptune().V1alpha1().JointInferenceServices()

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

jc := &JointInferenceServiceController{
kubeClient: kubeClient,
client: crdclient.NeptuneV1alpha1(),

queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultBackOff, MaxBackOff), "jointinferenceservice"),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "jointinferenceservice-controller"}),
cfg: cfg,
}

serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
jc.enqueueController(obj, true)
},

UpdateFunc: func(old, cur interface{}) {
jc.enqueueController(cur, true)
},

DeleteFunc: func(obj interface{}) {
jc.enqueueController(obj, true)
},
})

jc.serviceLister = serviceInformer.Lister()
jc.serviceStoreSynced = serviceInformer.Informer().HasSynced

podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jc.addPod,
UpdateFunc: jc.updatePod,
DeleteFunc: jc.deletePod,
})

jc.podStore = podInformer.Lister()
jc.podStoreSynced = podInformer.Informer().HasSynced

stopCh := messageContext.Done()
kubeInformerFactory.Start(stopCh)
serviceInformerFactory.Start(stopCh)
return jc, err
}
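
For orientation, a sketch of the environment the edge worker container ends up with, collecting the keys set in createEdgePod above; every value is a placeholder, not an actual default.

    // hypothetical resulting env of an edge worker; all values depend on the actual CRs
    env := map[string]string{
        "WORKER_NAME":           "edgeworker-ab1cd", // random 5-char suffix
        "SERVICE_NAME":          "<JointInferenceService name>",
        "NAMESPACE":             "<service namespace>",
        "MODEL":                 "<edge Model CR marshalled as JSON>",
        "MODEL_URL":             "/home/data/<model url>", // dataPrefix + the model url
        "BIG_MODEL_IP":          "<IP of the cloud worker node>",
        "BIG_MODEL_PORT":        "<NodePort of the cloud kubernetes service>",
        "PARAMETERS":            "<edge worker parameters as JSON>",
        "HSA_PARAMETERS":        "<hard example algorithm parameters as JSON>",
        "HARD_SAMPLE_ALGORITHM": "<hard example algorithm name>",
        "LC_SERVER":             "<LC server address from the GM config>",
    }
    _ = env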

pkg/globalmanager/messagelayer/context.go (+108, -0)

@@ -0,0 +1,108 @@
package messagelayer

import (
"encoding/json"
"fmt"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"

"github.com/edgeai-neptune/neptune/pkg/globalmanager/messagelayer/model"
wsContext "github.com/edgeai-neptune/neptune/pkg/globalmanager/messagelayer/ws"
)

// MessageLayer define all functions that message layer must implement
type MessageLayer interface {
SendResourceObject(nodeName string, eventType watch.EventType, obj interface{}) error
ReceiveResourceUpdate() (*ResourceUpdateSpec, error)
Done() <-chan struct{}
}

// ContextMessageLayer build on context
type ContextMessageLayer struct {
}

type ResourceUpdateSpec struct {
Kind string
Namespace string
Name string
Operation string
Content []byte
}

// SendResourceObject sends the resource object and event type to the given node
func (cml *ContextMessageLayer) SendResourceObject(nodeName string, eventType watch.EventType, obj interface{}) error {
var operation string
switch eventType {
case watch.Added:
operation = "insert"
case watch.Modified:
operation = "update"
case watch.Deleted:
operation = "delete"
default:
// should never get here
return fmt.Errorf("event type: %s unsupported", eventType)
}

var msg model.Message
payload, _ := json.Marshal(obj)
msg.Content = (payload)

// For simplicity and to avoid duplicating code,
// just unmarshal into a unifying struct type here.
var om metav1.PartialObjectMetadata
err := json.Unmarshal(payload, &om)
if err != nil {
// should not happen; kept just in case
return fmt.Errorf("unmarshal error for %v, err: %w", obj, err)
}

namespace := om.Namespace
kind := strings.ToLower(om.Kind)
name := om.Name

msg.Namespace = namespace
msg.ResourceKind = kind
msg.ResourceName = name
msg.Operation = operation

klog.V(2).Infof("sending %s %s/%s to node(%s)", kind, namespace, name, nodeName)
klog.V(4).Infof("sending %s %s/%s to node(%s), msg:%+v", kind, namespace, name, nodeName, msg)
// TODO: may need to guarantee message send to node
return wsContext.SendToEdge(nodeName, &msg)
}

func (cml *ContextMessageLayer) ReceiveResourceUpdate() (*ResourceUpdateSpec, error) {
nodeName, msg, err := wsContext.ReceiveFromEdge()
if err != nil {
return nil, err
}

klog.V(4).Infof("get message from nodeName %s:%s", nodeName, msg)
namespace := msg.Namespace
kind := strings.ToLower(msg.ResourceKind)
name := msg.ResourceName
operation := msg.Operation
content := msg.Content

return &ResourceUpdateSpec{
Kind: kind,
Namespace: namespace,
Name: name,
Operation: operation,
Content: content,
}, nil
}

// Done signals the message layer is done
func (cml *ContextMessageLayer) Done() <-chan struct{} {
return wsContext.Done()
}

// NewContextMessageLayer creates a ContextMessageLayer
func NewContextMessageLayer() MessageLayer {
return &ContextMessageLayer{}
}

pkg/globalmanager/messagelayer/model/model.go (+17, -0)

@@ -0,0 +1,17 @@
package model

type MessageHeader struct {
Namespace string `json:"namespace"`

ResourceKind string `json:"resourceKind"`

ResourceName string `json:"resourceName"`

Operation string `json:"operation"`
}

// Message struct
type Message struct {
MessageHeader `json:"header"`
Content []byte `json:"content"`
}
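
A minimal sketch of what one of these messages looks like on the wire; note that Content, being a []byte, is base64-encoded by encoding/json, and the resource names below are illustrative.

    package main

    import (
        "encoding/json"
        "fmt"

        "github.com/edgeai-neptune/neptune/pkg/globalmanager/messagelayer/model"
    )

    func main() {
        msg := model.Message{
            MessageHeader: model.MessageHeader{
                Namespace:    "default",
                ResourceKind: "dataset",
                ResourceName: "dataset-example", // hypothetical resource name
                Operation:    "insert",
            },
            Content: []byte(`{"spec":{}}`),
        }
        b, _ := json.Marshal(msg)
        fmt.Println(string(b))
        // {"header":{"namespace":"default","resourceKind":"dataset","resourceName":"dataset-example","operation":"insert"},"content":"eyJzcGVjIjp7fX0="}
    }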

pkg/globalmanager/messagelayer/ws/context.go (+170, -0)

@@ -0,0 +1,170 @@
package ws

import (
gocontext "context"
"fmt"
"strings"
"sync"

"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"github.com/edgeai-neptune/neptune/pkg/globalmanager/messagelayer/model"
)

type nodeMessage struct {
nodeName string
msg model.Message
}

// ChannelContext is object for Context channel
type ChannelContext struct {
ctx gocontext.Context
cancel gocontext.CancelFunc

upstreamChannel chan nodeMessage
//downstreamChannel chan nodeMessage

// downstream map
// nodeName => queue
nodeQueue sync.Map
nodeStore sync.Map
}

var (
// singleton
context *ChannelContext
)

func init() {
context = NewChannelContext()
}

func NewChannelContext() *ChannelContext {
upstreamSize := 1000
upstreamChannel := make(chan nodeMessage, upstreamSize)
//downstreamChannel := make(chan nodeMessage, bufferSize)

ctx, cancel := gocontext.WithCancel(gocontext.Background())
return &ChannelContext{
upstreamChannel: upstreamChannel,
// downstreamChannel: downstreamChannel,
// nodeMessageChannelMap: make(map[string]chan model.Message),
ctx: ctx,
cancel: cancel,
}
}

func getMsgKey(obj interface{}) (string, error) {
msg := obj.(*model.Message)

kind := msg.ResourceKind
namespace := msg.Namespace
name := msg.ResourceName
return strings.Join([]string{kind, namespace, name}, "/"), nil
}

func GetNodeQueue(nodeName string) workqueue.RateLimitingInterface {
q, ok := context.nodeQueue.Load(nodeName)
if !ok {
newQ := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), nodeName)
q, _ = context.nodeQueue.LoadOrStore(nodeName, newQ)
}
return q.(workqueue.RateLimitingInterface)
}

func GetNodeStore(nodeName string) cache.Store {
s, ok := context.nodeStore.Load(nodeName)
if !ok {
newS := cache.NewStore(getMsgKey)
s, _ = context.nodeStore.LoadOrStore(nodeName, newS)
}
return s.(cache.Store)
}

func SendToEdge(nodeName string, msg *model.Message) error {
q := GetNodeQueue(nodeName)
key, _ := getMsgKey(msg)
q.Add(key)

s := GetNodeStore(nodeName)
return s.Add(msg)
}

func ReceiveFromEdge() (nodeName string, msg model.Message, err error) {
nodeMsg := <-context.upstreamChannel
nodeName = nodeMsg.nodeName
msg = nodeMsg.msg
return
}

func SendToCloud(nodeName string, msg model.Message) error {
context.upstreamChannel <- nodeMessage{nodeName, msg}
return nil
}

func Done() <-chan struct{} {
return context.ctx.Done()
}

type ReadMsgFunc func() (model.Message, error)
type WriteMsgFunc func(model.Message) error

func AddNode(nodeName string, read ReadMsgFunc, write WriteMsgFunc, closeCh chan struct{}) {
GetNodeQueue(nodeName)
GetNodeStore(nodeName)

go func() {
// read loop
var msg model.Message
var err error
for {
msg, err = read()
if err != nil {
break
}
klog.V(4).Infof("received msg from %s: %+v", nodeName, msg)
_ = SendToCloud(nodeName, msg)
}
closeCh <- struct{}{}
klog.Errorf("read loop of node %s closed, due to: %+v", nodeName, err)
}()

go func() {
// write loop
q := GetNodeQueue(nodeName)
s := GetNodeStore(nodeName)
var err error
for {
key, shutdown := q.Get()
if shutdown {
err = fmt.Errorf("node queue for node %s shutdown", nodeName)
break
}
obj, exists, _ := s.GetByKey(key.(string))
if !exists {
klog.Warningf("key %s not exists in node store %s", key, nodeName)
q.Forget(key)
q.Done(key)
continue
}
msg := obj.(*model.Message)
err = write(*msg)
klog.V(4).Infof("writing msg to %s: %+v", nodeName, msg)
if err != nil {
klog.Warningf("failed to write key %s to node %s, requeue", key, nodeName)
q.AddRateLimited(key)
q.Forget(key)
q.Done(key)
break
}
klog.Infof("write key %s to node %s successfully", key, nodeName)
_ = s.Delete(msg)
q.Forget(key)
q.Done(key)
}
closeCh <- struct{}{}
klog.Errorf("write loop of node %s closed, due to: %+v", nodeName, err)
}()
}
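
A test-style sketch (not part of this commit) of the per-node plumbing: register a fake node with AddNode, push a message with SendToEdge, and receive it through the write callback; the node name is hypothetical.

    // inside package ws, e.g. in a unit test
    delivered := make(chan model.Message, 1)
    closeCh := make(chan struct{}, 2)

    readFn := func() (model.Message, error) { select {} }                 // no upstream traffic in this sketch
    writeFn := func(m model.Message) error { delivered <- m; return nil } // capture downstream writes

    AddNode("edge-node-1", readFn, writeFn, closeCh)

    msg := &model.Message{}
    msg.Namespace, msg.ResourceKind, msg.ResourceName, msg.Operation = "default", "dataset", "d1", "insert"
    _ = SendToEdge("edge-node-1", msg) // queued by key, stored, then written by the node's write loop

    got := <-delivered
    fmt.Printf("delivered: %+v\n", got)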

pkg/globalmanager/messagelayer/ws/server.go (+98, -0)

@@ -0,0 +1,98 @@
package ws

import (
"net/http"

"k8s.io/klog/v2"

"github.com/gorilla/websocket"

"github.com/edgeai-neptune/neptune/pkg/globalmanager/messagelayer/model"
)

// Server is a websocket protocol server
type Server struct {
server *http.Server
}

func NewServer(address string) *Server {
server := http.Server{
Addr: address,
}

wsServer := &Server{
server: &server,
}
http.HandleFunc("/", wsServer.ServeHTTP)
return wsServer
}

func (srv *Server) upgrade(w http.ResponseWriter, r *http.Request) *websocket.Conn {
upgrader := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return nil
}
return conn
}

func (srv *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
nodeName := req.Header.Get("Node-Name")
wsConn := srv.upgrade(w, req)
if wsConn == nil {
klog.Errorf("failed to upgrade to websocket for node %s", nodeName)
return
}

// serve connection
nodeClient := &nodeClient{conn: wsConn, req: req}
go nodeClient.Serve()
}

func (srv *Server) ListenAndServe() error {
return srv.server.ListenAndServe()
}

func (srv *Server) Close() error {
if srv.server != nil {
return srv.server.Close()
}
return nil
}

type nodeClient struct {
conn *websocket.Conn
req *http.Request
nodeName string
}

func (nc *nodeClient) readOneMsg() (model.Message, error) {
var msg model.Message

err := nc.conn.ReadJSON(&msg)
if err != nil {
return msg, err
}

return msg, nil
}

func (nc *nodeClient) writeOneMsg(msg model.Message) error {
return nc.conn.WriteJSON(&msg)
}

func (nc *nodeClient) Serve() {
nodeName := nc.req.Header.Get("Node-Name")
nc.nodeName = nodeName
klog.Infof("established connection for node %s", nodeName)
// nc.conn.SetCloseHandler
closeCh := make(chan struct{}, 2)
AddNode(nodeName, nc.readOneMsg, nc.writeOneMsg, closeCh)
<-closeCh

klog.Infof("closed connection for node %s", nodeName)
_ = nc.conn.Close()
}
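
For completeness, a sketch of how an LC-side client would be expected to connect to this server; the GM address and node name are assumptions, and the only protocol requirements visible here are the Node-Name header and JSON-encoded model.Message frames.

    package main

    import (
        "net/http"

        "github.com/gorilla/websocket"

        "github.com/edgeai-neptune/neptune/pkg/globalmanager/messagelayer/model"
    )

    func main() {
        header := http.Header{}
        header.Set("Node-Name", "edge-node-1") // GM keys its per-node queue on this header

        conn, _, err := websocket.DefaultDialer.Dial("ws://127.0.0.1:9000/", header)
        if err != nil {
            panic(err)
        }
        defer conn.Close()

        // downstream: GM -> LC, upstream: LC -> GM; both are JSON-encoded model.Message values
        var msg model.Message
        if err := conn.ReadJSON(&msg); err == nil {
            _ = conn.WriteJSON(&msg)
        }
    }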

pkg/globalmanager/types.go (+28, -0)

@@ -0,0 +1,28 @@
package globalmanager

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// ContainerPara describes the initial values needed to create a pod
type ContainerPara struct {
volumeMountList []string
volumeList []string
volumeMapName []string
env map[string]string
frameName string
frameVersion string
scriptBootFile string
nodeName string
}

type CommonInterface interface {
metav1.Object
schema.ObjectKind
}

type FeatureControllerI interface {
Start() error
GetName() string
}
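
Any new GM feature only needs to satisfy FeatureControllerI and be appended to the newFunc list in controller.go; a minimal, hypothetical example (assuming the config package is imported in the file):

    // hypothetical feature controller, not part of this commit
    type noopController struct{}

    func (c *noopController) Start() error    { return nil }
    func (c *noopController) GetName() string { return "NoopController" }

    // NewNoopController matches the newFunc signature used in controller.go
    func NewNoopController(cfg *config.ControllerConfig) (FeatureControllerI, error) {
        return &noopController{}, nil
    }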

pkg/globalmanager/upstream.go (+233, -0)

@@ -0,0 +1,233 @@
package globalmanager

import (
"context"
"encoding/json"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"

neptunev1 "github.com/edgeai-neptune/neptune/pkg/apis/neptune/v1alpha1"
clientset "github.com/edgeai-neptune/neptune/pkg/client/clientset/versioned/typed/neptune/v1alpha1"
"github.com/edgeai-neptune/neptune/pkg/globalmanager/config"
"github.com/edgeai-neptune/neptune/pkg/globalmanager/messagelayer"
"github.com/edgeai-neptune/neptune/pkg/globalmanager/utils"
)

// updateHandler handles the updates from LC(running at edge) to update the
// corresponding resource
type updateHandler func(namespace, name, operation string, content []byte) error

// UpstreamController subscribes to the updates from the edge and syncs them to the k8s api server
type UpstreamController struct {
client *clientset.NeptuneV1alpha1Client
messageLayer messagelayer.MessageLayer
updateHandlers map[string]updateHandler
}

const upstreamStatusUpdateRetries = 3

// retryUpdateStatus simply retries to call the status update func
func retryUpdateStatus(name, namespace string, updateStatusFunc func() error) error {
var err error
for retry := 0; retry <= upstreamStatusUpdateRetries; retry++ {
err = updateStatusFunc()
if err == nil {
return nil
}
klog.Warningf("Error to update %s/%s status, retried %d times: %+v", namespace, name, retry, err)
}
return err
}

func newUnmarshalError(namespace, name, operation string, content []byte) error {
return fmt.Errorf("Unable to unmarshal content for (%s/%s) operation: '%s', content: '%+v'", namespace, name, operation, string(content))
}

// updateDatasetStatus updates the dataset status
func (uc *UpstreamController) updateDatasetStatus(name, namespace string, status neptunev1.DatasetStatus) error {
client := uc.client.Datasets(namespace)

if status.UpdateTime == nil {
now := metav1.Now()
status.UpdateTime = &now
}

return retryUpdateStatus(name, namespace, func() error {
dataset, err := client.Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return err
}
dataset.Status = status
_, err = client.UpdateStatus(context.TODO(), dataset, metav1.UpdateOptions{})
return err
})
}

// updateDatasetFromEdge syncs update from edge
func (uc *UpstreamController) updateDatasetFromEdge(name, namespace, operation string, content []byte) error {
if operation != "status" {
return fmt.Errorf("unknown operation %s", operation)
}

status := neptunev1.DatasetStatus{}
err := json.Unmarshal(content, &status)
if err != nil {
return newUnmarshalError(namespace, name, operation, content)
}

return uc.updateDatasetStatus(name, namespace, status)
}

// convertToMetrics converts the metrics from LCs to resource metrics
func convertToMetrics(m map[string]interface{}) []neptunev1.Metric {
var l []neptunev1.Metric
for k, v := range m {
var displayValue string
switch t := v.(type) {
case string:
displayValue = t
default:
// ignore the json marshal error
b, _ := json.Marshal(v)
displayValue = string(b)
}

l = append(l, neptunev1.Metric{Key: k, Value: displayValue})
}
return l
}

func (uc *UpstreamController) updateJointInferenceMetrics(name, namespace string, metrics []neptunev1.Metric) error {
client := uc.client.JointInferenceServices(namespace)

return retryUpdateStatus(name, namespace, func() error {
joint, err := client.Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return err
}
joint.Status.Metrics = metrics
_, err = client.UpdateStatus(context.TODO(), joint, metav1.UpdateOptions{})
return err
})
}

// updateJointInferenceFromEdge syncs the edge updates to k8s
func (uc *UpstreamController) updateJointInferenceFromEdge(name, namespace, operation string, content []byte) error {
// currently only the 'status' operation is supported
if operation != "status" {
return fmt.Errorf("unknown operation %s", operation)
}

var err error

// Output defines owner output information
type Output struct {
ServiceInfo map[string]interface{} `json:"ownerInfo"`
}

var status struct {
// Phase always should be "inference"
Phase string `json:"phase"`
Status string `json:"status"`
Output *Output `json:"output"`
}

err = json.Unmarshal(content, &status)
if err != nil {
return newUnmarshalError(namespace, name, operation, content)
}

// TODO: propagate status.Status to k8s

output := status.Output
if output == nil || output.ServiceInfo == nil {
// no output info
klog.Warningf("empty status info for joint inference service %s/%s", namespace, name)
return nil
}

info := output.ServiceInfo

for _, ignoreTimeKey := range []string{
"startTime",
"updateTime",
} {
delete(info, ignoreTimeKey)
}

metrics := convertToMetrics(info)

err = uc.updateJointInferenceMetrics(name, namespace, metrics)
if err != nil {
return fmt.Errorf("failed to update metrics, err:%+w", err)
}
return nil
}

// syncEdgeUpdate receives the updates from edge and syncs these to k8s.
func (uc *UpstreamController) syncEdgeUpdate() {
for {
select {
case <-uc.messageLayer.Done():
klog.Info("Stop neptune upstream loop")
return
default:
}

update, err := uc.messageLayer.ReceiveResourceUpdate()
if err != nil {
klog.Warningf("Ignore update since this err: %+v", err)
continue
}

kind := update.Kind
namespace := update.Namespace
name := update.Name
operation := update.Operation

handler, ok := uc.updateHandlers[kind]
if ok {
err := handler(name, namespace, operation, update.Content)
if err != nil {
klog.Errorf("Error to handle %s %s/%s operation(%s): %+v", kind, namespace, name, operation, err)
}
} else {
klog.Warningf("No handler for resource kind %s", kind)
}
}
}

// Start the upstream controller
func (uc *UpstreamController) Start() error {
klog.Info("Start the neptune upstream controller")

go uc.syncEdgeUpdate()
return nil
}

func (uc *UpstreamController) GetName() string {
return "UpstreamController"
}

// NewUpstreamController creates the UpstreamController from the config
func NewUpstreamController(cfg *config.ControllerConfig) (FeatureControllerI, error) {
client, err := utils.NewCRDClient()
if err != nil {
return nil, fmt.Errorf("create crd client failed with error: %w", err)
}
uc := &UpstreamController{
client: client,
messageLayer: messagelayer.NewContextMessageLayer(),
}

// NOTE: current no direct model update from edge,
// model update will be triggered by the corresponding training feature
uc.updateHandlers = map[string]updateHandler{
"dataset": uc.updateDatasetFromEdge,
"jointinferenceservice": uc.updateJointInferenceFromEdge,
}

return uc, nil
}
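
A sketch of the upstream "status" payload that updateJointInferenceFromEdge expects for a jointinferenceservice; the field names come from the anonymous struct above, while the concrete metric names and values are illustrative.

    // hypothetical content bytes of an upstream update with operation "status"
    var jointStatusContent = []byte(`{
      "phase": "inference",
      "status": "running",
      "output": {
        "ownerInfo": {
          "inferenceNumber": 1000,
          "hardExampleNumber": 50,
          "startTime": "2021-01-01T00:00:00Z",
          "updateTime": "2021-01-01T00:10:00Z"
        }
      }
    }`)
    // startTime/updateTime are dropped, and the remaining ownerInfo entries become
    // neptunev1.Metric key/value pairs on the service status.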

pkg/globalmanager/utils/crdclient.go (+19, -0)

@@ -0,0 +1,19 @@
package utils

import (
"k8s.io/klog/v2"

clientset "github.com/edgeai-neptune/neptune/pkg/client/clientset/versioned/typed/neptune/v1alpha1"
)

// NewCRDClient is used to create a restClient for crd
func NewCRDClient() (*clientset.NeptuneV1alpha1Client, error) {
cfg, err := KubeConfig()
if err != nil {
klog.Errorf("Failed to get kube config: %v", err)
return nil, err
}
client, err := clientset.NewForConfig(cfg)
if err != nil {
klog.Errorf("Failed to create REST Client due to error %v", err)
return nil, err
}

return client, nil
}

pkg/globalmanager/utils/kubeclient.go (+16, -0)

@@ -0,0 +1,16 @@
package utils

import (
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)

// KubeClient from config
func KubeClient() (*kubernetes.Clientset, error) {
kubeConfig, err := KubeConfig()
if err != nil {
klog.Warningf("get kube config failed with error: %s", err)
return nil, err
}
return kubernetes.NewForConfig(kubeConfig)
}

pkg/globalmanager/utils/kubeconfig.go (+20, -0)

@@ -0,0 +1,20 @@
package utils

import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"github.com/edgeai-neptune/neptune/pkg/globalmanager/config"
)

// KubeConfig from flags
func KubeConfig() (conf *rest.Config, err error) {
kubeConfig, err := clientcmd.BuildConfigFromFlags(config.Config.Master,
config.Config.KubeConfig)
if err != nil {
return nil, err
}
kubeConfig.ContentType = "application/json"

return kubeConfig, nil
}
