diff --git a/modelscope/hub/api.py b/modelscope/hub/api.py
index 7337846f..5923319d 100644
--- a/modelscope/hub/api.py
+++ b/modelscope/hub/api.py
@@ -506,13 +506,14 @@ class HubApi:
             shutil.rmtree(cache_dir)
         os.makedirs(cache_dir, exist_ok=True)
         datahub_url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}'
-        r = requests.get(datahub_url)
+        cookies = ModelScopeConfig.get_cookies()
+        r = requests.get(datahub_url, cookies=cookies)
         resp = r.json()
         datahub_raise_on_error(datahub_url, resp)
         dataset_id = resp['Data']['Id']
         dataset_type = resp['Data']['Type']
         datahub_url = f'{self.endpoint}/api/v1/datasets/{dataset_id}/repo/tree?Revision={revision}'
-        r = requests.get(datahub_url, headers=self.headers)
+        r = requests.get(datahub_url, cookies=cookies, headers=self.headers)
         resp = r.json()
         datahub_raise_on_error(datahub_url, resp)
         file_list = resp['Data']
@@ -531,7 +532,7 @@ class HubApi:
             if extension in dataset_meta_format:
                 datahub_url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/repo?' \
                               f'Revision={revision}&FilePath={file_path}'
-                r = requests.get(datahub_url)
+                r = requests.get(datahub_url, cookies=cookies)
                 raise_for_http_status(r)
                 local_path = os.path.join(cache_dir, file_path)
                 if os.path.exists(local_path):
@@ -576,9 +577,7 @@ class HubApi:
         datahub_url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/' \
                       f'ststoken?Revision={revision}'
 
-        cookies = requests.utils.dict_from_cookiejar(cookies)
-        r = requests.get(
-            url=datahub_url, cookies=cookies, headers=self.headers)
+        r = requests.get(url=datahub_url, cookies=cookies, headers=self.headers)
         resp = r.json()
         raise_on_error(resp)
         return resp['Data']
@@ -589,9 +588,6 @@ class HubApi:
             f'MaxLimit={max_limit}&Revision={revision}&Recursive={is_recursive}&FilterDir={is_filter_dir}'
         cookies = ModelScopeConfig.get_cookies()
 
-        if cookies:
-            cookies = requests.utils.dict_from_cookiejar(cookies)
-
         resp = requests.get(url=url, cookies=cookies)
         resp = resp.json()
         raise_on_error(resp)
@@ -600,17 +596,48 @@ class HubApi:
 
     def on_dataset_download(self, dataset_name: str, namespace: str) -> None:
         url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/download/increase'
-        r = requests.post(url, headers=self.headers)
+        cookies = ModelScopeConfig.get_cookies()
+        r = requests.post(url, cookies=cookies, headers=self.headers)
         raise_for_http_status(r)
 
+    def delete_oss_dataset_object(self, object_name: str, dataset_name: str,
+                                  namespace: str, revision: str) -> str:
+        if not object_name or not dataset_name or not namespace or not revision:
+            raise ValueError('Args cannot be empty!')
+
+        url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/oss?Path={object_name}&Revision={revision}'
+
+        cookies = self.check_local_cookies(use_cookies=True)
+        resp = requests.delete(url=url, cookies=cookies)
+        resp = resp.json()
+        raise_on_error(resp)
+        resp = resp['Message']
+        return resp
+
+    def delete_oss_dataset_dir(self, object_name: str, dataset_name: str,
+                               namespace: str, revision: str) -> str:
+        if not object_name or not dataset_name or not namespace or not revision:
+            raise ValueError('Args cannot be empty!')
+
+        url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/oss/prefix?Prefix={object_name}/' \
+              f'&Revision={revision}'
+
+        cookies = self.check_local_cookies(use_cookies=True)
+        resp = requests.delete(url=url, cookies=cookies)
+        resp = resp.json()
+        raise_on_error(resp)
+        resp = resp['Message']
+        return resp
+
     @staticmethod
     def datahub_remote_call(url):
-        r = requests.get(url,
-                         headers={'user-agent': ModelScopeConfig.get_user_agent()})
+        cookies = ModelScopeConfig.get_cookies()
+        r = requests.get(
+            url,
+            cookies=cookies,
+            headers={'user-agent': ModelScopeConfig.get_user_agent()})
         resp = r.json()
         datahub_raise_on_error(url, resp)
         return resp['Data']
 
-    def check_cookies_upload_data(self, use_cookies) -> CookieJar:
+    def check_local_cookies(self, use_cookies) -> CookieJar:
         return self._check_cookie(use_cookies=use_cookies)
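The two new delete endpoints can be exercised directly from HubApi. A minimal sketch (the token, dataset, namespace and object names below are hypothetical; a prior login is assumed so that check_local_cookies finds a valid session):

    from modelscope.hub.api import HubApi

    api = HubApi()
    api.login('your-sdk-token')  # hypothetical token value

    # Delete a single remote object.
    api.delete_oss_dataset_object(
        object_name='train/001/img_001.png',
        dataset_name='my_dataset',
        namespace='my_namespace',
        revision='master')

    # Delete everything under a directory prefix; the trailing '/' is
    # appended when the Prefix query parameter is built, so pass the bare name.
    api.delete_oss_dataset_dir(
        object_name='train',
        dataset_name='my_dataset',
        namespace='my_namespace',
        revision='master')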
diff --git a/modelscope/msdatasets/ms_dataset.py b/modelscope/msdatasets/ms_dataset.py
index e90f397b..0c537df7 100644
--- a/modelscope/msdatasets/ms_dataset.py
+++ b/modelscope/msdatasets/ms_dataset.py
@@ -20,13 +20,15 @@ from modelscope.msdatasets.task_datasets.builder import build_task_dataset
 from modelscope.msdatasets.utils.dataset_builder import ExternalDataset
 from modelscope.msdatasets.utils.dataset_utils import (
     get_dataset_files, get_target_dataset_structure, load_dataset_builder)
+from modelscope.msdatasets.utils.delete_utils import DatasetDeleteManager
 from modelscope.msdatasets.utils.download_utils import DatasetDownloadManager
 from modelscope.msdatasets.utils.upload_utils import DatasetUploadManager
 from modelscope.utils.config import ConfigDict
 from modelscope.utils.config_ds import MS_DATASETS_CACHE
 from modelscope.utils.constant import (DEFAULT_DATASET_NAMESPACE,
                                        DEFAULT_DATASET_REVISION,
-                                       DatasetFormations, DownloadMode, Hubs)
+                                       DatasetFormations, DownloadMode, Hubs,
+                                       UploadMode)
 from modelscope.utils.logger import get_logger
 
 logger = get_logger()
@@ -576,15 +578,17 @@ class MsDataset:
         return self._hf_ds.rename_columns(column_mapping)
 
     @staticmethod
-    def upload(object_name: str,
-               local_file_path: str,
-               dataset_name: str,
-               namespace: Optional[str] = DEFAULT_DATASET_NAMESPACE,
-               version: Optional[str] = DEFAULT_DATASET_REVISION,
-               num_processes: Optional[int] = None,
-               chunksize: Optional[int] = 1,
-               filter_hidden_files: Optional[bool] = True) -> None:
-        """Upload dataset file or directory to the ModelScope Hub. Please login to the ModelScope Hub first.
+    def upload(
+            object_name: str,
+            local_file_path: str,
+            dataset_name: str,
+            namespace: Optional[str] = DEFAULT_DATASET_NAMESPACE,
+            version: Optional[str] = DEFAULT_DATASET_REVISION,
+            num_processes: Optional[int] = None,
+            chunksize: Optional[int] = 1,
+            filter_hidden_files: Optional[bool] = True,
+            upload_mode: Optional[UploadMode] = UploadMode.OVERWRITE) -> None:
+        """Upload dataset file or directory to the ModelScope Hub. Please log in to the ModelScope Hub first.
 
         Args:
             object_name (str): The object name on ModelScope, in the form of your-dataset-name.zip or your-dataset-name
@@ -592,7 +596,7 @@
             dataset_name (str): Name of the dataset
             namespace(str, optional): Namespace of the dataset
             version: Optional[str]: Version of the dataset
-            num_processes: Optional[int]: The number of processes used for multi-process uploading.
+            num_processes: Optional[int]: The number of processes used for multiprocess uploading.
                 This is only applicable when local_file_path is a directory, and we are uploading mutliple-files
                 insided the directory. When None provided, the number returned by os.cpu_count() is used as default.
             chunksize: Optional[int]: The chunksize of objects to upload.
@@ -600,24 +604,34 @@
                 using the default value of 1. Available if local_file_path is a directory.
             filter_hidden_files: Optional[bool]: Whether to filter hidden files.
                 Available if local_file_path is a directory.
+            upload_mode: Optional[UploadMode]: How to upload objects from local. Default: UploadMode.OVERWRITE,
+                which uploads all local objects; existing remote objects may be overwritten.
 
         Returns:
             None
 
         """
+        if not object_name:
+            raise ValueError('object_name cannot be empty!')
+
         _upload_manager = DatasetUploadManager(
             dataset_name=dataset_name, namespace=namespace, version=version)
 
+        upload_mode = UploadMode(upload_mode or UploadMode.OVERWRITE)
+
         if os.path.isfile(local_file_path):
             _upload_manager.upload(
-                object_name=object_name, local_file_path=local_file_path)
+                object_name=object_name,
+                local_file_path=local_file_path,
+                upload_mode=upload_mode)
         elif os.path.isdir(local_file_path):
             _upload_manager.upload_dir(
                 object_dir_name=object_name,
                 local_dir_path=local_file_path,
                 num_processes=num_processes,
                 chunksize=chunksize,
-                filter_hidden_files=filter_hidden_files)
+                filter_hidden_files=filter_hidden_files,
+                upload_mode=upload_mode)
         else:
             raise ValueError(
                 f'{local_file_path} is not a valid file path or directory')
@@ -672,7 +686,7 @@
                 revision of the model you want to clone from. Can be any of a branch, tag or commit hash
             auth_token(`Optional[str]`):
                 token obtained when calling `HubApi.login()`. Usually you can safely ignore the parameter
-                as the token is already saved when you login the first time, if None, we will use saved token.
+                as the token is already saved when you log in the first time; if None, we will use the saved token.
             git_path:(`Optional[str]`):
                 The git command line path, if None, we use 'git'
             force (Optional[bool]): whether to use forced-push.
@@ -687,8 +701,29 @@
             revision=revision,
             auth_token=auth_token,
             git_path=git_path)
-        _repo.push(
-            commit_message=commit_message,
-            local_branch=revision,
-            remote_branch=revision,
-            force=force)
+        _repo.push(commit_message=commit_message, branch=revision, force=force)
+
+    @staticmethod
+    def delete(object_name: str,
+               dataset_name: str,
+               namespace: Optional[str] = DEFAULT_DATASET_NAMESPACE,
+               version: Optional[str] = DEFAULT_DATASET_REVISION) -> str:
+        """ Delete an object of the dataset. Please log in first and make sure you have permission to manage the
+        dataset.
+
+        Args:
+            object_name (str): The object name of the dataset to be deleted. Could be the name of a file or a
+                directory; a directory name must end with `/`.
+                For example: your-data-name.zip, train/001/img_001.png, train/, ...
+            dataset_name (str): Path or name of the dataset.
+            namespace (str, optional): Namespace of the dataset.
+            version (str, optional): Version of the dataset.
+
+        Returns:
+            res_msg (str): Response message.
+
+        """
+        _delete_manager = DatasetDeleteManager(
+            dataset_name=dataset_name, namespace=namespace, version=version)
+        resp_msg = _delete_manager.delete(object_name=object_name)
+        logger.info(f'Object {object_name} successfully removed!')
+        return resp_msg
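With the MsDataset changes above, the upload and delete surface lines up as follows. A usage sketch with hypothetical dataset, namespace and path names (UploadMode is imported from modelscope.utils.constant):

    from modelscope.msdatasets import MsDataset
    from modelscope.utils.constant import UploadMode

    # Upload a directory, skipping any objects that already exist remotely.
    MsDataset.upload(
        object_name='train',
        local_file_path='/path/to/train',  # hypothetical local path
        dataset_name='my_dataset',
        namespace='my_namespace',
        upload_mode=UploadMode.APPEND)

    # Delete a single file, then the whole directory. The trailing '/'
    # selects the directory branch in DatasetDeleteManager.
    MsDataset.delete(
        object_name='train/001/img_001.png',
        dataset_name='my_dataset',
        namespace='my_namespace')
    MsDataset.delete(
        object_name='train/',
        dataset_name='my_dataset',
        namespace='my_namespace')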
diff --git a/modelscope/msdatasets/utils/dataset_utils.py b/modelscope/msdatasets/utils/dataset_utils.py
index c7aa7682..7a46b325 100644
--- a/modelscope/msdatasets/utils/dataset_utils.py
+++ b/modelscope/msdatasets/utils/dataset_utils.py
@@ -82,7 +82,7 @@ def list_dataset_objects(hub_api: HubApi, max_limit: int, is_recursive: bool,
                          dataset_name: str, namespace: str,
                          version: str) -> list:
     """
-    List all of objects for specific dataset.
+    List all objects for a specific dataset.
 
     Args:
         hub_api (class HubApi): HubApi instance.
diff --git a/modelscope/msdatasets/utils/delete_utils.py b/modelscope/msdatasets/utils/delete_utils.py
new file mode 100644
index 00000000..a5a6f53f
--- /dev/null
+++ b/modelscope/msdatasets/utils/delete_utils.py
@@ -0,0 +1,32 @@
+# Copyright (c) Alibaba, Inc. and its affiliates.
+
+from modelscope.hub.api import HubApi
+
+
+class DatasetDeleteManager(object):
+
+    def __init__(self, dataset_name: str, namespace: str, version: str):
+        self.api = HubApi()
+        self.dataset_name = dataset_name
+        self.namespace = namespace
+        self.version = version
+
+    def delete(self, object_name: str) -> str:
+
+        # single object
+        if not object_name.endswith('/'):
+            resp_msg = self.api.delete_oss_dataset_object(
+                object_name=object_name,
+                dataset_name=self.dataset_name,
+                namespace=self.namespace,
+                revision=self.version)
+        else:
+            # multiple objects
+            object_name = object_name.strip('/')
+            resp_msg = self.api.delete_oss_dataset_dir(
+                object_name=object_name,
+                dataset_name=self.dataset_name,
+                namespace=self.namespace,
+                revision=self.version)
+
+        return resp_msg
diff --git a/modelscope/msdatasets/utils/download_utils.py b/modelscope/msdatasets/utils/download_utils.py
index b1c7a5ab..ebe9b8f5 100644
--- a/modelscope/msdatasets/utils/download_utils.py
+++ b/modelscope/msdatasets/utils/download_utils.py
@@ -27,7 +27,11 @@ class DatasetDownloadManager(DownloadManager):
         oss_config = api.get_dataset_access_config(self._dataset_name,
                                                    self._namespace,
                                                    self._version)
-        self.oss_utilities = OssUtilities(oss_config)
+        self.oss_utilities = OssUtilities(
+            oss_config=oss_config,
+            dataset_name=self._dataset_name,
+            namespace=self._namespace,
+            revision=self._version)
 
     def _download(self, url_or_filename: str,
                   download_config: DownloadConfig) -> str:
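DatasetDeleteManager only branches on the trailing slash, routing to one of the two HubApi endpoints. The routing, sketched with hypothetical names:

    from modelscope.msdatasets.utils.delete_utils import DatasetDeleteManager

    mgr = DatasetDeleteManager(
        dataset_name='my_dataset', namespace='my_namespace', version='master')
    mgr.delete('train/001/img_001.png')  # -> HubApi.delete_oss_dataset_object
    mgr.delete('train/')  # -> HubApi.delete_oss_dataset_dir (slash stripped first)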
diff --git a/modelscope/msdatasets/utils/oss_utils.py b/modelscope/msdatasets/utils/oss_utils.py
index d7d61e89..e27ff8c4 100644
--- a/modelscope/msdatasets/utils/oss_utils.py
+++ b/modelscope/msdatasets/utils/oss_utils.py
@@ -6,19 +6,28 @@ import os
 import oss2
 from datasets.utils.file_utils import hash_url_to_filename
 
+from modelscope.hub.api import HubApi
+from modelscope.utils.constant import UploadMode
+from modelscope.utils.logger import get_logger
+
+logger = get_logger()
+
+ACCESS_ID = 'AccessId'
+ACCESS_SECRET = 'AccessSecret'
+SECURITY_TOKEN = 'SecurityToken'
+BUCKET = 'Bucket'
+BACK_DIR = 'BackupDir'
+DIR = 'Dir'
+
 
 class OssUtilities:
 
-    def __init__(self, oss_config):
-        self.key = oss_config['AccessId']
-        self.secret = oss_config['AccessSecret']
-        self.token = oss_config['SecurityToken']
-        self.endpoint = f"https://{oss_config['Region']}.aliyuncs.com"
-        self.bucket_name = oss_config['Bucket']
-        auth = oss2.StsAuth(self.key, self.secret, self.token)
-        self.bucket = oss2.Bucket(auth, self.endpoint, self.bucket_name)
-        self.oss_dir = oss_config['Dir']
-        self.oss_backup_dir = oss_config['BackupDir']
+    def __init__(self, oss_config, dataset_name, namespace, revision):
+        self._do_init(oss_config=oss_config)
+
+        self.dataset_name = dataset_name
+        self.namespace = namespace
+        self.revision = revision
 
         self.upload_resumable_tmp_store = '/tmp/modelscope/tmp_dataset'
         self.upload_multipart_threshold = 50 * 1024 * 1024
@@ -26,6 +35,28 @@ class OssUtilities:
         self.upload_num_threads = 4
         self.upload_max_retries = 3
 
+        self.api = HubApi()
+
+    def _do_init(self, oss_config):
+        self.key = oss_config[ACCESS_ID]
+        self.secret = oss_config[ACCESS_SECRET]
+        self.token = oss_config[SECURITY_TOKEN]
+        self.endpoint = f"https://{oss_config['Region']}.aliyuncs.com"
+        self.bucket_name = oss_config[BUCKET]
+        auth = oss2.StsAuth(self.key, self.secret, self.token)
+        self.bucket = oss2.Bucket(auth, self.endpoint, self.bucket_name)
+        self.oss_dir = oss_config[DIR]
+        self.oss_backup_dir = oss_config[BACK_DIR]
+
+    def _reload_sts(self):
+        cookies = self.api.check_local_cookies(use_cookies=True)
+        oss_config_refresh = self.api.get_dataset_access_config_session(
+            cookies=cookies,
+            dataset_name=self.dataset_name,
+            namespace=self.namespace,
+            revision=self.revision)
+        self._do_init(oss_config_refresh)
+
     @staticmethod
     def _percentage(consumed_bytes, total_bytes):
         if total_bytes:
@@ -51,7 +82,8 @@ class OssUtilities:
         return local_path
 
     def upload(self, oss_object_name: str, local_file_path: str,
-               indicate_individual_progress: bool) -> str:
+               indicate_individual_progress: bool,
+               upload_mode: UploadMode) -> str:
         retry_count = 0
         object_key = os.path.join(self.oss_dir, oss_object_name)
         resumable_store = oss2.ResumableStore(
@@ -64,6 +96,13 @@ class OssUtilities:
         while True:
             try:
                 retry_count += 1
+                exist = self.bucket.object_exists(object_key)
+                if upload_mode == UploadMode.APPEND and exist:
+                    logger.info(
+                        f'Skipping {oss_object_name}: object already exists ({upload_mode.value} mode).'
+                    )
+                    break
+
                 oss2.resumable_upload(
                     self.bucket,
                     object_key,
@@ -74,7 +113,9 @@
                     progress_callback=progress_callback,
                     num_threads=self.upload_num_threads)
                 break
-            except Exception:
+            except Exception as e:
+                if getattr(e, 'status', None) == 403:
+                    self._reload_sts()
                 if retry_count >= self.upload_max_retries:
                     raise
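The reworked retry loop in OssUtilities.upload couples two behaviors: skipping objects that already exist remotely (APPEND mode) and refreshing expired STS credentials on HTTP 403. The refresh-and-retry pattern in isolation, as a simplified sketch rather than the shipped code:

    def upload_with_sts_refresh(do_upload, reload_sts, max_retries=3):
        """Retry an OSS upload, refreshing STS credentials if they expire."""
        retry_count = 0
        while True:
            try:
                retry_count += 1
                do_upload()
                return
            except Exception as e:
                # oss2 server errors carry an HTTP status; 403 typically means
                # the temporary STS token has expired, so fetch a fresh one
                # before the next attempt.
                if getattr(e, 'status', None) == 403:
                    reload_sts()
                if retry_count >= max_retries:
                    raise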
diff --git a/modelscope/msdatasets/utils/upload_utils.py b/modelscope/msdatasets/utils/upload_utils.py
index 2b4422b2..bbdcd9e9 100644
--- a/modelscope/msdatasets/utils/upload_utils.py
+++ b/modelscope/msdatasets/utils/upload_utils.py
@@ -5,6 +5,7 @@ from multiprocessing.dummy import Pool as ThreadPool
 
 from tqdm import tqdm
 
+from modelscope.utils.constant import UploadMode
 from .oss_utils import OssUtilities
 
 
@@ -13,38 +14,45 @@ class DatasetUploadManager(object):
 
     def __init__(self, dataset_name: str, namespace: str, version: str):
         from modelscope.hub.api import HubApi
         _hub_api = HubApi()
-        _cookies = _hub_api.check_cookies_upload_data(use_cookies=True)
+        _cookies = _hub_api.check_local_cookies(use_cookies=True)
         _oss_config = _hub_api.get_dataset_access_config_session(
             cookies=_cookies,
             dataset_name=dataset_name,
             namespace=namespace,
             revision=version)
-        self.oss_utilities = OssUtilities(_oss_config)
+        self.oss_utilities = OssUtilities(
+            oss_config=_oss_config,
+            dataset_name=dataset_name,
+            namespace=namespace,
+            revision=version)
 
-    def upload(self, object_name: str, local_file_path: str) -> str:
+    def upload(self, object_name: str, local_file_path: str,
+               upload_mode: UploadMode) -> str:
         object_key = self.oss_utilities.upload(
             oss_object_name=object_name,
             local_file_path=local_file_path,
-            indicate_individual_progress=True)
+            indicate_individual_progress=True,
+            upload_mode=upload_mode)
         return object_key
 
     def upload_dir(self, object_dir_name: str, local_dir_path: str,
                    num_processes: int, chunksize: int,
-                   filter_hidden_files: bool) -> int:
+                   filter_hidden_files: bool, upload_mode: UploadMode) -> int:
 
         def run_upload(args):
             self.oss_utilities.upload(
                 oss_object_name=args[0],
                 local_file_path=args[1],
-                indicate_individual_progress=False)
+                indicate_individual_progress=False,
+                upload_mode=upload_mode)
 
         files_list = []
         for root, dirs, files in os.walk(local_dir_path):
             for file_name in files:
                 if filter_hidden_files and file_name.startswith('.'):
                     continue
-                # Concatenate directory name and relative path into a oss object key. e.g., train/001/1_1230.png
+                # Concatenate directory name and relative path into an OSS object key, e.g., train/001/1_1230.png
                 object_name = os.path.join(
                     object_dir_name,
                     root.replace(local_dir_path, '', 1).strip('/'), file_name)
diff --git a/modelscope/utils/constant.py b/modelscope/utils/constant.py
index 6394ad8a..2729b75a 100644
--- a/modelscope/utils/constant.py
+++ b/modelscope/utils/constant.py
@@ -238,6 +238,15 @@ class DownloadMode(enum.Enum):
     FORCE_REDOWNLOAD = 'force_redownload'
 
 
+class UploadMode(enum.Enum):
+    """ How to upload objects to the remote repository.
+    """
+    # Upload all objects from local; existing remote objects may be overwritten. (Default)
+    OVERWRITE = 'overwrite'
+    # Upload local objects in append mode, skipping all existing remote objects.
+    APPEND = 'append'
+
+
 class DatasetFormations(enum.Enum):
     """ How a dataset is organized and interpreted """
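Because UploadMode is a value-backed enum, MsDataset.upload can normalize either a member or its string value with the single expression UploadMode(upload_mode or UploadMode.OVERWRITE); a quick illustration:

    from modelscope.utils.constant import UploadMode

    assert UploadMode('append') is UploadMode.APPEND
    assert UploadMode(UploadMode.APPEND) is UploadMode.APPEND  # members pass through
    assert UploadMode(None or UploadMode.OVERWRITE) is UploadMode.OVERWRITE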
diff --git a/tests/msdatasets/test_dataset_delete.py b/tests/msdatasets/test_dataset_delete.py
new file mode 100644
index 00000000..8b3c2426
--- /dev/null
+++ b/tests/msdatasets/test_dataset_delete.py
@@ -0,0 +1,112 @@
+# Copyright (c) Alibaba, Inc. and its affiliates.
+import os
+import shutil
+import tempfile
+import unittest
+import zipfile
+
+from modelscope.msdatasets import MsDataset
+from modelscope.utils import logger as logging
+from modelscope.utils.test_utils import test_level
+
+logger = logging.get_logger(__name__)
+
+KEY_EXTRACTED = 'extracted'
+EXPECTED_MSG = 'success'
+
+
+class DatasetDeleteTest(unittest.TestCase):
+
+    def setUp(self):
+        self.old_dir = os.getcwd()
+        self.dataset_name = 'small_coco_for_test'
+        self.dataset_file_name = self.dataset_name
+        self.prepared_dataset_name = 'pets_small'
+        self.token = os.getenv('TEST_UPLOAD_MS_TOKEN')
+        error_msg = 'The modelscope token cannot be empty, please set env variable: TEST_UPLOAD_MS_TOKEN'
+        self.assertIsNotNone(self.token, msg=error_msg)
+        from modelscope.hub.api import HubApi
+        from modelscope.hub.api import ModelScopeConfig
+        self.api = HubApi()
+        self.api.login(self.token)
+
+        # get user info
+        self.namespace, _ = ModelScopeConfig.get_user_info()
+
+        self.temp_dir = tempfile.mkdtemp()
+        self.test_work_dir = os.path.join(self.temp_dir, self.dataset_name)
+        if not os.path.exists(self.test_work_dir):
+            os.makedirs(self.test_work_dir)
+
+    def tearDown(self):
+        os.chdir(self.old_dir)
+        shutil.rmtree(self.temp_dir, ignore_errors=True)
+        logger.info(
+            f'Temporary directory {self.temp_dir} successfully removed!')
+
+    @staticmethod
+    def get_raw_downloaded_file_path(extracted_path):
+        raw_downloaded_file_path = ''
+        raw_data_dir = os.path.abspath(
+            os.path.join(extracted_path, '../../..'))
+        for root, dirs, files in os.walk(raw_data_dir):
+            if KEY_EXTRACTED in dirs:
+                for file in files:
+                    curr_file_path = os.path.join(root, file)
+                    if zipfile.is_zipfile(curr_file_path):
+                        raw_downloaded_file_path = curr_file_path
+        return raw_downloaded_file_path
+
+    def upload_test_file(self):
+        # Get the prepared data from hub, using default modelscope namespace
+        ms_ds_train = MsDataset.load(self.prepared_dataset_name, split='train')
+        config_res = ms_ds_train._hf_ds.config_kwargs
+        extracted_path = config_res.get('split_config').get('train')
+        raw_zipfile_path = self.get_raw_downloaded_file_path(extracted_path)
+
+        object_name = self.dataset_file_name + '_for_del.zip'
+        MsDataset.upload(
+            object_name=object_name,
+            local_file_path=raw_zipfile_path,
+            dataset_name=self.dataset_name,
+            namespace=self.namespace)
+
+        return object_name
+
+    def upload_test_dir(self):
+        ms_ds_train = MsDataset.load(self.prepared_dataset_name, split='train')
+        config_train = ms_ds_train._hf_ds.config_kwargs
+        extracted_path_train = config_train.get('split_config').get('train')
+
+        object_name = 'train_for_del'
+        MsDataset.upload(
+            object_name=object_name,
+            local_file_path=os.path.join(extracted_path_train,
+                                         'Pets/images/train'),
+            dataset_name=self.dataset_name,
+            namespace=self.namespace)
+
+        return object_name + '/'
+
+    @unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
+    def test_ds_delete_object(self):
+
+        # upload prepared data
+        file_name = self.upload_test_file()
+        dir_name = self.upload_test_dir()
+
+        # delete object
+        del_file_msg = MsDataset.delete(
+            object_name=file_name,
+            dataset_name=self.dataset_name,
+            namespace=self.namespace)
+        del_dir_msg = MsDataset.delete(
+            object_name=dir_name,
+            dataset_name=self.dataset_name,
+            namespace=self.namespace)
+
+        assert all([del_file_msg == EXPECTED_MSG, del_dir_msg == EXPECTED_MSG])
+
+
+if __name__ == '__main__':
+    unittest.main()