Browse Source

update data process

tags/v0.3.0
之江实验室 4 years ago
parent
commit
916216f4ed
66 changed files with 2191 additions and 0 deletions
  1. +0
    -0
      dubhe_data_process/common/data/__init__.py
  2. +0
    -0
      dubhe_data_process/entrance/__init__.py
  3. +61
    -0
      dubhe_data_process/entrance/algorithm-annotation.py
  4. +69
    -0
      dubhe_data_process/entrance/algorithm-imagenet.py
  5. +55
    -0
      dubhe_data_process/entrance/algorithm-imgprocess.py
  6. +63
    -0
      dubhe_data_process/entrance/algorithm-lungsegmenatation.py
  7. +80
    -0
      dubhe_data_process/entrance/algorithm-ofrecord.py
  8. +58
    -0
      dubhe_data_process/entrance/algorithm-text-classification.py
  9. +66
    -0
      dubhe_data_process/entrance/algorithm-track.py
  10. +63
    -0
      dubhe_data_process/entrance/algorithm-videosample.py
  11. +0
    -0
      dubhe_data_process/entrance/executor/__init__.py
  12. +46
    -0
      dubhe_data_process/entrance/executor/annotation.py
  13. +246
    -0
      dubhe_data_process/entrance/executor/classify_by_textcnn.py
  14. +103
    -0
      dubhe_data_process/entrance/executor/imagenet.py
  15. +189
    -0
      dubhe_data_process/entrance/executor/imgprocess.py
  16. +175
    -0
      dubhe_data_process/entrance/executor/lungsegmentation.py
  17. +181
    -0
      dubhe_data_process/entrance/executor/ofrecord.py
  18. +283
    -0
      dubhe_data_process/entrance/executor/predict_with_print_box.py
  19. +108
    -0
      dubhe_data_process/entrance/executor/taskexecutor.py
  20. +45
    -0
      dubhe_data_process/entrance/executor/text_classification.py
  21. +94
    -0
      dubhe_data_process/entrance/executor/text_taskexecutor.py
  22. +93
    -0
      dubhe_data_process/entrance/executor/track.py
  23. +100
    -0
      dubhe_data_process/entrance/executor/videosample.py
  24. +0
    -0
      dubhe_data_process/entrance/log/dev/.gitkeep
  25. +1
    -0
      dubhe_data_process/of_model/imdb_word_index/imdb_word_index.json
  26. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/System-Train-TrainStep-train_job/out
  27. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-0-bias-m/out
  28. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-0-bias-v/out
  29. +3
    -0
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-0-bias/out
  30. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-0-weight-m/out
  31. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-0-weight-v/out
  32. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-0-weight/out
  33. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-1-bias-m/out
  34. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-1-bias-v/out
  35. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-1-bias/out
  36. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-1-weight-m/out
  37. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-1-weight-v/out
  38. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-1-weight/out
  39. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-2-bias-m/out
  40. +3
    -0
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-2-bias-v/out
  41. +2
    -0
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-2-bias/out
  42. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-2-weight-m/out
  43. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-2-weight-v/out
  44. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-2-weight/out
  45. +1
    -0
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-3-bias-m/out
  46. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-3-bias-v/out
  47. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-3-bias/out
  48. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-3-weight-m/out
  49. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-3-weight-v/out
  50. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-3-weight/out
  51. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-1-bias-m/out
  52. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-1-bias-v/out
  53. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-1-bias/out
  54. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-1-weight-m/out
  55. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-1-weight-v/out
  56. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-1-weight/out
  57. +1
    -0
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-2-bias-m/out
  58. +1
    -0
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-2-bias-v/out
  59. +1
    -0
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-2-bias/out
  60. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-2-weight-m/out
  61. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-2-weight-v/out
  62. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-2-weight/out
  63. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/embedding-weight-m/out
  64. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/embedding-weight-v/out
  65. BIN
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/embedding-weight/out
  66. +0
    -0
      dubhe_data_process/of_model/textcnn_imdb_of_best_model/snapshot_done

+ 0
- 0
dubhe_data_process/common/data/__init__.py View File


+ 0
- 0
dubhe_data_process/entrance/__init__.py View File


+ 61
- 0
dubhe_data_process/entrance/algorithm-annotation.py View File

@@ -0,0 +1,61 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
# coding:utf-8
import threading

import time
import sys
sys.path.append("../")
import common.RedisUtil as f
from common import config as config
from entrance.executor import annotation as annotation, taskexecutor
import luascript.starttaskscript as start_script
import logging
from common import select_gpu as gpu

# Root logger: timestamp, source location, level, message.
logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
                    level=logging.DEBUG)

if __name__ == '__main__':
    """Automatic annotation algorithm entry."""
    gpu.select_gpu()
    # Redis connection settings come from the shared JSON config file.
    jsonData = config.loadJsonData(config.configPath)
    redisClient = f.getRedisConnection(jsonData["ip"], jsonData["port"], jsonData["database"], jsonData["password"])
    logging.info('init redis client %s', redisClient)
    # Background daemon thread that re-queues tasks whose heartbeat expired.
    t = threading.Thread(target=taskexecutor.delayKeyThread, args=(redisClient,))
    t.daemon = True  # fix: Thread.setDaemon() is deprecated since Python 3.10
    t.start()
    annotation._init()  # load the YOLO model once before polling
    while True:
        try:
            if config.loadJsonData(config.sign) == 0:
                logging.info('not to execute new task')
                time.sleep(1)
            else:
                logging.info('get one task')
                # Atomically move one task from the pending to the start queue.
                element = redisClient.eval(start_script.startTaskLua, 1, config.queue,
                                           config.annotationStartQueue, int(time.time()))
                if len(element) > 0:
                    taskexecutor.annotationExecutor(redisClient, element[0])
                else:
                    logging.info('task queue is empty.')
                    time.sleep(1)
        except Exception as e:
            # fix: 'except:' had no %s placeholder, so the exception detail
            # was never rendered in the log output.
            logging.error('except: %s', e)
            time.sleep(1)

+ 69
- 0
dubhe_data_process/entrance/algorithm-imagenet.py View File

@@ -0,0 +1,69 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
# coding:utf-8

import json
import threading
import time
import sys
sys.path.append("../")
from entrance.executor import imagenet as imagenet
import common.RedisUtil as f
import common.config as config
import luascript.starttaskscript as start_script
import logging
import common.select_gpu as gpu

# Root logger: timestamp, source location, level, message.
logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
                    level=logging.DEBUG)

if __name__ == '__main__':
    """Imagenet algorithm entry."""
    gpu.select_gpu()
    # Redis connection settings come from the shared JSON config file.
    jsonData = config.loadJsonData(config.configPath)
    redisClient = f.getRedisConnection(jsonData["ip"], jsonData["port"], jsonData["database"], jsonData["password"])
    logging.info('init redis client %s', redisClient)
    # Background daemon thread that re-queues tasks whose heartbeat expired.
    t = threading.Thread(target=imagenet.delayKeyThread, args=(redisClient,))
    t.daemon = True  # fix: Thread.setDaemon() is deprecated since Python 3.10
    t.start()
    imagenet._init()  # load the model once before polling
    while True:
        try:
            if config.loadJsonData(config.sign) == 0:
                logging.info('not to execute new task')
                time.sleep(1)
            else:
                logging.info('get one task')
                # Atomically move one task from the pending to the start queue.
                element = redisClient.eval(start_script.startTaskLua, 1, config.imagenetTaskQueue,
                                           config.imagenetStartQueue, int(time.time()))
                if len(element) > 0:
                    key = element[0].decode()
                    jsonStr = f.getByKey(redisClient, key.replace('"', ''))
                    result = imagenet.process(jsonStr, element[0])
                    # fix: "result:" had no %s placeholder, so `result` was
                    # silently dropped from the log line.
                    logging.info("result: %s", result)
                    logging.info('save result to redis')
                    f.pushToQueue(redisClient, config.imagenetFinishQueue, json.dumps(result))
                    redisClient.zrem(config.imagenetStartQueue, element[0])
                else:
                    logging.info('task queue is empty.')
                    time.sleep(2)
        except Exception as e:
            # fix: missing %s placeholder dropped the exception detail
            logging.error('except: %s', e)
            time.sleep(1)

+ 55
- 0
dubhe_data_process/entrance/algorithm-imgprocess.py View File

@@ -0,0 +1,55 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
import threading
import time
import sys
sys.path.append("../")
import common.RedisUtil as f
import luascript.starttaskscript as start_script
import common.config as config
import logging
from entrance.executor import imgprocess

# Root logger: timestamp, source location, level, message.
logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
                    level=logging.DEBUG)

if __name__ == '__main__':
    """Enhancement algorithm entry."""
    # Redis connection settings come from the shared JSON config file.
    jsonData = config.loadJsonData(config.configPath)
    redisClient = f.getRedisConnection(jsonData["ip"], jsonData["port"], jsonData["database"], jsonData["password"])
    logging.info('init redis client %s', redisClient)
    # Background daemon thread that re-queues tasks whose heartbeat expired.
    t = threading.Thread(target=imgprocess.delayKeyThread, args=(redisClient,))
    t.daemon = True  # fix: Thread.setDaemon() is deprecated since Python 3.10
    t.start()
    while True:
        try:
            if config.loadJsonData(config.sign) == 0:
                logging.info('not to execute new task')
                time.sleep(5)
            else:
                # Atomically move one task from the pending to the start queue.
                enhanceTaskId = redisClient.eval(start_script.startTaskLua, 1, config.imgProcessTaskQueue,
                                                config.imgProcessStartQueue, int(time.time()))
                if len(enhanceTaskId) > 0:
                    imgprocess.start_enhance_task(enhanceTaskId, redisClient)
                else:
                    logging.info('task queue is empty.')
                    time.sleep(5)
        except Exception as e:
            # fix: missing %s placeholder dropped the exception detail
            logging.error('except: %s', e)
            time.sleep(1)

+ 63
- 0
dubhe_data_process/entrance/algorithm-lungsegmenatation.py View File

@@ -0,0 +1,63 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
import json
import threading
from datetime import datetime
import time
import sys
sys.path.append("../")
import common.RedisUtil as f
import luascript.starttaskscript as start_script
import common.config as config
import logging
from entrance.executor import lungsegmentation as lungseg
import redis

# Root logger: timestamp, source location, level, message.
logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.DEBUG)

if __name__ == '__main__':
    """Lung segmentation algorithm based on CT image dcm entry."""
    # Redis connection settings come from the shared JSON config file.
    jsonData = config.loadJsonData(config.configPath)
    redisClient = f.getRedisConnection(jsonData["ip"], jsonData["port"], jsonData["database"], jsonData["password"])
    logging.info('init redis client %s', redisClient)
    # Background daemon thread that re-queues tasks whose heartbeat expired.
    t = threading.Thread(target=lungseg.delayKeyThread, args=(redisClient,))
    t.daemon = True  # fix: Thread.setDaemon() is deprecated since Python 3.10
    t.start()
    while True:
        try:
            # NOTE(review): the config.sign idle check used by the other entry
            # scripts was commented out here — confirm it is intentionally off.
            # fix: format string began with a stray 'B' ("read redis:B2021-...")
            logging.info("read redis:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
            # Atomically move one task from the pending to the start queue.
            lungTask = redisClient.eval(start_script.startTaskLua, 1, config.dcmTaskQueue, config.dcmStartQueue, int(time.time()))
            if len(lungTask) > 0:
                logging.info("start process.")
                key = lungTask[0].decode()
                jsonStr = f.getByKey(redisClient, key.replace('"', ''))
                if lungseg.process(jsonStr, lungTask[0]):
                    f.pushToQueue(redisClient, config.dcmFinishQueue, key)
                    redisClient.zrem(config.dcmStartQueue, lungTask[0])
                    logging.info('success.')
            else:
                logging.info('task queue is empty.')
                time.sleep(1)
        except Exception as e:
            # fix: missing %s placeholder dropped the exception detail
            logging.error('except: %s', e)
            time.sleep(1)

+ 80
- 0
dubhe_data_process/entrance/algorithm-ofrecord.py View File

@@ -0,0 +1,80 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
# coding:utf-8

import os
import json
import threading
import time
import sys
sys.path.append("../")
import common.RedisUtil as f
import common.config as config
import luascript.starttaskscript as start_script
import logging
import traceback
from entrance.executor import ofrecord

# Root logger: timestamp, source location, level, message.
logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.DEBUG)

