diff --git a/go/core/internal/a2a/a2a_registrar.go b/go/core/internal/a2a/a2a_registrar.go index f4430e4d3c..22fb8f592c 100644 --- a/go/core/internal/a2a/a2a_registrar.go +++ b/go/core/internal/a2a/a2a_registrar.go @@ -31,6 +31,11 @@ type A2ARegistrar struct { sandboxA2AURL string authenticator auth.AuthProvider a2aBaseOptions []a2aclient.Option + agentObserver AgentObserver +} + +type AgentObserver interface { + NotifyAgentsChanged(ctx context.Context) } var _ manager.Runnable = (*A2ARegistrar)(nil) @@ -45,6 +50,7 @@ func NewA2ARegistrar( streamingMaxBuf int, streamingInitialBuf int, streamingTimeout time.Duration, + agentObserver AgentObserver, ) (*A2ARegistrar, error) { if clientRegistry == nil { return nil, fmt.Errorf("clientRegistry must not be nil") @@ -61,6 +67,7 @@ func NewA2ARegistrar( a2aclient.WithBuffer(streamingInitialBuf, streamingMaxBuf), debugOpt(), }, + agentObserver: agentObserver, } return reg, nil @@ -102,7 +109,9 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al } if err := a.upsertAgentHandler(ctx, agent, log); err != nil { log.Error(err, "failed to upsert A2A handler", "agent", common.GetObjectRef(agent)) + return } + a.notifyAgentChange(ctx) }, UpdateFunc: func(oldObj, newObj any) { oldAgent, ok1 := informerAgentObject(oldObj) @@ -110,11 +119,19 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al if !ok1 || !ok2 { return } - if oldAgent.GetGeneration() != newAgent.GetGeneration() || !sameAgentSpec(oldAgent, newAgent) { + specChanged := oldAgent.GetGeneration() != newAgent.GetGeneration() || !sameAgentSpec(oldAgent, newAgent) + if specChanged { if err := a.upsertAgentHandler(ctx, newAgent, log); err != nil { log.Error(err, "failed to upsert A2A handler", "agent", common.GetObjectRef(newAgent)) + return } } + // Also notify when readiness conditions change so subscribers don't + // hold stale agent lists (the resource filter uses Accepted + + // DeploymentReady, which are status conditions, not spec fields). + if specChanged || agentReadinessChanged(oldAgent, newAgent) { + a.notifyAgentChange(ctx) + } }, DeleteFunc: func(obj any) { agent, ok := deletedInformerAgentObject(obj) @@ -125,6 +142,7 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al a.handlerMux.RemoveAgentHandler(ref) a.clientRegistry.delete(ref) log.V(1).Info("removed A2A handler", "agent", ref) + a.notifyAgentChange(ctx) }, }); err != nil { return fmt.Errorf("failed to add informer event handler for %T: %w", prototype, err) @@ -133,6 +151,33 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al return nil } +func (a *A2ARegistrar) notifyAgentChange(ctx context.Context) { + if a.agentObserver != nil { + a.agentObserver.NotifyAgentsChanged(ctx) + } +} + +func agentReadinessChanged(oldAgent, newAgent v1alpha2.AgentObject) bool { + return isAgentReady(oldAgent) != isAgentReady(newAgent) +} + +func isAgentReady(agent v1alpha2.AgentObject) bool { + status := agent.GetAgentStatus() + if status == nil { + return false + } + deploymentReady, accepted := false, false + for _, c := range status.Conditions { + if c.Type == "Ready" && c.Reason == "DeploymentReady" && string(c.Status) == "True" { + deploymentReady = true + } + if c.Type == "Accepted" && string(c.Status) == "True" { + accepted = true + } + } + return deploymentReady && accepted +} + func sameAgentSpec(oldAgent, newAgent v1alpha2.AgentObject) bool { oldSpec := oldAgent.GetAgentSpec() newSpec := newAgent.GetAgentSpec() diff --git a/go/core/internal/mcp/mcp_handler.go b/go/core/internal/mcp/mcp_handler.go index c6ed5f5cb3..c8681646be 100644 --- a/go/core/internal/mcp/mcp_handler.go +++ b/go/core/internal/mcp/mcp_handler.go @@ -2,6 +2,7 @@ package mcp import ( "context" + "encoding/json" "fmt" "net/http" "strings" @@ -40,7 +41,7 @@ type AgentSummary struct { } type InvokeAgentInput struct { - Agent string `json:"agent" jsonschema:"Agent reference in format namespace/name"` + Agent string `json:"agent" jsonschema:"Agent reference in format namespace/name. To find a list of available sources, use the 'agents' resource."` Task string `json:"task" jsonschema:"Task to run"` ContextID string `json:"context_id,omitempty" jsonschema:"Optional A2A context ID to continue a conversation"` } @@ -65,7 +66,12 @@ func NewMCPHandler(kubeClient client.Client, agentClients *a2a.AgentClientRegist Name: "kagent-agents", Version: version.Version, } - server := mcpsdk.NewServer(impl, nil) + server := mcpsdk.NewServer(impl, &mcpsdk.ServerOptions{ + // No-op handlers enable subscription tracking in the SDK; actual + // notifications are sent via NotifyAgentsChanged. + SubscribeHandler: func(context.Context, *mcpsdk.SubscribeRequest) error { return nil }, + UnsubscribeHandler: func(context.Context, *mcpsdk.UnsubscribeRequest) error { return nil }, + }) handler.server = server // Add list_agents tool. @@ -97,6 +103,17 @@ func NewMCPHandler(kubeClient client.Client, agentClients *a2a.AgentClientRegist handler.handleInvokeAgent, ) + // Add agents resource for clients that pre-populate context + server.AddResource( + &mcpsdk.Resource{ + URI: "kagent://agents", + Name: "agents", + Description: "List of invokable kagent agents (accepted + deploymentReady)", + MIMEType: "application/json", + }, + handler.readAgentsResource, + ) + // Create HTTP handler var httpOpts *mcpsdk.StreamableHTTPOptions if env.KagentMCPStateless.Get() { @@ -112,23 +129,14 @@ func NewMCPHandler(kubeClient client.Client, agentClients *a2a.AgentClientRegist return handler, nil } -// handleListAgents handles the list_agents MCP tool -func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolRequest, input ListAgentsInput) (*mcpsdk.CallToolResult, ListAgentsOutput, error) { - log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "list_agents") - +// listReadyAgents returns agents that are accepted and deployment-ready. +func (h *MCPHandler) listReadyAgents(ctx context.Context) ([]AgentSummary, error) { agentList := &v1alpha2.AgentList{} if err := h.kubeClient.List(ctx, agentList); err != nil { - return &mcpsdk.CallToolResult{ - Content: []mcpsdk.Content{ - &mcpsdk.TextContent{Text: fmt.Sprintf("Failed to list agents: %v", err)}, - }, - IsError: true, - }, ListAgentsOutput{}, nil + return nil, err } - - agents := make([]AgentSummary, 0) + agents := make([]AgentSummary, 0, len(agentList.Items)) for _, agent := range agentList.Items { - // Check if agent is accepted and deployment ready deploymentReady := false accepted := false for _, condition := range agent.Status.Conditions { @@ -139,18 +147,30 @@ func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolR accepted = true } } - if !accepted || !deploymentReady { continue } - - ref := agent.Namespace + "/" + agent.Name - description := agent.Spec.Description agents = append(agents, AgentSummary{ - Ref: ref, - Description: description, + Ref: agent.Namespace + "/" + agent.Name, + Description: agent.Spec.Description, }) } + return agents, nil +} + +// handleListAgents handles the list_agents MCP tool +func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolRequest, input ListAgentsInput) (*mcpsdk.CallToolResult, ListAgentsOutput, error) { + log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "list_agents") + + agents, err := h.listReadyAgents(ctx) + if err != nil { + return &mcpsdk.CallToolResult{ + Content: []mcpsdk.Content{ + &mcpsdk.TextContent{Text: fmt.Sprintf("Failed to list agents: %v", err)}, + }, + IsError: true, + }, ListAgentsOutput{}, nil + } log.Info("Listed agents", "count", len(agents)) @@ -179,6 +199,36 @@ func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolR }, output, nil } +// readAgentsResource handles reads of the kagent://agents resource. +func (h *MCPHandler) readAgentsResource(ctx context.Context, req *mcpsdk.ReadResourceRequest) (*mcpsdk.ReadResourceResult, error) { + agents, err := h.listReadyAgents(ctx) + if err != nil { + return nil, fmt.Errorf("listing agents: %w", err) + } + data, err := json.Marshal(agents) + if err != nil { + return nil, fmt.Errorf("marshaling agents: %w", err) + } + return &mcpsdk.ReadResourceResult{ + Contents: []*mcpsdk.ResourceContents{{ + URI: "kagent://agents", + MIMEType: "application/json", + Text: string(data), + }}, + }, nil +} + +// NotifyAgentsChanged sends a resources/updated notification for kagent://agents +// to all subscribed clients. Called by A2ARegistrar when agents are added, updated, +// or removed. +func (h *MCPHandler) NotifyAgentsChanged(ctx context.Context) { + if err := h.server.ResourceUpdated(ctx, &mcpsdk.ResourceUpdatedNotificationParams{ + URI: "kagent://agents", + }); err != nil { + ctrllog.FromContext(ctx).WithName("mcp-handler").Error(err, "failed to send resource updated notification") + } +} + // handleInvokeAgent handles the invoke_agent MCP tool func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallToolRequest, input InvokeAgentInput) (*mcpsdk.CallToolResult, InvokeAgentOutput, error) { log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "invoke_agent") diff --git a/go/core/pkg/app/app.go b/go/core/pkg/app/app.go index ddad07d546..148dbf86a7 100644 --- a/go/core/pkg/app/app.go +++ b/go/core/pkg/app/app.go @@ -611,9 +611,22 @@ func Start(getExtensionConfig GetExtensionConfig, migrationRunner MigrationRunne os.Exit(1) } + clientRegistry := a2a.NewAgentClientRegistry() + + // Create MCP handler that invokes agents directly via their A2A clients, + // bypassing the controller's own HTTP A2A listener. + mcpHandler, err := mcp.NewMCPHandler( + mgr.GetClient(), + clientRegistry, + extensionCfg.Authenticator, + ) + if err != nil { + setupLog.Error(err, "unable to create MCP handler") + os.Exit(1) + } + // Register A2A handlers on all replicas a2aHandler := a2a.NewA2AHttpMux(httpserver.APIPathA2A, httpserver.APIPathA2ASandboxes, extensionCfg.Authenticator) - clientRegistry := a2a.NewAgentClientRegistry() a2aRegistrar, err := a2a.NewA2ARegistrar( mgr.GetCache(), a2aHandler, @@ -624,6 +637,7 @@ func Start(getExtensionConfig GetExtensionConfig, migrationRunner MigrationRunne int(cfg.Streaming.MaxBufSize.Value()), int(cfg.Streaming.InitialBufSize.Value()), cfg.Streaming.Timeout, + mcpHandler, ) if err != nil { setupLog.Error(err, "unable to create a2a registrar") @@ -634,18 +648,6 @@ func Start(getExtensionConfig GetExtensionConfig, migrationRunner MigrationRunne os.Exit(1) } - // Create MCP handler that invokes agents directly via their A2A clients, - // bypassing the controller's own HTTP A2A listener. - mcpHandler, err := mcp.NewMCPHandler( - mgr.GetClient(), - clientRegistry, - extensionCfg.Authenticator, - ) - if err != nil { - setupLog.Error(err, "unable to create MCP handler") - os.Exit(1) - } - // +kubebuilder:scaffold:builder if metricsCertWatcher != nil { setupLog.Info("Adding metrics certificate watcher to manager") diff --git a/go/core/test/e2e/invoke_mcp_test.go b/go/core/test/e2e/invoke_mcp_test.go index 3bc5e2ddfd..19d39718c5 100644 --- a/go/core/test/e2e/invoke_mcp_test.go +++ b/go/core/test/e2e/invoke_mcp_test.go @@ -160,6 +160,101 @@ func TestE2EMCPEndpointInvokeAgent(t *testing.T) { require.True(t, foundText, "Should have text content containing 'kebab' in response") } +// TestE2EMCPAgentsResourceList verifies the kagent://agents resource is advertised by the server. +func TestE2EMCPAgentsResourceList(t *testing.T) { + ctx := context.Background() + session := setupMCPClient(t) + + result, err := session.ListResources(ctx, &mcp.ListResourcesParams{}) + require.NoError(t, err, "Should list resources") + + found := false + for _, r := range result.Resources { + if r.URI == "kagent://agents" { + found = true + require.Equal(t, "application/json", r.MIMEType, "kagent://agents should have JSON MIME type") + break + } + } + require.True(t, found, "kagent://agents resource not found in resources/list response") +} + +// TestE2EMCPAgentsResourceRead verifies reading kagent://agents returns a JSON array containing the deployed kebab-agent. +func TestE2EMCPAgentsResourceRead(t *testing.T) { + ctx := context.Background() + session := setupMCPClient(t) + + result, err := session.ReadResource(ctx, &mcp.ReadResourceParams{ + URI: "kagent://agents", + }) + require.NoError(t, err, "Should read kagent://agents resource") + require.NotEmpty(t, result.Contents, "Resource contents should not be empty") + require.Equal(t, "application/json", result.Contents[0].MIMEType, "Content should be JSON") + + var agents []struct { + Ref string `json:"ref"` + Description string `json:"description,omitempty"` + } + require.NoError(t, json.Unmarshal([]byte(result.Contents[0].Text), &agents), "Contents should be a valid JSON array") + + found := false + for _, a := range agents { + if a.Ref == "kagent/kebab-agent" { + found = true + break + } + } + require.True(t, found, "kagent/kebab-agent should appear in kagent://agents resource") +} + +// TestE2EMCPAgentsResourceNotification verifies that creating an agent triggers a resources/updated +// notification for kagent://agents on subscribed MCP clients. +func TestE2EMCPAgentsResourceNotification(t *testing.T) { + notified := make(chan string, 1) + + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + transport := &mcp.StreamableClientTransport{Endpoint: mcpEndpointURL()} + impl := &mcp.Implementation{Name: "e2e-test-notifications", Version: "0.0.0"} + mcpClient := mcp.NewClient(impl, &mcp.ClientOptions{ + ResourceUpdatedHandler: func(_ context.Context, req *mcp.ResourceUpdatedNotificationRequest) { + select { + case notified <- req.Params.URI: + default: + } + }, + }) + + session, err := mcpClient.Connect(ctx, transport, nil) + require.NoError(t, err, "Should connect MCP client with notification handler") + t.Cleanup(func() { session.Close() }) + + // Subscribe before triggering the change so we don't miss the notification. + err = session.Subscribe(ctx, &mcp.SubscribeParams{URI: "kagent://agents"}) + require.NoError(t, err, "Should subscribe to kagent://agents") + + // Create a new agent — the A2ARegistrar informer fires immediately on object + // creation and calls NotifyAgentsChanged, which pushes the notification. + k8sClient := setupK8sClient(t, false) + mockURL, stopMock := setupMockServer(t, "mocks/invoke_inline_agent.json") + defer stopMock() + + modelCfg := setupModelConfig(t, k8sClient, mockURL) + agent := generateAgent(modelCfg.Name, nil, AgentOptions{Name: "mcp-notify-test"}) + require.NoError(t, k8sClient.Create(t.Context(), agent), "Should create agent") + t.Cleanup(func() { + _ = k8sClient.Delete(context.Background(), agent) + }) + + select { + case uri := <-notified: + require.Equal(t, "kagent://agents", uri, "Notification URI should match subscribed resource") + case <-time.After(30 * time.Second): + t.Fatal("Did not receive resources/updated notification for kagent://agents within 30s") + } +} + // TestE2EMCPEndpointErrorHandling tests error handling in the MCP endpoint func TestE2EMCPEndpointErrorHandling(t *testing.T) { ctx := context.Background()