Browse Source

[to #42322933]move postprocess helper into utilities

master
Yingda Chen 3 years ago
parent
commit
4cc0395ac5
20 changed files with 210 additions and 242 deletions
  1. +0
    -21
      modelscope/models/cv/video_single_object_tracking/utils/utils.py
  2. +0
    -18
      modelscope/utils/cv/heatmap.py
  3. +136
    -0
      modelscope/utils/cv/image_utils.py
  4. +43
    -0
      modelscope/utils/nlp/nlp_utils.py
  5. +0
    -17
      tests/pipelines/test_action_recognition.py
  6. +2
    -59
      tests/pipelines/test_body_2d_keypoints.py
  7. +5
    -22
      tests/pipelines/test_conversational_text_to_sql.py
  8. +1
    -2
      tests/pipelines/test_crowd_counting.py
  9. +4
    -23
      tests/pipelines/test_dialog_state_tracking.py
  10. +7
    -35
      tests/pipelines/test_face_detection.py
  11. +0
    -1
      tests/pipelines/test_face_image_generation.py
  12. +0
    -1
      tests/pipelines/test_face_recognition.py
  13. +3
    -2
      tests/pipelines/test_image2image_generation.py
  14. +0
    -13
      tests/pipelines/test_image_matting.py
  15. +1
    -1
      tests/pipelines/test_image_style_transfer.py
  16. +0
    -1
      tests/pipelines/test_key_word_spotting_farfield.py
  17. +4
    -7
      tests/pipelines/test_ofa_tasks.py
  18. +0
    -13
      tests/pipelines/test_person_image_cartoon.py
  19. +1
    -2
      tests/pipelines/test_skin_retouching.py
  20. +3
    -4
      tests/pipelines/test_video_single_object_tracking.py

+ 0
- 21
modelscope/models/cv/video_single_object_tracking/utils/utils.py View File

@@ -238,24 +238,3 @@ def check_box(box: list, image_height, image_width) -> bool:
if box[3] < 0 or box[3] >= image_height:
return False
return True


def show_tracking_result(video_in_path, bboxes, video_save_path):
cap = cv2.VideoCapture(video_in_path)
for i in range(len(bboxes)):
box = bboxes[i]
success, frame = cap.read()
if success is False:
raise Exception(video_in_path,
' can not be correctly decoded by OpenCV.')
if i == 0:
size = (frame.shape[1], frame.shape[0])
fourcc = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')
video_writer = cv2.VideoWriter(video_save_path, fourcc,
cap.get(cv2.CAP_PROP_FPS), size,
True)
cv2.rectangle(frame, (box[0], box[1]), (box[2], box[3]), (0, 255, 0),
5)
video_writer.write(frame)
video_writer.release
cap.release()

+ 0
- 18
modelscope/utils/cv/heatmap.py View File

@@ -1,18 +0,0 @@
import cv2
import numpy as np


def numpy_to_cv2img(vis_img):
"""to convert a np.array Hotmap with shape(h, w) to cv2 img

Args:
vis_img (np.array): input data

Returns:
cv2 img
"""
vis_img = (vis_img - vis_img.min()) / (
vis_img.max() - vis_img.min() + 1e-5)
vis_img = (vis_img * 255).astype(np.uint8)
vis_img = cv2.applyColorMap(vis_img, cv2.COLORMAP_JET)
return vis_img

+ 136
- 0
modelscope/utils/cv/image_utils.py View File

@@ -0,0 +1,136 @@
import cv2
import numpy as np

from modelscope.outputs import OutputKeys
from modelscope.preprocessors.image import load_image


def numpy_to_cv2img(img_array):
"""to convert a np.array with shape(h, w) to cv2 img

Args:
img_array (np.array): input data

Returns:
cv2 img
"""
img_array = (img_array - img_array.min()) / (
img_array.max() - img_array.min() + 1e-5)
img_array = (img_array * 255).astype(np.uint8)
img_array = cv2.applyColorMap(img_array, cv2.COLORMAP_JET)
return img_array


def draw_joints(image, np_kps, score, threshold=0.2):
lst_parent_ids_17 = [0, 0, 0, 1, 2, 0, 0, 5, 6, 7, 8, 5, 6, 11, 12, 13, 14]
lst_left_ids_17 = [1, 3, 5, 7, 9, 11, 13, 15]
lst_right_ids_17 = [2, 4, 6, 8, 10, 12, 14, 16]

lst_parent_ids_15 = [0, 0, 1, 2, 3, 1, 5, 6, 14, 8, 9, 14, 11, 12, 1]
lst_left_ids_15 = [2, 3, 4, 8, 9, 10]
lst_right_ids_15 = [5, 6, 7, 11, 12, 13]

if np_kps.shape[0] == 17:
lst_parent_ids = lst_parent_ids_17
lst_left_ids = lst_left_ids_17
lst_right_ids = lst_right_ids_17

elif np_kps.shape[0] == 15:
lst_parent_ids = lst_parent_ids_15
lst_left_ids = lst_left_ids_15
lst_right_ids = lst_right_ids_15

for i in range(len(lst_parent_ids)):
pid = lst_parent_ids[i]
if i == pid:
continue

if (score[i] < threshold or score[1] < threshold):
continue

if i in lst_left_ids and pid in lst_left_ids:
color = (0, 255, 0)
elif i in lst_right_ids and pid in lst_right_ids:
color = (255, 0, 0)
else:
color = (0, 255, 255)

cv2.line(image, (int(np_kps[i, 0]), int(np_kps[i, 1])),
(int(np_kps[pid][0]), int(np_kps[pid, 1])), color, 3)

for i in range(np_kps.shape[0]):
if score[i] < threshold:
continue
cv2.circle(image, (int(np_kps[i, 0]), int(np_kps[i, 1])), 5,
(0, 0, 255), -1)


def draw_box(image, box):
cv2.rectangle(image, (int(box[0][0]), int(box[0][1])),
(int(box[1][0]), int(box[1][1])), (0, 0, 255), 2)


def draw_keypoints(output, original_image):
poses = np.array(output[OutputKeys.POSES])
scores = np.array(output[OutputKeys.SCORES])
boxes = np.array(output[OutputKeys.BOXES])
assert len(poses) == len(scores) and len(poses) == len(boxes)
image = cv2.imread(original_image, -1)
for i in range(len(poses)):
draw_box(image, np.array(boxes[i]))
draw_joints(image, np.array(poses[i]), np.array(scores[i]))
return image


def draw_face_detection_result(img_path, detection_result):
bboxes = np.array(detection_result[OutputKeys.BOXES])
kpss = np.array(detection_result[OutputKeys.KEYPOINTS])
scores = np.array(detection_result[OutputKeys.SCORES])
img = cv2.imread(img_path)
assert img is not None, f"Can't read img: {img_path}"
for i in range(len(scores)):
bbox = bboxes[i].astype(np.int32)
kps = kpss[i].reshape(-1, 2).astype(np.int32)
score = scores[i]
x1, y1, x2, y2 = bbox
cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2)
for kp in kps:
cv2.circle(img, tuple(kp), 1, (0, 0, 255), 1)
cv2.putText(
img,
f'{score:.2f}', (x1, y2),
1,
1.0, (0, 255, 0),
thickness=1,
lineType=8)
print(f'Found {len(scores)} faces')
return img


