Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
SandboxConnectionConfig,
SandboxDirectConnectionConfig,
SandboxGatewayConnectionConfig,
SandboxLocalTunnelConnectionConfig
SandboxInClusterConnectionConfig,
SandboxLocalTunnelConnectionConfig,
)
from .k8s_helper import K8sHelper
from .exceptions import (
Expand Down Expand Up @@ -52,6 +53,10 @@ def verify_connection(self):
"""Checks if the connection is healthy. Raises SandboxPortForwardError if not."""
pass

def should_inject_router_headers(self) -> bool:
"""Returns True if X-Sandbox-* router headers should be injected into requests."""
return True

class DirectConnectionStrategy(ConnectionStrategy):
def __init__(self, config: SandboxDirectConnectionConfig):
self.config = config
Expand Down Expand Up @@ -168,6 +173,36 @@ def verify_connection(self):
f"Stderr: {stderr.decode(errors='replace')}"
)

class InClusterConnectionStrategy(ConnectionStrategy):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vicentefb is this okay to do so?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any update on this ?

"""Provides direct in-cluster connectivity to a sandbox pod, bypassing the router.

Requires the SDK to run inside the same Kubernetes cluster as the sandbox.
Router-specific request headers are not injected.
"""

def __init__(
self,
sandbox_id: str,
namespace: str,
config: SandboxInClusterConnectionConfig,
):
self._base_url = (
f"http://{sandbox_id}.{namespace}"
f".svc.cluster.local:{config.server_port}"
)

def connect(self) -> str:
return self._base_url

def verify_connection(self):
pass

def close(self):
pass

def should_inject_router_headers(self) -> bool:
return False

class SandboxConnector:
"""
Manages the connection to the Sandbox, including auto-discovery and port-forwarding.
Expand Down Expand Up @@ -207,6 +242,8 @@ def _connection_strategy(self):
return GatewayConnectionStrategy(self.connection_config, self.k8s_helper)
elif isinstance(self.connection_config, SandboxLocalTunnelConnectionConfig):
return LocalTunnelConnectionStrategy(self.id, self.namespace, self.connection_config)
elif isinstance(self.connection_config, SandboxInClusterConnectionConfig):
return InClusterConnectionStrategy(self.id, self.namespace, self.connection_config)
else:
raise ValueError("Unknown connection configuration type")

Expand All @@ -231,9 +268,10 @@ def send_request(self, method: str, endpoint: str, **kwargs) -> requests.Respons
url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"

headers = kwargs.get("headers", {}).copy()
headers["X-Sandbox-ID"] = self.id
headers["X-Sandbox-Namespace"] = self.namespace
headers["X-Sandbox-Port"] = str(self.connection_config.server_port)
if self.strategy.should_inject_router_headers():
headers["X-Sandbox-ID"] = self.id
headers["X-Sandbox-Namespace"] = self.namespace
headers["X-Sandbox-Port"] = str(self.connection_config.server_port)
kwargs["headers"] = headers

# Send the request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,21 @@ class SandboxLocalTunnelConnectionConfig(BaseModel):
port_forward_ready_timeout: int = 30 # Timeout in seconds to wait for port-forward to be ready.
server_port: int = 8888 # Port the sandbox container listens on.

SandboxConnectionConfig = Union[SandboxDirectConnectionConfig, SandboxGatewayConnectionConfig, SandboxLocalTunnelConnectionConfig]
class SandboxInClusterConnectionConfig(BaseModel):
"""Configuration for direct in-cluster connection to the sandbox pod via K8s DNS.

Constructs the URL as http://{sandbox_id}.{namespace}.svc.cluster.local:{server_port}
and bypasses the router entirely. No external config is required beyond what
the Sandbox already knows (id, namespace, port).
"""
server_port: int = 8888 # Port the sandbox container listens on.

SandboxConnectionConfig = Union[
SandboxDirectConnectionConfig,
SandboxGatewayConnectionConfig,
SandboxLocalTunnelConnectionConfig,
SandboxInClusterConnectionConfig,
]

class SandboxTracerConfig(BaseModel):
"""Configuration for tracer level information"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
# Copyright 2026 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
from unittest.mock import MagicMock

import requests

from k8s_agent_sandbox.connector import (
DirectConnectionStrategy,
GatewayConnectionStrategy,
LocalTunnelConnectionStrategy,
InClusterConnectionStrategy,
SandboxConnector,
)
from k8s_agent_sandbox.models import (
SandboxDirectConnectionConfig,
SandboxGatewayConnectionConfig,
SandboxLocalTunnelConnectionConfig,
SandboxInClusterConnectionConfig,
)


