Browse Source

Merge pull request #33 from llhuii/add-pod-template

Add pod template support
tags/v0.1.0
KubeEdge Bot GitHub 4 years ago
parent
commit
32838c2861
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
50 changed files with 47507 additions and 2222 deletions
  1. +51
    -8
      Makefile
  2. +0
    -48
      build/crd-samples/sedna/federatedlearningjob_v1alpha.yaml
  3. +60
    -0
      build/crd-samples/sedna/federatedlearningjob_v1alpha1.yaml
  4. +68
    -37
      build/crd-samples/sedna/incrementallearningjob_v1alpha1.yaml
  5. +63
    -20
      build/crd-samples/sedna/jointinferenceservice_v1alpha1.yaml
  6. +75
    -0
      build/crds/sedna.io_datasets.yaml
  7. +13423
    -0
      build/crds/sedna.io_federatedlearningjobs.yaml
  8. +19957
    -0
      build/crds/sedna.io_incrementallearningjobs.yaml
  9. +13320
    -0
      build/crds/sedna.io_jointinferenceservices.yaml
  10. +81
    -0
      build/crds/sedna.io_models.yaml
  11. +0
    -56
      build/crds/sedna/dataset_v1alpha1.yaml
  12. +0
    -178
      build/crds/sedna/federatedlearningjob_v1alpha1.yaml
  13. +0
    -285
      build/crds/sedna/incrementallearningjob_v1alpha1.yaml
  14. +0
    -194
      build/crds/sedna/jointinferenceservice_v1alpha1.yaml
  15. +0
    -57
      build/crds/sedna/model_v1alpha1.yaml
  16. +2
    -1
      cmd/sedna-lc/app/server.go
  17. +3
    -441
      docs/proposals/federated-learning.md
  18. BIN
      docs/proposals/images/federated-learning-job-crd-details.png
  19. BIN
      docs/proposals/images/incremental-learning-job-crd-details.png
  20. BIN
      docs/proposals/images/joint-inference-service-crd-details.png
  21. +0
    -2
      docs/proposals/incremental-learning.md
  22. +4
    -358
      docs/proposals/joint-inference.md
  23. +8
    -65
      docs/setup/install.md
  24. +35
    -0
      examples/build_image.sh
  25. +14
    -0
      examples/federated-learning-surface-defect-detection-aggregation.Dockerfile
  26. +4
    -2
      examples/federated-learning-surface-defect-detection-train.Dockerfile
  27. +22
    -0
      examples/incremental-learning-helmet-detection.Dockerfile
  28. +5
    -1
      examples/joint-inference-helmet-detection-big.Dockerfile
  29. +16
    -0
      examples/joint-inference-helmet-detection-little.Dockerfile
  30. +35
    -0
      examples/push_image.sh
  31. +5
    -22
      hack/local-up.sh
  32. +1
    -0
      lib/sedna/common/config.py
  33. +2
    -1
      lib/sedna/context.py
  34. +4
    -1
      lib/sedna/dataset/dataset.py
  35. +0
    -9
      pkg/apis/sedna/v1alpha1/common_types.go
  36. +2
    -1
      pkg/apis/sedna/v1alpha1/dataset_types.go
  37. +7
    -16
      pkg/apis/sedna/v1alpha1/federatedlearningjob_types.go
  38. +11
    -10
      pkg/apis/sedna/v1alpha1/incrementallearningjob_types.go
  39. +8
    -8
      pkg/apis/sedna/v1alpha1/jointinferenceservice_types.go
  40. +2
    -1
      pkg/apis/sedna/v1alpha1/model_types.go
  41. +7
    -62
      pkg/apis/sedna/v1alpha1/zz_generated.deepcopy.go
  42. +60
    -17
      pkg/globalmanager/common.go
  43. +0
    -3
      pkg/globalmanager/config/config.go
  44. +8
    -4
      pkg/globalmanager/downstream.go
  45. +33
    -92
      pkg/globalmanager/federatedlearningjob.go
  46. +52
    -120
      pkg/globalmanager/incrementallearningjob.go
  47. +29
    -88
      pkg/globalmanager/jointinferenceservice.go
  48. +10
    -6
      pkg/globalmanager/types.go
  49. +1
    -2
      pkg/localcontroller/gmclient/websocket.go
  50. +19
    -6
      pkg/localcontroller/manager/incrementallearningjob.go

+ 51
- 8
Makefile View File

@@ -14,13 +14,21 @@

GOPATH ?= $(shell go env GOPATH)

OUT_DIR ?= _output
OUT_BINPATH := $(OUT_DIR)/bin

IMAGE_REPO ?= kubeedge
IMAGE_TAG ?= v0.1.0
GO_LDFLAGS ?=''

# set allowDangerousTypes for allowing float
CRD_OPTIONS ?= "crd:crdVersions=v1,allowDangerousTypes=true"

# make all builds both gm and lc binaries
BINARIES=gm lc
SHELL=/bin/bash

.EXPORT_ALL_VARIABLES:
OUT_DIR ?= _output
OUT_BINPATH := $(OUT_DIR)/bin

define BUILD_HELP_INFO
# Build code with verifying or not.
@@ -118,16 +126,51 @@ clean:
hack/make-rules/clean.sh
endif


IMAGE_REPO ?= ghcr.io/kubeedge/sedna
IMAGE_TAG ?= v1alpha1
GO_LDFLAGS ?=''

.PHONY: images gmimage lcimage
images: gmimage lcimage
gmimage lcimage:
docker build --build-arg GO_LDFLAGS=${GO_LDFLAGS} -t ${IMAGE_REPO}/${@:image=}:${IMAGE_TAG} -f build/${@:image=}/Dockerfile .
docker build --build-arg GO_LDFLAGS=${GO_LDFLAGS} -t ${IMAGE_REPO}/sedna-${@:image=}:${IMAGE_TAG} -f build/${@:image=}/Dockerfile .

# push: push the sedna-gm/sedna-lc images (built by `images`) to IMAGE_REPO.
# push-examples: push the example worker images via examples/push_image.sh.
# push-all: both of the above.
.PHONY: push push-examples push-all
push-all: push push-examples
push: images
docker push ${IMAGE_REPO}/sedna-gm:${IMAGE_TAG}
docker push ${IMAGE_REPO}/sedna-lc:${IMAGE_TAG}

push-examples:
bash examples/push_image.sh



# Run the end-to-end test suite (delegates to hack/run-e2e.sh).
# Fix: ".PHONE" was a typo — only ".PHONY" marks a target as always-out-of-date,
# so a file named "e2e" in the repo root would have silently disabled this target.
.PHONY: e2e
e2e:
	hack/run-e2e.sh

# Generate CRDs by kubebuilder
# CONTROLLER_GEN is resolved (found or downloaded) by the controller-gen target
# below; CRD_OPTIONS is defined at the top of this Makefile. Output goes to
# build/crds, which is what the new sedna.io_*.yaml files in this commit contain.
.PHONY: crds controller-gen
crds: controller-gen
$(CONTROLLER_GEN) $(CRD_OPTIONS) paths="./pkg/apis/sedna/v1alpha1" output:crd:artifacts:config=build/crds

# Get the currently used golang install path (in GOPATH/bin, unless GOBIN is set)
ifeq (,$(shell go env GOBIN))
GOBIN=$(shell go env GOPATH)/bin
else
GOBIN=$(shell go env GOBIN)
endif

# find or download controller-gen
# download controller-gen if necessary
# If controller-gen is not on PATH, install a pinned version (v0.4.1) via a
# throwaway module in a temp dir so the project's own go.mod is not modified;
# the temp dir is removed once the binary lands in $(GOBIN).
# NOTE(review): relies on `go get` installing the binary (pre-Go 1.17
# behavior); newer Go would need `go install pkg@version` — confirm toolchain.
controller-gen:
ifeq (, $(shell which controller-gen))
@{ \
set -e ;\
CONTROLLER_GEN_TMP_DIR=$$(mktemp -d) ;\
cd $$CONTROLLER_GEN_TMP_DIR ;\
go mod init tmp ;\
go get sigs.k8s.io/controller-tools/cmd/controller-gen@v0.4.1 ;\
rm -rf $$CONTROLLER_GEN_TMP_DIR ;\
}
CONTROLLER_GEN=$(GOBIN)/controller-gen
else
CONTROLLER_GEN=$(shell which controller-gen)
endif

+ 0
- 48
build/crd-samples/sedna/federatedlearningjob_v1alpha.yaml View File

@@ -1,48 +0,0 @@
apiVersion: sedna.io/v1alpha1
kind: FederatedLearningJob
metadata:
name: surface-defect-detection
spec:
aggregationWorker:
model:
name: "surface-defect-detection-model"
nodeName: "cloud0"
workerSpec:
scriptDir: "/code"
scriptBootFile: "aggregate.py"
frameworkType: "tensorflow"
frameworkVersion: "2.3"
parameters:
- key: "exit_round"
value: "3"
trainingWorkers:
- nodeName: "edge1"
dataset:
name: "edge-1-surface-defect-detection-dataset"
workerSpec:
scriptDir: "/code"
scriptBootFile: "train.py"
frameworkType: "tensorflow"
frameworkVersion: "2.3"
parameters:
- key: "batch_size"
value: "32"
- key: "learning_rate"
value: "0.001"
- key: "epochs"
value: "1"
- nodeName: "edge2"
dataset:
name: "edge-2-surface-defect-detection-dataset"
workerSpec:
scriptDir: "/code"
scriptBootFile: "train.py"
frameworkType: "tensorflow"
frameworkVersion: "2.3"
parameters:
- key: "batch_size"
value: "32"
- key: "learning_rate"
value: "0.001"
- key: "epochs"
value: "1"

+ 60
- 0
build/crd-samples/sedna/federatedlearningjob_v1alpha1.yaml View File

@@ -0,0 +1,60 @@
apiVersion: sedna.io/v1alpha1
kind: FederatedLearningJob
metadata:
name: surface-defect-detection
spec:
aggregationWorker:
model:
name: "surface-defect-detection-model"
template:
spec:
nodeName: "cloud"
containers:
- image: kubeedge/sedna-example-federated-learning-surface-defect-detection-aggregation:v0.1.0
name: agg-worker
imagePullPolicy: IfNotPresent
env: # user defined environments
- name: "exit_round"
value: "3"
resources: # user defined resources
limits:
memory: 2Gi
trainingWorkers:
- dataset:
name: "edge1-surface-defect-detection-dataset"
template:
spec:
nodeName: "edge1"
containers:
- image: kubeedge/sedna-example-federated-learning-surface-defect-detection-train:v0.1.0
name: train-worker
imagePullPolicy: IfNotPresent
env: # user defined environments
- name: "batch_size"
value: "32"
- name: "learning_rate"
value: "0.001"
- name: "epochs"
value: "1"
resources: # user defined resources
limits:
memory: 2Gi
- dataset:
name: "edge2-surface-defect-detection-dataset"
template:
spec:
nodeName: "edge2"
containers:
- image: kubeedge/sedna-example-federated-learning-surface-defect-detection-train:v0.1.0
name: train-worker
imagePullPolicy: IfNotPresent
env: # user defined environments
- name: "batch_size"
value: "32"
- name: "learning_rate"
value: "0.001"
- name: "epochs"
value: "1"
resources: # user defined resources
limits:
memory: 2Gi

+ 68
- 37
build/crd-samples/sedna/incrementallearningjob_v1alpha1.yaml View File

@@ -9,59 +9,90 @@ spec:
name: "incremental-dataset"
trainProb: 0.8
trainSpec:
workerSpec:
scriptDir: "/code_il"
scriptBootFile: "train.py"
frameworkType: "tensorflow"
frameworkVersion: "1.15"
parameters:
- key: "batch_size"
value: "32"
- key: "learning_rate"
value: "0.001"
- key: "max_epochs"
value: "100"

template:
spec:
nodeName: "edge-node"
containers:
- image: kubeedge/sedna-example-incremental-learning-helmet-detection:v0.1.0
name: train-worker
imagePullPolicy: IfNotPresent
args: ["train.py"]
env:
- name: "batch_size"
value: "32"
- name: "epochs"
value: "1"
- name: "input_shape"
value: "352,640"
- name: "class_names"
value: "person,helmet,helmet-on,helmet-off"
- name: "nms_threshold"
value: "0.4"
- name: "obj_threshold"
value: "0.3"
trigger:
checkPeriodSeconds: 60
timer:
start: 02:00
end: 04:00
end: 24:00
condition:
operator: ">"
threshold: 500
metric: num_of_samples
evalSpec:
workerSpec:
scriptDir: "/code_il"
scriptBootFile: "eval.py"
frameworkType: "tensorflow"
frameworkVersion: "1.15"
parameters:
- key: "input_shape"
value: "352,640"
- key: "class_names"
value: "helmet,helmet-on,person,helmet-off"

template:
spec:
nodeName: "edge-node"
containers:
- image: kubeedge/sedna-example-incremental-learning-helmet-detection:v0.1.0
name: eval-worker
imagePullPolicy: IfNotPresent
args: ["eval.py"]
env:
- name: "input_shape"
value: "352,640"
- name: "class_names"
value: "person,helmet,helmet-on,helmet-off"
deploySpec:
model:
name: "inference-model"
name: "deploy-model"
trigger:
condition:
operator: ">"
threshold: 0.1
metric: precision_delta
nodeName: "edge0"
hardExampleMining:
name: "IBT"
workerSpec:
scriptDir: "/code_il"
scriptBootFile: "eval.py"
frameworkType: "tensorflow"
frameworkVersion: "1.15"
parameters:
- key: "nms_threshold"
value: "0.6"

nodeName: "cloud0"
template:
spec:
nodeName: "edge-node"
containers:
- image: kubeedge/sedna-example-incremental-learning-helmet-detection:v0.1.0
name: infer-worker
imagePullPolicy: IfNotPresent
args: ["inference.py"]
env:
- name: "input_shape"
value: "352,640"
- name: "video_url"
value: "file://video/video.mp4"
- name: "HE_SAVED_URL"
value: "/he_saved_url"
volumeMounts:
- name: localvideo
mountPath: /video/
- name: hedir
mountPath: /he_saved_url
resources: # user defined resources
limits:
memory: 2Gi
volumes: # user defined volumes
- name: localvideo
hostPath:
path: /incremental_learning/video/
type: Directory
- name: hedir
hostPath:
path: /incremental_learning/he/
type: Directory
outputDir: "/output"

+ 63
- 20
build/crd-samples/sedna/jointinferenceservice_v1alpha1.yaml View File

