|
- """
- /**
- * Copyright 2020 Zhejiang Lab. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * =============================================================
- */
- """
- # coding:utf-8
-
- import os
- import json
- import threading
- import time
- import common.RedisUtil as f
- import common.config as config
- import luascript.starttaskscript as start_script
- import logging
- import traceback
- import ofrecord
- logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',level=logging.DEBUG)
-
- basePath = '/nfs/'
- descPath = 'ofrecord/train'
-
- if __name__ == '__main__':
- """Ofrecord algorithm entry."""
- jsonData = config.loadJsonData(config.configPath)
- redisClient = f.getRedisConnection(jsonData["ip"], jsonData["port"], jsonData["database"], jsonData["password"])
- logging.info('init redis client %s', redisClient)
- t = threading.Thread(target=ofrecord.delayKeyThread, args=(redisClient,))
- t.setDaemon(True)
- t.start()
- while 1:
- try:
- if config.loadJsonData(config.sign) == 0:
- logging.info('not to execute new task')
- time.sleep(1)
- else:
- element = redisClient.eval(start_script.startTaskLua, 1, config.ofrecordTaskQueue,
- config.ofrecordStartQueue, int(time.time()))
- if len(element) > 0:
- key = element[0].decode()
- detail = f.getByKey(redisClient, key.replace('"', ''))
- jsonStr = json.loads(detail.decode())
- label_map = {}
- index = 0
- for item in jsonStr["datasetLabels"].keys():
- if index >= 0 and item != '@type':
- label_map[item] = jsonStr["datasetLabels"][item]
- index += 1
- ofrecord.execute(os.path.join(basePath, jsonStr["datasetPath"]),
- os.path.join(basePath, jsonStr["datasetPath"], descPath),
- label_map,
- jsonStr["files"],
- jsonStr["partNum"],
- element[0])
- logging.info('save result to redis')
- f.pushToQueue(redisClient, config.ofrecordFinishQueue, key)
- redisClient.zrem(config.ofrecordStartQueue, element[0])
- else:
- logging.info('task queue is empty.')
- time.sleep(2)
- except Exception as e:
- logging.error('except:', e)
- redisClient.zrem(config.ofrecordStartQueue, element[0])
- time.sleep(1)
|