
[to #42322933] Add dataset file deletion and upload modes.

1. Add: MsDataset.delete(), which supports deleting a dataset file or directory.

2. Add: upload modes, e.g. MsDataset.upload(..., upload_mode=UploadMode.OVERWRITE) or MsDataset.upload(..., upload_mode=UploadMode.APPEND).
     With upload_mode=UploadMode.APPEND, objects that already exist on the remote are skipped.

3. Add: automatically reload the STS token to avoid expiration (current expiration: 24h).

4. Fix: send cookies with the requests in api.py so that private datasets can be downloaded.
        Link: https://code.alibaba-inc.com/Ali-MaaS/MaaS-lib/codereview/10524449
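
For reference, a minimal usage sketch of the new upload mode (dataset and file names below are placeholders, and a prior HubApi().login(token) is assumed):

    from modelscope.msdatasets import MsDataset
    from modelscope.utils.constant import UploadMode

    # APPEND mode: objects that already exist on the remote are skipped.
    MsDataset.upload(
        object_name='train/001/img_001.png',
        local_file_path='/path/to/img_001.png',
        dataset_name='my_dataset',
        namespace='my_namespace',
        upload_mode=UploadMode.APPEND)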
master
xingjun.wxj, yingda.chen · 2 years ago
parent commit 78f29cf999
9 changed files with 320 additions and 52 deletions
  1. modelscope/hub/api.py  (+39, -12)
  2. modelscope/msdatasets/ms_dataset.py  (+54, -19)
  3. modelscope/msdatasets/utils/dataset_utils.py  (+1, -1)
  4. modelscope/msdatasets/utils/delete_utils.py  (+32, -0)
  5. modelscope/msdatasets/utils/download_utils.py  (+5, -1)
  6. modelscope/msdatasets/utils/oss_utils.py  (+53, -12)
  7. modelscope/msdatasets/utils/upload_utils.py  (+15, -7)
  8. modelscope/utils/constant.py  (+9, -0)
  9. tests/msdatasets/test_dataset_delete.py  (+112, -0)

modelscope/hub/api.py  (+39, -12)

@@ -506,13 +506,14 @@ class HubApi:
shutil.rmtree(cache_dir)
os.makedirs(cache_dir, exist_ok=True)
datahub_url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}'
r = requests.get(datahub_url)
cookies = ModelScopeConfig.get_cookies()
r = requests.get(datahub_url, cookies=cookies)
resp = r.json()
datahub_raise_on_error(datahub_url, resp)
dataset_id = resp['Data']['Id']
dataset_type = resp['Data']['Type']
datahub_url = f'{self.endpoint}/api/v1/datasets/{dataset_id}/repo/tree?Revision={revision}'
r = requests.get(datahub_url, headers=self.headers)
r = requests.get(datahub_url, cookies=cookies, headers=self.headers)
resp = r.json()
datahub_raise_on_error(datahub_url, resp)
file_list = resp['Data']
@@ -531,7 +532,7 @@ class HubApi:
if extension in dataset_meta_format:
datahub_url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/repo?' \
f'Revision={revision}&FilePath={file_path}'
r = requests.get(datahub_url)
r = requests.get(datahub_url, cookies=cookies)
raise_for_http_status(r)
local_path = os.path.join(cache_dir, file_path)
if os.path.exists(local_path):
@@ -576,9 +577,7 @@ class HubApi:
datahub_url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/' \
f'ststoken?Revision={revision}'

cookies = requests.utils.dict_from_cookiejar(cookies)
r = requests.get(
url=datahub_url, cookies=cookies, headers=self.headers)
r = requests.get(url=datahub_url, cookies=cookies, headers=self.headers)
resp = r.json()
raise_on_error(resp)
return resp['Data']
@@ -589,9 +588,6 @@ class HubApi:
f'MaxLimit={max_limit}&Revision={revision}&Recursive={is_recursive}&FilterDir={is_filter_dir}'

cookies = ModelScopeConfig.get_cookies()
if cookies:
cookies = requests.utils.dict_from_cookiejar(cookies)

resp = requests.get(url=url, cookies=cookies)
resp = resp.json()
raise_on_error(resp)
@@ -600,17 +596,48 @@ class HubApi:

def on_dataset_download(self, dataset_name: str, namespace: str) -> None:
url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/download/increase'
r = requests.post(url, headers=self.headers)
cookies = ModelScopeConfig.get_cookies()
r = requests.post(url, cookies=cookies, headers=self.headers)
raise_for_http_status(r)

def delete_oss_dataset_object(self, object_name: str, dataset_name: str,
namespace: str, revision: str) -> str:
if not object_name or not dataset_name or not namespace or not revision:
raise ValueError('Args cannot be empty!')

url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/oss?Path={object_name}&Revision={revision}'

cookies = self.check_local_cookies(use_cookies=True)
resp = requests.delete(url=url, cookies=cookies)
resp = resp.json()
raise_on_error(resp)
resp = resp['Message']
return resp

def delete_oss_dataset_dir(self, object_name: str, dataset_name: str,
namespace: str, revision: str) -> str:
if not object_name or not dataset_name or not namespace or not revision:
raise ValueError('Args cannot be empty!')

url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/oss/prefix?Prefix={object_name}/' \
f'&Revision={revision}'

cookies = self.check_local_cookies(use_cookies=True)
resp = requests.delete(url=url, cookies=cookies)
resp = resp.json()
raise_on_error(resp)
resp = resp['Message']
return resp

@staticmethod
def datahub_remote_call(url):
r = requests.get(url, headers={'user-agent': ModelScopeConfig.get_user_agent()})
cookies = ModelScopeConfig.get_cookies()
r = requests.get(url, cookies=cookies, headers={'user-agent': ModelScopeConfig.get_user_agent()})
resp = r.json()
datahub_raise_on_error(url, resp)
return resp['Data']

def check_cookies_upload_data(self, use_cookies) -> CookieJar:
def check_local_cookies(self, use_cookies) -> CookieJar:
return self._check_cookie(use_cookies=use_cookies)
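
For reference, a minimal sketch of how the two new deletion endpoints can be called at the HubApi level (dataset name, namespace and revision are placeholders; valid local login cookies are assumed):

    from modelscope.hub.api import HubApi

    api = HubApi()
    # Delete a single object from the dataset's OSS storage.
    msg = api.delete_oss_dataset_object(
        object_name='train/001/img_001.png',
        dataset_name='my_dataset',
        namespace='my_namespace',
        revision='master')
    # Delete a whole directory (prefix); note the name is passed without a trailing '/'.
    msg = api.delete_oss_dataset_dir(
        object_name='train',
        dataset_name='my_dataset',
        namespace='my_namespace',
        revision='master')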




modelscope/msdatasets/ms_dataset.py  (+54, -19)

@@ -20,13 +20,15 @@ from modelscope.msdatasets.task_datasets.builder import build_task_dataset
from modelscope.msdatasets.utils.dataset_builder import ExternalDataset
from modelscope.msdatasets.utils.dataset_utils import (
get_dataset_files, get_target_dataset_structure, load_dataset_builder)
from modelscope.msdatasets.utils.delete_utils import DatasetDeleteManager
from modelscope.msdatasets.utils.download_utils import DatasetDownloadManager
from modelscope.msdatasets.utils.upload_utils import DatasetUploadManager
from modelscope.utils.config import ConfigDict
from modelscope.utils.config_ds import MS_DATASETS_CACHE
from modelscope.utils.constant import (DEFAULT_DATASET_NAMESPACE,
DEFAULT_DATASET_REVISION,
DatasetFormations, DownloadMode, Hubs)
DatasetFormations, DownloadMode, Hubs,
UploadMode)
from modelscope.utils.logger import get_logger

logger = get_logger()
@@ -576,15 +578,17 @@ class MsDataset:
return self._hf_ds.rename_columns(column_mapping)

@staticmethod
def upload(object_name: str,
local_file_path: str,
dataset_name: str,
namespace: Optional[str] = DEFAULT_DATASET_NAMESPACE,
version: Optional[str] = DEFAULT_DATASET_REVISION,
num_processes: Optional[int] = None,
chunksize: Optional[int] = 1,
filter_hidden_files: Optional[bool] = True) -> None:
"""Upload dataset file or directory to the ModelScope Hub. Please login to the ModelScope Hub first.
def upload(
object_name: str,
local_file_path: str,
dataset_name: str,
namespace: Optional[str] = DEFAULT_DATASET_NAMESPACE,
version: Optional[str] = DEFAULT_DATASET_REVISION,
num_processes: Optional[int] = None,
chunksize: Optional[int] = 1,
filter_hidden_files: Optional[bool] = True,
upload_mode: Optional[UploadMode] = UploadMode.OVERWRITE) -> None:
"""Upload dataset file or directory to the ModelScope Hub. Please log in to the ModelScope Hub first.

Args:
object_name (str): The object name on ModelScope, in the form of your-dataset-name.zip or your-dataset-name
@@ -592,7 +596,7 @@ class MsDataset:
dataset_name (str): Name of the dataset
namespace(str, optional): Namespace of the dataset
version: Optional[str]: Version of the dataset
num_processes: Optional[int]: The number of processes used for multi-process uploading.
num_processes: Optional[int]: The number of processes used for multiprocess uploading.
This is only applicable when local_file_path is a directory, and we are uploading multiple files
inside the directory. When None is provided, the number returned by os.cpu_count() is used as the default.
chunksize: Optional[int]: The chunksize of objects to upload.
@@ -600,24 +604,34 @@ class MsDataset:
using the default value of 1. Available if local_file_path is a directory.
filter_hidden_files: Optional[bool]: Whether to filter hidden files.
Available if local_file_path is a directory.
upload_mode: Optional[UploadMode]: How to upload objects from local. Default: UploadMode.OVERWRITE, upload
all objects from local, existing remote objects may be overwritten.

Returns:
None

"""
if not object_name:
raise ValueError('object_name cannot be empty!')

_upload_manager = DatasetUploadManager(
dataset_name=dataset_name, namespace=namespace, version=version)

upload_mode = UploadMode(upload_mode or UploadMode.OVERWRITE)

if os.path.isfile(local_file_path):
_upload_manager.upload(
object_name=object_name, local_file_path=local_file_path)
object_name=object_name,
local_file_path=local_file_path,
upload_mode=upload_mode)
elif os.path.isdir(local_file_path):
_upload_manager.upload_dir(
object_dir_name=object_name,
local_dir_path=local_file_path,
num_processes=num_processes,
chunksize=chunksize,
filter_hidden_files=filter_hidden_files)
filter_hidden_files=filter_hidden_files,
upload_mode=upload_mode)
else:
raise ValueError(
f'{local_file_path} is not a valid file path or directory')
@@ -672,7 +686,7 @@ class MsDataset:
revision of the model you want to clone from. Can be any of a branch, tag or commit hash
auth_token(`Optional[str]`):
token obtained when calling `HubApi.login()`. Usually you can safely ignore the parameter
as the token is already saved when you login the first time, if None, we will use saved token.
as the token is already saved when you log in the first time, if None, we will use saved token.
git_path:(`Optional[str]`):
The git command line path, if None, we use 'git'
force (Optional[bool]): whether to use forced-push.
@@ -687,8 +701,29 @@ class MsDataset:
revision=revision,
auth_token=auth_token,
git_path=git_path)
_repo.push(
commit_message=commit_message,
local_branch=revision,
remote_branch=revision,
force=force)
_repo.push(commit_message=commit_message, branch=revision, force=force)

@staticmethod
def delete(object_name: str,
dataset_name: str,
namespace: Optional[str] = DEFAULT_DATASET_NAMESPACE,
version: Optional[str] = DEFAULT_DATASET_REVISION) -> str:
""" Delete object of dataset. Please log in first and make sure you have permission to manage the dataset.

Args:
object_name (str): The object name of dataset to be deleted. Could be a name of file or directory. If it's
directory, then ends with `/`.
For example: your-data-name.zip, train/001/img_001.png, train/, ...
dataset_name (str): Path or name of the dataset.
namespace(str, optional): Namespace of the dataset.
version (str, optional): Version of the dataset.

Returns:
res_msg (str): Response message.

"""
_delete_manager = DatasetDeleteManager(
dataset_name=dataset_name, namespace=namespace, version=version)
resp_msg = _delete_manager.delete(object_name=object_name)
logger.info(f'Object {object_name} successfully removed!')
return resp_msg
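
A minimal sketch of the new delete API described by the docstring above (names are placeholders; the caller must be logged in and have permission to manage the dataset):

    from modelscope.msdatasets import MsDataset

    # Delete a single file.
    MsDataset.delete(
        object_name='train/001/img_001.png',
        dataset_name='my_dataset',
        namespace='my_namespace')

    # Delete a directory: the trailing '/' routes the call to the prefix-deletion endpoint.
    MsDataset.delete(
        object_name='train/',
        dataset_name='my_dataset',
        namespace='my_namespace')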

modelscope/msdatasets/utils/dataset_utils.py  (+1, -1)

@@ -82,7 +82,7 @@ def list_dataset_objects(hub_api: HubApi, max_limit: int, is_recursive: bool,
dataset_name: str, namespace: str,
version: str) -> list:
"""
List all of objects for specific dataset.
List all objects for specific dataset.

Args:
hub_api (class HubApi): HubApi instance.


modelscope/msdatasets/utils/delete_utils.py  (+32, -0)

@@ -0,0 +1,32 @@
# Copyright (c) Alibaba, Inc. and its affiliates.

from modelscope.hub.api import HubApi


class DatasetDeleteManager(object):

def __init__(self, dataset_name: str, namespace: str, version: str):
self.api = HubApi()
self.dataset_name = dataset_name
self.namespace = namespace
self.version = version

def delete(self, object_name: str) -> str:

# single object
if not object_name.endswith('/'):
resp_msg = self.api.delete_oss_dataset_object(
object_name=object_name,
dataset_name=self.dataset_name,
namespace=self.namespace,
revision=self.version)
else:
# multiple objects
object_name = object_name.strip('/')
resp_msg = self.api.delete_oss_dataset_dir(
object_name=object_name,
dataset_name=self.dataset_name,
namespace=self.namespace,
revision=self.version)

return resp_msg

modelscope/msdatasets/utils/download_utils.py  (+5, -1)

@@ -27,7 +27,11 @@ class DatasetDownloadManager(DownloadManager):
oss_config = api.get_dataset_access_config(self._dataset_name,
self._namespace,
self._version)
self.oss_utilities = OssUtilities(oss_config)
self.oss_utilities = OssUtilities(
oss_config=oss_config,
dataset_name=self._dataset_name,
namespace=self._namespace,
revision=self._version)

def _download(self, url_or_filename: str,
download_config: DownloadConfig) -> str:


modelscope/msdatasets/utils/oss_utils.py  (+53, -12)

@@ -6,19 +6,28 @@ import os
import oss2
from datasets.utils.file_utils import hash_url_to_filename

from modelscope.hub.api import HubApi
from modelscope.utils.constant import UploadMode
from modelscope.utils.logger import get_logger

logger = get_logger()

ACCESS_ID = 'AccessId'
ACCESS_SECRET = 'AccessSecret'
SECURITY_TOKEN = 'SecurityToken'
BUCKET = 'Bucket'
BACK_DIR = 'BackupDir'
DIR = 'Dir'


class OssUtilities:

def __init__(self, oss_config):
self.key = oss_config['AccessId']
self.secret = oss_config['AccessSecret']
self.token = oss_config['SecurityToken']
self.endpoint = f"https://{oss_config['Region']}.aliyuncs.com"
self.bucket_name = oss_config['Bucket']
auth = oss2.StsAuth(self.key, self.secret, self.token)
self.bucket = oss2.Bucket(auth, self.endpoint, self.bucket_name)
self.oss_dir = oss_config['Dir']
self.oss_backup_dir = oss_config['BackupDir']
def __init__(self, oss_config, dataset_name, namespace, revision):
self._do_init(oss_config=oss_config)

self.dataset_name = dataset_name
self.namespace = namespace
self.revision = revision

self.upload_resumable_tmp_store = '/tmp/modelscope/tmp_dataset'
self.upload_multipart_threshold = 50 * 1024 * 1024
@@ -26,6 +35,28 @@ class OssUtilities:
self.upload_num_threads = 4
self.upload_max_retries = 3

self.api = HubApi()

def _do_init(self, oss_config):
self.key = oss_config[ACCESS_ID]
self.secret = oss_config[ACCESS_SECRET]
self.token = oss_config[SECURITY_TOKEN]
self.endpoint = f"https://{oss_config['Region']}.aliyuncs.com"
self.bucket_name = oss_config[BUCKET]
auth = oss2.StsAuth(self.key, self.secret, self.token)
self.bucket = oss2.Bucket(auth, self.endpoint, self.bucket_name)
self.oss_dir = oss_config[DIR]
self.oss_backup_dir = oss_config[BACK_DIR]

def _reload_sts(self):
cookies = self.api.check_local_cookies(use_cookies=True)
oss_config_refresh = self.api.get_dataset_access_config_session(
cookies=cookies,
dataset_name=self.dataset_name,
namespace=self.namespace,
revision=self.revision)
self._do_init(oss_config_refresh)

@staticmethod
def _percentage(consumed_bytes, total_bytes):
if total_bytes:
@@ -51,7 +82,8 @@ class OssUtilities:
return local_path

def upload(self, oss_object_name: str, local_file_path: str,
indicate_individual_progress: bool) -> str:
indicate_individual_progress: bool,
upload_mode: UploadMode) -> str:
retry_count = 0
object_key = os.path.join(self.oss_dir, oss_object_name)
resumable_store = oss2.ResumableStore(
@@ -64,6 +96,13 @@ class OssUtilities:
while True:
try:
retry_count += 1
exist = self.bucket.object_exists(object_key)
if upload_mode == UploadMode.APPEND and exist:
logger.info(
f'Skip {oss_object_name} in case of {upload_mode.value} mode.'
)
break

oss2.resumable_upload(
self.bucket,
object_key,
@@ -74,7 +113,9 @@ class OssUtilities:
progress_callback=progress_callback,
num_threads=self.upload_num_threads)
break
except Exception:
except Exception as e:
if e.__getattribute__('status') == 403:
self._reload_sts()
if retry_count >= self.upload_max_retries:
raise
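
The loop above skips existing objects in APPEND mode and refreshes the STS credentials on HTTP 403 before retrying. A condensed, self-contained sketch of that flow (upload_with_refresh is a hypothetical helper, utils stands for an OssUtilities instance, and getattr is used instead of __getattribute__ so exceptions without a status attribute are tolerated):

    import oss2

    from modelscope.utils.constant import UploadMode


    def upload_with_refresh(utils, object_key, local_file_path, upload_mode,
                            max_retries=3):
        retry_count = 0
        while True:
            try:
                retry_count += 1
                # APPEND mode: skip objects that already exist in the bucket.
                if upload_mode == UploadMode.APPEND and utils.bucket.object_exists(object_key):
                    return object_key
                oss2.resumable_upload(utils.bucket, object_key, local_file_path)
                return object_key
            except oss2.exceptions.OssError as e:
                # A 403 usually means the STS token has expired: refresh and retry.
                if getattr(e, 'status', None) == 403:
                    utils._reload_sts()
                if retry_count >= max_retries:
                    raise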



modelscope/msdatasets/utils/upload_utils.py  (+15, -7)

@@ -5,6 +5,7 @@ from multiprocessing.dummy import Pool as ThreadPool

from tqdm import tqdm

from modelscope.utils.constant import UploadMode
from .oss_utils import OssUtilities


@@ -13,38 +14,45 @@ class DatasetUploadManager(object):
def __init__(self, dataset_name: str, namespace: str, version: str):
from modelscope.hub.api import HubApi
_hub_api = HubApi()
_cookies = _hub_api.check_cookies_upload_data(use_cookies=True)
_cookies = _hub_api.check_local_cookies(use_cookies=True)
_oss_config = _hub_api.get_dataset_access_config_session(
cookies=_cookies,
dataset_name=dataset_name,
namespace=namespace,
revision=version)

self.oss_utilities = OssUtilities(_oss_config)
self.oss_utilities = OssUtilities(
oss_config=_oss_config,
dataset_name=dataset_name,
namespace=namespace,
revision=version)

def upload(self, object_name: str, local_file_path: str) -> str:
def upload(self, object_name: str, local_file_path: str,
upload_mode: UploadMode) -> str:
object_key = self.oss_utilities.upload(
oss_object_name=object_name,
local_file_path=local_file_path,
indicate_individual_progress=True)
indicate_individual_progress=True,
upload_mode=upload_mode)
return object_key

def upload_dir(self, object_dir_name: str, local_dir_path: str,
num_processes: int, chunksize: int,
filter_hidden_files: bool) -> int:
filter_hidden_files: bool, upload_mode: UploadMode) -> int:

def run_upload(args):
self.oss_utilities.upload(
oss_object_name=args[0],
local_file_path=args[1],
indicate_individual_progress=False)
indicate_individual_progress=False,
upload_mode=upload_mode)

files_list = []
for root, dirs, files in os.walk(local_dir_path):
for file_name in files:
if filter_hidden_files and file_name.startswith('.'):
continue
# Concatenate directory name and relative path into a oss object key. e.g., train/001/1_1230.png
# Concatenate directory name and relative path into oss object key. e.g., train/001/1_1230.png
object_name = os.path.join(
object_dir_name,
root.replace(local_dir_path, '', 1).strip('/'), file_name)
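
To illustrate the key construction above, a small sketch of how a local path maps to an OSS object key (paths are hypothetical):

    import os

    local_dir_path = '/data/pets/train'
    object_dir_name = 'train'
    root = '/data/pets/train/001'  # a directory visited by os.walk
    file_name = '1_1230.png'
    object_name = os.path.join(
        object_dir_name,
        root.replace(local_dir_path, '', 1).strip('/'), file_name)
    # object_name == 'train/001/1_1230.png'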


modelscope/utils/constant.py  (+9, -0)

@@ -238,6 +238,15 @@ class DownloadMode(enum.Enum):
FORCE_REDOWNLOAD = 'force_redownload'


class UploadMode(enum.Enum):
""" How to upload object to remote.
"""
# Upload all objects from local, existing remote objects may be overwritten. (Default)
OVERWRITE = 'overwrite'
# Upload local objects in append mode, skipping all existing remote objects.
APPEND = 'append'


class DatasetFormations(enum.Enum):
""" How a dataset is organized and interpreted
"""


tests/msdatasets/test_dataset_delete.py  (+112, -0)

@@ -0,0 +1,112 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import os
import shutil
import tempfile
import unittest
import zipfile

from modelscope.msdatasets import MsDataset
from modelscope.utils import logger as logging
from modelscope.utils.test_utils import test_level

logger = logging.get_logger(__name__)

KEY_EXTRACTED = 'extracted'
EXPECTED_MSG = 'success'


class DatasetDeleteTest(unittest.TestCase):

def setUp(self):
self.old_dir = os.getcwd()
self.dataset_name = 'small_coco_for_test'
self.dataset_file_name = self.dataset_name
self.prepared_dataset_name = 'pets_small'
self.token = os.getenv('TEST_UPLOAD_MS_TOKEN')
error_msg = 'The modelscope token can not be empty, please set env variable: TEST_UPLOAD_MS_TOKEN'
self.assertIsNotNone(self.token, msg=error_msg)
from modelscope.hub.api import HubApi
from modelscope.hub.api import ModelScopeConfig
self.api = HubApi()
self.api.login(self.token)

# get user info
self.namespace, _ = ModelScopeConfig.get_user_info()

self.temp_dir = tempfile.mkdtemp()
self.test_work_dir = os.path.join(self.temp_dir, self.dataset_name)
if not os.path.exists(self.test_work_dir):
os.makedirs(self.test_work_dir)

def tearDown(self):
os.chdir(self.old_dir)
shutil.rmtree(self.temp_dir, ignore_errors=True)
logger.info(
f'Temporary directory {self.temp_dir} successfully removed!')

@staticmethod
def get_raw_downloaded_file_path(extracted_path):
raw_downloaded_file_path = ''
raw_data_dir = os.path.abspath(
os.path.join(extracted_path, '../../..'))
for root, dirs, files in os.walk(raw_data_dir):
if KEY_EXTRACTED in dirs:
for file in files:
curr_file_path = os.path.join(root, file)
if zipfile.is_zipfile(curr_file_path):
raw_downloaded_file_path = curr_file_path
return raw_downloaded_file_path

def upload_test_file(self):
# Get the prepared data from hub, using default modelscope namespace
ms_ds_train = MsDataset.load(self.prepared_dataset_name, split='train')
config_res = ms_ds_train._hf_ds.config_kwargs
extracted_path = config_res.get('split_config').get('train')
raw_zipfile_path = self.get_raw_downloaded_file_path(extracted_path)

object_name = self.dataset_file_name + '_for_del.zip'
MsDataset.upload(
object_name=object_name,
local_file_path=raw_zipfile_path,
dataset_name=self.dataset_name,
namespace=self.namespace)

return object_name

def upload_test_dir(self):
ms_ds_train = MsDataset.load(self.prepared_dataset_name, split='train')
config_train = ms_ds_train._hf_ds.config_kwargs
extracted_path_train = config_train.get('split_config').get('train')

object_name = 'train_for_del'
MsDataset.upload(
object_name=object_name,
local_file_path=os.path.join(extracted_path_train,
'Pets/images/train'),
dataset_name=self.dataset_name,
namespace=self.namespace)

return object_name + '/'

@unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
def test_ds_delete_object(self):

# upload prepared data
file_name = self.upload_test_file()
dir_name = self.upload_test_dir()

# delete object
del_file_msg = MsDataset.delete(
object_name=file_name,
dataset_name=self.dataset_name,
namespace=self.namespace)
del_dir_msg = MsDataset.delete(
object_name=dir_name,
dataset_name=self.dataset_name,
namespace=self.namespace)

assert all([del_file_msg == EXPECTED_MSG, del_dir_msg == EXPECTED_MSG])


if __name__ == '__main__':
unittest.main()
