Skip to content

Commit b8ac13a

Browse files
committed
feat: setup subrouter for pub/sub trigger
1 parent 9a6efea commit b8ac13a

File tree

8 files changed

+728
-1
lines changed

8 files changed

+728
-1
lines changed

cmd/launcher/full/full.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@ import (
2222
"google.golang.org/adk/cmd/launcher/web"
2323
"google.golang.org/adk/cmd/launcher/web/a2a"
2424
"google.golang.org/adk/cmd/launcher/web/api"
25+
"google.golang.org/adk/cmd/launcher/web/triggers/pubsub"
2526
"google.golang.org/adk/cmd/launcher/web/webui"
2627
)
2728

2829
// NewLauncher returnes the most versatile universal launcher with all options built-in.
2930
func NewLauncher() launcher.Launcher {
30-
return universal.NewLauncher(console.NewLauncher(), web.NewLauncher(webui.NewLauncher(), a2a.NewLauncher(), api.NewLauncher()))
31+
return universal.NewLauncher(console.NewLauncher(), web.NewLauncher(webui.NewLauncher(), a2a.NewLauncher(), api.NewLauncher(), pubsub.NewLauncher()))
3132
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Package pubsub provides a sublauncher that adds PubSub trigger capabilities to ADK web server.
16+
package pubsub
17+
18+
import (
19+
"flag"
20+
"fmt"
21+
"net/http"
22+
"strings"
23+
"time"
24+
25+
"github.com/gorilla/mux"
26+
27+
"google.golang.org/adk/cmd/launcher"
28+
"google.golang.org/adk/cmd/launcher/web"
29+
"google.golang.org/adk/internal/cli/util"
30+
"google.golang.org/adk/server/adkrest/controllers/triggers"
31+
)
32+
33+
type pubsubConfig struct {
34+
pathPrefix string
35+
triggerMaxRetries int
36+
triggerBaseDelay time.Duration
37+
triggerMaxDelay time.Duration
38+
triggerMaxRuns int
39+
}
40+
41+
type pubsubLauncher struct {
42+
flags *flag.FlagSet
43+
config *pubsubConfig
44+
}
45+
46+
// NewLauncher creates a new pubsub launcher. It extends Web launcher.
47+
func NewLauncher() web.Sublauncher {
48+
config := &pubsubConfig{}
49+
50+
fs := flag.NewFlagSet("pubsub", flag.ContinueOnError)
51+
fs.StringVar(&config.pathPrefix, "path_prefix", "/api", "Path prefix for the PubSub trigger endpoint. Default is '/api'.")
52+
fs.IntVar(&config.triggerMaxRetries, "trigger_max_retries", 3, "Maximum retries for HTTP 429 errors from triggers")
53+
fs.DurationVar(&config.triggerBaseDelay, "trigger_base_delay", 1*time.Second, "Base delay for trigger retry exponential backoff")
54+
fs.DurationVar(&config.triggerMaxDelay, "trigger_max_delay", 10*time.Second, "Maximum delay for trigger retry exponential backoff")
55+
fs.IntVar(&config.triggerMaxRuns, "trigger_max_concurrent_runs", 100, "Maximum concurrent trigger runs")
56+
57+
return &pubsubLauncher{
58+
config: config,
59+
flags: fs,
60+
}
61+
}
62+
63+
// Keyword implements web.Sublauncher. Returns the command-line keyword for pubsub launcher.
64+
func (p *pubsubLauncher) Keyword() string {
65+
return "pubsub"
66+
}
67+
68+
// Parse parses the command-line arguments for the pubsub launcher.
69+
func (p *pubsubLauncher) Parse(args []string) ([]string, error) {
70+
err := p.flags.Parse(args)
71+
if err != nil || !p.flags.Parsed() {
72+
return nil, fmt.Errorf("failed to parse pubsub flags: %v", err)
73+
}
74+
if p.config.triggerMaxRetries < 0 {
75+
return nil, fmt.Errorf("trigger_max_retries must be >= 0")
76+
}
77+
if p.config.triggerBaseDelay < 0 {
78+
return nil, fmt.Errorf("trigger_base_delay must be >= 0")
79+
}
80+
if p.config.triggerMaxDelay < 0 {
81+
return nil, fmt.Errorf("trigger_max_delay must be >= 0")
82+
}
83+
if p.config.triggerMaxRuns < 0 {
84+
return nil, fmt.Errorf("trigger_max_concurrent_runs must be >= 0")
85+
}
86+
87+
prefix := p.config.pathPrefix
88+
if !strings.HasPrefix(prefix, "/") {
89+
prefix = "/" + prefix
90+
}
91+
p.config.pathPrefix = strings.TrimSuffix(prefix, "/")
92+
93+
return p.flags.Args(), nil
94+
}
95+
96+
// CommandLineSyntax returns the command-line syntax for the pubsub launcher.
97+
func (p *pubsubLauncher) CommandLineSyntax() string {
98+
return util.FormatFlagUsage(p.flags)
99+
}
100+
101+
// SimpleDescription implements web.Sublauncher.
102+
func (p *pubsubLauncher) SimpleDescription() string {
103+
return "starts ADK PubSub trigger endpoint server"
104+
}
105+
106+
// SetupSubrouters adds the PubSub trigger endpoint to the parent router.
107+
func (p *pubsubLauncher) SetupSubrouters(router *mux.Router, config *launcher.Config) error {
108+
triggerConfig := triggers.TriggerConfig{
109+
MaxRetries: p.config.triggerMaxRetries,
110+
BaseDelay: p.config.triggerBaseDelay,
111+
MaxDelay: p.config.triggerMaxDelay,
112+
MaxConcurrentRuns: p.config.triggerMaxRuns,
113+
}
114+
115+
controller := triggers.NewPubSubController(
116+
config.SessionService,
117+
config.AgentLoader,
118+
config.MemoryService,
119+
config.ArtifactService,
120+
config.PluginConfig,
121+
triggerConfig,
122+
)
123+
124+
subrouter := router
125+
if p.config.pathPrefix != "" && p.config.pathPrefix != "/" {
126+
subrouter = router.PathPrefix(p.config.pathPrefix).Subrouter()
127+
}
128+
129+
subrouter.HandleFunc("/apps/{app_name}/trigger/pubsub", controller.PubSubTriggerHandler).Methods(http.MethodPost)
130+
return nil
131+
}
132+
133+
// UserMessage implements web.Sublauncher.
134+
func (p *pubsubLauncher) UserMessage(webURL string, printer func(v ...any)) {
135+
printer(fmt.Sprintf(" pubsub: PubSub trigger endpoint is available at %s%s/apps/{app_name}/trigger/pubsub", webURL, p.config.pathPrefix))
136+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package pubsub
16+
17+
import (
18+
"net/http"
19+
"net/http/httptest"
20+
"testing"
21+
22+
"github.com/gorilla/mux"
23+
24+
"google.golang.org/adk/cmd/launcher"
25+
)
26+
27+
func TestParse(t *testing.T) {
28+
tests := []struct {
29+
name string
30+
args []string
31+
wantPrefix string
32+
wantRetry int
33+
wantErr bool
34+
}{
35+
{
36+
name: "default values",
37+
args: []string{},
38+
wantPrefix: "/api",
39+
wantRetry: 3,
40+
wantErr: false,
41+
},
42+
{
43+
name: "custom prefix and retries",
44+
args: []string{"-path_prefix=/custom", "-trigger_max_retries=5"},
45+
wantPrefix: "/custom",
46+
wantRetry: 5,
47+
wantErr: false,
48+
},
49+
{
50+
name: "invalid retry count",
51+
args: []string{"-trigger_max_retries=-1"},
52+
wantPrefix: "/api",
53+
wantRetry: 3,
54+
wantErr: true,
55+
},
56+
}
57+
58+
for _, tt := range tests {
59+
t.Run(tt.name, func(t *testing.T) {
60+
l := NewLauncher().(*pubsubLauncher)
61+
_, err := l.Parse(tt.args)
62+
if (err != nil) != tt.wantErr {
63+
t.Errorf("Parse() error = %v, wantErr %v", err, tt.wantErr)
64+
return
65+
}
66+
if tt.wantErr {
67+
return
68+
}
69+
if l.config.pathPrefix != tt.wantPrefix {
70+
t.Errorf("Parse() pathPrefix = %v, want %v", l.config.pathPrefix, tt.wantPrefix)
71+
}
72+
if l.config.triggerMaxRetries != tt.wantRetry {
73+
t.Errorf("Parse() triggerMaxRetries = %v, want %v", l.config.triggerMaxRetries, tt.wantRetry)
74+
}
75+
})
76+
}
77+
}
78+
79+
func TestSetupSubrouters(t *testing.T) {
80+
l := NewLauncher().(*pubsubLauncher)
81+
_, _ = l.Parse([]string{"-path_prefix=/api"})
82+
83+
router := mux.NewRouter()
84+
config := &launcher.Config{}
85+
86+
err := l.SetupSubrouters(router, config)
87+
if err != nil {
88+
t.Fatalf("SetupSubrouters() failed: %v", err)
89+
}
90+
91+
// Verify route is registered
92+
req := httptest.NewRequest(http.MethodPost, "/api/apps/my-app/trigger/pubsub", nil)
93+
var match mux.RouteMatch
94+
if !router.Match(req, &match) {
95+
t.Errorf("SetupSubrouters() did not register expected route")
96+
}
97+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package triggers
16+
17+
import "time"
18+
19+
// TriggerConfig contains configuration options for triggers.
20+
type TriggerConfig struct {
21+
// MaxRetries is the maximum number of times to retry a failed agent execution.
22+
MaxRetries int
23+
// BaseDelay is the base delay between retries.
24+
BaseDelay time.Duration
25+
// MaxDelay is the maximum delay between retries.
26+
MaxDelay time.Duration
27+
// MaxConcurrentRuns is the maximum number of concurrent runs.
28+
MaxConcurrentRuns int
29+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package triggers
16+
17+
import (
18+
"encoding/json"
19+
"fmt"
20+
"net/http"
21+
22+
"google.golang.org/adk/agent"
23+
"google.golang.org/adk/artifact"
24+
"google.golang.org/adk/memory"
25+
"google.golang.org/adk/runner"
26+
"google.golang.org/adk/server/adkrest/internal/models"
27+
"google.golang.org/adk/session"
28+
)
29+
30+
const defaultUserID = "pubsub-caller"
31+
32+
// PubSubController handles the PubSub trigger endpoints.
33+
type PubSubController struct {
34+
runner *RetriableRunner
35+
semaphore chan struct{}
36+
}
37+
38+
// NewPubSubController creates a new PubSubController.
39+
func NewPubSubController(sessionService session.Service, agentLoader agent.Loader, memoryService memory.Service, artifactService artifact.Service, pluginConfig runner.PluginConfig, triggerConfig TriggerConfig) *PubSubController {
40+
return &PubSubController{
41+
runner: &RetriableRunner{
42+
sessionService: sessionService,
43+
agentLoader: agentLoader,
44+
memoryService: memoryService,
45+
artifactService: artifactService,
46+
pluginConfig: pluginConfig,
47+
triggerConfig: triggerConfig,
48+
},
49+
semaphore: make(chan struct{}, triggerConfig.MaxConcurrentRuns),
50+
}
51+
}
52+
53+
// PubSubTriggerHandler handles the PubSub trigger endpoint.
54+
func (c *PubSubController) PubSubTriggerHandler(w http.ResponseWriter, r *http.Request) {
55+
if c.semaphore != nil {
56+
c.semaphore <- struct{}{}
57+
defer func() { <-c.semaphore }()
58+
}
59+
60+
// Parse the request to the request model.
61+
var req models.PubSubTriggerRequest
62+
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
63+
respondError(w, http.StatusBadRequest, fmt.Sprintf("failed to decode request: %v", err))
64+
return
65+
}
66+
67+
// Decode base64 message data.
68+
messageContent := make(map[string]any)
69+
if len(req.Message.Data) > 0 {
70+
// Avoids encoding the data twice later with json.Marshal.
71+
messageContent["data"] = string(req.Message.Data)
72+
}
73+
// Add attributes to the messageContent if present
74+
if len(req.Message.Attributes) > 0 {
75+
messageContent["attributes"] = req.Message.Attributes
76+
}
77+
78+
agentMessage, err := json.Marshal(messageContent)
79+
if err != nil {
80+
respondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to marshal agent message: %v", err))
81+
return
82+
}
83+
84+
appName, err := appName(r)
85+
if err != nil {
86+
respondError(w, http.StatusInternalServerError, err.Error())
87+
return
88+
}
89+
90+
if _, err := c.runner.RunAgent(r.Context(), appName, req.Subscription, string(agentMessage)); err != nil {
91+
respondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to run agent: %v", err))
92+
return
93+
}
94+
95+
respondSuccess(w)
96+
}

0 commit comments

Comments
 (0)