|
|
@@ -1,5 +1,5 @@ |
|
|
|
/** |
|
|
|
* Copyright 2020 Zhejiang Lab & The OneFlow Authors. All Rights Reserved. |
|
|
|
/** |
|
|
|
* Copyright 2020 Tianshu AI Platform. All Rights Reserved. |
|
|
|
* |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
@@ -143,7 +143,7 @@ public class AddActionHandler implements DistributeTrainActionHandler { |
|
|
|
* @param distributeTrain |
|
|
|
*/ |
|
|
|
public void doAction(DistributeTrain distributeTrain) { |
|
|
|
log.info("doAction=>distributeTrain : 【{}】", distributeTrain); |
|
|
|
log.info("doAction=>distributeTrain : 【{}】", distributeTrain.getMetadata().getName()); |
|
|
|
ChildResourceCreateInfo info = null; |
|
|
|
try { |
|
|
|
//redis重复检查 |
|
|
@@ -200,7 +200,7 @@ public class AddActionHandler implements DistributeTrainActionHandler { |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
public void handlerAction(DistributeTrain distributeTrain) { |
|
|
|
log.info("handlerAction=>distributeTrain : 【{}】", distributeTrain); |
|
|
|
log.info("handlerAction=>distributeTrain : 【{}】", distributeTrain.getMetadata().getName()); |
|
|
|
HandlerActionTask handlerActionTask = new HandlerActionTask(distributeTrain); |
|
|
|
pool.getActiveCount(); |
|
|
|
pool.execute(handlerActionTask); |
|
|
@@ -211,7 +211,7 @@ public class AddActionHandler implements DistributeTrainActionHandler { |
|
|
|
* @param distributeTrain 分布式训练 |
|
|
|
*/ |
|
|
|
private void validateParams(DistributeTrain distributeTrain) { |
|
|
|
log.info("validateParams=>distributeTrain : 【{}】", distributeTrain); |
|
|
|
log.info("validateParams=>distributeTrain : 【{}】", distributeTrain.getMetadata().getName()); |
|
|
|
Integer size = distributeTrain.getSpec().getSize(); |
|
|
|
if (size < NUMBER_2) { |
|
|
|
throw new OperatorException("size must be greater than 1"); |
|
|
@@ -254,7 +254,7 @@ public class AddActionHandler implements DistributeTrainActionHandler { |
|
|
|
* @param info 资源信息 |
|
|
|
*/ |
|
|
|
private void createStatefulSet(ChildResourceCreateInfo info) { |
|
|
|
log.info("createStatefulSet=>childResourceCreateInfo : 【{}】", info); |
|
|
|
log.info("createStatefulSet=>childResourceCreateInfo : 【{}】", info.getParentName()); |
|
|
|
StatefulSet statefulSet = client.apps().statefulSets() |
|
|
|
.inNamespace(info.getNamespace()) |
|
|
|
.withName(info.getStatefulSetName()).get(); |
|
|
@@ -298,7 +298,7 @@ public class AddActionHandler implements DistributeTrainActionHandler { |
|
|
|
* @param info Job信息 |
|
|
|
*/ |
|
|
|
private void createJob(ChildResourceCreateInfo info) { |
|
|
|
log.info("createJob=>childResourceCreateInfo : 【{}】", info); |
|
|
|
log.info("createJob=>childResourceCreateInfo : 【{}】", info.getParentName()); |
|
|
|
Job job = client.batch().jobs() |
|
|
|
.inNamespace(info.getNamespace()) |
|
|
|
.withName(info.getJobName()).get(); |
|
|
@@ -311,7 +311,7 @@ public class AddActionHandler implements DistributeTrainActionHandler { |
|
|
|
JobDeployer deployer = new BaseJobDeployer(); |
|
|
|
JobBuilder builder = deployer.deploy(info); |
|
|
|
job = builder.build(); |
|
|
|
log.info("job is : 【{}】", job); |
|
|
|
log.info("job is : 【{}】", job.getMetadata().getName()); |
|
|
|
client.batch().jobs().create(job); |
|
|
|
log.info("create job【{}】 successfully", job.getMetadata().getName()); |
|
|
|
} |
|
|
@@ -394,7 +394,7 @@ public class AddActionHandler implements DistributeTrainActionHandler { |
|
|
|
* @param slavePods |
|
|
|
*/ |
|
|
|
private void collectChildPodInfo(ChildResourceCreateInfo info, Pod masterPod, List<Pod> slavePods) { |
|
|
|
log.info("collectChildPodInfo=>childResourceCreateInfo : 【{}】, masterPod : 【{}】, slavePods : 【{}】", info, masterPod, slavePods); |
|
|
|
log.info("collectChildPodInfo=>childResourceCreateInfo : 【{}】, masterPod : 【{}】", info.getParentName(), masterPod.getMetadata().getName()); |
|
|
|
String key = info.getOwnerReference().getUid(); |
|
|
|
if (dtMap.containsKey(key)) { |
|
|
|
dtMap.remove(key); |
|
|
@@ -527,7 +527,7 @@ public class AddActionHandler implements DistributeTrainActionHandler { |
|
|
|
* @param info |
|
|
|
*/ |
|
|
|
private void recycleCr(ChildResourceCreateInfo info) { |
|
|
|
log.info("recycleCr=>childResourceCreateInfo : 【{}】", info); |
|
|
|
log.info("recycleCr=>childResourceCreateInfo : 【{}】", info.getParentName()); |
|
|
|
Optional.ofNullable(DistributeTrainClientHolder.getClient()) |
|
|
|
.ifPresent(distributeTrainClient -> { |
|
|
|
ObjectMeta metadata = new ObjectMeta(); |
|
|
@@ -542,7 +542,7 @@ public class AddActionHandler implements DistributeTrainActionHandler { |
|
|
|
|
|
|
|
/**更新状态*/ |
|
|
|
private void updateStatus(ChildResourceCreateInfo info, DistributeTrain distributeTrain) { |
|
|
|
log.info("updateStatus=>childResourceCreateInfo : 【{}】, distributeTrain : 【{}】", info, distributeTrain); |
|
|
|
log.info("updateStatus=>childResourceCreateInfo : 【{}】, distributeTrain : 【{}】", info.getParentName(), distributeTrain.getMetadata().getName()); |
|
|
|
if (distributeTrain.getStatus() == null) { |
|
|
|
distributeTrain.setStatus(new DistributeTrainStatus()); |
|
|
|
} |
|
|
@@ -568,7 +568,7 @@ public class AddActionHandler implements DistributeTrainActionHandler { |
|
|
|
* @return List<Pod> 分布式相关Pod集合 |
|
|
|
*/ |
|
|
|
private List<Pod> getPods(ChildResourceCreateInfo info) { |
|
|
|
log.info("getPods=>childResourceCreateInfo : 【{}】", info); |
|
|
|
log.info("getPods=>childResourceCreateInfo : 【{}】", info.getParentName()); |
|
|
|
List<Pod> pods = Lists.newArrayList(); |
|
|
|
pods.add(getMasterPod(info)); |
|
|
|
pods.addAll(getSlavePods(info)); |
|
|
@@ -584,7 +584,7 @@ public class AddActionHandler implements DistributeTrainActionHandler { |
|
|
|
* @return Pod Master节点对应的Pod |
|
|
|
*/ |
|
|
|
private Pod getMasterPod(ChildResourceCreateInfo info) { |
|
|
|
log.info("getMasterPod=>childResourceCreateInfo : 【{}】", info); |
|
|
|
log.info("getMasterPod=>childResourceCreateInfo : 【{}】", info.getParentName()); |
|
|
|
List<Pod> masterPods = client.pods().inNamespace(info.getNamespace()) |
|
|
|
.withLabel(JOB_LABEL, info.getJobName()) |
|
|
|
.list().getItems(); |
|
|
@@ -600,7 +600,7 @@ public class AddActionHandler implements DistributeTrainActionHandler { |
|
|
|
* @return List<Pod> Slave节点对应的Pod集合 |
|
|
|
*/ |
|
|
|
private List<Pod> getSlavePods(ChildResourceCreateInfo info) { |
|
|
|
log.info("getSlavePods=>childResourceCreateInfo : 【{}】", info); |
|
|
|
log.info("getSlavePods=>childResourceCreateInfo : 【{}】", info.getParentName()); |
|
|
|
//取得从的所有pod |
|
|
|
List<Pod> slavePods = client.pods().inNamespace(info.getNamespace()) |
|
|
|
.withLabel(STATEFULSET_LABEL, info.getStatefulSetName()) |
|
|
|