basePath = '/nfs/'           # NFS mount that dataset paths are relative to
descPath = 'ofrecord/train'  # output subdirectory for generated ofrecord parts

if __name__ == '__main__':
    """Ofrecord algorithm entry."""
    # Redis connection settings come from the shared JSON config file.
    jsonData = config.loadJsonData(config.configPath)
    redisClient = f.getRedisConnection(jsonData["ip"], jsonData["port"], jsonData["database"], jsonData["password"])
    logging.info('init redis client %s', redisClient)
    # Background daemon thread that re-queues tasks whose heartbeat expired.
    t = threading.Thread(target=ofrecord.delayKeyThread, args=(redisClient,))
    t.daemon = True  # fix: Thread.setDaemon() is deprecated since Python 3.10
    t.start()
    element = None  # fix: referenced in the except handler before first assignment
    while True:
        try:
            if config.loadJsonData(config.sign) == 0:
                logging.info('not to execute new task')
                time.sleep(1)
            else:
                # Atomically move one task from the pending to the start queue.
                element = redisClient.eval(start_script.startTaskLua, 1, config.ofrecordTaskQueue,
                                           config.ofrecordStartQueue, int(time.time()))
                if len(element) > 0:
                    key = element[0].decode()
                    detail = f.getByKey(redisClient, key.replace('"', ''))
                    jsonStr = json.loads(detail.decode())
                    # All dataset labels except the '@type' metadata entry
                    # (the original 'index >= 0' guard was always true).
                    label_map = {k: v for k, v in jsonStr["datasetLabels"].items() if k != '@type'}
                    ofrecord.execute(os.path.join(basePath, jsonStr["datasetPath"]),
                                     os.path.join(basePath, jsonStr["datasetPath"], descPath),
                                     label_map,
                                     jsonStr["files"],
                                     jsonStr["partNum"],
                                     element[0])
                    logging.info('save result to redis')
                    f.pushToQueue(redisClient, config.ofrecordFinishQueue, key)
                    redisClient.zrem(config.ofrecordStartQueue, element[0])
                else:
                    logging.info('task queue is empty.')
                    time.sleep(2)
        except Exception as e:
            # fix: missing %s placeholder dropped the exception detail
            logging.error('except: %s', e)
            # fix: guard — `element` may be None/empty when the failure happened
            # before a task was dequeued (original raised NameError/IndexError).
            if element:
                redisClient.zrem(config.ofrecordStartQueue, element[0])
            time.sleep(1)

+ 58
- 0
dubhe_data_process/entrance/algorithm-text-classification.py View File

@@ -0,0 +1,58 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
# coding:utf-8
import sys
sys.path.append("../")
import threading
import time
import common.RedisUtil as f
import common.config as config
from entrance.executor import text_classification as classify, text_taskexecutor
import luascript.starttaskscript as start_script
import logging

# Root logger: timestamp, source location, level, message.
logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
                    level=logging.DEBUG)

if __name__ == '__main__':
    """Automatic text classification algorithm entry."""
    # Redis connection settings come from the shared JSON config file.
    jsonData = config.loadJsonData(config.configPath)
    redisClient = f.getRedisConnection(jsonData["ip"], jsonData["port"], jsonData["database"], jsonData["password"])
    logging.info('init redis client %s', redisClient)
    # Background daemon thread that re-queues tasks whose heartbeat expired.
    t = threading.Thread(target=text_taskexecutor.delayKeyThread, args=(redisClient,))
    t.daemon = True  # fix: Thread.setDaemon() is deprecated since Python 3.10
    t.start()
    classify._init()  # load the TextCNN model once before polling
    while True:
        try:
            if config.loadJsonData(config.sign) == 0:
                logging.info('not to execute new task')
                time.sleep(1)
            else:
                logging.info('get one task')
                # Atomically move one task from the pending to the start queue.
                element = redisClient.eval(start_script.startTaskLua, 1, config.textClassificationQueue,
                                           config.textClassificationStartQueue, int(time.time()))
                if len(element) > 0:
                    text_taskexecutor.textClassificationExecutor(redisClient, element[0])
                else:
                    logging.info('task queue is empty.')
                    time.sleep(1)
        except Exception as e:
            # fix: missing %s placeholder dropped the exception detail
            logging.error('except: %s', e)
            time.sleep(1)

+ 66
- 0
dubhe_data_process/entrance/algorithm-track.py View File

@@ -0,0 +1,66 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
# coding:utf-8
import threading
import time
import sys
sys.path.append("../")
import common.RedisUtil as f
import common.config as config
import luascript.starttaskscript as start_script
import logging
from entrance.executor import track

# Root logger: timestamp, source location, level, message.
logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
                    level=logging.DEBUG)

if __name__ == '__main__':
    """Track algorithm entry."""
    # Redis connection settings come from the shared JSON config file.
    jsonData = config.loadJsonData(config.configPath)
    redisClient = f.getRedisConnection(jsonData["ip"], jsonData["port"], jsonData["database"], jsonData["password"])
    logging.info('init redis client %s', redisClient)
    # Background daemon thread that re-queues tasks whose heartbeat expired.
    t = threading.Thread(target=track.delayKeyThread, args=(redisClient,))
    t.daemon = True  # fix: Thread.setDaemon() is deprecated since Python 3.10
    t.start()
    while True:
        try:
            if config.loadJsonData(config.sign) == 0:
                logging.info('not to execute new task')
                time.sleep(1)
            else:
                logging.info('get one task')
                # Atomically move one task from the pending to the start queue.
                element = redisClient.eval(start_script.startTaskLua, 1, config.trackTaskQueue,
                                           config.trackStartQueue, int(time.time()))
                if len(element) > 0:
                    key = element[0].decode()
                    jsonStr = f.getByKey(redisClient, key.replace('"', ''))
                    # Route the finished task to the success or failure queue.
                    if track.trackProcess(jsonStr, element[0]):
                        f.pushToQueue(redisClient, config.trackFinishQueue, key)
                        redisClient.zrem(config.trackStartQueue, element[0])
                        logging.info('success')
                    else:
                        f.pushToQueue(redisClient, config.trackFailedQueue, key)
                        redisClient.zrem(config.trackStartQueue, element[0])
                        logging.info('failed')
                else:
                    logging.info('task queue is empty.')
                    time.sleep(1)
        except Exception as e:
            # fix: missing %s placeholder dropped the exception detail
            logging.error('except: %s', e)
            time.sleep(1)

+ 63
- 0
dubhe_data_process/entrance/algorithm-videosample.py View File

@@ -0,0 +1,63 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
import json
import threading
from datetime import datetime
import time
import sys
sys.path.append("../")
import common.RedisUtil as f
import luascript.starttaskscript as start_script
import common.config as config
import logging
from entrance.executor import videosample

# Root logger: timestamp, source location, level, message.
logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
                    level=logging.DEBUG)

