
[to #42322933] MsDataset upload and load support directories.

Upload and download support multi-file operations.
master
xingjun.wxj yingda.chen 3 years ago
parent
commit
1b4d5ccb9c
9 changed files with 250 additions and 51 deletions
  1. modelscope/hub/api.py (+22, -12)
  2. modelscope/hub/utils/utils.py (+1, -7)
  3. modelscope/msdatasets/ms_dataset.py (+37, -15)
  4. modelscope/msdatasets/utils/dataset_utils.py (+88, -2)
  5. modelscope/msdatasets/utils/download_utils.py (+8, -10)
  6. modelscope/msdatasets/utils/oss_utils.py (+7, -2)
  7. modelscope/msdatasets/utils/upload_utils.py (+39, -1)
  8. modelscope/utils/constant.py (+7, -0)
  9. tests/msdatasets/test_dataset_upload.py (+41, -2)

modelscope/hub/api.py (+22, -12)

@@ -26,18 +26,15 @@ from modelscope.utils.logger import get_logger
from .errors import (InvalidParameter, NotExistError, RequestError,
datahub_raise_on_error, handle_http_post_error,
handle_http_response, is_ok, raise_on_error)
from .utils.utils import (get_dataset_hub_endpoint, get_endpoint,
model_id_to_group_owner_name)
from .utils.utils import get_endpoint, model_id_to_group_owner_name

logger = get_logger()


class HubApi:

def __init__(self, endpoint=None, dataset_endpoint=None):
def __init__(self, endpoint=None):
self.endpoint = endpoint if endpoint is not None else get_endpoint()
self.dataset_endpoint = dataset_endpoint if dataset_endpoint is not None else get_dataset_hub_endpoint(
)

def login(
self,
@@ -288,7 +285,7 @@ class HubApi:
return files

def list_datasets(self):
path = f'{self.dataset_endpoint}/api/v1/datasets'
path = f'{self.endpoint}/api/v1/datasets'
headers = None
params = {}
r = requests.get(path, params=params, headers=headers)
@@ -315,13 +312,13 @@ class HubApi:
cache_dir):
shutil.rmtree(cache_dir)
os.makedirs(cache_dir, exist_ok=True)
datahub_url = f'{self.dataset_endpoint}/api/v1/datasets/{namespace}/{dataset_name}'
datahub_url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}'
r = requests.get(datahub_url)
resp = r.json()
datahub_raise_on_error(datahub_url, resp)
dataset_id = resp['Data']['Id']
dataset_type = resp['Data']['Type']
datahub_url = f'{self.dataset_endpoint}/api/v1/datasets/{dataset_id}/repo/tree?Revision={revision}'
datahub_url = f'{self.endpoint}/api/v1/datasets/{dataset_id}/repo/tree?Revision={revision}'
r = requests.get(datahub_url)
resp = r.json()
datahub_raise_on_error(datahub_url, resp)
@@ -339,7 +336,7 @@ class HubApi:
file_path = file_info['Path']
extension = os.path.splitext(file_path)[-1]
if extension in dataset_meta_format:
datahub_url = f'{self.dataset_endpoint}/api/v1/datasets/{namespace}/{dataset_name}/repo?' \
datahub_url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/repo?' \
f'Revision={revision}&FilePath={file_path}'
r = requests.get(datahub_url)
r.raise_for_status()
@@ -363,7 +360,7 @@ class HubApi:
namespace: str,
revision: Optional[str] = DEFAULT_DATASET_REVISION):
if file_name.endswith('.csv'):
file_name = f'{self.dataset_endpoint}/api/v1/datasets/{namespace}/{dataset_name}/repo?' \
file_name = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/repo?' \
f'Revision={revision}&FilePath={file_name}'
return file_name

@@ -372,7 +369,7 @@ class HubApi:
dataset_name: str,
namespace: str,
revision: Optional[str] = DEFAULT_DATASET_REVISION):
datahub_url = f'{self.dataset_endpoint}/api/v1/datasets/{namespace}/{dataset_name}/' \
datahub_url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/' \
f'ststoken?Revision={revision}'
return self.datahub_remote_call(datahub_url)

@@ -383,7 +380,7 @@ class HubApi:
namespace: str,
revision: Optional[str] = DEFAULT_DATASET_REVISION):

datahub_url = f'{self.dataset_endpoint}/api/v1/datasets/{namespace}/{dataset_name}/' \
datahub_url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/' \
f'ststoken?Revision={revision}'

cookies = requests.utils.dict_from_cookiejar(cookies)
@@ -392,6 +389,19 @@ class HubApi:
raise_on_error(resp)
return resp['Data']

def list_oss_dataset_objects(self, dataset_name, namespace, max_limit,
is_recursive, is_filter_dir, revision,
cookies):
url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/oss/tree/?' \
f'MaxLimit={max_limit}&Revision={revision}&Recursive={is_recursive}&FilterDir={is_filter_dir}'
cookies = requests.utils.dict_from_cookiejar(cookies)

resp = requests.get(url=url, cookies=cookies)
resp = resp.json()
raise_on_error(resp)
resp = resp['Data']
return resp

def on_dataset_download(self, dataset_name: str, namespace: str) -> None:
url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/download/increase'
r = requests.post(url)
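
A usage sketch for the new list_oss_dataset_objects endpoint above; it assumes a prior api.login(...), and the dataset name and namespace are placeholders:

from modelscope.hub.api import HubApi
from modelscope.utils.constant import DEFAULT_DATASET_REVISION

api = HubApi()
# Authentication cookies come from a previous login to the ModelScope Hub.
cookies = api.check_cookies_upload_data(use_cookies=True)
objects = api.list_oss_dataset_objects(
    dataset_name='pets_mini',        # placeholder dataset
    namespace='my_namespace',        # placeholder namespace
    max_limit=50000,
    is_recursive=True,
    is_filter_dir=True,
    revision=DEFAULT_DATASET_REVISION,
    cookies=cookies)
print([obj['Key'] for obj in objects])  # e.g. ['train/images/001.png', ...]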


modelscope/hub/utils/utils.py (+1, -7)

@@ -4,8 +4,7 @@ import hashlib
import os
from typing import Optional

from modelscope.hub.constants import (DEFAULT_MODELSCOPE_DATA_ENDPOINT,
DEFAULT_MODELSCOPE_DOMAIN,
from modelscope.hub.constants import (DEFAULT_MODELSCOPE_DOMAIN,
DEFAULT_MODELSCOPE_GROUP,
MODEL_ID_SEPARATOR,
MODELSCOPE_URL_SCHEME)
@@ -44,11 +43,6 @@ def get_endpoint():
return MODELSCOPE_URL_SCHEME + modelscope_domain


def get_dataset_hub_endpoint():
return os.environ.get('HUB_DATASET_ENDPOINT',
DEFAULT_MODELSCOPE_DATA_ENDPOINT)


def compute_hash(file_path):
BUFFER_SIZE = 1024 * 64 # 64k buffer size
sha256_hash = hashlib.sha256()


modelscope/msdatasets/ms_dataset.py (+37, -15)

@@ -1,6 +1,5 @@
# Copyright (c) Alibaba, Inc. and its affiliates.

import math
import os
from typing import (Any, Callable, Dict, Iterable, List, Mapping, Optional,
Sequence, Union)
@@ -17,19 +16,18 @@ from datasets.utils.file_utils import (is_relative_path,
relative_to_absolute_path)

from modelscope.hub.repository import DatasetRepository
from modelscope.msdatasets.task_datasets.builder import build_task_dataset
from modelscope.msdatasets.utils.dataset_builder import ExternalDataset
from modelscope.msdatasets.utils.dataset_utils import (
get_dataset_files, get_target_dataset_structure, load_dataset_builder)
from modelscope.msdatasets.utils.download_utils import DatasetDownloadManager
from modelscope.msdatasets.utils.upload_utils import DatasetUploadManager
from modelscope.utils.config import ConfigDict
from modelscope.utils.config_ds import MS_DATASETS_CACHE
from modelscope.utils.constant import (DEFAULT_DATASET_NAMESPACE,
DEFAULT_DATASET_REVISION,
DatasetFormations, DownloadMode, Hubs)
from modelscope.utils.logger import get_logger
from .task_datasets.builder import build_task_dataset
from .utils.dataset_builder import ExternalDataset
from .utils.dataset_utils import (get_dataset_files,
get_target_dataset_structure,
load_dataset_builder)
from .utils.download_utils import DatasetDownloadManager
from .utils.upload_utils import DatasetUploadManager

logger = get_logger()

@@ -234,7 +232,6 @@ class MsDataset:
# dataset organized to be compatible with hf format
if dataset_formation == DatasetFormations.hf_compatible:
dataset_name = dataset_scripts['.py'][0]
download_dataset = dataset_name
else:
raise FileNotFoundError(
f"Couldn't find a dataset script at {relative_to_absolute_path(dataset_name)} "
@@ -270,7 +267,8 @@ class MsDataset:
raise TypeError('path must be a str or a list, but got'
f' {type(dataset_name)}')

if download_dataset:
is_ci_test = os.getenv('CI_TEST') == 'True'
if download_dataset and not is_ci_test:
try:
api.on_dataset_download(
dataset_name=download_dataset, namespace=namespace)
@@ -570,15 +568,26 @@ class MsDataset:
local_file_path: str,
dataset_name: str,
namespace: Optional[str] = DEFAULT_DATASET_NAMESPACE,
version: Optional[str] = DEFAULT_DATASET_REVISION) -> None:
"""Upload dataset file to the ModelScope Hub. Please login to the ModelScope Hub first.
version: Optional[str] = DEFAULT_DATASET_REVISION,
num_processes: Optional[int] = None,
chunksize: Optional[int] = 1,
filter_hidden_files: Optional[bool] = True) -> None:
"""Upload dataset file or directory to the ModelScope Hub. Please login to the ModelScope Hub first.

Args:
object_name (str): The object name on ModelScope, in the form of your-dataset-name.zip
local_file_path (str): Local file to upload
object_name (str): The object name on ModelScope, in the form of your-dataset-name.zip or your-dataset-name
local_file_path (str): Local file or directory to upload
dataset_name (str): Name of the dataset
namespace(str, optional): Namespace of the dataset
version: Optional[str]: Version of the dataset
num_processes: Optional[int]: The number of processes used for multi-process uploading.
This is only applicable when local_file_path is a directory and multiple files
inside the directory are being uploaded. When None is provided, the number returned by os.cpu_count() is used as the default.
chunksize: Optional[int]: The chunksize of objects to upload.
For very long iterables, using a large value for chunksize can make the job complete much faster than
using the default value of 1. Only available if local_file_path is a directory.
filter_hidden_files: Optional[bool]: Whether to filter hidden files.
Available if local_file_path is a directory.

Returns:
None
@@ -586,7 +595,20 @@ class MsDataset:
"""
_upload_manager = DatasetUploadManager(
dataset_name=dataset_name, namespace=namespace, version=version)
_upload_manager.upload(object_name, local_file_path)

if os.path.isfile(local_file_path):
_upload_manager.upload(
object_name=object_name, local_file_path=local_file_path)
elif os.path.isdir(local_file_path):
_upload_manager.upload_dir(
object_dir_name=object_name,
local_dir_path=local_file_path,
num_processes=num_processes,
chunksize=chunksize,
filter_hidden_files=filter_hidden_files)
else:
raise ValueError(
f'{local_file_path} is not a valid file path or directory')

@staticmethod
def clone_meta(dataset_work_dir: str,

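A minimal usage sketch for the extended MsDataset.upload above; the dataset name, namespace, and local paths are placeholders, and a prior Hub login is assumed:

from modelscope.msdatasets import MsDataset

# Single file, as before (object name ends with .zip):
MsDataset.upload(
    object_name='pets.zip',
    local_file_path='/path/to/pets.zip',
    dataset_name='pets_mini',
    namespace='my_namespace')

# Directory: every non-hidden file under the directory is uploaded as an
# individual OSS object under the object_name prefix via a worker pool.
MsDataset.upload(
    object_name='train',
    local_file_path='/path/to/pets/images/train',
    dataset_name='pets_mini',
    namespace='my_namespace',
    num_processes=4,           # defaults to os.cpu_count() when None
    chunksize=1,               # imap chunk size for the pool
    filter_hidden_files=True)  # skip files whose names start with '.'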

modelscope/msdatasets/utils/dataset_utils.py (+88, -2)

@@ -6,7 +6,8 @@ from typing import Any, Mapping, Optional, Sequence, Union

from datasets.builder import DatasetBuilder

from modelscope.utils.constant import DEFAULT_DATASET_REVISION
from modelscope.hub.api import HubApi
from modelscope.utils.constant import DEFAULT_DATASET_REVISION, DownloadParams
from modelscope.utils.logger import get_logger
from .dataset_builder import MsCsvDatasetBuilder, TaskSpecificDatasetBuilder

@@ -77,6 +78,81 @@ def get_target_dataset_structure(dataset_structure: dict,
return target_subset_name, target_dataset_structure


def list_dataset_objects(hub_api: HubApi, max_limit: int, is_recursive: bool,
dataset_name: str, namespace: str,
version: str) -> list:
"""
List all objects for a specific dataset.

Args:
hub_api (class HubApi): HubApi instance.
max_limit (int): Max number of objects.
is_recursive (bool): Whether to list objects recursively.
dataset_name (str): Dataset name.
namespace (str): Namespace.
version (str): Dataset version.
Returns:
res (list): List of objects, i.e., ['train/images/001.png', 'train/images/002.png', 'val/images/001.png', ...]
"""
res = []
cookies = hub_api.check_cookies_upload_data(use_cookies=True)
objects = hub_api.list_oss_dataset_objects(
dataset_name=dataset_name,
namespace=namespace,
max_limit=max_limit,
is_recursive=is_recursive,
is_filter_dir=True,
revision=version,
cookies=cookies)

for item in objects:
object_key = item.get('Key')
res.append(object_key)

return res


def contains_dir(file_map) -> bool:
"""
Check whether the input contains at least one directory.

Args:
file_map (dict): Structure of data files. e.g., {'train': 'train.zip', 'validation': 'val.zip'}
Returns:
True if input contains at least one directory, False otherwise.
"""
res = False
for k, v in file_map.items():
if isinstance(v, str) and not v.endswith('.zip'):
res = True
break
return res


def get_split_objects_map(file_map, objects):
"""
Get the map between dataset split and oss objects.

Args:
file_map (dict): Structure of data files. e.g., {'train': 'train', 'validation': 'val'}, where both train
and val are directories.
objects (list): List of oss objects. e.g., ['train/001/1_123.png', 'train/001/1_124.png', 'val/003/3_38.png']
Returns:
A map of split-objects. e.g., {'train': ['train/001/1_123.png', 'train/001/1_124.png'],
'validation':['val/003/3_38.png']}
"""
res = {}
for k, v in file_map.items():
res[k] = []

for obj_key in objects:
for k, v in file_map.items():
if obj_key.startswith(v):
res[k].append(obj_key)

return res


def get_dataset_files(subset_split_into: dict,
dataset_name: str,
namespace: str,
@@ -95,14 +171,24 @@ def get_dataset_files(subset_split_into: dict,
meta_map = defaultdict(dict)
file_map = defaultdict(dict)
args_map = defaultdict(dict)
from modelscope.hub.api import HubApi
modelscope_api = HubApi()
objects = list_dataset_objects(
hub_api=modelscope_api,
max_limit=DownloadParams.MAX_LIST_OBJECTS_NUM.value,
is_recursive=True,
dataset_name=dataset_name,
namespace=namespace,
version=revision)

for split, info in subset_split_into.items():
meta_map[split] = modelscope_api.get_dataset_file_url(
info.get('meta', ''), dataset_name, namespace, revision)
if info.get('file'):
file_map[split] = info['file']
args_map[split] = info.get('args')

if contains_dir(file_map):
file_map = get_split_objects_map(file_map, objects)
return meta_map, file_map, args_map
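
For reference, a small sketch of how the new split helpers behave, reusing the toy values from the docstrings above:

from modelscope.msdatasets.utils.dataset_utils import (contains_dir,
                                                       get_split_objects_map)

# 'train' and 'val' are directory prefixes rather than .zip archives, so the
# file map is resolved against the flat OSS object listing.
file_map = {'train': 'train', 'validation': 'val'}
objects = ['train/001/1_123.png', 'train/001/1_124.png', 'val/003/3_38.png']

assert contains_dir(file_map)
print(get_split_objects_map(file_map, objects))
# {'train': ['train/001/1_123.png', 'train/001/1_124.png'],
#  'validation': ['val/003/3_38.png']}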




modelscope/msdatasets/utils/download_utils.py (+8, -10)

@@ -10,16 +10,14 @@ from .oss_utils import OssUtilities

class DatasetDownloadManager(DownloadManager):

def __init__(
self,
dataset_name: str,
namespace: str,
version: str,
data_dir: Optional[str] = None,
download_config: Optional[DownloadConfig] = None,
base_path: Optional[str] = None,
record_checksums=True,
):
def __init__(self,
dataset_name: str,
namespace: str,
version: str,
data_dir: Optional[str] = None,
download_config: Optional[DownloadConfig] = None,
base_path: Optional[str] = None,
record_checksums=True):
super().__init__(dataset_name, data_dir, download_config, base_path,
record_checksums)
self._namespace = namespace


modelscope/msdatasets/utils/oss_utils.py (+7, -2)

@@ -50,11 +50,16 @@ class OssUtilities:
progress_callback=self._percentage)
return local_path

def upload(self, oss_object_name: str, local_file_path: str) -> str:
def upload(self, oss_object_name: str, local_file_path: str,
indicate_individual_progress: bool) -> str:
retry_count = 0
object_key = os.path.join(self.oss_dir, oss_object_name)
resumable_store = oss2.ResumableStore(
root=self.upload_resumable_tmp_store)
if indicate_individual_progress:
progress_callback = self._percentage
else:
progress_callback = None

while True:
try:
@@ -66,7 +71,7 @@ class OssUtilities:
store=resumable_store,
multipart_threshold=self.upload_multipart_threshold,
part_size=self.upload_part_size,
progress_callback=self._percentage,
progress_callback=progress_callback,
num_threads=self.upload_num_threads)
break
except Exception:


modelscope/msdatasets/utils/upload_utils.py (+39, -1)

@@ -1,5 +1,10 @@
# Copyright (c) Alibaba, Inc. and its affiliates.

import os
from multiprocessing.dummy import Pool as ThreadPool

from tqdm import tqdm

from .oss_utils import OssUtilities


@@ -19,5 +24,38 @@ class DatasetUploadManager(object):

def upload(self, object_name: str, local_file_path: str) -> str:
object_key = self.oss_utilities.upload(
oss_object_name=object_name, local_file_path=local_file_path)
oss_object_name=object_name,
local_file_path=local_file_path,
indicate_individual_progress=True)
return object_key

def upload_dir(self, object_dir_name: str, local_dir_path: str,
num_processes: int, chunksize: int,
filter_hidden_files: bool) -> int:

def run_upload(args):
self.oss_utilities.upload(
oss_object_name=args[0],
local_file_path=args[1],
indicate_individual_progress=False)

files_list = []
for root, dirs, files in os.walk(local_dir_path):
for file_name in files:
if filter_hidden_files and file_name.startswith('.'):
continue
# Concatenate the directory name and the relative path into an OSS object key, e.g., train/001/1_1230.png
object_name = os.path.join(
object_dir_name,
root.replace(local_dir_path, '', 1).strip('/'), file_name)

local_file_path = os.path.join(root, file_name)
files_list.append((object_name, local_file_path))

with ThreadPool(processes=num_processes) as pool:
result = list(
tqdm(
pool.imap(run_upload, files_list, chunksize=chunksize),
total=len(files_list)))

return len(result)
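
The object key built above mirrors the local layout under the uploaded directory; a quick sketch of the path arithmetic with placeholder paths:

import os

object_dir_name = 'train'
local_dir_path = '/data/pets/images/train'

# For a file discovered by os.walk at /data/pets/images/train/001/1_1230.png:
root, file_name = '/data/pets/images/train/001', '1_1230.png'

object_name = os.path.join(
    object_dir_name,
    root.replace(local_dir_path, '', 1).strip('/'), file_name)
print(object_name)  # -> train/001/1_1230.png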

modelscope/utils/constant.py (+7, -0)

@@ -227,6 +227,13 @@ class DownloadMode(enum.Enum):
FORCE_REDOWNLOAD = 'force_redownload'


class DownloadParams(enum.Enum):
"""
Parameters for downloading dataset.
"""
MAX_LIST_OBJECTS_NUM = 50000


class DatasetFormations(enum.Enum):
""" How a dataset is organized and interpreted
"""


tests/msdatasets/test_dataset_upload.py (+41, -2)

@@ -6,9 +6,13 @@ import unittest
import zipfile

from modelscope.msdatasets import MsDataset
from modelscope.utils.constant import ModelFile
from modelscope.msdatasets.utils.dataset_utils import list_dataset_objects
from modelscope.utils import logger as logging
from modelscope.utils.constant import DEFAULT_DATASET_REVISION, ModelFile
from modelscope.utils.test_utils import test_level

logger = logging.get_logger(__name__)

KEY_EXTRACTED = 'extracted'


@@ -39,7 +43,8 @@ class DatasetUploadTest(unittest.TestCase):
def tearDown(self):
os.chdir(self.old_dir)
shutil.rmtree(self.temp_dir, ignore_errors=True)
print('The test dir successfully removed!')
logger.info(
f'Temporary directory {self.temp_dir} successfully removed!')

@staticmethod
def get_raw_downloaded_file_path(extracted_path):
@@ -68,6 +73,40 @@ class DatasetUploadTest(unittest.TestCase):
dataset_name=self.dataset_name,
namespace=self.namespace)

@unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
def test_ds_upload_dir(self):
ms_ds_train = MsDataset.load(self.prepared_dataset_name, split='train')
config_train = ms_ds_train._hf_ds.config_kwargs
extracted_path_train = config_train.get('split_config').get('train')

MsDataset.upload(
object_name='train',
local_file_path=os.path.join(extracted_path_train,
'Pets/images/train'),
dataset_name=self.dataset_name,
namespace=self.namespace)
MsDataset.upload(
object_name='val',
local_file_path=os.path.join(extracted_path_train,
'Pets/images/val'),
dataset_name=self.dataset_name,
namespace=self.namespace)

objects = list_dataset_objects(
hub_api=self.api,
max_limit=-1,
is_recursive=True,
dataset_name=self.dataset_name,
namespace=self.namespace,
version=DEFAULT_DATASET_REVISION)

logger.info(f'{len(objects)} objects have been uploaded: {objects}')

@unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
def test_ds_download_dir(self):
test_ds = MsDataset.load(self.dataset_name, self.namespace)
assert test_ds.config_kwargs['split_config'].values()

@unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
def test_ds_clone_meta(self):
MsDataset.clone_meta(