@@ -1,31 +1,74 @@
apiVersion: sedna.io/v1alpha1
kind: JointInferenceService
metadata:
name: example
name: helmet-detection-inference-example
namespace: default
spec:
edgeWorker:
model:
name: "small-model"
nodeName: "edge0"
name: "helmet-detection-inference-little-model"
hardExampleMining:
name: "IBT"
workerSpec:
scriptDir: "/code"
scriptBootFile: "edge_inference.py"
frameworkType: "tensorflow"
frameworkVersion: "1.18"
parameters:
- key: "nms_threshold"
value: "0.6"
- key: "threshold_img"
value: "0.5"
- key: "threshold_box"
value: "0.5"
template:
spec:
nodeName: "edge-node"
containers:
- image: kubeedge/sedna-example-joint-inference-helmet-detection-little:v0.1.0
imagePullPolicy: IfNotPresent
name: little-model
env: # user defined environments
- name: input_shape
value: "416,736"
- name: "video_url"
value: "file://video/video.mp4"
- name: "all_examples_inference_output"
value: "/data/output"
- name: "hard_example_cloud_inference_output"
value: "/data/hard_example_cloud_inference_output"
- name: "hard_example_edge_inference_output"
value: "/data/hard_example_edge_inference_output"
ports: # user defined ports
- containerPort: 80
protocol: TCP
resources: # user defined resources
requests:
memory: 64Mi
cpu: 100m
limits:
memory: 2Gi
volumeMounts:
- name: localvideo
mountPath: /video/
- name: outputdir
mountPath: /data/
volumes: # user defined volumes
- name: outputdir
hostPath:
path: /joint_inference/output
type: Directory
- name: localvideo
hostPath:
path: /joint_inference/video/
type: Directory

cloudWorker:
model:
name: "big-model"
nodeName: "solar-corona-cloud"
workerSpec:
scriptDir: "/code"
scriptBootFile: "cloud_inference.py"
frameworkType: "tensorflow"
frameworkVersion: "1.18"
parameters:
- key: "nms_threshold"
value: "0.6"
name: "helmet-detection-inference-big-model"
template:
spec:
nodeName: "edge-node"
containers:
- image: kubeedge/sedna-example-joint-inference-helmet-detection-big:v0.1.0
name: big-model
imagePullPolicy: IfNotPresent
env: # user defined environments
- name: "input_shape"
value: "544,544"
resources: # user defined resources
limits:
memory: 2Gi

+ 75
- 0
build/crds/sedna.io_datasets.yaml View File

@@ -0,0 +1,75 @@

---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.4.1
creationTimestamp: null
name: datasets.sedna.io
spec:
group: sedna.io
names:
kind: Dataset
listKind: DatasetList
plural: datasets
singular: dataset
scope: Namespaced
versions:
- name: v1alpha1
schema:
openAPIV3Schema:
description: Dataset describes the data that a dataset resource should have
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
type: object
spec:
description: DatasetSpec is a description of a dataset
properties:
format:
type: string
nodeName:
type: string
url:
type: string
required:
- format
- nodeName
- url
type: object
status:
description: DatasetStatus represents information about the status of
a dataset including the time a dataset updated, and number of samples
in a dataset
properties:
numberOfSamples:
type: integer
updateTime:
format: date-time
type: string
required:
- numberOfSamples
type: object
required:
- spec
type: object
served: true
storage: true
subresources:
status: {}
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []

+ 13423
- 0
build/crds/sedna.io_federatedlearningjobs.yaml
File diff suppressed because it is too large
View File


+ 19957
- 0
build/crds/sedna.io_incrementallearningjobs.yaml
File diff suppressed because it is too large
View File


+ 13320
- 0
build/crds/sedna.io_jointinferenceservices.yaml
File diff suppressed because it is too large
View File


+ 81
- 0
build/crds/sedna.io_models.yaml View File

@@ -0,0 +1,81 @@

---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.4.1
creationTimestamp: null
name: models.sedna.io
spec:
group: sedna.io
names:
kind: Model
listKind: ModelList
plural: models
singular: model
scope: Namespaced
versions:
- name: v1alpha1
schema:
openAPIV3Schema:
description: Model describes the data that a model resource should have
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
type: object
spec:
description: ModelSpec is a description of a model
properties:
format:
type: string
url:
type: string
required:
- format
- url
type: object
status:
description: ModelStatus represents information about the status of a
model including the time a model updated, and metrics in a model
properties:
metrics:
items:
description: Metric describes the data that a resource model metric
should have
properties:
key:
type: string
value:
type: string
required:
- key
- value
type: object
type: array
updateTime:
format: date-time
type: string
type: object
required:
- spec
type: object
served: true
storage: true
subresources:
status: {}
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []

+ 0
- 56
build/crds/sedna/dataset_v1alpha1.yaml View File

@@ -1,56 +0,0 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: datasets.sedna.io
spec:
group: sedna.io
names:
kind: Dataset
plural: datasets
scope: Namespaced
versions:
- name: v1alpha1
subresources:
# status enables the status subresource.
status: {}
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
required:
- url
- format
properties:
url:
type: string
format:
type: string
nodeName:
type: string
status:
type: object
properties:
numberOfSamples:
type: integer
updateTime:
type: string
format: datatime


additionalPrinterColumns:
- name: NumberOfSamples
type: integer
description: The number of samples in the dataset
jsonPath: ".status.numberOfSamples"
- name: Node
type: string
description: The node name of the dataset
jsonPath: ".spec.nodeName"
- name: spec
type: string
description: The spec of the dataset
jsonPath: ".spec"

+ 0
- 178
build/crds/sedna/federatedlearningjob_v1alpha1.yaml View File

@@ -1,178 +0,0 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: federatedlearningjobs.sedna.io
spec:
group: sedna.io
names:
kind: FederatedLearningJob
plural: federatedlearningjobs
shortNames:
- fl
scope: Namespaced
versions:
- name: v1alpha1
subresources:
# status enables the status subresource.
status: {}
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
required:
- aggregationWorker
- trainingWorkers
properties:
aggregationWorker:
type: object
required:
- model
- nodeName
- workerSpec
properties:
model:
type: object
required:
- name
properties:
name:
type: string
nodeName:
type: string
workerSpec:
type: object
required:
- scriptDir
- scriptBootFile
- frameworkType
- frameworkVersion
properties:
scriptDir:
type: string
scriptBootFile:
type: string
frameworkType:
type: string
frameworkVersion:
type: string
parameters:
type: array
items:
type: object
required:
- key
- value
properties:
key:
type: string
value:
type: string
trainingWorkers:
type: array
items:
type: object
required:
- nodeName
- workerSpec
- dataset
properties:
model:
type: object
properties:
name:
type: string
nodeName:
type: string
dataset:
type: object
required:
- name
properties:
name:
type: string
workerSpec:
type: object
required:
- scriptDir
- scriptBootFile
- frameworkType
- frameworkVersion
properties:
scriptDir:
type: string
scriptBootFile:
type: string
frameworkType:
type: string
frameworkVersion:
type: string
parameters:
type: array
items:
type: object
required:
- key
- value
properties:
key:
type: string
value:
type: string
status:
type: object
properties:
conditions:
type: array
items:
type: object
properties:
type:
type: string
status:
type: string
lastProbeTime:
type: string
format: date-time
lastTransitionTime:
type: string
format: date-time
reason:
type: string
message:
type: string
startTime:
type: string
format: date-time
completionTime:
type: string
format: date-time
active:
type: integer
succeeded:
type: integer
failed:
type: integer
phase:
type: string


additionalPrinterColumns:
- name: status
type: string
description: The status of the federated task
jsonPath: ".status.phase"
- name: active
type: integer
description: The number of active worker
jsonPath: ".status.active"
- name: failed
type: integer
description: The number of failed worker
jsonPath: ".status.failed"
- name: Age
type: date
jsonPath: .metadata.creationTimestamp

+ 0
- 285
build/crds/sedna/incrementallearningjob_v1alpha1.yaml View File

@@ -1,285 +0,0 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: incrementallearningjobs.sedna.io
spec:
group: sedna.io
names:
kind: IncrementalLearningJob
plural: incrementallearningjobs
shortNames:
- il
scope: Namespaced
versions:
- name: v1alpha1
subresources:
# status enables the status subresource.
status: {}
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
required:
- dataset
- nodeName
- outputDir
- initialModel
- trainSpec
properties:
dataset:
type: object
required:
- name
- trainProb
properties:
name:
type: string
trainProb:
type: number
nodeName:
type: string
outputDir:
type: string
initialModel:
type: object
required:
- name
properties:
name:
type: string
trainSpec:
type: object
required:
- workerSpec
- trigger
properties:
workerSpec:
type: object
required:
- scriptDir
- scriptBootFile
- frameworkType
- frameworkVersion
properties:
scriptDir:
type: string
scriptBootFile:
type: string
frameworkType:
type: string
frameworkVersion:
type: string
parameters:
type: array
items:
type: object
required:
- key
- value
properties:
key:
type: string
value:
type: string
trigger:
type: object
properties:
checkPeriodSeconds:
type: integer
timer:
type: object
required:
- start
- end
properties:
start:
type: string
end:
type: string
condition:
type: object
required:
- operator
- threshold
- metric
properties:
operator:
type: string
enum: [">=",">","=","==","<=","<","ge","gt","eq","le","lt"]
threshold:
type: number
metric:
type: string
evalSpec:
type: object
required:
- workerSpec
properties:
workerSpec:
type: object
required:
- scriptDir
- scriptBootFile
- frameworkType
- frameworkVersion
properties:
scriptDir:
type: string
scriptBootFile:
type: string
frameworkType:
type: string
frameworkVersion:
type: string
parameters:
type: array
items:
type: object
required:
- key
- value
properties:
key:
type: string
value:
type: string
deploySpec:
type: object
required:
- model
- trigger
- nodeName
- workerSpec
- hardExampleMining
properties:
model:
type: object
required:
- name
properties:
name:
type: string
nodeName:
type: string
hardExampleMining:
type: object
required:
- name
properties:
name:
type: string
workerSpec:
type: object
required:
- scriptDir
- scriptBootFile
- frameworkType
- frameworkVersion
properties:
scriptDir:
type: string
scriptBootFile:
type: string
frameworkType:
type: string
frameworkVersion:
type: string
parameters:
type: array
items:
type: object
required:
- key
- value
properties:
key:
type: string
value:
type: string

trigger:
type: object
properties:
checkPeriodSeconds:
type: integer
timer:
type: object
required:
- start
- end
properties:
start:
type: string
end:
type: string
condition:
type: object
required:
- operator
- threshold
- metric
properties:
operator:
type: string
enum: [">=",">","=","==","<=","<","ge","gt","eq","le","lt"]
threshold:
type: number
metric:
type: string

status:
type: object
properties:
conditions:
type: array
items:
type: object
properties:
type:
type: string
status:
type: string
lastHeartbeatTime:
type: string
format: date-time
lastTransitionTime:
type: string
format: date-time
reason:
type: string
message:
type: string
data:
type: string
stage:
type: string
startTime:
type: string
format: date-time
completionTime:
type: string
format: date-time
active:
type: integer
succeeded:
type: integer
failed:
type: integer


additionalPrinterColumns:
- name: stage
type: string
description: The status of the incremental job
jsonPath: ".status.conditions[-1].stage"
- name: status
type: string
description: The status of the incremental job
jsonPath: ".status.conditions[-1].type"
- name: Age
type: date
jsonPath: .metadata.creationTimestamp

+ 0
- 194
build/crds/sedna/jointinferenceservice_v1alpha1.yaml View File

@@ -1,194 +0,0 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: jointinferenceservices.sedna.io
spec:
group: sedna.io
names:
kind: JointInferenceService
plural: jointinferenceservices
shortNames:
- jointinferenceservice
- jis
scope: Namespaced
versions:
- name: v1alpha1
subresources:
# status enables the status subresource.
status: {}
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
required:
- edgeWorker
- cloudWorker
properties:
edgeWorker:
type: object
required:
- model
- nodeName
- hardExampleMining
- workerSpec
properties:
model:
type: object
required:
- name
properties:
name:
type: string
nodeName:
type: string
hardExampleMining:
type: object
required:
- name
properties:
name:
type: string
parameters:
type: array
items:
type: object
required:
- key
- value
properties:
key:
type: string
value:
type: string
workerSpec:
type: object
required:
- scriptDir
- scriptBootFile
- frameworkType
- frameworkVersion
properties:
scriptDir:
type: string
scriptBootFile:
type: string
frameworkType:
type: string
frameworkVersion:
type: string
parameters:
type: array
items:
type: object
required:
- key
- value
properties:
key:
type: string
value:
type: string
cloudWorker:
type: object
required:
- model
- nodeName
- workerSpec
properties:
model:
type: object
required:
- name
properties:
name:
type: string
nodeName:
type: string
workerSpec:
type: object
required:
- scriptDir
- scriptBootFile
- frameworkType
- frameworkVersion
properties:
scriptDir:
type: string
scriptBootFile:
type: string
frameworkType:
type: string
frameworkVersion:
type: string
parameters:
type: array
items:
type: object
required:
- key
- value
properties:
key:
type: string
value:
type: string
status:
type: object
properties:
conditions:
type: array
items:
type: object
properties:
type:
type: string
status:
type: string
lastHeartbeatTime:
type: string
format: date-time
lastTransitionTime:
type: string
format: date-time
reason:
type: string
message:
type: string
startTime:
type: string
format: date-time
active:
type: integer
failed:
type: integer
metrics:
type: array
items:
type: object
properties:
key:
type: string
value:
type: string


additionalPrinterColumns:
- name: status
type: string
description: The status of the jointinference service
jsonPath: ".status.conditions[-1].type"
- name: active
type: integer
description: The number of active worker
jsonPath: ".status.active"
- name: failed
type: integer
description: The number of failed worker
jsonPath: ".status.failed"
- name: Age
type: date
jsonPath: .metadata.creationTimestamp

+ 0
- 57
build/crds/sedna/model_v1alpha1.yaml View File

@@ -1,57 +0,0 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: models.sedna.io
spec:
group: sedna.io
names:
kind: Model
plural: models
scope: Namespaced
versions:
- name: v1alpha1
subresources:
# status enables the status subresource.
status: {}
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
required:
- url
- format
properties:
url:
type: string
format:
type: string
status:
type: object
properties:
updateTime:
type: string
format: datetime
metrics:
type: array
items:
type: object
properties:
key:
type: string
value:
type: string


additionalPrinterColumns:
- name: updateAGE
type: date
description: The update age
jsonPath: ".status.updateTime"
- name: metrics
type: string
description: The metrics
jsonPath: ".status.metrics"