if __name__ == '__main__':
    """VideoSample algorithm entry."""
    # Redis connection settings come from the shared JSON config file.
    jsonData = config.loadJsonData(config.configPath)
    redisClient = f.getRedisConnection(jsonData["ip"], jsonData["port"], jsonData["database"], jsonData["password"])
    logging.info('init redis client %s', redisClient)
    # Background daemon thread that re-queues tasks whose heartbeat expired.
    t = threading.Thread(target=videosample.delayKeyThread, args=(redisClient,))
    t.daemon = True  # fix: Thread.setDaemon() is deprecated since Python 3.10
    t.start()
    while True:
        try:
            if config.loadJsonData(config.sign) == 0:
                logging.info('not to execute new task')
                time.sleep(5)
            else:
                # fix: format string began with a stray 'B' ("read redis:B2021-...")
                logging.info("read redis:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
                # Atomically move one task from the pending to the start queue.
                sampleTask = redisClient.eval(start_script.startTaskLua, 1, config.videoPendingQueue,
                                              config.videoStartQueue, int(time.time()))
                logging.info(int(time.time()))
                if len(sampleTask) > 0:
                    datasetId = json.loads(sampleTask[0])['datasetIdKey']
                    # Full task parameters live under a per-dataset redis key.
                    taskParameters = json.loads(redisClient.get("videoSample:" + str(datasetId)))
                    path = taskParameters['path']
                    frameList = taskParameters['frames']
                    videosample.sampleProcess(datasetId, path, frameList, redisClient)
                else:
                    logging.info('task queue is empty.')
                    time.sleep(5)
        except Exception as e:
            # fix: missing %s placeholder dropped the exception detail
            logging.error('except: %s', e)
            time.sleep(1)

+ 0
- 0
dubhe_data_process/entrance/executor/__init__.py View File


+ 46
- 0
dubhe_data_process/entrance/executor/annotation.py View File

@@ -0,0 +1,46 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
# coding:utf-8
import time
import sys
sys.path.append("../../")
from entrance.executor import predict_with_print_box as yolo_demo
from common.log_config import setup_log


# Module-level logger shared with the YOLO inference wrapper.
label_log = setup_log('dev', 'label.log')


def _init():
    """Create the global YoloInference object.

    Must be called once (by the entry script) before `_annotation` is used;
    the instance is stored in the module-global `yolo_obj`.
    """
    print('init yolo_obj')
    global yolo_obj
    yolo_obj = yolo_demo.YoloInference(label_log)


def _annotation(type_, image_path_list, id_list, annotation_url_list, label_list, coco_flag=0):
    """Run YOLO auto-annotation over a batch of images.

    Batches shorter than 16 are padded in place by repeating each list's
    first element so the inference batch is full; the padded batch size
    then determines how many annotations are returned.
    """
    count = len(image_path_list)
    if count < 16:
        missing = 16 - count
        # Pad every parallel list with copies of its first element.
        image_path_list.extend([image_path_list[0]] * missing)
        id_list.extend([id_list[0]] * missing)
        annotation_url_list.extend([annotation_url_list[0]] * missing)
        count = len(image_path_list)
    results = yolo_obj.yolo_inference(type_, id_list, annotation_url_list, image_path_list, label_list, coco_flag)
    return results[:count]

+ 246
- 0
dubhe_data_process/entrance/executor/classify_by_textcnn.py View File

@@ -0,0 +1,246 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
import json
import re
import six
import numpy as np
from typing import Tuple
# import requests # 在 nfs 没有挂载 时使用 url 访问
import sys
sys.path.append("../../")
import oneflow as flow
import oneflow.typing as tp

BATCH_SIZE = 16


class TextCNN:
    """TextCNN graph builder for oneflow.

    Embedding lookup, parallel 1-D convolutions over time (expressed as
    NCHW conv2d), max-over-time pooling, then two dense layers producing
    class logits. Variable names ('embedding-weight', 'conv-i', 'dense-1',
    'dense-2') must stay stable so checkpoints load correctly.
    """

    def __init__(self, emb_sz, emb_dim, ksize_list, n_filters_list, n_classes, dropout):
        self.initializer = flow.random_normal_initializer(stddev=0.1)
        self.emb_sz = emb_sz
        self.emb_dim = emb_dim
        self.ksize_list = ksize_list
        self.n_filters_list = n_filters_list
        self.n_classes = n_classes
        self.dropout = dropout
        self.total_n_filters = sum(self.n_filters_list)

    def get_logits(self, inputs, is_train):
        """Return class logits for a batch of token-id sequences."""
        emb_weight = flow.get_variable(
            'embedding-weight',
            shape=(self.emb_sz, self.emb_dim),
            dtype=flow.float32,
            trainable=is_train,
            reuse=False,
            initializer=self.initializer,
        )
        feats = flow.gather(emb_weight, inputs, axis=0)
        feats = flow.transpose(feats, [0, 2, 1])  # BLH -> BHL
        feats = flow.reshape(feats, list(feats.shape) + [1])
        seq_len = feats.shape[2]
        branch_outputs = []
        for idx, (ksz, n_filters) in enumerate(zip(self.ksize_list, self.n_filters_list)):
            conv = flow.layers.conv2d(feats, n_filters, [ksz, 1], data_format="NCHW",
                                      kernel_initializer=self.initializer, name='conv-{}'.format(idx))
            conv = flow.nn.relu(conv)
            # Max-over-time pooling collapses the sequence dimension.
            pooled = flow.nn.max_pool2d(conv, [seq_len - ksz + 1, 1], strides=1, padding='VALID', data_format="NCHW")
            branch_outputs.append(pooled)
        merged = flow.concat(branch_outputs, 3)
        merged = flow.reshape(merged, [-1, self.total_n_filters])
        if is_train:
            merged = flow.nn.dropout(merged, rate=self.dropout)
        hidden = flow.layers.dense(merged, self.total_n_filters, use_bias=True,
                                   kernel_initializer=self.initializer, name='dense-1')
        hidden = flow.nn.relu(hidden)
        return flow.layers.dense(hidden, self.n_classes, use_bias=True,
                                 kernel_initializer=self.initializer, name='dense-2')


def get_eval_config():
    """Build a oneflow function_config for evaluation (float32 default)."""
    func_cfg = flow.function_config()
    func_cfg.default_data_type(flow.float)
    return func_cfg


def pad_sequences(sequences, maxlen=None, dtype='int32',
                  padding='pre', truncating='pre', value=0.):
    """Pads sequences to the same length.

    Transforms a list of `num_samples` sequences (lists of integers) into a
    2D Numpy array of shape `(num_samples, num_timesteps)`. `num_timesteps`
    is either the `maxlen` argument if provided, or the length of the
    longest sequence otherwise. Sequences shorter than `num_timesteps` are
    padded with `value`; longer sequences are truncated. Where padding or
    truncation happens is controlled by `padding` and `truncating`
    ('pre' is the default for both).

    # Arguments
        sequences: List of lists, where each element is a sequence.
        maxlen: Int, maximum length of all sequences.
        dtype: Type of the output sequences.
            To pad sequences with variable length strings, use `object`.
        padding: String, 'pre' or 'post':
            pad either before or after each sequence.
        truncating: String, 'pre' or 'post':
            remove values from sequences larger than `maxlen`,
            either at the beginning or at the end of the sequences.
        value: Float or String, padding value.

    # Returns
        x: Numpy array with shape `(len(sequences), maxlen)`

    # Raises
        ValueError: In case of invalid values for `truncating` or `padding`,
            or in case of invalid shape for a `sequences` entry.
    """
    if not hasattr(sequences, '__len__'):
        raise ValueError('`sequences` must be iterable.')
    num_samples = len(sequences)
    lengths = []
    sample_shape = ()
    flag = True
    # Take the sample shape from the first non-empty sequence; consistency
    # of the remaining sequences is checked in the main loop below.
    for x in sequences:
        try:
            lengths.append(len(x))
            if flag and len(x):
                sample_shape = np.asarray(x).shape[1:]
                flag = False
        except TypeError:
            raise ValueError('`sequences` must be a list of iterables. '
                             'Found non-iterable: ' + str(x))
    if maxlen is None:
        maxlen = np.max(lengths)
    # fix: np.unicode_ was removed in NumPy 2.0; on Python 3 it was an alias
    # of np.str_, so testing np.str_ alone is equivalent and portable.
    is_dtype_str = np.issubdtype(dtype, np.str_)
    # fix: six.string_types replaced by the builtin str (Python 3 only),
    # removing the dependency on the third-party `six` module here.
    if isinstance(value, str) and dtype != object and not is_dtype_str:
        raise ValueError("`dtype` {} is not compatible with `value`'s type: {}\n"
                         "You should set `dtype=object` for variable length strings."
                         .format(dtype, type(value)))
    x = np.full((num_samples, maxlen) + sample_shape, value, dtype=dtype)
    for idx, s in enumerate(sequences):
        if not len(s):
            continue  # empty list/array was found
        if truncating == 'pre':
            trunc = s[-maxlen:]
        elif truncating == 'post':
            trunc = s[:maxlen]
        else:
            raise ValueError('Truncating type "%s" '
                             'not understood' % truncating)
        # Check that `trunc` has the expected trailing shape.
        trunc = np.asarray(trunc, dtype=dtype)
        if trunc.shape[1:] != sample_shape:
            raise ValueError('Shape of sample %s of sequence at position %s '
                             'is different from expected shape %s' %
                             (trunc.shape[1:], idx, sample_shape))
        if padding == 'post':
            x[idx, :len(trunc)] = trunc
        elif padding == 'pre':
            x[idx, -len(trunc):] = trunc
        else:
            raise ValueError('Padding type "%s" not understood' % padding)
    return x


@flow.global_function('predict', get_eval_config())
def predict_job(text: tp.Numpy.Placeholder((BATCH_SIZE, 150), dtype=flow.int32),
                ) -> Tuple[tp.Numpy, tp.Numpy]:
    """Oneflow predict job for a batch of padded token-id sequences.

    Registered with the oneflow runtime at import time via
    `flow.global_function`. Returns the argmax label and the softmax
    probabilities for each of the BATCH_SIZE samples.
    """
    with flow.scope.placement("gpu", "0:0"):
        # Hyper-parameters must match the checkpoint restored by
        # TextCNNClassifier (vocab 50000, emb 100, 4 conv branches, 2 classes).
        model = TextCNN(50000, 100, ksize_list=[2, 3, 4, 5], n_filters_list=[100] * 4, n_classes=2, dropout=0.5)
        logits = model.get_logits(text, is_train=False)
        logits = flow.nn.softmax(logits)
        label = flow.math.argmax(logits)
        return label, logits


class TextCNNClassifier:
    """Feeds fixed-size batches of texts through the compiled `predict_job`
    TextCNN graph and returns coco-style classification annotations."""

    def __init__(self):
        # Restore model weights once; the graph itself is compiled at import.
        model_load_dir = "../of_model/textcnn_imdb_of_best_model/"
        word_index_dir = "../of_model/imdb_word_index/imdb_word_index.json"
        checkpoint = flow.train.CheckPoint()
        checkpoint.init()
        checkpoint.load(model_load_dir)
        with open(word_index_dir) as f:
            word_index = json.load(f)
        # Shift every index by 2 so ids 0/1/2 are free for PAD/START/UNK.
        word_index = {k: (v + 2) for k, v in word_index.items()}
        word_index["<PAD>"] = 0
        word_index["<START>"] = 1
        word_index["<UNK>"] = 2
        self.word_index = word_index

    def inference(self, text_path_list, id_list, label_list):
        """Classify texts in whole batches of BATCH_SIZE.

        Texts beyond the last full batch are skipped here; the caller
        (`_classification`) pads the input to a multiple of BATCH_SIZE.

        Args:
            text_path_list: text file paths relative to the /nfs mount.
            id_list: file ids, parallel to text_path_list.
            label_list: maps predicted class index -> category id.

        Returns:
            list of {'id', 'annotation'} dicts (annotation is a json string).
        """
        print("infer")
        classifications = []
        batch_text = []
        for i, text_path in enumerate(text_path_list):
            # `with` closes the handle (previously leaked to the GC).
            with open('/nfs/' + text_path, "r") as text_file:
                text = text_file.read()
            """
            # For testing via a MinIO url when nfs is not mounted:
            url = "http://10.5.29.100:9000/" + text_path
            print(url)
            text = requests.get(url).text  # .encode('utf-8').decode('utf-8')
            """
            text = re.sub("[^a-zA-Z']", " ", text)
            text = list(map(lambda x: x.lower(), text.split()))
            text.insert(0, "<START>")
            batch_text.append(
                list(map(lambda x: self.word_index[x] if x in self.word_index else self.word_index["<UNK>"], text))
            )
            if i % BATCH_SIZE == BATCH_SIZE - 1:
                text = pad_sequences(batch_text, value=self.word_index["<PAD>"], padding='post', maxlen=150)
                text = np.array(text, dtype=np.int32)
                label, logits = predict_job(text)
                label = label.tolist()
                logits = logits.tolist()
                for k in range(BATCH_SIZE):
                    classifications.append({
                        'id': id_list[i - BATCH_SIZE + 1 + k],
                        'annotation': json.dumps(
                            [{'category_id': label_list[label[k]], 'score': round(logits[k][label[k]], 4)}])
                    })
                batch_text = []
        return classifications

+ 103
- 0
dubhe_data_process/entrance/executor/imagenet.py View File

@@ -0,0 +1,103 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sched
import sys

sys.path.append("../../common")
import logging
import time
import json
import common.of_cnn_resnet as of_cnn_resnet
import numpy as np
import luascript.delaytaskscript as delay_script
import common.config as config
from datetime import datetime

# Scheduler driving the periodic "keep task alive" refresh (delayKeyThread).
schedule = sched.scheduler(time.time, time.sleep)

base_path = "/nfs/"  # NFS mount that file urls in task payloads are relative to
delayId = ""  # quoted key of the task currently being processed


def _init():
    """One-time worker initialization: load the resnet classification model."""
    of_cnn_resnet.init_resnet()
    logging.info('env init finished')


def process(task_dict, key):
    """Imagenet auto-labeling task.

    Runs resnet inference on every file in the task, writes each result as a
    json annotation file next to the image (origin/ -> annotation/), and
    returns all annotations.

    Args:
        task_dict: json string with "files" (id/url entries) and "labels".
        key: task key as bytes, e.g. b'"<task-id>"'.

    Returns:
        dict: {"annotations": [...], "task": decoded key}.
    """
    global delayId
    # NOTE(review): eval() on redis-supplied bytes; the key is expected to be
    # a quoted string literal — confirm upstream always controls this value.
    delayId = "\"" + eval(str(key, encoding="utf-8")) + "\""
    task_dict = json.loads(task_dict)
    id_list = []
    image_path_list = []
    annotation_path_list = []
    for file_info in task_dict["files"]:
        id_list.append(file_info["id"])
        image_path = base_path + file_info["url"]
        image_path_list.append(image_path)
        annotation_url = image_path.replace("origin/", "annotation/")
        # Annotation file path drops the image extension.
        annotation_path_list.append(os.path.splitext(annotation_url)[0])
        # Make sure the annotation directory exists before writing results.
        os.makedirs(os.path.dirname(annotation_url), exist_ok=True)
    annotations = []
    for inds, image_path in enumerate(image_path_list):
        score, ca_id = of_cnn_resnet.resnet_inf(image_path)
        # np.float was removed in NumPy 1.24; plain float is equivalent here.
        temp = {
            'id': id_list[inds],
            'annotation': json.dumps([{'category_id': int(ca_id), 'score': float(score)}]),
        }
        annotations.append(temp)
        with open(annotation_path_list[inds], 'w') as w:
            w.write(temp['annotation'])
    return {"annotations": annotations, "task": key.decode()}


def delaySchduled(inc, redisClient):
    """Refresh the in-flight task key's score in redis, then reschedule.

    Args:
        inc: reschedule interval in seconds.
        redisClient: redis client.
    """
    try:
        # NOTE(review): the leading "B" in the strftime format looks accidental.
        print("delay:" + datetime.now().strftime("B%Y-%m-%d %H:%M:%S"))
        redisClient.eval(delay_script.delayTaskLua, 1, config.imagenetStartQueue, delayId, int(time.time()))
        schedule.enter(inc, 0, delaySchduled, (inc, redisClient))
    except Exception as e:
        # str(e): concatenating the exception object itself raised TypeError.
        print("delay error: " + str(e))


def delayKeyThread(redisClient):
    """Delay task thread entry point.

    Blocks forever: schedule.run() keeps executing delaySchduled, which
    re-enters itself with a 5 second interval.

    Args:
        redisClient: redis client.
    """
    schedule.enter(0, 0, delaySchduled, (5, redisClient))
    schedule.run()

+ 189
- 0
dubhe_data_process/entrance/executor/imgprocess.py View File

@@ -0,0 +1,189 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
# !/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import datetime
import sched
import os
import cv2
import numpy as np
import logging
import time
import json
import argparse
import sys
import codecs
import shutil

import luascript.delaytaskscript as delay_script
import common.config as config

from common.augment_utils.ACE import ACE_color
from common.augment_utils.dehaze import deHaze, addHaze
from common.augment_utils.hist_equalize import adaptive_hist_equalize
from common.log_config import setup_log

schedule = sched.scheduler(time.time, time.sleep)

delayId = ""
finish_key = {}   # last payload pushed to the finish/failed queues
re_task_id = {}   # id of the task currently being processed
sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())

