|
10 | 10 | load_dotenv() # take environment variables from .env.
|
11 | 11 | except:
|
12 | 12 | pass
|
13 |
| - |
| 13 | +import base64 |
14 | 14 | import logging
|
15 | 15 | import os
|
16 | 16 | import json
|
@@ -277,48 +277,6 @@ def _test_acquire_token_interactive(
|
277 | 277 | return result # For further testing
|
278 | 278 |
|
279 | 279 |
|
280 |
| -class SshCertTestCase(E2eTestCase): |
281 |
| - _JWK1 = """{"kty":"RSA", "n":"2tNr73xwcj6lH7bqRZrFzgSLj7OeLfbn8216uOMDHuaZ6TEUBDN8Uz0ve8jAlKsP9CQFCSVoSNovdE-fs7c15MxEGHjDcNKLWonznximj8pDGZQjVdfK-7mG6P6z-lgVcLuYu5JcWU_PeEqIKg5llOaz-qeQ4LEDS4T1D2qWRGpAra4rJX1-kmrWmX_XIamq30C9EIO0gGuT4rc2hJBWQ-4-FnE1NXmy125wfT3NdotAJGq5lMIfhjfglDbJCwhc8Oe17ORjO3FsB5CLuBRpYmP7Nzn66lRY3Fe11Xz8AEBl3anKFSJcTvlMnFtu3EpD-eiaHfTgRBU7CztGQqVbiQ", "e":"AQAB"}""" |
282 |
| - _JWK2 = """{"kty":"RSA", "n":"72u07mew8rw-ssw3tUs9clKstGO2lvD7ZNxJU7OPNKz5PGYx3gjkhUmtNah4I4FP0DuF1ogb_qSS5eD86w10Wb1ftjWcoY8zjNO9V3ph-Q2tMQWdDW5kLdeU3-EDzc0HQeou9E0udqmfQoPbuXFQcOkdcbh3eeYejs8sWn3TQprXRwGh_TRYi-CAurXXLxQ8rp-pltUVRIr1B63fXmXhMeCAGwCPEFX9FRRs-YHUszUJl9F9-E0nmdOitiAkKfCC9LhwB9_xKtjmHUM9VaEC9jWOcdvXZutwEoW2XPMOg0Ky-s197F9rfpgHle2gBrXsbvVMvS0D-wXg6vsq6BAHzQ", "e":"AQAB"}""" |
283 |
| - DATA1 = {"token_type": "ssh-cert", "key_id": "key1", "req_cnf": _JWK1} |
284 |
| - DATA2 = {"token_type": "ssh-cert", "key_id": "key2", "req_cnf": _JWK2} |
285 |
| - _SCOPE_USER = ["https://pas.windows.net/CheckMyAccess/Linux/user_impersonation"] |
286 |
| - _SCOPE_SP = ["https://pas.windows.net/CheckMyAccess/Linux/.default"] |
287 |
| - SCOPE = _SCOPE_SP # Historically there was a separation, at 2021 it is unified |
288 |
| - |
289 |
| - def test_ssh_cert_for_service_principal(self): |
290 |
| - # Any SP can obtain an ssh-cert. Here we use the lab app. |
291 |
| - result = get_lab_app().acquire_token_for_client(self.SCOPE, data=self.DATA1) |
292 |
| - self.assertIsNotNone(result.get("access_token"), "Encountered {}: {}".format( |
293 |
| - result.get("error"), result.get("error_description"))) |
294 |
| - self.assertEqual("ssh-cert", result["token_type"]) |
295 |
| - |
296 |
| - def test_ssh_cert_for_user_should_work_with_any_account(self): |
297 |
| - result = self._test_acquire_token_interactive( |
298 |
| - client_id="04b07795-8ddb-461a-bbee-02f9e1bf7b46", # Azure CLI is one |
299 |
| - # of the only 2 clients that are PreAuthz to use ssh cert feature |
300 |
| - authority="https://login.microsoftonline.com/common", |
301 |
| - scope=self.SCOPE, |
302 |
| - data=self.DATA1, |
303 |
| - username_uri="https://msidlab.com/api/user?usertype=cloud", |
304 |
| - prompt="none" if msal.application._is_running_in_cloud_shell() else None, |
305 |
| - ) # It already tests reading AT from cache, and using RT to refresh |
306 |
| - # acquire_token_silent() would work because we pass in the same key |
307 |
| - self.assertIsNotNone(result.get("access_token"), "Encountered {}: {}".format( |
308 |
| - result.get("error"), result.get("error_description"))) |
309 |
| - self.assertEqual("ssh-cert", result["token_type"]) |
310 |
| - logger.debug("%s.cache = %s", |
311 |
| - self.id(), json.dumps(self.app.token_cache._cache, indent=4)) |
312 |
| - |
313 |
| - # refresh_token grant can fetch an ssh-cert bound to a different key |
314 |
| - account = self.app.get_accounts()[0] |
315 |
| - refreshed_ssh_cert = self.app.acquire_token_silent( |
316 |
| - self.SCOPE, account=account, data=self.DATA2) |
317 |
| - self.assertIsNotNone(refreshed_ssh_cert) |
318 |
| - self.assertEqual(refreshed_ssh_cert["token_type"], "ssh-cert") |
319 |
| - self.assertNotEqual(result["access_token"], refreshed_ssh_cert['access_token']) |
320 |
| - |
321 |
| - |
322 | 280 | @unittest.skipUnless(
|
323 | 281 | msal.application._is_running_in_cloud_shell(),
|
324 | 282 | "Manually run this test case from inside Cloud Shell")
|
@@ -697,6 +655,85 @@ def _test_acquire_token_by_client_secret(
|
697 | 655 | self.assertCacheWorksForApp(result, scope)
|
698 | 656 |
|
699 | 657 |
|
| 658 | +class PopWithExternalKeyTestCase(LabBasedTestCase): |
| 659 | + def _test_service_principal(self): |
| 660 | + # Any SP can obtain an ssh-cert. Here we use the lab app. |
| 661 | + result = get_lab_app().acquire_token_for_client(self.SCOPE, data=self.DATA1) |
| 662 | + self.assertIsNotNone(result.get("access_token"), "Encountered {}: {}".format( |
| 663 | + result.get("error"), result.get("error_description"))) |
| 664 | + self.assertEqual(self.EXPECTED_TOKEN_TYPE, result["token_type"]) |
| 665 | + |
| 666 | + def _test_user_account(self): |
| 667 | + lab_user = self.get_lab_user(usertype="cloud") |
| 668 | + result = self._test_acquire_token_interactive( |
| 669 | + client_id="04b07795-8ddb-461a-bbee-02f9e1bf7b46", # Azure CLI is one |
| 670 | + # of the only 2 clients that are PreAuthz to use ssh cert feature |
| 671 | + authority="https://login.microsoftonline.com/common", |
| 672 | + scope=self.SCOPE, |
| 673 | + data=self.DATA1, |
| 674 | + username=lab_user["username"], |
| 675 | + lab_name=lab_user["lab_name"], |
| 676 | + prompt="none" if msal.application._is_running_in_cloud_shell() else None, |
| 677 | + ) # It already tests reading AT from cache, and using RT to refresh |
| 678 | + # acquire_token_silent() would work because we pass in the same key |
| 679 | + self.assertIsNotNone(result.get("access_token"), "Encountered {}: {}".format( |
| 680 | + result.get("error"), result.get("error_description"))) |
| 681 | + self.assertEqual(self.EXPECTED_TOKEN_TYPE, result["token_type"]) |
| 682 | + logger.debug("%s.cache = %s", |
| 683 | + self.id(), json.dumps(self.app.token_cache._cache, indent=4)) |
| 684 | + |
| 685 | + # refresh_token grant can fetch an ssh-cert bound to a different key |
| 686 | + account = self.app.get_accounts()[0] |
| 687 | + refreshed_ssh_cert = self.app.acquire_token_silent( |
| 688 | + self.SCOPE, account=account, data=self.DATA2) |
| 689 | + self.assertIsNotNone(refreshed_ssh_cert) |
| 690 | + self.assertEqual(self.EXPECTED_TOKEN_TYPE, refreshed_ssh_cert["token_type"]) |
| 691 | + self.assertNotEqual(result["access_token"], refreshed_ssh_cert['access_token']) |
| 692 | + |
| 693 | + |
| 694 | +class SshCertTestCase(PopWithExternalKeyTestCase): |
| 695 | + EXPECTED_TOKEN_TYPE = "ssh-cert" |
| 696 | + _JWK1 = """{"kty":"RSA", "n":"2tNr73xwcj6lH7bqRZrFzgSLj7OeLfbn8216uOMDHuaZ6TEUBDN8Uz0ve8jAlKsP9CQFCSVoSNovdE-fs7c15MxEGHjDcNKLWonznximj8pDGZQjVdfK-7mG6P6z-lgVcLuYu5JcWU_PeEqIKg5llOaz-qeQ4LEDS4T1D2qWRGpAra4rJX1-kmrWmX_XIamq30C9EIO0gGuT4rc2hJBWQ-4-FnE1NXmy125wfT3NdotAJGq5lMIfhjfglDbJCwhc8Oe17ORjO3FsB5CLuBRpYmP7Nzn66lRY3Fe11Xz8AEBl3anKFSJcTvlMnFtu3EpD-eiaHfTgRBU7CztGQqVbiQ", "e":"AQAB"}""" |
| 697 | + _JWK2 = """{"kty":"RSA", "n":"72u07mew8rw-ssw3tUs9clKstGO2lvD7ZNxJU7OPNKz5PGYx3gjkhUmtNah4I4FP0DuF1ogb_qSS5eD86w10Wb1ftjWcoY8zjNO9V3ph-Q2tMQWdDW5kLdeU3-EDzc0HQeou9E0udqmfQoPbuXFQcOkdcbh3eeYejs8sWn3TQprXRwGh_TRYi-CAurXXLxQ8rp-pltUVRIr1B63fXmXhMeCAGwCPEFX9FRRs-YHUszUJl9F9-E0nmdOitiAkKfCC9LhwB9_xKtjmHUM9VaEC9jWOcdvXZutwEoW2XPMOg0Ky-s197F9rfpgHle2gBrXsbvVMvS0D-wXg6vsq6BAHzQ", "e":"AQAB"}""" |
| 698 | + DATA1 = {"token_type": "ssh-cert", "key_id": "key1", "req_cnf": _JWK1} |
| 699 | + DATA2 = {"token_type": "ssh-cert", "key_id": "key2", "req_cnf": _JWK2} |
| 700 | + _SCOPE_USER = ["https://pas.windows.net/CheckMyAccess/Linux/user_impersonation"] |
| 701 | + _SCOPE_SP = ["https://pas.windows.net/CheckMyAccess/Linux/.default"] |
| 702 | + SCOPE = _SCOPE_SP # Historically there was a separation, at 2021 it is unified |
| 703 | + |
| 704 | + def test_service_principal(self): |
| 705 | + self._test_service_principal() |
| 706 | + |
| 707 | + def test_user_account(self): |
| 708 | + self._test_user_account() |
| 709 | + |
| 710 | + |
| 711 | +def _data_for_pop(key): |
| 712 | + raw_req_cnf = json.dumps({"kid": key, "xms_ksl": "sw"}) |
| 713 | + return { # Sampled from Azure CLI's plugin connectedk8s |
| 714 | + 'token_type': 'pop', |
| 715 | + 'key_id': key, |
| 716 | + "req_cnf": base64.urlsafe_b64encode(raw_req_cnf.encode('utf-8')).decode('utf-8').rstrip('='), |
| 717 | + # Note: Sending raw_req_cnf without base64 encoding would result in an http 500 error |
| 718 | + } # See also https://github.com/Azure/azure-cli-extensions/blob/main/src/connectedk8s/azext_connectedk8s/_clientproxyutils.py#L86-L92 |
| 719 | + |
| 720 | + |
| 721 | +class AtPopWithExternalKeyTestCase(PopWithExternalKeyTestCase): |
| 722 | + EXPECTED_TOKEN_TYPE = "pop" |
| 723 | + DATA1 = _data_for_pop('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA-AAAAAAAA') # Fake key with a certain format and length |
| 724 | + DATA2 = _data_for_pop('BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB-BBBBBBBB') # Fake key with a certain format and length |
| 725 | + SCOPE = [ |
| 726 | + '6256c85f-0aad-4d50-b960-e6e9b21efe35/.default', # Azure CLI's connectedk8s plugin uses this |
| 727 | + # https://github.com/Azure/azure-cli-extensions/pull/4468/files#diff-a47efa3186c7eb4f1176e07d0b858ead0bf4a58bfd51e448ee3607a5b4ef47f6R116 |
| 728 | + ] |
| 729 | + |
| 730 | + def test_service_principal(self): |
| 731 | + self._test_service_principal() |
| 732 | + |
| 733 | + def test_user_account(self): |
| 734 | + self._test_user_account() |
| 735 | + |
| 736 | + |
700 | 737 | class WorldWideTestCase(LabBasedTestCase):
|
701 | 738 |
|
702 | 739 | def test_aad_managed_user(self): # Pure cloud
|
|
0 commit comments