def created_boxed_image(image_in, box):
image = load_image(image_in)
img = cv2.cvtColor(np.asarray(image), cv2.COLOR_RGB2BGR)
cv2.rectangle(img, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])),
(0, 255, 0), 3)
return image


def show_video_tracking_result(video_in_path, bboxes, video_save_path):
cap = cv2.VideoCapture(video_in_path)
for i in range(len(bboxes)):
box = bboxes[i]
success, frame = cap.read()
if success is False:
raise Exception(video_in_path,
' can not be correctly decoded by OpenCV.')
if i == 0:
size = (frame.shape[1], frame.shape[0])
fourcc = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')
video_writer = cv2.VideoWriter(video_save_path, fourcc,
cap.get(cv2.CAP_PROP_FPS), size,
True)
cv2.rectangle(frame, (box[0], box[1]), (box[2], box[3]), (0, 255, 0),
5)
video_writer.write(frame)
video_writer.release
cap.release()

+ 43
- 0
modelscope/utils/nlp/nlp_utils.py View File

@@ -0,0 +1,43 @@
from typing import List

from modelscope.outputs import OutputKeys
from modelscope.pipelines.nlp import (ConversationalTextToSqlPipeline,
DialogStateTrackingPipeline)


def text2sql_tracking_and_print_results(
test_case, pipelines: List[ConversationalTextToSqlPipeline]):
for p in pipelines:
last_sql, history = '', []
for item in test_case['utterance']:
case = {
'utterance': item,
'history': history,
'last_sql': last_sql,
'database_id': test_case['database_id'],
'local_db_path': test_case['local_db_path']
}
results = p(case)
print({'question': item})
print(results)
last_sql = results['text']
history.append(item)