class TestInClusterConnectionStrategy(unittest.TestCase):
"""Unit tests for InClusterConnectionStrategy."""

def setUp(self):
self.config = SandboxInClusterConnectionConfig(server_port=8888)
self.strategy = InClusterConnectionStrategy(
sandbox_id="my-sandbox",
namespace="dev",
config=self.config,
)

def test_connect_returns_correct_dns_url(self):
url = self.strategy.connect()
self.assertEqual(url, "http://my-sandbox.dev.svc.cluster.local:8888")

def test_connect_uses_custom_port(self):
config = SandboxInClusterConnectionConfig(server_port=9000)
strategy = InClusterConnectionStrategy("sb", "ns", config)
self.assertEqual(strategy.connect(), "http://sb.ns.svc.cluster.local:9000")

def test_connect_is_idempotent(self):
self.assertEqual(self.strategy.connect(), self.strategy.connect())

def test_does_not_inject_router_headers(self):
self.assertFalse(self.strategy.should_inject_router_headers())

def test_verify_connection_does_not_raise(self):
self.strategy.verify_connection()

def test_close_does_not_raise(self):
self.strategy.close()


class TestExistingStrategiesDefaultHeaderInjection(unittest.TestCase):
"""Regression: existing strategies must still inject router headers by default."""

def test_direct_injects_headers(self):
s = DirectConnectionStrategy(SandboxDirectConnectionConfig(api_url="http://x"))
self.assertTrue(s.should_inject_router_headers())

def test_gateway_injects_headers(self):
s = GatewayConnectionStrategy(
SandboxGatewayConnectionConfig(gateway_name="gw"),
k8s_helper=MagicMock(),
)
self.assertTrue(s.should_inject_router_headers())

def test_local_tunnel_injects_headers(self):
s = LocalTunnelConnectionStrategy(
sandbox_id="s", namespace="ns",
config=SandboxLocalTunnelConnectionConfig(),
)
self.assertTrue(s.should_inject_router_headers())


class TestSandboxConnectorStrategySelection(unittest.TestCase):
def _make_connector(self, config):
return SandboxConnector(
sandbox_id="sb",
namespace="ns",
connection_config=config,
k8s_helper=MagicMock(),
)

def test_selects_in_cluster_strategy(self):
config = SandboxInClusterConnectionConfig()
connector = self._make_connector(config)
self.assertIsInstance(connector.strategy, InClusterConnectionStrategy)

def test_selects_direct_strategy(self):
config = SandboxDirectConnectionConfig(api_url="http://x")
connector = self._make_connector(config)
self.assertIsInstance(connector.strategy, DirectConnectionStrategy)

def test_raises_on_unknown_config_type(self):
with self.assertRaises((ValueError, Exception)):
SandboxConnector(
sandbox_id="sb",
namespace="ns",
connection_config=object(),
k8s_helper=MagicMock(),
)


class TestSandboxConnectorHeaderInjection(unittest.TestCase):
def _make_connector_with_strategy(self, strategy, config):
connector = SandboxConnector(
sandbox_id="my-sb",
namespace="my-ns",
connection_config=config,
k8s_helper=MagicMock(),
)
connector.strategy = strategy
mock_session = MagicMock()
connector.session = mock_session
return connector, mock_session

def _mock_ok_response(self):
mock_resp = MagicMock(spec=requests.Response)
mock_resp.raise_for_status.return_value = None
return mock_resp

def test_router_headers_NOT_sent_for_in_cluster(self):
config = SandboxInClusterConnectionConfig(server_port=8888)
strategy = InClusterConnectionStrategy("my-sb", "my-ns", config)
connector, mock_session = self._make_connector_with_strategy(strategy, config)
mock_session.request.return_value = self._mock_ok_response()

connector.send_request("GET", "/execute")

call_args, call_kwargs = mock_session.request.call_args
sent_headers = call_kwargs.get("headers", {})
self.assertNotIn("X-Sandbox-ID", sent_headers)
self.assertNotIn("X-Sandbox-Namespace", sent_headers)
self.assertNotIn("X-Sandbox-Port", sent_headers)

