Link: https://code.alibaba-inc.com/Ali-MaaS/MaaS-lib/codereview/9856286master
| @@ -238,24 +238,3 @@ def check_box(box: list, image_height, image_width) -> bool: | |||||
| if box[3] < 0 or box[3] >= image_height: | if box[3] < 0 or box[3] >= image_height: | ||||
| return False | return False | ||||
| return True | return True | ||||
| def show_tracking_result(video_in_path, bboxes, video_save_path): | |||||
| cap = cv2.VideoCapture(video_in_path) | |||||
| for i in range(len(bboxes)): | |||||
| box = bboxes[i] | |||||
| success, frame = cap.read() | |||||
| if success is False: | |||||
| raise Exception(video_in_path, | |||||
| ' can not be correctly decoded by OpenCV.') | |||||
| if i == 0: | |||||
| size = (frame.shape[1], frame.shape[0]) | |||||
| fourcc = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G') | |||||
| video_writer = cv2.VideoWriter(video_save_path, fourcc, | |||||
| cap.get(cv2.CAP_PROP_FPS), size, | |||||
| True) | |||||
| cv2.rectangle(frame, (box[0], box[1]), (box[2], box[3]), (0, 255, 0), | |||||
| 5) | |||||
| video_writer.write(frame) | |||||
| video_writer.release | |||||
| cap.release() | |||||
| @@ -1,18 +0,0 @@ | |||||
| import cv2 | |||||
| import numpy as np | |||||
| def numpy_to_cv2img(vis_img): | |||||
| """to convert a np.array Hotmap with shape(h, w) to cv2 img | |||||
| Args: | |||||
| vis_img (np.array): input data | |||||
| Returns: | |||||
| cv2 img | |||||
| """ | |||||
| vis_img = (vis_img - vis_img.min()) / ( | |||||
| vis_img.max() - vis_img.min() + 1e-5) | |||||
| vis_img = (vis_img * 255).astype(np.uint8) | |||||
| vis_img = cv2.applyColorMap(vis_img, cv2.COLORMAP_JET) | |||||
| return vis_img | |||||
| @@ -0,0 +1,136 @@ | |||||
| import cv2 | |||||
| import numpy as np | |||||
| from modelscope.outputs import OutputKeys | |||||
| from modelscope.preprocessors.image import load_image | |||||
| def numpy_to_cv2img(img_array): | |||||
| """to convert a np.array with shape(h, w) to cv2 img | |||||
| Args: | |||||
| img_array (np.array): input data | |||||
| Returns: | |||||
| cv2 img | |||||
| """ | |||||
| img_array = (img_array - img_array.min()) / ( | |||||
| img_array.max() - img_array.min() + 1e-5) | |||||
| img_array = (img_array * 255).astype(np.uint8) | |||||
| img_array = cv2.applyColorMap(img_array, cv2.COLORMAP_JET) | |||||
| return img_array | |||||
| def draw_joints(image, np_kps, score, threshold=0.2): | |||||
| lst_parent_ids_17 = [0, 0, 0, 1, 2, 0, 0, 5, 6, 7, 8, 5, 6, 11, 12, 13, 14] | |||||
| lst_left_ids_17 = [1, 3, 5, 7, 9, 11, 13, 15] | |||||
| lst_right_ids_17 = [2, 4, 6, 8, 10, 12, 14, 16] | |||||
| lst_parent_ids_15 = [0, 0, 1, 2, 3, 1, 5, 6, 14, 8, 9, 14, 11, 12, 1] | |||||
| lst_left_ids_15 = [2, 3, 4, 8, 9, 10] | |||||
| lst_right_ids_15 = [5, 6, 7, 11, 12, 13] | |||||
| if np_kps.shape[0] == 17: | |||||
| lst_parent_ids = lst_parent_ids_17 | |||||
| lst_left_ids = lst_left_ids_17 | |||||
| lst_right_ids = lst_right_ids_17 | |||||
| elif np_kps.shape[0] == 15: | |||||
| lst_parent_ids = lst_parent_ids_15 | |||||
| lst_left_ids = lst_left_ids_15 | |||||
| lst_right_ids = lst_right_ids_15 | |||||
| for i in range(len(lst_parent_ids)): | |||||
| pid = lst_parent_ids[i] | |||||
| if i == pid: | |||||
| continue | |||||
| if (score[i] < threshold or score[1] < threshold): | |||||
| continue | |||||
| if i in lst_left_ids and pid in lst_left_ids: | |||||
| color = (0, 255, 0) | |||||
| elif i in lst_right_ids and pid in lst_right_ids: | |||||
| color = (255, 0, 0) | |||||
| else: | |||||
| color = (0, 255, 255) | |||||
| cv2.line(image, (int(np_kps[i, 0]), int(np_kps[i, 1])), | |||||
| (int(np_kps[pid][0]), int(np_kps[pid, 1])), color, 3) | |||||
| for i in range(np_kps.shape[0]): | |||||
| if score[i] < threshold: | |||||
| continue | |||||
| cv2.circle(image, (int(np_kps[i, 0]), int(np_kps[i, 1])), 5, | |||||
| (0, 0, 255), -1) | |||||
| def draw_box(image, box): | |||||
| cv2.rectangle(image, (int(box[0][0]), int(box[0][1])), | |||||
| (int(box[1][0]), int(box[1][1])), (0, 0, 255), 2) | |||||
| def draw_keypoints(output, original_image): | |||||
| poses = np.array(output[OutputKeys.POSES]) | |||||
| scores = np.array(output[OutputKeys.SCORES]) | |||||
| boxes = np.array(output[OutputKeys.BOXES]) | |||||
| assert len(poses) == len(scores) and len(poses) == len(boxes) | |||||
| image = cv2.imread(original_image, -1) | |||||
| for i in range(len(poses)): | |||||
| draw_box(image, np.array(boxes[i])) | |||||
| draw_joints(image, np.array(poses[i]), np.array(scores[i])) | |||||
| return image | |||||
| def draw_face_detection_result(img_path, detection_result): | |||||
| bboxes = np.array(detection_result[OutputKeys.BOXES]) | |||||
| kpss = np.array(detection_result[OutputKeys.KEYPOINTS]) | |||||
| scores = np.array(detection_result[OutputKeys.SCORES]) | |||||
| img = cv2.imread(img_path) | |||||
| assert img is not None, f"Can't read img: {img_path}" | |||||
| for i in range(len(scores)): | |||||
| bbox = bboxes[i].astype(np.int32) | |||||
| kps = kpss[i].reshape(-1, 2).astype(np.int32) | |||||
| score = scores[i] | |||||
| x1, y1, x2, y2 = bbox | |||||
| cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2) | |||||
| for kp in kps: | |||||
| cv2.circle(img, tuple(kp), 1, (0, 0, 255), 1) | |||||
| cv2.putText( | |||||
| img, | |||||
| f'{score:.2f}', (x1, y2), | |||||
| 1, | |||||
| 1.0, (0, 255, 0), | |||||
| thickness=1, | |||||
| lineType=8) | |||||
| print(f'Found {len(scores)} faces') | |||||
| return img | |||||
| def created_boxed_image(image_in, box): | |||||
| image = load_image(image_in) | |||||
| img = cv2.cvtColor(np.asarray(image), cv2.COLOR_RGB2BGR) | |||||
| cv2.rectangle(img, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), | |||||
| (0, 255, 0), 3) | |||||
| return img | |||||
| def show_video_tracking_result(video_in_path, bboxes, video_save_path): | |||||
| cap = cv2.VideoCapture(video_in_path) | |||||
| for i in range(len(bboxes)): | |||||
| box = bboxes[i] | |||||
| success, frame = cap.read() | |||||
| if success is False: | |||||
| raise Exception(video_in_path, | |||||
| ' can not be correctly decoded by OpenCV.') | |||||
| if i == 0: | |||||
| size = (frame.shape[1], frame.shape[0]) | |||||
| fourcc = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G') | |||||
| video_writer = cv2.VideoWriter(video_save_path, fourcc, | |||||
| cap.get(cv2.CAP_PROP_FPS), size, | |||||
| True) | |||||
| cv2.rectangle(frame, (box[0], box[1]), (box[2], box[3]), (0, 255, 0), | |||||
| 5) | |||||
| video_writer.write(frame) | |||||
| video_writer.release | |||||
| cap.release() | |||||
| @@ -0,0 +1,43 @@ | |||||
| from typing import List | |||||
| from modelscope.outputs import OutputKeys | |||||
| from modelscope.pipelines.nlp import (ConversationalTextToSqlPipeline, | |||||
| DialogStateTrackingPipeline) | |||||
| def text2sql_tracking_and_print_results( | |||||
| test_case, pipelines: List[ConversationalTextToSqlPipeline]): | |||||
| for p in pipelines: | |||||
| last_sql, history = '', [] | |||||
| for item in test_case['utterance']: | |||||
| case = { | |||||
| 'utterance': item, | |||||
| 'history': history, | |||||
| 'last_sql': last_sql, | |||||
| 'database_id': test_case['database_id'], | |||||
| 'local_db_path': test_case['local_db_path'] | |||||
| } | |||||
| results = p(case) | |||||
| print({'question': item}) | |||||
| print(results) | |||||
| last_sql = results['text'] | |||||
| history.append(item) | |||||
| def tracking_and_print_dialog_states( | |||||
| test_case, pipelines: List[DialogStateTrackingPipeline]): | |||||
| import json | |||||
| pipelines_len = len(pipelines) | |||||
| history_states = [{}] | |||||
| utter = {} | |||||
| for step, item in enumerate(test_case): | |||||
| utter.update(item) | |||||
| result = pipelines[step % pipelines_len]({ | |||||
| 'utter': | |||||
| utter, | |||||
| 'history_states': | |||||
| history_states | |||||
| }) | |||||
| print(json.dumps(result)) | |||||
| history_states.extend([result[OutputKeys.OUTPUT], {}]) | |||||
| @@ -15,23 +15,6 @@ class ActionRecognitionTest(unittest.TestCase): | |||||
| def setUp(self) -> None: | def setUp(self) -> None: | ||||
| self.model_id = 'damo/cv_TAdaConv_action-recognition' | self.model_id = 'damo/cv_TAdaConv_action-recognition' | ||||
| @unittest.skip('deprecated, download model from model hub instead') | |||||
| def test_run_with_direct_file_download(self): | |||||
| model_path = 'https://aquila2-online-models.oss-cn-shanghai.aliyuncs.com/maas_test/pytorch_model.pt' | |||||
| config_path = 'https://aquila2-online-models.oss-cn-shanghai.aliyuncs.com/maas_test/configuration.json' | |||||
| with tempfile.TemporaryDirectory() as tmp_dir: | |||||
| model_file = osp.join(tmp_dir, ModelFile.TORCH_MODEL_FILE) | |||||
| with open(model_file, 'wb') as ofile1: | |||||
| ofile1.write(File.read(model_path)) | |||||
| config_file = osp.join(tmp_dir, ModelFile.CONFIGURATION) | |||||
| with open(config_file, 'wb') as ofile2: | |||||
| ofile2.write(File.read(config_path)) | |||||
| recognition_pipeline = pipeline( | |||||
| Tasks.action_recognition, model=tmp_dir) | |||||
| result = recognition_pipeline( | |||||
| 'data/test/videos/action_recognition_test_video.mp4') | |||||
| print(f'recognition output: {result}.') | |||||
| @unittest.skipUnless(test_level() >= 0, 'skip test in current test level') | @unittest.skipUnless(test_level() >= 0, 'skip test in current test level') | ||||
| def test_run_modelhub(self): | def test_run_modelhub(self): | ||||
| recognition_pipeline = pipeline( | recognition_pipeline = pipeline( | ||||
| @@ -9,59 +9,9 @@ from modelscope.outputs import OutputKeys | |||||
| from modelscope.pipelines import pipeline | from modelscope.pipelines import pipeline | ||||
| from modelscope.pipelines.base import Pipeline | from modelscope.pipelines.base import Pipeline | ||||
| from modelscope.utils.constant import Tasks | from modelscope.utils.constant import Tasks | ||||
| from modelscope.utils.cv.image_utils import draw_keypoints | |||||
| from modelscope.utils.test_utils import test_level | from modelscope.utils.test_utils import test_level | ||||
| lst_parent_ids_17 = [0, 0, 0, 1, 2, 0, 0, 5, 6, 7, 8, 5, 6, 11, 12, 13, 14] | |||||
| lst_left_ids_17 = [1, 3, 5, 7, 9, 11, 13, 15] | |||||
| lst_right_ids_17 = [2, 4, 6, 8, 10, 12, 14, 16] | |||||
| lst_spine_ids_17 = [0] | |||||
| lst_parent_ids_15 = [0, 0, 1, 2, 3, 1, 5, 6, 14, 8, 9, 14, 11, 12, 1] | |||||
| lst_left_ids_15 = [2, 3, 4, 8, 9, 10] | |||||
| lst_right_ids_15 = [5, 6, 7, 11, 12, 13] | |||||
| lst_spine_ids_15 = [0, 1, 14] | |||||
| def draw_joints(image, np_kps, score, threshold=0.2): | |||||
| if np_kps.shape[0] == 17: | |||||
| lst_parent_ids = lst_parent_ids_17 | |||||
| lst_left_ids = lst_left_ids_17 | |||||
| lst_right_ids = lst_right_ids_17 | |||||
| elif np_kps.shape[0] == 15: | |||||
| lst_parent_ids = lst_parent_ids_15 | |||||
| lst_left_ids = lst_left_ids_15 | |||||
| lst_right_ids = lst_right_ids_15 | |||||
| for i in range(len(lst_parent_ids)): | |||||
| pid = lst_parent_ids[i] | |||||
| if i == pid: | |||||
| continue | |||||
| if (score[i] < threshold or score[1] < threshold): | |||||
| continue | |||||
| if i in lst_left_ids and pid in lst_left_ids: | |||||
| color = (0, 255, 0) | |||||
| elif i in lst_right_ids and pid in lst_right_ids: | |||||
| color = (255, 0, 0) | |||||
| else: | |||||
| color = (0, 255, 255) | |||||
| cv2.line(image, (int(np_kps[i, 0]), int(np_kps[i, 1])), | |||||
| (int(np_kps[pid][0]), int(np_kps[pid, 1])), color, 3) | |||||
| for i in range(np_kps.shape[0]): | |||||
| if score[i] < threshold: | |||||
| continue | |||||
| cv2.circle(image, (int(np_kps[i, 0]), int(np_kps[i, 1])), 5, | |||||
| (0, 0, 255), -1) | |||||
| def draw_box(image, box): | |||||
| cv2.rectangle(image, (int(box[0][0]), int(box[0][1])), | |||||
| (int(box[1][0]), int(box[1][1])), (0, 0, 255), 2) | |||||
| class Body2DKeypointsTest(unittest.TestCase): | class Body2DKeypointsTest(unittest.TestCase): | ||||
| @@ -71,14 +21,7 @@ class Body2DKeypointsTest(unittest.TestCase): | |||||
| def pipeline_inference(self, pipeline: Pipeline, pipeline_input): | def pipeline_inference(self, pipeline: Pipeline, pipeline_input): | ||||
| output = pipeline(pipeline_input) | output = pipeline(pipeline_input) | ||||
| poses = np.array(output[OutputKeys.POSES]) | |||||
| scores = np.array(output[OutputKeys.SCORES]) | |||||
| boxes = np.array(output[OutputKeys.BOXES]) | |||||
| assert len(poses) == len(scores) and len(poses) == len(boxes) | |||||
| image = cv2.imread(self.test_image, -1) | |||||
| for i in range(len(poses)): | |||||
| draw_box(image, np.array(boxes[i])) | |||||
| draw_joints(image, np.array(poses[i]), np.array(scores[i])) | |||||
| image = draw_keypoints(output, self.test_image) | |||||
| cv2.imwrite('pose_keypoint.jpg', image) | cv2.imwrite('pose_keypoint.jpg', image) | ||||
| @unittest.skipUnless(test_level() >= 1, 'skip test in current test level') | @unittest.skipUnless(test_level() >= 1, 'skip test in current test level') | ||||
| @@ -9,6 +9,7 @@ from modelscope.pipelines import pipeline | |||||
| from modelscope.pipelines.nlp import ConversationalTextToSqlPipeline | from modelscope.pipelines.nlp import ConversationalTextToSqlPipeline | ||||
| from modelscope.preprocessors import ConversationalTextToSqlPreprocessor | from modelscope.preprocessors import ConversationalTextToSqlPreprocessor | ||||
| from modelscope.utils.constant import Tasks | from modelscope.utils.constant import Tasks | ||||
| from modelscope.utils.nlp.nlp_utils import text2sql_tracking_and_print_results | |||||
| from modelscope.utils.test_utils import test_level | from modelscope.utils.test_utils import test_level | ||||
| @@ -25,24 +26,6 @@ class ConversationalTextToSql(unittest.TestCase): | |||||
| ] | ] | ||||
| } | } | ||||
| def tracking_and_print_results( | |||||
| self, pipelines: List[ConversationalTextToSqlPipeline]): | |||||
| for my_pipeline in pipelines: | |||||
| last_sql, history = '', [] | |||||
| for item in self.test_case['utterance']: | |||||
| case = { | |||||
| 'utterance': item, | |||||
| 'history': history, | |||||
| 'last_sql': last_sql, | |||||
| 'database_id': self.test_case['database_id'], | |||||
| 'local_db_path': self.test_case['local_db_path'] | |||||
| } | |||||
| results = my_pipeline(case) | |||||
| print({'question': item}) | |||||
| print(results) | |||||
| last_sql = results['text'] | |||||
| history.append(item) | |||||
| @unittest.skipUnless(test_level() >= 2, 'skip test in current test level') | @unittest.skipUnless(test_level() >= 2, 'skip test in current test level') | ||||
| def test_run_by_direct_model_download(self): | def test_run_by_direct_model_download(self): | ||||
| cache_path = snapshot_download(self.model_id) | cache_path = snapshot_download(self.model_id) | ||||
| @@ -61,7 +44,7 @@ class ConversationalTextToSql(unittest.TestCase): | |||||
| model=model, | model=model, | ||||
| preprocessor=preprocessor) | preprocessor=preprocessor) | ||||
| ] | ] | ||||
| self.tracking_and_print_results(pipelines) | |||||
| text2sql_tracking_and_print_results(self.test_case, pipelines) | |||||
| @unittest.skipUnless(test_level() >= 0, 'skip test in current test level') | @unittest.skipUnless(test_level() >= 0, 'skip test in current test level') | ||||
| def test_run_with_model_from_modelhub(self): | def test_run_with_model_from_modelhub(self): | ||||
| @@ -77,7 +60,7 @@ class ConversationalTextToSql(unittest.TestCase): | |||||
| model=model, | model=model, | ||||
| preprocessor=preprocessor) | preprocessor=preprocessor) | ||||
| ] | ] | ||||
| self.tracking_and_print_results(pipelines) | |||||
| text2sql_tracking_and_print_results(self.test_case, pipelines) | |||||
| @unittest.skipUnless(test_level() >= 0, 'skip test in current test level') | @unittest.skipUnless(test_level() >= 0, 'skip test in current test level') | ||||
| def test_run_with_model_name(self): | def test_run_with_model_name(self): | ||||
| @@ -85,12 +68,12 @@ class ConversationalTextToSql(unittest.TestCase): | |||||
| pipeline( | pipeline( | ||||
| task=Tasks.conversational_text_to_sql, model=self.model_id) | task=Tasks.conversational_text_to_sql, model=self.model_id) | ||||
| ] | ] | ||||
| self.tracking_and_print_results(pipelines) | |||||
| text2sql_tracking_and_print_results(self.test_case, pipelines) | |||||
| @unittest.skipUnless(test_level() >= 2, 'skip test in current test level') | @unittest.skipUnless(test_level() >= 2, 'skip test in current test level') | ||||
| def test_run_with_default_model(self): | def test_run_with_default_model(self): | ||||
| pipelines = [pipeline(task=Tasks.conversational_text_to_sql)] | pipelines = [pipeline(task=Tasks.conversational_text_to_sql)] | ||||
| self.tracking_and_print_results(pipelines) | |||||
| text2sql_tracking_and_print_results(self.test_case, pipelines) | |||||
| if __name__ == '__main__': | if __name__ == '__main__': | ||||
| @@ -2,13 +2,12 @@ | |||||
| import unittest | import unittest | ||||
| import cv2 | import cv2 | ||||
| import numpy as np | |||||
| from PIL import Image | from PIL import Image | ||||
| from modelscope.outputs import OutputKeys | from modelscope.outputs import OutputKeys | ||||
| from modelscope.pipelines import pipeline | from modelscope.pipelines import pipeline | ||||
| from modelscope.utils.constant import Tasks | from modelscope.utils.constant import Tasks | ||||
| from modelscope.utils.cv.heatmap import numpy_to_cv2img | |||||
| from modelscope.utils.cv.image_utils import numpy_to_cv2img | |||||
| from modelscope.utils.logger import get_logger | from modelscope.utils.logger import get_logger | ||||
| from modelscope.utils.test_utils import test_level | from modelscope.utils.test_utils import test_level | ||||
| @@ -1,15 +1,14 @@ | |||||
| # Copyright (c) Alibaba, Inc. and its affiliates. | # Copyright (c) Alibaba, Inc. and its affiliates. | ||||
| import unittest | import unittest | ||||
| from typing import List | |||||
| from modelscope.hub.snapshot_download import snapshot_download | from modelscope.hub.snapshot_download import snapshot_download | ||||
| from modelscope.models import Model | from modelscope.models import Model | ||||
| from modelscope.models.nlp import SpaceForDialogStateTracking | from modelscope.models.nlp import SpaceForDialogStateTracking | ||||
| from modelscope.outputs import OutputKeys | |||||
| from modelscope.pipelines import pipeline | from modelscope.pipelines import pipeline | ||||
| from modelscope.pipelines.nlp import DialogStateTrackingPipeline | from modelscope.pipelines.nlp import DialogStateTrackingPipeline | ||||
| from modelscope.preprocessors import DialogStateTrackingPreprocessor | from modelscope.preprocessors import DialogStateTrackingPreprocessor | ||||
| from modelscope.utils.constant import Tasks | from modelscope.utils.constant import Tasks | ||||
| from modelscope.utils.nlp.nlp_utils import tracking_and_print_dialog_states | |||||
| from modelscope.utils.test_utils import test_level | from modelscope.utils.test_utils import test_level | ||||
| @@ -79,24 +78,6 @@ class DialogStateTrackingTest(unittest.TestCase): | |||||
| 'User-8': 'Thank you, goodbye', | 'User-8': 'Thank you, goodbye', | ||||
| }] | }] | ||||
| def tracking_and_print_dialog_states( | |||||
| self, pipelines: List[DialogStateTrackingPipeline]): | |||||
| import json | |||||
| pipelines_len = len(pipelines) | |||||
| history_states = [{}] | |||||
| utter = {} | |||||
| for step, item in enumerate(self.test_case): | |||||
| utter.update(item) | |||||
| result = pipelines[step % pipelines_len]({ | |||||
| 'utter': | |||||
| utter, | |||||
| 'history_states': | |||||
| history_states | |||||
| }) | |||||
| print(json.dumps(result)) | |||||
| history_states.extend([result[OutputKeys.OUTPUT], {}]) | |||||
| @unittest.skipUnless(test_level() >= 2, 'skip test in current test level') | @unittest.skipUnless(test_level() >= 2, 'skip test in current test level') | ||||
| def test_run_by_direct_model_download(self): | def test_run_by_direct_model_download(self): | ||||
| cache_path = snapshot_download(self.model_id, revision='update') | cache_path = snapshot_download(self.model_id, revision='update') | ||||
| @@ -111,7 +92,7 @@ class DialogStateTrackingTest(unittest.TestCase): | |||||
| model=model, | model=model, | ||||
| preprocessor=preprocessor) | preprocessor=preprocessor) | ||||
| ] | ] | ||||
| self.tracking_and_print_dialog_states(pipelines) | |||||
| tracking_and_print_dialog_states(self.test_case, pipelines) | |||||
| @unittest.skipUnless(test_level() >= 0, 'skip test in current test level') | @unittest.skipUnless(test_level() >= 0, 'skip test in current test level') | ||||
| def test_run_with_model_from_modelhub(self): | def test_run_with_model_from_modelhub(self): | ||||
| @@ -128,7 +109,7 @@ class DialogStateTrackingTest(unittest.TestCase): | |||||
| preprocessor=preprocessor) | preprocessor=preprocessor) | ||||
| ] | ] | ||||
| self.tracking_and_print_dialog_states(pipelines) | |||||
| tracking_and_print_dialog_states(self.test_case, pipelines) | |||||
| @unittest.skipUnless(test_level() >= 0, 'skip test in current test level') | @unittest.skipUnless(test_level() >= 0, 'skip test in current test level') | ||||
| def test_run_with_model_name(self): | def test_run_with_model_name(self): | ||||
| @@ -138,7 +119,7 @@ class DialogStateTrackingTest(unittest.TestCase): | |||||
| model=self.model_id, | model=self.model_id, | ||||
| model_revision='update') | model_revision='update') | ||||
| ] | ] | ||||
| self.tracking_and_print_dialog_states(pipelines) | |||||
| tracking_and_print_dialog_states(self.test_case, pipelines) | |||||
| if __name__ == '__main__': | if __name__ == '__main__': | ||||
| @@ -9,6 +9,7 @@ from modelscope.msdatasets import MsDataset | |||||
| from modelscope.outputs import OutputKeys | from modelscope.outputs import OutputKeys | ||||
| from modelscope.pipelines import pipeline | from modelscope.pipelines import pipeline | ||||
| from modelscope.utils.constant import Tasks | from modelscope.utils.constant import Tasks | ||||
| from modelscope.utils.cv.image_utils import draw_face_detection_result | |||||
| from modelscope.utils.test_utils import test_level | from modelscope.utils.test_utils import test_level | ||||
| @@ -17,46 +18,21 @@ class FaceDetectionTest(unittest.TestCase): | |||||
| def setUp(self) -> None: | def setUp(self) -> None: | ||||
| self.model_id = 'damo/cv_resnet_facedetection_scrfd10gkps' | self.model_id = 'damo/cv_resnet_facedetection_scrfd10gkps' | ||||
| def show_result(self, img_path, bboxes, kpss, scores): | |||||
| bboxes = np.array(bboxes) | |||||
| kpss = np.array(kpss) | |||||
| scores = np.array(scores) | |||||
| img = cv2.imread(img_path) | |||||
| assert img is not None, f"Can't read img: {img_path}" | |||||
| for i in range(len(scores)): | |||||
| bbox = bboxes[i].astype(np.int32) | |||||
| kps = kpss[i].reshape(-1, 2).astype(np.int32) | |||||
| score = scores[i] | |||||
| x1, y1, x2, y2 = bbox | |||||
| cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2) | |||||
| for kp in kps: | |||||
| cv2.circle(img, tuple(kp), 1, (0, 0, 255), 1) | |||||
| cv2.putText( | |||||
| img, | |||||
| f'{score:.2f}', (x1, y2), | |||||
| 1, | |||||
| 1.0, (0, 255, 0), | |||||
| thickness=1, | |||||
| lineType=8) | |||||
| def show_result(self, img_path, detection_result): | |||||
| img = draw_face_detection_result(img_path, detection_result) | |||||
| cv2.imwrite('result.png', img) | cv2.imwrite('result.png', img) | ||||
| print( | |||||
| f'Found {len(scores)} faces, output written to {osp.abspath("result.png")}' | |||||
| ) | |||||
| print(f'output written to {osp.abspath("result.png")}') | |||||
| @unittest.skipUnless(test_level() >= 1, 'skip test in current test level') | @unittest.skipUnless(test_level() >= 1, 'skip test in current test level') | ||||
| def test_run_with_dataset(self): | def test_run_with_dataset(self): | ||||
| input_location = ['data/test/images/face_detection.png'] | input_location = ['data/test/images/face_detection.png'] | ||||
| # alternatively: | |||||
| # input_location = '/dir/to/images' | |||||
| dataset = MsDataset.load(input_location, target='image') | dataset = MsDataset.load(input_location, target='image') | ||||
| face_detection = pipeline(Tasks.face_detection, model=self.model_id) | face_detection = pipeline(Tasks.face_detection, model=self.model_id) | ||||
| # note that for dataset output, the inference-output is a Generator that can be iterated. | # note that for dataset output, the inference-output is a Generator that can be iterated. | ||||
| result = face_detection(dataset) | result = face_detection(dataset) | ||||
| result = next(result) | result = next(result) | ||||
| self.show_result(input_location[0], result[OutputKeys.BOXES], | |||||
| result[OutputKeys.KEYPOINTS], | |||||
| result[OutputKeys.SCORES]) | |||||
| self.show_result(input_location[0], result) | |||||
| @unittest.skipUnless(test_level() >= 0, 'skip test in current test level') | @unittest.skipUnless(test_level() >= 0, 'skip test in current test level') | ||||
| def test_run_modelhub(self): | def test_run_modelhub(self): | ||||
| @@ -64,18 +40,14 @@ class FaceDetectionTest(unittest.TestCase): | |||||
| img_path = 'data/test/images/face_detection.png' | img_path = 'data/test/images/face_detection.png' | ||||
| result = face_detection(img_path) | result = face_detection(img_path) | ||||
| self.show_result(img_path, result[OutputKeys.BOXES], | |||||
| result[OutputKeys.KEYPOINTS], | |||||
| result[OutputKeys.SCORES]) | |||||
| self.show_result(img_path, result) | |||||
| @unittest.skipUnless(test_level() >= 2, 'skip test in current test level') | @unittest.skipUnless(test_level() >= 2, 'skip test in current test level') | ||||
| def test_run_modelhub_default_model(self): | def test_run_modelhub_default_model(self): | ||||
| face_detection = pipeline(Tasks.face_detection) | face_detection = pipeline(Tasks.face_detection) | ||||
| img_path = 'data/test/images/face_detection.png' | img_path = 'data/test/images/face_detection.png' | ||||
| result = face_detection(img_path) | result = face_detection(img_path) | ||||
| self.show_result(img_path, result[OutputKeys.BOXES], | |||||
| result[OutputKeys.KEYPOINTS], | |||||
| result[OutputKeys.SCORES]) | |||||
| self.show_result(img_path, result) | |||||
| if __name__ == '__main__': | if __name__ == '__main__': | ||||
| @@ -1,5 +1,4 @@ | |||||
| # Copyright (c) Alibaba, Inc. and its affiliates. | # Copyright (c) Alibaba, Inc. and its affiliates. | ||||
| import os | |||||
| import os.path as osp | import os.path as osp | ||||
| import unittest | import unittest | ||||
| @@ -21,7 +21,6 @@ class FaceRecognitionTest(unittest.TestCase): | |||||
| face_recognition = pipeline( | face_recognition = pipeline( | ||||
| Tasks.face_recognition, model=self.model_id) | Tasks.face_recognition, model=self.model_id) | ||||
| # note that for dataset output, the inference-output is a Generator that can be iterated. | |||||
| emb1 = face_recognition(img1)[OutputKeys.IMG_EMBEDDING] | emb1 = face_recognition(img1)[OutputKeys.IMG_EMBEDDING] | ||||
| emb2 = face_recognition(img2)[OutputKeys.IMG_EMBEDDING] | emb2 = face_recognition(img2)[OutputKeys.IMG_EMBEDDING] | ||||
| sim = np.dot(emb1[0], emb2[0]) | sim = np.dot(emb1[0], emb2[0]) | ||||
| @@ -3,6 +3,7 @@ import unittest | |||||
| from torchvision.utils import save_image | from torchvision.utils import save_image | ||||
| from modelscope.outputs import OutputKeys | |||||
| from modelscope.pipelines import pipeline | from modelscope.pipelines import pipeline | ||||
| from modelscope.utils.constant import Tasks | from modelscope.utils.constant import Tasks | ||||
| from modelscope.utils.test_utils import test_level | from modelscope.utils.test_utils import test_level | ||||
| @@ -27,13 +28,13 @@ class Image2ImageGenerationTest(unittest.TestCase): | |||||
| result2 = img2img_gen_pipeline(('data/test/images/img2img_input.jpg', | result2 = img2img_gen_pipeline(('data/test/images/img2img_input.jpg', | ||||
| 'data/test/images/img2img_style.jpg')) | 'data/test/images/img2img_style.jpg')) | ||||
| save_image( | save_image( | ||||
| result1['output_img'].clamp(-1, 1), | |||||
| result1[OutputKeys.OUTPUT_IMG].clamp(-1, 1), | |||||
| 'result1.jpg', | 'result1.jpg', | ||||
| range=(-1, 1), | range=(-1, 1), | ||||
| normalize=True, | normalize=True, | ||||
| nrow=4) | nrow=4) | ||||
| save_image( | save_image( | ||||
| result2['output_img'].clamp(-1, 1), | |||||
| result2[OutputKeys.OUTPUT_IMG].clamp(-1, 1), | |||||
| 'result2.jpg', | 'result2.jpg', | ||||
| range=(-1, 1), | range=(-1, 1), | ||||
| normalize=True, | normalize=True, | ||||
| @@ -18,19 +18,6 @@ class ImageMattingTest(unittest.TestCase): | |||||
| def setUp(self) -> None: | def setUp(self) -> None: | ||||
| self.model_id = 'damo/cv_unet_image-matting' | self.model_id = 'damo/cv_unet_image-matting' | ||||
| @unittest.skip('deprecated, download model from model hub instead') | |||||
| def test_run_with_direct_file_download(self): | |||||
| model_path = 'http://pai-vision-data-hz.oss-cn-zhangjiakou.aliyuncs' \ | |||||
| '.com/data/test/maas/image_matting/matting_person.pb' | |||||
| with tempfile.TemporaryDirectory() as tmp_dir: | |||||
| model_file = osp.join(tmp_dir, ModelFile.TF_GRAPH_FILE) | |||||
| with open(model_file, 'wb') as ofile: | |||||
| ofile.write(File.read(model_path)) | |||||
| img_matting = pipeline(Tasks.portrait_matting, model=tmp_dir) | |||||
| result = img_matting('data/test/images/image_matting.png') | |||||
| cv2.imwrite('result.png', result[OutputKeys.OUTPUT_IMG]) | |||||
| @unittest.skipUnless(test_level() >= 1, 'skip test in current test level') | @unittest.skipUnless(test_level() >= 1, 'skip test in current test level') | ||||
| def test_run_with_dataset(self): | def test_run_with_dataset(self): | ||||
| input_location = ['data/test/images/image_matting.png'] | input_location = ['data/test/images/image_matting.png'] | ||||
| @@ -15,7 +15,7 @@ class ImageStyleTransferTest(unittest.TestCase): | |||||
| def setUp(self) -> None: | def setUp(self) -> None: | ||||
| self.model_id = 'damo/cv_aams_style-transfer_damo' | self.model_id = 'damo/cv_aams_style-transfer_damo' | ||||
| @unittest.skip('deprecated, download model from model hub instead') | |||||
| @unittest.skipUnless(test_level() >= 2, 'skip test in current test level') | |||||
| def test_run_by_direct_model_download(self): | def test_run_by_direct_model_download(self): | ||||
| snapshot_path = snapshot_download(self.model_id) | snapshot_path = snapshot_download(self.model_id) | ||||
| print('snapshot_path: {}'.format(snapshot_path)) | print('snapshot_path: {}'.format(snapshot_path)) | ||||
| @@ -1,7 +1,6 @@ | |||||
| import os.path | import os.path | ||||
| import unittest | import unittest | ||||
| from modelscope.fileio import File | |||||
| from modelscope.pipelines import pipeline | from modelscope.pipelines import pipeline | ||||
| from modelscope.utils.constant import Tasks | from modelscope.utils.constant import Tasks | ||||
| from modelscope.utils.test_utils import test_level | from modelscope.utils.test_utils import test_level | ||||
| @@ -4,14 +4,13 @@ import unittest | |||||
| from os import path as osp | from os import path as osp | ||||
| import cv2 | import cv2 | ||||
| import numpy as np | |||||
| from PIL import Image | from PIL import Image | ||||
| from modelscope.models import Model | from modelscope.models import Model | ||||
| from modelscope.outputs import OutputKeys | from modelscope.outputs import OutputKeys | ||||
| from modelscope.pipelines import pipeline | from modelscope.pipelines import pipeline | ||||
| from modelscope.preprocessors.image import load_image | |||||
| from modelscope.utils.constant import Tasks | from modelscope.utils.constant import Tasks | ||||
| from modelscope.utils.cv.image_utils import created_boxed_image | |||||
| from modelscope.utils.test_utils import test_level | from modelscope.utils.test_utils import test_level | ||||
| @@ -22,11 +21,9 @@ class OfaTasksTest(unittest.TestCase): | |||||
| os.makedirs(self.output_dir, exist_ok=True) | os.makedirs(self.output_dir, exist_ok=True) | ||||
| def save_img(self, image_in, box, image_out): | def save_img(self, image_in, box, image_out): | ||||
| image = load_image(image_in) | |||||
| img = cv2.cvtColor(np.asarray(image), cv2.COLOR_RGB2BGR) | |||||
| cv2.rectangle(img, (int(box[0]), int(box[1])), | |||||
| (int(box[2]), int(box[3])), (0, 255, 0), 3) | |||||
| cv2.imwrite(osp.join(self.output_dir, image_out), img) | |||||
| cv2.imwrite( | |||||
| osp.join(self.output_dir, image_out), | |||||
| created_boxed_image(image_in, box)) | |||||
| @unittest.skipUnless(test_level() >= 1, 'skip test in current test level') | @unittest.skipUnless(test_level() >= 1, 'skip test in current test level') | ||||
| def test_run_with_image_captioning_with_model(self): | def test_run_with_image_captioning_with_model(self): | ||||
| @@ -24,19 +24,6 @@ class ImageCartoonTest(unittest.TestCase): | |||||
| cv2.imwrite('result.png', result[OutputKeys.OUTPUT_IMG]) | cv2.imwrite('result.png', result[OutputKeys.OUTPUT_IMG]) | ||||
| print(f'Output written to {osp.abspath("result.png")}') | print(f'Output written to {osp.abspath("result.png")}') | ||||
| @unittest.skip('deprecated, download model from model hub instead') | |||||
| def test_run_by_direct_model_download(self): | |||||
| model_dir = './assets' | |||||
| if not os.path.exists(model_dir): | |||||
| os.system( | |||||
| 'wget https://invi-label.oss-cn-shanghai.aliyuncs.com/label/model/cartoon/assets.zip' | |||||
| ) | |||||
| os.system('unzip assets.zip') | |||||
| img_cartoon = pipeline( | |||||
| Tasks.image_portrait_stylization, model=model_dir) | |||||
| self.pipeline_inference(img_cartoon, self.test_image) | |||||
| @unittest.skipUnless(test_level() >= 0, 'skip test in current test level') | @unittest.skipUnless(test_level() >= 0, 'skip test in current test level') | ||||
| def test_run_modelhub(self): | def test_run_modelhub(self): | ||||
| img_cartoon = pipeline( | img_cartoon = pipeline( | ||||
| @@ -23,10 +23,9 @@ class SkinRetouchingTest(unittest.TestCase): | |||||
| cv2.imwrite('result_skinretouching.png', result[OutputKeys.OUTPUT_IMG]) | cv2.imwrite('result_skinretouching.png', result[OutputKeys.OUTPUT_IMG]) | ||||
| print(f'Output written to {osp.abspath("result_skinretouching.png")}') | print(f'Output written to {osp.abspath("result_skinretouching.png")}') | ||||
| @unittest.skip('deprecated, download model from model hub instead') | |||||
| @unittest.skipUnless(test_level() >= 2, 'skip test in current test level') | |||||
| def test_run_by_direct_model_download(self): | def test_run_by_direct_model_download(self): | ||||
| model_dir = snapshot_download(self.model_id) | model_dir = snapshot_download(self.model_id) | ||||
| skin_retouching = pipeline(Tasks.skin_retouching, model=model_dir) | skin_retouching = pipeline(Tasks.skin_retouching, model=model_dir) | ||||
| self.pipeline_inference(skin_retouching, self.test_image) | self.pipeline_inference(skin_retouching, self.test_image) | ||||
| @@ -1,11 +1,10 @@ | |||||
| # Copyright (c) Alibaba, Inc. and its affiliates. | # Copyright (c) Alibaba, Inc. and its affiliates. | ||||
| import unittest | import unittest | ||||
| from modelscope.models.cv.video_single_object_tracking.utils.utils import \ | |||||
| show_tracking_result | |||||
| from modelscope.outputs import OutputKeys | from modelscope.outputs import OutputKeys | ||||
| from modelscope.pipelines import pipeline | from modelscope.pipelines import pipeline | ||||
| from modelscope.utils.constant import Tasks | from modelscope.utils.constant import Tasks | ||||
| from modelscope.utils.cv.image_utils import show_video_tracking_result | |||||
| from modelscope.utils.test_utils import test_level | from modelscope.utils.test_utils import test_level | ||||
| @@ -22,8 +21,8 @@ class SingleObjectTracking(unittest.TestCase): | |||||
| init_bbox = [414, 343, 514, 449] # [x1, y1, x2, y2] | init_bbox = [414, 343, 514, 449] # [x1, y1, x2, y2] | ||||
| result = video_single_object_tracking((video_path, init_bbox)) | result = video_single_object_tracking((video_path, init_bbox)) | ||||
| print('result is : ', result[OutputKeys.BOXES]) | print('result is : ', result[OutputKeys.BOXES]) | ||||
| show_tracking_result(video_path, result[OutputKeys.BOXES], | |||||
| './tracking_result.avi') | |||||
| show_video_tracking_result(video_path, result[OutputKeys.BOXES], | |||||
| './tracking_result.avi') | |||||
| @unittest.skipUnless(test_level() >= 1, 'skip test in current test level') | @unittest.skipUnless(test_level() >= 1, 'skip test in current test level') | ||||
| def test_run_modelhub_default_model(self): | def test_run_modelhub_default_model(self): | ||||