diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..454d2dd --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.DS_Store +bridge/kelos-bridge +coverage.out + diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..5724547 --- /dev/null +++ b/Makefile @@ -0,0 +1,13 @@ +.PHONY: test test-go test-plugin fmt + +test: test-go test-plugin + +test-go: + cd bridge && go test ./... + +test-plugin: + cd plugin && node --test --experimental-strip-types src/*.test.ts + +fmt: + cd bridge && gofmt -w $$(find . -name '*.go' -type f) + diff --git a/README.md b/README.md new file mode 100644 index 0000000..b9e4ac1 --- /dev/null +++ b/README.md @@ -0,0 +1,23 @@ +# openclaw-kelos + +Separate integration repo for running Kelos `Task` resources from OpenClaw without changing Kelos core. + +Current contents: +- `bridge/`: Go HTTP bridge that validates requests, creates managed Kelos tasks, exposes task status, and reconciles background notifications. +- `plugin/`: TypeScript plugin package that exposes `kelos_run_task` and `kelos_task_status`. +- `docs/`: short architecture notes. +- `examples/`: sample bridge env and plugin wiring. + +The bridge supports two backends today: +- `memory`: local development and tests +- `kubectl`: shells out to `kubectl` for real `Workspace` and `Task` resources + +Quick start: + +```bash +cp examples/bridge.env.example .env +make test +``` + +See [docs/architecture.md](docs/architecture.md) for the integration shape and current boundaries. + diff --git a/bridge/cmd/kelos-bridge/main.go b/bridge/cmd/kelos-bridge/main.go new file mode 100644 index 0000000..124c7d5 --- /dev/null +++ b/bridge/cmd/kelos-bridge/main.go @@ -0,0 +1,94 @@ +package main + +import ( + "context" + "errors" + "log" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/openclaw/openclaw-kelos/bridge/internal/config" + "github.com/openclaw/openclaw-kelos/bridge/internal/httpapi" + "github.com/openclaw/openclaw-kelos/bridge/internal/kelos" + "github.com/openclaw/openclaw-kelos/bridge/internal/notifier" + "github.com/openclaw/openclaw-kelos/bridge/internal/service" +) + +func main() { + cfg, err := config.LoadFromEnv() + if err != nil { + log.Fatalf("load config: %v", err) + } + + backend, err := newBackend(cfg) + if err != nil { + log.Fatalf("create backend: %v", err) + } + + var taskNotifier service.Notifier = notifier.NoopNotifier{} + if cfg.Notification.Endpoint != "" { + taskNotifier = notifier.NewHTTP( + cfg.Notification.Endpoint, + cfg.Notification.BearerToken, + cfg.Notification.Timeout, + ) + } + + svc := service.New(cfg, backend, taskNotifier) + handler := httpapi.NewHandler(svc, cfg.AuthToken) + + server := &http.Server{ + Addr: cfg.ListenAddr, + Handler: handler, + ReadHeaderTimeout: 5 * time.Second, + } + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + if cfg.PollInterval > 0 { + go pollLoop(ctx, svc, cfg.PollInterval) + } + + go func() { + <-ctx.Done() + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = server.Shutdown(shutdownCtx) + }() + + log.Printf("kelos bridge listening on %s using %s backend", cfg.ListenAddr, cfg.Backend) + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Fatalf("listen: %v", err) + } +} + +func newBackend(cfg config.Config) (kelos.Backend, error) { + switch cfg.Backend { + case "memory": + return kelos.NewMemoryBackend(cfg.SeedWorkspaces...), nil + case "kubectl": + return kelos.NewKubectlBackend(cfg.Namespace, cfg.KubectlPath), nil + default: + return nil, errors.New("unsupported backend: " + cfg.Backend) + } +} + +func pollLoop(ctx context.Context, svc *service.Service, interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := svc.ReconcileNotifications(ctx); err != nil { + log.Printf("reconcile notifications: %v", err) + } + } + } +} diff --git a/bridge/go.mod b/bridge/go.mod new file mode 100644 index 0000000..ac6d352 --- /dev/null +++ b/bridge/go.mod @@ -0,0 +1,4 @@ +module github.com/openclaw/openclaw-kelos/bridge + +go 1.26.0 + diff --git a/bridge/internal/config/config.go b/bridge/internal/config/config.go new file mode 100644 index 0000000..77e059d --- /dev/null +++ b/bridge/internal/config/config.go @@ -0,0 +1,191 @@ +package config + +import ( + "encoding/json" + "errors" + "fmt" + "os" + "strings" + "time" + + "github.com/openclaw/openclaw-kelos/bridge/internal/kelos" +) + +type Config struct { + ListenAddr string + Namespace string + AuthToken string + Backend string + KubectlPath string + SeedWorkspaces []string + PollInterval time.Duration + AllowedWorkspaces map[string]struct{} + DefaultTask TaskDefaults + Presets map[string]Preset + Notification NotificationConfig +} + +type TaskDefaults struct { + Type string + CredentialType kelos.CredentialType + CredentialSecret string +} + +type Preset struct { + Type string `json:"type,omitempty"` + Model string `json:"model,omitempty"` + AgentConfig string `json:"agentConfig,omitempty"` + TimeoutSeconds int64 `json:"timeoutSeconds,omitempty"` + CredentialType kelos.CredentialType `json:"credentialType,omitempty"` + CredentialSecret string `json:"credentialSecret,omitempty"` + AllowedWorkspaces []string `json:"allowedWorkspaces,omitempty"` +} + +type NotificationConfig struct { + Endpoint string + BearerToken string + Timeout time.Duration +} + +func LoadFromEnv() (Config, error) { + pollInterval, err := durationEnv("BRIDGE_POLL_INTERVAL", 15*time.Second) + if err != nil { + return Config{}, err + } + + notificationTimeout, err := durationEnv("BRIDGE_NOTIFICATION_TIMEOUT", 5*time.Second) + if err != nil { + return Config{}, err + } + + credentialType, err := parseCredentialType(envOrDefault("BRIDGE_DEFAULT_CREDENTIAL_TYPE", string(kelos.CredentialTypeOAuth))) + if err != nil { + return Config{}, err + } + + presets, err := parsePresets(os.Getenv("BRIDGE_PRESETS_JSON")) + if err != nil { + return Config{}, err + } + + cfg := Config{ + ListenAddr: envOrDefault("BRIDGE_LISTEN_ADDR", ":8080"), + Namespace: envOrDefault("BRIDGE_NAMESPACE", "default"), + AuthToken: strings.TrimSpace(os.Getenv("BRIDGE_AUTH_TOKEN")), + Backend: envOrDefault("BRIDGE_BACKEND", "memory"), + KubectlPath: envOrDefault("BRIDGE_KUBECTL_PATH", "kubectl"), + SeedWorkspaces: parseCSV(os.Getenv("BRIDGE_SEED_WORKSPACES")), + PollInterval: pollInterval, + AllowedWorkspaces: parseSet(os.Getenv("BRIDGE_ALLOWED_WORKSPACES")), + DefaultTask: TaskDefaults{ + Type: envOrDefault("BRIDGE_DEFAULT_TASK_TYPE", "codex"), + CredentialType: credentialType, + CredentialSecret: envOrDefault("BRIDGE_DEFAULT_CREDENTIAL_SECRET", "kelos-credentials"), + }, + Presets: presets, + Notification: NotificationConfig{ + Endpoint: strings.TrimSpace(os.Getenv("BRIDGE_NOTIFICATION_ENDPOINT")), + BearerToken: strings.TrimSpace(os.Getenv("BRIDGE_NOTIFICATION_BEARER_TOKEN")), + Timeout: notificationTimeout, + }, + } + + if cfg.AuthToken == "" { + return Config{}, errors.New("BRIDGE_AUTH_TOKEN is required") + } + + if cfg.DefaultTask.Type == "" { + return Config{}, errors.New("BRIDGE_DEFAULT_TASK_TYPE must not be empty") + } + + if cfg.DefaultTask.CredentialSecret == "" { + return Config{}, errors.New("BRIDGE_DEFAULT_CREDENTIAL_SECRET must not be empty") + } + + return cfg, nil +} + +func envOrDefault(key, fallback string) string { + value := strings.TrimSpace(os.Getenv(key)) + if value == "" { + return fallback + } + return value +} + +func durationEnv(key string, fallback time.Duration) (time.Duration, error) { + value := strings.TrimSpace(os.Getenv(key)) + if value == "" { + return fallback, nil + } + + parsed, err := time.ParseDuration(value) + if err != nil { + return 0, fmt.Errorf("%s: %w", key, err) + } + + return parsed, nil +} + +func parsePresets(raw string) (map[string]Preset, error) { + if strings.TrimSpace(raw) == "" { + return map[string]Preset{}, nil + } + + var presets map[string]Preset + if err := json.Unmarshal([]byte(raw), &presets); err != nil { + return nil, fmt.Errorf("BRIDGE_PRESETS_JSON: %w", err) + } + + for name, preset := range presets { + credentialType, err := parseCredentialType(string(preset.CredentialType)) + if err != nil { + return nil, fmt.Errorf("preset %q: %w", name, err) + } + preset.CredentialType = credentialType + presets[name] = preset + } + + return presets, nil +} + +func parseCredentialType(value string) (kelos.CredentialType, error) { + switch strings.TrimSpace(value) { + case "", string(kelos.CredentialTypeOAuth): + return kelos.CredentialTypeOAuth, nil + case string(kelos.CredentialTypeAPIKey): + return kelos.CredentialTypeAPIKey, nil + default: + return "", errors.New("credential type must be api-key or oauth") + } +} + +func parseCSV(raw string) []string { + if strings.TrimSpace(raw) == "" { + return nil + } + + parts := strings.Split(raw, ",") + values := make([]string, 0, len(parts)) + for _, part := range parts { + part = strings.TrimSpace(part) + if part != "" { + values = append(values, part) + } + } + + return values +} + +func parseSet(raw string) map[string]struct{} { + values := parseCSV(raw) + if len(values) == 0 { + return map[string]struct{}{} + } + + set := make(map[string]struct{}, len(values)) + for _, value := range values { + set[value] = struct{}{} + } + return set +} diff --git a/bridge/internal/httpapi/server.go b/bridge/internal/httpapi/server.go new file mode 100644 index 0000000..2536610 --- /dev/null +++ b/bridge/internal/httpapi/server.go @@ -0,0 +1,151 @@ +package httpapi + +import ( + "encoding/json" + "errors" + "io" + "net/http" + "strings" + + "github.com/openclaw/openclaw-kelos/bridge/internal/service" +) + +type handler struct { + service *service.Service + authToken string +} + +type errorEnvelope struct { + Error errorBody `json:"error"` +} + +type errorBody struct { + Code string `json:"code"` + Message string `json:"message"` +} + +func NewHandler(svc *service.Service, authToken string) http.Handler { + h := &handler{ + service: svc, + authToken: authToken, + } + + root := http.NewServeMux() + root.HandleFunc("GET /healthz", h.handleHealth) + root.Handle("/v1/", h.requireAuth(http.HandlerFunc(h.handleAPI))) + return root +} + +func (h *handler) handleHealth(w http.ResponseWriter, _ *http.Request) { + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +} + +func (h *handler) handleAPI(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodPost && r.URL.Path == "/v1/tasks/run": + h.handleRunTask(w, r) + case r.Method == http.MethodGet && strings.HasPrefix(r.URL.Path, "/v1/tasks/"): + h.handleGetTask(w, r) + default: + writeError(w, service.Err(service.CodeNotFound, "route not found")) + } +} + +func (h *handler) handleRunTask(w http.ResponseWriter, r *http.Request) { + var req service.RunTaskRequest + if err := decodeJSON(r.Body, &req); err != nil { + writeError(w, service.Err(service.CodeInvalidArgument, err.Error())) + return + } + + response, err := h.service.RunTask(r.Context(), req) + if err != nil { + writeError(w, err) + return + } + + writeJSON(w, http.StatusCreated, response) +} + +func (h *handler) handleGetTask(w http.ResponseWriter, r *http.Request) { + taskID := strings.TrimPrefix(r.URL.Path, "/v1/tasks/") + if taskID == "" || strings.Contains(taskID, "/") { + writeError(w, service.Err(service.CodeInvalidArgument, "task id is required")) + return + } + + response, err := h.service.GetTaskStatus(r.Context(), taskID) + if err != nil { + writeError(w, err) + return + } + + writeJSON(w, http.StatusOK, response) +} + +func (h *handler) requireAuth(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + authHeader := strings.TrimSpace(r.Header.Get("Authorization")) + expected := "Bearer " + h.authToken + if authHeader != expected { + writeError(w, service.Err(service.CodeUnauthorized, "missing or invalid bearer token")) + return + } + + next.ServeHTTP(w, r) + }) +} + +func decodeJSON(body io.ReadCloser, target any) error { + defer body.Close() + + decoder := json.NewDecoder(io.LimitReader(body, 1<<20)) + decoder.DisallowUnknownFields() + + if err := decoder.Decode(target); err != nil { + if errors.Is(err, io.EOF) { + return errors.New("request body is required") + } + return err + } + + return nil +} + +func writeError(w http.ResponseWriter, err error) { + code := http.StatusInternalServerError + appErr := &service.AppError{Code: service.CodeInternal, Message: "internal error"} + if errors.As(err, &appErr) { + code = statusForCode(appErr.Code) + } + + writeJSON(w, code, errorEnvelope{ + Error: errorBody{ + Code: appErr.Code, + Message: appErr.Message, + }, + }) +} + +func statusForCode(code string) int { + switch code { + case service.CodeInvalidArgument: + return http.StatusBadRequest + case service.CodeUnauthorized: + return http.StatusUnauthorized + case service.CodeForbidden: + return http.StatusForbidden + case service.CodeNotFound: + return http.StatusNotFound + case service.CodeConflict: + return http.StatusConflict + default: + return http.StatusInternalServerError + } +} + +func writeJSON(w http.ResponseWriter, status int, value any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(value) +} diff --git a/bridge/internal/httpapi/server_test.go b/bridge/internal/httpapi/server_test.go new file mode 100644 index 0000000..57a0ad0 --- /dev/null +++ b/bridge/internal/httpapi/server_test.go @@ -0,0 +1,101 @@ +package httpapi + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/openclaw/openclaw-kelos/bridge/internal/config" + "github.com/openclaw/openclaw-kelos/bridge/internal/kelos" + "github.com/openclaw/openclaw-kelos/bridge/internal/notifier" + "github.com/openclaw/openclaw-kelos/bridge/internal/service" +) + +func TestRunTaskRequiresAuth(t *testing.T) { + backend := kelos.NewMemoryBackend("demo-workspace") + svc := service.New(testConfig(), backend, notifier.NoopNotifier{}) + handler := NewHandler(svc, "bridge-token") + + request := httptest.NewRequest(http.MethodPost, "/v1/tasks/run", bytes.NewBufferString(`{}`)) + response := httptest.NewRecorder() + + handler.ServeHTTP(response, request) + + if response.Code != http.StatusUnauthorized { + t.Fatalf("expected status 401, got %d", response.Code) + } +} + +func TestRunTaskAndGetStatus(t *testing.T) { + backend := kelos.NewMemoryBackend("demo-workspace") + svc := service.New(testConfig(), backend, notifier.NoopNotifier{}) + handler := NewHandler(svc, "bridge-token") + + body, err := json.Marshal(service.RunTaskRequest{ + Prompt: "review the latest change", + Workspace: "demo-workspace", + SessionKey: "session-1", + RequestID: "req-1", + }) + if err != nil { + t.Fatalf("marshal request: %v", err) + } + + createRequest := httptest.NewRequest(http.MethodPost, "/v1/tasks/run", bytes.NewReader(body)) + createRequest.Header.Set("Authorization", "Bearer bridge-token") + createResponse := httptest.NewRecorder() + + handler.ServeHTTP(createResponse, createRequest) + + if createResponse.Code != http.StatusCreated { + t.Fatalf("expected status 201, got %d: %s", createResponse.Code, createResponse.Body.String()) + } + + var created service.RunTaskResponse + if err := json.NewDecoder(createResponse.Body).Decode(&created); err != nil { + t.Fatalf("decode create response: %v", err) + } + if created.TaskID == "" { + t.Fatal("expected task id to be set") + } + + statusRequest := httptest.NewRequest(http.MethodGet, "/v1/tasks/"+created.TaskID, nil) + statusRequest.Header.Set("Authorization", "Bearer bridge-token") + statusResponse := httptest.NewRecorder() + + handler.ServeHTTP(statusResponse, statusRequest) + + if statusResponse.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", statusResponse.Code, statusResponse.Body.String()) + } + + var status service.TaskStatusResponse + if err := json.NewDecoder(statusResponse.Body).Decode(&status); err != nil { + t.Fatalf("decode status response: %v", err) + } + + if status.TaskID != created.TaskID { + t.Fatalf("expected task id %q, got %q", created.TaskID, status.TaskID) + } + if status.Workspace != "demo-workspace" { + t.Fatalf("expected workspace demo-workspace, got %q", status.Workspace) + } + if status.RequestID != "req-1" { + t.Fatalf("expected request id req-1, got %q", status.RequestID) + } +} + +func testConfig() config.Config { + return config.Config{ + Namespace: "default", + DefaultTask: config.TaskDefaults{ + Type: "codex", + CredentialType: kelos.CredentialTypeOAuth, + CredentialSecret: "kelos-credentials", + }, + Presets: map[string]config.Preset{}, + AllowedWorkspaces: map[string]struct{}{"demo-workspace": {}}, + } +} diff --git a/bridge/internal/kelos/backend.go b/bridge/internal/kelos/backend.go new file mode 100644 index 0000000..45aa583 --- /dev/null +++ b/bridge/internal/kelos/backend.go @@ -0,0 +1,84 @@ +package kelos + +import ( + "context" + "errors" + "time" +) + +var ErrNotFound = errors.New("kelos resource not found") + +type Backend interface { + WorkspaceExists(ctx context.Context, name string) (bool, error) + CreateTask(ctx context.Context, task Task) (*Task, error) + GetTask(ctx context.Context, name string) (*Task, error) + ListManagedTasks(ctx context.Context) ([]Task, error) + UpdateAnnotations(ctx context.Context, name string, annotations map[string]string) error +} + +func CloneTask(task Task) Task { + cloned := task + cloned.Metadata.Labels = cloneStringMap(task.Metadata.Labels) + cloned.Metadata.Annotations = cloneStringMap(task.Metadata.Annotations) + cloned.Spec.DependsOn = cloneStringSlice(task.Spec.DependsOn) + cloned.Status.Outputs = cloneStringSlice(task.Status.Outputs) + cloned.Status.Results = cloneStringMap(task.Status.Results) + + if task.Spec.WorkspaceRef != nil { + workspaceRef := *task.Spec.WorkspaceRef + cloned.Spec.WorkspaceRef = &workspaceRef + } + + if task.Spec.AgentConfigRef != nil { + agentConfigRef := *task.Spec.AgentConfigRef + cloned.Spec.AgentConfigRef = &agentConfigRef + } + + if task.Spec.PodOverrides != nil { + podOverrides := *task.Spec.PodOverrides + if task.Spec.PodOverrides.ActiveDeadlineSeconds != nil { + value := *task.Spec.PodOverrides.ActiveDeadlineSeconds + podOverrides.ActiveDeadlineSeconds = &value + } + cloned.Spec.PodOverrides = &podOverrides + } + + if task.Status.StartTime != nil { + startTime := *task.Status.StartTime + cloned.Status.StartTime = &startTime + } + + if task.Status.CompletionTime != nil { + completionTime := *task.Status.CompletionTime + cloned.Status.CompletionTime = &completionTime + } + + return cloned +} + +func cloneStringMap(values map[string]string) map[string]string { + if len(values) == 0 { + return nil + } + + cloned := make(map[string]string, len(values)) + for key, value := range values { + cloned[key] = value + } + return cloned +} + +func cloneStringSlice(values []string) []string { + if len(values) == 0 { + return nil + } + + cloned := make([]string, len(values)) + copy(cloned, values) + return cloned +} + +func nowPtr() *time.Time { + now := time.Now().UTC() + return &now +} diff --git a/bridge/internal/kelos/kubectl.go b/bridge/internal/kelos/kubectl.go new file mode 100644 index 0000000..5ed68d5 --- /dev/null +++ b/bridge/internal/kelos/kubectl.go @@ -0,0 +1,125 @@ +package kelos + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "os/exec" + "strings" +) + +type KubectlBackend struct { + namespace string + kubectlPath string +} + +type taskList struct { + Items []Task `json:"items"` +} + +type workspaceResource struct { + Metadata Metadata `json:"metadata,omitempty"` +} + +func NewKubectlBackend(namespace, kubectlPath string) *KubectlBackend { + return &KubectlBackend{ + namespace: namespace, + kubectlPath: kubectlPath, + } +} + +func (b *KubectlBackend) WorkspaceExists(ctx context.Context, name string) (bool, error) { + _, err := b.run(ctx, "get", "workspace", name, "-o", "json") + if err == nil { + return true, nil + } + if strings.Contains(strings.ToLower(err.Error()), "not found") { + return false, nil + } + return false, err +} + +func (b *KubectlBackend) CreateTask(ctx context.Context, task Task) (*Task, error) { + payload, err := json.Marshal(task) + if err != nil { + return nil, fmt.Errorf("marshal task: %w", err) + } + + output, err := b.runWithInput(ctx, payload, "create", "-f", "-", "-o", "json") + if err != nil { + return nil, err + } + + var created Task + if err := json.Unmarshal(output, &created); err != nil { + return nil, fmt.Errorf("decode created task: %w", err) + } + return &created, nil +} + +func (b *KubectlBackend) GetTask(ctx context.Context, name string) (*Task, error) { + output, err := b.run(ctx, "get", "task", name, "-o", "json") + if err != nil { + if strings.Contains(strings.ToLower(err.Error()), "not found") { + return nil, ErrNotFound + } + return nil, err + } + + var task Task + if err := json.Unmarshal(output, &task); err != nil { + return nil, fmt.Errorf("decode task: %w", err) + } + + return &task, nil +} + +func (b *KubectlBackend) ListManagedTasks(ctx context.Context) ([]Task, error) { + output, err := b.run(ctx, "get", "tasks", "-l", ManagedLabelKey+"="+ManagedLabelValue, "-o", "json") + if err != nil { + return nil, err + } + + var list taskList + if err := json.Unmarshal(output, &list); err != nil { + return nil, fmt.Errorf("decode task list: %w", err) + } + + return list.Items, nil +} + +func (b *KubectlBackend) UpdateAnnotations(ctx context.Context, name string, annotations map[string]string) error { + patch := map[string]any{ + "metadata": map[string]any{ + "annotations": annotations, + }, + } + + payload, err := json.Marshal(patch) + if err != nil { + return fmt.Errorf("marshal annotation patch: %w", err) + } + + _, err = b.run(ctx, "patch", "task", name, "--type", "merge", "-p", string(payload)) + return err +} + +func (b *KubectlBackend) run(ctx context.Context, args ...string) ([]byte, error) { + return b.runWithInput(ctx, nil, args...) +} + +func (b *KubectlBackend) runWithInput(ctx context.Context, input []byte, args ...string) ([]byte, error) { + commandArgs := append([]string{"-n", b.namespace}, args...) + cmd := exec.CommandContext(ctx, b.kubectlPath, commandArgs...) + if len(input) > 0 { + cmd.Stdin = bytes.NewReader(input) + } + + output, err := cmd.CombinedOutput() + if err != nil { + return nil, fmt.Errorf("kubectl %s: %w: %s", strings.Join(commandArgs, " "), err, strings.TrimSpace(string(output))) + } + + return output, nil +} diff --git a/bridge/internal/kelos/memory.go b/bridge/internal/kelos/memory.go new file mode 100644 index 0000000..79f05eb --- /dev/null +++ b/bridge/internal/kelos/memory.go @@ -0,0 +1,133 @@ +package kelos + +import ( + "context" + "errors" + "sync" + "time" +) + +type MemoryBackend struct { + mu sync.RWMutex + workspaces map[string]struct{} + tasks map[string]Task +} + +func NewMemoryBackend(workspaces ...string) *MemoryBackend { + backend := &MemoryBackend{ + workspaces: make(map[string]struct{}, len(workspaces)), + tasks: make(map[string]Task), + } + + for _, workspace := range workspaces { + backend.workspaces[workspace] = struct{}{} + } + + return backend +} + +func (b *MemoryBackend) SeedWorkspace(name string) { + b.mu.Lock() + defer b.mu.Unlock() + b.workspaces[name] = struct{}{} +} + +func (b *MemoryBackend) WorkspaceExists(_ context.Context, name string) (bool, error) { + b.mu.RLock() + defer b.mu.RUnlock() + _, ok := b.workspaces[name] + return ok, nil +} + +func (b *MemoryBackend) CreateTask(_ context.Context, task Task) (*Task, error) { + b.mu.Lock() + defer b.mu.Unlock() + + if task.Metadata.Name == "" { + return nil, errors.New("task name is required") + } + if _, exists := b.tasks[task.Metadata.Name]; exists { + return nil, errors.New("task already exists") + } + + task.APIVersion = defaultString(task.APIVersion, "kelos.dev/v1alpha1") + task.Kind = defaultString(task.Kind, "Task") + if task.Metadata.CreationTimestamp.IsZero() { + task.Metadata.CreationTimestamp = time.Now().UTC() + } + if task.Status.Phase == "" { + task.Status.Phase = TaskPhasePending + } + + stored := CloneTask(task) + b.tasks[task.Metadata.Name] = stored + + cloned := CloneTask(stored) + return &cloned, nil +} + +func (b *MemoryBackend) GetTask(_ context.Context, name string) (*Task, error) { + b.mu.RLock() + defer b.mu.RUnlock() + + task, ok := b.tasks[name] + if !ok { + return nil, ErrNotFound + } + + cloned := CloneTask(task) + return &cloned, nil +} + +func (b *MemoryBackend) ListManagedTasks(_ context.Context) ([]Task, error) { + b.mu.RLock() + defer b.mu.RUnlock() + + tasks := make([]Task, 0, len(b.tasks)) + for _, task := range b.tasks { + if task.Metadata.Labels[ManagedLabelKey] != ManagedLabelValue { + continue + } + tasks = append(tasks, CloneTask(task)) + } + return tasks, nil +} + +func (b *MemoryBackend) UpdateAnnotations(_ context.Context, name string, annotations map[string]string) error { + b.mu.Lock() + defer b.mu.Unlock() + + task, ok := b.tasks[name] + if !ok { + return ErrNotFound + } + if task.Metadata.Annotations == nil { + task.Metadata.Annotations = map[string]string{} + } + for key, value := range annotations { + task.Metadata.Annotations[key] = value + } + b.tasks[name] = task + return nil +} + +func (b *MemoryBackend) UpdateTaskStatus(name string, status TaskStatus) error { + b.mu.Lock() + defer b.mu.Unlock() + + task, ok := b.tasks[name] + if !ok { + return ErrNotFound + } + + task.Status = status + b.tasks[name] = task + return nil +} + +func defaultString(value, fallback string) string { + if value == "" { + return fallback + } + return value +} diff --git a/bridge/internal/kelos/types.go b/bridge/internal/kelos/types.go new file mode 100644 index 0000000..0b07658 --- /dev/null +++ b/bridge/internal/kelos/types.go @@ -0,0 +1,86 @@ +package kelos + +import "time" + +type CredentialType string + +const ( + CredentialTypeAPIKey CredentialType = "api-key" + CredentialTypeOAuth CredentialType = "oauth" +) + +const ( + TaskPhasePending = "Pending" + TaskPhaseRunning = "Running" + TaskPhaseSucceeded = "Succeeded" + TaskPhaseFailed = "Failed" + TaskPhaseWaiting = "Waiting" +) + +const ( + ManagedLabelKey = "openclaw.kelos.dev/managed" + ManagedLabelValue = "true" + AnnotationSessionKey = "openclaw.kelos.dev/sessionKey" + AnnotationRequestID = "openclaw.kelos.dev/requestId" + AnnotationPreset = "openclaw.kelos.dev/preset" + AnnotationLastNotifiedPhase = "openclaw.kelos.dev/lastNotifiedPhase" + AnnotationLastFinalHash = "openclaw.kelos.dev/lastFinalHash" +) + +type SecretReference struct { + Name string `json:"name"` +} + +type Credentials struct { + Type CredentialType `json:"type"` + SecretRef SecretReference `json:"secretRef"` +} + +type WorkspaceReference struct { + Name string `json:"name"` +} + +type AgentConfigReference struct { + Name string `json:"name"` +} + +type PodOverrides struct { + ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"` +} + +type TaskSpec struct { + Type string `json:"type"` + Prompt string `json:"prompt"` + Credentials Credentials `json:"credentials"` + Model string `json:"model,omitempty"` + WorkspaceRef *WorkspaceReference `json:"workspaceRef,omitempty"` + AgentConfigRef *AgentConfigReference `json:"agentConfigRef,omitempty"` + DependsOn []string `json:"dependsOn,omitempty"` + Branch string `json:"branch,omitempty"` + PodOverrides *PodOverrides `json:"podOverrides,omitempty"` +} + +type TaskStatus struct { + Phase string `json:"phase,omitempty"` + StartTime *time.Time `json:"startTime,omitempty"` + CompletionTime *time.Time `json:"completionTime,omitempty"` + Message string `json:"message,omitempty"` + Outputs []string `json:"outputs,omitempty"` + Results map[string]string `json:"results,omitempty"` +} + +type Metadata struct { + Name string `json:"name,omitempty"` + Namespace string `json:"namespace,omitempty"` + Labels map[string]string `json:"labels,omitempty"` + Annotations map[string]string `json:"annotations,omitempty"` + CreationTimestamp time.Time `json:"creationTimestamp,omitempty"` +} + +type Task struct { + APIVersion string `json:"apiVersion,omitempty"` + Kind string `json:"kind,omitempty"` + Metadata Metadata `json:"metadata,omitempty"` + Spec TaskSpec `json:"spec,omitempty"` + Status TaskStatus `json:"status,omitempty"` +} diff --git a/bridge/internal/notifier/http.go b/bridge/internal/notifier/http.go new file mode 100644 index 0000000..b6aaa1c --- /dev/null +++ b/bridge/internal/notifier/http.go @@ -0,0 +1,70 @@ +package notifier + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/openclaw/openclaw-kelos/bridge/internal/service" +) + +type NoopNotifier struct{} + +func (NoopNotifier) NotifyTaskUpdate(context.Context, string, service.TaskUpdate) error { + return nil +} + +type HTTPNotifier struct { + endpoint string + bearerToken string + client *http.Client +} + +type payload struct { + SessionKey string `json:"sessionKey"` + Update service.TaskUpdate `json:"update"` +} + +func NewHTTP(endpoint, bearerToken string, timeout time.Duration) HTTPNotifier { + return HTTPNotifier{ + endpoint: endpoint, + bearerToken: bearerToken, + client: &http.Client{ + Timeout: timeout, + }, + } +} + +func (n HTTPNotifier) NotifyTaskUpdate(ctx context.Context, sessionKey string, update service.TaskUpdate) error { + body, err := json.Marshal(payload{ + SessionKey: sessionKey, + Update: update, + }) + if err != nil { + return fmt.Errorf("marshal notification: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, n.endpoint, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("build notification request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + if n.bearerToken != "" { + req.Header.Set("Authorization", "Bearer "+n.bearerToken) + } + + resp, err := n.client.Do(req) + if err != nil { + return fmt.Errorf("send notification: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode >= 300 { + return fmt.Errorf("notification endpoint returned %s", resp.Status) + } + + return nil +} diff --git a/bridge/internal/service/service.go b/bridge/internal/service/service.go new file mode 100644 index 0000000..012ba4e --- /dev/null +++ b/bridge/internal/service/service.go @@ -0,0 +1,422 @@ +package service + +import ( + "context" + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "regexp" + "strings" + + "github.com/openclaw/openclaw-kelos/bridge/internal/config" + "github.com/openclaw/openclaw-kelos/bridge/internal/kelos" +) + +const ( + CodeInvalidArgument = "invalid_argument" + CodeUnauthorized = "unauthorized" + CodeForbidden = "forbidden" + CodeNotFound = "not_found" + CodeConflict = "conflict" + CodeInternal = "internal" +) + +var nameSanitizer = regexp.MustCompile(`[^a-z0-9-]+`) + +type AppError struct { + Code string + Message string +} + +func (e *AppError) Error() string { + return e.Message +} + +func Err(code, message string) *AppError { + return &AppError{ + Code: code, + Message: message, + } +} + +type Notifier interface { + NotifyTaskUpdate(ctx context.Context, sessionKey string, update TaskUpdate) error +} + +type Service struct { + cfg config.Config + backend kelos.Backend + notifier Notifier + newID func() string +} + +type RunTaskRequest struct { + Prompt string `json:"prompt"` + Workspace string `json:"workspace"` + Preset string `json:"preset,omitempty"` + Type string `json:"type,omitempty"` + Model string `json:"model,omitempty"` + Branch string `json:"branch,omitempty"` + AgentConfig string `json:"agentConfig,omitempty"` + TimeoutSeconds int64 `json:"timeoutSeconds,omitempty"` + SessionKey string `json:"sessionKey"` + RequestID string `json:"requestId"` +} + +type RunTaskResponse struct { + TaskID string `json:"taskId"` + Phase string `json:"phase"` + RequestID string `json:"requestId"` +} + +type TaskStatusResponse struct { + TaskID string `json:"taskId"` + Phase string `json:"phase"` + Message string `json:"message,omitempty"` + Outputs []string `json:"outputs,omitempty"` + Results map[string]string `json:"results,omitempty"` + Workspace string `json:"workspace,omitempty"` + RequestID string `json:"requestId,omitempty"` + SessionKey string `json:"sessionKey,omitempty"` + Preset string `json:"preset,omitempty"` +} + +type TaskUpdate struct { + TaskID string `json:"taskId"` + Phase string `json:"phase"` + Message string `json:"message,omitempty"` + Outputs []string `json:"outputs,omitempty"` + Results map[string]string `json:"results,omitempty"` + Final bool `json:"final"` + RequestID string `json:"requestId,omitempty"` + Workspace string `json:"workspace,omitempty"` + Preset string `json:"preset,omitempty"` + SessionKey string `json:"sessionKey,omitempty"` +} + +func New(cfg config.Config, backend kelos.Backend, notifier Notifier) *Service { + if notifier == nil { + panic("notifier must not be nil") + } + + return &Service{ + cfg: cfg, + backend: backend, + notifier: notifier, + newID: randomID, + } +} + +func (s *Service) RunTask(ctx context.Context, req RunTaskRequest) (*RunTaskResponse, error) { + req.Prompt = strings.TrimSpace(req.Prompt) + req.Workspace = strings.TrimSpace(req.Workspace) + req.Preset = strings.TrimSpace(req.Preset) + req.Type = strings.TrimSpace(req.Type) + req.Model = strings.TrimSpace(req.Model) + req.Branch = strings.TrimSpace(req.Branch) + req.AgentConfig = strings.TrimSpace(req.AgentConfig) + req.SessionKey = strings.TrimSpace(req.SessionKey) + req.RequestID = strings.TrimSpace(req.RequestID) + + if req.Prompt == "" { + return nil, Err(CodeInvalidArgument, "prompt is required") + } + if req.Workspace == "" { + return nil, Err(CodeInvalidArgument, "workspace is required") + } + if req.SessionKey == "" { + return nil, Err(CodeInvalidArgument, "sessionKey is required") + } + if req.RequestID == "" { + return nil, Err(CodeInvalidArgument, "requestId is required") + } + if req.TimeoutSeconds < 0 { + return nil, Err(CodeInvalidArgument, "timeoutSeconds must be >= 0") + } + + preset, err := s.lookupPreset(req.Preset) + if err != nil { + return nil, err + } + + if !s.workspaceAllowed(req.Workspace, preset) { + return nil, Err(CodeForbidden, "workspace is not allowed") + } + + exists, err := s.backend.WorkspaceExists(ctx, req.Workspace) + if err != nil { + return nil, Err(CodeInternal, fmt.Sprintf("check workspace: %v", err)) + } + if !exists { + return nil, Err(CodeNotFound, "workspace does not exist") + } + + taskType := firstNonEmpty(req.Type, preset.Type, s.cfg.DefaultTask.Type) + model := firstNonEmpty(req.Model, preset.Model) + agentConfig := firstNonEmpty(req.AgentConfig, preset.AgentConfig) + credentialType := firstNonEmpty(string(preset.CredentialType), string(s.cfg.DefaultTask.CredentialType)) + credentialSecret := firstNonEmpty(preset.CredentialSecret, s.cfg.DefaultTask.CredentialSecret) + timeoutSeconds := firstPositive(req.TimeoutSeconds, preset.TimeoutSeconds) + + task := kelos.Task{ + APIVersion: "kelos.dev/v1alpha1", + Kind: "Task", + Metadata: kelos.Metadata{ + Name: taskName(req.RequestID, s.newID()), + Namespace: s.cfg.Namespace, + Labels: map[string]string{ + kelos.ManagedLabelKey: kelos.ManagedLabelValue, + }, + Annotations: map[string]string{ + kelos.AnnotationSessionKey: req.SessionKey, + kelos.AnnotationRequestID: req.RequestID, + kelos.AnnotationPreset: req.Preset, + }, + }, + Spec: kelos.TaskSpec{ + Type: taskType, + Prompt: req.Prompt, + Credentials: kelos.Credentials{ + Type: kelos.CredentialType(credentialType), + SecretRef: kelos.SecretReference{ + Name: credentialSecret, + }, + }, + Model: model, + WorkspaceRef: &kelos.WorkspaceReference{ + Name: req.Workspace, + }, + Branch: req.Branch, + }, + Status: kelos.TaskStatus{ + Phase: kelos.TaskPhasePending, + }, + } + + if agentConfig != "" { + task.Spec.AgentConfigRef = &kelos.AgentConfigReference{Name: agentConfig} + } + if timeoutSeconds > 0 { + task.Spec.PodOverrides = &kelos.PodOverrides{ + ActiveDeadlineSeconds: &timeoutSeconds, + } + } + + created, err := s.backend.CreateTask(ctx, task) + if err != nil { + if strings.Contains(strings.ToLower(err.Error()), "already exists") { + return nil, Err(CodeConflict, "task already exists") + } + return nil, Err(CodeInternal, fmt.Sprintf("create task: %v", err)) + } + + return &RunTaskResponse{ + TaskID: created.Metadata.Name, + Phase: created.Status.Phase, + RequestID: req.RequestID, + }, nil +} + +func (s *Service) GetTaskStatus(ctx context.Context, taskID string) (*TaskStatusResponse, error) { + task, err := s.backend.GetTask(ctx, taskID) + if err != nil { + if errors.Is(err, kelos.ErrNotFound) { + return nil, Err(CodeNotFound, "task does not exist") + } + return nil, Err(CodeInternal, fmt.Sprintf("get task: %v", err)) + } + + return &TaskStatusResponse{ + TaskID: task.Metadata.Name, + Phase: task.Status.Phase, + Message: task.Status.Message, + Outputs: task.Status.Outputs, + Results: task.Status.Results, + Workspace: workspaceName(task), + RequestID: task.Metadata.Annotations[kelos.AnnotationRequestID], + SessionKey: task.Metadata.Annotations[kelos.AnnotationSessionKey], + Preset: task.Metadata.Annotations[kelos.AnnotationPreset], + }, nil +} + +func (s *Service) ReconcileNotifications(ctx context.Context) error { + tasks, err := s.backend.ListManagedTasks(ctx) + if err != nil { + return Err(CodeInternal, fmt.Sprintf("list managed tasks: %v", err)) + } + + for _, task := range tasks { + if err := s.reconcileTask(ctx, task); err != nil { + return err + } + } + + return nil +} + +func (s *Service) reconcileTask(ctx context.Context, task kelos.Task) error { + sessionKey := task.Metadata.Annotations[kelos.AnnotationSessionKey] + if sessionKey == "" || task.Status.Phase == "" { + return nil + } + + lastPhase := task.Metadata.Annotations[kelos.AnnotationLastNotifiedPhase] + finalHash := "" + lastFinalHash := task.Metadata.Annotations[kelos.AnnotationLastFinalHash] + + isFinal := isTerminal(task.Status.Phase) + if isFinal { + finalHash = hashFinalStatus(task.Status) + if task.Status.Phase == lastPhase && finalHash == lastFinalHash { + return nil + } + } else if task.Status.Phase == lastPhase { + return nil + } + + update := TaskUpdate{ + TaskID: task.Metadata.Name, + Phase: task.Status.Phase, + Message: task.Status.Message, + Outputs: task.Status.Outputs, + Results: task.Status.Results, + Final: isFinal, + RequestID: task.Metadata.Annotations[kelos.AnnotationRequestID], + Workspace: workspaceName(&task), + Preset: task.Metadata.Annotations[kelos.AnnotationPreset], + SessionKey: sessionKey, + } + + if err := s.notifier.NotifyTaskUpdate(ctx, sessionKey, update); err != nil { + return Err(CodeInternal, fmt.Sprintf("notify task update: %v", err)) + } + + annotations := map[string]string{ + kelos.AnnotationLastNotifiedPhase: task.Status.Phase, + } + if isFinal { + annotations[kelos.AnnotationLastFinalHash] = finalHash + } + + if err := s.backend.UpdateAnnotations(ctx, task.Metadata.Name, annotations); err != nil { + if errors.Is(err, kelos.ErrNotFound) { + return Err(CodeNotFound, "task disappeared during reconcile") + } + return Err(CodeInternal, fmt.Sprintf("persist notification annotations: %v", err)) + } + + return nil +} + +func (s *Service) lookupPreset(name string) (config.Preset, error) { + if name == "" { + return config.Preset{}, nil + } + preset, ok := s.cfg.Presets[name] + if !ok { + return config.Preset{}, Err(CodeInvalidArgument, "unknown preset") + } + return preset, nil +} + +func (s *Service) workspaceAllowed(workspace string, preset config.Preset) bool { + if len(s.cfg.AllowedWorkspaces) > 0 { + if _, ok := s.cfg.AllowedWorkspaces[workspace]; !ok { + return false + } + } + + if len(preset.AllowedWorkspaces) == 0 { + return true + } + + for _, allowed := range preset.AllowedWorkspaces { + if workspace == allowed { + return true + } + } + + return false +} + +func taskName(requestID, suffix string) string { + base := strings.ToLower(strings.TrimSpace(requestID)) + base = strings.ReplaceAll(base, "_", "-") + base = nameSanitizer.ReplaceAllString(base, "-") + base = strings.Trim(base, "-") + if base == "" { + base = "task" + } + if len(base) > 36 { + base = base[:36] + base = strings.TrimRight(base, "-") + } + + if len(suffix) > 8 { + suffix = suffix[:8] + } + + return "openclaw-" + base + "-" + suffix +} + +func randomID() string { + raw := make([]byte, 6) + if _, err := rand.Read(raw); err != nil { + panic(err) + } + return hex.EncodeToString(raw) +} + +func firstNonEmpty(values ...string) string { + for _, value := range values { + if strings.TrimSpace(value) != "" { + return value + } + } + return "" +} + +func firstPositive(values ...int64) int64 { + for _, value := range values { + if value > 0 { + return value + } + } + return 0 +} + +func isTerminal(phase string) bool { + switch phase { + case kelos.TaskPhaseSucceeded, kelos.TaskPhaseFailed: + return true + default: + return false + } +} + +func hashFinalStatus(status kelos.TaskStatus) string { + payload, _ := json.Marshal(struct { + Phase string `json:"phase"` + Message string `json:"message,omitempty"` + Outputs []string `json:"outputs,omitempty"` + Results map[string]string `json:"results,omitempty"` + }{ + Phase: status.Phase, + Message: status.Message, + Outputs: status.Outputs, + Results: status.Results, + }) + + sum := sha256.Sum256(payload) + return hex.EncodeToString(sum[:]) +} + +func workspaceName(task *kelos.Task) string { + if task.Spec.WorkspaceRef == nil { + return "" + } + return task.Spec.WorkspaceRef.Name +} diff --git a/bridge/internal/service/service_test.go b/bridge/internal/service/service_test.go new file mode 100644 index 0000000..f9e93d4 --- /dev/null +++ b/bridge/internal/service/service_test.go @@ -0,0 +1,162 @@ +package service + +import ( + "context" + "testing" + + "github.com/openclaw/openclaw-kelos/bridge/internal/config" + "github.com/openclaw/openclaw-kelos/bridge/internal/kelos" +) + +type recordingNotifier struct { + updates []TaskUpdate +} + +func (n *recordingNotifier) NotifyTaskUpdate(_ context.Context, _ string, update TaskUpdate) error { + n.updates = append(n.updates, update) + return nil +} + +func TestRunTaskAppliesPreset(t *testing.T) { + backend := kelos.NewMemoryBackend("demo-workspace") + notifier := &recordingNotifier{} + svc := New(config.Config{ + Namespace: "default", + AllowedWorkspaces: map[string]struct{}{ + "demo-workspace": {}, + }, + DefaultTask: config.TaskDefaults{ + Type: "codex", + CredentialType: kelos.CredentialTypeOAuth, + CredentialSecret: "kelos-credentials", + }, + Presets: map[string]config.Preset{ + "review": { + Model: "gpt-5-codex", + AgentConfig: "reviewer", + TimeoutSeconds: 900, + }, + }, + }, backend, notifier) + + response, err := svc.RunTask(context.Background(), RunTaskRequest{ + Prompt: "review the latest change", + Workspace: "demo-workspace", + Preset: "review", + SessionKey: "session-1", + RequestID: "req-1", + }) + if err != nil { + t.Fatalf("run task: %v", err) + } + + task, err := backend.GetTask(context.Background(), response.TaskID) + if err != nil { + t.Fatalf("get task: %v", err) + } + + if task.Spec.Model != "gpt-5-codex" { + t.Fatalf("expected preset model gpt-5-codex, got %q", task.Spec.Model) + } + if task.Spec.AgentConfigRef == nil || task.Spec.AgentConfigRef.Name != "reviewer" { + t.Fatalf("expected agent config reviewer, got %#v", task.Spec.AgentConfigRef) + } + if task.Spec.PodOverrides == nil || task.Spec.PodOverrides.ActiveDeadlineSeconds == nil || *task.Spec.PodOverrides.ActiveDeadlineSeconds != 900 { + t.Fatalf("expected timeout 900, got %#v", task.Spec.PodOverrides) + } + if task.Metadata.Labels[kelos.ManagedLabelKey] != kelos.ManagedLabelValue { + t.Fatalf("expected managed label %q", kelos.ManagedLabelValue) + } + if task.Metadata.Annotations[kelos.AnnotationSessionKey] != "session-1" { + t.Fatalf("expected session annotation session-1, got %q", task.Metadata.Annotations[kelos.AnnotationSessionKey]) + } +} + +func TestReconcileNotificationsDeduplicatesFinalUpdates(t *testing.T) { + backend := kelos.NewMemoryBackend("demo-workspace") + notifier := &recordingNotifier{} + svc := New(config.Config{ + Namespace: "default", + DefaultTask: config.TaskDefaults{ + Type: "codex", + CredentialType: kelos.CredentialTypeOAuth, + CredentialSecret: "kelos-credentials", + }, + }, backend, notifier) + + response, err := svc.RunTask(context.Background(), RunTaskRequest{ + Prompt: "review the latest change", + Workspace: "demo-workspace", + SessionKey: "session-1", + RequestID: "req-1", + }) + if err != nil { + t.Fatalf("run task: %v", err) + } + + if err := backend.UpdateTaskStatus(response.TaskID, kelos.TaskStatus{ + Phase: kelos.TaskPhaseRunning, + Message: "task is running", + }); err != nil { + t.Fatalf("set running status: %v", err) + } + + if err := svc.ReconcileNotifications(context.Background()); err != nil { + t.Fatalf("reconcile running status: %v", err) + } + if len(notifier.updates) != 1 { + t.Fatalf("expected 1 notification, got %d", len(notifier.updates)) + } + if notifier.updates[0].Final { + t.Fatal("expected running update to be non-final") + } + + if err := svc.ReconcileNotifications(context.Background()); err != nil { + t.Fatalf("reconcile running status again: %v", err) + } + if len(notifier.updates) != 1 { + t.Fatalf("expected running phase to stay deduplicated, got %d notifications", len(notifier.updates)) + } + + if err := backend.UpdateTaskStatus(response.TaskID, kelos.TaskStatus{ + Phase: kelos.TaskPhaseSucceeded, + Message: "task completed", + Outputs: []string{"https://example.com/pr/1"}, + Results: map[string]string{"branch": "review/pr-1"}, + }); err != nil { + t.Fatalf("set final status: %v", err) + } + + if err := svc.ReconcileNotifications(context.Background()); err != nil { + t.Fatalf("reconcile final status: %v", err) + } + if len(notifier.updates) != 2 { + t.Fatalf("expected 2 notifications after final update, got %d", len(notifier.updates)) + } + if !notifier.updates[1].Final { + t.Fatal("expected succeeded update to be final") + } + + if err := svc.ReconcileNotifications(context.Background()); err != nil { + t.Fatalf("reconcile final status again: %v", err) + } + if len(notifier.updates) != 2 { + t.Fatalf("expected final update to stay deduplicated, got %d notifications", len(notifier.updates)) + } + + if err := backend.UpdateTaskStatus(response.TaskID, kelos.TaskStatus{ + Phase: kelos.TaskPhaseSucceeded, + Message: "task completed with new metadata", + Outputs: []string{"https://example.com/pr/1"}, + Results: map[string]string{"branch": "review/pr-1"}, + }); err != nil { + t.Fatalf("set mutated final status: %v", err) + } + + if err := svc.ReconcileNotifications(context.Background()); err != nil { + t.Fatalf("reconcile changed final status: %v", err) + } + if len(notifier.updates) != 3 { + t.Fatalf("expected changed final payload to trigger a third notification, got %d", len(notifier.updates)) + } +} diff --git a/docs/architecture.md b/docs/architecture.md new file mode 100644 index 0000000..e4309ad --- /dev/null +++ b/docs/architecture.md @@ -0,0 +1,32 @@ +# Architecture + +Locked decisions from the handoff: +- Keep this integration in a separate repo owned by the Kelos side. +- Run the OpenClaw plugin inside the Gateway process. +- Run the Kelos bridge as a separate Go service on the same host as the Gateway. +- Create native Kelos `Task` resources only. No `TaskSpawner`, cancellation, live log streaming, or Kelos core changes in v1. + +Current flow: +1. `kelos_run_task` is called from the OpenClaw plugin. +2. The plugin sends a signed HTTP request to the bridge and includes the originating OpenClaw `sessionKey` plus a `requestId`. +3. The bridge validates auth, checks workspace/preset allowlists, verifies the target `Workspace`, and creates a managed Kelos `Task`. +4. `kelos_task_status` fetches the latest task status over the same bridge API. +5. The bridge periodically reconciles managed tasks and emits milestone/final updates through a notifier interface. + +Managed task metadata: +- Label: `openclaw.kelos.dev/managed=true` +- Annotations: + - `openclaw.kelos.dev/sessionKey` + - `openclaw.kelos.dev/requestId` + - `openclaw.kelos.dev/preset` + - `openclaw.kelos.dev/lastNotifiedPhase` + - `openclaw.kelos.dev/lastFinalHash` + +Backends: +- `memory`: self-contained local development and tests +- `kubectl`: a pragmatic shell-backed adapter that uses the current kube context to create and inspect `Workspace` and `Task` resources + +Notification path: +- The bridge ships a generic HTTP notifier. +- A production OpenClaw deployment can point that notifier at a small Gateway-facing adapter that translates the update payload into `chat.inject` calls. + diff --git a/examples/bridge.env.example b/examples/bridge.env.example new file mode 100644 index 0000000..8cf85db --- /dev/null +++ b/examples/bridge.env.example @@ -0,0 +1,15 @@ +BRIDGE_LISTEN_ADDR=:8080 +BRIDGE_AUTH_TOKEN=replace-me +BRIDGE_BACKEND=memory +BRIDGE_NAMESPACE=default +BRIDGE_SEED_WORKSPACES=demo-workspace,review-workspace +BRIDGE_ALLOWED_WORKSPACES=demo-workspace,review-workspace +BRIDGE_DEFAULT_TASK_TYPE=codex +BRIDGE_DEFAULT_CREDENTIAL_TYPE=oauth +BRIDGE_DEFAULT_CREDENTIAL_SECRET=kelos-credentials +BRIDGE_POLL_INTERVAL=15s +BRIDGE_PRESETS_JSON={"review":{"model":"gpt-5-codex","agentConfig":"reviewer","timeoutSeconds":1800,"allowedWorkspaces":["review-workspace"]}} +BRIDGE_NOTIFICATION_ENDPOINT=http://localhost:3000/internal/openclaw/session-update +BRIDGE_NOTIFICATION_BEARER_TOKEN=replace-me +BRIDGE_NOTIFICATION_TIMEOUT=5s + diff --git a/examples/plugin-config.example.ts b/examples/plugin-config.example.ts new file mode 100644 index 0000000..0ce209e --- /dev/null +++ b/examples/plugin-config.example.ts @@ -0,0 +1,7 @@ +import { createKelosPlugin } from "../plugin/src/index.ts"; + +export default createKelosPlugin({ + bridgeUrl: process.env.KELOS_BRIDGE_URL ?? "http://127.0.0.1:8080", + authToken: process.env.KELOS_BRIDGE_TOKEN ?? "replace-me", +}); + diff --git a/plugin/package.json b/plugin/package.json new file mode 100644 index 0000000..65a3002 --- /dev/null +++ b/plugin/package.json @@ -0,0 +1,10 @@ +{ + "name": "@openclaw/kelos-plugin", + "private": true, + "version": "0.1.0", + "type": "module", + "scripts": { + "test": "node --test --experimental-strip-types src/*.test.ts" + } +} + diff --git a/plugin/src/client.ts b/plugin/src/client.ts new file mode 100644 index 0000000..2e6a8ee --- /dev/null +++ b/plugin/src/client.ts @@ -0,0 +1,95 @@ +import type { + BridgeClientConfig, + BridgeRunTaskRequest, + RunTaskResponse, + TaskStatusResponse, +} from "./types.ts"; + +export class BridgeError extends Error { + status: number; + code?: string; + + constructor(status: number, message: string, code?: string) { + super(message); + this.name = "BridgeError"; + this.status = status; + this.code = code; + } +} + +export class KelosBridgeClient { + readonly bridgeUrl: string; + readonly authToken: string; + readonly fetchImpl: typeof fetch; + + constructor(config: BridgeClientConfig) { + this.bridgeUrl = config.bridgeUrl.replace(/\/+$/, ""); + this.authToken = config.authToken; + this.fetchImpl = config.fetchImpl ?? fetch; + } + + async runTask(input: BridgeRunTaskRequest): Promise { + return this.request("/v1/tasks/run", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(input), + }); + } + + async getTaskStatus(taskId: string): Promise { + return this.request(`/v1/tasks/${encodeURIComponent(taskId)}`, { + method: "GET", + }); + } + + private async request(path: string, init: RequestInit): Promise { + const response = await this.fetchImpl(`${this.bridgeUrl}${path}`, { + ...init, + headers: { + Authorization: `Bearer ${this.authToken}`, + ...(init.headers ?? {}), + }, + }); + + const payload = await this.readPayload(response); + if (!response.ok) { + const message = + this.extractErrorMessage(payload) ?? + `bridge request failed with status ${response.status}`; + const code = this.extractErrorCode(payload); + throw new BridgeError(response.status, message, code); + } + + return payload as T; + } + + private async readPayload(response: Response): Promise { + const contentType = response.headers.get("content-type") ?? ""; + if (contentType.includes("application/json")) { + return response.json(); + } + + return response.text(); + } + + private extractErrorMessage(payload: unknown): string | undefined { + if (typeof payload !== "object" || payload === null) { + return undefined; + } + + const error = (payload as { error?: { message?: string } }).error; + return error?.message; + } + + private extractErrorCode(payload: unknown): string | undefined { + if (typeof payload !== "object" || payload === null) { + return undefined; + } + + const error = (payload as { error?: { code?: string } }).error; + return error?.code; + } +} + diff --git a/plugin/src/index.ts b/plugin/src/index.ts new file mode 100644 index 0000000..3b1c45f --- /dev/null +++ b/plugin/src/index.ts @@ -0,0 +1,115 @@ +import { randomUUID } from "node:crypto"; + +import { KelosBridgeClient } from "./client.ts"; +import type { + JsonSchema, + KelosPlugin, + PluginConfig, + RunTaskInput, + RunTaskResponse, + TaskStatusInput, + TaskStatusResponse, + ToolContext, + ToolDefinition, +} from "./types.ts"; + +const runTaskSchema: JsonSchema = { + type: "object", + description: "Create a native Kelos Task in a specific Workspace.", + additionalProperties: false, + required: ["prompt", "workspace"], + properties: { + prompt: { type: "string" }, + workspace: { type: "string" }, + preset: { type: "string" }, + type: { type: "string" }, + model: { type: "string" }, + branch: { type: "string" }, + agentConfig: { type: "string" }, + timeoutSeconds: { type: "number", minimum: 0 }, + }, +}; + +const taskStatusSchema: JsonSchema = { + type: "object", + description: "Fetch the latest phase and outputs for an existing Kelos Task.", + additionalProperties: false, + required: ["taskId"], + properties: { + taskId: { type: "string" }, + }, +}; + +export function createKelosPlugin(config: PluginConfig): KelosPlugin { + const client = new KelosBridgeClient(config); + + const runTaskTool: ToolDefinition = { + name: "kelos_run_task", + description: "Run a Kelos Task in a pre-provisioned Workspace.", + inputSchema: runTaskSchema, + async execute(input: RunTaskInput, context: ToolContext) { + const sessionKey = context.sessionKey?.trim(); + if (!sessionKey) { + throw new Error("sessionKey is required in the OpenClaw tool context"); + } + + const response = await client.runTask({ + ...input, + sessionKey, + requestId: context.requestIdFactory?.() ?? randomUUID(), + }); + + return { + content: `Started Kelos task ${response.taskId} (${response.phase}).`, + data: response, + }; + }, + }; + + const statusTool: ToolDefinition = { + name: "kelos_task_status", + description: "Check the latest status for a Kelos Task previously created by the bridge.", + inputSchema: taskStatusSchema, + async execute(input: TaskStatusInput) { + const response = await client.getTaskStatus(input.taskId); + return { + content: summarizeStatus(response), + data: response, + }; + }, + }; + + return { + name: config.pluginName ?? "openclaw-kelos", + version: config.version ?? "0.1.0", + tools: [runTaskTool, statusTool], + }; +} + +function summarizeStatus(response: TaskStatusResponse): string { + const parts = [`Task ${response.taskId}`, `phase=${response.phase}`]; + if (response.workspace) { + parts.push(`workspace=${response.workspace}`); + } + if (response.message) { + parts.push(response.message); + } + return parts.join(" | "); +} + +export { BridgeError, KelosBridgeClient } from "./client.ts"; +export type { + BridgeClientConfig, + BridgeRunTaskRequest, + JsonSchema, + KelosPlugin, + PluginConfig, + RunTaskInput, + RunTaskResponse, + TaskStatusInput, + TaskStatusResponse, + ToolContext, + ToolDefinition, + ToolResult, +} from "./types.ts"; + diff --git a/plugin/src/plugin.test.ts b/plugin/src/plugin.test.ts new file mode 100644 index 0000000..5dea113 --- /dev/null +++ b/plugin/src/plugin.test.ts @@ -0,0 +1,167 @@ +import assert from "node:assert/strict"; +import { once } from "node:events"; +import http from "node:http"; +import test from "node:test"; + +import { BridgeError, KelosBridgeClient, createKelosPlugin } from "./index.ts"; + +test("client sends auth and task payload to the bridge", async (t) => { + let capturedAuth = ""; + let capturedBody = ""; + + const server = http.createServer((req, res) => { + capturedAuth = req.headers.authorization ?? ""; + req.setEncoding("utf8"); + req.on("data", (chunk) => { + capturedBody += chunk; + }); + req.on("end", () => { + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ taskId: "task-1", phase: "Pending", requestId: "req-1" })); + }); + }); + + server.listen(0, "127.0.0.1"); + await once(server, "listening"); + t.after(() => server.close()); + + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("expected TCP server address"); + } + + const client = new KelosBridgeClient({ + bridgeUrl: `http://127.0.0.1:${address.port}`, + authToken: "bridge-token", + }); + + const response = await client.runTask({ + prompt: "review the latest change", + workspace: "demo-workspace", + sessionKey: "session-1", + requestId: "req-1", + }); + + assert.equal(response.taskId, "task-1"); + assert.equal(capturedAuth, "Bearer bridge-token"); + + const body = JSON.parse(capturedBody) as { workspace: string; requestId: string }; + assert.equal(body.workspace, "demo-workspace"); + assert.equal(body.requestId, "req-1"); +}); + +test("plugin exposes both tools and uses the session key from context", async (t) => { + let capturedPath = ""; + let capturedBody = ""; + + const server = http.createServer((req, res) => { + capturedPath = req.url ?? ""; + if (req.method === "POST") { + req.setEncoding("utf8"); + req.on("data", (chunk) => { + capturedBody += chunk; + }); + req.on("end", () => { + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ taskId: "task-2", phase: "Pending", requestId: "req-42" })); + }); + return; + } + + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ taskId: "task-2", phase: "Running", workspace: "demo-workspace" })); + }); + + server.listen(0, "127.0.0.1"); + await once(server, "listening"); + t.after(() => server.close()); + + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("expected TCP server address"); + } + + const plugin = createKelosPlugin({ + bridgeUrl: `http://127.0.0.1:${address.port}`, + authToken: "bridge-token", + }); + + assert.equal(plugin.tools[0].name, "kelos_run_task"); + assert.equal(plugin.tools[1].name, "kelos_task_status"); + + const runResult = await plugin.tools[0].execute( + { + prompt: "review the latest change", + workspace: "demo-workspace", + }, + { + sessionKey: "session-1", + requestIdFactory: () => "req-42", + }, + ); + + assert.match(runResult.content, /Started Kelos task task-2/); + assert.equal(capturedPath, "/v1/tasks/run"); + + const runBody = JSON.parse(capturedBody) as { sessionKey: string; requestId: string }; + assert.equal(runBody.sessionKey, "session-1"); + assert.equal(runBody.requestId, "req-42"); + + const statusResult = await plugin.tools[1].execute({ taskId: "task-2" }, {}); + assert.match(statusResult.content, /phase=Running/); +}); + +test("client surfaces bridge errors and the run tool rejects missing session keys", async (t) => { + const server = http.createServer((_req, res) => { + res.statusCode = 400; + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ error: { code: "invalid_argument", message: "prompt is required" } })); + }); + + server.listen(0, "127.0.0.1"); + await once(server, "listening"); + t.after(() => server.close()); + + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("expected TCP server address"); + } + + const client = new KelosBridgeClient({ + bridgeUrl: `http://127.0.0.1:${address.port}`, + authToken: "bridge-token", + }); + + await assert.rejects( + async () => { + await client.runTask({ + prompt: "", + workspace: "demo-workspace", + sessionKey: "session-1", + requestId: "req-1", + }); + }, + (error: unknown) => { + assert.ok(error instanceof BridgeError); + assert.equal(error.status, 400); + assert.equal(error.code, "invalid_argument"); + return true; + }, + ); + + const plugin = createKelosPlugin({ + bridgeUrl: `http://127.0.0.1:${address.port}`, + authToken: "bridge-token", + }); + + await assert.rejects( + plugin.tools[0].execute( + { + prompt: "review the latest change", + workspace: "demo-workspace", + }, + {}, + ), + /sessionKey is required/, + ); +}); diff --git a/plugin/src/types.ts b/plugin/src/types.ts new file mode 100644 index 0000000..51e159e --- /dev/null +++ b/plugin/src/types.ts @@ -0,0 +1,83 @@ +export type JsonSchema = { + type: "object"; + description?: string; + additionalProperties?: boolean; + required?: string[]; + properties: Record; +}; + +export interface RunTaskInput { + prompt: string; + workspace: string; + preset?: string; + type?: string; + model?: string; + branch?: string; + agentConfig?: string; + timeoutSeconds?: number; +} + +export interface BridgeRunTaskRequest extends RunTaskInput { + sessionKey: string; + requestId: string; +} + +export interface RunTaskResponse { + taskId: string; + phase: string; + requestId: string; +} + +export interface TaskStatusInput { + taskId: string; +} + +export interface TaskStatusResponse { + taskId: string; + phase: string; + message?: string; + outputs?: string[]; + results?: Record; + workspace?: string; + requestId?: string; + sessionKey?: string; + preset?: string; +} + +export interface ToolContext { + sessionKey?: string; + requestIdFactory?: () => string; +} + +export interface ToolResult { + content: string; + data: T; +} + +export interface ToolDefinition { + name: string; + description: string; + inputSchema: JsonSchema; + execute(input: I, context: ToolContext): Promise>; +} + +export interface KelosPlugin { + name: string; + version: string; + tools: [ + ToolDefinition, + ToolDefinition + ]; +} + +export interface BridgeClientConfig { + bridgeUrl: string; + authToken: string; + fetchImpl?: typeof fetch; +} + +export interface PluginConfig extends BridgeClientConfig { + pluginName?: string; + version?: string; +} + diff --git a/plugin/tsconfig.json b/plugin/tsconfig.json new file mode 100644 index 0000000..f02d526 --- /dev/null +++ b/plugin/tsconfig.json @@ -0,0 +1,13 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "NodeNext", + "moduleResolution": "NodeNext", + "strict": true, + "noEmit": true, + "allowImportingTsExtensions": true, + "verbatimModuleSyntax": true + }, + "include": ["src/**/*.ts"] +} +