def tracking_and_print_dialog_states(
test_case, pipelines: List[DialogStateTrackingPipeline]):
import json
pipelines_len = len(pipelines)
history_states = [{}]
utter = {}
for step, item in enumerate(test_case):
utter.update(item)
result = pipelines[step % pipelines_len]({
'utter':
utter,
'history_states':
history_states
})
print(json.dumps(result))

history_states.extend([result[OutputKeys.OUTPUT], {}])

+ 0
- 17
tests/pipelines/test_action_recognition.py View File

@@ -15,23 +15,6 @@ class ActionRecognitionTest(unittest.TestCase):
def setUp(self) -> None:
self.model_id = 'damo/cv_TAdaConv_action-recognition'

@unittest.skip('deprecated, download model from model hub instead')
def test_run_with_direct_file_download(self):
model_path = 'https://aquila2-online-models.oss-cn-shanghai.aliyuncs.com/maas_test/pytorch_model.pt'
config_path = 'https://aquila2-online-models.oss-cn-shanghai.aliyuncs.com/maas_test/configuration.json'
with tempfile.TemporaryDirectory() as tmp_dir:
model_file = osp.join(tmp_dir, ModelFile.TORCH_MODEL_FILE)
with open(model_file, 'wb') as ofile1:
ofile1.write(File.read(model_path))
config_file = osp.join(tmp_dir, ModelFile.CONFIGURATION)
with open(config_file, 'wb') as ofile2:
ofile2.write(File.read(config_path))
recognition_pipeline = pipeline(
Tasks.action_recognition, model=tmp_dir)
result = recognition_pipeline(
'data/test/videos/action_recognition_test_video.mp4')
print(f'recognition output: {result}.')

