Skip to content

Commit abe07b2

Browse files
duhd-vnpayclaude
andcommitted
feat: add Google Chat channel with Pub/Sub pull, Cards V2, Drive upload
Implements Google Chat as a new channel type (google_chat) in GoClaw: - Service Account JWT auth with auto-refresh (no Google client libs) - Pub/Sub REST pull loop for inbound messages with dedup cache - Markdown → Google Chat text format conversion (bold, italic, links) - Cards V2 for tables/structured content - Byte-aware chunking for Vietnamese/CJK (3900-byte safety limit) - Drive upload for long-form responses with retention cleanup - Thread routing for group conversations - Placeholder edit pattern (Thinking... → final response) - Exponential backoff retry on 429/5xx - 28 unit tests, all passing Co-Authored-By: Claude Opus 4.6 <[email protected]>
1 parent 28fab95 commit abe07b2

18 files changed

Lines changed: 2513 additions & 1 deletion

cmd/gateway.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/nextlevelbuilder/goclaw/internal/channels"
1919
"github.com/nextlevelbuilder/goclaw/internal/channels/discord"
2020
"github.com/nextlevelbuilder/goclaw/internal/channels/feishu"
21+
"github.com/nextlevelbuilder/goclaw/internal/channels/googlechat"
2122
slackchannel "github.com/nextlevelbuilder/goclaw/internal/channels/slack"
2223
"github.com/nextlevelbuilder/goclaw/internal/channels/telegram"
2324
"github.com/nextlevelbuilder/goclaw/internal/channels/whatsapp"
@@ -689,7 +690,7 @@ func runGateway() {
689690
if mcpMgr != nil {
690691
mcpToolLister = mcpMgr
691692
}
692-
agentsH, skillsH, tracesH, mcpH, customToolsH, channelInstancesH, providersH, delegationsH, builtinToolsH, pendingMessagesH := wireHTTP(pgStores, cfg.Gateway.Token, msgBus, toolsReg, providerRegistry, permPE.IsOwner, gatewayAddr, mcpToolLister)
693+
agentsH, skillsH, tracesH, mcpH, customToolsH, channelInstancesH, providersH, delegationsH, builtinToolsH, pendingMessagesH, projectsH := wireHTTP(pgStores, cfg.Gateway.Token, msgBus, toolsReg, providerRegistry, permPE.IsOwner, gatewayAddr, mcpToolLister)
693694
if agentsH != nil {
694695
server.SetAgentsHandler(agentsH)
695696
}
@@ -705,6 +706,9 @@ func runGateway() {
705706
if mcpH != nil {
706707
server.SetMCPHandler(mcpH)
707708
}
709+
if projectsH != nil {
710+
server.SetProjectHandler(projectsH)
711+
}
708712
if customToolsH != nil {
709713
server.SetCustomToolsHandler(customToolsH)
710714
}
@@ -810,6 +814,7 @@ func runGateway() {
810814
instanceLoader.RegisterFactory(channels.TypeZaloPersonal, zalopersonal.FactoryWithPendingStore(pgStores.PendingMessages))
811815
instanceLoader.RegisterFactory(channels.TypeWhatsApp, whatsapp.Factory)
812816
instanceLoader.RegisterFactory(channels.TypeSlack, slackchannel.FactoryWithPendingStore(pgStores.PendingMessages))
817+
instanceLoader.RegisterFactory(channels.TypeGoogleChat, googlechat.FactoryWithPendingStore(pgStores.PendingMessages))
813818
if err := instanceLoader.LoadAll(context.Background()); err != nil {
814819
slog.Error("failed to load channel instances from DB", "error", err)
815820
}

cmd/gateway_channels_setup.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/nextlevelbuilder/goclaw/internal/channels"
1414
"github.com/nextlevelbuilder/goclaw/internal/channels/discord"
1515
"github.com/nextlevelbuilder/goclaw/internal/channels/feishu"
16+
"github.com/nextlevelbuilder/goclaw/internal/channels/googlechat"
1617
slackchannel "github.com/nextlevelbuilder/goclaw/internal/channels/slack"
1718
"github.com/nextlevelbuilder/goclaw/internal/channels/telegram"
1819
"github.com/nextlevelbuilder/goclaw/internal/channels/whatsapp"
@@ -97,6 +98,16 @@ func registerConfigChannels(cfg *config.Config, channelMgr *channels.Manager, ms
9798
slog.Info("feishu/lark channel enabled (config)")
9899
}
99100
}
101+
102+
if cfg.Channels.GoogleChat.Enabled && cfg.Channels.GoogleChat.ServiceAccountFile != "" && instanceLoader == nil {
103+
gc, err := googlechat.New(cfg.Channels.GoogleChat, msgBus, nil)
104+
if err != nil {
105+
slog.Error("failed to initialize google chat channel", "error", err)
106+
} else {
107+
channelMgr.RegisterChannel(channels.TypeGoogleChat, gc)
108+
slog.Info("google chat channel enabled (config)")
109+
}
110+
}
100111
}
101112

102113
// wireChannelRPCMethods registers WS RPC methods for channels, instances, agent links, and teams.

internal/channels/channel.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ const (
5858
TypeWhatsApp = "whatsapp"
5959
TypeZaloOA = "zalo_oa"
6060
TypeZaloPersonal = "zalo_personal"
61+
TypeGoogleChat = "google_chat"
6162
)
6263

6364
// Channel defines the interface that all channel implementations must satisfy.
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package googlechat
2+
3+
import (
4+
"context"
5+
"crypto"
6+
"crypto/rand"
7+
"crypto/rsa"
8+
"crypto/sha256"
9+
"crypto/x509"
10+
"encoding/base64"
11+
"encoding/json"
12+
"encoding/pem"
13+
"fmt"
14+
"io"
15+
"net/http"
16+
"net/url"
17+
"os"
18+
"strings"
19+
"sync"
20+
"time"
21+
)
22+
23+
type ServiceAccountAuth struct {
24+
email string
25+
privateKey *rsa.PrivateKey
26+
scopes []string
27+
token string
28+
expiresAt time.Time
29+
mu sync.Mutex
30+
tokenEndpoint string
31+
httpClient *http.Client
32+
}
33+
34+
type serviceAccountFile struct {
35+
Type string `json:"type"`
36+
ClientEmail string `json:"client_email"`
37+
PrivateKey string `json:"private_key"`
38+
TokenURI string `json:"token_uri"`
39+
}
40+
41+
func NewServiceAccountAuth(saFilePath string, scopes []string) (*ServiceAccountAuth, error) {
42+
data, err := os.ReadFile(saFilePath)
43+
if err != nil {
44+
return nil, fmt.Errorf("read service account file: %w", err)
45+
}
46+
47+
var sa serviceAccountFile
48+
if err := json.Unmarshal(data, &sa); err != nil {
49+
return nil, fmt.Errorf("parse service account file: %w", err)
50+
}
51+
if sa.ClientEmail == "" {
52+
return nil, fmt.Errorf("service account file missing client_email")
53+
}
54+
if sa.PrivateKey == "" {
55+
return nil, fmt.Errorf("service account file missing private_key")
56+
}
57+
58+
block, _ := pem.Decode([]byte(sa.PrivateKey))
59+
if block == nil {
60+
return nil, fmt.Errorf("failed to decode PEM block from private_key")
61+
}
62+
63+
key, err := x509.ParsePKCS8PrivateKey(block.Bytes)
64+
if err != nil {
65+
rsaKey, err2 := x509.ParsePKCS1PrivateKey(block.Bytes)
66+
if err2 != nil {
67+
return nil, fmt.Errorf("parse private key: %w (pkcs1: %w)", err, err2)
68+
}
69+
key = rsaKey
70+
}
71+
72+
rsaKey, ok := key.(*rsa.PrivateKey)
73+
if !ok {
74+
return nil, fmt.Errorf("private key is not RSA")
75+
}
76+
77+
ep := sa.TokenURI
78+
if ep == "" {
79+
ep = tokenEndpoint
80+
}
81+
82+
return &ServiceAccountAuth{
83+
email: sa.ClientEmail,
84+
privateKey: rsaKey,
85+
scopes: scopes,
86+
tokenEndpoint: ep,
87+
httpClient: &http.Client{Timeout: 10 * time.Second},
88+
}, nil
89+
}
90+
91+
func (a *ServiceAccountAuth) Token(ctx context.Context) (string, error) {
92+
a.mu.Lock()
93+
defer a.mu.Unlock()
94+
95+
if a.token != "" && time.Now().Add(60*time.Second).Before(a.expiresAt) {
96+
return a.token, nil
97+
}
98+
99+
now := time.Now()
100+
claims := map[string]any{
101+
"iss": a.email,
102+
"scope": strings.Join(a.scopes, " "),
103+
"aud": tokenEndpoint,
104+
"iat": now.Unix(),
105+
"exp": now.Add(time.Hour).Unix(),
106+
}
107+
108+
signedJWT, err := signJWT(a.privateKey, claims)
109+
if err != nil {
110+
return "", fmt.Errorf("sign JWT: %w", err)
111+
}
112+
113+
form := url.Values{
114+
"grant_type": {"urn:ietf:params:oauth:grant-type:jwt-bearer"},
115+
"assertion": {signedJWT},
116+
}
117+
118+
req, err := http.NewRequestWithContext(ctx, "POST", a.tokenEndpoint, strings.NewReader(form.Encode()))
119+
if err != nil {
120+
return "", err
121+
}
122+
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
123+
124+
resp, err := a.httpClient.Do(req)
125+
if err != nil {
126+
return "", fmt.Errorf("token exchange request: %w", err)
127+
}
128+
defer resp.Body.Close()
129+
130+
body, _ := io.ReadAll(resp.Body)
131+
if resp.StatusCode != http.StatusOK {
132+
return "", fmt.Errorf("token exchange failed (%d): %s", resp.StatusCode, string(body))
133+
}
134+
135+
var tokenResp struct {
136+
AccessToken string `json:"access_token"`
137+
ExpiresIn int `json:"expires_in"`
138+
TokenType string `json:"token_type"`
139+
}
140+
if err := json.Unmarshal(body, &tokenResp); err != nil {
141+
return "", fmt.Errorf("parse token response: %w", err)
142+
}
143+
144+
a.token = tokenResp.AccessToken
145+
a.expiresAt = now.Add(time.Duration(tokenResp.ExpiresIn) * time.Second)
146+
147+
return a.token, nil
148+
}
149+
150+
func signJWT(key *rsa.PrivateKey, claims map[string]any) (string, error) {
151+
header := base64URLEncode([]byte(`{"alg":"RS256","typ":"JWT"}`))
152+
payload, err := json.Marshal(claims)
153+
if err != nil {
154+
return "", err
155+
}
156+
payloadEnc := base64URLEncode(payload)
157+
signingInput := header + "." + payloadEnc
158+
159+
hash := sha256.Sum256([]byte(signingInput))
160+
sig, err := rsa.SignPKCS1v15(rand.Reader, key, crypto.SHA256, hash[:])
161+
if err != nil {
162+
return "", err
163+
}
164+
165+
return signingInput + "." + base64URLEncode(sig), nil
166+
}
167+
168+
func base64URLEncode(data []byte) string {
169+
return base64.RawURLEncoding.EncodeToString(data)
170+
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package googlechat
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"crypto/rsa"
7+
"crypto/x509"
8+
"encoding/json"
9+
"encoding/pem"
10+
"net/http"
11+
"net/http/httptest"
12+
"os"
13+
"path/filepath"
14+
"testing"
15+
"time"
16+
)
17+
18+
func testServiceAccountJSON(t *testing.T, dir string) (string, *rsa.PrivateKey) {
19+
t.Helper()
20+
key, err := rsa.GenerateKey(rand.Reader, 2048)
21+
if err != nil {
22+
t.Fatal(err)
23+
}
24+
pkcs8, err := x509.MarshalPKCS8PrivateKey(key)
25+
if err != nil {
26+
t.Fatal(err)
27+
}
28+
pemBlock := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8})
29+
30+
sa := map[string]string{
31+
"type": "service_account",
32+
"client_email": "[email protected]",
33+
"private_key": string(pemBlock),
34+
"token_uri": "https://oauth2.googleapis.com/token",
35+
}
36+
data, _ := json.Marshal(sa)
37+
path := filepath.Join(dir, "sa.json")
38+
if err := os.WriteFile(path, data, 0600); err != nil {
39+
t.Fatal(err)
40+
}
41+
return path, key
42+
}
43+
44+
func TestNewServiceAccountAuth_ValidFile(t *testing.T) {
45+
dir := t.TempDir()
46+
path, _ := testServiceAccountJSON(t, dir)
47+
auth, err := NewServiceAccountAuth(path, []string{scopeChat})
48+
if err != nil {
49+
t.Fatalf("unexpected error: %v", err)
50+
}
51+
if auth.email != "[email protected]" {
52+
t.Errorf("email = %q, want [email protected]", auth.email)
53+
}
54+
}
55+
56+
func TestNewServiceAccountAuth_InvalidFile(t *testing.T) {
57+
dir := t.TempDir()
58+
path := filepath.Join(dir, "bad.json")
59+
os.WriteFile(path, []byte("{bad json"), 0600)
60+
_, err := NewServiceAccountAuth(path, []string{scopeChat})
61+
if err == nil {
62+
t.Fatal("expected error for invalid JSON")
63+
}
64+
}
65+
66+
func TestNewServiceAccountAuth_MissingFile(t *testing.T) {
67+
_, err := NewServiceAccountAuth("/nonexistent/sa.json", []string{scopeChat})
68+
if err == nil {
69+
t.Fatal("expected error for missing file")
70+
}
71+
}
72+
73+
func TestServiceAccountAuth_Token_CachesWithinTTL(t *testing.T) {
74+
dir := t.TempDir()
75+
path, _ := testServiceAccountJSON(t, dir)
76+
callCount := 0
77+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
78+
callCount++
79+
json.NewEncoder(w).Encode(map[string]any{
80+
"access_token": "tok-123",
81+
"expires_in": 3600,
82+
"token_type": "Bearer",
83+
})
84+
}))
85+
defer ts.Close()
86+
87+
auth, err := NewServiceAccountAuth(path, []string{scopeChat})
88+
if err != nil {
89+
t.Fatal(err)
90+
}
91+
auth.tokenEndpoint = ts.URL
92+
93+
ctx := context.Background()
94+
tok1, _ := auth.Token(ctx)
95+
tok2, _ := auth.Token(ctx)
96+
if tok1 != tok2 {
97+
t.Errorf("tokens differ")
98+
}
99+
if callCount != 1 {
100+
t.Errorf("callCount = %d, want 1", callCount)
101+
}
102+
}
103+
104+
func TestServiceAccountAuth_Token_RefreshesExpired(t *testing.T) {
105+
dir := t.TempDir()
106+
path, _ := testServiceAccountJSON(t, dir)
107+
callCount := 0
108+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
109+
callCount++
110+
json.NewEncoder(w).Encode(map[string]any{
111+
"access_token": "tok",
112+
"expires_in": 1,
113+
"token_type": "Bearer",
114+
})
115+
}))
116+
defer ts.Close()
117+
118+
auth, err := NewServiceAccountAuth(path, []string{scopeChat})
119+
if err != nil {
120+
t.Fatal(err)
121+
}
122+
auth.tokenEndpoint = ts.URL
123+
auth.Token(context.Background())
124+
auth.expiresAt = time.Now().Add(-1 * time.Minute)
125+
auth.Token(context.Background())
126+
if callCount != 2 {
127+
t.Errorf("callCount = %d, want 2", callCount)
128+
}
129+
}
130+
131+
func TestServiceAccountAuth_Token_RefreshFailure(t *testing.T) {
132+
dir := t.TempDir()
133+
path, _ := testServiceAccountJSON(t, dir)
134+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
135+
w.WriteHeader(500)
136+
}))
137+
defer ts.Close()
138+
139+
auth, _ := NewServiceAccountAuth(path, []string{scopeChat})
140+
auth.tokenEndpoint = ts.URL
141+
_, err := auth.Token(context.Background())
142+
if err == nil {
143+
t.Fatal("expected error on 500")
144+
}
145+
}

0 commit comments

Comments
 (0)