From 52ed360376840d53d1d053ba3439ec2720f0e174 Mon Sep 17 00:00:00 2001 From: zhuxiaolong37 Date: Mon, 22 Sep 2025 15:44:51 +0800 Subject: [PATCH] Add progress to uploader --- alibabacloud_oss_v2/checkpoint.py | 2 +- alibabacloud_oss_v2/uploader.py | 12 +++ sample/progress_uploader_file.py | 58 +++++++++++ sample/progress_uploader_from.py | 60 +++++++++++ tests/unit/test_uploader.py | 160 ++++++++++++++++++++++++++++++ 5 files changed, 291 insertions(+), 1 deletion(-) create mode 100644 sample/progress_uploader_file.py create mode 100644 sample/progress_uploader_from.py create mode 100644 tests/unit/test_uploader.py diff --git a/alibabacloud_oss_v2/checkpoint.py b/alibabacloud_oss_v2/checkpoint.py index 675ee99..b1467f9 100644 --- a/alibabacloud_oss_v2/checkpoint.py +++ b/alibabacloud_oss_v2/checkpoint.py @@ -265,7 +265,7 @@ def __init__( h.update(absfilepath.encode()) src_hash = h.hexdigest() - if len(basedir) == 0: + if basedir is None or len(basedir) == 0: dirbase = gettempdir() else: dirbase = os.path.dirname(basedir) diff --git a/alibabacloud_oss_v2/uploader.py b/alibabacloud_oss_v2/uploader.py index 4829b5a..4677ccd 100644 --- a/alibabacloud_oss_v2/uploader.py +++ b/alibabacloud_oss_v2/uploader.py @@ -634,8 +634,20 @@ def _upload_part(self, part): except Exception as err: error = err + self._update_progress(size) + return part_number, etag, error, hash_crc64, size + def _update_progress(self, increment: int): + if self._progress_lock: + with self._progress_lock: + self._transferred += increment + if self._request.progress_fn is not None: + self._request.progress_fn(increment, self._transferred, self._total_size) + else: + self._transferred += increment + if self._request.progress_fn is not None: + self._request.progress_fn(increment, self._transferred, self._total_size) def _save_error(self, error) -> None: if self._upload_part_lock: diff --git a/sample/progress_uploader_file.py b/sample/progress_uploader_file.py new file mode 100644 index 0000000..a9c39b0 --- /dev/null +++ b/sample/progress_uploader_file.py @@ -0,0 +1,58 @@ +import argparse +import alibabacloud_oss_v2 as oss + +parser = argparse.ArgumentParser(description="progress upload file sample") +parser.add_argument('--region', help='The region in which the bucket is located.', required=True) +parser.add_argument('--bucket', help='The name of the bucket.', required=True) +parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS') +parser.add_argument('--key', help='The name of the object.', required=True) +parser.add_argument('--file_path', help='The path of Upload file.', required=True) + + +class UploadProgress: + def __init__(self): + self.bytes_transferred = 0 + + def __call__(self, increment, written, total): + self.bytes_transferred += increment + rate = int(100 * (float(written) / float(total))) + print(f'\r{rate}% ', end='') + + +def main(): + + args = parser.parse_args() + + # Loading credentials values from the environment variables + credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider() + + # Using the SDK's default configuration + cfg = oss.config.load_default() + cfg.credentials_provider = credentials_provider + cfg.region = args.region + if args.endpoint is not None: + cfg.endpoint = args.endpoint + + client = oss.Client(cfg) + + up_loader = client.uploader(part_size=100*1024, + parallel_num=5, + leave_parts_on_error=True, + enable_checkpoint=True, + ) + + # Create progress tracker + progress_tracker = UploadProgress() + + result = up_loader.upload_file(oss.PutObjectRequest( + bucket=args.bucket, + key=args.key, + progress_fn=progress_tracker, + ), filepath=args.file_path) + + print(vars(result)) + + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/sample/progress_uploader_from.py b/sample/progress_uploader_from.py new file mode 100644 index 0000000..b22c9fc --- /dev/null +++ b/sample/progress_uploader_from.py @@ -0,0 +1,60 @@ +import argparse +import alibabacloud_oss_v2 as oss + +parser = argparse.ArgumentParser(description="progress upload from sample") +parser.add_argument('--region', help='The region in which the bucket is located.', required=True) +parser.add_argument('--bucket', help='The name of the bucket.', required=True) +parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS') +parser.add_argument('--key', help='The name of the object.', required=True) +parser.add_argument('--file_path', help='The path of Upload file.', required=True) + + +class UploadProgress: + def __init__(self): + self.bytes_transferred = 0 + + def __call__(self, increment, written, total): + self.bytes_transferred += increment + rate = int(100 * (float(written) / float(total))) + print(f'\r{rate}% ') + + +def main(): + + args = parser.parse_args() + + # Loading credentials values from the environment variables + credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider() + + # Using the SDK's default configuration + cfg = oss.config.load_default() + cfg.credentials_provider = credentials_provider + cfg.region = args.region + if args.endpoint is not None: + cfg.endpoint = args.endpoint + + client = oss.Client(cfg) + + up_loader = client.uploader() + + # up_loader = client.uploader(part_size=100*1024, + # parallel_num=5, + # leave_parts_on_error=True, + # enable_checkpoint=True, + # checkpoint_dir=args.file_path) + + # Create progress tracker + progress_tracker = UploadProgress() + + with open(file=args.file_path, mode='rb') as f: + result = up_loader.upload_from(oss.PutObjectRequest( + bucket=args.bucket, + key=args.key, + progress_fn=progress_tracker, + ), reader=f) + + print(vars(result)) + +if __name__ == "__main__": + main() + diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py new file mode 100644 index 0000000..7baf86c --- /dev/null +++ b/tests/unit/test_uploader.py @@ -0,0 +1,160 @@ +import unittest +from unittest.mock import Mock +from alibabacloud_oss_v2.uploader import _UploaderDelegate, Uploader, UploadAPIClient, UploaderOptions +from alibabacloud_oss_v2 import models + +class TestUpdateProgress(unittest.TestCase): + """Unit test class for _update_progress method""" + + def setUp(self): + """Test preparation""" + # Create required mock objects + base = Mock(spec=Uploader) + client = Mock(spec=UploadAPIClient) + + # Create actual UploaderOptions object instead of mock + options = UploaderOptions( + part_size=1024 * 1024, # 1MB + parallel_num=3, + leave_parts_on_error=False, + enable_checkpoint=False, + checkpoint_dir=None + ) + + # Create request object + request = models.PutObjectRequest( + bucket='test-bucket', + key='test-key' + ) + + # Create _UploaderDelegate instance + self.uploader_delegate = _UploaderDelegate( + base=base, + client=client, + request=request, + options=options + ) + + # Initialize necessary attributes + self.uploader_delegate._transferred = 0 + self.uploader_delegate._total_size = 1000 + self.uploader_delegate._request = request + self.uploader_delegate._request.progress_fn = None + + def test_update_progress_with_lock_and_callback(self): + """Test case with progress lock and callback function""" + # Set test data + increment = 100 + + # Create Mock lock object + mock_lock = Mock() + mock_lock.__enter__ = Mock(return_value=mock_lock) + mock_lock.__exit__ = Mock(return_value=None) + + # Set uploader delegate attributes + self.uploader_delegate._progress_lock = mock_lock + callback_mock = Mock() + self.uploader_delegate._request.progress_fn = callback_mock + + # Call the method under test + self.uploader_delegate._update_progress(increment) + + # Verify results + # 1. Verify lock usage + mock_lock.__enter__.assert_called_once() + mock_lock.__exit__.assert_called_once() + # 2. Verify transferred bytes update + self.assertEqual(self.uploader_delegate._transferred, increment) + # 3. Verify callback function is called + callback_mock.assert_called_once_with(increment, increment, self.uploader_delegate._total_size) + + def test_update_progress_with_lock_no_callback(self): + """Test case with progress lock but no callback function""" + # Set test data + increment = 50 + + # Create Mock lock object + mock_lock = Mock() + mock_lock.__enter__ = Mock(return_value=mock_lock) + mock_lock.__exit__ = Mock(return_value=None) + + # Set uploader delegate attributes + self.uploader_delegate._progress_lock = mock_lock + self.uploader_delegate._request.progress_fn = None + + # Call the method under test + self.uploader_delegate._update_progress(increment) + + # Verify results + # 1. Verify lock usage + mock_lock.__enter__.assert_called_once() + mock_lock.__exit__.assert_called_once() + # 2. Verify transferred bytes update + self.assertEqual(self.uploader_delegate._transferred, increment) + # 3. Verify callback function is not called + self.assertIsNone(self.uploader_delegate._request.progress_fn) + + def test_update_progress_without_lock_with_callback(self): + """Test case without progress lock but with callback function""" + # Set test data + increment = 200 + + # Set uploader delegate attributes + self.uploader_delegate._progress_lock = None + callback_mock = Mock() + self.uploader_delegate._request.progress_fn = callback_mock + + # Call the method under test + self.uploader_delegate._update_progress(increment) + + # Verify results + # 1. Verify transferred bytes update + self.assertEqual(self.uploader_delegate._transferred, increment) + # 2. Verify callback function is called + callback_mock.assert_called_once_with(increment, increment, self.uploader_delegate._total_size) + + def test_update_progress_without_lock_no_callback(self): + """Test case without progress lock and callback function""" + # Set test data + increment = 75 + + # Set uploader delegate attributes + self.uploader_delegate._progress_lock = None + self.uploader_delegate._request.progress_fn = None + + # Call the method under test + self.uploader_delegate._update_progress(increment) + + # Verify results + # 1. Verify transferred bytes update + self.assertEqual(self.uploader_delegate._transferred, increment) + # 2. Verify callback function is not called + self.assertIsNone(self.uploader_delegate._request.progress_fn) + + def test_update_progress_multiple_calls(self): + """Test multiple calls to _update_progress method""" + # Set uploader delegate attributes + self.uploader_delegate._progress_lock = None + callback_mock = Mock() + self.uploader_delegate._request.progress_fn = callback_mock + + # Call the method under test multiple times + increments = [100, 50, 25] + expected_transferred = 0 + + for increment in increments: + expected_transferred += increment + self.uploader_delegate._update_progress(increment) + + # Verify state after each call + self.assertEqual(self.uploader_delegate._transferred, expected_transferred) + + # Verify callback function is called the correct number of times + self.assertEqual(callback_mock.call_count, len(increments)) + + # Verify parameters of the last call + callback_mock.assert_called_with(increments[-1], expected_transferred, self.uploader_delegate._total_size) + + +if __name__ == '__main__': + unittest.main()