
[to #42322933] MsDataset: support uploading dataset archives and meta data

1. MsDataset supports uploading data files (zip archives); see the usage sketch below
2. MsDataset supports cloning and uploading meta data
3. Downloading a dataset via MsDataset.load() now also updates the download count shown on the web page
        Link: https://code.alibaba-inc.com/Ali-MaaS/MaaS-lib/codereview/9831232
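
A minimal usage sketch of the features above (dataset names, namespaces and paths are placeholders; MsDataset.load() reports the download to the hub on its own):

    from modelscope.msdatasets import MsDataset

    # (3) Loading a dataset now also bumps the download counter
    #     shown on the dataset's web page.
    ds = MsDataset.load('your-dataset-name', namespace='your-namespace', split='train')

    # (1) Upload a packaged data file (e.g. a zip archive).
    #     Requires a prior HubApi().login(<token>).
    MsDataset.upload(
        object_name='your-dataset-name.zip',          # object name on the hub
        local_file_path='/path/to/your-dataset.zip',  # placeholder local path
        dataset_name='your-dataset-name',
        namespace='your-namespace')

    # (2) Meta data is handled via git; see MsDataset.clone_meta() and
    #     MsDataset.upload_meta() in modelscope/msdatasets/ms_dataset.py below.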
master
xingjun.wxj yingda.chen 3 years ago
parent commit 44033290d4
11 changed files with 407 additions and 29 deletions
  1.  +1   -0  .dev_scripts/dockerci.sh
  2.  +29  -5  modelscope/hub/api.py
  3.  +111 -9  modelscope/hub/repository.py
  4.  +7   -1  modelscope/hub/utils/utils.py
  5.  +112 -5  modelscope/msdatasets/ms_dataset.py
  6.  +26  -7  modelscope/msdatasets/utils/oss_utils.py
  7.  +23  -0  modelscope/msdatasets/utils/upload_utils.py
  8.  +0   -0  modelscope/msdatasets/config.py → modelscope/utils/config_ds.py
  9.  +1   -0  modelscope/utils/constant.py
  10. +95  -0  tests/msdatasets/test_dataset_upload.py
  11. +2   -2  tests/msdatasets/test_ms_dataset.py

+1 -0  .dev_scripts/dockerci.sh

@@ -32,6 +32,7 @@ do
-e TEST_ACCESS_TOKEN_CITEST=$TEST_ACCESS_TOKEN_CITEST \
-e TEST_ACCESS_TOKEN_SDKDEV=$TEST_ACCESS_TOKEN_SDKDEV \
-e TEST_LEVEL=$TEST_LEVEL \
-e TEST_UPLOAD_MS_TOKEN=$TEST_UPLOAD_MS_TOKEN \
--workdir=$CODE_DIR_IN_CONTAINER \
--net host \
${IMAGE_NAME}:${IMAGE_VERSION} \


+29 -5  modelscope/hub/api.py

@@ -1,7 +1,6 @@
import os
import pickle
import shutil
import subprocess
from collections import defaultdict
from http import HTTPStatus
from http.cookiejar import CookieJar
@@ -16,8 +15,7 @@ from modelscope.hub.constants import (API_RESPONSE_FIELD_DATA,
API_RESPONSE_FIELD_MESSAGE,
API_RESPONSE_FIELD_USERNAME,
DEFAULT_CREDENTIALS_PATH)
from modelscope.msdatasets.config import (DOWNLOADED_DATASETS_PATH,
HUB_DATASET_ENDPOINT)
from modelscope.utils.config_ds import DOWNLOADED_DATASETS_PATH
from modelscope.utils.constant import (DEFAULT_DATASET_REVISION,
DEFAULT_MODEL_REVISION,
DatasetFormations, DatasetMetaFormats,
@@ -26,7 +24,8 @@ from modelscope.utils.logger import get_logger
from .errors import (InvalidParameter, NotExistError, RequestError,
datahub_raise_on_error, handle_http_response, is_ok,
raise_on_error)
from .utils.utils import get_endpoint, model_id_to_group_owner_name
from .utils.utils import (get_dataset_hub_endpoint, get_endpoint,
model_id_to_group_owner_name)

logger = get_logger()

@@ -35,7 +34,8 @@ class HubApi:

def __init__(self, endpoint=None, dataset_endpoint=None):
self.endpoint = endpoint if endpoint is not None else get_endpoint()
self.dataset_endpoint = dataset_endpoint if dataset_endpoint is not None else HUB_DATASET_ENDPOINT
self.dataset_endpoint = dataset_endpoint if dataset_endpoint is not None else get_dataset_hub_endpoint(
)

def login(
self,
@@ -376,6 +376,27 @@ class HubApi:
f'ststoken?Revision={revision}'
return self.datahub_remote_call(datahub_url)

def get_dataset_access_config_session(
self,
cookies: CookieJar,
dataset_name: str,
namespace: str,
revision: Optional[str] = DEFAULT_DATASET_REVISION):

datahub_url = f'{self.dataset_endpoint}/api/v1/datasets/{namespace}/{dataset_name}/' \
f'ststoken?Revision={revision}'

cookies = requests.utils.dict_from_cookiejar(cookies)
r = requests.get(url=datahub_url, cookies=cookies)
resp = r.json()
datahub_raise_on_error(datahub_url, resp)
return resp['Data']

def on_dataset_download(self, dataset_name: str, namespace: str) -> None:
url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/download/increase'
r = requests.post(url)
r.raise_for_status()

@staticmethod
def datahub_remote_call(url):
r = requests.get(url)
@@ -383,6 +404,9 @@ class HubApi:
datahub_raise_on_error(url, resp)
return resp['Data']

def check_cookies_upload_data(self, use_cookies) -> CookieJar:
return self._check_cookie(use_cookies=use_cookies)


class ModelScopeConfig:
path_credential = expanduser(DEFAULT_CREDENTIALS_PATH)
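
A sketch of how the new session-based pieces fit together: check_cookies_upload_data() validates and returns the logged-in session's cookies, get_dataset_access_config_session() exchanges them for temporary OSS (STS) access config, and on_dataset_download() is the counter endpoint that MsDataset.load() now calls. The dataset endpoint can be overridden with the HUB_DATASET_ENDPOINT environment variable (see get_dataset_hub_endpoint() below). Token and names are placeholders:

    from modelscope.hub.api import HubApi

    api = HubApi()
    api.login('<your-modelscope-token>')  # placeholder token

    # Validate the login cookies that dataset uploads will reuse.
    cookies = api.check_cookies_upload_data(use_cookies=True)

    # Exchange the session cookies for temporary OSS access config.
    oss_config = api.get_dataset_access_config_session(
        cookies=cookies,
        dataset_name='your-dataset-name',
        namespace='your-namespace')

    # Bump the download counter (normally done inside MsDataset.load()).
    api.on_dataset_download(
        dataset_name='your-dataset-name', namespace='your-namespace')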


+111 -9  modelscope/hub/repository.py

@@ -2,7 +2,8 @@ import os
from typing import Optional

from modelscope.hub.errors import GitError, InvalidParameter, NotLoginException
from modelscope.utils.constant import DEFAULT_MODEL_REVISION
from modelscope.utils.constant import (DEFAULT_DATASET_REVISION,
DEFAULT_MODEL_REVISION)
from modelscope.utils.logger import get_logger
from .api import ModelScopeConfig
from .git import GitCommandWrapper
@@ -15,14 +16,12 @@ class Repository:
"""A local representation of the model git repository.
"""

def __init__(
self,
model_dir: str,
clone_from: str,
revision: Optional[str] = DEFAULT_MODEL_REVISION,
auth_token: Optional[str] = None,
git_path: Optional[str] = None,
):
def __init__(self,
model_dir: str,
clone_from: str,
revision: Optional[str] = DEFAULT_MODEL_REVISION,
auth_token: Optional[str] = None,
git_path: Optional[str] = None):
"""
Instantiate a Repository object by cloning the remote ModelScope Hub repo
Args:
@@ -86,6 +85,7 @@ class Repository:
branch: Optional[str] = DEFAULT_MODEL_REVISION,
force: bool = False):
"""Push local files to remote, this method will do.
git pull
git add
git commit
git push
@@ -117,3 +117,105 @@ class Repository:
url=url,
local_branch=branch,
remote_branch=branch)


class DatasetRepository:
"""A local representation of the dataset (metadata) git repository.
"""

def __init__(self,
repo_work_dir: str,
dataset_id: str,
revision: Optional[str] = DEFAULT_DATASET_REVISION,
auth_token: Optional[str] = None,
git_path: Optional[str] = None):
"""
Instantiate a Dataset Repository object by cloning the remote ModelScope dataset repo
Args:
repo_work_dir (`str`):
The dataset repo root directory.
dataset_id (`str`):
The dataset id on ModelScope to clone from.
revision (`Optional[str]`):
Revision of the dataset you want to clone from. Can be any of a branch, tag or commit hash.
auth_token (`Optional[str]`):
Token obtained when calling `HubApi.login()`. Usually you can safely ignore this parameter,
as the token is saved the first time you log in; if None, the saved token is used.
git_path (`Optional[str]`):
The git command-line path; if None, 'git' is used.
"""
self.dataset_id = dataset_id
self.repo_work_dir = repo_work_dir
self.repo_base_dir = os.path.dirname(repo_work_dir)
self.repo_name = os.path.basename(repo_work_dir)
self.revision = revision
if auth_token:
self.auth_token = auth_token
else:
self.auth_token = ModelScopeConfig.get_token()

self.git_wrapper = GitCommandWrapper(git_path)
os.makedirs(self.repo_work_dir, exist_ok=True)
self.repo_url = self._get_repo_url(dataset_id=dataset_id)

def clone(self) -> str:
# The local repo dir is not empty; check whether it already holds this repo.
if os.listdir(self.repo_work_dir):
remote_url = self._get_remote_url()
remote_url = self.git_wrapper.remove_token_from_url(remote_url)
# no need to clone again
if remote_url and remote_url == self.repo_url:
return ''

logger.info('Cloning repo from {} '.format(self.repo_url))
self.git_wrapper.clone(self.repo_base_dir, self.auth_token,
self.repo_url, self.repo_name, self.revision)
return self.repo_work_dir

def push(self,
commit_message: str,
branch: Optional[str] = DEFAULT_DATASET_REVISION,
force: bool = False):
"""Push local files to remote, this method will do.
git pull
git add
git commit
git push
Args:
commit_message (str): Commit message.
branch (Optional[str]): Which branch to push to.
force (Optional[bool]): Whether to force-push.
"""
if commit_message is None or not isinstance(commit_message, str):
msg = 'commit_message must be provided!'
raise InvalidParameter(msg)

if not isinstance(force, bool):
raise InvalidParameter('force must be bool')

if not self.auth_token:
raise NotLoginException('Must log in to push; please log in first.')

self.git_wrapper.config_auth_token(self.repo_work_dir, self.auth_token)
self.git_wrapper.add_user_info(self.repo_base_dir, self.repo_name)

remote_url = self.git_wrapper.get_repo_remote_url(self.repo_work_dir)
self.git_wrapper.pull(self.repo_work_dir)
self.git_wrapper.add(self.repo_work_dir, all_files=True)
self.git_wrapper.commit(self.repo_work_dir, commit_message)
self.git_wrapper.push(
repo_dir=self.repo_work_dir,
token=self.auth_token,
url=remote_url,
local_branch=branch,
remote_branch=branch)

def _get_repo_url(self, dataset_id):
return f'{get_endpoint()}/datasets/{dataset_id}.git'

def _get_remote_url(self):
try:
remote = self.git_wrapper.get_repo_remote_url(self.repo_work_dir)
except GitError:
remote = None
return remote
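
For reference, a minimal sketch of driving the new DatasetRepository directly; MsDataset.clone_meta()/upload_meta() below wrap exactly this flow (paths and ids are placeholders):

    from modelscope.hub.repository import DatasetRepository

    repo = DatasetRepository(
        repo_work_dir='/tmp/your-dataset-name',         # placeholder work dir
        dataset_id='your-namespace/your-dataset-name')  # placeholder dataset id

    # clone() returns the work dir on a fresh clone, or '' when the
    # directory already holds a clone of the same remote.
    work_dir = repo.clone()

    # ...edit meta files under the work dir, then commit and push:
    repo.push(commit_message='Update dataset meta files.')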

+7 -1  modelscope/hub/utils/utils.py

@@ -1,7 +1,8 @@
import hashlib
import os

from modelscope.hub.constants import (DEFAULT_MODELSCOPE_DOMAIN,
from modelscope.hub.constants import (DEFAULT_MODELSCOPE_DATA_ENDPOINT,
DEFAULT_MODELSCOPE_DOMAIN,
DEFAULT_MODELSCOPE_GROUP,
MODEL_ID_SEPARATOR,
MODELSCOPE_URL_SCHEME)
@@ -38,6 +39,11 @@ def get_endpoint():
return MODELSCOPE_URL_SCHEME + modelscope_domain


def get_dataset_hub_endpoint():
return os.environ.get('HUB_DATASET_ENDPOINT',
DEFAULT_MODELSCOPE_DATA_ENDPOINT)


def compute_hash(file_path):
BUFFER_SIZE = 1024 * 64 # 64k buffer size
sha256_hash = hashlib.sha256()


+112 -5  modelscope/msdatasets/ms_dataset.py

@@ -12,9 +12,11 @@ from datasets.utils.download_manager import DownloadConfig
from datasets.utils.file_utils import (is_relative_path,
relative_to_absolute_path)

from modelscope.msdatasets.config import MS_DATASETS_CACHE
from modelscope.hub.repository import DatasetRepository
from modelscope.utils.config import ConfigDict
from modelscope.utils.constant import (DEFAULT_DATASET_REVISION,
from modelscope.utils.config_ds import MS_DATASETS_CACHE
from modelscope.utils.constant import (DEFAULT_DATASET_NAMESPACE,
DEFAULT_DATASET_REVISION,
DatasetFormations, DownloadMode, Hubs)
from modelscope.utils.logger import get_logger
from .task_datasets.builder import build_task_dataset
@@ -23,6 +25,7 @@ from .utils.dataset_utils import (get_dataset_files,
get_target_dataset_structure,
load_dataset_builder)
from .utils.download_utils import DatasetDownloadManager
from .utils.upload_utils import DatasetUploadManager

logger = get_logger()

@@ -97,7 +100,7 @@ class MsDataset:
@staticmethod
def load(
dataset_name: Union[str, list],
namespace: Optional[str] = 'modelscope',
namespace: Optional[str] = DEFAULT_DATASET_NAMESPACE,
target: Optional[str] = None,
version: Optional[str] = DEFAULT_DATASET_REVISION,
hub: Optional[Hubs] = Hubs.modelscope,
@@ -171,15 +174,17 @@ class MsDataset:
Mapping[str, Union[str, Sequence[str]]]]] = None,
download_mode: Optional[DownloadMode] = None,
**config_kwargs) -> Union[dict, 'MsDataset']:
from modelscope.hub.api import HubApi
api = HubApi()
download_dataset = ''
if isinstance(dataset_name, str):
download_dataset = dataset_name
dataset_formation = DatasetFormations.native
if dataset_name in _PACKAGED_DATASETS_MODULES or os.path.isdir(dataset_name) or \
(os.path.isfile(dataset_name) and dataset_name.endswith('.py')):
dataset_formation = DatasetFormations.hf_compatible
elif is_relative_path(dataset_name) and dataset_name.count(
'/') == 0:
from modelscope.hub.api import HubApi
api = HubApi()
dataset_scripts, dataset_formation, download_dir = api.fetch_dataset_scripts(
dataset_name, namespace, download_mode, version)
# dataset organized to be compatible with hf format
@@ -219,6 +224,11 @@ class MsDataset:
else:
raise TypeError('path must be a str or a list, but got'
f' {type(dataset_name)}')

if download_dataset:
api.on_dataset_download(
dataset_name=download_dataset, namespace=namespace)

return MsDataset.from_hf_dataset(dataset, target=target)

@staticmethod
@@ -539,3 +549,100 @@ class MsDataset:
def to_hf_dataset(self) -> Dataset:
self._hf_ds.reset_format()
return self._hf_ds

@staticmethod
def upload(object_name: str,
local_file_path: str,
dataset_name: str,
namespace: Optional[str] = DEFAULT_DATASET_NAMESPACE,
version: Optional[str] = DEFAULT_DATASET_REVISION) -> None:
"""Upload dataset file to the ModelScope Hub. Please login to the ModelScope Hub first.

Args:
object_name (str): The object name on ModelScope, in the form of your-dataset-name.zip
local_file_path (str): Local file to upload
dataset_name (str): Name of the dataset
namespace (str, optional): Namespace of the dataset
version (str, optional): Version of the dataset

Returns:
None

"""
from modelscope.hub.api import HubApi
_hub_api = HubApi()
cookies = _hub_api.check_cookies_upload_data(use_cookies=True)
_upload_manager = DatasetUploadManager(
dataset_name=dataset_name,
namespace=namespace,
version=version,
cookies=cookies)
_upload_manager.upload(object_name, local_file_path)

@staticmethod
def clone_meta(dataset_work_dir: str,
dataset_id: str,
revision: Optional[str] = DEFAULT_DATASET_REVISION,
auth_token: Optional[str] = None,
git_path: Optional[str] = None) -> None:
"""Clone meta-file of dataset from the ModelScope Hub.
Args:
dataset_work_dir (str): Current git working directory.
dataset_id (str): Dataset id, It should be like your-namespace/your-dataset-name .
revision(`Optional[str]`):
revision of the model you want to clone from. Can be any of a branch, tag or commit hash
auth_token(`Optional[str]`):
token obtained when calling `HubApi.login()`. Usually you can safely ignore the parameter
as the token is already saved when you login the first time, if None, we will use saved token.
git_path:(`Optional[str]`):
The git command line path, if None, we use 'git'
Returns:
None
"""

_repo = DatasetRepository(
repo_work_dir=dataset_work_dir,
dataset_id=dataset_id,
revision=revision,
auth_token=auth_token,
git_path=git_path)
clone_work_dir = _repo.clone()
if clone_work_dir:
logger.info('Cloned repo to: {}'.format(clone_work_dir))
else:
logger.warning('The repo working dir already exists.')

@staticmethod
def upload_meta(dataset_work_dir: str,
dataset_id: str,
commit_message: str,
revision: Optional[str] = DEFAULT_DATASET_REVISION,
auth_token: Optional[str] = None,
git_path: Optional[str] = None,
force: bool = False) -> None:
"""Upload meta-file of dataset to the ModelScope Hub. Please clone the meta-data from the ModelScope Hub first.

Args:
dataset_work_dir (str): Current working directory.
dataset_id (str): Dataset id; it should be like your-namespace/your-dataset-name.
commit_message (str): Commit message.
revision (`Optional[str]`):
Revision of the dataset; also used as the branch to push to. Can be any of a branch, tag or commit hash.
auth_token (`Optional[str]`):
Token obtained when calling `HubApi.login()`. Usually you can safely ignore this parameter,
as the token is saved the first time you log in; if None, the saved token is used.
git_path (`Optional[str]`):
The git command-line path; if None, 'git' is used.
force (Optional[bool]): Whether to force-push.

Returns:
None

"""
_repo = DatasetRepository(
repo_work_dir=dataset_work_dir,
dataset_id=dataset_id,
revision=revision,
auth_token=auth_token,
git_path=git_path)
_repo.push(commit_message=commit_message, branch=revision, force=force)
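
Putting the two meta methods together, a sketch of the full round trip (ids, paths and the README edit are placeholders; the unit tests below exercise the same flow):

    import os
    from modelscope.msdatasets import MsDataset

    work_dir = '/tmp/your-dataset-name'
    dataset_id = 'your-namespace/your-dataset-name'

    # Clone the dataset's meta repo (a no-op if work_dir already holds it).
    MsDataset.clone_meta(dataset_work_dir=work_dir, dataset_id=dataset_id)

    # Edit some meta file, e.g. append to the dataset card.
    with open(os.path.join(work_dir, 'README.md'), 'a') as f:
        f.write('\nUpdated description.')

    # Commit and push the change back to the hub.
    MsDataset.upload_meta(
        dataset_work_dir=work_dir,
        dataset_id=dataset_id,
        commit_message='Update dataset card.')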

+26 -7  modelscope/msdatasets/utils/oss_utils.py

@@ -1,6 +1,5 @@
from __future__ import print_function
import os
import sys

import oss2
from datasets.utils.file_utils import hash_url_to_filename
@@ -19,6 +18,12 @@ class OssUtilities:
self.oss_dir = oss_config['Dir']
self.oss_backup_dir = oss_config['BackupDir']

@staticmethod
def _percentage(consumed_bytes, total_bytes):
if total_bytes:
rate = int(100 * (float(consumed_bytes) / float(total_bytes)))
print('\r{0}% '.format(rate), end='', flush=True)

def download(self, oss_file_name, cache_dir):
candidate_key = os.path.join(self.oss_dir, oss_file_name)
candidate_key_backup = os.path.join(self.oss_backup_dir, oss_file_name)
@@ -27,11 +32,25 @@ class OssUtilities:
filename = hash_url_to_filename(file_oss_key, etag=None)
local_path = os.path.join(cache_dir, filename)

def percentage(consumed_bytes, total_bytes):
if total_bytes:
rate = int(100 * (float(consumed_bytes) / float(total_bytes)))
print('\r{0}% '.format(rate), end='', flush=True)

self.bucket.get_object_to_file(
file_oss_key, local_path, progress_callback=percentage)
file_oss_key, local_path, progress_callback=self._percentage)
return local_path

def upload(self, oss_file_name: str, local_file_path: str) -> str:
max_retries = 3
retry_count = 0
object_key = os.path.join(self.oss_dir, oss_file_name)

while True:
try:
retry_count += 1
self.bucket.put_object_from_file(
object_key,
local_file_path,
progress_callback=self._percentage)
break
except Exception:
if retry_count >= max_retries:
raise

return object_key
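
The loop above retries a failed put up to three times and re-raises on the last attempt. The same pattern in isolation, as a generic sketch (the sleep between attempts is an addition for illustration, not in the commit):

    import time

    def put_with_retries(put_fn, max_retries=3, backoff_seconds=1.0):
        """Call put_fn(), retrying up to max_retries times before re-raising."""
        retry_count = 0
        while True:
            try:
                retry_count += 1
                return put_fn()
            except Exception:
                if retry_count >= max_retries:
                    raise
                time.sleep(backoff_seconds)  # not in the original loop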

+23 -0  modelscope/msdatasets/utils/upload_utils.py

@@ -0,0 +1,23 @@
from http.cookiejar import CookieJar

from .oss_utils import OssUtilities


class DatasetUploadManager(object):

def __init__(self, dataset_name: str, namespace: str, version: str,
cookies: CookieJar):
from modelscope.hub.api import HubApi
api = HubApi()
oss_config = api.get_dataset_access_config_session(
cookies=cookies,
dataset_name=dataset_name,
namespace=namespace,
revision=version)

self.oss_utilities = OssUtilities(oss_config)

def upload(self, oss_file_name: str, local_file_path: str) -> str:
oss_object_key = self.oss_utilities.upload(
oss_file_name=oss_file_name, local_file_path=local_file_path)
return oss_object_key

+0 -0  modelscope/msdatasets/config.py → modelscope/utils/config_ds.py (renamed)


+1 -0  modelscope/utils/constant.py

@@ -254,6 +254,7 @@ class Frameworks(object):

DEFAULT_MODEL_REVISION = 'master'
DEFAULT_DATASET_REVISION = 'master'
DEFAULT_DATASET_NAMESPACE = 'modelscope'


class ModeKeys:


+95 -0  tests/msdatasets/test_dataset_upload.py

@@ -0,0 +1,95 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import os
import shutil
import tempfile
import unittest
import zipfile

from modelscope.msdatasets import MsDataset
from modelscope.utils.constant import ModelFile
from modelscope.utils.test_utils import test_level

KEY_EXTRACTED = 'extracted'


class DatasetUploadTest(unittest.TestCase):

def setUp(self):
self.old_dir = os.getcwd()
self.dataset_name = 'small_coco_for_test'
self.dataset_file_name = self.dataset_name
self.prepared_dataset_name = 'pets_small'
self.token = os.getenv('TEST_UPLOAD_MS_TOKEN')
error_msg = 'The ModelScope token cannot be empty; please set the env variable: TEST_UPLOAD_MS_TOKEN'
self.assertIsNotNone(self.token, msg=error_msg)
from modelscope.hub.api import HubApi
from modelscope.hub.api import ModelScopeConfig
self.api = HubApi()
self.api.login(self.token)

# get user info
self.namespace, _ = ModelScopeConfig.get_user_info()

self.temp_dir = tempfile.mkdtemp()
self.test_work_dir = os.path.join(self.temp_dir, self.dataset_name)
self.test_meta_dir = os.path.join(self.test_work_dir, 'meta')
if not os.path.exists(self.test_work_dir):
os.makedirs(self.test_work_dir)

def tearDown(self):
os.chdir(self.old_dir)
shutil.rmtree(self.temp_dir, ignore_errors=True)
print('The test dir was successfully removed!')

@staticmethod
def get_raw_downloaded_file_path(extracted_path):
raw_downloaded_file_path = ''
raw_data_dir = os.path.abspath(
os.path.join(extracted_path, '../../..'))
for root, dirs, files in os.walk(raw_data_dir):
if KEY_EXTRACTED in dirs:
for file in files:
curr_file_path = os.path.join(root, file)
if zipfile.is_zipfile(curr_file_path):
raw_downloaded_file_path = curr_file_path
return raw_downloaded_file_path

@unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
def test_ds_upload(self):
# Get the prepared data from hub, using default modelscope namespace
ms_ds_train = MsDataset.load(self.prepared_dataset_name, split='train')
config_res = ms_ds_train._hf_ds.config_kwargs
extracted_path = config_res.get('split_config').get('train')
raw_zipfile_path = self.get_raw_downloaded_file_path(extracted_path)

MsDataset.upload(
object_name=self.dataset_file_name + '.zip',
local_file_path=raw_zipfile_path,
dataset_name=self.dataset_name,
namespace=self.namespace)

@unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
def test_ds_clone_meta(self):
MsDataset.clone_meta(
dataset_work_dir=self.test_meta_dir,
dataset_id=os.path.join(self.namespace, self.dataset_name))

@unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
def test_ds_upload_meta(self):
# Clone dataset meta repo first.
MsDataset.clone_meta(
dataset_work_dir=self.test_meta_dir,
dataset_id=os.path.join(self.namespace, self.dataset_name))

with open(os.path.join(self.test_meta_dir, ModelFile.README),
'a') as f:
f.write('\nThis is a line for unit test.')

MsDataset.upload_meta(
dataset_work_dir=self.test_meta_dir,
dataset_id=os.path.join(self.namespace, self.dataset_name),
commit_message='Update for unit test.')


if __name__ == '__main__':
unittest.main()
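
These tests need a real hub account; a sketch of running just this file locally, assuming test_level() reads the TEST_LEVEL variable that dockerci.sh forwards (the token value is a placeholder):

    import os
    import subprocess

    env = dict(os.environ,
               TEST_UPLOAD_MS_TOKEN='<your-modelscope-token>',  # placeholder
               TEST_LEVEL='1')  # the upload tests skip below level 1
    subprocess.run(
        ['python', '-m', 'unittest', 'tests.msdatasets.test_dataset_upload'],
        check=True, env=env)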

+2 -2  tests/msdatasets/test_ms_dataset.py

@@ -4,7 +4,7 @@ from modelscope.models import Model
from modelscope.msdatasets import MsDataset
from modelscope.preprocessors import SequenceClassificationPreprocessor
from modelscope.preprocessors.base import Preprocessor
from modelscope.utils.constant import DownloadMode
from modelscope.utils.constant import DEFAULT_DATASET_NAMESPACE, DownloadMode
from modelscope.utils.test_utils import require_tf, require_torch, test_level


@@ -35,7 +35,7 @@ class MsDatasetTest(unittest.TestCase):
def test_coco(self):
ms_ds_train = MsDataset.load(
'pets_small',
namespace='modelscope',
namespace=DEFAULT_DATASET_NAMESPACE,
split='train',
download_mode=DownloadMode.FORCE_REDOWNLOAD,
classes=('1', '2'))

