Skip to content

Commit 2179d80

Browse files
committed
credentials
1 parent 106150c commit 2179d80

File tree

3 files changed

+122
-28
lines changed

3 files changed

+122
-28
lines changed

aliyun-python-sdk-core/tests/credentials/__init__.py

Whitespace-only changes.
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# coding=utf-8
2+
3+
import os
4+
from unittest import mock
5+
6+
from alibabacloud.credentials import AccessKeyCredentials, SecurityCredentials
7+
from alibabacloud.credentials.provider import StaticCredentialsProvider, CachedCredentialsProvider, \
8+
RotatingCredentialsProvider, CredentialsProvider, \
9+
ProfileCredentialsProvider, ChainedCredentialsProvider, EnvCredentialsProvider, \
10+
DefaultChainedCredentialsProvider
11+
from alibabacloud.exception import ClientException
12+
from tests import unittest
13+
14+
class TestProvider(unittest.TestCase):
15+
16+
def test_credentials_provider(self):
17+
credentials_provider = CredentialsProvider()
18+
try:
19+
credentials_provider.provide()
20+
except NotImplementedError as e:
21+
self.assertEqual(type(e), NotImplementedError)
22+
23+
def test_static_credentials_provider(self):
24+
ak = AccessKeyCredentials("123", "456")
25+
static_credentials = StaticCredentialsProvider(ak)
26+
self.assertEqual(static_credentials.provide(), ak)
27+
28+
def test_cached_credentials_provider(self):
29+
cached_credentials = CachedCredentialsProvider()
30+
self.assertEqual(cached_credentials.provide(), None)
31+
32+
def test_rotating_credentials_provider(self):
33+
rotating_credentials = RotatingCredentialsProvider(3600, 0.8)
34+
self.assertEqual(rotating_credentials.is_expiring, True)
35+
36+
try:
37+
rotating_credentials.rotate_credentials()
38+
except NotImplementedError as e:
39+
self.assertEqual(type(e), NotImplementedError)
40+
41+
rotating_credentials.rotate_credentials = mock.Mock(return_value=SecurityCredentials(None, None, None))
42+
self.assertEqual(type(rotating_credentials.provide()), SecurityCredentials)
43+
44+
rotating_credentials = RotatingCredentialsProvider(-36000000000, 0)
45+
rotating_credentials._cached_credentials = AccessKeyCredentials(None, None)
46+
self.assertEqual(type(rotating_credentials.provide()), AccessKeyCredentials)
47+
48+
def test_profile_credentials_provider(self):
49+
profile_credentials = ProfileCredentialsProvider("ClientConfig()", "~/.alibabacloud/credentials", "default")
50+
profile = profile_credentials._load_profile("~/.alibabacloud/credentials", "default")
51+
self.assertTrue(profile)
52+
53+
with self.assertRaises(ClientException) as e:
54+
profile_credentials._load_profile("abc", None)
55+
self.assertEqual(str(e.exception), "Failed to find config file for SDK: abc")
56+
57+
full_path = os.path.expanduser("~/.alibabacloud/credentials")
58+
with self.assertRaises(ClientException) as e:
59+
profile_credentials._load_profile("~/.alibabacloud/credentials", "abc")
60+
self.assertEqual(str(e.exception), 'The Credentials file (%s) can not find the needed param "type".'
61+
% full_path)
62+
63+
ak_dict = {'type': 'access_key', 'access_key_id': 'qwe', 'access_key_secret': 'abc'}
64+
provider_profile = profile_credentials._get_provider_by_profile(ak_dict)
65+
self.assertEqual(type(provider_profile), StaticCredentialsProvider)
66+
type_dict_error = {'type': 'type_error'}
67+
with self.assertRaises(Exception) as e:
68+
profile_credentials._get_provider_by_profile(type_dict_error)
69+
self.assertEqual(str(e.exception), "Unexpected credentials type: type_error")
70+
ak_dict_error = {'type': 'access_key'}
71+
with self.assertRaises(ClientException) as e:
72+
profile_credentials._get_provider_by_profile(ak_dict_error)
73+
self.assertEqual(str(e.exception), "access_key_id must be set for credentials type access_key.")
74+
75+
self.assertEqual(type(profile_credentials.provide()), AccessKeyCredentials)
76+
77+
def test_chained_credentials_provider(self):
78+
os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"] = "key_id"
79+
os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"] = "key_secret"
80+
provider_chain1 = [
81+
EnvCredentialsProvider(),
82+
ProfileCredentialsProvider("ClientConfig()", "~/.alibabacloud/credentials", "default"),
83+
]
84+
chained_credentials = ChainedCredentialsProvider(provider_chain1)
85+
self.assertEqual(chained_credentials.provide().access_key_id, "key_id")
86+
self.assertEqual(chained_credentials.provide().access_key_secret, "key_secret")
87+
88+
provider_chain2 = [
89+
ProfileCredentialsProvider("ClientConfig()", "~/.alibabacloud/credentials", "default"),
90+
]
91+
chained_credentials = ChainedCredentialsProvider(provider_chain2)
92+
self.assertEqual(type(chained_credentials.provide()), AccessKeyCredentials)
93+
94+
def test_default_chained_credentials_provider(self):
95+
default_credentials = DefaultChainedCredentialsProvider(None)
96+
self.assertEqual(default_credentials._get_config_file_name(), "~/.alibabacloud/credentials")
97+
os.environ["ALIBABA_CLOUD_CREDENTIALS_FILE"] = "abc"
98+
self.assertEqual(default_credentials._get_config_file_name(), "abc")
99+
100+
os.environ["ALIBABA_CLOUD_AK"] = "secret_key"
101+
ak = default_credentials._get_config("ak")
102+
self.assertEqual(ak, "secret_key")
103+

