From e8c64bc9749b1f0a9f9e6bb48c31c2e66b1857a4 Mon Sep 17 00:00:00 2001 From: Ye Wenbin Date: Thu, 7 May 2026 13:14:59 +0800 Subject: [PATCH 1/2] feat(alibaba): support custom endpoint in OSSHook for VPC internal access OSSHook._get_client now reads extra.endpoint from the connection config, so a VPC internal endpoint (e.g. oss-cn-hangzhou-internal.aliyuncs.com) can be used instead of only the public endpoint. When extra.endpoint is unset or empty, the default public endpoint oss-{region}.aliyuncs.com is still used. With this change, scheduler/task pods in the same VPC as OSS can use the lower-latency internal endpoint, avoiding unnecessary public traffic. Co-Authored-By: Claude Sonnet 4.6 --- .../providers/alibaba/cloud/hooks/oss.py | 4 ++- .../unit/alibaba/cloud/hooks/test_oss.py | 26 +++++++++++++++++++ .../unit/alibaba/cloud/utils/oss_mock.py | 21 +++++++-------- 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/providers/alibaba/src/airflow/providers/alibaba/cloud/hooks/oss.py b/providers/alibaba/src/airflow/providers/alibaba/cloud/hooks/oss.py index c1e7b5b2924e1..8e82e1a827675 100644 --- a/providers/alibaba/src/airflow/providers/alibaba/cloud/hooks/oss.py +++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/hooks/oss.py @@ -97,7 +97,9 @@ def get_conn(self) -> Connection: def _get_client(self) -> oss.Client: config = oss.config.load_default() config.region = self.region - config.endpoint = f"oss-{self.region}.aliyuncs.com" + # Prefer extra.endpoint (e.g. 
VPC internal endpoint) over default public endpoint + endpoint = self.oss_conn.extra_dejson.get("endpoint") + config.endpoint = endpoint or f"oss-{self.region}.aliyuncs.com" config.credentials_provider = self.get_credential() return oss.Client(config) diff --git a/providers/alibaba/tests/unit/alibaba/cloud/hooks/test_oss.py b/providers/alibaba/tests/unit/alibaba/cloud/hooks/test_oss.py index 19d95cae4e716..4ef60c92b54fa 100644 --- a/providers/alibaba/tests/unit/alibaba/cloud/hooks/test_oss.py +++ b/providers/alibaba/tests/unit/alibaba/cloud/hooks/test_oss.py @@ -58,6 +58,32 @@ def test_get_credential(self, mock_provider): self.hook.get_credential() mock_provider.assert_called_once_with("mock_access_key_id", "mock_access_key_secret") + @mock.patch(OSS_STRING.format("oss.Client")) + @mock.patch(OSS_STRING.format("oss.config.load_default")) + @mock.patch(OSS_STRING.format("OSSHook.get_credential")) + def test_get_client_default_endpoint(self, mock_cred, mock_config, mock_client): + self.hook._get_client() + config = mock_config.return_value + assert config.endpoint == "oss-mock_region.aliyuncs.com" + + @mock.patch(OSS_STRING.format("oss.Client")) + @mock.patch(OSS_STRING.format("oss.config.load_default")) + @mock.patch(OSS_STRING.format("OSSHook.get_credential")) + def test_get_client_custom_endpoint(self, mock_cred, mock_config, mock_client): + with mock.patch( + OSS_STRING.format("OSSHook.__init__"), + new=mock_oss_hook_default_project_id, + ): + # Simulate custom VPC internal endpoint via extra + hook = OSSHook( + oss_conn_id=MOCK_OSS_CONN_ID, + region="mock_region", + endpoint="oss-cn-hangzhou-internal.aliyuncs.com", + ) + hook._get_client() + config = mock_config.return_value + assert config.endpoint == "oss-cn-hangzhou-internal.aliyuncs.com" + @mock.patch(OSS_STRING.format("OSSHook._get_client")) def test_get_bucket(self, mock_get_client): self.hook.get_bucket("mock_bucket_name") diff --git 
a/providers/alibaba/tests/unit/alibaba/cloud/utils/oss_mock.py b/providers/alibaba/tests/unit/alibaba/cloud/utils/oss_mock.py index f697d1e6960c6..7bef2636d8dab 100644 --- a/providers/alibaba/tests/unit/alibaba/cloud/utils/oss_mock.py +++ b/providers/alibaba/tests/unit/alibaba/cloud/utils/oss_mock.py @@ -24,16 +24,15 @@ OSS_PROJECT_ID_HOOK_UNIT_TEST = "example-project" -def mock_oss_hook_default_project_id(self, oss_conn_id="mock_oss_default", region="mock_region"): +def mock_oss_hook_default_project_id(self, oss_conn_id="mock_oss_default", region="mock_region", endpoint=None): self.oss_conn_id = oss_conn_id - self.oss_conn = Connection( - extra=json.dumps( - { - "auth_type": "AK", - "access_key_id": "mock_access_key_id", - "access_key_secret": "mock_access_key_secret", - "region": "mock_region", - } - ) - ) + extra = { + "auth_type": "AK", + "access_key_id": "mock_access_key_id", + "access_key_secret": "mock_access_key_secret", + "region": region, + } + if endpoint is not None: + extra["endpoint"] = endpoint + self.oss_conn = Connection(extra=json.dumps(extra)) self.region = region From 6dcb655a1179cf9e7923a3d2dc0582284a9f8901 Mon Sep 17 00:00:00 2001 From: Ye Wenbin Date: Thu, 7 May 2026 16:47:53 +0800 Subject: [PATCH 2/2] fix(alibaba): add oss_log_exists and oss_read wrappers to OSSTaskHandler OSSTaskHandler._read() calls self.oss_log_exists() and self.oss_read(), but these methods are defined only on OSSRemoteLogIO (the I/O helper the handler holds as self.io), not on OSSTaskHandler itself. This causes an AttributeError when the UI tries to read task logs from OSS after a task completes. Add oss_log_exists() and oss_read() wrapper methods to OSSTaskHandler that proxy to self.io (an OSSRemoteLogIO instance). 
Co-Authored-By: Claude Sonnet 4.6 --- .../alibaba/cloud/log/oss_task_handler.py | 8 +++++ .../cloud/log/test_oss_task_handler.py | 29 +++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py index 4a50538fc9aa5..ef7d533cab928 100644 --- a/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py +++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py @@ -228,6 +228,14 @@ def close(self): # Mark closed so we don't double write if close is called twice self.closed = True + def oss_log_exists(self, remote_log_location: str) -> bool: + """Check if remote_log_location exists in OSS, proxy to self.io.""" + return self.io.oss_log_exists(remote_log_location) + + def oss_read(self, remote_log_location: str, return_error: bool = False) -> str: + """Read log content from OSS, proxy to self.io.""" + return self.io.oss_read(remote_log_location, return_error=return_error) + def _read(self, ti, try_number, metadata=None): """ Read logs of given task instance and try_number from OSS remote storage. 
diff --git a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py index cfdcdcab48077..dc3c4b1608174 100644 --- a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py +++ b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py @@ -205,6 +205,35 @@ def test_upload_passes_relative_path_to_oss_write(self, mock_oss_write, tmp_path handler.io.upload(str(log_file), self.ti) mock_oss_write.assert_called_once_with("test log content", relative_path) + @mock.patch(OSS_TASK_HANDLER_STRING.format("OSSTaskHandler.oss_log_exists")) + @mock.patch(OSS_TASK_HANDLER_STRING.format("OSSTaskHandler.oss_read")) + def test_read_uses_oss_log_exists_and_oss_read(self, mock_oss_read, mock_oss_log_exists): + """Test that _read calls oss_log_exists and oss_read on the handler itself.""" + mock_oss_log_exists.return_value = True + mock_oss_read.return_value = "log content" + + # _read should call self.oss_log_exists and self.oss_read (not self.io.*) + self.oss_task_handler._read(self.ti, try_number=1) + + mock_oss_log_exists.assert_called_once() + mock_oss_read.assert_called_once() + + @mock.patch(OSS_TASK_HANDLER_STRING.format("OSSRemoteLogIO.oss_log_exists")) + def test_handler_oss_log_exists_proxies_to_io(self, mock_io_exists): + """Test that OSSTaskHandler.oss_log_exists proxies to self.io.oss_log_exists.""" + mock_io_exists.return_value = True + result = self.oss_task_handler.oss_log_exists("1.log") + mock_io_exists.assert_called_once_with("1.log") + assert result is True + + @mock.patch(OSS_TASK_HANDLER_STRING.format("OSSRemoteLogIO.oss_read")) + def test_handler_oss_read_proxies_to_io(self, mock_io_read): + """Test that OSSTaskHandler.oss_read proxies to self.io.oss_read.""" + mock_io_read.return_value = "log content" + result = self.oss_task_handler.oss_read("1.log", return_error=True) + mock_io_read.assert_called_once_with("1.log", return_error=True) + 
assert result == "log content" + def test_filename_template_for_backward_compatibility(self): # filename_template arg support for running the latest provider on airflow 2 OSSTaskHandler(self.base_log_folder, self.oss_log_folder, filename_template=None)