Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
bdc3124
fix: Zalo Bot Platform API compatibility — struct tags, response pars…
duhd-vnpay Mar 10, 2026
913f8fc
feat: download Zalo CDN photos to local temp files before agent proce…
duhd-vnpay Mar 10, 2026
a95eee9
fix: always convert credentials to []byte for bytea column in channel…
duhd-vnpay Mar 10, 2026
a88536a
feat: Party Mode — multi-persona collaborative discussion engine
duhd-vnpay Mar 10, 2026
2f26d58
fix: auto-clean orphaned tool messages from session history
duhd-vnpay Mar 10, 2026
9466043
fix: route subagent announce through correct agent in delegation context
duhd-vnpay Mar 10, 2026
3d28e2d
feat: team auth — sender_id tracking + fix thinking model prefill
duhd-vnpay Mar 12, 2026
7420834
merge: upstream origin/main — 70+ commits (contacts, activity, skills…
duhd-vnpay Mar 12, 2026
6d6a7c7
Merge branch 'nextlevelbuilder:main' into main
duhd-vnpay Mar 12, 2026
68a9e95
Merge remote-tracking branch 'origin/main'
duhd-vnpay Mar 12, 2026
925a5c1
Merge branch 'main' of https://github.com/duhd-vnpay/goclaw
duhd-vnpay Mar 12, 2026
f26d52a
fix: TypeScript strict mode errors in Party Mode UI
duhd-vnpay Mar 12, 2026
b332024
test: add data-testid attributes to Party Mode components for E2E tes…
duhd-vnpay Mar 13, 2026
d405895
fix: Party Mode — deterministic provider selection + snake_case WS pr…
duhd-vnpay Mar 13, 2026
ba3871e
Merge remote-tracking branch 'origin/main'
duhd-vnpay Mar 13, 2026
787bb01
fix: restore Party Mode session history when selecting old sessions
duhd-vnpay Mar 13, 2026
da3b5d3
fix: prevent nil pointer crash in OpenAI SSE tool call accumulation
duhd-vnpay Mar 13, 2026
65c249e
fix: detect and handle truncated LLM responses to prevent stuck agent…
duhd-vnpay Mar 13, 2026
578197e
Merge tag 'v1.19.0'
duhd-vnpay Mar 13, 2026
cf940a1
fix: detect and retry interrupted SSE streams from LLM providers
duhd-vnpay Mar 14, 2026
6e053eb
fix: StorageHandler uses workspace dir instead of hardcoded ~/.goclaw
duhd-vnpay Mar 14, 2026
59cbab4
feat(whatsapp): add media resolution for incoming messages
duhd-vnpay Mar 14, 2026
dfa9782
chore: rename migration 000018 to 000019 and bump schema version
duhd-vnpay Mar 14, 2026
875718e
feat(db): add projects + project_mcp_overrides tables (migration 000020)
duhd-vnpay Mar 14, 2026
08b6bd0
feat(store): add ProjectStore interface with Project and ProjectMCPOv…
duhd-vnpay Mar 14, 2026
15d0aa4
feat(store/pg): implement PGProjectStore with secret-key validation
duhd-vnpay Mar 14, 2026
f53f0c0
feat: add project-scoped MCP process isolation (Tasks 4+5+6)
duhd-vnpay Mar 14, 2026
167f464
feat: ResolverFunc accepts project context, Router caches per (agent,…
duhd-vnpay Mar 14, 2026
3ada4fb
feat: resolve project at message arrival, propagate through delegatio…
duhd-vnpay Mar 14, 2026
41c3dce
feat: add Google Chat channel with Pub/Sub pull, Cards V2, Drive upload
duhd-vnpay Mar 14, 2026
07f2c69
feat: project HTTP API (CRUD + MCP overrides)
duhd-vnpay Mar 14, 2026
e2b54e0
test: unit tests for MCP scope per project
duhd-vnpay Mar 14, 2026
43080a8
fix: use word-boundary anchors in secret key regex
duhd-vnpay Mar 14, 2026
6d24a39
feat(ui): add Projects management page with MCP overrides
duhd-vnpay Mar 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions cmd/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/signal"
"path/filepath"
"syscall"
"time"

"github.com/google/uuid"

Expand All @@ -18,6 +19,7 @@ import (
"github.com/nextlevelbuilder/goclaw/internal/channels"
"github.com/nextlevelbuilder/goclaw/internal/channels/discord"
"github.com/nextlevelbuilder/goclaw/internal/channels/feishu"
"github.com/nextlevelbuilder/goclaw/internal/channels/googlechat"
slackchannel "github.com/nextlevelbuilder/goclaw/internal/channels/slack"
"github.com/nextlevelbuilder/goclaw/internal/channels/telegram"
"github.com/nextlevelbuilder/goclaw/internal/channels/whatsapp"
Expand Down Expand Up @@ -309,6 +311,18 @@ func runGateway() {
}()
}

// Sweep orphan traces left by previous crashes (running > 1h)
if pgStores.Tracing != nil {
go func() {
n, err := pgStores.Tracing.SweepOrphanTraces(context.Background(), time.Hour)
if err != nil {
slog.Warn("orphan trace sweep failed", "error", err)
} else if n > 0 {
slog.Info("orphan trace sweep complete", "swept", n)
}
}()
}

// Redis cache: compiled via build tags. Build with 'go build -tags redis' to enable.
redisClient := initRedisClient(cfg)
defer shutdownRedis(redisClient)
Expand Down Expand Up @@ -689,7 +703,7 @@ func runGateway() {
if mcpMgr != nil {
mcpToolLister = mcpMgr
}
agentsH, skillsH, tracesH, mcpH, customToolsH, channelInstancesH, providersH, delegationsH, builtinToolsH, pendingMessagesH := wireHTTP(pgStores, cfg.Gateway.Token, msgBus, toolsReg, providerRegistry, permPE.IsOwner, gatewayAddr, mcpToolLister)
agentsH, skillsH, tracesH, mcpH, customToolsH, channelInstancesH, providersH, delegationsH, builtinToolsH, pendingMessagesH, projectsH := wireHTTP(pgStores, cfg.Gateway.Token, msgBus, toolsReg, providerRegistry, permPE.IsOwner, gatewayAddr, mcpToolLister)
if agentsH != nil {
server.SetAgentsHandler(agentsH)
}
Expand All @@ -705,6 +719,9 @@ func runGateway() {
if mcpH != nil {
server.SetMCPHandler(mcpH)
}
if projectsH != nil {
server.SetProjectHandler(projectsH)
}
if customToolsH != nil {
server.SetCustomToolsHandler(customToolsH)
}
Expand Down Expand Up @@ -753,8 +770,10 @@ func runGateway() {
// Supports media from any agent workspace (each agent has its own workspace from DB).
server.SetFilesHandler(httpapi.NewFilesHandler(cfg.Gateway.Token))

// Storage file management — browse/delete files under ~/.goclaw/ (excluding skills dirs).
server.SetStorageHandler(httpapi.NewStorageHandler(config.ExpandHome("~/.goclaw"), cfg.Gateway.Token))
// Storage file management — browse/delete files under the resolved workspace directory.
// Uses GOCLAW_WORKSPACE (or default ~/.goclaw/workspace) so it works correctly
// in Docker deployments where volumes are mounted outside ~/.goclaw/.
server.SetStorageHandler(httpapi.NewStorageHandler(workspace, cfg.Gateway.Token))

// Media upload endpoint — accepts multipart file uploads, returns temp path + MIME type.
server.SetMediaUploadHandler(httpapi.NewMediaUploadHandler(cfg.Gateway.Token))
Expand Down Expand Up @@ -808,6 +827,7 @@ func runGateway() {
instanceLoader.RegisterFactory(channels.TypeZaloPersonal, zalopersonal.FactoryWithPendingStore(pgStores.PendingMessages))
instanceLoader.RegisterFactory(channels.TypeWhatsApp, whatsapp.Factory)
instanceLoader.RegisterFactory(channels.TypeSlack, slackchannel.FactoryWithPendingStore(pgStores.PendingMessages))
instanceLoader.RegisterFactory(channels.TypeGoogleChat, googlechat.FactoryWithPendingStore(pgStores.PendingMessages))
if err := instanceLoader.LoadAll(context.Background()); err != nil {
slog.Error("failed to load channel instances from DB", "error", err)
}
Expand All @@ -819,6 +839,11 @@ func runGateway() {
// Register channels/instances/links/teams RPC methods
wireChannelRPCMethods(server, pgStores, channelMgr, agentRouter, msgBus)

// Register party mode WS RPC methods
if pgStores.Party != nil {
methods.NewPartyMethods(pgStores.Party, pgStores.Agents, providerRegistry, msgBus).Register(server.Router())
}

// Wire channel event subscribers (cache invalidation, pairing, cascade disable)
wireChannelEventSubscribers(msgBus, server, pgStores, channelMgr, instanceLoader, pairingMethods, cfg)

Expand Down Expand Up @@ -1026,7 +1051,7 @@ func runGateway() {
channelMgr.SetContactCollector(contactCollector) // propagate to all channel handlers
}

go consumeInboundMessages(ctx, msgBus, agentRouter, cfg, sched, channelMgr, consumerTeamStore, quotaChecker, delegateMgr, pgStores.Sessions, pgStores.Agents, contactCollector)
go consumeInboundMessages(ctx, msgBus, agentRouter, cfg, sched, channelMgr, consumerTeamStore, quotaChecker, delegateMgr, pgStores.Sessions, pgStores.Agents, contactCollector, pgStores.Projects)

// Task recovery ticker: re-dispatches stale/pending team tasks on startup and periodically.
var taskTicker *tasks.TaskTicker
Expand Down
11 changes: 11 additions & 0 deletions cmd/gateway_channels_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/nextlevelbuilder/goclaw/internal/channels"
"github.com/nextlevelbuilder/goclaw/internal/channels/discord"
"github.com/nextlevelbuilder/goclaw/internal/channels/feishu"
"github.com/nextlevelbuilder/goclaw/internal/channels/googlechat"
slackchannel "github.com/nextlevelbuilder/goclaw/internal/channels/slack"
"github.com/nextlevelbuilder/goclaw/internal/channels/telegram"
"github.com/nextlevelbuilder/goclaw/internal/channels/whatsapp"
Expand Down Expand Up @@ -97,6 +98,16 @@ func registerConfigChannels(cfg *config.Config, channelMgr *channels.Manager, ms
slog.Info("feishu/lark channel enabled (config)")
}
}

if cfg.Channels.GoogleChat.Enabled && cfg.Channels.GoogleChat.ServiceAccountFile != "" && instanceLoader == nil {
gc, err := googlechat.New(cfg.Channels.GoogleChat, msgBus, nil)
if err != nil {
slog.Error("failed to initialize google chat channel", "error", err)
} else {
channelMgr.RegisterChannel(channels.TypeGoogleChat, gc)
slog.Info("google chat channel enabled (config)")
}
}
}

// wireChannelRPCMethods registers WS RPC methods for channels, instances, agent links, and teams.
Expand Down
64 changes: 41 additions & 23 deletions cmd/gateway_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// and routes them through the scheduler/agent loop, then publishes the response back.
// Also handles subagent announcements: routes them through the parent agent's session
// (matching TS subagent-announce.ts pattern) so the agent can reformulate for the user.
func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents *agent.Router, cfg *config.Config, sched *scheduler.Scheduler, channelMgr *channels.Manager, teamStore store.TeamStore, quotaChecker *channels.QuotaChecker, delegateMgr *tools.DelegateManager, sessStore store.SessionStore, agentStore store.AgentStore, contactCollector *store.ContactCollector) {
func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents *agent.Router, cfg *config.Config, sched *scheduler.Scheduler, channelMgr *channels.Manager, teamStore store.TeamStore, quotaChecker *channels.QuotaChecker, delegateMgr *tools.DelegateManager, sessStore store.SessionStore, agentStore store.AgentStore, contactCollector *store.ContactCollector, projectStore store.ProjectStore) {
slog.Info("inbound message consumer started")

// Inbound message deduplication (matching TS src/infra/dedupe.ts + inbound-dedupe.ts).
Expand Down Expand Up @@ -64,7 +64,11 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents
}
}

agentLoop, err := agents.Get(agentID)
// Resolve project for this chat (nil projectStore = backward compatible)
channelType := resolveChannelType(channelMgr, msg.Channel)
projectID, projectOverrides := resolveProjectOverrides(ctx, projectStore, channelType, msg.ChatID)

agentLoop, err := agents.GetForProject(agentID, projectID, projectOverrides)
if err != nil {
slog.Warn("inbound: agent not found", "agent", agentID, "channel", msg.Channel)
return
Expand Down Expand Up @@ -321,7 +325,7 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents
Media: reqMedia,
ForwardMedia: fwdMedia,
Channel: msg.Channel,
ChannelType: resolveChannelType(channelMgr, msg.Channel),
ChannelType: channelType,
ChatID: msg.ChatID,
PeerKind: peerKind,
LocalKey: msg.Metadata["local_key"],
Expand All @@ -333,6 +337,8 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents
ToolAllow: msg.ToolAllow,
ExtraSystemPrompt: extraPrompt,
SkillFilter: skillFilter,
ProjectID: projectID,
ProjectOverrides: projectOverrides,
}, scheduler.ScheduleOpts{
MaxConcurrent: maxConcurrent,
})
Expand Down Expand Up @@ -456,6 +462,7 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents
origPeerKind := msg.Metadata["origin_peer_kind"]
origLocalKey := msg.Metadata["origin_local_key"]
origChannelType := resolveChannelType(channelMgr, origChannel)
saProjectID, saProjectOverrides := resolveProjectOverrides(ctx, projectStore, origChannelType, msg.ChatID)
parentAgent := msg.Metadata["parent_agent"]
if parentAgent == "" {
parentAgent = "default"
Expand Down Expand Up @@ -529,6 +536,8 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents
Stream: false,
ParentTraceID: parentTraceID,
ParentRootSpanID: parentRootSpanID,
ProjectID: saProjectID,
ProjectOverrides: saProjectOverrides,
}
// Handle announce asynchronously with per-session serialization.
// The mutex ensures concurrent announces for the same session wait for
Expand Down Expand Up @@ -590,6 +599,7 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents
origPeerKind := msg.Metadata["origin_peer_kind"]
origLocalKey := msg.Metadata["origin_local_key"]
origChannelType := resolveChannelType(channelMgr, origChannel)
dlgProjectID, dlgProjectOverrides := resolveProjectOverrides(ctx, projectStore, origChannelType, msg.ChatID)
parentAgent := msg.Metadata["parent_agent"]
if parentAgent == "" {
parentAgent = "default"
Expand Down Expand Up @@ -660,6 +670,8 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents
Stream: false,
ParentTraceID: parentTraceID,
ParentRootSpanID: parentRootSpanID,
ProjectID: dlgProjectID,
ProjectOverrides: dlgProjectOverrides,
}

// Same per-session serialization as subagent announce above.
Expand Down Expand Up @@ -714,6 +726,7 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents
origPeerKind := msg.Metadata["origin_peer_kind"]
origLocalKey := msg.Metadata["origin_local_key"]
origChannelType := resolveChannelType(channelMgr, origChannel)
hoProjectID, hoProjectOverrides := resolveProjectOverrides(ctx, projectStore, origChannelType, msg.ChatID)
targetAgent := msg.AgentID
if targetAgent == "" {
targetAgent = cfg.ResolveDefaultAgentID()
Expand Down Expand Up @@ -744,16 +757,18 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents
outMeta := buildAnnounceOutMeta(origLocalKey)

outCh := sched.Schedule(ctx, scheduler.LaneDelegate, agent.RunRequest{
SessionKey: sessionKey,
Message: msg.Content,
Channel: origChannel,
ChannelType: origChannelType,
ChatID: msg.ChatID,
PeerKind: origPeerKind,
LocalKey: origLocalKey,
UserID: announceUserID,
RunID: fmt.Sprintf("handoff-%s", msg.Metadata["handoff_id"]),
Stream: false,
SessionKey: sessionKey,
Message: msg.Content,
Channel: origChannel,
ChannelType: origChannelType,
ChatID: msg.ChatID,
PeerKind: origPeerKind,
LocalKey: origLocalKey,
UserID: announceUserID,
RunID: fmt.Sprintf("handoff-%s", msg.Metadata["handoff_id"]),
Stream: false,
ProjectID: hoProjectID,
ProjectOverrides: hoProjectOverrides,
})

go func(origCh, chatID string, meta map[string]string) {
Expand Down Expand Up @@ -784,6 +799,7 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents
origPeerKind := msg.Metadata["origin_peer_kind"]
origLocalKey := msg.Metadata["origin_local_key"]
origChannelType := resolveChannelType(channelMgr, origChannel)
tmProjectID, tmProjectOverrides := resolveProjectOverrides(ctx, projectStore, origChannelType, msg.ChatID)
targetAgent := msg.AgentID // team_message sets AgentID to the target agent key
if targetAgent == "" {
targetAgent = cfg.ResolveDefaultAgentID()
Expand Down Expand Up @@ -820,16 +836,18 @@ func consumeInboundMessages(ctx context.Context, msgBus *bus.MessageBus, agents
outMeta := buildAnnounceOutMeta(origLocalKey)

outCh := sched.Schedule(ctx, scheduler.LaneDelegate, agent.RunRequest{
SessionKey: sessionKey,
Message: msg.Content,
Channel: origChannel,
ChannelType: origChannelType,
ChatID: msg.ChatID,
PeerKind: origPeerKind,
LocalKey: origLocalKey,
UserID: announceUserID,
RunID: fmt.Sprintf("teammate-%s-%s", msg.Metadata["from_agent"], msg.Metadata["to_agent"]),
Stream: false,
SessionKey: sessionKey,
Message: msg.Content,
Channel: origChannel,
ChannelType: origChannelType,
ChatID: msg.ChatID,
PeerKind: origPeerKind,
LocalKey: origLocalKey,
UserID: announceUserID,
RunID: fmt.Sprintf("teammate-%s-%s", msg.Metadata["from_agent"], msg.Metadata["to_agent"]),
Stream: false,
ProjectID: tmProjectID,
ProjectOverrides: tmProjectOverrides,
})

go func(origCh, chatID, senderID string, meta, inMeta map[string]string) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/gateway_consumer_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func makeSchedulerRunFunc(agents *agent.Router, cfg *config.Config) scheduler.Ru
}
}

loop, err := agents.Get(agentID)
loop, err := agents.GetForProject(agentID, req.ProjectID, req.ProjectOverrides)
if err != nil {
return nil, fmt.Errorf("agent %s not found: %w", agentID, err)
}
Expand Down
30 changes: 30 additions & 0 deletions cmd/gateway_consumer_project.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package cmd

import (
"context"
"log/slog"

"github.com/nextlevelbuilder/goclaw/internal/store"
)

// resolveProjectOverrides looks up the project for a chat and returns its ID + MCP env overrides.
// Returns empty values if no project is configured (backward compatible).
func resolveProjectOverrides(ctx context.Context, projectStore store.ProjectStore, channelType, chatID string) (string, map[string]map[string]string) {
if projectStore == nil || channelType == "" || chatID == "" {
return "", nil
}
project, err := projectStore.GetProjectByChatID(ctx, channelType, chatID)
if err != nil {
slog.Warn("project.resolve_failed", "channelType", channelType, "chatID", chatID, "error", err)
return "", nil
}
if project == nil {
return "", nil
}
overrides, err := projectStore.GetMCPOverridesMap(ctx, project.ID)
if err != nil {
slog.Warn("project.overrides_failed", "project", project.Slug, "error", err)
return project.ID.String(), nil
}
return project.ID.String(), overrides
}
Loading