@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_run_modelhub(self):
recognition_pipeline = pipeline(


+ 2
- 59
tests/pipelines/test_body_2d_keypoints.py View File

@@ -9,59 +9,9 @@ from modelscope.outputs import OutputKeys
from modelscope.pipelines import pipeline
from modelscope.pipelines.base import Pipeline
from modelscope.utils.constant import Tasks
from modelscope.utils.cv.image_utils import draw_keypoints
from modelscope.utils.test_utils import test_level

lst_parent_ids_17 = [0, 0, 0, 1, 2, 0, 0, 5, 6, 7, 8, 5, 6, 11, 12, 13, 14]
lst_left_ids_17 = [1, 3, 5, 7, 9, 11, 13, 15]
lst_right_ids_17 = [2, 4, 6, 8, 10, 12, 14, 16]
lst_spine_ids_17 = [0]

lst_parent_ids_15 = [0, 0, 1, 2, 3, 1, 5, 6, 14, 8, 9, 14, 11, 12, 1]
lst_left_ids_15 = [2, 3, 4, 8, 9, 10]
lst_right_ids_15 = [5, 6, 7, 11, 12, 13]
lst_spine_ids_15 = [0, 1, 14]


def draw_joints(image, np_kps, score, threshold=0.2):
if np_kps.shape[0] == 17:
lst_parent_ids = lst_parent_ids_17
lst_left_ids = lst_left_ids_17
lst_right_ids = lst_right_ids_17

elif np_kps.shape[0] == 15:
lst_parent_ids = lst_parent_ids_15
lst_left_ids = lst_left_ids_15
lst_right_ids = lst_right_ids_15

for i in range(len(lst_parent_ids)):
pid = lst_parent_ids[i]
if i == pid:
continue

if (score[i] < threshold or score[1] < threshold):
continue

if i in lst_left_ids and pid in lst_left_ids:
color = (0, 255, 0)
elif i in lst_right_ids and pid in lst_right_ids:
color = (255, 0, 0)
else:
color = (0, 255, 255)

cv2.line(image, (int(np_kps[i, 0]), int(np_kps[i, 1])),
(int(np_kps[pid][0]), int(np_kps[pid, 1])), color, 3)

for i in range(np_kps.shape[0]):
if score[i] < threshold:
continue
cv2.circle(image, (int(np_kps[i, 0]), int(np_kps[i, 1])), 5,
(0, 0, 255), -1)


def draw_box(image, box):
cv2.rectangle(image, (int(box[0][0]), int(box[0][1])),
(int(box[1][0]), int(box[1][1])), (0, 0, 255), 2)


class Body2DKeypointsTest(unittest.TestCase):

@@ -71,14 +21,7 @@ class Body2DKeypointsTest(unittest.TestCase):

def pipeline_inference(self, pipeline: Pipeline, pipeline_input):
output = pipeline(pipeline_input)
poses = np.array(output[OutputKeys.POSES])
scores = np.array(output[OutputKeys.SCORES])
boxes = np.array(output[OutputKeys.BOXES])
assert len(poses) == len(scores) and len(poses) == len(boxes)
image = cv2.imread(self.test_image, -1)
for i in range(len(poses)):
draw_box(image, np.array(boxes[i]))
draw_joints(image, np.array(poses[i]), np.array(scores[i]))
image = draw_keypoints(output, self.test_image)
cv2.imwrite('pose_keypoint.jpg', image)

@unittest.skipUnless(test_level() >= 1, 'skip test in current test level')


+ 5
- 22
tests/pipelines/test_conversational_text_to_sql.py View File

@@ -9,6 +9,7 @@ from modelscope.pipelines import pipeline
from modelscope.pipelines.nlp import ConversationalTextToSqlPipeline
from modelscope.preprocessors import ConversationalTextToSqlPreprocessor
from modelscope.utils.constant import Tasks
from modelscope.utils.nlp.nlp_utils import text2sql_tracking_and_print_results
from modelscope.utils.test_utils import test_level


@@ -25,24 +26,6 @@ class ConversationalTextToSql(unittest.TestCase):
]
}

def tracking_and_print_results(
self, pipelines: List[ConversationalTextToSqlPipeline]):
for my_pipeline in pipelines:
last_sql, history = '', []
for item in self.test_case['utterance']:
case = {
'utterance': item,
'history': history,
'last_sql': last_sql,
'database_id': self.test_case['database_id'],
'local_db_path': self.test_case['local_db_path']
}
results = my_pipeline(case)
print({'question': item})
print(results)
last_sql = results['text']
history.append(item)

@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_run_by_direct_model_download(self):
cache_path = snapshot_download(self.model_id)
@@ -61,7 +44,7 @@ class ConversationalTextToSql(unittest.TestCase):
model=model,
preprocessor=preprocessor)
]
self.tracking_and_print_results(pipelines)
text2sql_tracking_and_print_results(self.test_case, pipelines)

@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_run_with_model_from_modelhub(self):
@@ -77,7 +60,7 @@ class ConversationalTextToSql(unittest.TestCase):
model=model,
preprocessor=preprocessor)
]
self.tracking_and_print_results(pipelines)
text2sql_tracking_and_print_results(self.test_case, pipelines)

@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_run_with_model_name(self):
@@ -85,12 +68,12 @@ class ConversationalTextToSql(unittest.TestCase):
pipeline(
task=Tasks.conversational_text_to_sql, model=self.model_id)
]
self.tracking_and_print_results(pipelines)
text2sql_tracking_and_print_results(self.test_case, pipelines)

@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_run_with_default_model(self):
pipelines = [pipeline(task=Tasks.conversational_text_to_sql)]
self.tracking_and_print_results(pipelines)
text2sql_tracking_and_print_results(self.test_case, pipelines)


if __name__ == '__main__':


+ 1
- 2
tests/pipelines/test_crowd_counting.py View File

@@ -2,13 +2,12 @@
import unittest

