Skip to content

Commit 496c320

Browse files
onematchfoxclaude
andcommitted
refactor(controller): invoke agents directly in MCP handler
Replace the HTTP round-trip through the controller's own A2A listener with direct invocation via a new `AgentClientRegistry`. The registry is owned by `A2ARegistrar`, which already maintains an `A2AClient` per agent for its HTTP mux — the registry gives the MCP handler access to those same clients without an extra network hop. The old approach routed through the controller's public A2A endpoint, meaning requests could traverse the external network (and any ingress or load-balancer in front of it) unnecessarily. The new path stays in-process. The old handler also cached its own `A2AClient` per agent in a `sync.Map` with no eviction, so clients for deleted agents would remain indefinitely. The registry is kept consistent by the registrar's add/update/delete lifecycle, eliminating that staleness. `A2ARegistrar.upsertAgentHandler` writes to both the HTTP mux (for inbound /api/a2a/<ns>/<name>/ routing) and the registry (for direct invocation). The registry is exposed via `ClientRegistry()` and passed to `NewMCPHandler` in app.go. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Brian Fox <878612+onematchfox@users.noreply.github.com>
1 parent 164d194 commit 496c320

5 files changed

Lines changed: 115 additions & 86 deletions

File tree

go/core/internal/a2a/a2a_registrar.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
type A2ARegistrar struct {
2727
cache crcache.Cache
2828
handlerMux A2AHandlerMux
29+
clientRegistry *AgentClientRegistry
2930
a2aBaseURL string
3031
sandboxA2AURL string
3132
authenticator auth.AuthProvider
@@ -45,11 +46,12 @@ func NewA2ARegistrar(
4546
streamingTimeout time.Duration,
4647
) *A2ARegistrar {
4748
reg := &A2ARegistrar{
48-
cache: cache,
49-
handlerMux: mux,
50-
a2aBaseURL: a2aBaseUrl,
51-
sandboxA2AURL: sandboxA2ABaseURL,
52-
authenticator: authenticator,
49+
cache: cache,
50+
handlerMux: mux,
51+
clientRegistry: NewAgentClientRegistry(),
52+
a2aBaseURL: a2aBaseUrl,
53+
sandboxA2AURL: sandboxA2ABaseURL,
54+
authenticator: authenticator,
5355
a2aBaseOptions: []a2aclient.Option{
5456
a2aclient.WithTimeout(streamingTimeout),
5557
a2aclient.WithBuffer(streamingInitialBuf, streamingMaxBuf),
@@ -60,6 +62,12 @@ func NewA2ARegistrar(
6062
return reg
6163
}
6264

65+
// ClientRegistry returns the registry of A2A clients for direct agent
66+
// invocation, populated as agents are registered and deregistered.
67+
func (a *A2ARegistrar) ClientRegistry() *AgentClientRegistry {
68+
return a.clientRegistry
69+
}
70+
6371
func (a *A2ARegistrar) NeedLeaderElection() bool {
6472
return false
6573
}
@@ -117,6 +125,7 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al
117125
}
118126
ref := a2aRouteKey(agent)
119127
a.handlerMux.RemoveAgentHandler(ref)
128+
a.clientRegistry.delete(ref)
120129
log.V(1).Info("removed A2A handler", "agent", ref)
121130
},
122131
}); err != nil {
@@ -182,10 +191,13 @@ func (a *A2ARegistrar) upsertAgentHandler(ctx context.Context, agent v1alpha2.Ag
182191
cardCopy := *card
183192
cardCopy.URL = a.a2aRouteURL(agent)
184193

185-
if err := a.handlerMux.SetAgentHandler(a2aRouteKey(agent), client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil {
194+
routeRef := a2aRouteKey(agent)
195+
if err := a.handlerMux.SetAgentHandler(routeRef, client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil {
186196
return fmt.Errorf("set handler for %s: %w", agentRef, err)
187197
}
188198

199+
a.clientRegistry.set(routeRef, client)
200+
189201
log.V(1).Info("registered/updated A2A handler", "agent", agentRef)
190202
return nil
191203
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package a2a
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
a2aclient "trpc.group/trpc-go/trpc-a2a-go/client"
9+
"trpc.group/trpc-go/trpc-a2a-go/protocol"
10+
)
11+
12+
// AgentClientRegistry maps agent route keys to their A2A clients.
13+
// The A2ARegistrar populates it; the MCP handler reads from it to invoke
14+
// agents without an HTTP round trip through the controller's own A2A listener.
15+
type AgentClientRegistry struct {
16+
mu sync.RWMutex
17+
clients map[string]*a2aclient.A2AClient
18+
}
19+
20+
func NewAgentClientRegistry() *AgentClientRegistry {
21+
return &AgentClientRegistry{clients: make(map[string]*a2aclient.A2AClient)}
22+
}
23+
24+
// set stores the client under the agent's route key (e.g. "namespace/name" or
25+
// "sandboxes/namespace/name").
26+
func (r *AgentClientRegistry) set(agentRef string, c *a2aclient.A2AClient) {
27+
r.mu.Lock()
28+
defer r.mu.Unlock()
29+
r.clients[agentRef] = c
30+
}
31+
32+
// delete removes the client for the given agent route key.
33+
func (r *AgentClientRegistry) delete(agentRef string) {
34+
r.mu.Lock()
35+
defer r.mu.Unlock()
36+
delete(r.clients, agentRef)
37+
}
38+
39+
// Register adds or replaces the A2A client for the given agent. It is the
40+
// exported counterpart of set, intended for use in tests and explicit
41+
// registrations outside the A2ARegistrar lifecycle.
42+
func (r *AgentClientRegistry) Register(namespace, name string, c *a2aclient.A2AClient) {
43+
r.set(namespace+"/"+name, c)
44+
}
45+
46+
// SendMessage invokes an agent directly via its cached A2A client.
47+
// namespace and name must identify a non-sandbox agent; sandbox agents use a
48+
// different route key and are not yet reachable via this method.
49+
func (r *AgentClientRegistry) SendMessage(ctx context.Context, namespace, name string, params protocol.SendMessageParams) (*protocol.MessageResult, error) {
50+
key := namespace + "/" + name
51+
r.mu.RLock()
52+
c, ok := r.clients[key]
53+
r.mu.RUnlock()
54+
if !ok {
55+
return nil, fmt.Errorf("agent %s/%s not found or not ready", namespace, name)
56+
}
57+
return c.SendMessage(ctx, params)
58+
}

go/core/internal/mcp/mcp_handler.go

Lines changed: 8 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -5,31 +5,24 @@ import (
55
"fmt"
66
"net/http"
77
"strings"
8-
"sync"
9-
"time"
108

119
"github.com/kagent-dev/kagent/go/api/v1alpha2"
1210
"github.com/kagent-dev/kagent/go/core/internal/a2a"
13-
authimpl "github.com/kagent-dev/kagent/go/core/internal/httpserver/auth"
1411
"github.com/kagent-dev/kagent/go/core/internal/version"
1512
"github.com/kagent-dev/kagent/go/core/pkg/auth"
1613
mcpsdk "github.com/modelcontextprotocol/go-sdk/mcp"
17-
"k8s.io/apimachinery/pkg/types"
1814
"sigs.k8s.io/controller-runtime/pkg/client"
1915
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
20-
a2aclient "trpc.group/trpc-go/trpc-a2a-go/client"
2116
"trpc.group/trpc-go/trpc-a2a-go/protocol"
2217
)
2318

2419
// MCPHandler handles MCP requests and bridges them to A2A endpoints
2520
type MCPHandler struct {
2621
kubeClient client.Client
27-
a2aBaseURL string
28-
a2aTimeout time.Duration
22+
agentClients *a2a.AgentClientRegistry
2923
authenticator auth.AuthProvider
3024
httpHandler *mcpsdk.StreamableHTTPHandler
3125
server *mcpsdk.Server
32-
a2aClients sync.Map
3326
}
3427

3528
// Input types for MCP tools
@@ -56,20 +49,12 @@ type InvokeAgentOutput struct {
5649
ContextID string `json:"context_id,omitempty"`
5750
}
5851

59-
// defaultA2ATimeout is the fallback timeout for A2A client calls and should match
60-
// the configured default streaming timeout.
61-
const defaultA2ATimeout = 10 * time.Minute
62-
63-
// NewMCPHandler creates a new MCP handler
64-
// Wraps the StreamableHTTPHandler and adds A2A bridging and context management.
65-
func NewMCPHandler(kubeClient client.Client, a2aBaseURL string, authenticator auth.AuthProvider, a2aTimeout time.Duration) (*MCPHandler, error) {
66-
if a2aTimeout <= 0 {
67-
a2aTimeout = defaultA2ATimeout
68-
}
52+
// NewMCPHandler creates a new MCP handler that bridges MCP tool calls directly
53+
// to agent A2A clients, bypassing the controller's own HTTP A2A listener.
54+
func NewMCPHandler(kubeClient client.Client, agentClients *a2a.AgentClientRegistry, authenticator auth.AuthProvider) (*MCPHandler, error) {
6955
handler := &MCPHandler{
7056
kubeClient: kubeClient,
71-
a2aBaseURL: a2aBaseURL,
72-
a2aTimeout: a2aTimeout,
57+
agentClients: agentClients,
7358
authenticator: authenticator,
7459
}
7560

@@ -204,7 +189,6 @@ func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallTool
204189
}, InvokeAgentOutput{}, nil
205190
}
206191
agentRef := agentNS + "/" + agentName
207-
agentNns := types.NamespacedName{Namespace: agentNS, Name: agentName}
208192

209193
// Get context ID from client request (stateless mode)
210194
// If not provided, contextIDPtr will be nil and a new conversation will start
@@ -214,47 +198,9 @@ func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallTool
214198
log.V(1).Info("Using context_id from client request", "context_id", input.ContextID)
215199
}
216200

217-
// Get or create cached A2A client for this agent
218-
a2aURL := fmt.Sprintf("%s/%s/", h.a2aBaseURL, agentRef)
219-
var a2aClient *a2aclient.A2AClient
220-
221-
if cached, ok := h.a2aClients.Load(agentRef); ok {
222-
if client, ok := cached.(*a2aclient.A2AClient); ok {
223-
a2aClient = client
224-
}
225-
}
226-
227-
// Create new client if not cached
228-
if a2aClient == nil {
229-
// Build A2A client options with authentication propagation
230-
a2aOpts := []a2aclient.Option{
231-
a2aclient.WithTimeout(h.a2aTimeout),
232-
a2aclient.WithHTTPReqHandler(
233-
authimpl.A2ARequestHandler(
234-
h.authenticator,
235-
agentNns,
236-
),
237-
),
238-
}
239-
240-
newClient, err := a2aclient.NewA2AClient(a2aURL, a2aOpts...)
241-
if err != nil {
242-
log.Error(err, "Failed to create A2A client", "agent", agentRef)
243-
return &mcpsdk.CallToolResult{
244-
Content: []mcpsdk.Content{
245-
&mcpsdk.TextContent{Text: fmt.Sprintf("Failed to create A2A client: %v", err)},
246-
},
247-
IsError: true,
248-
}, InvokeAgentOutput{}, nil
249-
}
250-
251-
// Cache the client
252-
h.a2aClients.Store(agentRef, newClient)
253-
a2aClient = newClient
254-
}
255-
256-
// Send message via A2A
257-
result, err := a2aClient.SendMessage(ctx, protocol.SendMessageParams{
201+
// Send message directly via the agent's A2A client, bypassing the
202+
// controller's own HTTP A2A listener.
203+
result, err := h.agentClients.SendMessage(ctx, agentNS, agentName, protocol.SendMessageParams{
258204
Message: protocol.Message{
259205
Kind: protocol.KindMessage,
260206
Role: protocol.MessageRoleUser,

go/core/internal/mcp/mcp_handler_test.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,15 @@ import (
88
"net/url"
99
"sync"
1010
"testing"
11-
"time"
1211

12+
"github.com/kagent-dev/kagent/go/core/internal/a2a"
13+
authimpl "github.com/kagent-dev/kagent/go/core/internal/httpserver/auth"
1314
"github.com/kagent-dev/kagent/go/core/pkg/auth"
1415
mcpsdk "github.com/modelcontextprotocol/go-sdk/mcp"
1516
"github.com/stretchr/testify/assert"
1617
"github.com/stretchr/testify/require"
18+
"k8s.io/apimachinery/pkg/types"
19+
a2aclient "trpc.group/trpc-go/trpc-a2a-go/client"
1720
)
1821

1922
// fakeSession is a minimal auth.Session for testing.
@@ -89,31 +92,41 @@ func (a *authRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
8992
return a.base.RoundTrip(r)
9093
}
9194

95+
// newTestRegistry builds an AgentClientRegistry with a single agent pre-registered,
96+
// wired to send A2A requests to backendURL and propagate auth via authProvider.
97+
func newTestRegistry(t *testing.T, namespace, name string, backendURL string, authProvider auth.AuthProvider) *a2a.AgentClientRegistry {
98+
t.Helper()
99+
agentRef := types.NamespacedName{Namespace: namespace, Name: name}
100+
c, err := a2aclient.NewA2AClient(
101+
backendURL+"/"+namespace+"/"+name+"/",
102+
a2aclient.WithHTTPReqHandler(authimpl.A2ARequestHandler(authProvider, agentRef)),
103+
)
104+
require.NoError(t, err)
105+
106+
registry := a2a.NewAgentClientRegistry()
107+
registry.Register(namespace, name, c)
108+
return registry
109+
}
110+
92111
// TestInvokeAgent_AuthPropagation exercises the full MCP HTTP stack:
93112
// the MCP client sends a request with an Authorization header, the handler
94113
// recovers the auth session from RequestExtra, and the A2A backend receives
95114
// the token produced by UpstreamAuth.
96115
func TestInvokeAgent_AuthPropagation(t *testing.T) {
97-
// Fake A2A backend — records the Authorization header it receives.
98116
backend := newA2ABackend(t)
99-
100117
authProvider := &fakeAuthProvider{session: &fakeSession{}}
101118

102-
// Real MCP handler (kubeClient is nil; invoke_agent does not use it).
103-
mcpHandler, err := NewMCPHandler(nil, backend.server.URL, authProvider, 5*time.Second)
119+
registry := newTestRegistry(t, "default", "test-agent", backend.server.URL, authProvider)
120+
mcpHandler, err := NewMCPHandler(nil, registry, authProvider)
104121
require.NoError(t, err)
105122

106123
mcpServer := httptest.NewServer(mcpHandler)
107124
t.Cleanup(mcpServer.Close)
108125

109-
// MCP client whose HTTP transport injects an Authorization header on every request.
110126
transport := &mcpsdk.StreamableClientTransport{
111127
Endpoint: mcpServer.URL,
112128
HTTPClient: &http.Client{
113-
Transport: &authRoundTripper{
114-
base: http.DefaultTransport,
115-
token: "test-token",
116-
},
129+
Transport: &authRoundTripper{base: http.DefaultTransport, token: "test-token"},
117130
},
118131
DisableStandaloneSSE: true,
119132
}
@@ -141,10 +154,10 @@ func TestInvokeAgent_AuthPropagation(t *testing.T) {
141154
// propagated to the A2A backend.
142155
func TestInvokeAgent_NoAuthPropagationWithoutHeader(t *testing.T) {
143156
backend := newA2ABackend(t)
144-
145157
authProvider := &fakeAuthProvider{session: &fakeSession{}}
146158

147-
mcpHandler, err := NewMCPHandler(nil, backend.server.URL, authProvider, 5*time.Second)
159+
registry := newTestRegistry(t, "default", "test-agent", backend.server.URL, authProvider)
160+
mcpHandler, err := NewMCPHandler(nil, registry, authProvider)
148161
require.NoError(t, err)
149162

150163
mcpServer := httptest.NewServer(mcpHandler)

go/core/pkg/app/app.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -613,8 +613,7 @@ func Start(getExtensionConfig GetExtensionConfig, migrationRunner MigrationRunne
613613

614614
// Register A2A handlers on all replicas
615615
a2aHandler := a2a.NewA2AHttpMux(httpserver.APIPathA2A, httpserver.APIPathA2ASandboxes, extensionCfg.Authenticator)
616-
617-
if err := mgr.Add(a2a.NewA2ARegistrar(
616+
a2aRegistrar := a2a.NewA2ARegistrar(
618617
mgr.GetCache(),
619618
a2aHandler,
620619
cfg.A2ABaseUrl+httpserver.APIPathA2A,
@@ -623,17 +622,18 @@ func Start(getExtensionConfig GetExtensionConfig, migrationRunner MigrationRunne
623622
int(cfg.Streaming.MaxBufSize.Value()),
624623
int(cfg.Streaming.InitialBufSize.Value()),
625624
cfg.Streaming.Timeout,
626-
)); err != nil {
625+
)
626+
if err := mgr.Add(a2aRegistrar); err != nil {
627627
setupLog.Error(err, "unable to set up a2a registrar")
628628
os.Exit(1)
629629
}
630630

631-
// Create MCP handler that bridges to A2A
631+
// Create MCP handler that invokes agents directly via their A2A clients,
632+
// bypassing the controller's own HTTP A2A listener.
632633
mcpHandler, err := mcp.NewMCPHandler(
633634
mgr.GetClient(),
634-
cfg.A2ABaseUrl+httpserver.APIPathA2A,
635+
a2aRegistrar.ClientRegistry(),
635636
extensionCfg.Authenticator,
636-
cfg.Streaming.Timeout,
637637
)
638638
if err != nil {
639639
setupLog.Error(err, "unable to create MCP handler")

0 commit comments

Comments
 (0)