diff --git a/cmd/launcher/full/full.go b/cmd/launcher/full/full.go index 9cf2c92a0..a6f46d3b2 100644 --- a/cmd/launcher/full/full.go +++ b/cmd/launcher/full/full.go @@ -22,10 +22,11 @@ import ( "google.golang.org/adk/cmd/launcher/web" "google.golang.org/adk/cmd/launcher/web/a2a" "google.golang.org/adk/cmd/launcher/web/api" + "google.golang.org/adk/cmd/launcher/web/triggers/pubsub" "google.golang.org/adk/cmd/launcher/web/webui" ) // NewLauncher returnes the most versatile universal launcher with all options built-in. func NewLauncher() launcher.Launcher { - return universal.NewLauncher(console.NewLauncher(), web.NewLauncher(webui.NewLauncher(), a2a.NewLauncher(), api.NewLauncher())) + return universal.NewLauncher(console.NewLauncher(), web.NewLauncher(webui.NewLauncher(), a2a.NewLauncher(), pubsub.NewLauncher(), api.NewLauncher())) } diff --git a/cmd/launcher/web/triggers/pubsub/pubsub.go b/cmd/launcher/web/triggers/pubsub/pubsub.go new file mode 100644 index 000000000..2e77c05b1 --- /dev/null +++ b/cmd/launcher/web/triggers/pubsub/pubsub.go @@ -0,0 +1,136 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package pubsub provides a sublauncher that adds PubSub trigger capabilities to ADK web server. +package pubsub + +import ( + "flag" + "fmt" + "net/http" + "strings" + "time" + + "github.com/gorilla/mux" + + "google.golang.org/adk/cmd/launcher" + "google.golang.org/adk/cmd/launcher/web" + "google.golang.org/adk/internal/cli/util" + "google.golang.org/adk/server/adkrest/controllers/triggers" +) + +type pubsubConfig struct { + pathPrefix string + triggerMaxRetries int + triggerBaseDelay time.Duration + triggerMaxDelay time.Duration + triggerMaxRuns int +} + +type pubsubLauncher struct { + flags *flag.FlagSet + config *pubsubConfig +} + +// NewLauncher creates a new pubsub launcher. It extends Web launcher. +func NewLauncher() web.Sublauncher { + config := &pubsubConfig{} + + fs := flag.NewFlagSet("pubsub", flag.ContinueOnError) + fs.StringVar(&config.pathPrefix, "path_prefix", "/api", "Path prefix for the PubSub trigger endpoint. Default is '/api'.") + fs.IntVar(&config.triggerMaxRetries, "trigger_max_retries", 3, "Maximum retries for HTTP 429 errors from triggers") + fs.DurationVar(&config.triggerBaseDelay, "trigger_base_delay", 1*time.Second, "Base delay for trigger retry exponential backoff") + fs.DurationVar(&config.triggerMaxDelay, "trigger_max_delay", 10*time.Second, "Maximum delay for trigger retry exponential backoff") + fs.IntVar(&config.triggerMaxRuns, "trigger_max_concurrent_runs", 100, "Maximum concurrent trigger runs") + + return &pubsubLauncher{ + config: config, + flags: fs, + } +} + +// Keyword implements web.Sublauncher. Returns the command-line keyword for pubsub launcher. +func (p *pubsubLauncher) Keyword() string { + return "pubsub" +} + +// Parse parses the command-line arguments for the pubsub launcher. +func (p *pubsubLauncher) Parse(args []string) ([]string, error) { + err := p.flags.Parse(args) + if err != nil || !p.flags.Parsed() { + return nil, fmt.Errorf("failed to parse pubsub flags: %v", err) + } + if p.config.triggerMaxRetries <= 0 { + return nil, fmt.Errorf("trigger_max_retries must be > 0") + } + if p.config.triggerBaseDelay < 0 { + return nil, fmt.Errorf("trigger_base_delay must be >= 0") + } + if p.config.triggerMaxDelay <= 0 { + return nil, fmt.Errorf("trigger_max_delay must be > 0") + } + if p.config.triggerMaxRuns <= 0 { + return nil, fmt.Errorf("trigger_max_concurrent_runs must be > 0") + } + + prefix := p.config.pathPrefix + if !strings.HasPrefix(prefix, "/") { + prefix = "/" + prefix + } + p.config.pathPrefix = strings.TrimSuffix(prefix, "/") + + return p.flags.Args(), nil +} + +// CommandLineSyntax returns the command-line syntax for the pubsub launcher. +func (p *pubsubLauncher) CommandLineSyntax() string { + return util.FormatFlagUsage(p.flags) +} + +// SimpleDescription implements web.Sublauncher. +func (p *pubsubLauncher) SimpleDescription() string { + return "starts ADK PubSub trigger endpoint server" +} + +// SetupSubrouters adds the PubSub trigger endpoint to the parent router. +func (p *pubsubLauncher) SetupSubrouters(router *mux.Router, config *launcher.Config) error { + triggerConfig := triggers.TriggerConfig{ + MaxRetries: p.config.triggerMaxRetries, + BaseDelay: p.config.triggerBaseDelay, + MaxDelay: p.config.triggerMaxDelay, + MaxConcurrentRuns: p.config.triggerMaxRuns, + } + + controller := triggers.NewPubSubController( + config.SessionService, + config.AgentLoader, + config.MemoryService, + config.ArtifactService, + config.PluginConfig, + triggerConfig, + ) + + subrouter := router + if p.config.pathPrefix != "" && p.config.pathPrefix != "/" { + subrouter = router.PathPrefix(p.config.pathPrefix).Subrouter() + } + + subrouter.HandleFunc("/apps/{app_name}/trigger/pubsub", controller.PubSubTriggerHandler).Methods(http.MethodPost) + return nil +} + +// UserMessage implements web.Sublauncher. +func (p *pubsubLauncher) UserMessage(webURL string, printer func(v ...any)) { + printer(fmt.Sprintf(" pubsub: PubSub trigger endpoint is available at %s%s/apps/{app_name}/trigger/pubsub", webURL, p.config.pathPrefix)) +} diff --git a/cmd/launcher/web/triggers/pubsub/pubsub_test.go b/cmd/launcher/web/triggers/pubsub/pubsub_test.go new file mode 100644 index 000000000..79f20098d --- /dev/null +++ b/cmd/launcher/web/triggers/pubsub/pubsub_test.go @@ -0,0 +1,97 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pubsub + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/gorilla/mux" + + "google.golang.org/adk/cmd/launcher" +) + +func TestParse(t *testing.T) { + tests := []struct { + name string + args []string + wantPrefix string + wantRetry int + wantErr bool + }{ + { + name: "default values", + args: []string{}, + wantPrefix: "/api", + wantRetry: 3, + wantErr: false, + }, + { + name: "custom prefix and retries", + args: []string{"-path_prefix=/custom", "-trigger_max_retries=5"}, + wantPrefix: "/custom", + wantRetry: 5, + wantErr: false, + }, + { + name: "invalid retry count", + args: []string{"-trigger_max_retries=-1"}, + wantPrefix: "/api", + wantRetry: 3, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := NewLauncher().(*pubsubLauncher) + _, err := l.Parse(tt.args) + if (err != nil) != tt.wantErr { + t.Errorf("Parse() error = %v, wantErr %v", err, tt.wantErr) + return + } + if tt.wantErr { + return + } + if l.config.pathPrefix != tt.wantPrefix { + t.Errorf("Parse() pathPrefix = %v, want %v", l.config.pathPrefix, tt.wantPrefix) + } + if l.config.triggerMaxRetries != tt.wantRetry { + t.Errorf("Parse() triggerMaxRetries = %v, want %v", l.config.triggerMaxRetries, tt.wantRetry) + } + }) + } +} + +func TestSetupSubrouters(t *testing.T) { + l := NewLauncher().(*pubsubLauncher) + _, _ = l.Parse([]string{"-path_prefix=/api"}) + + router := mux.NewRouter() + config := &launcher.Config{} + + err := l.SetupSubrouters(router, config) + if err != nil { + t.Fatalf("SetupSubrouters() failed: %v", err) + } + + // Verify route is registered + req := httptest.NewRequest(http.MethodPost, "/api/apps/my-app/trigger/pubsub", nil) + var match mux.RouteMatch + if !router.Match(req, &match) { + t.Errorf("SetupSubrouters() did not register expected route") + } +} diff --git a/server/adkrest/controllers/triggers/config.go b/server/adkrest/controllers/triggers/config.go new file mode 100644 index 000000000..3fae42b78 --- /dev/null +++ b/server/adkrest/controllers/triggers/config.go @@ -0,0 +1,29 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package triggers + +import "time" + +// TriggerConfig contains configuration options for triggers. +type TriggerConfig struct { + // MaxRetries is the maximum number of times to retry a failed agent execution. + MaxRetries int + // BaseDelay is the base delay between retries. + BaseDelay time.Duration + // MaxDelay is the maximum delay between retries. + MaxDelay time.Duration + // MaxConcurrentRuns is the maximum number of concurrent runs. + MaxConcurrentRuns int +} diff --git a/server/adkrest/controllers/triggers/pubsub.go b/server/adkrest/controllers/triggers/pubsub.go new file mode 100644 index 000000000..fea0912ca --- /dev/null +++ b/server/adkrest/controllers/triggers/pubsub.go @@ -0,0 +1,101 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package triggers + +import ( + "encoding/json" + "fmt" + "net/http" + + "google.golang.org/adk/agent" + "google.golang.org/adk/artifact" + "google.golang.org/adk/memory" + "google.golang.org/adk/runner" + "google.golang.org/adk/server/adkrest/internal/models" + "google.golang.org/adk/session" +) + +const defaultUserID = "pubsub-caller" + +// PubSubController handles the PubSub trigger endpoints. +type PubSubController struct { + runner *RetriableRunner + semaphore chan struct{} +} + +// NewPubSubController creates a new PubSubController. +func NewPubSubController(sessionService session.Service, agentLoader agent.Loader, memoryService memory.Service, artifactService artifact.Service, pluginConfig runner.PluginConfig, triggerConfig TriggerConfig) *PubSubController { + return &PubSubController{ + runner: &RetriableRunner{ + sessionService: sessionService, + agentLoader: agentLoader, + memoryService: memoryService, + artifactService: artifactService, + pluginConfig: pluginConfig, + triggerConfig: triggerConfig, + }, + semaphore: make(chan struct{}, triggerConfig.MaxConcurrentRuns), + } +} + +// PubSubTriggerHandler handles the PubSub trigger endpoint. +func (c *PubSubController) PubSubTriggerHandler(w http.ResponseWriter, r *http.Request) { + if c.semaphore != nil { + c.semaphore <- struct{}{} + defer func() { <-c.semaphore }() + } + + // Parse the request to the request model. + var req models.PubSubTriggerRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + respondError(w, http.StatusBadRequest, fmt.Sprintf("failed to decode request: %v", err)) + return + } + + // Decode base64 message data. + messageContent := make(map[string]any) + if len(req.Message.Data) > 0 { + // Avoids encoding the data twice later with json.Marshal. + messageContent["data"] = string(req.Message.Data) + } + // Add attributes to the messageContent if present + if len(req.Message.Attributes) > 0 { + messageContent["attributes"] = req.Message.Attributes + } + + if len(messageContent) == 0 { + respondError(w, http.StatusBadRequest, "empty message data and attributes") + return + } + + agentMessage, err := json.Marshal(messageContent) + if err != nil { + respondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to marshal agent message: %v", err)) + return + } + + appName, err := appName(r) + if err != nil { + respondError(w, http.StatusInternalServerError, err.Error()) + return + } + + if _, err := c.runner.RunAgent(r.Context(), appName, req.Subscription, string(agentMessage)); err != nil { + respondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to run agent: %v", err)) + return + } + + respondSuccess(w) +} diff --git a/server/adkrest/controllers/triggers/pubsub_test.go b/server/adkrest/controllers/triggers/pubsub_test.go new file mode 100644 index 000000000..a7539389e --- /dev/null +++ b/server/adkrest/controllers/triggers/pubsub_test.go @@ -0,0 +1,181 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package triggers_test + +import ( + "bytes" + "encoding/base64" + "encoding/json" + "fmt" + "iter" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/gorilla/mux" + + "google.golang.org/adk/agent" + + "google.golang.org/adk/runner" + "google.golang.org/adk/server/adkrest/controllers/triggers" + "google.golang.org/adk/server/adkrest/internal/fakes" + "google.golang.org/adk/server/adkrest/internal/models" + "google.golang.org/adk/session" +) + +var defaultTriggerConfig = triggers.TriggerConfig{ + MaxConcurrentRuns: 10, + MaxRetries: 3, + BaseDelay: 1 * time.Millisecond, + MaxDelay: 5 * time.Millisecond, +} + +func TestPubSubTriggerHandler(t *testing.T) { + tests := []struct { + name string + mockAgentResults []error + expectedCode int + expectedRunCount int + requestAttributes map[string]string + expectedAttributes map[string]string + requestData string + }{ + { + name: "Success_Immediate", + mockAgentResults: nil, + expectedCode: http.StatusOK, + expectedRunCount: 1, + requestData: "Hello agent", + }, + { + name: "ResourceExhaustedRetry", + mockAgentResults: []error{fmt.Errorf("429 ResourceExhausted"), fmt.Errorf("429 ResourceExhausted")}, + expectedCode: http.StatusOK, + expectedRunCount: 3, + requestData: "Hello agent", + }, + { + name: "With_Attributes", + mockAgentResults: nil, + expectedCode: http.StatusOK, + expectedRunCount: 1, + requestAttributes: map[string]string{"key1": "val1", "key2": "val2"}, + expectedAttributes: map[string]string{"key1": "val1", "key2": "val2"}, + requestData: "Hello agent", + }, + { + name: "Empty Data", + mockAgentResults: nil, + expectedCode: http.StatusBadRequest, + expectedRunCount: 0, + requestData: "", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + mockAgentRunCount := 0 + testAgent := createMockAgent(t, tc.mockAgentResults, &mockAgentRunCount, tc.expectedAttributes) + + apiController := setupTest(t, testAgent) + + reqObj := models.PubSubTriggerRequest{ + Message: models.PubSubMessage{ + Data: []byte(base64.StdEncoding.EncodeToString([]byte(tc.requestData))), + Attributes: tc.requestAttributes, + }, + Subscription: "test-sub", + } + reqBytes, err := json.Marshal(reqObj) + if err != nil { + t.Fatalf("marshal request: %v", err) + } + + req, err := http.NewRequest(http.MethodPost, "/apps/test-agent/triggers/pubsub", bytes.NewBuffer(reqBytes)) + if err != nil { + t.Fatalf("new request: %v", err) + } + req = mux.SetURLVars(req, map[string]string{"app_name": "test-agent"}) + rr := httptest.NewRecorder() + + apiController.PubSubTriggerHandler(rr, req) + + if rr.Code != tc.expectedCode { + t.Errorf("expected status %d, got %d. Body: %s", tc.expectedCode, rr.Code, rr.Body.String()) + } + + if mockAgentRunCount != tc.expectedRunCount { + t.Errorf("expected %d run attempts, got %d", tc.expectedRunCount, mockAgentRunCount) + } + }) + } +} + +func setupTest(t *testing.T, a agent.Agent) *triggers.PubSubController { + t.Helper() + sessionService := &fakes.FakeSessionService{Sessions: make(map[fakes.SessionKey]fakes.TestSession)} + agentLoader := agent.NewSingleLoader(a) + return triggers.NewPubSubController(sessionService, agentLoader, nil, nil, runner.PluginConfig{}, defaultTriggerConfig) +} + +func createMockAgent(t *testing.T, results []error, runCount *int, expectedAttributes map[string]string) agent.Agent { + t.Helper() + testAgent, err := agent.New(agent.Config{ + Name: "test-agent", + Run: func(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] { + return func(yield func(*session.Event, error) bool) { + *runCount++ + + userContent := ctx.UserContent() + if len(expectedAttributes) > 0 { + if userContent == nil || len(userContent.Parts) == 0 { + t.Errorf("expected user content but got none") + } else { + var msgMap map[string]any + err := json.Unmarshal([]byte(userContent.Parts[0].Text), &msgMap) + if err != nil { + t.Errorf("failed to unmarshal message content: %v", err) + } else { + gotAttrs, ok := msgMap["attributes"].(map[string]any) + if !ok { + t.Errorf("expected attributes map, got %T", msgMap["attributes"]) + } else { + for k, v := range expectedAttributes { + if gotAttrs[k] != v { + t.Errorf("expected attribute %s=%s, got %s", k, v, gotAttrs[k]) + } + } + } + } + } + } + + if *runCount <= len(results) { + err := results[*runCount-1] + if err != nil { + yield(nil, err) + return + } + } + yield(&session.Event{ID: "success-event"}, nil) + } + }, + }) + if err != nil { + t.Fatalf("agent.New failed: %v", err) + } + return testAgent +} diff --git a/server/adkrest/controllers/triggers/triggers.go b/server/adkrest/controllers/triggers/triggers.go new file mode 100644 index 000000000..6bf6e9aca --- /dev/null +++ b/server/adkrest/controllers/triggers/triggers.go @@ -0,0 +1,154 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package triggers + +import ( + "context" + "fmt" + "math" + "math/rand" + "net/http" + "strings" + "time" + + "github.com/gorilla/mux" + "google.golang.org/genai" + + "google.golang.org/adk/agent" + "google.golang.org/adk/artifact" + "google.golang.org/adk/memory" + "google.golang.org/adk/runner" + "google.golang.org/adk/server/adkrest/controllers" + "google.golang.org/adk/server/adkrest/internal/models" + "google.golang.org/adk/session" +) + +type RetriableRunner struct { + sessionService session.Service + agentLoader agent.Loader + memoryService memory.Service + artifactService artifact.Service + pluginConfig runner.PluginConfig + triggerConfig TriggerConfig +} + +func (r *RetriableRunner) RunAgent(ctx context.Context, appName, userID, messageContent string) ([]*session.Event, error) { + if userID == "" { + userID = defaultUserID + } + + // Each retry = new session + sessReq := &session.CreateRequest{ + AppName: appName, + UserID: userID, + } + sessResp, err := r.sessionService.Create(ctx, sessReq) + if err != nil { + return nil, fmt.Errorf("failed to create session: %v", err) + } + + userMessage := genai.Content{ + Role: "user", + Parts: []*genai.Part{ + {Text: messageContent}, + }, + } + + curAgent, err := r.agentLoader.LoadAgent(appName) + if err != nil { + return nil, fmt.Errorf("failed to load agent: %v", err) + } + + runR, err := runner.New(runner.Config{ + AppName: appName, + Agent: curAgent, + SessionService: r.sessionService, + MemoryService: r.memoryService, + ArtifactService: r.artifactService, + PluginConfig: r.pluginConfig, + }) + if err != nil { + return nil, fmt.Errorf("failed to create runner: %v", err) + } + + return r.runAgentWithRetry(ctx, runR, sessResp.Session.UserID(), sessResp.Session.ID(), &userMessage) +} + +// runAgentWithRetry uses exponential backoff with jitter to handle 429 rate-limit errors. +// After MaxRetries is exhausted, raises an error to signal the upstream service (Pub/Sub, Eventarc) to retry at a higher level. +func (r *RetriableRunner) runAgentWithRetry(ctx context.Context, runR *runner.Runner, userID, sessionID string, userMessage *genai.Content) ([]*session.Event, error) { + var runErr error + events := []*session.Event{} + for i := 0; i <= r.triggerConfig.MaxRetries; i++ { + resp := runR.Run(ctx, userID, sessionID, userMessage, agent.RunConfig{StreamingMode: agent.StreamingModeNone}) + + isThrottled := false + for event, err := range resp { + if err != nil { + runErr = err + if isResourceExhausted(err) { + isThrottled = true + } + break + } + events = append(events, event) + } + + if !isThrottled && runErr == nil { + return events, nil // Success + } + + if i < r.triggerConfig.MaxRetries && isThrottled { + delay := calculateBackoff(i, r.triggerConfig.BaseDelay, r.triggerConfig.MaxDelay) + time.Sleep(delay) + runErr = nil // Clear error for next attempt + continue + } + break // Not throttled (but error raised) or max retries reached + } + return nil, runErr +} + +func respondError(w http.ResponseWriter, code int, msg string) { + resp := models.TriggerResponse{Status: msg} + controllers.EncodeJSONResponse(resp, code, w) +} + +func respondSuccess(w http.ResponseWriter) { + resp := models.TriggerResponse{Status: "success"} + controllers.EncodeJSONResponse(resp, http.StatusOK, w) +} + +// Check if an exception represents a transient rate-limit error. +func isResourceExhausted(err error) bool { + return err != nil && (strings.Contains(err.Error(), "429") || strings.Contains(err.Error(), "ResourceExhausted")) +} + +func calculateBackoff(attempt int, base, maxDelay time.Duration) time.Duration { + backoff := float64(base) * math.Pow(2, float64(attempt)) + delay := min(time.Duration(backoff), maxDelay) + jitter := time.Duration(rand.Float64() * float64(delay) * 0.5) + return delay + jitter +} + +// Resolve the target app name from the request. +func appName(r *http.Request) (string, error) { + vars := mux.Vars(r) + appName := vars["app_name"] + if appName == "" { + return "", fmt.Errorf("no application name provided") + } + return appName, nil +} diff --git a/server/adkrest/internal/models/triggers.go b/server/adkrest/internal/models/triggers.go new file mode 100644 index 000000000..08e6e5a15 --- /dev/null +++ b/server/adkrest/internal/models/triggers.go @@ -0,0 +1,44 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package models + +// PubSubTriggerRequest represents the request for the PubSub trigger. +// See: https://cloud.google.com/pubsub/docs/push#receive_push +type PubSubTriggerRequest struct { + // The Pub/Sub message. + Message PubSubMessage `json:"message"` + // The subscription this message was published to. + Subscription string `json:"subscription"` +} + +// PubSubMessage represents the message for the PubSub trigger. +type PubSubMessage struct { + // The message payload. This will always be a base64-encoded string. + Data []byte `json:"data"` + // ID of this message, assigned by the Pub/Sub server. + MessageID string `json:"messageId"` + // The time at which the message was published, populated by the server. + PublishTime string `json:"publishTime"` + // Optional attributes for this message. An object containing a list of 'key': 'value' string pairs. + Attributes map[string]string `json:"attributes,omitempty"` + // If message ordering is enabled, this identifies related messages for which publish order should be respected. + OrderingKey string `json:"orderingKey,omitempty"` +} + +// TriggerResponse represents the standard response for Pub/Sub and Eventarc triggers. +type TriggerResponse struct { + // Processing status: 'success' or error message. + Status string `json:"status"` +}