def test_router_headers_ARE_sent_for_direct(self):
config = SandboxDirectConnectionConfig(api_url="http://router")
strategy = DirectConnectionStrategy(config)
connector, mock_session = self._make_connector_with_strategy(strategy, config)
mock_session.request.return_value = self._mock_ok_response()

connector.send_request("GET", "/execute")

call_args, call_kwargs = mock_session.request.call_args
sent_headers = call_kwargs.get("headers", {})
self.assertIn("X-Sandbox-ID", sent_headers)
self.assertIn("X-Sandbox-Namespace", sent_headers)
self.assertIn("X-Sandbox-Port", sent_headers)

def test_in_cluster_url_is_pod_dns(self):
config = SandboxInClusterConnectionConfig(server_port=8888)
strategy = InClusterConnectionStrategy("my-sb", "my-ns", config)
connector, mock_session = self._make_connector_with_strategy(strategy, config)
mock_session.request.return_value = self._mock_ok_response()

connector.send_request("POST", "execute")

call_args, call_kwargs = mock_session.request.call_args
url = call_args[1]
self.assertEqual(url, "http://my-sb.my-ns.svc.cluster.local:8888/execute")


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@
from k8s_agent_sandbox.sandbox_client import SandboxClient
from k8s_agent_sandbox.sandbox import Sandbox
from k8s_agent_sandbox.connector import SandboxConnector
from k8s_agent_sandbox.models import SandboxDirectConnectionConfig
from k8s_agent_sandbox.models import (
SandboxDirectConnectionConfig,
SandboxInClusterConnectionConfig,
SandboxLocalTunnelConnectionConfig,
)
from k8s_agent_sandbox.constants import POD_NAME_ANNOTATION
from k8s_agent_sandbox.exceptions import (
SandboxPortForwardError,
Expand Down Expand Up @@ -486,5 +490,54 @@ def test_wait_for_gateway_ip_skips_none_events(self, mock_watch_cls):
self.assertEqual(ip, "10.0.0.1")


class TestSandboxClientInClusterConfig(unittest.TestCase):
"""Tests that SandboxClient stores and propagates SandboxInClusterConnectionConfig."""

@patch('k8s_agent_sandbox.sandbox_client.K8sHelper')
def test_in_cluster_config_stored(self, _):
config = SandboxInClusterConnectionConfig()
sc = SandboxClient(connection_config=config)
self.assertIsInstance(sc.connection_config, SandboxInClusterConnectionConfig)

@patch('k8s_agent_sandbox.sandbox_client.K8sHelper')
def test_default_config_is_local_tunnel(self, _):
sc = SandboxClient()
self.assertIsInstance(sc.connection_config, SandboxLocalTunnelConnectionConfig)

@patch('k8s_agent_sandbox.sandbox_client.K8sHelper')
def test_in_cluster_config_custom_port(self, _):
config = SandboxInClusterConnectionConfig(server_port=9000)
sc = SandboxClient(connection_config=config)
self.assertEqual(sc.connection_config.server_port, 9000)

@patch('k8s_agent_sandbox.sandbox_client.K8sHelper')
def test_in_cluster_config_default_port(self, _):
config = SandboxInClusterConnectionConfig()
sc = SandboxClient(connection_config=config)
self.assertEqual(sc.connection_config.server_port, 8888)

def _create_sandbox_with_in_cluster_config(self, namespace='default'):
with patch('k8s_agent_sandbox.sandbox_client.K8sHelper'), \
patch('uuid.uuid4') as mock_uuid:
mock_uuid.return_value.hex = 'aabbccdd'
client = SandboxClient(connection_config=SandboxInClusterConnectionConfig())
client.k8s_helper.resolve_sandbox_name.return_value = 'my-sandbox'
mock_sandbox_class = MagicMock()
mock_sandbox_class.return_value = MagicMock()
client.sandbox_class = mock_sandbox_class
with patch.object(client, '_create_claim'), \
patch.object(client, '_wait_for_sandbox_ready'):
client.create_sandbox('my-template', namespace=namespace)
return mock_sandbox_class.call_args.kwargs

def test_sandbox_created_with_in_cluster_config(self):
call_kwargs = self._create_sandbox_with_in_cluster_config()
self.assertIsInstance(call_kwargs['connection_config'], SandboxInClusterConnectionConfig)

def test_sandbox_namespace_passed_correctly(self):
call_kwargs = self._create_sandbox_with_in_cluster_config(namespace='prod')
self.assertEqual(call_kwargs['namespace'], 'prod')


if __name__ == '__main__':
unittest.main()