# task url suffix
img_pro_url = 'api/data/datasets/'

# arguments
parser = argparse.ArgumentParser(description="config for image augmentation server")
parser.add_argument("-m", "--mode", type=str, default="test", required=False)
args = parser.parse_args()

# url concat (ip + port + suffix), base looked up per deployment mode
url_json = '../common/config/url.json'
with open(url_json) as f:
    url_dict = json.loads(f.read())
img_pro_url = url_dict[args.mode] + img_pro_url

# nfs base path
base_path = "/nfs/"

# create log path and file
des_folder = os.path.join('./log', args.mode)
if not os.path.exists(des_folder):
    os.makedirs(des_folder)

# NOTE: rebinds the stdlib `logging` name to this module's logger object
logging = setup_log(args.mode, 'enhance-' + args.mode + '.log')
enhanceTaskId = ""


def start_enhance_task(enhanceTaskId, redisClient):
    """Fetch an enhance task's parameters from redis and run it.

    Args:
        enhanceTaskId: (key, score) pair popped from redis; key is a quoted
            bytes task id.
        redisClient: redis client.
    """
    global delayId
    # Decode the quoted task id once instead of re-eval()ing it three times.
    task_id = eval(str(enhanceTaskId[0], encoding="utf-8"))
    detailKey = 'imgProcess:' + task_id
    delayId = "\"" + task_id + "\""
    print(detailKey)
    taskParameters = json.loads(redisClient.get(detailKey).decode())
    dataset_id = taskParameters['id']
    img_save_path = taskParameters['enhanceFilePath']
    ann_save_path = taskParameters["enhanceAnnotationPath"]
    file_list = taskParameters['fileDtos']
    nums_, img_path_list, ann_path_list = img_ann_list_gen(file_list)
    process_type = taskParameters['type']
    img_process_config = [dataset_id, img_save_path,
                          ann_save_path, img_path_list,
                          ann_path_list, process_type, task_id]
    image_enhance_process(img_process_config, redisClient)
    logging.info(str(nums_) + ' images for augment')


def img_ann_list_gen(file_list):
    """Split file DTOs into parallel image/annotation path lists.

    Args:
        file_list: dicts each carrying 'filePath' and 'annotationPath'.

    Returns:
        (count, image_paths, annotation_paths)
    """
    img_list = [item['filePath'] for item in file_list]
    ann_list = [item['annotationPath'] for item in file_list]
    return len(file_list), img_list, ann_list


def image_enhance_process(img_task, redisClient):
    """The implementation of the image augmentation task.

    img_task layout: [dataset_id, img_save_path, ann_save_path,
    img_path_list, ann_path_list, method, re_task_id].
    On success pushes to the finish queue; on any error pushes the same key
    to the failed queue instead.
    """
    global img_pro_url
    global finish_key
    global re_task_id
    logging.info('img_process server start'.center(66, '-'))
    logging.info(img_pro_url)
    try:
        dataset_id = img_task[0]
        img_save_path = img_task[1]
        ann_save_path = img_task[2]
        img_list = img_task[3]
        ann_list = img_task[4]
        method = img_task[5]
        re_task_id = img_task[6]
        # Every augmented file gets this suffix appended to its name.
        suffix = '_enchanced_' + re_task_id
        logging.info("dataset_id " + str(dataset_id))

        finish_key = {"processKey": re_task_id}
        finish_data = {"id": re_task_id,
                       "suffix": suffix}
        for j in range(len(ann_list)):
            img_path = img_list[j]
            ann_path = ann_list[j]
            img_process(suffix, img_path, ann_path,
                        img_save_path, ann_save_path, method)

        # Publish the result and drop the task from the in-flight queue.
        redisClient.set("imgProcess:finished:" + re_task_id, json.dumps(finish_data))
        redisClient.zrem(config.imgProcessStartQueue, "\"" + re_task_id + "\"")
        redisClient.lpush(config.imgProcessFinishQueue, json.dumps(finish_key, separators=(',', ':')))
        logging.info('suffix:' + suffix)
        logging.info("End img_process of dataset:" + str(dataset_id))

    except Exception as e:
        # Falls back to the module-level finish_key/re_task_id values when
        # the failure happened before they were assigned above.
        redisClient.lpush(config.imgProcessFailedQueue, json.dumps(finish_key, separators=(',', ':')))
        redisClient.zrem(config.imgProcessStartQueue, "\"" + re_task_id + "\"")
        logging.info(img_pro_url)
        logging.error("Error imgProcess")
        logging.error(e)
        time.sleep(0.01)


def img_process(suffix, img_path, ann_path, img_save_path, ann_save_path, method_ind):
    """Augment one image with the selected method and save it plus a copy of
    its annotation file.

    Args:
        suffix: string appended to the output file names.
        img_path / ann_path: source image and annotation paths.
        img_save_path / ann_save_path: output directories.
        method_ind: 1=deHaze, 2=addHaze, 3=ACE_color, 4=adaptive_hist_equalize.
    """
    inds2method = {1: deHaze, 2: addHaze, 3: ACE_color, 4: adaptive_hist_equalize}
    method = inds2method[method_ind]
    # np.fromfile + imdecode instead of cv2.imread so non-ASCII paths work.
    img_raw = cv2.imdecode(np.fromfile(img_path.encode('utf-8'), dtype=np.uint8), 1)
    img_suffix = os.path.splitext(img_path)[-1]
    ann_name = os.path.basename(ann_path)
    # Methods 1-3 operate on [0, 1] floats; method 4 works on uint8 directly.
    if method_ind <= 3:
        processed_img = method(img_raw / 255.0) * 255
    else:
        processed_img = method(img_raw)
    out_path = img_save_path + "/" + ann_name + suffix + img_suffix
    # imencode + tofile mirrors the unicode-safe read above (cv2.imwrite
    # cannot handle non-ASCII paths on some platforms) and surfaces failures
    # instead of silently returning False.
    ok, buf = cv2.imencode(img_suffix, processed_img.astype(np.uint8))
    if not ok:
        raise ValueError("cv2.imencode failed for " + out_path)
    buf.tofile(out_path)
    shutil.copyfile(ann_path.encode('utf-8'), (ann_path + suffix).encode('utf-8'))


def delaySchduled(inc, redisClient):
    """Refresh the in-flight task key's score in redis, then reschedule.

    Args:
        inc: reschedule interval in seconds.
        redisClient: redis client.
    """
    try:
        logging.info("delay:" + datetime.now().strftime("B%Y-%m-%d %H:%M:%S") + ":" + delayId)
        redisClient.eval(delay_script.delayTaskLua, 1, config.imgProcessStartQueue, delayId, int(time.time()))
        schedule.enter(inc, 0, delaySchduled, (inc, redisClient))
    except Exception as e:
        # str(e): concatenating the exception object itself raised TypeError.
        print("delay error: " + str(e))


def delayKeyThread(redisClient):
    """Delay task thread entry point.

    Blocks forever: schedule.run() keeps executing delaySchduled, which
    re-enters itself with a 5 second interval.

    Args:
        redisClient: redis client.
    """
    schedule.enter(0, 0, delaySchduled, (5, redisClient))
    schedule.run()

+ 175
- 0
dubhe_data_process/entrance/executor/lungsegmentation.py View File

@@ -0,0 +1,175 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sched
import sys

sys.path.append("../../")
import logging
import time
import json
import numpy as np
import luascript.delaytaskscript as delay_script
import common.config as config
from datetime import datetime
from skimage.morphology import disk, binary_erosion, binary_closing
from skimage.measure import label,regionprops, find_contours
from skimage.filters import roberts
from scipy import ndimage as ndi
from skimage.segmentation import clear_border
import pydicom as dicom
import os
import logging

# Scheduler driving the periodic "keep task alive" refresh (delayKeyThread).
schedule = sched.scheduler(time.time, time.sleep)

base_path = "/nfs/"  # default output root (process() overrides it per task)
delayId = ""  # quoted key of the task currently being processed


def process(task_dict, key):
    """Lung segmentation based on dcm task method.

    Args:
        task_dict: json string with "annotationPath" and a "dcms" path list.
        key: task key (bytes).

    Returns:
        True once every dcm in the task has been segmented and written.
    """
    global delayId
    delayId = "\"" + eval(str(key, encoding="utf-8")) + "\""
    task = json.loads(task_dict)
    annotation_dir = task["annotationPath"]
    if not os.path.exists(annotation_dir):
        logging.info("make annotation path.")
        os.makedirs(annotation_dir)
    for dcm in task["dcms"]:
        image, image_path = preprocesss_dcm_image(dcm)
        # Segment the lung and write its contours into the annotation dir.
        contour(segmentation(image), os.path.join(annotation_dir, image_path))
    logging.info("all dcms in one task are processed.")
    return True

