diff --git a/cmd/gateway_consumer.go b/cmd/gateway_consumer.go index a4dbb88ea..9c14cb1d5 100644 --- a/cmd/gateway_consumer.go +++ b/cmd/gateway_consumer.go @@ -13,6 +13,7 @@ import ( "github.com/nextlevelbuilder/goclaw/internal/agent" "github.com/nextlevelbuilder/goclaw/internal/bus" "github.com/nextlevelbuilder/goclaw/internal/channels" + "github.com/nextlevelbuilder/goclaw/internal/channels/telegram/voiceguard" "github.com/nextlevelbuilder/goclaw/internal/config" "github.com/nextlevelbuilder/goclaw/internal/scheduler" "github.com/nextlevelbuilder/goclaw/internal/sessions" @@ -151,6 +152,18 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents "- Keep responses concise and focused; long replies are disruptive in groups.\n" + "- Address the group naturally. If the history shows a multi-person conversation, consider the full context before answering." } + if agentID == cfg.Channels.Telegram.Voice.AgentID && cfg.Channels.Telegram.Voice.AgentID != "" && peerKind == string(sessions.PeerDirect) { + if tmpl := cfg.Channels.Telegram.Voice.DMContextTemplate; tmpl != "" { + // Substitute {user_id} — the only runtime value the gateway knows. + // All other deployment-specific values (e.g. tenant_id) are baked into the template. + voiceCtx := strings.ReplaceAll(tmpl, "{user_id}", userID) + if extraPrompt != "" { + extraPrompt += "\n\n" + voiceCtx + } else { + extraPrompt = voiceCtx + } + } + } // Delegation announces carry media as ForwardMedia (not deleted, forwarded to output). // User-uploaded media goes in Media (loaded as images for LLM, then deleted). @@ -193,7 +206,7 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents } // Handle result asynchronously to not block the flush callback. - go func(channel, chatID, session, rID string, meta map[string]string) { + go func(channel, chatID, session, rID, agentKey, peer string, originalContent string, meta map[string]string) { outcome := <-outCh // Clean up run tracking (in case HandleAgentEvent didn't fire for terminal events) @@ -241,11 +254,14 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents return } + replyContent := outcome.Result.Content + replyContent = voiceguard.SanitizeReply(cfg.Channels.Telegram.Voice.AgentID, agentKey, channel, peer, originalContent, replyContent, cfg.Channels.Telegram.Voice) + // Publish response back to the channel outMsg := bus.OutboundMessage{ Channel: channel, ChatID: chatID, - Content: outcome.Result.Content, + Content: replyContent, Metadata: meta, } @@ -264,7 +280,7 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents } msgBus.PublishOutbound(outMsg) - }(msg.Channel, msg.ChatID, sessionKey, runID, outMeta) + }(msg.Channel, msg.ChatID, sessionKey, runID, agentID, peerKind, msg.Content, outMeta) } // Inbound debounce: merge rapid messages from the same sender before processing. @@ -668,6 +684,7 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents } } + // resolveAgentRoute determines which agent should handle a message // based on config bindings. Priority: peer → channel → default. // Matching TS resolve-route.ts binding resolution. diff --git a/cmd/gateway_consumer_audio_sanitize_test.go b/cmd/gateway_consumer_audio_sanitize_test.go new file mode 100644 index 000000000..f709ea50f --- /dev/null +++ b/cmd/gateway_consumer_audio_sanitize_test.go @@ -0,0 +1,331 @@ +package cmd + +import ( + "strings" + "testing" + + "github.com/nextlevelbuilder/goclaw/internal/config" + "github.com/nextlevelbuilder/goclaw/internal/sessions" +) + +// --------------------------------------------------------------------------- +// sanitizeVoiceAgentReply +// --------------------------------------------------------------------------- + +// newTgCfg is a helper that builds a minimal TelegramConfig for testing. +// voiceAgentID is the value of VoiceAgentID on the channel config. +// Optionally pass custom fallback strings (empty = use built-in defaults). +func newTgCfg(voiceAgentID, fallbackTranscript, fallbackNoTranscript string) config.TelegramConfig { + return config.TelegramConfig{ + VoiceAgentID: voiceAgentID, + AudioGuardFallbackTranscript: fallbackTranscript, + AudioGuardFallbackNoTranscript: fallbackNoTranscript, + } +} + +const ( + testVoiceAgent = "my-voice-agent" + dmPeer = string(sessions.PeerDirect) +) + +// TestSanitize_PassThrough_WrongAgent verifies that when the agentID does not match +// the configured VoiceAgentID, the reply is returned unchanged. +func TestSanitize_PassThrough_WrongAgent(t *testing.T) { + tgCfg := newTgCfg(testVoiceAgent, "", "") + inbound := "" + reply := "system error occurred" + got := sanitizeVoiceAgentReply(testVoiceAgent, "other-agent", "telegram", dmPeer, inbound, reply, tgCfg) + if got != reply { + t.Errorf("expected passthrough, got %q", got) + } +} + +// TestSanitize_PassThrough_EmptyVoiceAgentID verifies that when VoiceAgentID is empty +// (feature not configured), all replies pass through untouched. +func TestSanitize_PassThrough_EmptyVoiceAgentID(t *testing.T) { + tgCfg := newTgCfg("", "", "") + inbound := "" + reply := "exit status 1" + got := sanitizeVoiceAgentReply("", testVoiceAgent, "telegram", dmPeer, inbound, reply, tgCfg) + if got != reply { + t.Errorf("expected passthrough when VoiceAgentID empty, got %q", got) + } +} + +// TestSanitize_PassThrough_NonTelegram verifies that non-Telegram channels are not guarded. +func TestSanitize_PassThrough_NonTelegram(t *testing.T) { + tgCfg := newTgCfg(testVoiceAgent, "", "") + inbound := "" + reply := "rate limit exceeded" + got := sanitizeVoiceAgentReply(testVoiceAgent, testVoiceAgent, "discord", dmPeer, inbound, reply, tgCfg) + if got != reply { + t.Errorf("expected passthrough for non-telegram channel, got %q", got) + } +} + +// TestSanitize_PassThrough_GroupChat verifies that group chat replies are not guarded. +func TestSanitize_PassThrough_GroupChat(t *testing.T) { + tgCfg := newTgCfg(testVoiceAgent, "", "") + inbound := "" + reply := "system error occurred" + got := sanitizeVoiceAgentReply(testVoiceAgent, testVoiceAgent, "telegram", string(sessions.PeerGroup), inbound, reply, tgCfg) + if got != reply { + t.Errorf("expected passthrough for group chat, got %q", got) + } +} + +// TestSanitize_PassThrough_NoAudioTag verifies that text-only inbound messages are not guarded. +func TestSanitize_PassThrough_NoAudioTag(t *testing.T) { + tgCfg := newTgCfg(testVoiceAgent, "", "") + inbound := "just a regular text message" + reply := "system error occurred" + got := sanitizeVoiceAgentReply(testVoiceAgent, testVoiceAgent, "telegram", dmPeer, inbound, reply, tgCfg) + if got != reply { + t.Errorf("expected passthrough when no audio tag in inbound, got %q", got) + } +} + +// TestSanitize_PassThrough_CleanReply verifies that a clean (non-error) reply is not rewritten. +func TestSanitize_PassThrough_CleanReply(t *testing.T) { + tgCfg := newTgCfg(testVoiceAgent, "", "") + inbound := "" + reply := "Great job! Your pronunciation is improving." + got := sanitizeVoiceAgentReply(testVoiceAgent, testVoiceAgent, "telegram", dmPeer, inbound, reply, tgCfg) + if got != reply { + t.Errorf("expected clean reply passthrough, got %q", got) + } +} + +// TestSanitize_ErrorWithTranscript_DefaultFallback verifies that when a transcript is available +// and no custom fallback is configured, the built-in English message is used. +func TestSanitize_ErrorWithTranscript_DefaultFallback(t *testing.T) { + tgCfg := newTgCfg(testVoiceAgent, "", "") + inbound := `I usually wake up at seven` + reply := "system error: tool execution failed" + + got := sanitizeVoiceAgentReply(testVoiceAgent, testVoiceAgent, "telegram", dmPeer, inbound, reply, tgCfg) + + // The default fallback must contain the transcript text. + if !contains(got, "I usually wake up at seven") { + t.Errorf("expected transcript in fallback, got: %q", got) + } + // Must not contain the original technical error. + if contains(got, "system error") { + t.Errorf("technical error leaked into fallback: %q", got) + } +} + +// TestSanitize_ErrorWithTranscript_CustomFallback verifies that a custom fallback template +// from TelegramConfig is used when set. +func TestSanitize_ErrorWithTranscript_CustomFallback(t *testing.T) { + customTpl := "Transcript received: %s. Please send again!" + tgCfg := newTgCfg(testVoiceAgent, customTpl, "") + inbound := `hello world` + reply := "rate limit exceeded" + + got := sanitizeVoiceAgentReply(testVoiceAgent, testVoiceAgent, "telegram", dmPeer, inbound, reply, tgCfg) + want := "Transcript received: hello world. Please send again!" + if got != want { + t.Errorf("expected %q, got %q", want, got) + } +} + +// TestSanitize_ErrorNoTranscript_DefaultFallback verifies the no-transcript default path. +func TestSanitize_ErrorNoTranscript_DefaultFallback(t *testing.T) { + tgCfg := newTgCfg(testVoiceAgent, "", "") + inbound := "" // no transcript block + reply := "exit status 1" + + got := sanitizeVoiceAgentReply(testVoiceAgent, testVoiceAgent, "telegram", dmPeer, inbound, reply, tgCfg) + + // Must not contain the original technical error. + if contains(got, "exit status") { + t.Errorf("technical error leaked into fallback: %q", got) + } + // Must be non-empty. + if got == "" { + t.Error("expected non-empty fallback, got empty string") + } +} + +// TestSanitize_ErrorNoTranscript_CustomFallback verifies the no-transcript custom message path. +func TestSanitize_ErrorNoTranscript_CustomFallback(t *testing.T) { + custom := "Sorry, please resend your voice note." + tgCfg := newTgCfg(testVoiceAgent, "", custom) + inbound := "" + reply := "tool error: service unavailable" + + got := sanitizeVoiceAgentReply(testVoiceAgent, testVoiceAgent, "telegram", dmPeer, inbound, reply, tgCfg) + if got != custom { + t.Errorf("expected custom no-transcript fallback %q, got %q", custom, got) + } +} + +// TestSanitize_MediaAudioTag verifies that is treated the same as . +func TestSanitize_MediaAudioTag(t *testing.T) { + tgCfg := newTgCfg(testVoiceAgent, "", "") + inbound := `good morning` + reply := "rate limit: too many requests" + + got := sanitizeVoiceAgentReply(testVoiceAgent, testVoiceAgent, "telegram", dmPeer, inbound, reply, tgCfg) + if contains(got, "rate limit") { + t.Errorf("technical error leaked: %q", got) + } + if !contains(got, "good morning") { + t.Errorf("expected transcript in fallback, got: %q", got) + } +} + +// TestSanitize_ErrorWithTranscript_CustomFallbackNoPlaceholder verifies that a +// custom fallback template WITHOUT a %s placeholder does NOT produce +// "%!(EXTRA string=...)" garbage. The transcript is silently omitted but the +// student receives a clean message. +func TestSanitize_ErrorWithTranscript_CustomFallbackNoPlaceholder(t *testing.T) { + // Operator set a clean message with no %s — common mistake. + customTpl := "Please resend your voice note, there was a small hiccup!" + tgCfg := newTgCfg(testVoiceAgent, customTpl, "") + inbound := `hello world` + reply := "system error: tool execution failed" + + got := sanitizeVoiceAgentReply(testVoiceAgent, testVoiceAgent, "telegram", dmPeer, inbound, reply, tgCfg) + + // Must be exactly the template string — no %!(EXTRA...) suffix appended. + if got != customTpl { + t.Errorf("expected clean fallback %q, got %q", customTpl, got) + } + // Explicit check: fmt.Sprintf leakage looks like "%!(EXTRA". + if strings.Contains(got, "%!") { + t.Errorf("fmt.Sprintf garbage leaked into output: %q", got) + } +} + +// TestSanitize_ErrorWithTranscript_CustomFallbackWithPlaceholder verifies that +// a custom template WITH %s correctly inlines the transcript. +func TestSanitize_ErrorWithTranscript_CustomFallbackWithPlaceholder(t *testing.T) { + customTpl := `Mình nghe: "%s" — gửi lại nhé!` + tgCfg := newTgCfg(testVoiceAgent, customTpl, "") + inbound := `xin chào` + reply := "exit status 1" + + got := sanitizeVoiceAgentReply(testVoiceAgent, testVoiceAgent, "telegram", dmPeer, inbound, reply, tgCfg) + want := `Mình nghe: "xin chào" — gửi lại nhé!` + if got != want { + t.Errorf("expected %q, got %q", want, got) + } +} + +// TestSanitize_ErrorWithTranscript_DefaultFallbackNoFmtGarbage verifies that +// the built-in default also inlines transcript cleanly after switching from +// fmt.Sprintf to strings.ReplaceAll. +func TestSanitize_ErrorWithTranscript_DefaultFallbackNoFmtGarbage(t *testing.T) { + tgCfg := newTgCfg(testVoiceAgent, "", "") + transcript := "I wake up at 7am every day" + inbound := "" + transcript + "" + reply := "tool error: evaluation failed" + + got := sanitizeVoiceAgentReply(testVoiceAgent, testVoiceAgent, "telegram", dmPeer, inbound, reply, tgCfg) + + if strings.Contains(got, "%!") { + t.Errorf("fmt.Sprintf garbage in default fallback: %q", got) + } + if !strings.Contains(got, transcript) { + t.Errorf("expected transcript %q in fallback, got: %q", transcript, got) + } +} + +// --------------------------------------------------------------------------- +// containsTechnicalErrorLanguage +// --------------------------------------------------------------------------- + +func TestContainsTechnicalError_Positives(t *testing.T) { + cases := []string{ + "vấn đề kỹ thuật xảy ra", + "lỗi hệ thống", + "vấn đề hệ thống", + "technical issue detected", + "system error: something broke", + "exit status 1", + "rate limit exceeded", + "api rate limit hit", + "tool error: execution failed", + // mixed case + "SYSTEM ERROR occurred", + "Rate Limit Exceeded", + } + for _, s := range cases { + if !containsTechnicalErrorLanguage(s) { + t.Errorf("expected true for %q, got false", s) + } + } +} + +func TestContainsTechnicalError_Negatives(t *testing.T) { + cases := []string{ + "", + "Great job!", + "Your pronunciation is improving.", + "Please try again.", + "I heard you say: hello world.", + } + for _, s := range cases { + if containsTechnicalErrorLanguage(s) { + t.Errorf("expected false for %q, got true", s) + } + } +} + +// --------------------------------------------------------------------------- +// extractTranscriptFromInbound +// --------------------------------------------------------------------------- + +func TestExtractTranscript_Present(t *testing.T) { + cases := []struct { + input string + want string + }{ + { + input: `hello world`, + want: "hello world", + }, + { + input: ` spaces around `, + want: "spaces around", + }, + { + input: "\n\nMulti\nline\ntranscript\n\n", + want: "Multi line transcript", + }, + { + input: "only transcript", + want: "only transcript", + }, + } + for _, tc := range cases { + got := extractTranscriptFromInbound(tc.input) + if got != tc.want { + t.Errorf("input %q: expected %q, got %q", tc.input, tc.want, got) + } + } +} + +func TestExtractTranscript_Absent(t *testing.T) { + cases := []string{ + "", + "plain text message", + "", + } + for _, s := range cases { + got := extractTranscriptFromInbound(s) + if got != "" { + t.Errorf("expected empty transcript for %q, got %q", s, got) + } + } +} + +// --------------------------------------------------------------------------- +// helpers +// --------------------------------------------------------------------------- + +func contains(s, substr string) bool { + return strings.Contains(s, substr) +} diff --git a/internal/agent/loop.go b/internal/agent/loop.go index 4845151f3..f44fb4f87 100644 --- a/internal/agent/loop.go +++ b/internal/agent/loop.go @@ -3,6 +3,7 @@ package agent import ( "context" "encoding/json" + "errors" "fmt" "log/slog" "os" @@ -44,21 +45,22 @@ type BootstrapCleanupFunc func(ctx context.Context, agentID uuid.UUID, userID st // Loop is the agent execution loop for one agent instance. // Think → Act → Observe cycle with tool execution. type Loop struct { - id string - agentUUID uuid.UUID // set in managed mode for context propagation - agentType string // "open" or "predefined" (managed mode) - provider providers.Provider - model string - contextWindow int - maxIterations int - workspace string - - eventPub bus.EventPublisher // currently unused by Loop; kept for future use - sessions store.SessionStore + id string + agentUUID uuid.UUID // set in managed mode for context propagation + agentType string // "open" or "predefined" (managed mode) + provider providers.Provider + model string + modelFallbacks []string + contextWindow int + maxIterations int + workspace string + + eventPub bus.EventPublisher // currently unused by Loop; kept for future use + sessions store.SessionStore tools *tools.Registry toolPolicy *tools.PolicyEngine // optional: filters tools sent to LLM agentToolPolicy *config.ToolPolicySpec // per-agent tool policy from DB (nil = no restrictions) - activeRuns atomic.Int32 // number of currently executing runs + activeRuns atomic.Int32 // number of currently executing runs // Per-session summarization lock: prevents concurrent summarize goroutines for the same session. summarizeMu sync.Map // sessionKey → *sync.Mutex @@ -71,10 +73,10 @@ type Loop struct { contextFiles []bootstrap.ContextFile // Per-user file seeding + dynamic context loading (managed mode) - ensureUserFiles EnsureUserFilesFunc - contextFileLoader ContextFileLoaderFunc - bootstrapCleanup BootstrapCleanupFunc - seededUsers sync.Map // userID → true, avoid re-check per request + ensureUserFiles EnsureUserFilesFunc + contextFileLoader ContextFileLoaderFunc + bootstrapCleanup BootstrapCleanupFunc + seededUsers sync.Map // userID → true, avoid re-check per request // Compaction config (memory flush settings) compactionCfg *config.CompactionConfig @@ -83,8 +85,8 @@ type Loop struct { contextPruningCfg *config.ContextPruningConfig // Sandbox info - sandboxEnabled bool - sandboxContainerDir string + sandboxEnabled bool + sandboxContainerDir string sandboxWorkspaceAccess string // Event callback for broadcasting agent events (run.started, chunk, tool.call, etc.) @@ -107,7 +109,7 @@ type Loop struct { // AgentEvent is emitted during agent execution for WS broadcasting. type AgentEvent struct { - Type string `json:"type"` // "run.started", "run.completed", "run.failed", "chunk", "tool.call", "tool.result" + Type string `json:"type"` // "run.started", "run.completed", "run.failed", "chunk", "tool.call", "tool.result" AgentID string `json:"agentId"` RunID string `json:"runId"` Payload interface{} `json:"payload,omitempty"` @@ -115,14 +117,15 @@ type AgentEvent struct { // LoopConfig configures a new Loop. type LoopConfig struct { - ID string - Provider providers.Provider - Model string - ContextWindow int - MaxIterations int - Workspace string - Bus bus.EventPublisher - Sessions store.SessionStore + ID string + Provider providers.Provider + Model string + ModelFallbacks []string + ContextWindow int + MaxIterations int + Workspace string + Bus bus.EventPublisher + Sessions store.SessionStore Tools *tools.Registry ToolPolicy *tools.PolicyEngine // optional: filters tools sent to LLM AgentToolPolicy *config.ToolPolicySpec // per-agent tool policy from DB (nil = no restrictions) @@ -142,8 +145,8 @@ type LoopConfig struct { ContextPruningCfg *config.ContextPruningConfig // Sandbox info (injected into system prompt) - SandboxEnabled bool - SandboxContainerDir string // e.g. "/workspace" + SandboxEnabled bool + SandboxContainerDir string // e.g. "/workspace" SandboxWorkspaceAccess string // "none", "ro", "rw" // Managed mode: agent UUID for context propagation to tools @@ -159,9 +162,9 @@ type LoopConfig struct { TraceCollector *tracing.Collector // Security: input guard for injection detection, max message size - InputGuard *InputGuard // nil = auto-create when InjectionAction != "off" - InjectionAction string // "log", "warn" (default), "block", "off" - MaxMessageChars int // 0 = use default (32000) + InputGuard *InputGuard // nil = auto-create when InjectionAction != "off" + InjectionAction string // "log", "warn" (default), "block", "off" + MaxMessageChars int // 0 = use default (32000) // Global builtin tool settings (from builtin_tools table, managed mode) BuiltinToolSettings tools.BuiltinToolSettings @@ -193,62 +196,84 @@ func NewLoop(cfg LoopConfig) *Loop { guard = NewInputGuard() } + // Normalize model fallback list: trim, dedupe, exclude primary model. + fallbacks := make([]string, 0, len(cfg.ModelFallbacks)) + if len(cfg.ModelFallbacks) > 0 { + seen := map[string]struct{}{} + primary := strings.TrimSpace(cfg.Model) + if primary != "" { + seen[primary] = struct{}{} + } + for _, raw := range cfg.ModelFallbacks { + model := strings.TrimSpace(raw) + if model == "" { + continue + } + if _, ok := seen[model]; ok { + continue + } + seen[model] = struct{}{} + fallbacks = append(fallbacks, model) + } + } + return &Loop{ - id: cfg.ID, - agentUUID: cfg.AgentUUID, - agentType: cfg.AgentType, - provider: cfg.Provider, - model: cfg.Model, - contextWindow: cfg.ContextWindow, - maxIterations: cfg.MaxIterations, - workspace: cfg.Workspace, - eventPub: cfg.Bus, - sessions: cfg.Sessions, - tools: cfg.Tools, - toolPolicy: cfg.ToolPolicy, - agentToolPolicy: cfg.AgentToolPolicy, - onEvent: cfg.OnEvent, - ownerIDs: cfg.OwnerIDs, - skillsLoader: cfg.SkillsLoader, - skillAllowList: cfg.SkillAllowList, - hasMemory: cfg.HasMemory, - contextFiles: cfg.ContextFiles, - ensureUserFiles: cfg.EnsureUserFiles, - contextFileLoader: cfg.ContextFileLoader, - bootstrapCleanup: cfg.BootstrapCleanup, - compactionCfg: cfg.CompactionCfg, - contextPruningCfg: cfg.ContextPruningCfg, - sandboxEnabled: cfg.SandboxEnabled, - sandboxContainerDir: cfg.SandboxContainerDir, + id: cfg.ID, + agentUUID: cfg.AgentUUID, + agentType: cfg.AgentType, + provider: cfg.Provider, + model: cfg.Model, + modelFallbacks: fallbacks, + contextWindow: cfg.ContextWindow, + maxIterations: cfg.MaxIterations, + workspace: cfg.Workspace, + eventPub: cfg.Bus, + sessions: cfg.Sessions, + tools: cfg.Tools, + toolPolicy: cfg.ToolPolicy, + agentToolPolicy: cfg.AgentToolPolicy, + onEvent: cfg.OnEvent, + ownerIDs: cfg.OwnerIDs, + skillsLoader: cfg.SkillsLoader, + skillAllowList: cfg.SkillAllowList, + hasMemory: cfg.HasMemory, + contextFiles: cfg.ContextFiles, + ensureUserFiles: cfg.EnsureUserFiles, + contextFileLoader: cfg.ContextFileLoader, + bootstrapCleanup: cfg.BootstrapCleanup, + compactionCfg: cfg.CompactionCfg, + contextPruningCfg: cfg.ContextPruningCfg, + sandboxEnabled: cfg.SandboxEnabled, + sandboxContainerDir: cfg.SandboxContainerDir, sandboxWorkspaceAccess: cfg.SandboxWorkspaceAccess, - traceCollector: cfg.TraceCollector, - inputGuard: guard, - injectionAction: action, - maxMessageChars: cfg.MaxMessageChars, - builtinToolSettings: cfg.BuiltinToolSettings, - thinkingLevel: cfg.ThinkingLevel, + traceCollector: cfg.TraceCollector, + inputGuard: guard, + injectionAction: action, + maxMessageChars: cfg.MaxMessageChars, + builtinToolSettings: cfg.BuiltinToolSettings, + thinkingLevel: cfg.ThinkingLevel, } } // RunRequest is the input for processing a message through the agent. type RunRequest struct { - SessionKey string // composite key: agent:{agentId}:{channel}:{peerKind}:{chatId} - Message string // user message - Media []string // local file paths to images (already sanitized) - ForwardMedia []string // media paths to forward to output (not deleted, from delegation results) - Channel string // source channel - ChatID string // source chat ID - PeerKind string // "direct" or "group" (for session key building and tool context) - RunID string // unique run identifier - UserID string // external user ID (TEXT, free-form) for multi-tenant scoping - SenderID string // original individual sender ID (preserved in group chats for permission checks) - Stream bool // whether to stream response chunks - ExtraSystemPrompt string // optional: injected into system prompt (skills, subagent context, etc.) - HistoryLimit int // max user turns to keep in context (0=unlimited, from channel config) - ParentTraceID uuid.UUID // if set, reuse parent trace instead of creating new (announce runs) - ParentRootSpanID uuid.UUID // if set, nest announce agent span under this parent span - TraceName string // override trace name (default: "chat ") - TraceTags []string // additional tags for the trace (e.g. "cron") + SessionKey string // composite key: agent:{agentId}:{channel}:{peerKind}:{chatId} + Message string // user message + Media []string // local file paths to images (already sanitized) + ForwardMedia []string // media paths to forward to output (not deleted, from delegation results) + Channel string // source channel + ChatID string // source chat ID + PeerKind string // "direct" or "group" (for session key building and tool context) + RunID string // unique run identifier + UserID string // external user ID (TEXT, free-form) for multi-tenant scoping + SenderID string // original individual sender ID (preserved in group chats for permission checks) + Stream bool // whether to stream response chunks + ExtraSystemPrompt string // optional: injected into system prompt (skills, subagent context, etc.) + HistoryLimit int // max user turns to keep in context (0=unlimited, from channel config) + ParentTraceID uuid.UUID // if set, reuse parent trace instead of creating new (announce runs) + ParentRootSpanID uuid.UUID // if set, nest announce agent span under this parent span + TraceName string // override trace name (default: "chat ") + TraceTags []string // additional tags for the trace (e.g. "cron") } // RunResult is the output of a completed agent run. @@ -262,11 +287,112 @@ type RunResult struct { // MediaResult represents a media file produced by a tool during the agent run. type MediaResult struct { - Path string `json:"path"` // local file path + Path string `json:"path"` // local file path ContentType string `json:"content_type,omitempty"` // MIME type AsVoice bool `json:"as_voice,omitempty"` // send as voice message (Telegram OGG) } +// modelCandidates returns primary model + configured fallbacks, de-duplicated. +// Empty model is kept as a single candidate to allow provider default model usage. +func (l *Loop) modelCandidates(primary string) []string { + candidates := make([]string, 0, 1+len(l.modelFallbacks)) + seen := map[string]struct{}{} + add := func(raw string) { + model := strings.TrimSpace(raw) + if model == "" { + return + } + if _, ok := seen[model]; ok { + return + } + seen[model] = struct{}{} + candidates = append(candidates, model) + } + + add(primary) + for _, m := range l.modelFallbacks { + add(m) + } + if len(candidates) == 0 { + return []string{""} + } + return candidates +} + +// callProviderWithFallback calls the provider and retries with fallback models +// when the upstream rejects the primary model due to rate limiting. +func (l *Loop) callProviderWithFallback( + ctx context.Context, + req providers.ChatRequest, + stream bool, + onChunk func(providers.StreamChunk), +) (*providers.ChatResponse, string, error) { + candidates := l.modelCandidates(req.Model) + var lastErr error + lastModel := req.Model + + for idx, model := range candidates { + chatReq := req + chatReq.Model = model + + var ( + resp *providers.ChatResponse + err error + ) + if stream { + resp, err = l.provider.ChatStream(ctx, chatReq, onChunk) + } else { + resp, err = l.provider.Chat(ctx, chatReq) + } + if err == nil { + if idx > 0 { + slog.Warn("llm model fallback succeeded", + "agent", l.id, + "provider", l.provider.Name(), + "model", model, + "attempt", idx+1, + "candidates", len(candidates), + ) + } + return resp, model, nil + } + + lastErr = err + lastModel = model + if !isRateLimitFailure(err) || idx == len(candidates)-1 { + return nil, model, err + } + + nextModel := candidates[idx+1] + slog.Warn("llm model rate-limited; trying fallback model", + "agent", l.id, + "provider", l.provider.Name(), + "current_model", model, + "next_model", nextModel, + "attempt", idx+1, + "candidates", len(candidates), + "error", err.Error(), + ) + } + + return nil, lastModel, lastErr +} + +func isRateLimitFailure(err error) bool { + if err == nil { + return false + } + var httpErr *providers.HTTPError + if errors.As(err, &httpErr) { + return httpErr.Status == 429 + } + lower := strings.ToLower(err.Error()) + return strings.Contains(lower, "rate limit") || + strings.Contains(lower, "too many requests") || + strings.Contains(lower, "quota exceeded") || + strings.Contains(lower, "resource_exhausted") +} + // Run processes a single message through the agent loop. // It blocks until completion and returns the final response. func (l *Loop) Run(ctx context.Context, req RunRequest) (*RunResult, error) { @@ -513,7 +639,7 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (*RunResult, error) var totalUsage providers.Usage iteration := 0 var finalContent string - var asyncToolCalls []string // track async spawn tool names for fallback + var asyncToolCalls []string // track async spawn tool names for fallback var mediaResults []MediaResult // media files from tool MEDIA: results // Inject retry hook so channels can update placeholder on LLM retries. @@ -564,38 +690,34 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (*RunResult, error) // Call LLM (streaming or non-streaming) var resp *providers.ChatResponse var err error + var usedModel string llmSpanStart := time.Now().UTC() - - if req.Stream { - resp, err = l.provider.ChatStream(ctx, chatReq, func(chunk providers.StreamChunk) { - if chunk.Thinking != "" { - l.emit(AgentEvent{ - Type: protocol.ChatEventThinking, - AgentID: l.id, - RunID: req.RunID, - Payload: map[string]string{"content": chunk.Thinking}, - }) - } - if chunk.Content != "" { - l.emit(AgentEvent{ - Type: protocol.ChatEventChunk, - AgentID: l.id, - RunID: req.RunID, - Payload: map[string]string{"content": chunk.Content}, - }) - } - }) - } else { - resp, err = l.provider.Chat(ctx, chatReq) - } + resp, usedModel, err = l.callProviderWithFallback(ctx, chatReq, req.Stream, func(chunk providers.StreamChunk) { + if chunk.Thinking != "" { + l.emit(AgentEvent{ + Type: protocol.ChatEventThinking, + AgentID: l.id, + RunID: req.RunID, + Payload: map[string]string{"content": chunk.Thinking}, + }) + } + if chunk.Content != "" { + l.emit(AgentEvent{ + Type: protocol.ChatEventChunk, + AgentID: l.id, + RunID: req.RunID, + Payload: map[string]string{"content": chunk.Content}, + }) + } + }) if err != nil { - l.emitLLMSpan(ctx, llmSpanStart, iteration, messages, nil, err) + l.emitLLMSpan(ctx, llmSpanStart, iteration, usedModel, messages, nil, err) return nil, fmt.Errorf("LLM call failed (iteration %d): %w", iteration, err) } - l.emitLLMSpan(ctx, llmSpanStart, iteration, messages, resp, nil) + l.emitLLMSpan(ctx, llmSpanStart, iteration, usedModel, messages, resp, nil) if resp.Usage != nil { totalUsage.PromptTokens += resp.Usage.PromptTokens diff --git a/internal/agent/loop_fallback_test.go b/internal/agent/loop_fallback_test.go new file mode 100644 index 000000000..0f71b01d4 --- /dev/null +++ b/internal/agent/loop_fallback_test.go @@ -0,0 +1,107 @@ +package agent + +import ( + "context" + "errors" + "reflect" + "testing" + + "github.com/nextlevelbuilder/goclaw/internal/providers" +) + +type scriptedProvider struct { + calls []string + failByModel map[string]error +} + +func (p *scriptedProvider) Chat(_ context.Context, req providers.ChatRequest) (*providers.ChatResponse, error) { + p.calls = append(p.calls, req.Model) + if err, ok := p.failByModel[req.Model]; ok { + return nil, err + } + return &providers.ChatResponse{Content: "ok", FinishReason: "stop"}, nil +} + +func (p *scriptedProvider) ChatStream(ctx context.Context, req providers.ChatRequest, onChunk func(providers.StreamChunk)) (*providers.ChatResponse, error) { + resp, err := p.Chat(ctx, req) + if err != nil { + return nil, err + } + if onChunk != nil { + onChunk(providers.StreamChunk{Content: resp.Content}) + onChunk(providers.StreamChunk{Done: true}) + } + return resp, nil +} + +func (p *scriptedProvider) DefaultModel() string { return "" } +func (p *scriptedProvider) Name() string { return "openrouter" } + +func TestCallProviderWithFallback_OnRateLimitUsesNextModel(t *testing.T) { + prov := &scriptedProvider{failByModel: map[string]error{ + "m1": &providers.HTTPError{Status: 429, Body: "rate limit"}, + }} + loop := &Loop{ + id: "router-agent", + provider: prov, + model: "m1", + modelFallbacks: []string{"m2"}, + } + + resp, usedModel, err := loop.callProviderWithFallback(context.Background(), providers.ChatRequest{Model: "m1"}, false, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if usedModel != "m2" { + t.Fatalf("usedModel = %q, want %q", usedModel, "m2") + } + if resp == nil || resp.Content != "ok" { + t.Fatalf("unexpected response: %#v", resp) + } + wantCalls := []string{"m1", "m2"} + if !reflect.DeepEqual(prov.calls, wantCalls) { + t.Fatalf("calls = %#v, want %#v", prov.calls, wantCalls) + } +} + +func TestCallProviderWithFallback_NonRateLimitDoesNotFallback(t *testing.T) { + prov := &scriptedProvider{failByModel: map[string]error{ + "m1": &providers.HTTPError{Status: 400, Body: "bad request"}, + }} + loop := &Loop{ + id: "router-agent", + provider: prov, + model: "m1", + modelFallbacks: []string{"m2"}, + } + + _, usedModel, err := loop.callProviderWithFallback(context.Background(), providers.ChatRequest{Model: "m1"}, false, nil) + if err == nil { + t.Fatal("expected error, got nil") + } + if usedModel != "m1" { + t.Fatalf("usedModel = %q, want %q", usedModel, "m1") + } + wantCalls := []string{"m1"} + if !reflect.DeepEqual(prov.calls, wantCalls) { + t.Fatalf("calls = %#v, want %#v", prov.calls, wantCalls) + } +} + +func TestModelCandidates_DedupesAndKeepsOrder(t *testing.T) { + loop := &Loop{ + modelFallbacks: []string{"m1", "", "m2", "m2", "m3"}, + } + got := loop.modelCandidates("m1") + want := []string{"m1", "m2", "m3"} + if !reflect.DeepEqual(got, want) { + t.Fatalf("candidates = %#v, want %#v", got, want) + } +} + +func TestIsRateLimitFailure_RecognizesWrappedHTTP429(t *testing.T) { + err := errors.New("wrapper: " + (&providers.HTTPError{Status: 429, Body: "too many requests"}).Error()) + if !isRateLimitFailure(err) { + t.Fatal("expected wrapped 429-like error to be treated as rate-limit") + } +} diff --git a/internal/agent/loop_tracing.go b/internal/agent/loop_tracing.go index f9ad10364..1a5e5f6ad 100644 --- a/internal/agent/loop_tracing.go +++ b/internal/agent/loop_tracing.go @@ -33,23 +33,26 @@ func (l *Loop) IsRunning() bool { return l.activeRuns.Load() > 0 } // emitLLMSpan records an LLM call span if tracing is active. // When GOCLAW_TRACE_VERBOSE is set, messages are serialized as InputPreview. -func (l *Loop) emitLLMSpan(ctx context.Context, start time.Time, iteration int, messages []providers.Message, resp *providers.ChatResponse, callErr error) { +func (l *Loop) emitLLMSpan(ctx context.Context, start time.Time, iteration int, model string, messages []providers.Message, resp *providers.ChatResponse, callErr error) { traceID := tracing.TraceIDFromContext(ctx) collector := tracing.CollectorFromContext(ctx) if collector == nil || traceID == uuid.Nil { return } + if model == "" { + model = l.model + } now := time.Now().UTC() dur := int(now.Sub(start).Milliseconds()) span := store.SpanData{ TraceID: traceID, SpanType: store.SpanTypeLLMCall, - Name: fmt.Sprintf("%s/%s #%d", l.provider.Name(), l.model, iteration), + Name: fmt.Sprintf("%s/%s #%d", l.provider.Name(), model, iteration), StartTime: start, EndTime: &now, DurationMS: dur, - Model: l.model, + Model: model, Provider: l.provider.Name(), Status: store.SpanStatusCompleted, Level: store.SpanLevelDefault, diff --git a/internal/agent/resolver.go b/internal/agent/resolver.go index 5d5779a23..223188bc0 100644 --- a/internal/agent/resolver.go +++ b/internal/agent/resolver.go @@ -21,13 +21,13 @@ import ( // ResolverDeps holds shared dependencies for the managed-mode agent resolver. type ResolverDeps struct { - AgentStore store.AgentStore - ProviderReg *providers.Registry - Bus bus.EventPublisher - Sessions store.SessionStore - Tools *tools.Registry - ToolPolicy *tools.PolicyEngine - Skills *skills.Loader + AgentStore store.AgentStore + ProviderReg *providers.Registry + Bus bus.EventPublisher + Sessions store.SessionStore + Tools *tools.Registry + ToolPolicy *tools.PolicyEngine + Skills *skills.Loader HasMemory bool OnEvent func(AgentEvent) TraceCollector *tracing.Collector @@ -259,28 +259,28 @@ func NewManagedResolver(deps ResolverDeps) ResolverFunc { } loop := NewLoop(LoopConfig{ - ID: ag.AgentKey, - AgentUUID: ag.ID, - AgentType: ag.AgentType, - Provider: provider, - Model: ag.Model, - ContextWindow: contextWindow, - MaxIterations: maxIter, - Workspace: workspace, - Bus: deps.Bus, - Sessions: deps.Sessions, - Tools: toolsReg, - ToolPolicy: deps.ToolPolicy, - AgentToolPolicy: ag.ParseToolsConfig(), - SkillsLoader: deps.Skills, - HasMemory: hasMemory, - ContextFiles: contextFiles, - EnsureUserFiles: deps.EnsureUserFiles, - ContextFileLoader: deps.ContextFileLoader, - BootstrapCleanup: deps.BootstrapCleanup, - OnEvent: deps.OnEvent, - TraceCollector: deps.TraceCollector, - InjectionAction: deps.InjectionAction, + ID: ag.AgentKey, + AgentUUID: ag.ID, + AgentType: ag.AgentType, + Provider: provider, + Model: ag.Model, + ContextWindow: contextWindow, + MaxIterations: maxIter, + Workspace: workspace, + Bus: deps.Bus, + Sessions: deps.Sessions, + Tools: toolsReg, + ToolPolicy: deps.ToolPolicy, + AgentToolPolicy: ag.ParseToolsConfig(), + SkillsLoader: deps.Skills, + HasMemory: hasMemory, + ContextFiles: contextFiles, + EnsureUserFiles: deps.EnsureUserFiles, + ContextFileLoader: deps.ContextFileLoader, + BootstrapCleanup: deps.BootstrapCleanup, + OnEvent: deps.OnEvent, + TraceCollector: deps.TraceCollector, + InjectionAction: deps.InjectionAction, MaxMessageChars: deps.MaxMessageChars, CompactionCfg: compactionCfg, ContextPruningCfg: contextPruningCfg, @@ -288,7 +288,8 @@ func NewManagedResolver(deps ResolverDeps) ResolverFunc { SandboxContainerDir: sandboxContainerDir, SandboxWorkspaceAccess: sandboxWorkspaceAccess, BuiltinToolSettings: builtinSettings, - ThinkingLevel: ag.ParseThinkingLevel(), + ThinkingLevel: ag.ParseThinkingLevel(), + ModelFallbacks: ag.ParseModelFallbacks(), }) slog.Info("resolved agent from DB", "agent", agentKey, "model", ag.Model, "provider", ag.Provider) diff --git a/internal/channels/telegram/channel.go b/internal/channels/telegram/channel.go index 2ff33a188..c02b5f29e 100644 --- a/internal/channels/telegram/channel.go +++ b/internal/channels/telegram/channel.go @@ -34,11 +34,13 @@ type Channel struct { pairingReplySent sync.Map // userID string → time.Time (debounce pairing replies) threadIDs sync.Map // localKey string → messageThreadID int (for forum topic routing) approvedGroups sync.Map // chatIDStr string → true (cached group pairing approval) + dmAgentAffinity sync.Map // chatIDStr string → dmAffinity (sticky DM route) + sttSem sttSem // bounds parallel STT HTTP calls to sttMaxConcurrent groupHistory *channels.PendingHistory historyLimit int requireMention bool pollCancel context.CancelFunc // cancels the long polling context - pollDone chan struct{} // closed when polling goroutine exits + pollDone chan struct{} // closed when polling goroutine exits } type thinkingCancel struct { diff --git a/internal/channels/telegram/factory.go b/internal/channels/telegram/factory.go index a8b569857..b81cd1f26 100644 --- a/internal/channels/telegram/factory.go +++ b/internal/channels/telegram/factory.go @@ -17,6 +17,12 @@ type telegramCreds struct { } // telegramInstanceConfig maps the non-secret config JSONB from the channel_instances table. +// It supports two JSON layouts for voice settings: +// - Nested (preferred for new rows): {"voice": {"agent_id": "speaking-agent", ...}} +// - Flat (legacy, still accepted): {"voice_agent_id": "speaking-agent", ...} +// +// buildChannel promotes flat fields into the nested Voice struct when Voice.AgentID is empty, +// so existing DB rows continue to work without migration. type telegramInstanceConfig struct { DMPolicy string `json:"dm_policy,omitempty"` GroupPolicy string `json:"group_policy,omitempty"` @@ -27,6 +33,24 @@ type telegramInstanceConfig struct { MediaMaxBytes int64 `json:"media_max_bytes,omitempty"` LinkPreview *bool `json:"link_preview,omitempty"` AllowFrom []string `json:"allow_from,omitempty"` + + // Nested voice config — preferred layout for new DB rows. + Voice config.TelegramVoiceConfig `json:"voice,omitempty"` + + // Legacy flat fields — populated by older DB rows. + // buildChannel promotes these into Voice when Voice.AgentID is empty. + LegacySTTProxyURL string `json:"stt_proxy_url,omitempty"` + LegacySTTAPIKey string `json:"stt_api_key,omitempty"` + LegacySTTTenantID string `json:"stt_tenant_id,omitempty"` + LegacySTTTimeoutSec int `json:"stt_timeout_seconds,omitempty"` + LegacyVoiceAgentID string `json:"voice_agent_id,omitempty"` + LegacyVoiceStartMessage string `json:"voice_start_message,omitempty"` + LegacyVoiceIntentKeywords []string `json:"voice_intent_keywords,omitempty"` + LegacyVoiceAffinityClearKeywords []string `json:"voice_affinity_clear_keywords,omitempty"` + LegacyVoiceAffinityTTLMinutes int `json:"voice_affinity_ttl_minutes,omitempty"` + LegacyVoiceDMContextTemplate string `json:"voice_dm_context_template,omitempty"` + LegacyAudioGuardFallbackTranscript string `json:"audio_guard_fallback_transcript,omitempty"` + LegacyAudioGuardFallbackNoTranscript string `json:"audio_guard_fallback_no_transcript,omitempty"` } // Factory creates a Telegram channel from DB instance data (no agent/team store). @@ -64,6 +88,34 @@ func buildChannel(name string, creds json.RawMessage, cfg json.RawMessage, } } + // Resolve voice config: prefer the nested "voice" block. + // When absent, promote flat legacy fields so existing DB rows need no migration. + // + // IMPORTANT — legacy promotion is all-or-nothing: + // if Voice.AgentID is already set in the nested block, we assume the row + // has been fully migrated and skip ALL flat fields. Partial migrations + // (nested AgentID + flat keywords) are not supported. Migrate all voice + // fields to the nested block in one atomic DB update. + voiceCfg := ic.Voice + if voiceCfg.AgentID == "" && ic.LegacyVoiceAgentID != "" { + // Promote all flat voice fields as a unit (all-or-nothing). + voiceCfg.AgentID = ic.LegacyVoiceAgentID + voiceCfg.StartMessage = ic.LegacyVoiceStartMessage + voiceCfg.IntentKeywords = ic.LegacyVoiceIntentKeywords + voiceCfg.AffinityClearKeywords = ic.LegacyVoiceAffinityClearKeywords + voiceCfg.AffinityTTLMinutes = ic.LegacyVoiceAffinityTTLMinutes + voiceCfg.DMContextTemplate = ic.LegacyVoiceDMContextTemplate + voiceCfg.AudioGuardFallbackTranscript = ic.LegacyAudioGuardFallbackTranscript + voiceCfg.AudioGuardFallbackNoTranscript = ic.LegacyAudioGuardFallbackNoTranscript + } + // STT fields are batched together: if no URL, the other STT fields are meaningless. + if voiceCfg.STTProxyURL == "" && ic.LegacySTTProxyURL != "" { + voiceCfg.STTProxyURL = ic.LegacySTTProxyURL + voiceCfg.STTAPIKey = ic.LegacySTTAPIKey + voiceCfg.STTTenantID = ic.LegacySTTTenantID + voiceCfg.STTTimeoutSeconds = ic.LegacySTTTimeoutSec + } + tgCfg := config.TelegramConfig{ Enabled: true, Token: c.Token, @@ -77,6 +129,7 @@ func buildChannel(name string, creds json.RawMessage, cfg json.RawMessage, ReactionLevel: ic.ReactionLevel, MediaMaxBytes: ic.MediaMaxBytes, LinkPreview: ic.LinkPreview, + Voice: voiceCfg, } // DB instances default to "pairing" for groups (secure by default). diff --git a/internal/channels/telegram/handlers.go b/internal/channels/telegram/handlers.go index 381d8e3d9..1543d528e 100644 --- a/internal/channels/telegram/handlers.go +++ b/internal/channels/telegram/handlers.go @@ -15,6 +15,22 @@ import ( "github.com/nextlevelbuilder/goclaw/internal/channels/typing" ) +const defaultVoiceAffinityTTL = 6 * time.Hour + +type dmAffinity struct { + AgentID string + UpdatedAt time.Time +} + +// voiceAffinityTTL returns the configured DM affinity TTL for the channel, +// falling back to defaultVoiceAffinityTTL (6h) when not explicitly set. +func (c *Channel) voiceAffinityTTL() time.Duration { + if mins := c.config.Voice.AffinityTTLMinutes; mins > 0 { + return time.Duration(mins) * time.Minute + } + return defaultVoiceAffinityTTL +} + // handleMessage processes an incoming Telegram update. func (c *Channel) handleMessage(ctx context.Context, update telego.Update) { message := update.Message @@ -181,6 +197,11 @@ func (c *Channel) handleMessage(ctx context.Context, update telego.Update) { ) } else { m.Transcript = transcript + if transcript != "" { + slog.Info("telegram: transcript attached to inbound media", + "type", m.Type, "chars", len(transcript), + ) + } } case "document": @@ -374,21 +395,10 @@ func (c *Channel) handleMessage(ctx context.Context, update telego.Update) { peerKind = "group" } - // Audio-aware routing: if a voice/audio message was received and a dedicated speaking agent - // is configured, route to that agent instead of the default channel agent. - // This prevents voice turns from landing on a text-router agent that cannot handle audio. - targetAgentID := c.AgentID() - if c.config.VoiceAgentID != "" { - for _, m := range mediaList { - if m.Type == "audio" || m.Type == "voice" { - targetAgentID = c.config.VoiceAgentID - slog.Debug("telegram: routing voice inbound to speaking agent", - "agent_id", targetAgentID, "media_type", m.Type, - ) - break - } - } - } + // Audio-aware routing: delegate to resolveTargetAgent so that the priority + // chain (audio_media → start_command → voice_intent → session_affinity) is + // independently testable without Telegram bot dependencies. + targetAgentID, finalContent := c.resolveTargetAgent(chatIDStr, isGroup, mediaList, finalContent) c.Bus().PublishInbound(bus.InboundMessage{ Channel: c.Name(), @@ -409,6 +419,40 @@ func (c *Channel) handleMessage(ctx context.Context, update telego.Update) { } } +// matchesVoiceIntent reports whether normalized (lowercased, trimmed) DM text contains any of +// the deployment-configured VoiceIntentKeywords. Returns false when the keyword list is empty, +// effectively disabling text-intent routing for deployments that don't need it. +func (c *Channel) matchesVoiceIntent(normalized string) bool { + if len(c.config.Voice.IntentKeywords) == 0 || normalized == "" { + return false + } + // Lowercase each keyword defensively: the caller already lowercases the + // inbound text, but config keywords may arrive with mixed case from DB. + for _, kw := range c.config.Voice.IntentKeywords { + if strings.Contains(normalized, strings.ToLower(kw)) { + return true + } + } + return false +} + +// matchesAffinityClear reports whether normalized DM text matches any of the deployment-configured +// VoiceAffinityClearKeywords, which signals that the user wants a non-voice agent. Returns false +// when the keyword list is empty (affinity is then only cleared by TTL expiry). +func (c *Channel) matchesAffinityClear(normalized string) bool { + if len(c.config.Voice.AffinityClearKeywords) == 0 || normalized == "" { + return false + } + // Lowercase each keyword defensively: the caller already lowercases the + // inbound text, but config keywords may arrive with mixed case from DB. + for _, kw := range c.config.Voice.AffinityClearKeywords { + if strings.Contains(normalized, strings.ToLower(kw)) { + return true + } + } + return false +} + // detectMention checks if a Telegram message mentions the bot. // Checks both msg.Text/Entities (text messages) and msg.Caption/CaptionEntities (photo/media messages). func (c *Channel) detectMention(msg *telego.Message, botUsername string) bool { @@ -483,3 +527,105 @@ func isServiceMessage(msg *telego.Message) bool { // new_chat_title, new_chat_photo, pinned_message, etc.) return true } + +// resolveTargetAgent decides which agent should handle the inbound message and +// whether the content should be rewritten (e.g. /start → StartMessage). +// +// Priority chain: +// 1. Audio/voice media present → always route to Voice.AgentID +// 2. /start or "start" text (DM only) → route + rewrite content +// 3. Text matches IntentKeywords (DM) → route + set affinity +// 4. Existing non-expired affinity (DM)→ continue routing to affinity agent +// 5. AffinityClearKeywords match (DM) → evict affinity, route to default +// 6. Fallback → route to default agent +// +// No I/O side-effects: no Telegram API calls, no bus publish. +// State mutations (dmAgentAffinity store/delete) are intentional and fully +// contained here — this function is the single owner of affinity state changes. +func (c *Channel) resolveTargetAgent( + chatIDStr string, + isGroup bool, + mediaList []MediaInfo, + content string, +) (agentID string, finalContent string) { + agentID = c.AgentID() + finalContent = content + + voiceAgentID := c.config.Voice.AgentID + if voiceAgentID == "" { + // Voice routing not configured — every message goes to the default agent. + return + } + + routeReason := "" + + // Priority 1: audio/voice media present — highest priority, applies in groups too. + for _, m := range mediaList { + if m.Type == "audio" || m.Type == "voice" { + agentID = voiceAgentID + routeReason = "audio_media" + break + } + } + + // Priorities 2–5 only apply to DMs. + if routeReason == "" && !isGroup { + normalized := strings.ToLower(strings.TrimSpace(content)) + + switch { + case normalized == "/start" || normalized == "start": + // Priority 2: /start → bootstrap the voice session. + agentID = voiceAgentID + routeReason = "start_command" + startMsg := c.config.Voice.StartMessage + if startMsg == "" { + startMsg = "User sent /start." + } + finalContent = startMsg + + case c.matchesVoiceIntent(normalized): + // Priority 3: keyword signals the user wants a voice interaction. + agentID = voiceAgentID + routeReason = "voice_intent" + } + + if routeReason == "" { + if c.matchesAffinityClear(normalized) { + // Priority 5: user switched away from voice practice. + c.dmAgentAffinity.Delete(chatIDStr) + slog.Info("telegram: cleared DM affinity (keyword match)", "chat_id", chatIDStr) + } else if v, ok := c.dmAgentAffinity.Load(chatIDStr); ok { + // Priority 4: sticky session — keep routing to the affinity agent. + if affinity, ok := v.(dmAffinity); ok { + if time.Since(affinity.UpdatedAt) <= c.voiceAffinityTTL() && affinity.AgentID != "" { + agentID = affinity.AgentID + routeReason = "session_affinity" + } else { + // TTL expired — evict. + c.dmAgentAffinity.Delete(chatIDStr) + } + } + } + } + } + + // Persist affinity for DM routes that reach the voice agent. + // Group chats are excluded: affinity is only read inside the !isGroup block, + // so storing a group chatID would waste sync.Map space that is never reclaimed. + if !isGroup && (routeReason == "audio_media" || routeReason == "start_command" || + routeReason == "voice_intent" || routeReason == "session_affinity") { + c.dmAgentAffinity.Store(chatIDStr, dmAffinity{ + AgentID: voiceAgentID, + UpdatedAt: time.Now(), + }) + } + + if routeReason != "" { + slog.Info("telegram: routing inbound to voice agent", + "agent_id", agentID, + "reason", routeReason, + "is_group", isGroup, + ) + } + return +} diff --git a/internal/channels/telegram/handlers_voice_routing_test.go b/internal/channels/telegram/handlers_voice_routing_test.go new file mode 100644 index 000000000..a844424db --- /dev/null +++ b/internal/channels/telegram/handlers_voice_routing_test.go @@ -0,0 +1,274 @@ +package telegram + +// handlers_voice_routing_test.go — table-driven tests for resolveTargetAgent. +// +// Tests live in package telegram (white-box) so we can: +// - access unexported types (dmAffinity, MediaInfo) +// - seed dmAgentAffinity directly without an exported API +// - call resolveTargetAgent without going through the Telegram bot loop +// +// Each case creates a minimal Channel stub with a real BaseChannel so that +// c.AgentID() works, then asserts the returned (agentID, finalContent) pair +// and any affinity side-effects. + +import ( + "sync" + "testing" + "time" + + "github.com/nextlevelbuilder/goclaw/internal/channels" + "github.com/nextlevelbuilder/goclaw/internal/config" +) + +const ( + testDefaultAgent = "default-agent" + testVoiceAgent = "voice-agent" +) + +// newRoutingChannel builds the minimal Channel needed for resolveTargetAgent. +// It wires a real BaseChannel so c.AgentID() returns testDefaultAgent. +func newRoutingChannel(voiceCfg config.TelegramVoiceConfig) *Channel { + base := channels.NewBaseChannel("telegram", nil, nil) + base.SetAgentID(testDefaultAgent) + return &Channel{ + BaseChannel: base, + config: config.TelegramConfig{ + Voice: voiceCfg, + }, + } +} + +// ── Table-driven routing tests ──────────────────────────────────────────────── + +func TestResolveTargetAgent(t *testing.T) { + baseCfg := config.TelegramVoiceConfig{ + AgentID: testVoiceAgent, + StartMessage: "Voice session started.", + IntentKeywords: []string{"speaking", "pronunciation"}, + AffinityClearKeywords: []string{"homework", "payment"}, + AffinityTTLMinutes: 60, + } + + validAffinity := dmAffinity{AgentID: testVoiceAgent, UpdatedAt: time.Now()} + expiredAffinity := dmAffinity{AgentID: testVoiceAgent, UpdatedAt: time.Now().Add(-2 * time.Hour)} + + tests := []struct { + name string + voiceCfg config.TelegramVoiceConfig + chatID string + isGroup bool + mediaList []MediaInfo + content string + preAffinity *dmAffinity // non-nil → seed dmAgentAffinity before call + wantAgentID string + wantContent string // "" means content must remain unchanged + wantAffinity bool // true = affinity entry must exist after call + }{ + // ── Priority 1: audio/voice media ────────────────────────────────────── + { + name: "audio in DM → voice agent", + voiceCfg: baseCfg, + chatID: "c1", + isGroup: false, + mediaList: []MediaInfo{{Type: "audio"}}, + content: "hello", + wantAgentID: testVoiceAgent, + wantAffinity: true, + }, + { + name: "voice in group → voice agent (audio overrides group check)", + voiceCfg: baseCfg, + chatID: "c2", + isGroup: true, + mediaList: []MediaInfo{{Type: "voice"}}, + wantAgentID: testVoiceAgent, + // Group chats must NOT have affinity stored — it is never read for groups + // and would accumulate indefinitely in sync.Map. + wantAffinity: false, + }, + // ── Priority 2: /start command ───────────────────────────────────────── + { + name: "/start rewrites content with StartMessage", + voiceCfg: baseCfg, + chatID: "c3", + isGroup: false, + content: "/start", + wantAgentID: testVoiceAgent, + wantContent: "Voice session started.", + wantAffinity: true, + }, + { + name: "bare 'start' keyword also rewrites", + voiceCfg: baseCfg, + chatID: "c4", + isGroup: false, + content: "start", + wantAgentID: testVoiceAgent, + wantContent: "Voice session started.", + wantAffinity: true, + }, + { + name: "/start in group does NOT route (only audio does)", + voiceCfg: baseCfg, + chatID: "c5", + isGroup: true, + content: "/start", + wantAgentID: testDefaultAgent, + wantAffinity: false, + }, + { + name: "/start with no StartMessage uses built-in default", + voiceCfg: config.TelegramVoiceConfig{ + AgentID: testVoiceAgent, + // StartMessage intentionally empty + }, + chatID: "c6", + isGroup: false, + content: "/start", + wantAgentID: testVoiceAgent, + wantContent: "User sent /start.", + wantAffinity: true, + }, + // ── Priority 3: intent keywords ──────────────────────────────────────── + { + name: "intent keyword match routes to voice agent", + voiceCfg: baseCfg, + chatID: "c7", + isGroup: false, + content: "I want to practice speaking today", + wantAgentID: testVoiceAgent, + wantAffinity: true, + }, + { + name: "intent keyword is case-insensitive", + voiceCfg: baseCfg, + chatID: "c8", + isGroup: false, + content: "Let's do some PRONUNCIATION practice", + wantAgentID: testVoiceAgent, + wantAffinity: true, + }, + { + name: "no keyword match → default agent", + voiceCfg: baseCfg, + chatID: "c9", + isGroup: false, + content: "What time does the library open?", + wantAgentID: testDefaultAgent, + wantAffinity: false, + }, + { + name: "intent keyword in group does NOT route", + voiceCfg: baseCfg, + chatID: "c10", + isGroup: true, + content: "speaking practice please", + wantAgentID: testDefaultAgent, + wantAffinity: false, + }, + // ── Priority 4: session affinity ─────────────────────────────────────── + { + name: "valid affinity continues routing to voice agent", + voiceCfg: baseCfg, + chatID: "c11", + isGroup: false, + preAffinity: &validAffinity, + content: "How was that?", + wantAgentID: testVoiceAgent, + wantAffinity: true, + }, + { + name: "expired affinity routes to default and is evicted", + voiceCfg: baseCfg, + chatID: "c12", + isGroup: false, + preAffinity: &expiredAffinity, + content: "How was that?", + wantAgentID: testDefaultAgent, + wantAffinity: false, + }, + // ── Priority 5: affinity clear keywords ─────────────────────────────── + { + name: "clear keyword evicts affinity → default agent", + voiceCfg: baseCfg, + chatID: "c13", + isGroup: false, + preAffinity: &validAffinity, + content: "I have a homework question", + wantAgentID: testDefaultAgent, + wantAffinity: false, + }, + // ── Voice agent not configured ───────────────────────────────────────── + { + name: "no voice agent → always default regardless of media", + voiceCfg: config.TelegramVoiceConfig{}, // AgentID empty + chatID: "c14", + isGroup: false, + mediaList: []MediaInfo{{Type: "voice"}}, + content: "/start", + wantAgentID: testDefaultAgent, + wantAffinity: false, + }, + } + + for _, tt := range tests { + tt := tt // capture loop var for t.Parallel() (Go < 1.22 safety) + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ch := newRoutingChannel(tt.voiceCfg) + if tt.preAffinity != nil { + ch.dmAgentAffinity.Store(tt.chatID, *tt.preAffinity) + } + + gotAgent, gotContent := ch.resolveTargetAgent( + tt.chatID, tt.isGroup, tt.mediaList, tt.content, + ) + + if gotAgent != tt.wantAgentID { + t.Errorf("agentID: got %q, want %q", gotAgent, tt.wantAgentID) + } + + expectedContent := tt.content + if tt.wantContent != "" { + expectedContent = tt.wantContent + } + if gotContent != expectedContent { + t.Errorf("content:\n got %q\n want %q", gotContent, expectedContent) + } + + _, hasAffinity := ch.dmAgentAffinity.Load(tt.chatID) + if tt.wantAffinity && !hasAffinity { + t.Error("affinity: expected entry to exist after call, but it was absent") + } + if !tt.wantAffinity && hasAffinity { + t.Error("affinity: expected entry to be absent after call, but it exists") + } + }) + } +} + +// TestResolveTargetAgent_AffinityRace verifies that concurrent calls on the +// same chatID do not cause data races on dmAgentAffinity (sync.Map). +// Run with -race to activate the Go race detector. +func TestResolveTargetAgent_AffinityRace(t *testing.T) { + ch := newRoutingChannel(config.TelegramVoiceConfig{ + AgentID: testVoiceAgent, + IntentKeywords: []string{"speaking"}, + }) + + var wg sync.WaitGroup + for i := 0; i < 20; i++ { + wg.Add(1) + go func() { + defer wg.Done() + ch.resolveTargetAgent( + "race-chat", + false, + []MediaInfo{{Type: "voice"}}, + "speaking test", + ) + }() + } + wg.Wait() // pass/fail determined by -race flag, not assertions +} diff --git a/internal/channels/telegram/stt.go b/internal/channels/telegram/stt.go index ef6bd090e..de0f6fe45 100644 --- a/internal/channels/telegram/stt.go +++ b/internal/channels/telegram/stt.go @@ -11,46 +11,112 @@ import ( "net/http" "os" "path/filepath" + "strings" + "sync" "time" ) const ( - // defaultSTTTimeoutSeconds is the default timeout for STT proxy requests. + // defaultSTTTimeoutSeconds is the fallback timeout for STT proxy requests. defaultSTTTimeoutSeconds = 30 - // sttTranscribeEndpoint is the path appended to STTProxyURL. + // sttTranscribeEndpoint is appended to Voice.STTProxyURL when the URL does + // not already end with the path. sttTranscribeEndpoint = "/transcribe_audio" + + // sttMaxConcurrent caps the number of simultaneous STT HTTP calls per + // Channel instance. When this many calls are in flight, additional callers + // block inside acquire() until a slot is freed by release(). + // + // A buffered channel used as a counting semaphore is the idiomatic Go pattern; + // see https://go.dev/doc/effective_go#channels (Channels as Semaphores). + sttMaxConcurrent = 4 ) -// sttResponse is the expected JSON response from the STT proxy. +// sttResponse is the JSON payload returned by the STT proxy on success. type sttResponse struct { Transcript string `json:"transcript"` } -// transcribeAudio calls the configured STT proxy service with the given audio file and returns -// the transcribed text. It returns ("", nil) silently when: -// - STT is not configured (STTProxyURL is empty), or -// - filePath is empty (download failed earlier in the pipeline). +// ── Shared HTTP client ──────────────────────────────────────────────────────── +// +// A package-level client is shared across all Channel instances. +// Sharing one client lets the underlying Transport pool TCP connections to the +// same STT proxy host, avoiding a new dial on every audio request. +// sync.Once guarantees the client is initialised exactly once. + +var ( + sttHTTPClientOnce sync.Once + sttHTTPClient *http.Client +) + +func getSTTHTTPClient() *http.Client { + sttHTTPClientOnce.Do(func() { + sttHTTPClient = &http.Client{ + Transport: &http.Transport{ + MaxIdleConns: 20, + MaxIdleConnsPerHost: 10, // STT traffic targets a single host + IdleConnTimeout: 90 * time.Second, + }, + } + }) + return sttHTTPClient +} + +// ── Per-channel semaphore ───────────────────────────────────────────────────── +// +// sttSem is a counting semaphore built from a buffered channel — the idiomatic +// Go approach (Effective Go, §Channels as Semaphores): +// +// acquire() writes into the channel; blocks when the buffer is full, i.e. when +// sttMaxConcurrent goroutines are already inside the critical section. +// release() reads from the channel, freeing one slot for the next waiter. +// +// sync.Once creates the channel exactly once per Channel instance. +// The zero value of sttSem is safe — call init() before first use. + +type sttSem struct { + once sync.Once + ch chan struct{} +} + +func (s *sttSem) init() { + s.once.Do(func() { s.ch = make(chan struct{}, sttMaxConcurrent) }) +} + +func (s *sttSem) acquire() { s.ch <- struct{}{} } +func (s *sttSem) release() { <-s.ch } + +// ── transcribeAudio ─────────────────────────────────────────────────────────── + +// transcribeAudio calls the configured STT proxy with the audio file at filePath +// and returns the transcribed text. +// +// Returns ("", nil) without a network call when: +// - Voice.STTProxyURL is empty (STT not configured), or +// - filePath is empty (audio download failed earlier in the pipeline). // -// Any HTTP or parse error is returned so the caller can log it and fall back gracefully. -// This matches the TS speaking-service /transcribe_audio contract used in managed deployments. +// Concurrency is bounded to sttMaxConcurrent simultaneous calls per Channel via a +// buffered-channel semaphore; the shared package-level http.Client pools TCP +// connections across all calls to the same STT host. func (c *Channel) transcribeAudio(ctx context.Context, filePath string) (string, error) { - if c.config.STTProxyURL == "" { - // STT not configured — skip silently. + if c.config.Voice.STTProxyURL == "" { return "", nil } if filePath == "" { - // File download failed earlier; nothing to transcribe. return "", nil } - // Resolve request timeout. - timeoutSec := c.config.STTTimeoutSeconds + // Acquire a concurrency slot; defer ensures release on every exit path. + c.sttSem.init() + c.sttSem.acquire() + defer c.sttSem.release() + + timeoutSec := c.config.Voice.STTTimeoutSeconds if timeoutSec <= 0 { timeoutSec = defaultSTTTimeoutSeconds } - // Open the downloaded audio file. f, err := os.Open(filePath) if err != nil { return "", fmt.Errorf("stt: open audio file %q: %w", filePath, err) @@ -58,58 +124,61 @@ func (c *Channel) transcribeAudio(ctx context.Context, filePath string) (string, defer f.Close() // Build multipart/form-data body. - // Fields: - // file — audio file bytes (required) - // tenant_id — optional tenant identifier forwarded to the proxy + // Fields required by the /transcribe_audio contract: + // audio — raw audio bytes + // tenant_id — forwarded to the proxy for auth/audit parity var body bytes.Buffer w := multipart.NewWriter(&body) - fw, err := w.CreateFormFile("file", filepath.Base(filePath)) + fw, err := w.CreateFormFile("audio", filepath.Base(filePath)) if err != nil { - return "", fmt.Errorf("stt: create form file field: %w", err) + return "", fmt.Errorf("stt: create multipart audio field: %w", err) } if _, err := io.Copy(fw, f); err != nil { - return "", fmt.Errorf("stt: write audio bytes to form: %w", err) + return "", fmt.Errorf("stt: write audio bytes: %w", err) } - if c.config.STTTenantID != "" { - if err := w.WriteField("tenant_id", c.config.STTTenantID); err != nil { - return "", fmt.Errorf("stt: write tenant_id field: %w", err) - } + tenantID := strings.TrimSpace(c.config.Voice.STTTenantID) + if tenantID == "" { + tenantID = "default" + } + if err := w.WriteField("tenant_id", tenantID); err != nil { + return "", fmt.Errorf("stt: write tenant_id field: %w", err) } - if err := w.Close(); err != nil { return "", fmt.Errorf("stt: close multipart writer: %w", err) } - // Build HTTP request with a deadline. reqCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second) defer cancel() - url := c.config.STTProxyURL + sttTranscribeEndpoint + baseURL := strings.TrimRight(strings.TrimSpace(c.config.Voice.STTProxyURL), "/") + url := baseURL + if !strings.HasSuffix(baseURL, sttTranscribeEndpoint) { + url = baseURL + sttTranscribeEndpoint + } + req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, url, &body) if err != nil { return "", fmt.Errorf("stt: build request to %q: %w", url, err) } req.Header.Set("Content-Type", w.FormDataContentType()) - if c.config.STTAPIKey != "" { - req.Header.Set("Authorization", "Bearer "+c.config.STTAPIKey) + if c.config.Voice.STTAPIKey != "" { + req.Header.Set("Authorization", "Bearer "+c.config.Voice.STTAPIKey) } slog.Debug("telegram: calling STT proxy", "url", url, "file", filepath.Base(filePath)) - client := &http.Client{} - resp, err := client.Do(req) + resp, err := getSTTHTTPClient().Do(req) if err != nil { return "", fmt.Errorf("stt: request to %q failed: %w", url, err) } defer resp.Body.Close() - respBody, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) // 1 MB cap + respBody, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) // 1 MB safety cap if err != nil { return "", fmt.Errorf("stt: read response body: %w", err) } - if resp.StatusCode != http.StatusOK { return "", fmt.Errorf("stt: upstream returned %d: %s", resp.StatusCode, string(respBody)) } @@ -119,15 +188,10 @@ func (c *Channel) transcribeAudio(ctx context.Context, filePath string) (string, return "", fmt.Errorf("stt: parse response JSON: %w", err) } - slog.Debug("telegram: STT transcript received", - "length", len(result.Transcript), - "preview", func() string { - if len(result.Transcript) > 80 { - return result.Transcript[:80] + "..." - } - return result.Transcript - }(), - ) - + if result.Transcript == "" { + slog.Warn("telegram: STT transcript is empty", "url", url) + return "", nil + } + slog.Info("telegram: STT transcript received", "length", len(result.Transcript)) return result.Transcript, nil } diff --git a/internal/channels/telegram/stt_test.go b/internal/channels/telegram/stt_test.go index 70670fc12..31419e7c6 100644 --- a/internal/channels/telegram/stt_test.go +++ b/internal/channels/telegram/stt_test.go @@ -52,7 +52,9 @@ func TestTranscribeAudio_NoProxy(t *testing.T) { // no-op even when STT is configured. func TestTranscribeAudio_EmptyFilePath(t *testing.T) { c := newChannelWithSTT(config.TelegramConfig{ - STTProxyURL: "https://stt.example.com", + Voice: config.TelegramVoiceConfig{ + STTProxyURL: "https://stt.example.com", + }, }) transcript, err := c.transcribeAudio(context.Background(), "") if err != nil { @@ -72,7 +74,11 @@ func TestTranscribeAudio_MissingFile(t *testing.T) { })) defer srv.Close() - c := newChannelWithSTT(config.TelegramConfig{STTProxyURL: srv.URL}) + c := newChannelWithSTT(config.TelegramConfig{ + Voice: config.TelegramVoiceConfig{ + STTProxyURL: srv.URL, + }, + }) _, err := c.transcribeAudio(context.Background(), "/nonexistent/file.ogg") if err == nil { t.Fatal("expected an error for missing file, got nil") @@ -94,12 +100,12 @@ func TestTranscribeAudio_Success(t *testing.T) { if r.Method != http.MethodPost { t.Errorf("expected POST, got %s", r.Method) } - // Verify multipart body contains a "file" field. + // Verify multipart body contains an "audio" field. if err := r.ParseMultipartForm(1 << 20); err != nil { t.Errorf("parse multipart: %v", err) } - if _, _, err := r.FormFile("file"); err != nil { - t.Errorf("expected 'file' field in multipart form: %v", err) + if _, _, err := r.FormFile("audio"); err != nil { + t.Errorf("expected 'audio' field in multipart form: %v", err) } w.Header().Set("Content-Type", "application/json") @@ -107,7 +113,11 @@ func TestTranscribeAudio_Success(t *testing.T) { })) defer srv.Close() - c := newChannelWithSTT(config.TelegramConfig{STTProxyURL: srv.URL}) + c := newChannelWithSTT(config.TelegramConfig{ + Voice: config.TelegramVoiceConfig{ + STTProxyURL: srv.URL, + }, + }) transcript, err := c.transcribeAudio(context.Background(), audioFile) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -134,8 +144,10 @@ func TestTranscribeAudio_BearerToken(t *testing.T) { defer srv.Close() c := newChannelWithSTT(config.TelegramConfig{ - STTProxyURL: srv.URL, - STTAPIKey: wantKey, + Voice: config.TelegramVoiceConfig{ + STTProxyURL: srv.URL, + STTAPIKey: wantKey, + }, }) if _, err := c.transcribeAudio(context.Background(), audioFile); err != nil { t.Fatalf("unexpected error: %v", err) @@ -160,7 +172,11 @@ func TestTranscribeAudio_NoAuthHeader(t *testing.T) { })) defer srv.Close() - c := newChannelWithSTT(config.TelegramConfig{STTProxyURL: srv.URL}) + c := newChannelWithSTT(config.TelegramConfig{ + Voice: config.TelegramVoiceConfig{ + STTProxyURL: srv.URL, + }, + }) if _, err := c.transcribeAudio(context.Background(), audioFile); err != nil { t.Fatalf("unexpected error: %v", err) } @@ -185,8 +201,10 @@ func TestTranscribeAudio_TenantID(t *testing.T) { defer srv.Close() c := newChannelWithSTT(config.TelegramConfig{ - STTProxyURL: srv.URL, - STTTenantID: wantTenant, + Voice: config.TelegramVoiceConfig{ + STTProxyURL: srv.URL, + STTTenantID: wantTenant, + }, }) if _, err := c.transcribeAudio(context.Background(), audioFile); err != nil { t.Fatalf("unexpected error: %v", err) @@ -196,27 +214,35 @@ func TestTranscribeAudio_TenantID(t *testing.T) { } } -// TestTranscribeAudio_NoTenantField verifies that when STTTenantID is empty, the -// multipart form does NOT include a "tenant_id" field. -func TestTranscribeAudio_NoTenantField(t *testing.T) { +// TestTranscribeAudio_DefaultTenantFallback verifies that when STTTenantID is +// empty, a default tenant_id value is still sent to satisfy the STT endpoint contract. +func TestTranscribeAudio_DefaultTenantFallback(t *testing.T) { audioFile := writeTempAudio(t, "fake-ogg-bytes") defer os.Remove(audioFile) + const wantDefaultTenant = "default" + var gotTenant string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if err := r.ParseMultipartForm(1 << 20); err == nil { - if tid := r.FormValue("tenant_id"); tid != "" { - t.Errorf("expected no tenant_id field, got %q", tid) - } + gotTenant = r.FormValue("tenant_id") } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(sttResponse{Transcript: "ok"}) })) defer srv.Close() - c := newChannelWithSTT(config.TelegramConfig{STTProxyURL: srv.URL}) + c := newChannelWithSTT(config.TelegramConfig{ + Voice: config.TelegramVoiceConfig{ + STTProxyURL: srv.URL, + }, + }) if _, err := c.transcribeAudio(context.Background(), audioFile); err != nil { t.Fatalf("unexpected error: %v", err) } + if gotTenant != wantDefaultTenant { + t.Errorf("expected tenant_id %q, got %q", wantDefaultTenant, gotTenant) + } } // TestTranscribeAudio_UpstreamError verifies that a non-200 response is surfaced @@ -230,7 +256,11 @@ func TestTranscribeAudio_UpstreamError(t *testing.T) { })) defer srv.Close() - c := newChannelWithSTT(config.TelegramConfig{STTProxyURL: srv.URL}) + c := newChannelWithSTT(config.TelegramConfig{ + Voice: config.TelegramVoiceConfig{ + STTProxyURL: srv.URL, + }, + }) _, err := c.transcribeAudio(context.Background(), audioFile) if err == nil { t.Fatal("expected error for non-200 response, got nil") @@ -252,7 +282,11 @@ func TestTranscribeAudio_InvalidJSON(t *testing.T) { })) defer srv.Close() - c := newChannelWithSTT(config.TelegramConfig{STTProxyURL: srv.URL}) + c := newChannelWithSTT(config.TelegramConfig{ + Voice: config.TelegramVoiceConfig{ + STTProxyURL: srv.URL, + }, + }) _, err := c.transcribeAudio(context.Background(), audioFile) if err == nil { t.Fatal("expected error for invalid JSON, got nil") @@ -271,7 +305,11 @@ func TestTranscribeAudio_EmptyTranscript(t *testing.T) { })) defer srv.Close() - c := newChannelWithSTT(config.TelegramConfig{STTProxyURL: srv.URL}) + c := newChannelWithSTT(config.TelegramConfig{ + Voice: config.TelegramVoiceConfig{ + STTProxyURL: srv.URL, + }, + }) transcript, err := c.transcribeAudio(context.Background(), audioFile) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -281,6 +319,77 @@ func TestTranscribeAudio_EmptyTranscript(t *testing.T) { } } +// TestTranscribeAudio_OldFileFieldMustNotBeUsed guards against regression to the old "file" +// multipart field name. The speaking-service /transcribe_audio contract requires "audio". +func TestTranscribeAudio_OldFileFieldMustNotBeUsed(t *testing.T) { + audioFile := writeTempAudio(t, "fake-ogg-bytes") + defer os.Remove(audioFile) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if err := r.ParseMultipartForm(1 << 20); err != nil { + t.Errorf("parse multipart: %v", err) + http.Error(w, "bad request", http.StatusBadRequest) + return + } + // The legacy "file" field must NOT be present. + if r.MultipartForm != nil && r.MultipartForm.File != nil { + if _, ok := r.MultipartForm.File["file"]; ok { + t.Error("deprecated 'file' field found in multipart form; must use 'audio'") + } + } + // The required "audio" field must be present. + if _, _, err := r.FormFile("audio"); err != nil { + t.Errorf("required 'audio' field missing from multipart form: %v", err) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(sttResponse{Transcript: "ok"}) + })) + defer srv.Close() + + c := newChannelWithSTT(config.TelegramConfig{ + Voice: config.TelegramVoiceConfig{ + STTProxyURL: srv.URL, + }, + }) + if _, err := c.transcribeAudio(context.Background(), audioFile); err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +// TestTranscribeAudio_GoclawSTTTenantIDEnvOverride verifies that the GOCLAW_STT_TENANT_ID +// env var (set via applyEnvOverrides) is picked up as STTTenantID. This test simulates what +// applyEnvOverrides does: it pre-populates config.STTTenantID from the env var before the +// channel is created. The stt layer only reads from config — it never reads env directly. +func TestTranscribeAudio_GoclawSTTTenantIDEnvOverride(t *testing.T) { + audioFile := writeTempAudio(t, "fake-ogg-bytes") + defer os.Remove(audioFile) + + const wantTenant = "goclaw-stt-corp" + var gotTenant string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if err := r.ParseMultipartForm(1 << 20); err == nil { + gotTenant = r.FormValue("tenant_id") + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(sttResponse{Transcript: "ok"}) + })) + defer srv.Close() + + // applyEnvOverrides would have populated STTTenantID from GOCLAW_STT_TENANT_ID before here. + c := newChannelWithSTT(config.TelegramConfig{ + Voice: config.TelegramVoiceConfig{ + STTProxyURL: srv.URL, + STTTenantID: wantTenant, // simulates applyEnvOverrides having set this from env + }, + }) + if _, err := c.transcribeAudio(context.Background(), audioFile); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if gotTenant != wantTenant { + t.Errorf("expected tenant_id %q, got %q", wantTenant, gotTenant) + } +} + // TestTranscribeAudio_ContextCancelled verifies that a cancelled context causes // the HTTP call to fail fast. func TestTranscribeAudio_ContextCancelled(t *testing.T) { @@ -296,7 +405,11 @@ func TestTranscribeAudio_ContextCancelled(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() // cancel immediately - c := newChannelWithSTT(config.TelegramConfig{STTProxyURL: srv.URL}) + c := newChannelWithSTT(config.TelegramConfig{ + Voice: config.TelegramVoiceConfig{ + STTProxyURL: srv.URL, + }, + }) _, err := c.transcribeAudio(ctx, audioFile) if err == nil { t.Fatal("expected error for cancelled context, got nil") diff --git a/internal/channels/telegram/voiceguard/guard.go b/internal/channels/telegram/voiceguard/guard.go new file mode 100644 index 000000000..b45bf670e --- /dev/null +++ b/internal/channels/telegram/voiceguard/guard.go @@ -0,0 +1,141 @@ +// Package voiceguard provides the Telegram voice-agent audio guard. +// +// Responsibility: intercept replies from a configured voice agent on +// Telegram DM turns that carried an audio/voice message and replace any +// technical-error language with a user-friendly coaching fallback. +// +// Design constraints: +// - Zero dependency on the Telegram bot SDK, message bus, or scheduler. +// - Pure string→string transformation — safe to unit-test in isolation. +// - All deployment customisation is passed via [config.TelegramVoiceConfig]; +// the package itself holds no mutable state. +package voiceguard + +import ( + "html" + "regexp" + "strings" + + "github.com/nextlevelbuilder/goclaw/internal/config" +) + +// transcriptTagRe matches the first block, +// including multi-line content. +var transcriptTagRe = regexp.MustCompile(`(?s)(.*?)`) + +// defaultFallbackTranscript is the built-in coaching message when the agent +// reply contains error language AND the inbound message has a transcript. +// Use strings.ReplaceAll (not fmt.Sprintf) so that custom templates that +// omit %s do not produce "%!(EXTRA string=…)" garbage. +const defaultFallbackTranscript = "🎙️ Got your voice message! I heard: \"%s\"\n\n" + + "There was a brief hiccup on my end — please send your response again and I'll review it right away." + +// defaultFallbackNoTranscript is used when no transcript is available. +const defaultFallbackNoTranscript = "🎙️ Got your voice message!\n\n" + + "I had a little trouble processing it — could you send it again or type your response? I'll get back to you straight away." + +// defaultErrorMarkers is the built-in set of substrings (all lowercase) that +// indicate a technical error leaked into the agent reply. +// +// NOTE — AudioGuardErrorMarkers in TelegramVoiceConfig REPLACES (not extends) +// this list. When an operator sets custom markers, only those markers are +// checked; the defaults below are ignored. To augment the defaults, copy this +// list into your config and append your custom entries. +var defaultErrorMarkers = []string{ + "vấn đề kỹ thuật", + "vấn đề hệ thống", + "lỗi hệ thống", + "technical issue", + "system error", + "exit status", + "rate limit", + "api rate limit", + "tool error", +} + +// SanitizeReply intercepts replies from the configured voice agent on Telegram +// DMs and replaces any technical-error language with a user-friendly fallback. +// +// It returns the original reply unchanged when any of the following is true: +// - voiceAgentID is empty, or agentID ≠ voiceAgentID (wrong agent) +// - channel ≠ "telegram" +// - peerKind ≠ "direct" (group chat) +// - inbound contains neither nor (text-only turn) +// - reply does not contain recognised error language +// +// Parameters: +// - voiceAgentID: value of cfg.Channels.Telegram.Voice.AgentID +// - agentID: the agent that produced this reply +// - channel: channel transport name (e.g. "telegram") +// - peerKind: "direct" or "group" +// - inbound: original inbound message content (may contain XML-like tags) +// - reply: agent reply to inspect and possibly replace +// - voiceCfg: TelegramVoiceConfig from the channel config +func SanitizeReply( + voiceAgentID, agentID, channel, peerKind, inbound, reply string, + voiceCfg config.TelegramVoiceConfig, +) string { + if voiceAgentID == "" || agentID != voiceAgentID { + return reply + } + if channel != "telegram" || peerKind != "direct" { + return reply + } + if !strings.Contains(inbound, "") && !strings.Contains(inbound, "") { + return reply + } + if !containsErrorLanguage(reply, voiceCfg.AudioGuardErrorMarkers) { + return reply + } + + transcript := extractTranscript(inbound) + if transcript != "" { + tpl := voiceCfg.AudioGuardFallbackTranscript + if tpl == "" { + tpl = defaultFallbackTranscript + } + // strings.ReplaceAll: templates without %s pass through unchanged. + return strings.ReplaceAll(tpl, "%s", transcript) + } + + msg := voiceCfg.AudioGuardFallbackNoTranscript + if msg == "" { + msg = defaultFallbackNoTranscript + } + return msg +} + +// containsErrorLanguage reports whether s (lowercased) contains any marker. +// +// When customMarkers is non-empty it is used exclusively — the built-in +// defaultErrorMarkers list is NOT consulted. This is intentional: operators +// who set custom markers take full ownership of the detection set. See the +// AudioGuardErrorMarkers field comment in TelegramVoiceConfig for the rationale. +func containsErrorLanguage(s string, customMarkers []string) bool { + lower := strings.ToLower(strings.TrimSpace(s)) + if lower == "" { + return false + } + markers := customMarkers + if len(markers) == 0 { + markers = defaultErrorMarkers + } + for _, m := range markers { + if strings.Contains(lower, m) { + return true + } + } + return false +} + +// extractTranscript returns the content of the first +// block found in content, with HTML entities unescaped and whitespace collapsed. +// Returns "" when no block is present. +func extractTranscript(content string) string { + m := transcriptTagRe.FindStringSubmatch(content) + if len(m) < 2 { + return "" + } + t := strings.TrimSpace(html.UnescapeString(m[1])) + return strings.Join(strings.Fields(t), " ") +} diff --git a/internal/channels/telegram/voiceguard/guard_test.go b/internal/channels/telegram/voiceguard/guard_test.go new file mode 100644 index 000000000..acacbb17c --- /dev/null +++ b/internal/channels/telegram/voiceguard/guard_test.go @@ -0,0 +1,166 @@ +package voiceguard_test + +import ( + "strings" + "testing" + + "github.com/nextlevelbuilder/goclaw/internal/channels/telegram/voiceguard" + "github.com/nextlevelbuilder/goclaw/internal/config" +) + +// ── Helpers ────────────────────────────────────────────────────────────────── + +const testAgent = "my-voice-agent" + +func voiceCfg( + fallbackTranscript, + fallbackNoTranscript string, + markers []string, +) config.TelegramVoiceConfig { + return config.TelegramVoiceConfig{ + AgentID: testAgent, + AudioGuardFallbackTranscript: fallbackTranscript, + AudioGuardFallbackNoTranscript: fallbackNoTranscript, + AudioGuardErrorMarkers: markers, + } +} + +func sanitize(inbound, reply string, cfg config.TelegramVoiceConfig) string { + return voiceguard.SanitizeReply(testAgent, testAgent, "telegram", "direct", inbound, reply, cfg) +} + +// ── Pass-through: guard must not fire ──────────────────────────────────────── + +func TestSanitize_PassThrough_WrongAgent(t *testing.T) { + inbound := "" + reply := "system error" + got := voiceguard.SanitizeReply(testAgent, "other-agent", "telegram", "direct", inbound, reply, voiceCfg("", "", nil)) + if got != reply { + t.Errorf("wrong agent: expected passthrough, got %q", got) + } +} + +func TestSanitize_PassThrough_EmptyVoiceAgentID(t *testing.T) { + inbound := "" + reply := "exit status 1" + got := voiceguard.SanitizeReply("", testAgent, "telegram", "direct", inbound, reply, voiceCfg("", "", nil)) + if got != reply { + t.Errorf("empty voiceAgentID: expected passthrough, got %q", got) + } +} + +func TestSanitize_PassThrough_NonTelegram(t *testing.T) { + inbound := "" + reply := "rate limit exceeded" + got := voiceguard.SanitizeReply(testAgent, testAgent, "discord", "direct", inbound, reply, voiceCfg("", "", nil)) + if got != reply { + t.Errorf("non-telegram channel: expected passthrough, got %q", got) + } +} + +func TestSanitize_PassThrough_GroupChat(t *testing.T) { + inbound := "" + reply := "system error occurred" + got := voiceguard.SanitizeReply(testAgent, testAgent, "telegram", "group", inbound, reply, voiceCfg("", "", nil)) + if got != reply { + t.Errorf("group chat: expected passthrough, got %q", got) + } +} + +func TestSanitize_PassThrough_NoAudioTag(t *testing.T) { + inbound := "just a regular text message" + reply := "technical issue in processing" + got := sanitize(inbound, reply, voiceCfg("", "", nil)) + if got != reply { + t.Errorf("text-only inbound: expected passthrough, got %q", got) + } +} + +func TestSanitize_PassThrough_CleanReply(t *testing.T) { + inbound := "" + reply := "Great pronunciation! Keep going." + got := sanitize(inbound, reply, voiceCfg("", "", nil)) + if got != reply { + t.Errorf("clean reply: expected passthrough, got %q", got) + } +} + +// ── Guard fires: default fallbacks ─────────────────────────────────────────── + +func TestSanitize_DefaultFallback_WithTranscript(t *testing.T) { + inbound := `hello world` + reply := "system error occurred" + got := sanitize(inbound, reply, voiceCfg("", "", nil)) + if !strings.Contains(got, "hello world") { + t.Errorf("expected transcript in fallback, got %q", got) + } +} + +func TestSanitize_DefaultFallback_NoTranscript(t *testing.T) { + inbound := "" + reply := "exit status 1 — tool error" + got := sanitize(inbound, reply, voiceCfg("", "", nil)) + if got == reply { + t.Error("expected fallback, got original reply unchanged") + } + if got == "" { + t.Error("fallback must not be empty") + } +} + +// ── Guard fires: custom fallbacks ──────────────────────────────────────────── + +func TestSanitize_CustomFallback_WithPlaceholder(t *testing.T) { + inbound := `xin chào` + reply := "lỗi hệ thống nghiêm trọng" + customTpl := `Tôi nghe được: "%s". Vui lòng thử lại!` + got := sanitize(inbound, reply, voiceCfg(customTpl, "", nil)) + want := `Tôi nghe được: "xin chào". Vui lòng thử lại!` + if got != want { + t.Errorf("custom fallback:\n got %q\n want %q", got, want) + } +} + +func TestSanitize_CustomFallback_NoPlaceholder(t *testing.T) { + // Template without %s — strings.ReplaceAll must not produce garbage. + inbound := `xin chào` + reply := "system error" + customTpl := "Vui lòng gửi lại nhé!" + got := sanitize(inbound, reply, voiceCfg(customTpl, "", nil)) + if got != customTpl { + t.Errorf("no-placeholder template: expected %q verbatim, got %q", customTpl, got) + } +} + +// ── Custom error markers: REPLACES behaviour ────────────────────────────────── + +func TestSanitize_CustomMarkers_Trigger(t *testing.T) { + inbound := "" + reply := "deployment pipeline aborted" + got := sanitize(inbound, reply, voiceCfg("", "", []string{"deployment pipeline"})) + if got == reply { + t.Error("custom marker: expected fallback, got original reply") + } +} + +func TestSanitize_CustomMarkers_ReplacesDefaults(t *testing.T) { + // When custom markers are set, defaultErrorMarkers must NOT fire. + // "system error" is in the default list but not in the custom list below. + inbound := "" + reply := "system error" + got := sanitize(inbound, reply, voiceCfg("", "", []string{"only-this-marker"})) + if got != reply { + t.Errorf("custom markers should replace defaults: expected passthrough for %q, got %q", reply, got) + } +} + +// ── Audio tag variants ──────────────────────────────────────────────────────── + +func TestSanitize_AudioTag_AlsoTriggers(t *testing.T) { + inbound := "" + reply := "system error" + got := sanitize(inbound, reply, voiceCfg("", "", nil)) + if got == reply { + t.Error("media:audio tag: expected fallback, got original reply") + } +} diff --git a/internal/config/config_channels.go b/internal/config/config_channels.go index 225e21243..b16cdee69 100644 --- a/internal/config/config_channels.go +++ b/internal/config/config_channels.go @@ -10,30 +10,84 @@ type ChannelsConfig struct { Feishu FeishuConfig `json:"feishu"` } +// TelegramVoiceConfig groups all voice-specific settings for the Telegram channel +// under a single nested JSON key "voice". This provides a clean visual boundary +// between base channel settings (token, policies, media) and the voice pipeline. +type TelegramVoiceConfig struct { + // ── STT (Speech-to-Text) pipeline ───────────────────────────────────────── + // When STTProxyURL is set, audio/voice inbound messages are transcribed before + // being forwarded to the agent. + STTProxyURL string `json:"stt_proxy_url,omitempty"` // base URL of STT proxy (e.g. "https://stt.example.com") + STTAPIKey string `json:"stt_api_key,omitempty"` // Bearer token for the STT proxy + STTTenantID string `json:"stt_tenant_id,omitempty"` // forwarded to STT proxy; settable via GOCLAW_STT_TENANT_ID env var + STTTimeoutSeconds int `json:"stt_timeout_seconds,omitempty"` // per-request timeout (default 30s) + + // ── Audio-aware routing ─────────────────────────────────────────────────── + // When AgentID is set, voice/audio inbound messages are routed to this agent + // instead of the default channel agent. + AgentID string `json:"agent_id,omitempty"` // e.g. "speaking-agent"; settable via GOCLAW_VOICE_AGENT_ID env var + StartMessage string `json:"start_message,omitempty"` // content injected on /start; default "User sent /start." + + // ── Intent routing ──────────────────────────────────────────────────────── + // Inbound text is lowercased before matching; keywords are also lowercased at + // match time to tolerate mixed-case values from the DB. + // When non-empty, text messages matching any keyword are routed to AgentID + // and the DM affinity is set. + // Example: ["speaking", "pronunciation", "ielts part"] + IntentKeywords []string `json:"intent_keywords,omitempty"` + + // ── Session affinity management ─────────────────────────────────────────── + // AffinityClearKeywords: when a DM text matches any entry the affinity is + // cleared and the next message routes back to the default agent. + // Example: ["homework", "payment", "schedule"] + AffinityClearKeywords []string `json:"affinity_clear_keywords,omitempty"` + // AffinityTTLMinutes: 0 = built-in default (360 min = 6 h). + AffinityTTLMinutes int `json:"affinity_ttl_minutes,omitempty"` + + // ── DM context injection ────────────────────────────────────────────────── + // DMContextTemplate is injected as extra system prompt on Telegram DM turns + // handled by the voice agent. Supports {user_id} placeholder. + // Settable via GOCLAW_VOICE_DM_CONTEXT_TEMPLATE env var. + // + // Example: + // + // "Context:\n- tenant: my-school\n- user_id: {user_id}\nNEVER expose errors." + DMContextTemplate string `json:"dm_context_template,omitempty"` + + // ── Audio guard ─────────────────────────────────────────────────────────── + // Replaces technical-error agent replies with user-friendly coaching fallbacks. + // + // AudioGuardFallbackTranscript: sent when a block is present. + // Supports %s as a placeholder for the transcript text. + // AudioGuardFallbackNoTranscript: sent when no transcript is available. + // AudioGuardErrorMarkers: lowercase substrings that trigger the guard. + // + // IMPORTANT — AudioGuardErrorMarkers REPLACES (not extends) the built-in English+ + // Vietnamese marker list. To augment the defaults, copy the default list and append + // your custom entries. Leave empty to use the built-in defaults unchanged. + AudioGuardFallbackTranscript string `json:"audio_guard_fallback_transcript,omitempty"` + AudioGuardFallbackNoTranscript string `json:"audio_guard_fallback_no_transcript,omitempty"` + AudioGuardErrorMarkers []string `json:"audio_guard_error_markers,omitempty"` +} + type TelegramConfig struct { Enabled bool `json:"enabled"` Token string `json:"token"` Proxy string `json:"proxy,omitempty"` AllowFrom FlexibleStringSlice `json:"allow_from"` - DMPolicy string `json:"dm_policy,omitempty"` // "pairing" (default), "allowlist", "open", "disabled" - GroupPolicy string `json:"group_policy,omitempty"` // "open" (default), "allowlist", "disabled" - RequireMention *bool `json:"require_mention,omitempty"` // require @bot mention in groups (default true) - HistoryLimit int `json:"history_limit,omitempty"` // max pending group messages for context (default 50, 0=disabled) - StreamMode string `json:"stream_mode,omitempty"` // "off" (default), "partial" — streaming preview via message edits - ReactionLevel string `json:"reaction_level,omitempty"` // "off" (default), "minimal", "full" — status emoji reactions - MediaMaxBytes int64 `json:"media_max_bytes,omitempty"` // max media download size in bytes (default 20MB) - LinkPreview *bool `json:"link_preview,omitempty"` // enable URL previews in messages (default true) - - // Optional STT (Speech-to-Text) pipeline for voice/audio inbound messages. - // When stt_proxy_url is set, audio/voice messages are transcribed before being forwarded to the agent. - STTProxyURL string `json:"stt_proxy_url,omitempty"` // base URL of the STT proxy service (e.g. "https://stt.example.com") - STTAPIKey string `json:"stt_api_key,omitempty"` // Bearer token for the STT proxy - STTTenantID string `json:"stt_tenant_id,omitempty"` // optional tenant/org identifier forwarded to the STT proxy - STTTimeoutSeconds int `json:"stt_timeout_seconds,omitempty"` // per-request timeout for STT calls (default 30s) + DMPolicy string `json:"dm_policy,omitempty"` // "pairing" (default), "allowlist", "open", "disabled" + GroupPolicy string `json:"group_policy,omitempty"` // "open" (default), "allowlist", "disabled" + RequireMention *bool `json:"require_mention,omitempty"` // require @bot mention in groups (default true) + HistoryLimit int `json:"history_limit,omitempty"` // max pending group messages (default 50, 0=disabled) + StreamMode string `json:"stream_mode,omitempty"` // "off" (default), "partial" — streaming via message edits + ReactionLevel string `json:"reaction_level,omitempty"` // "off" (default), "minimal", "full" + MediaMaxBytes int64 `json:"media_max_bytes,omitempty"` // max media download size in bytes (default 20 MB) + LinkPreview *bool `json:"link_preview,omitempty"` // enable URL previews in messages (default true) - // Optional audio-aware routing: when set, voice/audio inbound messages are routed to this - // agent instead of the default channel agent. Requires the named agent to exist in the config. - VoiceAgentID string `json:"voice_agent_id,omitempty"` // agent ID to route voice inbound to (e.g. "speaking-agent") + // Voice groups all voice-pipeline settings (STT, routing, affinity, audio guard). + // DB rows using the older flat layout (voice_agent_id, stt_proxy_url, …) are still + // supported — factory.go promotes flat fields into Voice on load. + Voice TelegramVoiceConfig `json:"voice,omitempty"` } type DiscordConfig struct { diff --git a/internal/config/config_load.go b/internal/config/config_load.go index b2a2d0e70..801a3e1aa 100644 --- a/internal/config/config_load.go +++ b/internal/config/config_load.go @@ -111,6 +111,14 @@ func (c *Config) applyEnvOverrides() { envStr("GOCLAW_TELEGRAM_TOKEN", &c.Channels.Telegram.Token) envStr("GOCLAW_DISCORD_TOKEN", &c.Channels.Discord.Token) envStr("GOCLAW_ZALO_TOKEN", &c.Channels.Zalo.Token) + // Voice-pipeline runtime overrides — allow operators to configure via environment + // without editing config.json. Each var maps to the corresponding field in + // cfg.Channels.Telegram.Voice (TelegramVoiceConfig). + envStr("GOCLAW_VOICE_AGENT_ID", &c.Channels.Telegram.Voice.AgentID) + envStr("GOCLAW_STT_TENANT_ID", &c.Channels.Telegram.Voice.STTTenantID) + envStr("GOCLAW_VOICE_DM_CONTEXT_TEMPLATE", &c.Channels.Telegram.Voice.DMContextTemplate) + envStr("GOCLAW_AUDIO_GUARD_FALLBACK_TRANSCRIPT", &c.Channels.Telegram.Voice.AudioGuardFallbackTranscript) + envStr("GOCLAW_AUDIO_GUARD_FALLBACK_NO_TRANSCRIPT", &c.Channels.Telegram.Voice.AudioGuardFallbackNoTranscript) envStr("GOCLAW_LARK_APP_ID", &c.Channels.Feishu.AppID) envStr("GOCLAW_LARK_APP_SECRET", &c.Channels.Feishu.AppSecret) envStr("GOCLAW_LARK_ENCRYPT_KEY", &c.Channels.Feishu.EncryptKey) diff --git a/internal/config/config_load_voice_test.go b/internal/config/config_load_voice_test.go new file mode 100644 index 000000000..83af497e4 --- /dev/null +++ b/internal/config/config_load_voice_test.go @@ -0,0 +1,136 @@ +package config_test + +// config_load_voice_test.go — verifies that all 5 voice-agent env vars are +// wired through applyEnvOverrides into Config.Channels.Telegram. +// +// These tests protect against the managed-mode regression where +// GOCLAW_VOICE_AGENT_ID was missing, causing VoiceDMContextTemplate injection +// and AudioGuard sanitization to be silently skipped at runtime. +// See gateway_consumer.go lines 156 and 249 for the gates that depend on this. + +import ( + "os" + "testing" + + "github.com/nextlevelbuilder/goclaw/internal/config" +) + +// setEnv sets KEY=VALUE for the duration of the test and restores on cleanup. +func setEnv(t *testing.T, pairs ...string) { + t.Helper() + if len(pairs)%2 != 0 { + t.Fatal("setEnv: odd number of arguments") + } + for i := 0; i < len(pairs); i += 2 { + key, val := pairs[i], pairs[i+1] + prev, existed := os.LookupEnv(key) + if err := os.Setenv(key, val); err != nil { + t.Fatalf("setEnv Setenv(%s): %v", key, err) + } + t.Cleanup(func() { + if existed { + os.Setenv(key, prev) + } else { + os.Unsetenv(key) + } + }) + } +} + +// TestVoiceAgentIDEnvOverride is the critical regression test: GOCLAW_VOICE_AGENT_ID +// must populate cfg.Channels.Telegram.VoiceAgentID so that gateway_consumer.go's +// injection/sanitize gates fire correctly in managed mode. +// +// Before the fix, this env var did not exist in applyEnvOverrides, so +// VoiceAgentID was always "" and both voice features were dead code. +func TestVoiceAgentIDEnvOverride(t *testing.T) { + setEnv(t, "GOCLAW_VOICE_AGENT_ID", "speaking-agent") + cfg := config.Default() + cfg.ApplyEnvOverrides() + if got := cfg.Channels.Telegram.VoiceAgentID; got != "speaking-agent" { + t.Errorf("GOCLAW_VOICE_AGENT_ID: expected %q, got %q", "speaking-agent", got) + } +} + +// TestSTTTenantIDEnvOverride verifies the existing GOCLAW_STT_TENANT_ID override. +func TestSTTTenantIDEnvOverride(t *testing.T) { + setEnv(t, "GOCLAW_STT_TENANT_ID", "my-school") + cfg := config.Default() + cfg.ApplyEnvOverrides() + if got := cfg.Channels.Telegram.STTTenantID; got != "my-school" { + t.Errorf("GOCLAW_STT_TENANT_ID: expected %q, got %q", "my-school", got) + } +} + +// TestVoiceDMContextTemplateEnvOverride verifies the existing template override. +func TestVoiceDMContextTemplateEnvOverride(t *testing.T) { + tmpl := "Runtime context:\n- user_id: {user_id}" + setEnv(t, "GOCLAW_VOICE_DM_CONTEXT_TEMPLATE", tmpl) + cfg := config.Default() + cfg.ApplyEnvOverrides() + if got := cfg.Channels.Telegram.VoiceDMContextTemplate; got != tmpl { + t.Errorf("GOCLAW_VOICE_DM_CONTEXT_TEMPLATE: expected %q, got %q", tmpl, got) + } +} + +// TestAudioGuardFallbackEnvOverrides verifies the two audio-guard fallback overrides. +// In managed mode these provide Vietnamese deployment-specific messages without +// requiring a config.json file. +func TestAudioGuardFallbackEnvOverrides(t *testing.T) { + setEnv(t, + "GOCLAW_AUDIO_GUARD_FALLBACK_TRANSCRIPT", "Got it: %s — please resend", + "GOCLAW_AUDIO_GUARD_FALLBACK_NO_TRANSCRIPT", "Got your voice — please resend", + ) + cfg := config.Default() + cfg.ApplyEnvOverrides() + if got := cfg.Channels.Telegram.AudioGuardFallbackTranscript; got != "Got it: %s — please resend" { + t.Errorf("GOCLAW_AUDIO_GUARD_FALLBACK_TRANSCRIPT: got %q", got) + } + if got := cfg.Channels.Telegram.AudioGuardFallbackNoTranscript; got != "Got your voice — please resend" { + t.Errorf("GOCLAW_AUDIO_GUARD_FALLBACK_NO_TRANSCRIPT: got %q", got) + } +} + +// TestVoiceEnvOverridesDoNotClobberConfigFileValues verifies that an empty env +// var does NOT overwrite a value already set (e.g. from config.json). +// envStr only writes when the env var is non-empty. +func TestVoiceEnvOverridesDoNotClobberConfigFileValues(t *testing.T) { + cfg := config.Default() + cfg.Channels.Telegram.VoiceAgentID = "my-custom-agent" + os.Unsetenv("GOCLAW_VOICE_AGENT_ID") + cfg.ApplyEnvOverrides() + if got := cfg.Channels.Telegram.VoiceAgentID; got != "my-custom-agent" { + t.Errorf("empty env var should not overwrite config: expected %q, got %q", "my-custom-agent", got) + } +} + +// TestAllVoiceEnvVarsTogether verifies all 5 voice env vars applied simultaneously, +// matching the full set a managed-mode deployment (like EduOS) would set. +func TestAllVoiceEnvVarsTogether(t *testing.T) { + setEnv(t, + "GOCLAW_VOICE_AGENT_ID", "speaking-agent", + "GOCLAW_STT_TENANT_ID", "edu-tenant", + "GOCLAW_VOICE_DM_CONTEXT_TEMPLATE", "ctx: {user_id}", + "GOCLAW_AUDIO_GUARD_FALLBACK_TRANSCRIPT", "heard: %s", + "GOCLAW_AUDIO_GUARD_FALLBACK_NO_TRANSCRIPT", "resend please", + ) + cfg := config.Default() + cfg.ApplyEnvOverrides() + + tg := cfg.Channels.Telegram + if tg.VoiceAgentID != "speaking-agent" { + t.Errorf("VoiceAgentID: got %q", tg.VoiceAgentID) + } + if tg.STTTenantID != "edu-tenant" { + t.Errorf("STTTenantID: got %q", tg.STTTenantID) + } + if tg.VoiceDMContextTemplate != "ctx: {user_id}" { + t.Errorf("VoiceDMContextTemplate: got %q", tg.VoiceDMContextTemplate) + } + if tg.AudioGuardFallbackTranscript != "heard: %s" { + t.Errorf("AudioGuardFallbackTranscript: got %q", tg.AudioGuardFallbackTranscript) + } + if tg.AudioGuardFallbackNoTranscript != "resend please" { + t.Errorf("AudioGuardFallbackNoTranscript: got %q", tg.AudioGuardFallbackNoTranscript) + } +} diff --git a/internal/store/agent_store.go b/internal/store/agent_store.go index 1ec21683f..7d7d2678f 100644 --- a/internal/store/agent_store.go +++ b/internal/store/agent_store.go @@ -3,6 +3,7 @@ package store import ( "context" "encoding/json" + "strings" "github.com/google/uuid" "github.com/nextlevelbuilder/goclaw/internal/config" @@ -16,9 +17,9 @@ const ( // Agent status constants. const ( - AgentStatusActive = "active" - AgentStatusInactive = "inactive" - AgentStatusSummoning = "summoning" + AgentStatusActive = "active" + AgentStatusInactive = "inactive" + AgentStatusSummoning = "summoning" AgentStatusSummonFailed = "summon_failed" ) @@ -136,6 +137,40 @@ func (a *AgentData) ParseThinkingLevel() string { return cfg.ThinkingLevel } +// ParseModelFallbacks extracts model_fallbacks from other_config JSONB. +// Returns nil if not configured. +func (a *AgentData) ParseModelFallbacks() []string { + if len(a.OtherConfig) == 0 { + return nil + } + var cfg struct { + ModelFallbacks []string `json:"model_fallbacks"` + } + if json.Unmarshal(a.OtherConfig, &cfg) != nil { + return nil + } + if len(cfg.ModelFallbacks) == 0 { + return nil + } + out := make([]string, 0, len(cfg.ModelFallbacks)) + seen := make(map[string]struct{}, len(cfg.ModelFallbacks)) + for _, m := range cfg.ModelFallbacks { + model := strings.TrimSpace(m) + if model == "" { + continue + } + if _, ok := seen[model]; ok { + continue + } + seen[model] = struct{}{} + out = append(out, model) + } + if len(out) == 0 { + return nil + } + return out +} + // AgentShareData represents an agent share grant. type AgentShareData struct { BaseModel