Skip to content
23 changes: 20 additions & 3 deletions cmd/gateway_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/nextlevelbuilder/goclaw/internal/agent"
"github.com/nextlevelbuilder/goclaw/internal/bus"
"github.com/nextlevelbuilder/goclaw/internal/channels"
"github.com/nextlevelbuilder/goclaw/internal/channels/telegram/voiceguard"
"github.com/nextlevelbuilder/goclaw/internal/config"
"github.com/nextlevelbuilder/goclaw/internal/scheduler"
"github.com/nextlevelbuilder/goclaw/internal/sessions"
Expand Down Expand Up @@ -151,6 +152,18 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents
"- Keep responses concise and focused; long replies are disruptive in groups.\n" +
"- Address the group naturally. If the history shows a multi-person conversation, consider the full context before answering."
}
if agentID == cfg.Channels.Telegram.Voice.AgentID && cfg.Channels.Telegram.Voice.AgentID != "" && peerKind == string(sessions.PeerDirect) {
if tmpl := cfg.Channels.Telegram.Voice.DMContextTemplate; tmpl != "" {
// Substitute {user_id} β€” the only runtime value the gateway knows.
// All other deployment-specific values (e.g. tenant_id) are baked into the template.
voiceCtx := strings.ReplaceAll(tmpl, "{user_id}", userID)
if extraPrompt != "" {
extraPrompt += "\n\n" + voiceCtx
} else {
extraPrompt = voiceCtx
}
}
}

// Delegation announces carry media as ForwardMedia (not deleted, forwarded to output).
// User-uploaded media goes in Media (loaded as images for LLM, then deleted).
Expand Down Expand Up @@ -193,7 +206,7 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents
}

// Handle result asynchronously to not block the flush callback.
go func(channel, chatID, session, rID string, meta map[string]string) {
go func(channel, chatID, session, rID, agentKey, peer string, originalContent string, meta map[string]string) {
outcome := <-outCh

// Clean up run tracking (in case HandleAgentEvent didn't fire for terminal events)
Expand Down Expand Up @@ -241,11 +254,14 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents
return
}

replyContent := outcome.Result.Content
replyContent = voiceguard.SanitizeReply(cfg.Channels.Telegram.Voice.AgentID, agentKey, channel, peer, originalContent, replyContent, cfg.Channels.Telegram.Voice)

// Publish response back to the channel
outMsg := bus.OutboundMessage{
Channel: channel,
ChatID: chatID,
Content: outcome.Result.Content,
Content: replyContent,
Metadata: meta,
}

Expand All @@ -264,7 +280,7 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents
}

msgBus.PublishOutbound(outMsg)
}(msg.Channel, msg.ChatID, sessionKey, runID, outMeta)
}(msg.Channel, msg.ChatID, sessionKey, runID, agentID, peerKind, msg.Content, outMeta)
}

// Inbound debounce: merge rapid messages from the same sender before processing.
Expand Down Expand Up @@ -668,6 +684,7 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents
}
}


// resolveAgentRoute determines which agent should handle a message
// based on config bindings. Priority: peer β†’ channel β†’ default.
// Matching TS resolve-route.ts binding resolution.
Expand Down
Loading