def preprocesss_dcm_image(path):
    """Load a DICOM file and convert its pixels to Hounsfield units.

    Args:
        path: dcm file path.

    Returns:
        (int16 pixel array in HU, result json file name)
    """
    # "<name>.<ext>" -> "<name>.json", keeping any dots inside the base name.
    base_name = os.path.basename(path)
    result_path = ".".join(base_name.split(".")[0:-1]) + ".json"
    dcm = dicom.dcmread(path)
    image = dcm.pixel_array.astype(np.int16)
    # Pixels outside the scan circle are tagged -2000; zero them out.
    image[image == -2000] = 0
    # Hounsfield conversion: HU = pixel * RescaleSlope + RescaleIntercept.
    intercept = dcm.RescaleIntercept
    slope = dcm.RescaleSlope
    if slope != 1:
        image = (slope * image.astype(np.float64)).astype(np.int16)
    image += np.int16(intercept)
    logging.info("preprocesss_dcm_image done.")
    return np.array(image, dtype=np.int16), result_path

def segmentation(image):
    """Segments the lung from the given 2D slice.

    Args:
        image: single image (HU values) from one dcm.

    Returns:
        binary lung mask of the same shape.
    """
    # Step 1: Convert into a binary image (lung/air tissue is below -350 HU).
    binary = image < -350

    # Step 2: Remove the blobs connected to the border of the image.
    cleared = clear_border(binary)

    # Step 3: Label the connected components.
    label_image = label(cleared)

    # Step 4: Keep only the components with the 2 largest areas (the lungs).
    areas = [r.area for r in regionprops(label_image)]
    areas.sort()
    if len(areas) > 2:
        for region in regionprops(label_image):
            if region.area < areas[-2]:
                for coordinates in region.coords:
                    label_image[coordinates[0], coordinates[1]] = 0
    binary = label_image > 0

    # Step 5: Erosion with a disk of radius 1 to separate lung nodules
    # attached to blood vessels.
    selem = disk(1)
    binary = binary_erosion(binary, selem)
    # Step 6: Closing with a disk of radius 16 to keep nodules attached to
    # the lung wall.
    selem = disk(16)
    binary = binary_closing(binary, selem)

    # Step 7: Fill in the small holes inside the binary mask of lungs.
    for _ in range(3):
        edges = roberts(binary)
        binary = ndi.binary_fill_holes(edges)
    logging.info("lung segmentation done.")
    return binary

def contour(image, path):
    """Write the largest contours of a binary lung mask to a json file.

    Args:
        image: binary segmentation mask.
        path: output json file path.
    """
    contours = find_contours(image, 0.5)
    if len(contours) > 2:
        # Keep the two contours with the most points (one per lung).
        contours.sort(key=lambda c: int(c.shape[0]))
        contours = contours[-2:]

    # find_contours yields (row, col); flip to (x, y) for the annotation format.
    result = [{"type": n, "annotation": np.flip(c, 1).tolist()}
              for n, c in enumerate(contours)]
    with open(path, 'w') as f:
        json.dump(result, f)
    logging.info("write {} done.".format(path))


def delaySchduled(inc, redisClient):
    """Refresh the in-flight task key's score in redis, then reschedule.

    Args:
        inc: reschedule interval in seconds.
        redisClient: redis client.
    """
    try:
        # NOTE(review): the leading "B" in the strftime format looks accidental.
        print("delay:" + datetime.now().strftime("B%Y-%m-%d %H:%M:%S"))
        redisClient.eval(delay_script.delayTaskLua, 1, config.dcmStartQueue, delayId, int(time.time()))
        schedule.enter(inc, 0, delaySchduled, (inc, redisClient))
    except Exception as e:
        # str(e): concatenating the exception object itself raised TypeError.
        print("delay error: " + str(e))


def delayKeyThread(redisClient):
    """Delay task thread entry point.

    Blocks forever: schedule.run() keeps executing delaySchduled, which
    re-enters itself with a 5 second interval.

    Args:
        redisClient: redis client.
    """
    schedule.enter(0, 0, delaySchduled, (5, redisClient))
    schedule.run()

+ 181
- 0
dubhe_data_process/entrance/executor/ofrecord.py View File

@@ -0,0 +1,181 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
# -*- coding: utf-8 -*-

import logging
import json
import os
import struct
import cv2
import sched
import numpy as np
import oneflow.core.record.record_pb2 as of_record
import luascript.delaytaskscript as delay_script
import time
import common.config as config
from datetime import datetime

# Scheduler driving the periodic "keep task alive" refresh (delayKeyThread).
schedule = sched.scheduler(time.time, time.sleep)

delayId = ""  # quoted key of the task currently being processed

class ImageCoder(object):
    """Helper class that provides image coding utilities."""

    def __init__(self, size=None):
        # Target (height, width); None means "keep the original size".
        self.size = size

    def _resize(self, image_data):
        """Resize only when a target size is set and differs from the input."""
        needs_resize = self.size is not None and image_data.shape[:2] != self.size
        return cv2.resize(image_data, self.size) if needs_resize else image_data

    def image_to_jpeg(self, image_data):
        """Decode raw image bytes, resize, and re-encode as JPEG.

        Returns:
            (jpeg_bytes, height, width)
        """
        decoded = cv2.imdecode(np.frombuffer(image_data, np.uint8), 1)
        decoded = self._resize(decoded)
        jpeg_bytes = cv2.imencode(".jpg", decoded)[1].tobytes()
        return jpeg_bytes, decoded.shape[0], decoded.shape[1]


def _process_image(filename, coder):
"""Process a single image file.
Args:
filename: string, path to an image file e.g., '/path/to/example.JPG'.
coder: instance of ImageCoder to provide image coding utils.
Returns:
image_buffer: string, JPEG encoding of RGB image.
height: integer, image height in pixels.
width: integer, image width in pixels.
"""
# Read the image file.
with open(filename, 'rb') as f:
image_data = f.read()
image_data, height, width = coder.image_to_jpeg(image_data)

return image_data, height, width


def _bytes_feature(value):
    """Wrap raw bytes into an OFRecord `Feature` (a bytes_list of one item).

    Args:
        value: bytes payload (e.g. a JPEG-encoded image buffer).
    """
    return of_record.Feature(bytes_list=of_record.BytesList(value=[value]))


def dense_to_one_hot(labels_dense, num_classes):
    """Convert class labels from scalars to one-hot vectors.

    Args:
        labels_dense: 1D array of integer class ids.
        num_classes: total number of classes (columns of the result).

    Returns:
        (num_labels, num_classes) float array with a single 1 per row.
    """
    num_labels = labels_dense.shape[0]
    one_hot = np.zeros((num_labels, num_classes))
    # Fancy indexing sets each row's column at its label id.
    one_hot[np.arange(num_labels), labels_dense.ravel()] = 1
    return one_hot


def extract_img_label(names, path):
    """Extract images and their class labels into np arrays.

    Args:
        names: image file names under `path`/origin/.
        path: dataset root containing 'origin/' and 'annotation/' folders.

    Returns:
        num_imgs: the number of images that have a usable annotation.
        data: np array of JPEG-encoded image buffers.
        labels: 1D np array of category ids, parallel to `data`.
    """
    train_img = os.path.join(path, 'origin/')
    train_label = os.path.join(path, 'annotation/')
    num_imgs = len(names)
    data = []
    labels = []
    # One coder for the whole batch; it was rebuilt per image before.
    coder = ImageCoder((224, 224))
    for i in names:
        name = os.path.splitext(i)[0]
        image_buffer, height, width = _process_image(
            os.path.join(train_img, i), coder)
        data += [image_buffer]
        annotation_file = os.path.join(train_label, name)
        if os.path.exists(annotation_file):
            with open(annotation_file, "r", encoding='utf-8') as jsonFile:
                la = json.load(jsonFile)
            if la:
                labels += [la[0]['category_id']]
            else:
                # Empty annotation: drop the image we just added so that
                # data and labels stay aligned.
                data.pop()
                num_imgs -= 1
        else:
            # Missing annotation file: previously the image was kept without
            # a label, leaving data/labels misaligned downstream.
            print('File is not found')
            data.pop()
            num_imgs -= 1
    data = np.array(data)
    labels = np.array(labels)
    return num_imgs, data, labels


def execute(src_path, desc, label_map, files, part_id, key):
    """Execute an ofrecord task: pack images + labels into one part file.

    Args:
        src_path: dataset root (contains origin/ and annotation/).
        desc: output directory for the ofrecord part files.
        label_map: dict keyed by stringified category id; rewritten in place
            to map each id to a dense 0..n-1 index (sorted numerically).
        files: image file names to pack.
        part_id: suffix of the output 'part-<id>' file.
        key: task key (bytes).

    Returns:
        (False, 0, 0) when no image could be extracted, else None.
    """
    global delayId
    # (was `delayId = delayId = ...` — a confusing double assignment)
    delayId = "\"" + eval(str(key, encoding="utf-8")) + "\""
    logging.info(part_id)
    num_imgs, images, labels = extract_img_label(files, src_path)
    # Remap original label ids to dense indices ordered by numeric key.
    keys = sorted(list(map(int, label_map.keys())))
    for i in range(len(keys)):
        label_map[str(keys[i])] = i
    if not num_imgs:
        return False, 0, 0
    # exist_ok replaces the old try/except-print around makedirs.
    os.makedirs(desc, exist_ok=True)
    filename = os.path.join(desc, 'part-{}'.format(part_id))
    print(filename)
    # `with` guarantees the part file is closed even if serialization fails.
    with open(filename, 'wb') as f:
        for i in range(num_imgs):
            img = images[i]
            label = label_map[str(labels[i])]
            sample = of_record.OFRecord(feature={
                'class/label': of_record.Feature(int32_list=of_record.Int32List(value=[label])),
                'encoded': _bytes_feature(img)
            })
            size = sample.ByteSize()
            # Each record is length-prefixed with a little int64.
            f.write(struct.pack("q", size))
            f.write(sample.SerializeToString())

def delaySchduled(inc, redisClient):
    """Refresh the in-flight task key's score in redis, then reschedule.

    Args:
        inc: reschedule interval in seconds.
        redisClient: redis client.
    """
    try:
        # NOTE(review): the leading "B" in the strftime format looks accidental.
        print("delay:" + datetime.now().strftime("B%Y-%m-%d %H:%M:%S"))
        redisClient.eval(delay_script.delayTaskLua, 1, config.ofrecordStartQueue, delayId, int(time.time()))
        schedule.enter(inc, 0, delaySchduled, (inc, redisClient))
    except Exception as e:
        # str(e): concatenating the exception object itself raised TypeError.
        print("delay error: " + str(e))

def delayKeyThread(redisClient):
    """Delay task thread entry point.

    Blocks forever: schedule.run() keeps executing delaySchduled, which
    re-enters itself with a 5 second interval.

    Args:
        redisClient: redis client.
    """
    schedule.enter(0, 0, delaySchduled, (5, redisClient))
    schedule.run()

+ 283
- 0
dubhe_data_process/entrance/executor/predict_with_print_box.py View File

@@ -0,0 +1,283 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
import json
import time

import cv2
import numpy as np
import oneflow_yolov3
import sys
sys.path.append("../../")
from common.yolo_net import YoloPredictNet

import oneflow as flow