import cv2
import numpy as np
from PIL import Image

from modelscope.outputs import OutputKeys
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from modelscope.utils.cv.heatmap import numpy_to_cv2img
from modelscope.utils.cv.image_utils import numpy_to_cv2img
from modelscope.utils.logger import get_logger
from modelscope.utils.test_utils import test_level



+ 4
- 23
tests/pipelines/test_dialog_state_tracking.py View File

@@ -1,15 +1,14 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import unittest
from typing import List

from modelscope.hub.snapshot_download import snapshot_download
from modelscope.models import Model
from modelscope.models.nlp import SpaceForDialogStateTracking
from modelscope.outputs import OutputKeys
from modelscope.pipelines import pipeline
from modelscope.pipelines.nlp import DialogStateTrackingPipeline
from modelscope.preprocessors import DialogStateTrackingPreprocessor
from modelscope.utils.constant import Tasks
from modelscope.utils.nlp.nlp_utils import tracking_and_print_dialog_states
from modelscope.utils.test_utils import test_level


@@ -79,24 +78,6 @@ class DialogStateTrackingTest(unittest.TestCase):
'User-8': 'Thank you, goodbye',
}]

def tracking_and_print_dialog_states(
self, pipelines: List[DialogStateTrackingPipeline]):
import json
pipelines_len = len(pipelines)
history_states = [{}]
utter = {}
for step, item in enumerate(self.test_case):
utter.update(item)
result = pipelines[step % pipelines_len]({
'utter':
utter,
'history_states':
history_states
})
print(json.dumps(result))

history_states.extend([result[OutputKeys.OUTPUT], {}])

@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_run_by_direct_model_download(self):
cache_path = snapshot_download(self.model_id, revision='update')
@@ -111,7 +92,7 @@ class DialogStateTrackingTest(unittest.TestCase):
model=model,
preprocessor=preprocessor)
]
self.tracking_and_print_dialog_states(pipelines)
tracking_and_print_dialog_states(pipelines)

@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_run_with_model_from_modelhub(self):
@@ -128,7 +109,7 @@ class DialogStateTrackingTest(unittest.TestCase):
preprocessor=preprocessor)
]

self.tracking_and_print_dialog_states(pipelines)
tracking_and_print_dialog_states(pipelines)

@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_run_with_model_name(self):
@@ -138,7 +119,7 @@ class DialogStateTrackingTest(unittest.TestCase):
model=self.model_id,
model_revision='update')
]
self.tracking_and_print_dialog_states(pipelines)
tracking_and_print_dialog_states(pipelines)


if __name__ == '__main__':


+ 7
- 35
tests/pipelines/test_face_detection.py View File

@@ -9,6 +9,7 @@ from modelscope.msdatasets import MsDataset
from modelscope.outputs import OutputKeys
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from modelscope.utils.cv.image_utils import draw_face_detection_result
from modelscope.utils.test_utils import test_level


@@ -17,46 +18,21 @@ class FaceDetectionTest(unittest.TestCase):
def setUp(self) -> None:
self.model_id = 'damo/cv_resnet_facedetection_scrfd10gkps'

def show_result(self, img_path, bboxes, kpss, scores):
bboxes = np.array(bboxes)
kpss = np.array(kpss)
scores = np.array(scores)
img = cv2.imread(img_path)
assert img is not None, f"Can't read img: {img_path}"
for i in range(len(scores)):
bbox = bboxes[i].astype(np.int32)
kps = kpss[i].reshape(-1, 2).astype(np.int32)
score = scores[i]
x1, y1, x2, y2 = bbox
cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2)
for kp in kps:
cv2.circle(img, tuple(kp), 1, (0, 0, 255), 1)
cv2.putText(
img,
f'{score:.2f}', (x1, y2),
1,
1.0, (0, 255, 0),
thickness=1,
lineType=8)
def show_result(self, img_path, detection_result):
img = draw_face_detection_result(img_path, detection_result)
cv2.imwrite('result.png', img)
print(
f'Found {len(scores)} faces, output written to {osp.abspath("result.png")}'
)
print(f'output written to {osp.abspath("result.png")}')

@unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
def test_run_with_dataset(self):
input_location = ['data/test/images/face_detection.png']
# alternatively:
# input_location = '/dir/to/images'

dataset = MsDataset.load(input_location, target='image')
face_detection = pipeline(Tasks.face_detection, model=self.model_id)
# note that for dataset output, the inference-output is a Generator that can be iterated.
result = face_detection(dataset)
result = next(result)
self.show_result(input_location[0], result[OutputKeys.BOXES],
result[OutputKeys.KEYPOINTS],
result[OutputKeys.SCORES])
self.show_result(input_location[0], result)

@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_run_modelhub(self):
@@ -64,18 +40,14 @@ class FaceDetectionTest(unittest.TestCase):
img_path = 'data/test/images/face_detection.png'

result = face_detection(img_path)
self.show_result(img_path, result[OutputKeys.BOXES],
result[OutputKeys.KEYPOINTS],
result[OutputKeys.SCORES])
self.show_result(img_path, result)

@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_run_modelhub_default_model(self):
face_detection = pipeline(Tasks.face_detection)
img_path = 'data/test/images/face_detection.png'
result = face_detection(img_path)
self.show_result(img_path, result[OutputKeys.BOXES],
result[OutputKeys.KEYPOINTS],
result[OutputKeys.SCORES])
self.show_result(img_path, result)


if __name__ == '__main__':


+ 0
- 1
tests/pipelines/test_face_image_generation.py View File

@@ -1,5 +1,4 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import os
import os.path as osp
import unittest



+ 0
- 1
tests/pipelines/test_face_recognition.py View File

@@ -21,7 +21,6 @@ class FaceRecognitionTest(unittest.TestCase):

face_recognition = pipeline(
Tasks.face_recognition, model=self.model_id)
# note that for dataset output, the inference-output is a Generator that can be iterated.
emb1 = face_recognition(img1)[OutputKeys.IMG_EMBEDDING]
emb2 = face_recognition(img2)[OutputKeys.IMG_EMBEDDING]
sim = np.dot(emb1[0], emb2[0])


+ 3
- 2
tests/pipelines/test_image2image_generation.py View File

@@ -3,6 +3,7 @@ import unittest

from torchvision.utils import save_image

