You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

redis_storage.py 2.9 kB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. # !/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. """
  4. Copyright 2020 Tianshu AI Platform. All Rights Reserved.
  5. Licensed under the Apache License, Version 2.0 (the "License");
  6. you may not use this file except in compliance with the License.
  7. You may obtain a copy of the License at
  8. http://www.apache.org/licenses/LICENSE-2.0
  9. Unless required by applicable law or agreed to in writing, software
  10. distributed under the License is distributed on an "AS IS" BASIS,
  11. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. See the License for the specific language governing permissions and
  13. limitations under the License.
  14. =============================================================
  15. """
  16. import json
  17. import uuid
  18. from abc import ABC
  19. from program.abstract.storage import Storage
  20. import common.util.public.RedisUtil as f
  21. import common.config.config as config
  22. import logging
  23. import time
  24. from program.thread.delay_schedule import Redis_thread
  # Configure the root logger once at import time: DEBUG level, with each
  # record showing timestamp, source path, line number and severity.
  logging.basicConfig(
      level=logging.DEBUG,
      format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
  )
  class RedisStorage(Storage, ABC):
      """
      Redis-backed task storage.

      NOTE(review): the methods below take neither ``self`` nor ``cls`` and are
      not decorated with ``@staticmethod``, so they only work when invoked on
      the class itself (``RedisStorage.get_one_task(...)``), which is how this
      class calls ``init_client`` internally. Confirm against callers before
      changing the binding.
      """

      def init_client(path):
          """
          Create a Redis client from a JSON configuration file.

          :param path: location handed to ``config.loadJsonData``; the loaded
              data must provide the keys ``"ip"``, ``"port"``, ``"database"``
              and ``"password"``.
          :return: the client object returned by ``f.getRedisConnection``.
          :raises KeyError: if one of the required keys is missing.
          """
          settings = config.loadJsonData(path)
          client = f.getRedisConnection(
              settings["ip"],
              settings["port"],
              settings["database"],
              settings["password"],
          )
          logging.info("redis client init success %s", client)
          return client

      def get_one_task(*args):
          """
          Fetch one task by running a script on the Redis server.

          Positional arguments:
              args[0]: script text passed to ``redis_client.eval``.
              args[1], args[2], args[3]: forwarded unchanged to ``eval`` after
                  the script (presumably the key count followed by two key
                  names -- verify against the caller).
              args[4]: path of the Redis JSON configuration file.

          The current Unix time, truncated to whole seconds, is appended as the
          last ``eval`` argument.

          :return: ``(task_key, task_detail)`` where ``task_key`` is the decoded
              first element of the script reply and ``task_detail`` is the
              JSON-decoded value stored under that key with double quotes
              stripped; ``0`` when the script yields no task.
          """
          logging.debug("Parameter: %s", args)
          # Every call waits one second before touching Redis (presumably to
          # throttle polling -- confirm with the scheduler).
          time.sleep(1)
          redis_client = RedisStorage.init_client(args[4])
          reply = redis_client.eval(args[0], args[1], args[2], args[3], int(time.time()))
          # Truthiness instead of len(): a nil script reply arrives as None and
          # len(None) would raise TypeError rather than report "no task".
          if not reply or reply[0] is None:
              return 0
          task_key = reply[0].decode()
          # Publish the client and key on Redis_thread; save_result reads
          # Redis_thread.processing_key when removing the finished entry.
          Redis_thread.redis_client_thread = redis_client
          Redis_thread.processing_key = task_key
          logging.info("------------processing_key = %s------------------", Redis_thread.processing_key)
          # The queued id is wrapped in double quotes; the detail record is
          # stored under the bare key.
          raw_detail = redis_client.get(task_key.replace('"', ''))
          return task_key, json.loads(raw_detail.decode())

      def save_result(*args):
          """
          Save the results.

          Positional arguments:
              args[0]: queue that receives the result key when ``args[4]`` is
                  exactly ``True``.
              args[1]: queue that receives the result key otherwise.
              args[2]: path of the Redis JSON configuration file.
              args[3]: JSON-serialisable result payload.
              args[4]: flag selecting the target queue; compared with
                  ``is True``, so truthy non-bool values select ``args[1]``.

          The entry ``Redis_thread.processing_key`` is removed from the sorted
          set ``Redis_thread.processing_queue``, the payload is stored under a
          fresh ``uuid1`` key, and that key wrapped in double quotes is pushed
          to the selected queue via ``f.pushToQueue``.
          """
          redis_client = RedisStorage.init_client(args[2])
          uuid_key = str(uuid.uuid1())
          uuid_detail_key = "\"" + uuid_key + "\""
          target_queue = args[0] if args[4] is True else args[1]
          redis_client.zrem(Redis_thread.processing_queue, Redis_thread.processing_key)
          redis_client.set(uuid_key, json.dumps(args[3]))
          f.pushToQueue(redis_client, target_queue, uuid_detail_key)

一站式算法开发平台、高性能分布式深度学习框架、先进算法模型库、视觉模型炼知平台、数据可视化分析平台等一系列平台及工具,在模型高效分布式训练、数据处理和可视分析、模型炼知和轻量化等技术上形成独特优势,目前已在产学研等各领域近千家单位及个人提供AI应用赋能