'''Init oneflow config'''
model_load_dir = "../of_model/yolov3_model_python/"
label_to_name_file = "../common/data/coco.names"
use_tensorrt = 0  # set non-zero to enable TensorRT in the func config below
gpu_num_per_node = 1
batch_size = 16  # the compiled job's fixed batch size
image_height = 608
image_width = 608
flow.config.load_library(oneflow_yolov3.lib_path())
func_config = flow.FunctionConfig()
func_config.default_distribute_strategy(flow.distribute.consistent_strategy())
func_config.default_data_type(flow.float)
if use_tensorrt != 0:
    func_config.use_tensorrt(True)
# coco class names, one per line; indexed by class id.
label_2_name = []
with open(label_to_name_file, 'r') as f:
    label_2_name = f.readlines()
nms = True  # selects the box layout branch in batch_boxes
print("nms:", nms)
# Fixed-shape input blob definitions for the compiled eval job.
input_blob_def_dict = {
    "images": flow.FixedTensorDef((batch_size, 3, image_height, image_width), dtype=flow.float),
    "origin_image_info": flow.FixedTensorDef((batch_size, 2), dtype=flow.int32),
}


def xywh_2_x1y1x2y2(x, y, w, h, origin_image):
"""The format of box transform"""
x1 = (x - w / 2.) * origin_image[1]
x2 = (x + w / 2.) * origin_image[1]
y1 = (y - h / 2.) * origin_image[0]
y2 = (y + h / 2.) * origin_image[0]
return x1, y1, x2, y2


def batch_boxes(positions, probs, origin_image_info):
    """Collect per-image detection boxes from the yolo head outputs.

    Args:
        positions: normalized box tensor; layout depends on the global `nms`.
        probs: per-box class probabilities (layout also depends on `nms`).
        origin_image_info: per-image original (h, w).

    Returns:
        list of np arrays, one per image, rows = [cls, x1, y1, x2, y2, prob].
    """
    n_imgs = positions.shape[0]
    batch_list = []
    if nms:
        # Layout after NMS: positions[k][class][box] -> (x, y, w, h).
        for k in range(n_imgs):
            boxes = []
            for cls in range(1, 81):
                for b in range(positions.shape[2]):
                    pos = positions[k][cls][b]
                    if pos[2] != 0 and pos[3] != 0 and probs[k][cls][b] != 0:
                        x1, y1, x2, y2 = xywh_2_x1y1x2y2(pos[0], pos[1], pos[2], pos[3],
                                                         origin_image_info[k])
                        boxes.append([cls - 1, x1, y1, x2, y2, probs[k][cls][b]])
            batch_list.append(np.asarray(boxes))
    else:
        # Layout without NMS: positions[k][box] -> (x, y, w, h),
        # probs[k][box][class].
        for k in range(n_imgs):
            boxes = []
            for b in range(positions.shape[1]):
                for cls in range(1, 81):
                    pos = positions[k][b]
                    if pos[2] != 0 and pos[3] != 0 and probs[k][b][cls] != 0:
                        x1, y1, x2, y2 = xywh_2_x1y1x2y2(pos[0], pos[1], pos[2], pos[3],
                                                         origin_image_info[k])
                        boxes.append([cls - 1, x1, y1, x2, y2, probs[k][b][cls]])
            batch_list.append(np.asarray(boxes))
    return batch_list


@flow.function(func_config)
def yolo_user_op_eval_job(images=input_blob_def_dict["images"],
                          origin_image_info=input_blob_def_dict["origin_image_info"]):
    """The model inference: compiled oneflow job for one yolov3 forward pass.

    Args:
        images: (batch, 3, H, W) preprocessed float batch.
        origin_image_info: (batch, 2) original (h, w) per image.

    Returns:
        (positions, probabilities, origin_image_info) blobs; the identity
        ops pin stable end-point names in the compiled graph.
    """
    yolo_pos_result, yolo_prob_result = YoloPredictNet(images, origin_image_info, trainable=False)
    yolo_pos_result = flow.identity(yolo_pos_result, name="yolo_pos_result_end")
    yolo_prob_result = flow.identity(yolo_prob_result, name="yolo_prob_result_end")
    return yolo_pos_result, yolo_prob_result, origin_image_info