from modelscope.outputs import OutputKeys
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from modelscope.utils.test_utils import test_level
@@ -27,13 +28,13 @@ class Image2ImageGenerationTest(unittest.TestCase):
result2 = img2img_gen_pipeline(('data/test/images/img2img_input.jpg',
'data/test/images/img2img_style.jpg'))
save_image(
result1['output_img'].clamp(-1, 1),
result1[OutputKeys.OUTPUT_IMG].clamp(-1, 1),
'result1.jpg',
range=(-1, 1),
normalize=True,
nrow=4)
save_image(
result2['output_img'].clamp(-1, 1),
result2[OutputKeys.OUTPUT_IMG].clamp(-1, 1),
'result2.jpg',
range=(-1, 1),
normalize=True,


+ 0
- 13
tests/pipelines/test_image_matting.py View File

@@ -18,19 +18,6 @@ class ImageMattingTest(unittest.TestCase):
def setUp(self) -> None:
self.model_id = 'damo/cv_unet_image-matting'

@unittest.skip('deprecated, download model from model hub instead')
def test_run_with_direct_file_download(self):
model_path = 'http://pai-vision-data-hz.oss-cn-zhangjiakou.aliyuncs' \
'.com/data/test/maas/image_matting/matting_person.pb'
with tempfile.TemporaryDirectory() as tmp_dir:
model_file = osp.join(tmp_dir, ModelFile.TF_GRAPH_FILE)
with open(model_file, 'wb') as ofile:
ofile.write(File.read(model_path))
img_matting = pipeline(Tasks.portrait_matting, model=tmp_dir)

result = img_matting('data/test/images/image_matting.png')
cv2.imwrite('result.png', result[OutputKeys.OUTPUT_IMG])

@unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
def test_run_with_dataset(self):
input_location = ['data/test/images/image_matting.png']


+ 1
- 1
tests/pipelines/test_image_style_transfer.py View File

@@ -15,7 +15,7 @@ class ImageStyleTransferTest(unittest.TestCase):
def setUp(self) -> None:
self.model_id = 'damo/cv_aams_style-transfer_damo'

@unittest.skip('deprecated, download model from model hub instead')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_run_by_direct_model_download(self):
snapshot_path = snapshot_download(self.model_id)
print('snapshot_path: {}'.format(snapshot_path))


+ 0
- 1
tests/pipelines/test_key_word_spotting_farfield.py View File

@@ -1,7 +1,6 @@
import os.path
import unittest

from modelscope.fileio import File
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from modelscope.utils.test_utils import test_level


+ 4
- 7
tests/pipelines/test_ofa_tasks.py View File

@@ -4,14 +4,13 @@ import unittest
from os import path as osp

import cv2
import numpy as np
from PIL import Image

from modelscope.models import Model
from modelscope.outputs import OutputKeys
from modelscope.pipelines import pipeline
from modelscope.preprocessors.image import load_image
from modelscope.utils.constant import Tasks
from modelscope.utils.cv.image_utils import created_boxed_image
from modelscope.utils.test_utils import test_level


@@ -22,11 +21,9 @@ class OfaTasksTest(unittest.TestCase):
os.makedirs(self.output_dir, exist_ok=True)

def save_img(self, image_in, box, image_out):
image = load_image(image_in)
img = cv2.cvtColor(np.asarray(image), cv2.COLOR_RGB2BGR)
cv2.rectangle(img, (int(box[0]), int(box[1])),
(int(box[2]), int(box[3])), (0, 255, 0), 3)
cv2.imwrite(osp.join(self.output_dir, image_out), img)
cv2.imwrite(
osp.join(self.output_dir, image_out),
created_boxed_image(image_in, box))

@unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
def test_run_with_image_captioning_with_model(self):


+ 0
- 13
tests/pipelines/test_person_image_cartoon.py View File

@@ -24,19 +24,6 @@ class ImageCartoonTest(unittest.TestCase):
cv2.imwrite('result.png', result[OutputKeys.OUTPUT_IMG])
print(f'Output written to {osp.abspath("result.png")}')

@unittest.skip('deprecated, download model from model hub instead')
def test_run_by_direct_model_download(self):
model_dir = './assets'
if not os.path.exists(model_dir):
os.system(
'wget https://invi-label.oss-cn-shanghai.aliyuncs.com/label/model/cartoon/assets.zip'
)
os.system('unzip assets.zip')

img_cartoon = pipeline(
Tasks.image_portrait_stylization, model=model_dir)
self.pipeline_inference(img_cartoon, self.test_image)

@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_run_modelhub(self):
img_cartoon = pipeline(


+ 1
- 2
tests/pipelines/test_skin_retouching.py View File

@@ -23,10 +23,9 @@ class SkinRetouchingTest(unittest.TestCase):
cv2.imwrite('result_skinretouching.png', result[OutputKeys.OUTPUT_IMG])
print(f'Output written to {osp.abspath("result_skinretouching.png")}')

@unittest.skip('deprecated, download model from model hub instead')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_run_by_direct_model_download(self):
model_dir = snapshot_download(self.model_id)

skin_retouching = pipeline(Tasks.skin_retouching, model=model_dir)
self.pipeline_inference(skin_retouching, self.test_image)



+ 3
- 4
tests/pipelines/test_video_single_object_tracking.py View File

@@ -1,11 +1,10 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import unittest

from modelscope.models.cv.video_single_object_tracking.utils.utils import \
show_tracking_result
from modelscope.outputs import OutputKeys
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from modelscope.utils.cv.image_utils import show_video_tracking_result
from modelscope.utils.test_utils import test_level


@@ -22,8 +21,8 @@ class SingleObjectTracking(unittest.TestCase):
init_bbox = [414, 343, 514, 449] # [x1, y1, x2, y2]
result = video_single_object_tracking((video_path, init_bbox))
print('result is : ', result[OutputKeys.BOXES])
show_tracking_result(video_path, result[OutputKeys.BOXES],
'./tracking_result.avi')
show_video_tracking_result(video_path, result[OutputKeys.BOXES],
'./tracking_result.avi')

@unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
def test_run_modelhub_default_model(self):


Loading…
Cancel
Save