
Fix lifelong learning issues

- Reduce the parameters required for initialization
- Show all lifelong learning interfaces in the examples (see the sketch below)
- Fix bugs in the deep learning framework backends
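
As a rough illustration of the reworked interface (a sketch distilled from the
example changes below, not verbatim example code), every algorithm module is now
selected with a dict carrying "method" and "param" instead of separate *_param
keyword arguments:

    import json

    from sedna.core.lifelong_learning import LifelongLearning
    from interface import DATACONF, Estimator  # helpers from the atcii example

    attribute = json.dumps({"attribute": DATACONF["ATTRIBUTES"]})

    task_definition = {
        "method": "TaskDefinitionByDataAttr",
        "param": attribute
    }
    unseen_task_detect = {
        "method": "TaskAttrFilter"
    }

    ll_job = LifelongLearning(
        estimator=Estimator,
        task_definition=task_definition,
        task_relationship_discovery=None,  # falls back to DefaultTaskRelationDiscover
        task_mining=None,                  # paired with task_definition at inference
        task_remodeling=None,              # falls back to DefaultTaskRemodeling
        inference_integrate=None,          # falls back to DefaultInferenceIntegrate
        unseen_task_detect=unseen_task_detect
    )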

Signed-off-by: JoeyHwong <joeyhwong@gknow.cn>
tags/v0.3.1
JoeyHwong committed 4 years ago
commit becfe485df
13 changed files with 343 additions and 264 deletions
  1. examples/lifelong_learning/atcii/eval.py (+13, -6)
  2. examples/lifelong_learning/atcii/inference.py (+23, -9)
  3. examples/lifelong_learning/atcii/train.py (+14, -4)
  4. lib/sedna/algorithms/multi_task_learning/multi_task_learning.py (+86, -73)
  5. lib/sedna/algorithms/multi_task_learning/task_jobs/artifact.py (+1, -0)
  6. lib/sedna/algorithms/multi_task_learning/task_jobs/task_remodeling.py (+11, -3)
  7. lib/sedna/algorithms/unseen_task_detect/__init__.py (+1, -57)
  8. lib/sedna/algorithms/unseen_task_detect/unseen_task_detect.py (+71, -0)
  9. lib/sedna/backend/base.py (+5, -3)
  10. lib/sedna/backend/tensorflow/__init__.py (+21, -14)
  11. lib/sedna/common/constant.py (+0, -15)
  12. lib/sedna/core/base.py (+2, -31)
  13. lib/sedna/core/lifelong_learning/lifelong_learning.py (+95, -49)

examples/lifelong_learning/atcii/eval.py (+13, -6)

@@ -15,7 +15,7 @@
import json

from sedna.datasources import CSVDataParse
from sedna.common.config import Context, BaseConfig
from sedna.common.config import BaseConfig
from sedna.core.lifelong_learning import LifelongLearning

from interface import DATACONF, Estimator, feature_process
@@ -26,17 +26,24 @@ def main():
valid_data = CSVDataParse(data_type="valid", func=feature_process)
valid_data.parse(test_dataset_url, label=DATACONF["LABEL"])
attribute = json.dumps({"attribute": DATACONF["ATTRIBUTES"]})
model_threshold = float(Context.get_parameters('model_threshold', 0))

task_definition = {
"method": "TaskDefinitionByDataAttr",
"param": attribute
}

ll_job = LifelongLearning(
estimator=Estimator,
task_definition="TaskDefinitionByDataAttr",
task_definition_param=attribute
task_definition=task_definition,
task_relationship_discovery=None,
task_mining=None,
task_remodeling=None,
inference_integrate=None,
unseen_task_detect=None
)
eval_experiment = ll_job.evaluate(
data=valid_data, metrics="precision_score",
metrics_param={"average": "micro"},
model_threshold=model_threshold
metrics_param={"average": "micro"}
)
return eval_experiment



examples/lifelong_learning/atcii/inference.py (+23, -9)

@@ -26,17 +26,29 @@ from interface import DATACONF, Estimator, feature_process

def main():

utd = Context.get_parameters("UTD_NAME", "TaskAttr")
utd = Context.get_parameters("UTD_NAME", "TaskAttrFilter")
attribute = json.dumps({"attribute": DATACONF["ATTRIBUTES"]})
utd_parameters = Context.get_parameters("UTD_PARAMETERS", {})
ut_saved_url = Context.get_parameters("UTD_SAVED_URL", "/tmp")

ll_job = LifelongLearning(
task_mining = {
"method": "TaskMiningByDataAttr",
"param": attribute
}

unseen_task_detect = {
"method": utd,
"param": utd_parameters
}

ll_service = LifelongLearning(
estimator=Estimator,
task_mining="TaskMiningByDataAttr",
task_mining_param=attribute,
unseen_task_detect=utd,
unseen_task_detect_param=utd_parameters)
task_mining=task_mining,
task_definition=None,
task_relationship_discovery=None,
task_remodeling=None,
inference_integrate=None,
unseen_task_detect=unseen_task_detect)

infer_dataset_url = Context.get_parameters('infer_dataset_url')
file_handle = open(infer_dataset_url, "r", encoding="utf-8")
@@ -60,12 +72,14 @@ def main():
rows = reader[0]
data = dict(zip(header, rows))
infer_data.parse(data, label=DATACONF["LABEL"])
rsl, is_unseen, target_task = ll_job.inference(infer_data)
rsl, is_unseen, target_task = ll_service.inference(infer_data)

rows.append(list(rsl)[0])

output = "\t".join(map(str, rows)) + "\n"
if is_unseen:
unseen_sample.write("\t".join(map(str, rows)) + "\n")
output_sample.write("\t".join(map(str, rows)) + "\n")
unseen_sample.write(output)
output_sample.write(output)
unseen_sample.close()
output_sample.close()



examples/lifelong_learning/atcii/train.py (+14, -4)

@@ -28,13 +28,23 @@ def main():
train_data.parse(train_dataset_url, label=DATACONF["LABEL"])
attribute = json.dumps({"attribute": DATACONF["ATTRIBUTES"]})
early_stopping_rounds = int(
Context.get_parameters(
"early_stopping_rounds", 100))
Context.get_parameters("early_stopping_rounds", 100)
)
metric_name = Context.get_parameters("metric_name", "mlogloss")

task_definition = {
"method": "TaskDefinitionByDataAttr",
"param": attribute
}

ll_job = LifelongLearning(
estimator=Estimator,
task_definition="TaskDefinitionByDataAttr",
task_definition_param=attribute
task_definition=task_definition,
task_relationship_discovery=None,
task_mining=None,
task_remodeling=None,
inference_integrate=None,
unseen_task_detect=None
)
train_experiment = ll_job.train(
train_data=train_data,


lib/sedna/algorithms/multi_task_learning/multi_task_learning.py (+86, -73)

@@ -12,15 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import json
import pandas as pd

from sedna.datasources import BaseDataSource
from sedna.backend import set_backend
from sedna.common.log import LOGGER
from sedna.common.file_ops import FileOps
from sedna.common.config import Context
from sedna.common.constant import KBResourceConstant
from sedna.common.file_ops import FileOps
from sedna.common.class_factory import ClassFactory, ClassType

from .task_jobs.artifact import Model, Task, TaskGroup
@@ -36,118 +36,113 @@ class MulTaskLearning:

def __init__(self,
estimator=None,
task_definition="TaskDefinitionByDataAttr",
task_definition=None,
task_relationship_discovery=None,
task_mining=None,
task_remodeling=None,
inference_integrate=None,
task_definition_param=None,
relationship_discovery_param=None,
task_mining_param=None,
task_remodeling_param=None,
inference_integrate_param=None
inference_integrate=None
):
if not task_relationship_discovery:
task_relationship_discovery = "DefaultTaskRelationDiscover"
if not task_remodeling:
task_remodeling = "DefaultTaskRemodeling"
if not inference_integrate:
inference_integrate = "DefaultInferenceIntegrate"
self.method_selection = dict(
task_definition=task_definition,
task_relationship_discovery=task_relationship_discovery,
task_mining=task_mining,
task_remodeling=task_remodeling,
inference_integrate=inference_integrate,
task_definition_param=task_definition_param,
task_relationship_discovery_param=relationship_discovery_param,
task_mining_param=task_mining_param,
task_remodeling_param=task_remodeling_param,
inference_integrate_param=inference_integrate_param)

self.task_definition = task_definition or {
"method": "TaskDefinitionByDataAttr"
}
self.task_relationship_discovery = task_relationship_discovery or {
"method": "DefaultTaskRelationDiscover"
}
self.task_mining = task_mining or {}
self.task_remodeling = task_remodeling or {
"method": "DefaultTaskRemodeling"
}
self.inference_integrate = inference_integrate or {
"method": "DefaultInferenceIntegrate"
}
self.models = None
self.extractor = None
self.base_model = estimator
self.task_groups = None
self.task_index_url = KBResourceConstant.KB_INDEX_NAME.value
self.min_train_sample = int(
Context.get_parameters(
"MIN_TRAIN_SAMPLE",
KBResourceConstant.MIN_TRAIN_SAMPLE.value
)
)
self.min_train_sample = int(Context.get_parameters(
"MIN_TRAIN_SAMPLE", KBResourceConstant.MIN_TRAIN_SAMPLE.value
))

@staticmethod
def parse_param(param_str):
if not param_str:
return {}
if isinstance(param_str, dict):
return param_str
try:
raw_dict = json.loads(param_str, encoding="utf-8")
except json.JSONDecodeError:
raw_dict = {}
return raw_dict

def task_definition(self, samples):
def _task_definition(self, samples):
"""
Task attribute extractor and multi-task definition
"""
method_name = self.method_selection.get(
"task_definition", "TaskDefinitionByDataAttr")
method_name = self.task_definition.get(
"method", "TaskDefinitionByDataAttr"
)
extend_param = self.parse_param(
self.method_selection.get("task_definition_param"))
self.task_definition.get("param")
)
method_cls = ClassFactory.get_cls(
ClassType.MTL, method_name)(**extend_param)
return method_cls(samples)

def task_relationship_discovery(self, tasks):
def _task_relationship_discovery(self, tasks):
"""
Merge tasks from task_definition
"""
method_name = self.method_selection.get("task_relationship_discovery")
method_name = self.task_relationship_discovery.get("method")
extend_param = self.parse_param(
self.method_selection.get("task_relationship_discovery_param")
self.task_relationship_discovery.get("param")
)
method_cls = ClassFactory.get_cls(
ClassType.MTL, method_name)(**extend_param)
return method_cls(tasks)

def task_mining(self, samples):
def _task_mining(self, samples):
"""
Mining tasks of inference sample base on task attribute extractor
"""
method_name = self.method_selection.get("task_mining")
method_name = self.task_mining.get("method")
extend_param = self.parse_param(
self.method_selection.get("task_mining_param"))
self.task_mining.get("param")
)

if not method_name:
task_definition = self.method_selection.get(
"task_definition", "TaskDefinitionByDataAttr")
task_definition = self.task_definition.get(
"method", "TaskDefinitionByDataAttr"
)
method_name = self._method_pair.get(task_definition,
'TaskMiningByDataAttr')
extend_param = self.parse_param(
self.method_selection.get("task_definition_param"))
self.task_definition.get("param"))
method_cls = ClassFactory.get_cls(ClassType.MTL, method_name)(
task_extractor=self.extractor, **extend_param
)
return method_cls(samples=samples)

def task_remodeling(self, samples, mappings):
def _task_remodeling(self, samples, mappings):
"""
Remodeling tasks from task mining
"""
method_name = self.method_selection.get("task_remodeling")
method_name = self.task_remodeling.get("method")
extend_param = self.parse_param(
self.method_selection.get("task_remodeling_param"))
self.task_remodeling.get("param"))
method_cls = ClassFactory.get_cls(ClassType.MTL, method_name)(
models=self.models, **extend_param)
return method_cls(samples=samples, mappings=mappings)

def inference_integrate(self, tasks):
def _inference_integrate(self, tasks):
"""
Aggregate inference results from target models
"""
method_name = self.method_selection.get("inference_integrate")
method_name = self.inference_integrate.get("method")
extend_param = self.parse_param(
self.method_selection.get("inference_integrate_param"))
self.inference_integrate.get("param"))
method_cls = ClassFactory.get_cls(ClassType.MTL, method_name)(
models=self.models, **extend_param)
return method_cls(tasks=tasks) if method_cls else tasks
@@ -155,12 +150,12 @@ class MulTaskLearning:
def train(self, train_data: BaseDataSource,
valid_data: BaseDataSource = None,
post_process=None, **kwargs):
tasks, task_extractor, train_data = self.task_definition(train_data)
tasks, task_extractor, train_data = self._task_definition(train_data)
self.extractor = task_extractor
task_groups = self.task_relationship_discovery(tasks)
task_groups = self._task_relationship_discovery(tasks)
self.models = []
callback = None
if post_process:
if isinstance(post_process, str):
callback = ClassFactory.get_cls(ClassType.CALLBACK, post_process)()
self.task_groups = []
feedback = {}
@@ -181,18 +176,31 @@ class MulTaskLearning:
continue
LOGGER.info(f"MTL Train start {i} : {task.entry}")

model_obj = set_backend(estimator=self.base_model)
res = model_obj.train(train_data=task.samples, **kwargs)
if callback:
res = callback(model_obj, res)
model_path = model_obj.save(model_name=f"{task.entry}.model")
model = Model(index=i, entry=task.entry,
model=model_path, result=res)
model.meta_attr = [t.meta_attr for t in task.tasks]
model = None
for t in task.tasks: # if model has train in tasks
if not (t.model and t.result):
continue
model_path = t.model.save(model_name=f"{task.entry}.model")
t.model = model_path
model = Model(index=i, entry=t.entry,
model=model_path, result=t.result)
model.meta_attr = t.meta_attr
break
if not model:
model_obj = set_backend(estimator=self.base_model)
res = model_obj.train(train_data=task.samples, **kwargs)
if callback:
res = callback(model_obj, res)
model_path = model_obj.save(model_name=f"{task.entry}.model")
model = Model(index=i, entry=task.entry,
model=model_path, result=res)

model.meta_attr = [t.meta_attr for t in task.tasks]
task.model = model
self.models.append(model)
feedback[task.entry] = res
feedback[task.entry] = model.result
self.task_groups.append(task)

if len(rare_task):
model_obj = set_backend(estimator=self.base_model)
res = model_obj.train(train_data=train_data, **kwargs)
@@ -240,11 +248,13 @@ class MulTaskLearning:

def predict(self, data: BaseDataSource,
post_process=None, **kwargs):

if not (self.models and self.extractor):
self.load()

data, mappings = self.task_mining(samples=data)
samples, models = self.task_remodeling(samples=data, mappings=mappings)
data, mappings = self._task_mining(samples=data)
samples, models = self._task_remodeling(samples=data,
mappings=mappings)

callback = None
if post_process:
@@ -255,17 +265,19 @@ class MulTaskLearning:
m = models[inx]
if not isinstance(m, Model):
continue
model_obj = set_backend(estimator=self.base_model)
evaluator = model_obj.load(m.model) if isinstance(
m.model, str) else m.model
pred = evaluator.predict(df.x, kwargs=kwargs)
if isinstance(m.model, str):
evaluator = set_backend(estimator=self.base_model)
evaluator.load(m.model)
else:
evaluator = m.model
pred = evaluator.predict(df.x, **kwargs)
if callable(callback):
pred = callback(pred, df)
task = Task(entry=m.entry, samples=df)
task.result = pred
task.model = m
tasks.append(task)
res = self.inference_integrate(tasks)
res = self._inference_integrate(tasks)
return res, tasks

def evaluate(self, data: BaseDataSource,
@@ -294,7 +306,7 @@ class MulTaskLearning:
m_dict = {
metrics: getattr(sk_metrics, metrics, sk_metrics.log_loss)
}
elif isinstance(metrics, dict): # if metrics with name
elif isinstance(metrics, dict): # if metrics with name
for k, v in metrics.items():
if isinstance(v, str):
v = getattr(sk_metrics, v)
@@ -307,8 +319,9 @@ class MulTaskLearning:
}
metrics_param = {"average": "micro"}

data.x['pred_y'] = result
data.x['real_y'] = data.y
if isinstance(data.x, pd.DataFrame):
data.x['pred_y'] = result
data.x['real_y'] = data.y
if not metrics_param:
metrics_param = {}
elif isinstance(metrics_param, str):
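
Since every module is now described by a {"method": ..., "param": ...} dict,
parse_param accepts either an already-parsed dict or a JSON string and falls
back to an empty dict. A hedged illustration (the "Season" attribute value is
a placeholder, not taken from this commit):

    from sedna.algorithms.multi_task_learning.multi_task_learning import MulTaskLearning

    MulTaskLearning.parse_param(None)                         # -> {}
    MulTaskLearning.parse_param({"attribute": ["Season"]})    # -> dict returned unchanged
    MulTaskLearning.parse_param('{"attribute": ["Season"]}')  # -> {'attribute': ['Season']}
    MulTaskLearning.parse_param("not valid json")             # -> {} (decode error swallowed)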


lib/sedna/algorithms/multi_task_learning/task_jobs/artifact.py (+1, -0)

@@ -22,6 +22,7 @@ class Task:
self.entry = entry
self.samples = samples
self.meta_attr = meta_attr
self.test_samples = None # assign on task definition and use in TRD
self.model = None # assign on running
self.result = None # assign on running



lib/sedna/algorithms/multi_task_learning/task_jobs/task_remodeling.py (+11, -3)

@@ -15,6 +15,7 @@
from typing import List

import numpy as np
import pandas as pd

from sedna.datasources import BaseDataSource
from sedna.common.class_factory import ClassFactory, ClassType
@@ -34,11 +35,18 @@ class DefaultTaskRemodeling:
for m in np.unique(mappings):
task_df = BaseDataSource(data_type=d_type)
_inx = np.where(mappings == m)
task_df.x = samples.x.iloc[_inx]
if isinstance(samples.x, pd.DataFrame):
task_df.x = samples.x.iloc[_inx]
else:
task_df.x = np.array(samples.x)[_inx]
if d_type != "test":
task_df.y = samples.y.iloc[_inx]
if isinstance(samples.x, pd.DataFrame):
task_df.y = samples.y.iloc[_inx]
else:
task_df.y = np.array(samples.y)[_inx]
task_df.inx = _inx[0].tolist()
task_df.meta_attr = samples.meta_attr.iloc[_inx].values
if samples.meta_attr is not None:
task_df.meta_attr = np.array(samples.meta_attr)[_inx]
data.append(task_df)
model = self.models[m] or self.models[0]
models.append(model)


lib/sedna/algorithms/unseen_task_detect/__init__.py (+1, -57)

@@ -12,60 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unseen Task detect Algorithms for Lifelong Learning"""

import abc
from typing import List

import numpy as np

from sedna.algorithms.multi_task_learning.task_jobs.artifact import Task
from sedna.common.class_factory import ClassFactory, ClassType

__all__ = ('ModelProbeFilter', 'TaskAttrFilter')


class BaseFilter(metaclass=abc.ABCMeta):
"""The base class to define unified interface."""

def __call__(self, task: Task = None):
"""predict function, and it must be implemented by
different methods class.

:param task: inference task
:return: `True` means unseen task, `False` means not an unseen task.
"""
raise NotImplementedError


@ClassFactory.register(ClassType.UTD, alias="ModelProbe")
class ModelProbeFilter(BaseFilter, abc.ABC):
def __init__(self):
pass

def __call__(self, tasks: List[Task] = None, threshold=0.5, **kwargs):
all_proba = []
for task in tasks:
sample = task.samples
model = task.model
if hasattr(model, "predict_proba"):
proba = model.predict_proba(sample)
all_proba.append(np.max(proba))
return np.mean(all_proba) > threshold if all_proba else True


@ClassFactory.register(ClassType.UTD, alias="TaskAttr")
class TaskAttrFilter(BaseFilter, abc.ABC):
def __init__(self):
pass

def __call__(self, tasks: List[Task] = None, **kwargs):
for task in tasks:
model_attr = list(map(list, task.model.meta_attr))
sample_attr = list(map(list, task.samples.meta_attr))

if not (model_attr and sample_attr):
continue
if list(model_attr) == list(sample_attr):
return False
return True
from .unseen_task_detect import ModelProbeFilter, TaskAttrFilter

lib/sedna/algorithms/unseen_task_detect/unseen_task_detect.py (+71, -0)

@@ -0,0 +1,71 @@
# Copyright 2021 The KubeEdge Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unseen task detection algorithms for Lifelong Learning"""

import abc
from typing import List

import numpy as np

from sedna.algorithms.multi_task_learning.task_jobs.artifact import Task
from sedna.common.class_factory import ClassFactory, ClassType

__all__ = ('ModelProbeFilter', 'TaskAttrFilter')


class BaseFilter(metaclass=abc.ABCMeta):
"""The base class to define unified interface."""

def __call__(self, task: Task = None):
"""predict function, and it must be implemented by
different methods class.

:param task: inference task
:return: `True` means unseen task, `False` means not an unseen task.
"""
raise NotImplementedError


@ClassFactory.register(ClassType.UTD)
class ModelProbeFilter(BaseFilter, abc.ABC):
def __init__(self):
pass

def __call__(self, tasks: List[Task] = None, threshold=0.5, **kwargs):
all_proba = []
for task in tasks:
sample = task.samples
model = task.model
if hasattr(model, "predict_proba"):
proba = model.predict_proba(sample)
all_proba.append(np.max(proba))
return np.mean(all_proba) > threshold if all_proba else True


@ClassFactory.register(ClassType.UTD)
class TaskAttrFilter(BaseFilter, abc.ABC):
def __init__(self):
pass

def __call__(self, tasks: List[Task] = None, **kwargs):
for task in tasks:
model_attr = list(map(list, task.model.meta_attr))
sample_attr = list(map(list, task.samples.meta_attr))

if not (model_attr and sample_attr):
continue
if list(model_attr) == list(sample_attr):
return False
return True
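
With the alias registrations dropped, detectors are looked up by their class
names ("ModelProbeFilter", "TaskAttrFilter"), which is why the inference
example's UTD_NAME default changes from "TaskAttr" to "TaskAttrFilter". A
custom detector should plug in the same way; the sketch below is an assumption
based on the registration pattern above, and SampleCountFilter / min_num are
hypothetical names:

    from typing import List

    from sedna.algorithms.multi_task_learning.task_jobs.artifact import Task
    from sedna.algorithms.unseen_task_detect.unseen_task_detect import BaseFilter
    from sedna.common.class_factory import ClassFactory, ClassType


    @ClassFactory.register(ClassType.UTD)
    class SampleCountFilter(BaseFilter):
        """Treat a task as unseen when it carries fewer samples than min_num."""

        def __init__(self, min_num=1):
            self.min_num = int(min_num)

        def __call__(self, tasks: List[Task] = None, **kwargs):
            for task in tasks or []:
                if len(task.samples.x) < self.min_num:
                    return True
            return False

It could then be selected with unseen_task_detect={"method": "SampleCountFilter",
"param": {"min_num": 10}}, assuming the stored unseen_task_detect_param is
forwarded to the detector's constructor (that forwarding is not shown in this diff).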

lib/sedna/backend/base.py (+5, -3)

@@ -35,7 +35,7 @@ class BackendBase:
if self.default_name:
return self.default_name
model_postfix = {"pytorch": ".pth",
"keras": ".h5", "tensorflow": ".pb"}
"keras": ".pb", "tensorflow": ".pb"}
continue_flag = "_finetune_" if self.fine_tune else ""
post_fix = model_postfix.get(self.framework, ".pkl")
return f"model{continue_flag}{self.framework}{post_fix}"
@@ -107,9 +107,11 @@ class BackendBase:
self.model_save_path, mname = os.path.split(self.model_save_path)
model_path = FileOps.join_path(self.model_save_path, mname)
if model_url:
FileOps.download(model_url, model_path)
model_path = FileOps.download(model_url, model_path)
self.has_load = True

if not (hasattr(self.estimator, "load")
and os.path.exists(model_path)):
return
return self.estimator.load(model_url=model_path)

def set_weights(self, weights):


lib/sedna/backend/tensorflow/__init__.py (+21, -14)

@@ -25,10 +25,12 @@ if hasattr(tf, "compat"):
# version 2.0 tf
ConfigProto = tf.compat.v1.ConfigProto
Session = tf.compat.v1.Session
reset_default_graph = tf.compat.v1.reset_default_graph
else:
# version 1
ConfigProto = tf.ConfigProto
Session = tf.Session
reset_default_graph = tf.reset_default_graph


class TFBackend(BackendBase):
@@ -64,24 +66,27 @@ class TFBackend(BackendBase):
self.estimator = self.estimator()
if self.fine_tune and FileOps.exists(self.model_save_path):
self.finetune()

self.has_load = True
varkw = self.parse_kwargs(self.estimator.train, **kwargs)
return self.estimator.train(
train_data=train_data,
valid_data=valid_data,
**kwargs
**varkw
)

def predict(self, data, **kwargs):
if not self.has_load:
tf.reset_default_graph()
self.sess = self.load()
return self.estimator.predict(data, **kwargs)
reset_default_graph()
self.load()
varkw = self.parse_kwargs(self.estimator.predict, **kwargs)
return self.estimator.predict(data=data, **varkw)

def evaluate(self, data, **kwargs):
if not self.has_load:
tf.reset_default_graph()
self.sess = self.load()
return self.estimator.evaluate(data, **kwargs)
reset_default_graph()
self.load()
varkw = self.parse_kwargs(self.estimator.evaluate, **kwargs)
return self.estimator.evaluate(data, **varkw)

def finetune(self):
"""todo: no support yet"""
@@ -99,23 +104,25 @@ class TFBackend(BackendBase):

def model_info(self, model, relpath=None, result=None):
ckpt = os.path.dirname(model)
_, _type = os.path.splitext(model)
if relpath:
_url = FileOps.remove_path_prefix(model, relpath)
ckpt_url = FileOps.remove_path_prefix(ckpt, relpath)
else:
_url = model
ckpt_url = ckpt
results = [
{
"format": "pb",
_type = _type.lstrip(".").lower()
results = [{
"format": _type,
"url": _url,
"metrics": result
}, {
}]
if _type == "pb": # report ckpt path when model save as pb file
results.append({
"format": "ckpt",
"url": ckpt_url,
"metrics": result
}
]
})
return results




lib/sedna/common/constant.py (+0, -15)

@@ -12,23 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from enum import Enum

LOG = logging.getLogger(__name__)


class ModelType(Enum):
GlobalModel = 1
PersonalizedModel = 2


class Framework(Enum):
Tensorflow = "tensorflow"
Keras = "keras"
Pytorch = "pytorch"
Mindspore = "mindspore"


class K8sResourceKind(Enum):
DEFAULT = "default"


lib/sedna/core/base.py (+2, -31)

@@ -27,35 +27,7 @@ from sedna.common.class_factory import ClassFactory, ClassType
__all__ = ('JobBase',)


class DistributedWorker:
""""Class of Distributed Worker use to manage all jobs"""
# original params
__worker_path__ = None
__worker_module__ = None
# id params
__worker_id__ = 0

def __init__(self):
DistributedWorker.__worker_id__ += 1
self._worker_id = DistributedWorker.__worker_id__
self.timeout = 0

@property
def worker_id(self):
"""Property: worker_id."""
return self._worker_id

@worker_id.setter
def worker_id(self, value):
"""Setter: set worker_id with value.

:param value: worker id
:type value: int
"""
self._worker_id = value


class JobBase(DistributedWorker):
class JobBase:
""" sedna feature base class """
parameters = Context

@@ -68,8 +40,7 @@ class JobBase(DistributedWorker):
self.estimator = set_backend(estimator=estimator, config=self.config)
self.job_kind = K8sResourceKind.DEFAULT.value
self.job_name = self.config.job_name or self.config.service_name
work_name = f"{self.job_name}-{self.worker_id}"
self.worker_name = self.config.worker_name or work_name
self.worker_name = self.config.worker_name or self.job_name

@property
def initial_hem(self):


lib/sedna/core/lifelong_learning/lifelong_learning.py (+95, -49)

@@ -18,8 +18,7 @@ import tempfile
from sedna.backend import set_backend
from sedna.core.base import JobBase
from sedna.common.file_ops import FileOps
from sedna.common.constant import K8sResourceKind
from sedna.common.constant import K8sResourceKindStatus
from sedna.common.constant import K8sResourceKind, K8sResourceKindStatus
from sedna.common.constant import KBResourceConstant
from sedna.common.config import Context
from sedna.common.class_factory import ClassType, ClassFactory
@@ -34,43 +33,39 @@ class LifelongLearning(JobBase):

def __init__(self,
estimator,
task_definition="TaskDefinitionByDataAttr",
task_definition=None,
task_relationship_discovery=None,
task_mining=None,
task_remodeling=None,
inference_integrate=None,
unseen_task_detect="TaskAttrFilter",

task_definition_param=None,
relationship_discovery_param=None,
task_mining_param=None,
task_remodeling_param=None,
inference_integrate_param=None,
unseen_task_detect_param=None):
unseen_task_detect=None):

if not task_definition:
task_definition = {
"method": "TaskDefinitionByDataAttr"
}
if not unseen_task_detect:
unseen_task_detect = {
"method": "TaskAttrFilter"
}
e = MulTaskLearning(
estimator=estimator,
task_definition=task_definition,
task_relationship_discovery=task_relationship_discovery,
task_mining=task_mining,
task_remodeling=task_remodeling,
inference_integrate=inference_integrate,
task_definition_param=task_definition_param,
relationship_discovery_param=relationship_discovery_param,
task_mining_param=task_mining_param,
task_remodeling_param=task_remodeling_param,
inference_integrate_param=inference_integrate_param)
self.unseen_task_detect = unseen_task_detect
inference_integrate=inference_integrate)
self.unseen_task_detect = unseen_task_detect.get("method",
"TaskAttrFilter")
self.unseen_task_detect_param = e.parse_param(
unseen_task_detect_param
unseen_task_detect.get("param", {})
)
config = dict(
ll_kb_server=Context.get_parameters("KB_SERVER"),
output_url=Context.get_parameters("OUTPUT_URL", "/tmp")
)
task_index = FileOps.join_path(
config['output_url'],
KBResourceConstant.KB_INDEX_NAME
)
task_index = FileOps.join_path(config['output_url'],
KBResourceConstant.KB_INDEX_NAME)
config['task_index'] = task_index
super(LifelongLearning, self).__init__(
estimator=e, config=config
@@ -100,30 +95,62 @@ class LifelongLearning(JobBase):
**kwargs
) # todo: Distinguishing incremental update and fully overwrite

task_groups = self.estimator.estimator.task_groups
extractor_file = FileOps.join_path(
os.path.dirname(self.estimator.estimator.task_index_url),
"kb_extractor.pkl"
)
try:
extractor_file = self.kb_server.upload_file(extractor_file)
except Exception as err:
self.log.error(
f"Upload task extractor_file fail {extractor_file}: {err}")
extractor_file = FileOps.load(extractor_file)
if isinstance(task_index_url, str) and FileOps.exists(task_index_url):
task_index = FileOps.load(task_index_url)
else:
task_index = task_index_url

extractor = task_index['extractor']
task_groups = task_index['task_groups']

model_upload_key = {}
for task in task_groups:
model_file = task.model.model
save_model = FileOps.join_path(
self.config.output_url,
os.path.basename(model_file)
)
if model_file not in model_upload_key:
model_upload_key[model_file] = FileOps.upload(model_file,
save_model)
model_file = model_upload_key[model_file]

try:
model = self.kb_server.upload_file(task.model.model)
except Exception:
model_obj = set_backend(
model = self.kb_server.upload_file(save_model)
except Exception as err:
self.log.error(
f"Upload task model of {model_file} fail: {err}"
)
model = set_backend(
estimator=self.estimator.estimator.base_model
)
model = model_obj.load(task.model.model)
model.load(model_file)
task.model.model = model

for _task in task.tasks:
sample_dir = FileOps.join_path(
self.config.output_url,
f"{_task.samples.data_type}_{_task.entry}.sample")
task.samples.save(sample_dir)
try:
sample_dir = self.kb_server.upload_file(sample_dir)
except Exception as err:
self.log.error(
f"Upload task samples of {_task.entry} fail: {err}")
_task.samples.data_url = sample_dir

save_extractor = FileOps.join_path(
self.config.output_url,
KBResourceConstant.TASK_EXTRACTOR_NAME
)
extractor = FileOps.dump(extractor, save_extractor)
try:
extractor = self.kb_server.upload_file(extractor)
except Exception as err:
self.log.error(f"Upload task extractor fail: {err}")
task_info = {
"task_groups": task_groups,
"extractor": extractor_file
"extractor": extractor
}
fd, name = tempfile.mkstemp()
FileOps.dump(task_info, name)
@@ -132,13 +159,10 @@ class LifelongLearning(JobBase):
if not index_file:
self.log.error(f"KB update Fail !")
index_file = name

FileOps.upload(index_file, self.config.task_index)
if os.path.isfile(name):
os.close(fd)
os.remove(name)

task_info_res = self.estimator.model_info(
self.config.task_index, result=res,
self.config.task_index,
relpath=self.config.data_path_prefix)
self.report_task_info(
None, K8sResourceKindStatus.COMPLETED.value, task_info_res)
@@ -155,7 +179,7 @@ class LifelongLearning(JobBase):
**kwargs
)

def evaluate(self, data, post_process=None, model_threshold=0.1, **kwargs):
def evaluate(self, data, post_process=None, **kwargs):
callback_func = None
if callable(post_process):
callback_func = post_process
@@ -170,14 +194,35 @@ class LifelongLearning(JobBase):
FileOps.download(task_index_url, index_url)
res, tasks_detail = self.estimator.evaluate(data=data, **kwargs)
drop_tasks = []

model_filter_operator = self.get_parameters("operator", ">")
model_threshold = float(self.get_parameters('model_threshold', 0.1))

operator_map = {
">": lambda x, y: x > y,
"<": lambda x, y: x < y,
"=": lambda x, y: x == y,
">=": lambda x, y: x >= y,
"<=": lambda x, y: x <= y,
}
if model_filter_operator not in operator_map:
self.log.warn(
f"operator {model_filter_operator} use to "
f"compare is not allow, set to <"
)
model_filter_operator = "<"
operator_func = operator_map[model_filter_operator]

for detail in tasks_detail:
scores = detail.scores
entry = detail.entry
self.log.info(f"{entry} socres: {scores}")
if any(map(lambda x: float(x) < model_threshold, scores.values())):
self.log.info(f"{entry} scores: {scores}")
if any(map(lambda x: operator_func(float(x),
model_threshold),
scores.values())):
self.log.warn(
f"{entry} will not be deploy "
f"because scores lt {model_threshold}")
f"{entry} will not be deploy because all "
f"scores {model_filter_operator} {model_threshold}")
drop_tasks.append(entry)
continue
drop_task = ",".join(drop_tasks)
@@ -199,6 +244,7 @@ class LifelongLearning(JobBase):
return callback_func(res) if callback_func else res

def inference(self, data=None, post_process=None, **kwargs):

task_index_url = self.get_parameters(
"MODEL_URLS", self.config.task_index)
index_url = self.estimator.estimator.task_index_url
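
The evaluation-time model filter now comes from runtime parameters instead of a
model_threshold argument: "operator" (default ">") and "model_threshold"
(default 0.1) are read via get_parameters, and a task group is dropped from
deployment when any of its scores satisfies "score <operator> threshold". A
standalone sketch of that selection logic, for illustration only (the real code
lives in LifelongLearning.evaluate, and tasks_detail items expose .entry and
.scores as in the diff):

    operator_map = {
        ">": lambda x, y: x > y,
        "<": lambda x, y: x < y,
        "=": lambda x, y: x == y,
        ">=": lambda x, y: x >= y,
        "<=": lambda x, y: x <= y,
    }

    def tasks_to_drop(tasks_detail, model_filter_operator="<", model_threshold=0.9):
        """Return the entries whose scores trip the configured comparison."""
        compare = operator_map.get(model_filter_operator, operator_map["<"])
        drop = []
        for detail in tasks_detail:
            # drop the group if any metric satisfies "score <operator> threshold"
            if any(compare(float(s), model_threshold)
                   for s in detail.scores.values()):
                drop.append(detail.entry)
        return drop

For example, model_filter_operator="<" with model_threshold=0.9 drops every task
group that scores below 0.9 on any metric, which matches the behaviour of the old
model_threshold argument.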