def yolo_show(image_path_list, batch_list):
    """Debug helper: draw detection boxes/classes and save annotated images.

    Args:
        image_path_list: input image paths.
        batch_list: per-image box arrays, rows = [cls, x1, y1, x2, y2, score].
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    for img_path, batch in zip(image_path_list, batch_list):
        result_list = batch.tolist()
        img = cv2.imread(img_path)
        for result in result_list:
            cls = int(result[0])
            bbox = result[1:-1]
            score = result[-1]
            print('img_file:', img_path)
            print('cls:', cls)
            print('bbox:', bbox)
            # Box center. The original had a misplaced parenthesis that
            # computed int(y1 + int(y2)) / 2 instead of (int(y1) + int(y2)) / 2.
            c = ((int(bbox[0]) + int(bbox[2])) / 2, (int(bbox[1]) + int(bbox[3])) / 2)
            cv2.rectangle(img, (int(bbox[0]), int(bbox[1])), (int(bbox[2]), int(bbox[3])), (0, 255, 255), 1)
            cv2.putText(img, str(cls), (int(c[0]), int(c[1])), font, 1, (0, 0, 255), 1)
        result_name = img_path.split('/')[-1]
        cv2.imwrite("data/results/" + result_name, img)


def resize_image(img, origin_h, origin_w, image_height, image_width):
    """Bilinear-resize a CHW float image to (image_height, image_width).

    Width is interpolated first into an intermediate buffer, then height,
    mirroring darknet's two-pass resize.

    Args:
        img: (3, origin_h, origin_w) float array.
        origin_h, origin_w: source dimensions.
        image_height, image_width: target dimensions.

    Returns:
        (3, image_height, image_width) float32 array.
    """
    w = image_width
    h = image_height
    resized = np.zeros((3, image_height, image_width), dtype=np.float32)
    part = np.zeros((3, origin_h, image_width), dtype=np.float32)
    w_scale = (float)(origin_w - 1) / (w - 1)
    h_scale = (float)(origin_h - 1) / (h - 1)

    # Pass 1: horizontal interpolation into `part`.
    for col in range(w):
        if col == w - 1 or origin_w == 1:
            val = img[:, :, origin_w - 1]
        else:
            sx = col * w_scale
            ix = int(sx)
            dx = sx - ix
            val = (1 - dx) * img[:, :, ix] + dx * img[:, :, ix + 1]
        part[:, :, col] = val

    # Pass 2: vertical interpolation into `resized`; the second summand is
    # skipped on the last row (or when there is only one source row).
    for row in range(h):
        sy = row * h_scale
        iy = int(sy)
        dy = sy - iy
        val = (1 - dy) * part[:, iy, :]
        resized[:, row, :] = val
        if row == h - 1 or origin_h == 1:
            continue
        resized[:, row, :] = resized[:, row, :] + dy * part[:, iy + 1, :]
    return resized


def batch_image_preprocess_v2(img_path_list, image_height, image_width):
    """Load, letterbox-resize and normalize a batch of images.

    Args:
        img_path_list: image file paths.
        image_height, image_width: network input size.

    Returns:
        (float32 NCHW batch, int32 array of original (h, w) per image).
    """
    result_list = []
    origin_info_list = []
    w = image_width
    h = image_height
    for img_path in img_path_list:
        img = cv2.imread(img_path, cv2.IMREAD_COLOR)
        img = img.transpose(2, 0, 1).astype(np.float32)  # hwc -> chw
        img = img / 255  # scale to [0, 1]
        img[[0, 1, 2], :, :] = img[[2, 1, 0], :, :]  # bgr -> rgb

        origin_h = img.shape[1]
        origin_w = img.shape[2]
        # Fit the image inside (h, w) while keeping its aspect ratio.
        if w / origin_w < h / origin_h:
            new_w = w
            new_h = origin_h * w // origin_w
        else:
            new_h = h
            new_w = origin_w * h // origin_h
        resize_img = resize_image(img, origin_h, origin_w, new_h, new_w)

        # Center the resized image and pad the borders with 0.5 (gray).
        dw = (w - new_w) // 2
        dh = (h - new_h) // 2
        padh_before = int(dh)
        padh_after = int(h - new_h - padh_before)
        padw_before = int(dw)
        padw_after = int(w - new_w - padw_before)
        result = np.pad(resize_img,
                        pad_width=((0, 0), (padh_before, padh_after), (padw_before, padw_after)),
                        mode='constant', constant_values=0.5)
        result_list.append(result)
        origin_info_list.append([origin_h, origin_w])
    results = np.asarray(result_list).astype(np.float32)
    origin_image_infos = np.asarray(origin_info_list).astype(np.int32)
    return results, origin_image_infos


def coco_format(type_, id_list, annotation_url_list, file_list, result_list, label_list, coco_flag=0):
    """Transform raw detection rows into coco-style annotations and persist them.

    Args:
        type_: task type; 2 keeps only the best box, classification-style.
        id_list: file ids, parallel to result_list.
        annotation_url_list: output annotation file paths.
        file_list: image file paths, parallel to result_list.
        result_list: per-image np arrays with rows [cls, x1, y1, x2, y2, score].
        label_list: category ids that should be kept.
        coco_flag: offset added to the raw class id (80 for coco label space).

    Returns:
        list of {'id', 'annotation'} dicts (annotation is a json string).
    """
    annotations = []
    for i, result in enumerate(result_list):
        temp = {}
        annotation_url = annotation_url_list[i]
        temp['id'] = id_list[i]
        temp['annotation'] = []
        im = cv2.imread(file_list[i])
        # Unpacked only to fail fast on unreadable images; the values are
        # unused (box w/h are computed per row below — the original clobbered
        # these names inside the loop).
        height, width, _ = im.shape
        if result.shape[0] == 0:
            temp['annotation'] = json.dumps(temp['annotation'])
            annotations.append(temp)
            with open(annotation_url, 'w') as w:
                w.write(temp['annotation'])
            continue
        for j in range(result.shape[0]):
            cls_id = int(result[j][0]) + 1 + coco_flag
            x1 = result[j][1]
            x2 = result[j][3]
            y1 = result[j][2]
            y2 = result[j][4]
            score = result[j][5]
            box_w = max(0, x2 - x1)
            box_h = max(0, y2 - y1)
            if cls_id in label_list:
                temp['annotation'].append({
                    'area': box_w * box_h,
                    'bbox': [x1, y1, box_w, box_h],
                    'category_id': cls_id,
                    'iscrowd': 0,
                    'segmentation': [[x1, y1, x2, y1, x2, y2, x1, y2]],
                    'score': score
                })
        if type_ == 2 and len(temp['annotation']) > 0:
            # Classification-style output: keep only category_id/score of
            # the first box.
            temp['annotation'] = [temp['annotation'][0]]
            for field in ('area', 'bbox', 'iscrowd', 'segmentation'):
                temp['annotation'][0].pop(field)
        temp['annotation'] = json.dumps(temp['annotation'])
        annotations.append(temp)
        with open(annotation_url, 'w') as wr:
            wr.write(temp['annotation'])
    return annotations


class YoloInference(object):
    """Yolov3 detection inference"""

    def __init__(self, label_log):
        self.label_log = label_log
        flow.config.gpu_device_num(gpu_num_per_node)
        flow.env.ctrl_port(9789)

        # Load pretrained weights when a model dir is configured, otherwise
        # fall back to random initialization.
        check_point = flow.train.CheckPoint()
        if not model_load_dir:
            check_point.init()
        else:
            check_point.load(model_load_dir)
        print("Load check_point success")
        self.label_log.info("Load check_point success")

    def yolo_inference(self, type_, id_list, annotation_url_list, image_path_list, label_list, coco_flag=0):
        """Run one full batch through yolo and return coco-format annotations.

        Falls back to empty annotations for every image when the forward
        pass fails, so the task pipeline can continue.
        """
        annotations = []
        try:
            # The compiled job has a fixed batch size (was a magic `16`);
            # callers pad partial batches up to it.
            if len(image_path_list) == batch_size:
                t0 = time.time()
                images, origin_image_info = batch_image_preprocess_v2(image_path_list, image_height, image_width)
                yolo_pos, yolo_prob, origin_image_info = yolo_user_op_eval_job(images, origin_image_info).get()
                batch_list = batch_boxes(yolo_pos, yolo_prob, origin_image_info)
                annotations = coco_format(type_, id_list, annotation_url_list, image_path_list, batch_list, label_list, coco_flag)
                t1 = time.time()
                print('t1-t0:', t1 - t0)
        except Exception:
            # Was a bare `except:`, which also swallowed KeyboardInterrupt
            # and SystemExit.
            print("Forward Error")
            self.label_log.error("Forward Error")
            for i, image_path in enumerate(image_path_list):
                temp = {}
                temp['id'] = id_list[i]
                temp['annotation'] = json.dumps([])
                annotations.append(temp)
        return annotations

+ 108
- 0
dubhe_data_process/entrance/executor/taskexecutor.py View File

@@ -0,0 +1,108 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
# coding:utf-8

import codecs
import os
import sched
import sys
import json
import logging
import time
import common.RedisUtil as f
import common.config as config
from entrance.executor import annotation as annotation
from datetime import datetime
import luascript.delaytaskscript as delay_script

# Verbose worker logging with file/line context for every record.
logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
                    level=logging.DEBUG)

schedule = sched.scheduler(time.time, time.sleep)
# Force utf-8 stdout so task payloads with non-ascii text print safely.
sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())

delayId = ""  # quoted key of the task currently being processed


def annotationExecutor(redisClient, key):
    """Annotation task method.

    Fetches the task payload for `key` from redis, prepares annotation
    output directories, runs auto-annotation and pushes the result to the
    finish queue.

    Args:
        redisClient: redis client.
        key: annotation task key (bytes, json-quoted).
    """
    global delayId
    print('-------------process one-----------------')
    try:
        delayId = "\"" + eval(str(key, encoding="utf-8")) + "\""
        logging.info('get element is {0}'.format(key))
        key = key.decode()
        jsonStr = f.getByKey(redisClient, key.replace('"', ''))
        print(jsonStr)
        jsonObject = json.loads(jsonStr.decode('utf-8'))
        image_path_list = []
        id_list = []
        annotation_url_list = []
        # (removed a dead `label_list = []` that was immediately overwritten)
        label_list = jsonObject['labels']
        for fileObject in jsonObject['files']:
            pic_url = '/nfs/' + fileObject['url']
            image_path_list.append(pic_url)
            annotation_url = pic_url.replace("origin/", "annotation/")
            annotation_url_list.append(os.path.splitext(annotation_url)[0])
            # Create the annotation directory up front so writers can save.
            os.makedirs(os.path.dirname(annotation_url), exist_ok=True)
            id_list.append(fileObject['id'])
        print(image_path_list)
        print(annotation_url_list)
        print(label_list)
        # labelType 3 marks the coco-pretrained label space (offset 80).
        coco_flag = 0
        if "labelType" in jsonObject:
            if jsonObject['labelType'] == 3:
                coco_flag = 80
        annotations = annotation._annotation(0, image_path_list, id_list, annotation_url_list, label_list, coco_flag)
        result = {"task": key, "annotations": annotations}
        f.pushToQueue(redisClient, config.annotationFinishQueue, json.dumps(result))
        redisClient.zrem(config.annotationStartQueue, key)
    except Exception as e:
        print(e)


def delaySchduled(inc, redisClient):
    """Renew the delay lease for the in-flight task, then reschedule itself.

    Args:
        inc: interval in seconds between renewals.
        redisClient: redis client.
    """
    try:
        # Fixed strftime typo: the original "B%Y-..." printed a literal 'B'.
        print("delay:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        redisClient.eval(delay_script.delayTaskLua, 1, config.annotationStartQueue, delayId, int(time.time()))
        schedule.enter(inc, 0, delaySchduled, (inc, redisClient))
    except Exception as e:
        # Fix: "delay error" + e raised TypeError (str + Exception).
        print("delay error: " + str(e))


def delayKeyThread(redisClient):
    """Run the periodic delay-lease renewal loop on the shared scheduler.

    Args:
        redisClient: redis client.
    """
    renew_interval = 5  # seconds between lease renewals
    schedule.enter(0, 0, delaySchduled, argument=(renew_interval, redisClient))
    schedule.run()

+ 45
- 0
dubhe_data_process/entrance/executor/text_classification.py View File

@@ -0,0 +1,45 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
from entrance.executor import classify_by_textcnn as classify


def _init():
    """Create the module-level TextCNN classifier used by _classification."""
    global classify_obj
    print('init classify_obj')
    classify_obj = classify.TextCNNClassifier()  # label_log


def _classification(text_path_list, id_list, label_list):
    """Perform automatic text classification task.

    Args:
        text_path_list: paths of the text files to classify.
        id_list: file ids, parallel to text_path_list.
        label_list: candidate label ids.

    Returns:
        One classification result per input file (batch padding is dropped).
    """
    textnum = len(text_path_list)
    if textnum == 0:
        # Fix: the original would index text_path_list[0] on padded batches;
        # guard the empty case explicitly.
        return []
    # Round the batch up to a multiple of BATCH_SIZE by repeating the first
    # sample. Work on copies — the original mutated the caller's lists.
    batched_num = ((textnum - 1) // classify.BATCH_SIZE + 1) * classify.BATCH_SIZE
    pad = batched_num - textnum
    padded_paths = text_path_list + [text_path_list[0]] * pad
    padded_ids = id_list + [id_list[0]] * pad
    annotations = classify_obj.inference(padded_paths, padded_ids, label_list)
    return annotations[0:textnum]


if __name__ == "__main__":
    # Smoke test: classify the same sample file 22 times against two labels.
    sample_count = 22
    _init()
    paths = ["dubhe-dev/dataset/2738/origin/32_3_ts1607326726114630.txt"] * sample_count
    ids = [1] * sample_count
    ans = _classification(paths, ids, [111, 112])
    print(ans)
    print(len(ans))

+ 94
- 0
dubhe_data_process/entrance/executor/text_taskexecutor.py View File

@@ -0,0 +1,94 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
# coding:utf-8

import codecs
import sched
import sys
import json
import logging
import time
import common.RedisUtil as f
import common.config as config
from entrance.executor import text_classification as text_classification
from datetime import datetime
import luascript.delaytaskscript as delay_script

logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
level=logging.DEBUG)

# Shared scheduler driving the periodic delay-lease renewal (see delayKeyThread).
schedule = sched.scheduler(time.time, time.sleep)
# Re-wrap stdout so UTF-8 task payloads print without encoding errors.
sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())

# JSON-quoted key of the task currently in flight; renewed by delaySchduled.
delayId = ""


def textClassificationExecutor(redisClient, key):
    """Run one text-classification task pulled from redis.

    Args:
        redisClient: redis client.
        key: task key (bytes holding a JSON-quoted string).
    """
    global delayId
    print('-------------process one-----------------')
    try:
        key = key.decode('utf-8')
        # Rebuild the quoted id directly; the original eval() on redis-sourced
        # data was both unnecessary and unsafe.
        delayId = '"' + key.replace('"', '') + '"'
        logging.info('get element is {0}'.format(key))
        jsonStr = f.getByKey(redisClient, key.replace('"', ''))
        print(jsonStr)
        jsonObject = json.loads(jsonStr.decode('utf-8'))
        text_path_list = []
        id_list = []
        label_list = jsonObject['labels']
        for fileObject in jsonObject['files']:
            text_path_list.append(fileObject['url'])
            id_list.append(fileObject['id'])
        print(text_path_list)
        print(id_list)
        print(label_list)
        classifications = text_classification._classification(text_path_list, id_list, label_list)
        result = {"task": key, "classifications": classifications}
        f.pushToQueue(redisClient, config.textClassificationFinishQueue, json.dumps(result))
        redisClient.zrem(config.textClassificationStartQueue, key)
    except Exception as e:
        # Log with traceback instead of silently printing the exception object.
        logging.exception(e)


def delaySchduled(inc, redisClient):
    """Renew the delay lease for the in-flight task, then reschedule itself.

    Args:
        inc: interval in seconds between renewals.
        redisClient: redis client.
    """
    try:
        # Fixed strftime typo: the original "B%Y-..." printed a literal 'B'.
        print("delay:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        redisClient.eval(delay_script.delayTaskLua, 1, config.textClassificationStartQueue, delayId, int(time.time()))
        schedule.enter(inc, 0, delaySchduled, (inc, redisClient))
    except Exception as e:
        # Fix: "delay error" + e raised TypeError (str + Exception).
        print("delay error: " + str(e))


def delayKeyThread(redisClient):
    """Run the periodic delay-lease renewal loop on the shared scheduler.

    Args:
        redisClient: redis client.
    """
    renew_interval = 5  # seconds between lease renewals
    schedule.enter(0, 0, delaySchduled, argument=(renew_interval, redisClient))
    schedule.run()

+ 93
- 0
dubhe_data_process/entrance/executor/track.py View File

@@ -0,0 +1,93 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
# -*- coding: utf-8 -*-
import sched

import common.config as config
import luascript.delaytaskscript as delay_script
from track_only.hog_track import *

# Shared scheduler driving the periodic delay-lease renewal (see delayKeyThread).
schedule = sched.scheduler(time.time, time.sleep)

# JSON-quoted key of the task currently in flight; renewed by delaySchduled.
delayId = ""


def trackProcess(task, key):
    """Run object tracking over a dataset's annotated frames.

    Args:
        task: bytes of a JSON object with 'images' (file names) and 'path'
            (dataset root directory).
        key: task key (bytes holding a JSON-quoted string).
    Returns:
        True: track success
        False: track failed
    """
    global delayId
    # Rebuild the quoted id directly; the original eval() on redis-sourced
    # data was both unnecessary and unsafe.
    delayId = '"' + key.decode('utf-8').replace('"', '') + '"'
    task = json.loads(task.decode('utf-8'))
    image_list = []
    label_list = []
    path = task['path']
    # Keep only frames that have both an origin image and an annotation file.
    for file in task['images']:
        filePath = path + "/origin/" + file
        annotationPath = path + "/annotation/" + file.split('.')[0]
        if not os.path.exists(filePath) or not os.path.exists(annotationPath):
            continue
        image_list.append(filePath)
        label_list.append(annotationPath)
    # The video name is a placeholder; Detector runs on the image list here.
    track_det = Detector(
        'xxx.avi',
        min_confidence=0.35,
        max_cosine_distance=0.2,
        max_iou_distance=0.7,
        max_age=30,
        out_dir='results/')
    track_det.write_img = False
    return track_det.run_track(image_list, label_list) == 'OK'


def delaySchduled(inc, redisClient):
    """Renew the delay lease for the in-flight task, then reschedule itself.

    Args:
        inc: interval in seconds between renewals.
        redisClient: redis client.
    """
    try:
        # Fixed strftime typo: the original "B%Y-..." printed a literal 'B'.
        print("delay:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        redisClient.eval(delay_script.delayTaskLua, 1, config.trackStartQueue, delayId, int(time.time()))
        schedule.enter(inc, 0, delaySchduled, (inc, redisClient))
    except Exception as e:
        # Fix: "delay error" + e raised TypeError (str + Exception).
        print("delay error: " + str(e))


def delayKeyThread(redisClient):
    """Run the periodic delay-lease renewal loop on the shared scheduler.

    Args:
        redisClient: redis client.
    """
    renew_interval = 5  # seconds between lease renewals
    schedule.enter(0, 0, delaySchduled, argument=(renew_interval, redisClient))
    schedule.run()

+ 100
- 0
dubhe_data_process/entrance/executor/videosample.py View File

@@ -0,0 +1,100 @@
"""
/**
* Copyright 2020 Tianshu AI Platform. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =============================================================
*/
"""
import json
import os
import sched
import time
from datetime import datetime

import luascript.finishtaskscript as finish_script
import luascript.failedtaskscript as failed_script
import luascript.delaytaskscript as delay_script
import common.config as config

import cv2

# Shared scheduler driving the periodic delay-lease renewal (see delayKeyThread).
schedule = sched.scheduler(time.time, time.sleep)

# Compact JSON key of the dataset currently being sampled; renewed by delaySchduled.
datasetIdKey = ""


def sampleProcess(datasetId, path, frameList, redisClient):
    """Sample the requested frames of a video into origin/ as JPEG images.

    Args:
        datasetId: dataset id.
        path: video file path.
        frameList: picture frame number list.
        redisClient: redis client.
    """
    global datasetIdKey
    datasetIdJson = {'datasetIdKey': datasetId}
    datasetIdKey = json.dumps(datasetIdJson, separators=(',', ':'))
    cap = None
    try:
        videoName = path.split('/')[-1]
        # Frames are written next to the video, under origin/ instead of video/.
        save_path = path.split(videoName)[0].replace("video", "origin")
        if not os.path.exists(save_path):
            os.makedirs(save_path)
            print('path of %s is build' % save_path)
        else:
            print('path of %s already exist and start' % save_path)
        cap = cv2.VideoCapture(path)
        for i in frameList:
            cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            success, video_capture = cap.read()
            # Save the decoded frame as <video>_<frame>.jpg.
            if success is True and video_capture is not None:
                save_name = save_path + videoName.split('.')[0] + '_' + str(i) + '.jpg'
                cv2.imwrite(save_name, video_capture)
                # separators=(',', ':') reproduces the exact compact JSON the
                # original built by string concatenation.
                redisClient.lpush("videoSample_pictures:" + datasetId,
                                  json.dumps({"pictureName": save_name}, separators=(',', ':')))
                print('image of %s is saved' % save_name)
        print('video is all read')
        redisClient.eval(finish_script.finishTaskLua, 3, config.videoStartQueue, config.videoFinishQueue,
                         "videoSample:" + str(datasetId),
                         datasetIdKey, str(datasetIdKey))
    except Exception as e:
        print(e)
        redisClient.eval(failed_script.failedTaskLua, 4, config.videoStartQueue, config.videoFailedQueue,
                         "videoSample_pictures:" + datasetId,
                         "videoSample:" + str(datasetId),
                         datasetIdKey, str(datasetIdKey))
    finally:
        # Fix: the capture handle was never released in the original.
        if cap is not None:
            cap.release()


def delaySchduled(inc, redisClient):
    """Renew the delay lease for the in-flight sampling task, then reschedule itself.

    Args:
        inc: interval in seconds between renewals.
        redisClient: redis client.
    """
    try:
        # Fixed strftime typo: the original "B%Y-..." printed a literal 'B'.
        print("delay:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        redisClient.eval(delay_script.delayTaskLua, 1, config.videoStartQueue, datasetIdKey, int(time.time()))
        schedule.enter(inc, 0, delaySchduled, (inc, redisClient))
    except Exception as e:
        # Fix: "delay error" + e raised TypeError (str + Exception).
        print("delay error: " + str(e))


def delayKeyThread(redisClient):
    """Run the periodic delay-lease renewal loop on the shared scheduler.

    Args:
        redisClient: redis client.
    """
    renew_interval = 5  # seconds between lease renewals
    schedule.enter(0, 0, delaySchduled, argument=(renew_interval, redisClient))
    schedule.run()

+ 0
- 0
dubhe_data_process/entrance/log/dev/.gitkeep View File


+ 1
- 0
dubhe_data_process/of_model/imdb_word_index/imdb_word_index.json
File diff suppressed because it is too large
View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/System-Train-TrainStep-train_job/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-0-bias-m/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-0-bias-v/out View File


+ 3
- 0
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-0-bias/out View File

@@ -0,0 +1,3 @@
ćěa˝aéŢ˝´^«˝Ü¸ˇ˝¤G—˝W W˝24˝d¸ĽNu\˝b˝˝uͽPWÝ˝K»m˝CđŠ˝“˝ČÔ‚˝k%Ł˝ť½{¨Ä˝ŠEš˝–
“˝^čA˝/Ż—˝ŇĎ8˝:2Ź˝ă©˝–{ě˝�đ¨˝#řľ;ôľ6h»˝¨ú9ľ­sťĽ˝×ů˝ô�é˝ęĆ4Ľ`Ćş˝—Uľűľ, ľE˝Č:ż˝Ă�ň˝öq ĽÎźĽnE2ľ?�†˝b˝ĺ“ă˝Ęb¨˝\(Ű˝»Ě˝+DO˝y„î˝×«Ń˝ÎÔĘ˝ë}"˝w¤Bľc…‘˝őý˛˝X ľ×‡´˝XőÝ˝ă�„˝ľ·kuĽź `ĽŮíĂ˝:ńľ+2˝=ŔR˝9+ ľóüľOČ4ĽĎŤ
ľríĄ˝‚:ľË˛˝…¶3ľONz˝ž†Ĺ˝M€Ĺ˝@ŞĹ˝J¬v˝éŮý˝#¸ą˝ěü˝1›Ë˝¶&˝˝ßľşëš˝0Ű˝1ľYĘ˝Cę2˝çë˝–Ş•˝˘.=˝+¤˝˛W!ľ

BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-0-weight-m/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-0-weight-v/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-0-weight/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-1-bias-m/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-1-bias-v/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-1-bias/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-1-weight-m/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-1-weight-v/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-1-weight/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-2-bias-m/out View File


+ 3
- 0
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-2-bias-v/out View File

@@ -0,0 +1,3 @@
þ À9n5Ä8?“|9o?8Üä›9èöé8;gL:5GÌ7~ ê8òH7
Xñ9�E¶7Ëm9žZ9¢Ûº8·Ï8ûÚÚ9Ð�õ8ÄÔ·9)+7­Ý»8ð†9…+œ9L±x9±GÐ8pÎ…78Æ7t_8 z:óc,9ºÌ67£:Y(¥8òÃ…9j¨9÷8ƒ8s’:ä|å8%ó¾7w¥’7â©{7IE"9 ©8®�Ë7fÀ’9†¶`7Tÿ 8¸^Ó9Ž»¢7˜`/9!Iþ8÷U9TÇÆ9û®œ8©±89ø¨þ7 7e7¤¡.9Éë8ŠP¡8ëÚe7�½Ä9Ñâ­8¶=æ9ÄoÚ8;²¾7hf`8–Ïš9Ÿ„7MÒ}8Íþ7³Ð-8Þ9v|k86ùp9f¾E7j‹®9C…S:W„9um9õF8(/#8NÈß7Œ|#:?‹µ8è
8R¬¶7¼-°9ïü¼7•i7Üß 9Úî8Z”¯7„�ž7»Q9±>Æ7PåT8O¶:.Ïé7î£É7

+ 2
- 0
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-2-bias/out View File

@@ -0,0 +1,2 @@
jò½?®c½Ø�½à?½ç] ½‡Š½ ‰è½ÁÛ¹»Ÿ=…½ï½�¨º½q~¼†o‚½3cj½ßN½ú:�½—Q¾½B—½£Š½½-®½ §p½,˜³½åFѽ×&³½“f€½áÙ¦¼ë¿"½•®ò¼ß�ò½Àï�½·L ½/X¾ÝcA½4{£½™Â�½¯~÷¼0Ô¾DÁp½D#}½B2½@6½¢%’½>K½tz2½©Ê ½ *¼¹'„½š}½½ÿoi½’ÍÀ½£Íi½æK½´Ô½œ¡L½$�½�ž¬¼ÌÇ»E(r½\¨½V¦‚½v±Ó¼÷ÅŽù-6½<WÔ½CCF½Œú½$|½­³¿½o¡½ˆ½)""½G¿0½/¬t½ù½U½ÔÌB½:’9½£¡½ ×û½÷ ½Çr†½“ɽ�ß ¼p“¼$ÌÔ½MDj½¹ ½î¼ò࿽™2½6¿#½[±¨½¾‚½"As¼°¾¢¼“Ƚž`¼K&½0j˽7ø½
`½

BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-2-weight-m/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-2-weight-v/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-2-weight/out View File


+ 1
- 0
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-3-bias-m/out View File

@@ -0,0 +1 @@
˘¹9ÛÛˆ:4]‡¹‹ãE¹îù:yTÏ:j…:Àf¿:©ß:Dr¤»¸¸Â%¨9lÖ«:Ïï9/é :(Gž:DAñ¹LŸ:Ô7–;rx: xæ:6>d;E"º¥T;Ó™¹Þ&Î:nú‡:hÉ¥;Õ%©;*À;œ5:�}Ì:�9Oº+U»|!ºÇúZ:›9Í ‰º J\:rz�:Þ`»å§¤¹à…ºÒ™¡:öW]ºÙ~ö:Ó’�;”üܺG•Ë::¶1:ë„|:Ô?;íÍ{:j^};@‹Òº�±:RÉݶû~ñ:dÃ?;w·z;�ʰ9d‘;°Œ9;¥½d9t6—9 Å�9¿ì¡º\L;cÊe;¹Jº?“õ:ɘœ;4½À:Ê:fá3<‰~;ôb¸¸!³;ö#O;©ÿ;B: A�ºU㺕ṹE¹iÓÅ; rº¿Ó;�w£9µ;üÃ:¤¦;áß]:ÀDº›>;?ߪ:µÿ3:ò\;:dè9

BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-3-bias-v/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-3-bias/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-3-weight-m/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-3-weight-v/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/conv-3-weight/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-1-bias-m/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-1-bias-v/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-1-bias/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-1-weight-m/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-1-weight-v/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-1-weight/out View File


+ 1
- 0
dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-2-bias-m/out View File

@@ -0,0 +1 @@
$в&;в&»

+ 1
- 0
dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-2-bias-v/out View File

@@ -0,0 +1 @@
Պ;֊;

+ 1
- 0
dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-2-bias/out View File

@@ -0,0 +1 @@
و+ �ي+ <

BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-2-weight-m/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-2-weight-v/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/dense-2-weight/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/embedding-weight-m/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/embedding-weight-v/out View File


BIN
dubhe_data_process/of_model/textcnn_imdb_of_best_model/embedding-weight/out View File


+ 0
- 0
dubhe_data_process/of_model/textcnn_imdb_of_best_model/snapshot_done View File


Loading…
Cancel
Save