+ 2
- 1
cmd/sedna-lc/app/server.go View File

@@ -67,7 +67,8 @@ It manages dataset and models, and controls ai features in local nodes.`, cmdNam
Options.NodeName = os.Getenv(constants.HostNameENV)
}

if Options.VolumeMountPrefix = os.Getenv(constants.RootFSMountDirENV); Options.VolumeMountPrefix == "" {
var ok bool
if Options.VolumeMountPrefix, ok = os.LookupEnv(constants.RootFSMountDirENV); !ok {
Options.VolumeMountPrefix = "/rootfs"
}



+ 3
- 441
docs/proposals/federated-learning.md View File

@@ -71,368 +71,12 @@ The tables below summarize the group, kind and API version details for the CRD.
### Federated learning CRD
![](./images/federated-learning-job-crd-details.png)
Notes:
1. We use `WorkerSpec` to represent the worker runtime config which all EdgeAI features use.
1. Currently `WorkerSpec` is limited to a code directory on a host path or s3-like storage.
We will extend it to support a `pod template`, like a k8s deployment.
1. We will add the [resources](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/) support in the future.
Below is the CustomResourceDefinition yaml for `FederatedLearningJob`:
[crd source](/build/crds/sedna/federatedjob_v1alpha1.yaml)
```yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: federatedjobs.sedna.io
spec:
group: sedna.io
names:
kind: FederatedJob
plural: federatedjobs
shortNames:
- federated
- ft
scope: Namespaced
versions:
- name: v1alpha1
subresources:
# status enables the status subresource.
status: {}
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
required:
- aggregationWorker
- trainingWorkers
properties:
aggregationWorker:
type: object
required:
- name
- model
- nodeName
- workerSpec
properties:
name:
type: string
model:
type: object
required:
- name
properties:
name:
type: string
nodeName:
type: string
workerSpec:
type: object
required:
- scriptDir
- scriptBootFile
- frameworkType
- frameworkVersion
properties:
scriptDir:
type: string
scriptBootFile:
type: string
frameworkType:
type: string
frameworkVersion:
type: string
parameters:
type: array
items:
type: object
required:
- key
- value
properties:
key:
type: string
value:
type: string
trainingWorkers:
type: array
items:
type: object
required:
- name
- nodeName
- workerSpec
properties:
name:
type: string
model:
type: object
properties:
name:
type: string
nodeName:
type: string
workerSpec:
type: object
required:
- dataset
- scriptDir
- scriptBootFile
- frameworkType
- frameworkVersion
properties:
dataset:
type: object
required:
- name
properties:
name:
type: string
scriptDir:
type: string
scriptBootFile:
type: string
frameworkType:
type: string
frameworkVersion:
type: string
parameters:
type: array
items:
type: object
required:
- key
- value
properties:
key:
type: string
value:
type: string
status:
type: object
properties:
conditions:
type: array
items:
type: object
properties:
type:
type: string
status:
type: string
lastProbeTime:
type: string
format: date-time
lastTransitionTime:
type: string
format: date-time
reason:
type: string
message:
type: string
startTime:
type: string
format: date-time
completionTime:
type: string
format: date-time
active:
type: integer
succeeded:
type: integer
failed:
type: integer
phase:
type: string
additionalPrinterColumns:
- name: status
type: string
description: The status of the federated job
jsonPath: ".status.phase"
- name: active
type: integer
description: The number of active worker
jsonPath: ".status.active"
- name: failed
type: integer
description: The number of failed worker
jsonPath: ".status.failed"
- name: Age
type: date
jsonPath: .metadata.creationTimestamp
```
[crd source](/build/crds/sedna.io_federatedlearningjobs.yaml)
### Federated learning type definition
[go source](cloud/pkg/apis/sedna/v1alpha1/federatedjob_types.go)
```go
package v1alpha1
import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// FederatedJob describes the data that a federatedjob resource should have
type FederatedJob struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec FederatedJobSpec `json:"spec"`
Status FederatedJobStatus `json:"status,omitempty"`
}
// FederatedJobSpec is a description of a federatedjob
type FederatedJobSpec struct {
AggregationWorker AggregationWorker `json:"aggregationWorker"`
TrainingWorkers []TrainingWorker `json:"trainingWorkers"`
}
// AggregationWorker describes the data an aggregation worker should have
type AggregationWorker struct {
Name string `json:"name"`
Model modelRefer `json:"model"`
NodeName string `json:"nodeName"`
WorkerSpec AggregationWorkerSpec `json:"workerSpec"`
}
// TrainingWorker describes the data a training worker should have
type TrainingWorker struct {
Name string `json:"name"`
NodeName string `json:"nodeName"`
WorkerSpec TrainingWorkerSpec `json:"workerSpec"`
}
// AggregationWorkerSpec is a description of an aggregation worker
type AggregationWorkerSpec struct {
CommonWorkerSpec
}
// TrainingWorkerSpec is a description of a trainingworker
type TrainingWorkerSpec struct {
CommonWorkerSpec
Dataset datasetRefer `json:"dataset"`
}
type datasetRefer struct {
Name string `json:"name"`
}
type modelRefer struct {
Name string `json:"name"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// FederatedJobList is a list of FederatedJobs.
type FederatedJobList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
Items []FederatedJob `json:"items"`
}
// FederatedJobStatus represents the current state of a federated job.
type FederatedJobStatus struct {
// The latest available observations of a federated job's current state.
// +optional
Conditions []FederatedJobCondition `json:"conditions,omitempty"`
// Represents time when the job was acknowledged by the job controller.
// It is not guaranteed to be set in happens-before order across separate operations.
// It is represented in RFC3339 form and is in UTC.
// +optional
StartTime *metav1.Time `json:"startTime,omitempty"`
// Represents time when the job was completed. It is not guaranteed to
// be set in happens-before order across separate operations.
// It is represented in RFC3339 form and is in UTC.
// +optional
CompletionTime *metav1.Time `json:"completionTime,omitempty"`
// The number of actively running pods.
// +optional
Active int32 `json:"active"`
// The number of pods which reached phase Succeeded.
// +optional
Succeeded int32 `json:"succeeded"`
// The number of pods which reached phase Failed.
// +optional
Failed int32 `json:"failed"`
// The phase of the federatedjob.
// +optional
Phase FederatedJobPhase `json:"phase,omitempty"`
}
type FederatedJobConditionType string
// These are valid conditions of a job.
const (
// FederatedJobComplete means the job has completed its execution.
FederatedJobCondComplete FederatedJobConditionType = "Complete"
// FederatedJobFailed means the job has failed its execution.
FederatedJobCondFailed FederatedJobConditionType = "Failed"
// FederatedJobTraining means the job has been training.
FederatedJobCondTraining FederatedJobConditionType = "Training"
)
// FederatedJobCondition describes current state of a job.
type FederatedJobCondition struct {
// Type of job condition, one of Complete, Failed or Training.
Type FederatedJobConditionType `json:"type"`
// Status of the condition, one of True, False, Unknown.
Status v1.ConditionStatus `json:"status"`
// Last time the condition was checked.
// +optional
LastProbeTime metav1.Time `json:"lastProbeTime,omitempty"`
// Last time the condition transit from one status to another.
// +optional
LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
// (brief) reason for the condition's last transition.
// +optional
Reason string `json:"reason,omitempty"`
// Human readable message indicating details about last transition.
// +optional
Message string `json:"message,omitempty"`
}
// FederatedJobPhase is a label for the condition of a job at the current time.
type FederatedJobPhase string
// These are the valid statuses of jobs.
const (
// FederatedJobPending means the job has been accepted by the system, but one or more of the pods
// has not been started. This includes time before being bound to a node, as well as time spent
// pulling images onto the host.
FederatedJobPending FederatedJobPhase = "Pending"
// FederatedJobRunning means the job has been bound to a node and all of the pods have been started.
// At least one container is still running or is in the process of being restarted.
FederatedJobRunning FederatedJobPhase = "Running"
// FederatedJobSucceeded means that all pods in the job have voluntarily terminated
// with a container exit code of 0, and the system is not going to restart any of these pods.
FederatedJobSucceeded FederatedJobPhase = "Succeeded"
// FederatedJobFailed means that all pods in the job have terminated, and at least one container has
// terminated in a failure (exited with a non-zero exit code or was stopped by the system).
FederatedJobFailed FederatedJobPhase = "Failed"
)
```
[go source](/pkg/apis/sedna/v1alpha1/federatedjob_types.go)
#### Validation
[Open API v3 Schema based validation](https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#validation) can be used to guard against bad requests.
@@ -444,89 +88,7 @@ Here is a list of validations we need to support :
1. The edgenode name specified in the crd should exist in k8s.
### federated learning sample
```yaml
apiVersion: sedna.io/v1alpha1
kind: FederatedLearningJob
metadata:
name: magnetic-tile-defect-detection
spec:
aggregationWorker:
name: "aggregationworker"
model:
name: "model-demo1"
nodeName: "cloud"
workerSpec:
scriptDir: "/code"
scriptBootFile: "aggregate.py"
frameworkType: "tensorflow"
frameworkVersion: "1.18"
parameters:
- key: "exit_round"
value: "3"
trainingWorkers:
- name: "work0"
nodeName: "edge0"
workerSpec:
dataset:
name: "dataset-demo0"
scriptDir: "/code"
scriptBootFile: "train.py"
frameworkType: "tensorflow"
frameworkVersion: "1.18"
parameters:
- key: "batch_size"
value: "32"
- key: "learning_rate"
value: "0.001"
- key: "epochs"
value: "1"
- name: "work1"
nodeName: "edge1"
workerSpec:
dataset:
name: "dataset-demo1"
scriptDir: "/code"
scriptBootFile: "train.py"
frameworkType: "tensorflow"
frameworkVersion: "1.18"
parameters:
- key: "batch_size"
value: "32"
- key: "learning_rate"
value: "0.001"
- key: "epochs"
value: "1"
- key: "min_sample_number_per"
value: "500"
- key: "min_node_number"
value: "3"
- key: "rounds_between_valida"
value: "3"
- name: "work2"
nodeName: "edge2"
workerSpec:
dataset:
name: "dataset-demo2"
scriptDir: "/code"
scriptBootFile: "train.py"
frameworkType: "tensorflow"
frameworkVersion: "1.18"
parameters:
- key: "batch_size"
value: "32"
- key: "learning_rate"
value: "0.001"
- key: "epochs"
value: "1"
- key: "min_sample_number_per"
value: "500"
- key: "min_node_number"
value: "3"
- key: "rounds_between_valida"
value: "3"
```
see [sample source](/build/crd-samples/sedna/federatedlearningjob_v1alpha1.yaml)
### Creation of the federated learning job


BIN
docs/proposals/images/federated-learning-job-crd-details.png View File

Before After
Width: 994  |  Height: 667  |  Size: 27 kB

BIN
docs/proposals/images/incremental-learning-job-crd-details.png View File

Before After
Width: 831  |  Height: 713  |  Size: 33 kB

BIN
docs/proposals/images/joint-inference-service-crd-details.png View File

Before After
Width: 814  |  Height: 613  |  Size: 34 kB

+ 0
- 2
docs/proposals/incremental-learning.md View File

@@ -74,8 +74,6 @@ The tables below summarize the group, kind and API version details for the CRD.
|Kind | IncrementalLearningJob |
### Incremental learning CRD
![](./images/incremental-learning-job-crd-details.png)
See the [crd source](/build/crds/sedna/incrementallearningjob_v1alpha1.yaml) for details.
### Incremental learning job type definition


+ 4
- 358
docs/proposals/joint-inference.md View File

@@ -63,331 +63,12 @@ The tables below summarize the group, kind and API version details for the CRD.
|Kind | JointInferenceService |
### Joint inference CRD
![](./images/joint-inference-service-crd-details.png)
Below is the CustomResourceDefinition yaml for `JointInferenceService`:
[crd source](/build/crds/sedna/jointinferenceservice_v1alpha1.yaml)
```yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: jointinferenceservices.sedna.io
spec:
group: sedna.io
names:
kind: JointInferenceService
plural: jointinferenceservices
shortNames:
- jointinferenceservice
- jis
scope: Namespaced
versions:
- name: v1alpha1
subresources:
# status enables the status subresource.
status: {}
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
required:
- edgeWorker
- cloudWorker
properties:
edgeWorker:
type: object
required:
- name
- model
- nodeName
- hardExampleAlgorithm
- workerSpec
properties:
name:
type: string
model:
type: object
required:
- name
properties:
name:
type: string
nodeName:
type: string
hardExampleAlgorithm:
type: object
required:
- name
properties:
name:
type: string
workerSpec:
type: object
required:
- scriptDir
- scriptBootFile
- frameworkType
- frameworkVersion
properties:
scriptDir:
type: string
scriptBootFile:
type: string
frameworkType:
type: string
frameworkVersion:
type: string
parameters:
type: array
items:
type: object
required:
- key
- value
properties:
key:
type: string
value:
type: string
cloudWorker:
type: object
required:
- name
- model
- nodeName
- workerSpec
properties:
name:
type: string
model:
type: object
required:
- name
properties:
name:
type: string
nodeName:
type: string
workerSpec:
type: object
required:
- scriptDir
- scriptBootFile
- frameworkType
- frameworkVersion
properties:
scriptDir:
type: string
scriptBootFile:
type: string
frameworkType:
type: string
frameworkVersion:
type: string
parameters:
type: array
items:
type: object
required:
- key
- value
properties:
key:
type: string
value:
type: string
status:
type: object
properties:
conditions:
type: array
items:
type: object
properties:
type:
type: string
status:
type: string
lastHeartbeatTime:
type: string
format: date-time
lastTransitionTime:
type: string
format: date-time
reason:
type: string
message:
type: string
startTime:
type: string
format: date-time
active:
type: integer
failed:
type: integer
metrics:
type: array
items:
type: object
properties:
key:
type: string
value:
type: string
additionalPrinterColumns:
- name: status
type: string
description: The status of the jointinference service
jsonPath: ".status.conditions[-1].type"
- name: active
type: integer
description: The number of active worker
jsonPath: ".status.active"
- name: failed
type: integer
description: The number of failed worker
jsonPath: ".status.failed"
- name: Age
type: date
jsonPath: .metadata.creationTimestamp
```
see [crd source](/build/crds/sedna/jointinferenceservice_v1alpha1.yaml)
### Joint inference type definition
[go source](cloud/pkg/apis/sedna/v1alpha1/jointinferenceservice_types.go)
```go
package v1alpha1
import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// JointInferenceService describes the data that a jointinferenceservice resource should have
type JointInferenceService struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata"`
Spec JointInferenceServiceSpec `json:"spec"`
Status JointInferenceServiceStatus `json:"status,omitempty"`
}
// JointInferenceServiceSpec is a description of a jointinferenceservice
type JointInferenceServiceSpec struct {
EdgeWorker EdgeWorker `json:"edgeWorker"`
CloudWorker CloudWorker `json:"cloudWorker"`
}
// EdgeWorker describes the data an edge worker should have
type EdgeWorker struct {
Name string `json:"name"`
Model SmallModel `json:"model"`
NodeName string `json:"nodeName"`
HardExampleAlgorithm HardExampleAlgorithm `json:"hardExampleAlgorithm"`
WorkerSpec CommonWorkerSpec `json:"workerSpec"`
}
// CloudWorker describes the data a cloud worker should have
type CloudWorker struct {
Name string `json:"name"`
Model BigModel `json:"model"`
NodeName string `json:"nodeName"`
WorkerSpec CommonWorkerSpec `json:"workerSpec"`
}
type SmallModel struct {
Name string `json:"name"`
}
type BigModel struct {
Name string `json:"name"`
}
type HardExampleAlgorithm struct {
Name string `json:"name"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// JointInferenceServiceList is a list of JointInferenceServices.
type JointInferenceServiceList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
Items []JointInferenceService `json:"items"`
}
// JointInferenceServiceStatus represents the current state of a joint inference service.
type JointInferenceServiceStatus struct {
// The latest available observations of a joint inference service's current state.
// +optional
Conditions []JointInferenceServiceCondition `json:"conditions,omitempty"`
// Represents time when the service was acknowledged by the service controller.
// It is not guaranteed to be set in happens-before order across separate operations.
// It is represented in RFC3339 form and is in UTC.
// +optional
StartTime *metav1.Time `json:"startTime,omitempty"`
// The number of actively running workers.
// +optional
Active int32 `json:"active"`
// The number of workers which reached phase Failed.
// +optional
Failed int32 `json:"failed"`
// Metrics of the joint inference service.
Metrics []Metric `json:"metrics,omitempty"`
}
type JointInferenceServiceConditionType string
// These are valid conditions of a service.
const (
// JointInferenceServiceCondPending means the service has been accepted by the system,
// but one or more of the workers has not been started.
JointInferenceServiceCondPending JointInferenceServiceConditionType = "Pending"
// JointInferenceServiceCondFailed means the service has failed its execution.
JointInferenceServiceCondFailed JointInferenceServiceConditionType = "Failed"
// JointInferenceServiceReady means the service has been ready.
JointInferenceServiceCondRunning JointInferenceServiceConditionType = "Running"
)
// JointInferenceServiceCondition describes current state of a service.
type JointInferenceServiceCondition struct {
// Type of service condition, one of Pending, Running or Failed.
Type JointInferenceServiceConditionType `json:"type"`
// Status of the condition, one of True, False, Unknown.
Status v1.ConditionStatus `json:"status"`
// Last time the condition was checked.
// +optional
LastHeartbeatTime metav1.Time `json:"lastHeartbeatTime,omitempty"`
// Last time the condition transit from one status to another.
// +optional
LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
// (brief) reason for the condition's last transition.
// +optional
Reason string `json:"reason,omitempty"`
// Human readable message indicating details about last transition.
// +optional
Message string `json:"message,omitempty"`
}
```
see [go source](/pkg/apis/sedna/v1alpha1/jointinferenceservice_types.go)
#### Validation
[Open API v3 Schema based validation](https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#validation) can be used to guard against bad requests.
@@ -399,42 +80,7 @@ Here is a list of validations we need to support :
1. The edgenode name specified in the crd should exist in k8s.
### joint inference sample
```yaml
apiVersion: sedna.io/v1alpha1
kind: JointInferenceService
metadata:
name: helmet-detection-demo
namespace: default
spec:
edgeWorker:
name: "edgeworker"
model:
name: "small-model"
nodeName: "edge0"
hardExampleAlgorithm:
name: "IBT"
workerSpec:
scriptDir: "/code"
scriptBootFile: "edge_inference.py"
frameworkType: "tensorflow"
frameworkVersion: "1.18"
parameters:
- key: "nms_threshold"
value: "0.6"
cloudWorker:
name: "work"
model:
name: "big-model"
nodeName: "solar-corona-cloud"
workerSpec:
scriptDir: "/code"
scriptBootFile: "cloud_inference.py"
frameworkType: "tensorflow"
frameworkVersion: "1.18"
parameters:
- key: "nms_threshold"
value: "0.6"
```
see [sample source](/build/crd-samples/sedna/jointinferenceservice_v1alpha1.yaml)
## Controller Design
The joint inference controller starts three separate goroutines called `upstream`, `downstream` and `joint-inference`controller. These are not separate controllers as such but named here for clarity.


+ 8
- 65
docs/setup/install.md View File

@@ -3,10 +3,7 @@
* [Create CRDs](#create-crds)
* [Deploy GM](#deploy-gm)
* [Prepare GM config](#prepare-gm-config)
* [Build worker base images](#build-worker-base-images)
* [Run GM as k8s deployment(recommended)](#run-gm-as-k8s-deploymentrecommended)
* [Run GM as a single process(alternative)](#run-gm-as-a-single-processalternative)
* [Run GM as docker container(alternative)](#run-gm-as-docker-containeralternative)
* [Run GM as k8s deployment](#run-gm-as-k8s-deployment)
* [Deploy LC](#deploy-lc)

## Deploy Sedna
@@ -30,14 +27,14 @@ The shell commands below should be executed in this node and **one terminal session** in view of the environment variables set.
```shell
git clone http://github.com/kubeedge/sedna.git
cd sedna
git checkout master
git checkout main
```

### Create CRDs

```shell
# create these crds including dataset, model, joint-inference
kubectl apply -f build/crds/sedna/
kubectl create -f build/crds/
```

### Deploy GM
@@ -48,8 +45,6 @@ Get `build/gm/gm-config.yaml` for a copy
kubeConfig: ""
master: ""
namespace: ""
imageHub:
"tensorflow:1.15": "docker.io/sedna/tensorflow-base-image-to-filled:1.15"
websocket:
address: 0.0.0.0
port: 9000
@@ -59,33 +54,11 @@ localController:
1. `kubeConfig`: config to connect k8s, default `""`
1. `master`: k8s master addr, default `""`
1. `namespace`: the namespace GM watches, `""` means that gm watches all namespaces, default `""`.
1. `imageHub`: the base image mapping for model training/evaluation/inference which key is frameworkType/frameVersion.
1. `websocket`: since the current limit of kubeedge(1.5), GM needs to build the websocket channel for communicating between GM and LCs.
1. `localController`:
- `server`: to be injected into the worker to connect LC.

#### Build worker base images

Here build worker base image for tensorflow 1.15 for example:
```shell
# here using github container registry for example.
# edit it to use the container registry of your choice.
IMAGE_REPO=ghcr.io/kubeedge/sedna

# build tensorflow image
WORKER_TF1_IMAGE=$IMAGE_REPO/worker-tensorflow:1.15

docker build -f build/worker/base_images/tensorflow/tensorflow-1.15.Dockerfile -t $WORKER_TF1_IMAGE .

# push worker image to registry, login to registry first if needed
docker push $WORKER_TF1_IMAGE
```



There are some methods to run gm, you can choose one method below:

#### Run GM as k8s deployment(**recommended**):
#### Run GM as k8s deployment:

We don't need to config the kubeconfig in this method said by [accessing the API from a Pod](https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod).

@@ -103,8 +76,8 @@ LC_PORT=9100

# here using github container registry for example
# edit it to use the container registry of your choice.
IMAGE_REPO=ghcr.io/kubeedge/sedna
IMAGE_TAG=v1alpha1
IMAGE_REPO=kubeedge
IMAGE_TAG=v0.1.0

LC_SERVER="http://localhost:$LC_PORT"

@@ -122,9 +95,6 @@ sed -i 's@master:.*@master: ""@' $CONFIG_FILE

sed -i "s@port:.*@port: $GM_PORT@" $CONFIG_FILE

# setting tensorflow1.15 base image
sed -i 's@\("tensorflow:1.15":\).*@\1 '"$WORKER_TF1_IMAGE@" $CONFIG_FILE

# setting lc server
sed -i "s@http://localhost:9100@$LC_SERVER@" $CONFIG_FILE

@@ -135,7 +105,7 @@ sed -i "s@http://localhost:9100@$LC_SERVER@" $CONFIG_FILE
# build image from source OR use the gm image previous built.

# edit it to use the base repo of your choice.
GM_IMAGE=$IMAGE_REPO/gm:$IMAGE_TAG
GM_IMAGE=$IMAGE_REPO/sedna-gm:$IMAGE_TAG

make gmimage IMAGE_REPO=$IMAGE_REPO IMAGE_TAG=$IMAGE_TAG

@@ -216,33 +186,6 @@ EOF
kubectl get deploy -n sedna gm
```

#### Run GM as a single process(alternative)
1\. config GM:
```shell
cp build/gm/sedna-gm.yaml gm.yaml
# make sure /root/.kube/config exists
sed -i 's@kubeConfig.*@kubeConfig: /root/.kube/config@' gm.yaml
```

2\. compile and run GM direct:
```shell
go build cmd/sedna-gm/sedna-gm.go
./sedna-gm --config gm.yaml -v2
```

#### Run GM as docker container(alternative)
1\. build GM image:
```shell
GM_IMAGE=$IMAGE_REPO/gm:$IMAGE_TAG
sed -i 's@kubeConfig.*@kubeConfig: /root/.kube/config@' build/gm/sedna-gm.yaml
make gmimage IMAGE_REPO=$IMAGE_REPO IMAGE_TAG=$IMAGE_TAG
```

2\. run GM as container:
```shell
docker run --net host -v /root/.kube:/root/.kube $GM_IMAGE
```

### Deploy LC
Prerequisites:
1. Run GM successfully.
@@ -252,7 +195,7 @@ Steps:

1\. Build LC image:
```shell
LC_IMAGE=$IMAGE_REPO/lc:$IMAGE_TAG
LC_IMAGE=$IMAGE_REPO/sedna-lc:$IMAGE_TAG

make lcimage IMAGE_REPO=$IMAGE_REPO IMAGE_TAG=$IMAGE_TAG



+ 35
- 0
examples/build_image.sh View File

@@ -0,0 +1,35 @@
#!/bin/bash

# Copyright 2021 The KubeEdge Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cd "$(dirname "${BASH_SOURCE[0]}")"

IMAGE_REPO=${IMAGE_REPO:-kubeedge}
IMAGE_TAG=${IMAGE_TAG:-v0.1.0}

EXAMPLE_REPO_PREFIX=${IMAGE_REPO}/sedna-example-

dockerfiles=(
federated-learning-surface-defect-detection-aggregation.Dockerfile
federated-learning-surface-defect-detection-train.Dockerfile
incremental-learning-helmet-detection.Dockerfile
joint-inference-helmet-detection-big.Dockerfile
joint-inference-helmet-detection-little.Dockerfile
)

for dockerfile in ${dockerfiles[@]}; do
example_name=${dockerfile/.Dockerfile}
docker build -f $dockerfile -t ${EXAMPLE_REPO_PREFIX}${example_name}:${IMAGE_TAG} --label sedna=examples ..
done

+ 14
- 0
examples/federated-learning-surface-defect-detection-aggregation.Dockerfile View File

@@ -0,0 +1,14 @@
FROM tensorflow/tensorflow:2.3.0

RUN apt update \
&& apt install -y libgl1-mesa-glx
COPY ./lib/requirements.txt /home
RUN pip install -r /home/requirements.txt

ENV PYTHONPATH "/home/lib"

COPY ./lib /home/lib
WORKDIR /home/work
COPY examples/federated_learning/surface_defect_detection/aggregation_worker/ /home/work/

ENTRYPOINT ["python", "aggregate.py"]

build/worker/base_images/tensorflow/tensorflow-2.3.Dockerfile → examples/federated-learning-surface-defect-detection-train.Dockerfile View File

@@ -7,7 +7,9 @@ RUN pip install -r /home/requirements.txt

ENV PYTHONPATH "/home/lib"

WORKDIR /home/work
COPY ./lib /home/lib

ENTRYPOINT ["python"]
WORKDIR /home/work
COPY examples/federated_learning/surface_defect_detection/training_worker/ /home/work/

ENTRYPOINT ["python", "train.py"]

+ 22
- 0
examples/incremental-learning-helmet-detection.Dockerfile View File

@@ -0,0 +1,22 @@
FROM tensorflow/tensorflow:1.15.4

RUN apt update \
&& apt install -y libgl1-mesa-glx
COPY ./lib/requirements.txt /home
# install requirements of sedna lib
RUN pip install -r /home/requirements.txt

# extra requirements for example
RUN pip install tqdm==4.56.0
RUN pip install matplotlib==3.3.3


ENV PYTHONPATH "/home/lib"

WORKDIR /home/work
COPY ./lib /home/lib

COPY examples/incremental_learning/helmet_detection_incremental_train/training/ /home/work/


ENTRYPOINT ["python"]

build/worker/base_images/tensorflow/tensorflow-1.15.Dockerfile → examples/joint-inference-helmet-detection-big.Dockerfile View File

@@ -10,4 +10,8 @@ ENV PYTHONPATH "/home/lib"
WORKDIR /home/work
COPY ./lib /home/lib

ENTRYPOINT ["python"]
ENTRYPOINT ["python"]

COPY examples/joint_inference/helmet_detection_inference/big_model/big_model.py /home/work/infer.py

CMD ["infer.py"]

+ 16
- 0
examples/joint-inference-helmet-detection-little.Dockerfile View File

@@ -0,0 +1,16 @@
FROM tensorflow/tensorflow:1.15.4

RUN apt update \
&& apt install -y libgl1-mesa-glx
COPY ./lib/requirements.txt /home
RUN pip install -r /home/requirements.txt

ENV PYTHONPATH "/home/lib"

WORKDIR /home/work
COPY ./lib /home/lib

ENTRYPOINT ["python"]
COPY examples/joint_inference/helmet_detection_inference/little_model/little_model.py /home/work/infer.py

CMD ["infer.py"]

+ 35
- 0
examples/push_image.sh View File

@@ -0,0 +1,35 @@
#!/bin/bash

# Copyright 2021 The KubeEdge Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cd "$(dirname "${BASH_SOURCE[0]}")"

export IMAGE_REPO=${IMAGE_REPO:-kubeedge}
export IMAGE_TAG=${IMAGE_TAG:-v0.1.0}

bash build_image.sh

for i in $(
docker images --filter label=sedna=examples |
grep $IMAGE_REPO |
grep $IMAGE_TAG |
awk 'a++&&$0=$1":"$2'
); do
docker push $i && {
echo done docker push $i
} || {
echo failed to docker push $i
}
done

+ 5
- 22
hack/local-up.sh View File

@@ -44,7 +44,7 @@ cd "$SEDNA_ROOT"

NO_CLEANUP=${NO_CLEANUP:-false}

IMAGE_REPO=localhost/kubeedge/sedna
IMAGE_REPO=localhost/sedna
IMAGE_TAG=localup

# local k8s cluster name for local-up-kubeedge.sh
@@ -176,21 +176,11 @@ build_component_image() {
for bin; do
echo "building $bin image"
make -C "${SEDNA_ROOT}" ${bin}image IMAGE_REPO=$IMAGE_REPO IMAGE_TAG=$IMAGE_TAG
eval ${bin^^}_IMAGE="'${IMAGE_REPO}/${bin}:${IMAGE_TAG}'"
eval ${bin^^}_IMAGE="'${IMAGE_REPO}/sedna-${bin}:${IMAGE_TAG}'"
done
# no clean up for images
}

build_worker_base_images() {
echo "building worker base images"
# build tensorflow1.15 image
WORKER_TF1_IMAGE=$IMAGE_REPO/worker-tensorflow:1.15
docker build -f build/worker/base_images/tensorflow/tensorflow-1.15.Dockerfile -t $WORKER_TF1_IMAGE .

WORKER_IMAGE_HUB="'tensorflow:1.15': $WORKER_TF1_IMAGE"
# add more base images
}

load_images_to_master() {
local image

@@ -206,16 +196,13 @@ prepare_k8s_env() {
export KUBECONFIG=$(realpath $TMP_DIR/kubeconfig)
# prepare our k8s environment
# create these crds including dataset, model, joint-inference etc.
kubectl apply -f build/crds/sedna/

# gm, lc will be created in this namespace
kubectl create namespace $NAMESPACE
kubectl create -f build/crds/

# create the cluster role for gm
kubectl apply -f build/gm/rbac/
kubectl create -f build/gm/rbac/

add_cleanup "
kubectl delete -f build/crds/sedna/
kubectl delete -f build/crds/
kubectl delete namespace $NAMESPACE --timeout=5s
"
load_images_to_master
@@ -233,8 +220,6 @@ start_gm() {
cat > gmconfig <<EOF
kubeConfig: ""
namespace: ""
imageHub:
${WORKER_IMAGE_HUB:-}
websocket:
port: $GM_BIND_PORT
localController:
@@ -470,8 +455,6 @@ do_up() {

build_component_image gm lc

build_worker_base_images

check_prerequisites

prepare_k8s_env


+ 1
- 0
lib/sedna/common/config.py View File

@@ -18,6 +18,7 @@ import os
class BaseConfig:
"""The base config, the value can not be changed."""
# dataset
original_dataset_url = os.getenv("ORIGINAL_DATASET_URL")
train_dataset_url = os.getenv("TRAIN_DATASET_URL")
test_dataset_url = os.getenv("TEST_DATASET_URL")
data_path_prefix = os.getenv("DATA_PATH_PREFIX", "/home/data")


+ 2
- 1
lib/sedna/context.py View File

@@ -14,6 +14,7 @@

import json
import logging
import os

from sedna.common.config import BaseConfig

@@ -45,7 +46,7 @@ class Context:
`PARAMETERS` and `HEM_PARAMETERS` field"""

def __init__(self):
self.parameters = parse_parameters(BaseConfig.parameters)
self.parameters = os.environ
self.hem_parameters = parse_parameters(BaseConfig.hem_parameters)

def get_context(self):


+ 4
- 1
lib/sedna/dataset/dataset.py View File

@@ -63,7 +63,10 @@ def load_test_dataset(data_format, preprocess_fun=None, **kwargs):

def _load_txt_dataset(dataset_url):
LOG.info(f'dataset_url is {dataset_url}, now reading dataset_url')
root_path = os.path.dirname(dataset_url)

# use original dataset url,
# see https://github.com/kubeedge/sedna/issues/35
root_path = os.path.dirname(BaseConfig.original_dataset_url or dataset_url)
with open(dataset_url) as f:
lines = f.readlines()
new_lines = [root_path + os.path.sep + l for l in lines]


+ 0
- 9
pkg/apis/sedna/v1alpha1/common_types.go View File

@@ -22,15 +22,6 @@ type Metric struct {
Value string `json:"value"`
}

// CommonWorkerSpec is a description of a worker both for edge and cloud
type CommonWorkerSpec struct {
ScriptDir string `json:"scriptDir"`
ScriptBootFile string `json:"scriptBootFile"`
FrameworkType string `json:"frameworkType"`
FrameworkVersion string `json:"frameworkVersion"`
Parameters []ParaSpec `json:"parameters"`
}

// ParaSpec is a description of a parameter
type ParaSpec struct {
Key string `json:"key"`


+ 2
- 1
pkg/apis/sedna/v1alpha1/dataset_types.go View File

@@ -22,6 +22,7 @@ import (

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:subresource:status

// Dataset describes the data that a dataset resource should have
type Dataset struct {
@@ -30,7 +31,7 @@ type Dataset struct {
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec DatasetSpec `json:"spec"`
Status DatasetStatus `json:"status"`
Status DatasetStatus `json:"status,omitempty"`
}

// DatasetSpec is a description of a dataset


+ 7
- 16
pkg/apis/sedna/v1alpha1/federatedlearningjob_types.go View File

@@ -23,6 +23,9 @@ import (

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:resource:shortName=fl
// +kubebuilder:subresource:status

// FederatedLearningJob describes the data that a FederatedLearningJob resource should have
type FederatedLearningJob struct {
metav1.TypeMeta `json:",inline"`
@@ -40,26 +43,14 @@ type FLJobSpec struct {

// AggregationWorker describes the data an aggregation worker should have
type AggregationWorker struct {
Model modelRefer `json:"model"`
NodeName string `json:"nodeName"`
WorkerSpec AggregationWorkerSpec `json:"workerSpec"`
Model modelRefer `json:"model"`
Template v1.PodTemplateSpec `json:"template"`
}

// TrainingWorker describes the data a training worker should have
type TrainingWorker struct {
NodeName string `json:"nodeName"`
WorkerSpec TrainingWorkerSpec `json:"workerSpec"`
Dataset datasetRefer `json:"dataset"`
}

// AggregationWorkerSpec is a description of a aggregationworker
type AggregationWorkerSpec struct {
CommonWorkerSpec
}

// TrainingWorkerSpec is a description of a trainingworker
type TrainingWorkerSpec struct {
CommonWorkerSpec
Dataset datasetRefer `json:"dataset"`
Template v1.PodTemplateSpec `json:"template"`
}

type datasetRefer struct {


+ 11
- 10
pkg/apis/sedna/v1alpha1/incrementallearningjob_types.go View File

@@ -23,6 +23,9 @@ import (

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:resource:shortName=il
// +kubebuilder:subresource:status

// IncrementalLearningJob describes the data that a incrementallearningjob resource should have
type IncrementalLearningJob struct {
metav1.TypeMeta `json:",inline"`
@@ -30,14 +33,13 @@ type IncrementalLearningJob struct {
metav1.ObjectMeta `json:"metadata"`

Spec ILJobSpec `json:"spec"`
Status ILJobStatus `json:"status"`
Status ILJobStatus `json:"status,omitempty"`
}

// ILJobSpec is a description of a incrementallearningjob
type ILJobSpec struct {
Dataset ILDataset `json:"dataset"`
OutputDir string `json:"outputDir"`
NodeName string `json:"nodeName"`
InitialModel InitialModel `json:"initialModel"`
TrainSpec TrainSpec `json:"trainSpec"`
EvalSpec EvalSpec `json:"evalSpec"`
@@ -46,22 +48,21 @@ type ILJobSpec struct {

// TrainSpec describes the data an train worker should have
type TrainSpec struct {
WorkerSpec CommonWorkerSpec `json:"workerSpec"`
Trigger Trigger `json:"trigger"`
Template v1.PodTemplateSpec `json:"template"`
Trigger Trigger `json:"trigger"`
}

// EvalSpec describes the data an eval worker should have
type EvalSpec struct {
WorkerSpec CommonWorkerSpec `json:"workerSpec"`
Template v1.PodTemplateSpec `json:"template"`
}

// DeploySpec describes the deploy model to be updated
type DeploySpec struct {
Model DeployModel `json:"model"`
Trigger Trigger `json:"trigger"`
NodeName string `json:"nodeName"`
WorkerSpec CommonWorkerSpec `json:"workerSpec"`
HardExampleMining HardExampleMining `json:"hardExampleMining"`
Model DeployModel `json:"model"`
Trigger Trigger `json:"trigger"`
HardExampleMining HardExampleMining `json:"hardExampleMining"`
Template v1.PodTemplateSpec `json:"template"`
}

type Trigger struct {


+ 8
- 8
pkg/apis/sedna/v1alpha1/jointinferenceservice_types.go View File

@@ -23,6 +23,8 @@ import (

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:resource:shortName=ji
// +kubebuilder:subresource:status

// JointInferenceService describes the data that a jointinferenceservice resource should have
type JointInferenceService struct {
@@ -42,17 +44,15 @@ type JointInferenceServiceSpec struct {

// EdgeWorker describes the data a edge worker should have
type EdgeWorker struct {
Model SmallModel `json:"model"`
NodeName string `json:"nodeName"`
HardExampleMining HardExampleMining `json:"hardExampleMining"`
WorkerSpec CommonWorkerSpec `json:"workerSpec"`
Model SmallModel `json:"model"`
HardExampleMining HardExampleMining `json:"hardExampleMining"`
Template v1.PodTemplateSpec `json:"template"`
}

// CloudWorker describes the data a cloud worker should have
type CloudWorker struct {
Model BigModel `json:"model"`
NodeName string `json:"nodeName"`
WorkerSpec CommonWorkerSpec `json:"workerSpec"`
Model BigModel `json:"model"`
Template v1.PodTemplateSpec `json:"template"`
}

// SmallModel describes the small model
@@ -68,7 +68,7 @@ type BigModel struct {
// HardExampleMining describes the hard example algorithm to be used
type HardExampleMining struct {
Name string `json:"name"`
Parameters []ParaSpec `json:"parameters"`
Parameters []ParaSpec `json:"parameters,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object


+ 2
- 1
pkg/apis/sedna/v1alpha1/model_types.go View File

@@ -22,6 +22,7 @@ import (

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:subresource:status

// Model describes the data that a model resource should have
type Model struct {
@@ -30,7 +31,7 @@ type Model struct {
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec ModelSpec `json:"spec"`
Status ModelStatus `json:"status"`
Status ModelStatus `json:"status,omitempty"`
}

// ModelSpec is a description of a model


+ 7
- 62
pkg/apis/sedna/v1alpha1/zz_generated.deepcopy.go View File

@@ -28,7 +28,7 @@ import (
func (in *AggregationWorker) DeepCopyInto(out *AggregationWorker) {
*out = *in
out.Model = in.Model
in.WorkerSpec.DeepCopyInto(&out.WorkerSpec)
in.Template.DeepCopyInto(&out.Template)
return
}

@@ -42,23 +42,6 @@ func (in *AggregationWorker) DeepCopy() *AggregationWorker {
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *AggregationWorkerSpec) DeepCopyInto(out *AggregationWorkerSpec) {
*out = *in
in.CommonWorkerSpec.DeepCopyInto(&out.CommonWorkerSpec)
return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AggregationWorkerSpec.
func (in *AggregationWorkerSpec) DeepCopy() *AggregationWorkerSpec {
if in == nil {
return nil
}
out := new(AggregationWorkerSpec)
in.DeepCopyInto(out)
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *BigModel) DeepCopyInto(out *BigModel) {
*out = *in
@@ -79,7 +62,7 @@ func (in *BigModel) DeepCopy() *BigModel {
func (in *CloudWorker) DeepCopyInto(out *CloudWorker) {
*out = *in
out.Model = in.Model
in.WorkerSpec.DeepCopyInto(&out.WorkerSpec)
in.Template.DeepCopyInto(&out.Template)
return
}

@@ -93,27 +76,6 @@ func (in *CloudWorker) DeepCopy() *CloudWorker {
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *CommonWorkerSpec) DeepCopyInto(out *CommonWorkerSpec) {
*out = *in
if in.Parameters != nil {
in, out := &in.Parameters, &out.Parameters
*out = make([]ParaSpec, len(*in))
copy(*out, *in)
}
return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CommonWorkerSpec.
func (in *CommonWorkerSpec) DeepCopy() *CommonWorkerSpec {
if in == nil {
return nil
}
out := new(CommonWorkerSpec)
in.DeepCopyInto(out)
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Condition) DeepCopyInto(out *Condition) {
*out = *in
@@ -248,8 +210,8 @@ func (in *DeploySpec) DeepCopyInto(out *DeploySpec) {
*out = *in
out.Model = in.Model
in.Trigger.DeepCopyInto(&out.Trigger)
in.WorkerSpec.DeepCopyInto(&out.WorkerSpec)
in.HardExampleMining.DeepCopyInto(&out.HardExampleMining)
in.Template.DeepCopyInto(&out.Template)
return
}

@@ -268,7 +230,7 @@ func (in *EdgeWorker) DeepCopyInto(out *EdgeWorker) {
*out = *in
out.Model = in.Model
in.HardExampleMining.DeepCopyInto(&out.HardExampleMining)
in.WorkerSpec.DeepCopyInto(&out.WorkerSpec)
in.Template.DeepCopyInto(&out.Template)
return
}

@@ -285,7 +247,7 @@ func (in *EdgeWorker) DeepCopy() *EdgeWorker {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EvalSpec) DeepCopyInto(out *EvalSpec) {
*out = *in
in.WorkerSpec.DeepCopyInto(&out.WorkerSpec)
in.Template.DeepCopyInto(&out.Template)
return
}

@@ -915,7 +877,7 @@ func (in *Timer) DeepCopy() *Timer {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TrainSpec) DeepCopyInto(out *TrainSpec) {
*out = *in
in.WorkerSpec.DeepCopyInto(&out.WorkerSpec)
in.Template.DeepCopyInto(&out.Template)
in.Trigger.DeepCopyInto(&out.Trigger)
return
}
@@ -933,8 +895,8 @@ func (in *TrainSpec) DeepCopy() *TrainSpec {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TrainingWorker) DeepCopyInto(out *TrainingWorker) {
*out = *in
in.WorkerSpec.DeepCopyInto(&out.WorkerSpec)
out.Dataset = in.Dataset
in.Template.DeepCopyInto(&out.Template)
return
}

@@ -948,23 +910,6 @@ func (in *TrainingWorker) DeepCopy() *TrainingWorker {
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TrainingWorkerSpec) DeepCopyInto(out *TrainingWorkerSpec) {
*out = *in
in.CommonWorkerSpec.DeepCopyInto(&out.CommonWorkerSpec)
return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrainingWorkerSpec.
func (in *TrainingWorkerSpec) DeepCopy() *TrainingWorkerSpec {
if in == nil {
return nil
}
out := new(TrainingWorkerSpec)
in.DeepCopyInto(out)
return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Trigger) DeepCopyInto(out *Trigger) {
*out = *in


+ 60
- 17
pkg/globalmanager/common.go View File

@@ -30,6 +30,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
k8scontroller "k8s.io/kubernetes/pkg/controller"
)

const (
@@ -39,20 +40,19 @@ const (
MaxBackOff = 360 * time.Second
statusUpdateRetries = 3
// setting some prefix for container path, include data and code prefix
codePrefix = "/home/work"
dataPrefix = "/home/data"
bigModelPort int32 = 5000
)

// CreateVolumeMap creates volumeMap for container and
// returns volumeMounts and volumes for stage of creating pod
func CreateVolumeMap(containerPara *ContainerPara) ([]v1.VolumeMount, []v1.Volume) {
func CreateVolumeMap(workerPara *WorkerPara) ([]v1.VolumeMount, []v1.Volume) {
var volumeMounts []v1.VolumeMount
var volumes []v1.Volume
volumetype := v1.HostPathDirectory
mountPathMap := make(map[string]bool)
duplicateIdx := make(map[int]bool)
for i, v := range containerPara.volumeMountList {
for i, v := range workerPara.volumeMountList {
if mountPathMap[v] {
duplicateIdx[i] = true
continue
@@ -60,16 +60,16 @@ func CreateVolumeMap(containerPara *ContainerPara) ([]v1.VolumeMount, []v1.Volum
mountPathMap[v] = true
tempVolumeMount := v1.VolumeMount{
MountPath: v,
Name: containerPara.volumeMapName[i],
Name: workerPara.volumeMapName[i],
}
volumeMounts = append(volumeMounts, tempVolumeMount)
}
for i, v := range containerPara.volumeList {
for i, v := range workerPara.volumeList {
if duplicateIdx[i] {
continue
}
tempVolume := v1.Volume{
Name: containerPara.volumeMapName[i],
Name: workerPara.volumeMapName[i],
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: v,
@@ -96,17 +96,6 @@ func CreateEnvVars(envMap map[string]string) []v1.EnvVar {
return envVars
}

// MatchContainerBaseImage searches the base image
func MatchContainerBaseImage(imageHub map[string]string, frameName string, frameVersion string) (string, error) {
inputImageName := frameName + ":" + frameVersion
for imageName, imageURL := range imageHub {
if inputImageName == imageName {
return imageURL, nil
}
}
return "", fmt.Errorf("image %v not exists in imagehub", inputImageName)
}

// GetNodeIPByName get node ip by node name
func GetNodeIPByName(kubeClient kubernetes.Interface, name string) (string, error) {
n, err := kubeClient.CoreV1().Nodes().Get(context.Background(), name, metav1.GetOptions{})
@@ -223,3 +212,57 @@ func calcActivePodCount(pods []*v1.Pod) int32 {
}
return result
}

// injectWorkerPara modifies pod in-place
func injectWorkerPara(pod *v1.Pod, workerPara *WorkerPara, object CommonInterface) {
// inject our predefined volumes/envs
volumeMounts, volumes := CreateVolumeMap(workerPara)
envs := CreateEnvVars(workerPara.env)
pod.Spec.Volumes = append(pod.Spec.Volumes, volumes...)
for idx := range pod.Spec.Containers {
pod.Spec.Containers[idx].Env = append(
pod.Spec.Containers[idx].Env, envs...,
)
pod.Spec.Containers[idx].VolumeMounts = append(
pod.Spec.Containers[idx].VolumeMounts, volumeMounts...,
)
}

// inject our labels
if pod.Labels == nil {
pod.Labels = make(map[string]string)
}
for k, v := range GenerateLabels(object) {
pod.Labels[k] = v
}

pod.GenerateName = object.GetName() + "-" + strings.ToLower(workerPara.workerType) + "-"

pod.Namespace = object.GetNamespace()

if workerPara.hostNetwork {
// FIXME
// force to set hostnetwork
pod.Spec.HostNetwork = true
}

if pod.Spec.RestartPolicy == "" {
pod.Spec.RestartPolicy = workerPara.restartPolicy
}
}

// createPodWithTemplate creates and returns a pod object given a crd object, pod template, and workerPara
func createPodWithTemplate(client kubernetes.Interface, object CommonInterface, spec *v1.PodTemplateSpec, workerPara *WorkerPara) (*v1.Pod, error) {
objectKind := object.GroupVersionKind()
pod, _ := k8scontroller.GetPodFromTemplate(spec, object, metav1.NewControllerRef(object, objectKind))
injectWorkerPara(pod, workerPara, object)

createdPod, err := client.CoreV1().Pods(object.GetNamespace()).Create(context.TODO(), pod, metav1.CreateOptions{})
objectName := object.GetNamespace() + "/" + object.GetName()
if err != nil {
klog.Warningf("failed to create pod(type=%s) for %s %s, err:%s", workerPara.workerType, objectKind, objectName, err)
return nil, err
}
klog.V(2).Infof("pod %s is created successfully for %s %s", createdPod.Name, objectKind, objectName)
return createdPod, nil
}

+ 0
- 3
pkg/globalmanager/config/config.go View File

@@ -47,9 +47,6 @@ type ControllerConfig struct {
// Namespace indicates which namespace the controller listening to.
// default ""
Namespace string `json:"namespace,omitempty"`
// ImageHub indicates the image which the framework/version mapping to
// +Required
ImageHub map[string]string `json:"imageHub,omitempty"`

// websocket server config
// Since the current limit of kubeedge(1.5), GM needs to build the websocket channel for communicating between GM and LCs.


+ 8
- 4
pkg/globalmanager/downstream.go View File

@@ -60,7 +60,8 @@ func (dc *DownstreamController) syncDataset(eventType watch.EventType, dataset *
// syncJointInferenceService syncs the joint-inference-service resources
func (dc *DownstreamController) syncJointInferenceService(eventType watch.EventType, joint *sednav1.JointInferenceService) error {
// Here only propagate to the nodes with non empty name
nodeName := joint.Spec.EdgeWorker.NodeName
// FIXME: only the case that Spec.NodeName specified is support
nodeName := joint.Spec.EdgeWorker.Template.Spec.NodeName
if len(nodeName) == 0 {
return fmt.Errorf("empty node name")
}
@@ -74,8 +75,8 @@ func (dc *DownstreamController) syncFederatedLearningJob(eventType watch.EventTy
nodeset := make(map[string]bool)
for _, trainingWorker := range job.Spec.TrainingWorkers {
// Here only propagate to the nodes with non empty name
if len(trainingWorker.NodeName) > 0 {
nodeset[trainingWorker.NodeName] = true
if len(trainingWorker.Template.Spec.NodeName) > 0 {
nodeset[trainingWorker.Template.Spec.NodeName] = true
}
}

@@ -108,7 +109,10 @@ func (dc *DownstreamController) syncModelWithName(nodeName, modelName, namespace
// syncIncrementalJob syncs the incremental learning jobs
func (dc *DownstreamController) syncIncrementalJob(eventType watch.EventType, job *sednav1.IncrementalLearningJob) error {
// Here only propagate to the nodes with non empty name
nodeName := job.Spec.NodeName

// FIXME(llhuii): only the case that all workers having the same nodeName are support,
// will support Spec.NodeSelector and differenect nodeName.
nodeName := job.Spec.TrainSpec.Template.Spec.NodeName
if len(nodeName) == 0 {
return fmt.Errorf("empty node name")
}


+ 33
- 92
pkg/globalmanager/federatedlearningjob.go View File

@@ -20,10 +20,8 @@ import (
"context"
"encoding/json"
"fmt"
"k8s.io/klog/v2"
"path/filepath"
"strconv"
"strings"
"time"

v1 "k8s.io/api/core/v1"
@@ -40,6 +38,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
k8scontroller "k8s.io/kubernetes/pkg/controller"

sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
@@ -52,11 +51,9 @@ import (
"github.com/kubeedge/sedna/pkg/globalmanager/utils"
)

type FLJobStage string

const (
FLJobStageAgg FLJobStage = "Aggregation"
FLJobStageTrain FLJobStage = "Training"
FLJobStageAgg = "Aggregation"
FLJobStageTrain = "Training"
)

// flJobControllerKind contains the schema.GroupVersionKind for this controller type.
@@ -423,44 +420,45 @@ func (fc *FederatedController) createPod(job *sednav1.FederatedLearningJob) (act

// deliver pod for aggregation worker
aggWorker := job.Spec.AggregationWorker
aggCodePath := aggWorker.WorkerSpec.ScriptDir
parameterJSON, _ := json.Marshal(aggWorker.WorkerSpec.Parameters)
parameterString := string(parameterJSON)

// Container VolumeMounts parameters
aggCodeConPath := codePrefix
aggModelConPath := dataPrefix + modelPath

// Env parameters for agg
aggModelURL := aggModelConPath

// Configure container mounting and Env information by initial ContainerPara
// Configure container mounting and Env information by initial WorkerPara
var aggPort int32 = 7363
var aggContainer *ContainerPara = new(ContainerPara)
aggContainer.volumeMountList = []string{aggCodeConPath, aggModelConPath}
aggContainer.volumeList = []string{aggCodePath, modelPath}
aggContainer.volumeMapName = []string{"code", "model"}
aggContainer.env = map[string]string{
var aggWorkerPara *WorkerPara = new(WorkerPara)
aggWorkerPara.volumeMountList = []string{aggModelConPath}
aggWorkerPara.volumeList = []string{modelPath}
aggWorkerPara.volumeMapName = []string{"model"}
aggWorkerPara.env = map[string]string{
"MODEL": modelstring,
"WORKER_NAME": "aggworker-" + utilrand.String(5),
"JOB_NAME": job.Name,
"PARTICIPANTS_COUNT": participantsCount,
"PARAMETERS": parameterString,
"MODEL_URL": aggModelURL,
"NAMESPACE": job.Namespace,
"AGG_BIND_PORT": strconv.Itoa(int(aggPort)),
}
aggContainer.scriptBootFile = aggWorker.WorkerSpec.ScriptBootFile
aggContainer.nodeName = aggWorker.NodeName
aggContainer.frameName = aggWorker.WorkerSpec.FrameworkType
aggContainer.frameVersion = aggWorker.WorkerSpec.FrameworkVersion

aggWorkerPara.workerType = FLJobStageAgg
aggWorkerPara.restartPolicy = v1.RestartPolicyOnFailure

// create aggpod based on configured parameters
fc.generatedPod(job, FLJobStageAgg, aggContainer, &active, false)
_, err = createPodWithTemplate(fc.kubeClient, job, &aggWorker.Template, aggWorkerPara)
if err != nil {
return active, err
}
active++

var appIP string
var aggServicePort int32

appIP, err = GetNodeIPByName(fc.kubeClient, job.Spec.AggregationWorker.NodeName)
// FIXME(llhuii): only the case that Spec.NodeName specified is support,
// will support Spec.NodeSelector.
appIP, err = GetNodeIPByName(fc.kubeClient, job.Spec.AggregationWorker.Template.Spec.NodeName)
aggServicePort, err = CreateKubernetesService(fc.kubeClient, job, aggPort, appIP)
if err != nil {
return active, err
@@ -468,8 +466,6 @@ func (fc *FederatedController) createPod(job *sednav1.FederatedLearningJob) (act
// deliver pod for training worker
for _, trainingWorker := range job.Spec.TrainingWorkers {
// get dataseturl through parsing crd of dataset
parameterJSON, _ = json.Marshal(trainingWorker.WorkerSpec.Parameters)
parameterString = string(parameterJSON)
datasetName := trainingWorker.Dataset.Name
dataset, err := fc.client.Datasets(job.Namespace).Get(ctx, datasetName, metav1.GetOptions{})
datasetjson, _ := json.Marshal(dataset)
@@ -481,10 +477,7 @@ func (fc *FederatedController) createPod(job *sednav1.FederatedLearningJob) (act
}
trainDatasetPath = dataset.Spec.URL
datasetParent := filepath.Dir(trainDatasetPath)
trainCodePath := trainingWorker.WorkerSpec.ScriptDir

// Container VolumeMounts parameters
trainCodeConPath := codePrefix
trainDataConPath := dataPrefix + datasetParent
trainModelConPath := dataPrefix + modelPath

@@ -492,12 +485,12 @@ func (fc *FederatedController) createPod(job *sednav1.FederatedLearningJob) (act
trainDatasetURL := dataPrefix + trainDatasetPath
trainModelURL := trainModelConPath

// Configure container mounting and Env information by initial ContainerPara
var trainContainer *ContainerPara = new(ContainerPara)
trainContainer.volumeMountList = []string{trainCodeConPath, trainDataConPath, trainModelConPath}
trainContainer.volumeList = []string{trainCodePath, datasetParent, modelPath}
trainContainer.volumeMapName = []string{"code", "data", "model"}
trainContainer.env = map[string]string{
// Configure container mounting and Env information by initial WorkerPara
var workerPara *WorkerPara = new(WorkerPara)
workerPara.volumeMountList = []string{trainDataConPath, trainModelConPath}
workerPara.volumeList = []string{datasetParent, modelPath}
workerPara.volumeMapName = []string{"data", "model"}
workerPara.env = map[string]string{
"DATASET": datasetstring,
"AGG_PORT": strconv.Itoa(int(aggServicePort)),
"AGG_IP": appIP,
@@ -505,77 +498,25 @@ func (fc *FederatedController) createPod(job *sednav1.FederatedLearningJob) (act
"TRAIN_DATASET_URL": trainDatasetURL,
"WORKER_NAME": "trainworker-" + utilrand.String(5),
"JOB_NAME": job.Name,
"PARAMETERS": parameterString,
"PARTICIPANTS_COUNT": participantsCount,
"NAMESPACE": job.Namespace,
"MODEL_NAME": modelName,
"DATASET_NAME": datasetName,
"LC_SERVER": fc.cfg.LC.Server,
}
trainContainer.scriptBootFile = trainingWorker.WorkerSpec.ScriptBootFile
trainContainer.nodeName = trainingWorker.NodeName
trainContainer.frameName = trainingWorker.WorkerSpec.FrameworkType
trainContainer.frameVersion = trainingWorker.WorkerSpec.FrameworkVersion

// create trainpod based on configured parameters
err = fc.generatedPod(job, FLJobStageTrain, trainContainer, &active, true)
workerPara.workerType = "train"
workerPara.hostNetwork = true
workerPara.restartPolicy = v1.RestartPolicyOnFailure
// create train pod based on configured parameters
_, err = createPodWithTemplate(fc.kubeClient, job, &trainingWorker.Template, workerPara)
if err != nil {
return active, err
}
active++
}
return
}

func (fc *FederatedController) generatedPod(job *sednav1.FederatedLearningJob, podtype FLJobStage, containerPara *ContainerPara, active *int32, hostNetwork bool) error {
var volumeMounts []v1.VolumeMount
var volumes []v1.Volume
var envs []v1.EnvVar
ctx := context.Background()
command := []string{"python"}
// get baseImgURL from imageHub based on user's configuration in job CRD
baseImgURL, err := MatchContainerBaseImage(fc.cfg.ImageHub, containerPara.frameName, containerPara.frameVersion)
// TODO: if matched image is empty, the pod creation process will not proceed, return error directly.
if err != nil {
klog.Warningf("federatedlearning job %v/%v %v worker matching container base image occurs error:%v", job.Namespace, job.Name, podtype, err)
return fmt.Errorf("%s pod occurs error: %w",
podtype, err)
}
volumeMounts, volumes = CreateVolumeMap(containerPara)
envs = CreateEnvVars(containerPara.env)
podSpec := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
GenerateName: job.Name + "-" + strings.ToLower(string(podtype)) + "-",
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, sednav1.SchemeGroupVersion.WithKind("FederatedLearningJob")),
},
Labels: GenerateLabels(job),
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
NodeName: containerPara.nodeName,
Containers: []v1.Container{
{Name: "container-" + job.Name + "-" + strings.ToLower(string(podtype)) + "-" + utilrand.String(5),
Image: baseImgURL,
Command: command,
Args: []string{containerPara.scriptBootFile},
Env: envs,
VolumeMounts: volumeMounts,
}},
Volumes: volumes,
HostNetwork: hostNetwork,
},
}
pod, err := fc.kubeClient.CoreV1().Pods(job.Namespace).Create(ctx, podSpec, metav1.CreateOptions{})
if err != nil {
klog.Warningf("failed to create %s pod %s for federatedlearning job %v/%v, err:%s", string(podtype), pod.Name, job.Namespace, job.Name, err)
return err
}
klog.V(2).Infof("%s pod %s is created successfully for federatedlearning job %v/%v", string(podtype), pod.Name, job.Namespace, job.Name)
*active++
return nil
}

func (fc *FederatedController) GetName() string {
return "FederatedLearningJobController"
}


+ 52
- 120
pkg/globalmanager/incrementallearningjob.go View File

@@ -18,7 +18,6 @@ package globalmanager

import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"strings"
@@ -497,10 +496,7 @@ func IsIncrementalJobFinished(j *sednav1.IncrementalLearningJob) bool {

func (jc *IncrementalJobController) createPod(job *sednav1.IncrementalLearningJob, podtype sednav1.ILJobStage) (err error) {
ctx := context.Background()
var workerName string
var workerSpec sednav1.CommonWorkerSpec
trainworkerspec := job.Spec.TrainSpec
evalworkerspec := job.Spec.EvalSpec
var podTemplate *v1.PodTemplateSpec

incrementalDatasetName := job.Spec.Dataset.Name
initialModelName := job.Spec.InitialModel.Name
@@ -534,18 +530,6 @@ func (jc *IncrementalJobController) createPod(job *sednav1.IncrementalLearningJo

outputDir := job.Spec.OutputDir
datasetParent := filepath.Dir(datasetPath)
var trainCodePath string
var evalCodePath string
trainCodePath = trainworkerspec.WorkerSpec.ScriptDir
evalCodePath = evalworkerspec.WorkerSpec.ScriptDir

trainParameterJSON, _ := json.Marshal(trainworkerspec.WorkerSpec.Parameters)
evalParameterJSON, _ := json.Marshal(evalworkerspec.WorkerSpec.Parameters)
trainParameterString := string(trainParameterJSON)
evalParameterString := string(evalParameterJSON)

klog.V(2).Infof("incrementallearning job %v/%v train parameters:%s", job.Namespace, job.Name, trainParameterString)
klog.V(2).Infof("incrementallearning job %v/%v eval parameters:%s", job.Namespace, job.Name, evalParameterString)

// get all url for train and eval from data in condition
condDataStr := job.Status.Conditions[len(job.Status.Conditions)-1].Data
@@ -576,57 +560,65 @@ func (jc *IncrementalJobController) createPod(job *sednav1.IncrementalLearningJo
dataURLContain = dataPrefix + dataURL

// Container VolumeMounts parameters
codeConPath := codePrefix
dataConPath := dataPrefix + datasetParent
basemodelConPath := dataPrefix + basemodelPath
deploymodelConPath := dataPrefix + deploymodelPath
outputConPath := dataPrefix + outputDir
var containerPara *ContainerPara = new(ContainerPara)
originalDatasetPathInContainer := dataPrefix + datasetPath
var workerPara *WorkerPara = new(WorkerPara)
if podtype == sednav1.ILJobTrain {
workerName = "Train"
workerSpec = trainworkerspec.WorkerSpec
workerPara.workerType = "Train"

podTemplate = &job.Spec.TrainSpec.Template
// Env parameters for train
preModelURL := inputmodelURLContain // premodel savepath before increase
outputModelURL := outputmodelURLContain // outputmodel savepath after increase, should be under outputdir
trainDataURL := dataURLContain

// Configure container mounting and Env information for train by initial ContainerPara
containerPara.volumeMountList = []string{codeConPath, dataConPath, basemodelConPath, deploymodelConPath, outputConPath}
containerPara.volumeList = []string{trainCodePath, datasetParent, basemodelPath, deploymodelPath, outputDir}
containerPara.volumeMapName = []string{"code", "data", "base-model", "deploy-model", "output-dir"}
containerPara.env = map[string]string{
"TRAIN_DATASET_URL": trainDataURL,
"MODEL_URL": outputModelURL,
"BASE_MODEL_URL": preModelURL,
"NAMESPACE": job.Namespace,
"JOB_NAME": job.Name,
"WORKER_NAME": "train-worker-" + utilrand.String(5),
"PARAMETERS": trainParameterString,
"LC_SERVER": jc.cfg.LC.Server,
// Configure container mounting and Env information for train by initial WorkerPara
workerPara.volumeMountList = []string{dataConPath, basemodelConPath, deploymodelConPath, outputConPath}
workerPara.volumeList = []string{datasetParent, basemodelPath, deploymodelPath, outputDir}
workerPara.volumeMapName = []string{"data", "base-model", "deploy-model", "output-dir"}
workerPara.env = map[string]string{
// see https://github.com/kubeedge/sedna/issues/35
"ORIGINAL_DATASET_URL": originalDatasetPathInContainer,
"TRAIN_DATASET_URL": trainDataURL,
"MODEL_URL": outputModelURL,
"BASE_MODEL_URL": preModelURL,
"NAMESPACE": job.Namespace,
"JOB_NAME": job.Name,
"WORKER_NAME": "train-worker-" + utilrand.String(5),
"LC_SERVER": jc.cfg.LC.Server,
}
} else {
workerName = "Eval"
workerSpec = evalworkerspec.WorkerSpec
podTemplate = &job.Spec.EvalSpec.Template
workerPara.workerType = "Eval"

// Env parameters for eval
evalDataURL := dataURLContain
modelForEval := inputmodelURLContain // can be single or multi models

// Configure container mounting and Env information for eval by initial ContainerPara
containerPara.volumeMountList = []string{codeConPath, dataConPath, basemodelConPath, deploymodelConPath, outputConPath}
containerPara.volumeList = []string{evalCodePath, datasetParent, basemodelPath, deploymodelPath, outputDir}
containerPara.volumeMapName = []string{"code", "data", "base-model", "deploy-model", "output-dir"}
containerPara.env = map[string]string{
"TEST_DATASET_URL": evalDataURL,
"MODEL_URLS": modelForEval,
"NAMESPACE": job.Namespace,
"JOB_NAME": job.Name,
"WORKER_NAME": "eval-worker-" + utilrand.String(5),
"PARAMETERS": evalParameterString,
"LC_SERVER": jc.cfg.LC.Server,
// Configure container mounting and Env information for eval by initial WorkerPara
workerPara.volumeMountList = []string{dataConPath, basemodelConPath, deploymodelConPath, outputConPath}
workerPara.volumeList = []string{datasetParent, basemodelPath, deploymodelPath, outputDir}
workerPara.volumeMapName = []string{"data", "base-model", "deploy-model", "output-dir"}
workerPara.env = map[string]string{
"ORIGINAL_DATASET_URL": originalDatasetPathInContainer,
"TEST_DATASET_URL": evalDataURL,
"MODEL_URLS": modelForEval,
"NAMESPACE": job.Namespace,
"JOB_NAME": job.Name,
"WORKER_NAME": "eval-worker-" + utilrand.String(5),
"LC_SERVER": jc.cfg.LC.Server,
}
}

// set the default policy instead of Always policy
workerPara.restartPolicy = v1.RestartPolicyOnFailure
workerPara.hostNetwork = true

// create pod based on podtype
err = jc.generatePod(job, workerSpec, workerName, containerPara)
_, err = createPodWithTemplate(jc.kubeClient, job, podTemplate, workerPara)
if err != nil {
return err
}
@@ -634,9 +626,8 @@ func (jc *IncrementalJobController) createPod(job *sednav1.IncrementalLearningJo
}

func (jc *IncrementalJobController) createInferPod(job *sednav1.IncrementalLearningJob) error {
ctx := context.Background()
infermodelName := job.Spec.DeploySpec.Model.Name
inferModel, err := jc.client.Models(job.Namespace).Get(ctx, infermodelName, metav1.GetOptions{})
inferModel, err := jc.client.Models(job.Namespace).Get(context.TODO(), infermodelName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get infer model %s: %w",
infermodelName, err)
@@ -646,92 +637,33 @@ func (jc *IncrementalJobController) createInferPod(job *sednav1.IncrementalLearn
// convert crd to JSON, and put them into env of container
inferModelParent := filepath.Dir(inferModelPath)

inferWorkerSpec := job.Spec.DeploySpec.WorkerSpec
inferCodePath := inferWorkerSpec.ScriptDir
inferParameterJSON, _ := json.Marshal(inferWorkerSpec.Parameters)
inferParameterString := string(inferParameterJSON)

// Container VolumeMounts parameters
inferCodeConPath := codePrefix
inferModelConPath := dataPrefix + inferModelParent

// Env parameters for edge
inferModelURL := dataPrefix + inferModelPath

// Configure container mounting and Env information by initial ContainerPara
var inferContainer *ContainerPara = new(ContainerPara)
inferContainer.volumeMountList = []string{inferCodeConPath, inferModelConPath}
inferContainer.volumeList = []string{inferCodePath, inferModelParent}
inferContainer.volumeMapName = []string{"code", "model"}
inferContainer.env = map[string]string{
// Configure container mounting and Env information by initial WorkerPara
var workerParam *WorkerPara = new(WorkerPara)
workerParam.volumeMountList = []string{inferModelConPath}
workerParam.volumeList = []string{inferModelParent}
workerParam.volumeMapName = []string{"model"}
workerParam.env = map[string]string{
"WORKER_NAME": "inferworker-" + utilrand.String(5),
"PARAMETERS": inferParameterString,
"MODEL_URL": inferModelURL,
"NAMESPACE": job.Namespace,
"HARD_SAMPLE_ALGORITHM": job.Spec.DeploySpec.HardExampleMining.Name,
"LC_SERVER": jc.cfg.LC.Server,
}

workerParam.workerType = "inference"
workerParam.hostNetwork = true

// create edge pod
err = jc.generatePod(job, inferWorkerSpec, "inference", inferContainer)
_, err = createPodWithTemplate(jc.kubeClient, job, &job.Spec.DeploySpec.Template, workerParam)
return err
}

// generatePod forms and creates a worker pod (train, eval or inference)
// for an incremental learning job. The container image is resolved from
// the configured image hub using the framework name/version declared in
// the job CRD; volumes and environment come from containerPara.
// Returns an error if image matching or pod creation fails.
func (jc *IncrementalJobController) generatePod(job *sednav1.IncrementalLearningJob, workerSpec sednav1.CommonWorkerSpec, workerType string, containerPara *ContainerPara) error {
	// The inference worker is pinned to the deploy node; train/eval
	// workers run on the job-level node.
	var nodeName string
	if workerType == "inference" {
		nodeName = job.Spec.DeploySpec.NodeName
	} else {
		nodeName = job.Spec.NodeName
	}

	ctx := context.Background()

	// Get baseImgURL from imageHub based on user's configuration in the job CRD.
	frameName := workerSpec.FrameworkType
	frameVersion := workerSpec.FrameworkVersion
	baseImgURL, err := MatchContainerBaseImage(jc.cfg.ImageHub, frameName, frameVersion)
	// TODO: if matched image is empty, the pod creation process will not proceed, return error directly.
	if err != nil {
		klog.Warningf("incrementallearning job %v/%v %v worker matching container base image occurs error:%v", job.Namespace, job.Name, workerType, err)
		return err
	}

	volumeMounts, volumes := CreateVolumeMap(containerPara)
	envs := CreateEnvVars(containerPara.env)
	podSpec := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: job.Namespace,
			Name:      jc.generatePodName(job.Name, workerType),
			// Owner reference so the pod is garbage-collected with the job.
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(job, sednav1.SchemeGroupVersion.WithKind("IncrementalLearningJob")),
			},
			Labels: GenerateLabels(job),
		},
		Spec: v1.PodSpec{
			RestartPolicy: v1.RestartPolicyNever,
			HostNetwork:   true,
			NodeName:      nodeName,
			Containers: []v1.Container{
				{
					Name:         "container-" + job.Name + "-" + strings.ToLower(workerType) + "-" + utilrand.String(5),
					Image:        baseImgURL,
					Args:         []string{workerSpec.ScriptBootFile},
					Env:          envs,
					VolumeMounts: volumeMounts,
				},
			},
			Volumes: volumes,
		},
	}
	pod, err := jc.kubeClient.CoreV1().Pods(job.Namespace).Create(ctx, podSpec, metav1.CreateOptions{})
	if err != nil {
		// BUGFIX: do not dereference the returned pod on failure — it may be
		// nil. Log the name we attempted to create (podSpec.Name) instead.
		klog.Warningf("failed to create %s pod %s for incrementallearning job %v/%v, err:%s", workerType, podSpec.Name, job.Namespace, job.Name, err)
		return err
	}
	klog.V(2).Infof("%s pod %s is created successfully for incrementallearning job %v/%v", workerType, pod.Name, job.Namespace, job.Name)
	return nil
}

// GetName returns the name of the incrementallearning job controller
func (jc *IncrementalJobController) GetName() string {
return "IncrementalLearningJobController"


+ 29
- 88
pkg/globalmanager/jointinferenceservice.go View File

@@ -22,7 +22,6 @@ import (
"fmt"
"path/filepath"
"strconv"
"strings"
"time"

v1 "k8s.io/api/core/v1"
@@ -52,11 +51,9 @@ import (
"github.com/kubeedge/sedna/pkg/globalmanager/utils"
)

type jointInferenceType string

const (
jointInferenceForEdge jointInferenceType = "Edge"
jointInferenceForCloud jointInferenceType = "Cloud"
jointInferenceForEdge = "Edge"
jointInferenceForCloud = "Cloud"
)

// jointServiceControllerKind contains the schema.GroupVersionKind for this controller type.
@@ -409,7 +406,9 @@ func (jc *JointInferenceServiceController) createPod(service *sednav1.JointInfer

// create kubernetesService for cloudPod, and get bigServicePort for edgePod
var bigServicePort int32
bigModelIP, err := GetNodeIPByName(jc.kubeClient, service.Spec.CloudWorker.NodeName)
// FIXME(llhuii): only the case where Spec.NodeName is specified is supported;
// Spec.NodeSelector will be supported later.
bigModelIP, err := GetNodeIPByName(jc.kubeClient, service.Spec.CloudWorker.Template.Spec.NodeName)
bigServicePort, err = CreateKubernetesService(jc.kubeClient, service, bigModelPort, bigModelIP)
if err != nil {
return active, err
@@ -443,35 +442,33 @@ func (jc *JointInferenceServiceController) createCloudPod(service *sednav1.Joint
cloudModelString := string(cloudModelJSON)
cloudModelParent := filepath.Dir(cloudModelPath)

cloudWorker := service.Spec.CloudWorker
cloudCodePath := cloudWorker.WorkerSpec.ScriptDir
cloudParameterJSON, _ := json.Marshal(cloudWorker.WorkerSpec.Parameters)
cloudParameterString := string(cloudParameterJSON)

// Container VolumeMounts parameters
cloudCodeConPath := codePrefix
cloudModelConPath := dataPrefix + cloudModelParent

// Env parameters for cloud
cloudModelURL := dataPrefix + cloudModelPath

// Configure container mounting and Env information by initial ContainerPara
var cloudContainer *ContainerPara = new(ContainerPara)
cloudContainer.volumeMountList = []string{cloudCodeConPath, cloudModelConPath}
cloudContainer.volumeList = []string{cloudCodePath, cloudModelParent}
// Configure container mounting and Env information by initial WorkerPara
var cloudContainer *WorkerPara = new(WorkerPara)
cloudContainer.volumeMountList = []string{cloudModelConPath}
cloudContainer.volumeList = []string{cloudModelParent}
cloudContainer.volumeMapName = []string{"code", "model"}
cloudContainer.env = map[string]string{
"MODEL": cloudModelString,
"WORKER_NAME": "cloudworker-" + utilrand.String(5),
"SERVICE_NAME": service.Name,
"PARAMETERS": cloudParameterString,
"MODEL_URL": cloudModelURL,
"NAMESPACE": service.Namespace,
"BIG_MODEL_BIND_PORT": strconv.Itoa(int(bigModelPort)),
}

cloudContainer.workerType = jointInferenceForCloud

// create cloud pod
err = jc.generatedPod(service, jointInferenceForCloud, cloudContainer, false)
_, err = createPodWithTemplate(jc.kubeClient,
service,
&service.Spec.CloudWorker.Template,
cloudContainer)
if err != nil {
return err
}
@@ -489,8 +486,10 @@ func (jc *JointInferenceServiceController) createEdgePod(service *sednav1.JointI
}
edgeModelPath := edgeModel.Spec.URL

// FIXME(llhuii): only the case where Spec.NodeName is specified is supported;
// Spec.NodeSelector will be supported later.
// get bigModelIP from nodeName in cloudWorker
bigModelIP, err := GetNodeIPByName(jc.kubeClient, service.Spec.CloudWorker.NodeName)
bigModelIP, err := GetNodeIPByName(jc.kubeClient, service.Spec.CloudWorker.Template.Spec.NodeName)
if err != nil {
return fmt.Errorf("failed to get node ip: %w", err)
}
@@ -501,23 +500,19 @@ func (jc *JointInferenceServiceController) createEdgePod(service *sednav1.JointI
edgeModelParent := filepath.Dir(edgeModelPath)

edgeWorker := service.Spec.EdgeWorker
edgeCodePath := edgeWorker.WorkerSpec.ScriptDir
edgeParameterJSON, _ := json.Marshal(edgeWorker.WorkerSpec.Parameters)
edgeParameterString := string(edgeParameterJSON)
HEMParameterJSON, _ := json.Marshal(edgeWorker.HardExampleMining.Parameters)
HEMParameterString := string(HEMParameterJSON)

// Container VolumeMounts parameters
edgeCodeConPath := codePrefix
edgeModelConPath := dataPrefix + edgeModelParent

// Env parameters for edge
edgeModelURL := dataPrefix + edgeModelPath

// Configure container mounting and Env information by initial ContainerPara
var edgeContainer *ContainerPara = new(ContainerPara)
edgeContainer.volumeMountList = []string{edgeCodeConPath, edgeModelConPath}
edgeContainer.volumeList = []string{edgeCodePath, edgeModelParent}
// Configure container mounting and Env information by initial WorkerPara
var edgeContainer *WorkerPara = new(WorkerPara)
edgeContainer.volumeMountList = []string{edgeModelConPath}
edgeContainer.volumeList = []string{edgeModelParent}
edgeContainer.volumeMapName = []string{"code", "model"}
edgeContainer.env = map[string]string{
"MODEL": edgeModelString,
@@ -525,7 +520,6 @@ func (jc *JointInferenceServiceController) createEdgePod(service *sednav1.JointI
"SERVICE_NAME": service.Name,
"BIG_MODEL_IP": bigModelIP,
"BIG_MODEL_PORT": strconv.Itoa(int(bigServicePort)),
"PARAMETERS": edgeParameterString,
"HEM_PARAMETERS": HEMParameterString,
"MODEL_URL": edgeModelURL,
"NAMESPACE": service.Namespace,
@@ -533,70 +527,17 @@ func (jc *JointInferenceServiceController) createEdgePod(service *sednav1.JointI
"LC_SERVER": jc.cfg.LC.Server,
}

// create edge pod
err = jc.generatedPod(service, jointInferenceForEdge, edgeContainer, true)
if err != nil {
return err
}
return nil
}
edgeContainer.workerType = jointInferenceForEdge
edgeContainer.hostNetwork = true

func (jc *JointInferenceServiceController) generatedPod(service *sednav1.JointInferenceService, podtype jointInferenceType,
containerPara *ContainerPara, hostNetwork bool) error {
var workerSpec sednav1.CommonWorkerSpec
var volumeMounts []v1.VolumeMount
var volumes []v1.Volume
var envs []v1.EnvVar
var nodeName string
ctx := context.Background()
if podtype == jointInferenceForEdge {
workerSpec = service.Spec.EdgeWorker.WorkerSpec
nodeName = service.Spec.EdgeWorker.NodeName
} else {
workerSpec = service.Spec.CloudWorker.WorkerSpec
nodeName = service.Spec.CloudWorker.NodeName
}
// get baseImgURL from imageHub based on user's configuration in job CRD
frameName := workerSpec.FrameworkType
frameVersion := workerSpec.FrameworkVersion
baseImgURL, err := MatchContainerBaseImage(jc.cfg.ImageHub, frameName, frameVersion)
// TODO: if matched image is empty, the pod creation process will not proceed, return error directly.
if err != nil {
klog.Warningf("jointinference service %v/%v %v worker matching container base image occurs error:%v", service.Namespace, service.Name, podtype, err)
return fmt.Errorf("%s pod occurs error: %w",
podtype, err)
}
volumeMounts, volumes = CreateVolumeMap(containerPara)
envs = CreateEnvVars(containerPara.env)
podSpec := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: service.Namespace,
GenerateName: service.Name + "-" + strings.ToLower(string(podtype)) + "-",
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(service, jointServiceControllerKind),
},
Labels: GenerateLabels(service),
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
NodeName: nodeName,
Containers: []v1.Container{
{Name: "container-" + service.Name + "-" + strings.ToLower(string(podtype)) + "-" + utilrand.String(5),
Image: baseImgURL,
Args: []string{workerSpec.ScriptBootFile},
Env: envs,
VolumeMounts: volumeMounts,
}},
Volumes: volumes,
HostNetwork: hostNetwork,
},
}
pod, err := jc.kubeClient.CoreV1().Pods(service.Namespace).Create(ctx, podSpec, metav1.CreateOptions{})
// create edge pod
_, err = createPodWithTemplate(jc.kubeClient,
service,
&service.Spec.EdgeWorker.Template,
edgeContainer)
if err != nil {
klog.Warningf("failed to create %s pod %s for jointinference service %v/%v, err:%s", string(podtype), pod.Name, service.Namespace, service.Name, err)
return err
}
klog.V(2).Infof("%s pod %s is created successfully for jointinference service %v/%v", string(podtype), pod.Name, service.Namespace, service.Name)
return nil
}



+ 10
- 6
pkg/globalmanager/types.go View File

@@ -19,26 +19,30 @@ package globalmanager
import (
"encoding/json"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// ContainerPara describes initial values need by creating a pod
type ContainerPara struct {
// WorkerPara describes the system-defined parameters of worker
type WorkerPara struct {
	// container paths where volumes are mounted (pairs index-wise with volumeList)
	volumeMountList []string
	// host/source paths backing each mount, same order as volumeMountList
	volumeList []string
	// volume names, one per entry in volumeList
	volumeMapName []string
	// environment variables injected into the worker container
	env map[string]string
	// framework name/version — presumably used to match a base image; TODO confirm
	frameName      string
	frameVersion   string
	// entry script passed to the worker container — TODO confirm
	scriptBootFile string
	// node the worker pod is scheduled to — TODO confirm
	nodeName       string
	// worker role, e.g. "inference", "Edge", "Cloud"
	workerType string
	// if true, force to use hostNetwork
	hostNetwork bool

	// restart policy for the created worker pod
	restartPolicy v1.RestartPolicy
}

// CommonInterface describes the common interface of CRs: any Sedna custom
// resource handled by the global manager must expose standard object
// metadata, type (kind) information, and runtime.Object deep-copy support.
type CommonInterface interface {
	metav1.Object
	schema.ObjectKind
	runtime.Object
}

// FeatureControllerI defines the interface of an AI Feature controller


+ 1
- 2
pkg/localcontroller/gmclient/websocket.go View File

@@ -143,11 +143,10 @@ func (c *wsClient) sendMessage(stop chan struct{}) {
stop <- struct{}{}
}()

messageChannel := c.SendMessageChannel
ws := c.WSConnection.WSConn

for {
message, ok := <-messageChannel
message, ok := <-c.SendMessageChannel
if !ok {
return
}


+ 19
- 6
pkg/localcontroller/manager/incrementallearningjob.go View File

@@ -150,6 +150,8 @@ func (im *IncrementalJobManager) trainTask(job *IncrementalLearningJob) error {

err = im.Client.WriteMessage(payload, job.getHeader())
if err != nil {
klog.Errorf("job(name=%s) failed to write message: %v",
jobConfig.UniqueIdentifier, err)
return err
}

@@ -281,6 +283,7 @@ func (im *IncrementalJobManager) startJob(name string) {
defer klog.Infof("incremental learning job(name=%s) is stopped", name)
go im.handleData(job)

tick := time.NewTicker(JobIterationIntervalSeconds * time.Second)
for {
select {
case <-job.Done:
@@ -295,7 +298,15 @@ func (im *IncrementalJobManager) startJob(name string) {
klog.Warningf("job(name=%s) failed to load models, and waiting it: %v",
jobConfig.UniqueIdentifier,
err)
<-time.After(100 * time.Millisecond)
<-tick.C
continue
}

if job.Dataset == nil {
klog.V(3).Infof("job(name=%s) dataset not ready",
jobConfig.UniqueIdentifier)

<-tick.C
continue
}

@@ -316,7 +327,7 @@ func (im *IncrementalJobManager) startJob(name string) {
jobConfig.UniqueIdentifier, jobConfig.Phase, err)
}

<-time.After(JobIterationIntervalSeconds * time.Second)
<-tick.C
}
}

@@ -673,6 +684,7 @@ func (im *IncrementalJobManager) handleData(job *IncrementalLearningJob) {
tick := time.NewTicker(DatasetHandlerIntervalSeconds * time.Second)

jobConfig := job.JobConfig
iterCount := 0
for {
select {
case <-job.Done:
@@ -683,11 +695,15 @@ func (im *IncrementalJobManager) handleData(job *IncrementalLearningJob) {
// in case dataset is not synced to LC before job synced to LC
// here call loadDataset in each period
err := im.loadDataset(job)
if iterCount%100 == 0 {
klog.Infof("job(name=%s) handling dataset", jobConfig.UniqueIdentifier)
}
iterCount++
if err != nil {
klog.Warningf("job(name=%s) failed to load dataset, and waiting it: %v",
jobConfig.UniqueIdentifier,
err)
<-time.After(100 * time.Millisecond)
<-tick.C
continue
}

@@ -714,9 +730,6 @@ func (im *IncrementalJobManager) handleData(job *IncrementalLearningJob) {
jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.EvalSamples))

jobConfig.DataSamples.Numbers = len(samples)
} else {
klog.Warningf("job(name=%s) didn't get new data from dataset(name=%s)",
jobConfig.UniqueIdentifier, job.Spec.Dataset.Name)
}
<-tick.C
}


Loading…
Cancel
Save