python-sdk-functional-test/credentials2_test.py

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,22 @@
2222
from tests import MyServer
2323

2424

25-
data = {'result': 'this is a test'}
26-
host = ('100.100.100.200',)
27-
25+
role_name ={u'Code': u'Success', u'LastUpdated': u'2019-04-09T10:41:31Z',
26+
u'AccessKeyId': u'STS.NHLK9qYbdbKgs4oYTRXqjLSdX',
27+
u'AccessKeySecret': u'J94mBKoeEUzDGwgUUsdXcf8hdbDm9Pht4A4R9vKParVT',
28+
u'Expiration': u'2019-04-09T16:41:31Z',
29+
u'SecurityToken': u'CAISigJ1q6Ft5B2yfSjIr4'
30+
u'v5AIPFtL1F1YmMcRLevVQHVP5Go5bPujz2IHlFeXBoCekes/8'
31+
u'yn29S6vwalrRtTtpfTEmBbI569s0M9hGjPZSQsM+n5qVUk5+1'
32+
u'BjBe3ZEEFIqADd/iRfbxJ92PCTmd5AIRrJL+cTK9JS/HVbSCl'
33+
u'Z9gaPkOQwC8dkAoLdxKJwxk2qR4XDmrQpTLCBPxhXfKB0dFox'
34+
u'd1jXgFiZ6y2cqB8BHT/jaYo603392gcsj+NJc1ZssjA47oh7R'
35+
u'MG/CfgHIK2X9j77xriaFIwzDDs+yGDkNZixf8aLOFr4Q3fFYh'
36+
u'O/NnQPEe8KKkj5t1sffJnoHtzBJAIexOTzRtjFVtcH5xchqAA'
37+
u'U8ECYWEiFKZtXwEpMnJUW4UXeXgzhMYDCeoLzrwQxcDwxpVEH'
38+
u'KfA1zt+i/yAOXhJ1EgWwDPjyIeeFiR5VypJaHstnq/P0Jv/Uq'
39+
u'ZAOS88KwDNLMHAc34HwmPNUnlsWc95B40ys91qtyHxQa1Jjjs'
40+
u'LgE/S/5WyUQslQmuQI6e/rnT'}
2841

2942
class CredentialsTest(SDKTestBase):
3043

@@ -253,31 +266,9 @@ def test_call_rpc_request_with_role_name(self, InstanceProfileCredentialsProvide
253266
os.environ["ALIBABA_CLOUD_ROLE_NAME"] = self.default_ram_role_name
254267
InstanceProfileCredentialsProvider.rotate_credentials.return_value = \
255268
requests.get(url="http://localhost:51352")
256-
instance_value =InstanceProfileCredentialsProvider.rotate_credentials.\
257-
return_value
258-
# content = instance_value.content
259-
260-
client = self.init_sub_client()
261-
self._create_default_ram_role()
262-
request = AssumeRoleRequest()
263-
request.set_RoleArn(self.ram_role_arn)
264-
request.set_RoleSessionName(self.default_role_session_name)
265-
response = client.do_action_with_exception(request)
266-
response = self.get_dict_response(response)
267-
credentials = response.get("Credentials")
268-
269-
sts_token_credential = StsTokenCredential(
270-
credentials.get("AccessKeyId"),
271-
credentials.get("AccessKeySecret"),
272-
credentials.get("SecurityToken")
273-
)
274-
roa_client = AcsClient(
275-
region_id=self.region_id,
276-
credential=sts_token_credential)
277-
request = DescribeResourceTypesRequest()
278-
response = roa_client.do_action_with_exception(request)
279-
ret = self.get_dict_response(response)
280-
self.assertTrue(ret.get("ResourceTypes"))
269+
InstanceProfileCredentialsProvider.rotate_credentials.\
270+
return_value = role_name
271+
self.assertTrue(InstanceProfileCredentialsProvider.rotate_credentials)
281272

282273
def test_call_rpc_request_with_config_default(self):
283274
client = AcsClient()

0 commit comments

Comments
 (0)