Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.DS_Store
bridge/kelos-bridge
coverage.out

13 changes: 13 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
.PHONY: test test-go test-plugin fmt

test: test-go test-plugin

test-go:
cd bridge && go test ./...

test-plugin:
cd plugin && node --test --experimental-strip-types src/*.test.ts

fmt:
cd bridge && gofmt -w $$(find . -name '*.go' -type f)

23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# openclaw-kelos

Separate integration repo for running Kelos `Task` resources from OpenClaw without changing Kelos core.

Current contents:
- `bridge/`: Go HTTP bridge that validates requests, creates managed Kelos tasks, exposes task status, and reconciles background notifications.
- `plugin/`: TypeScript plugin package that exposes `kelos_run_task` and `kelos_task_status`.
- `docs/`: short architecture notes.
- `examples/`: sample bridge env and plugin wiring.

The bridge supports two backends today:
- `memory`: local development and tests
- `kubectl`: shells out to `kubectl` for real `Workspace` and `Task` resources

Quick start:

```bash
cp examples/bridge.env.example .env
make test
```

See [docs/architecture.md](docs/architecture.md) for the integration shape and current boundaries.

94 changes: 94 additions & 0 deletions bridge/cmd/kelos-bridge/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package main

import (
"context"
"errors"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/openclaw/openclaw-kelos/bridge/internal/config"
"github.com/openclaw/openclaw-kelos/bridge/internal/httpapi"
"github.com/openclaw/openclaw-kelos/bridge/internal/kelos"
"github.com/openclaw/openclaw-kelos/bridge/internal/notifier"
"github.com/openclaw/openclaw-kelos/bridge/internal/service"
)

func main() {
cfg, err := config.LoadFromEnv()
if err != nil {
log.Fatalf("load config: %v", err)
}

backend, err := newBackend(cfg)
if err != nil {
log.Fatalf("create backend: %v", err)
}

var taskNotifier service.Notifier = notifier.NoopNotifier{}
if cfg.Notification.Endpoint != "" {
taskNotifier = notifier.NewHTTP(
cfg.Notification.Endpoint,
cfg.Notification.BearerToken,
cfg.Notification.Timeout,
)
}

svc := service.New(cfg, backend, taskNotifier)
handler := httpapi.NewHandler(svc, cfg.AuthToken)

server := &http.Server{
Addr: cfg.ListenAddr,
Handler: handler,
ReadHeaderTimeout: 5 * time.Second,
}

ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

if cfg.PollInterval > 0 {
go pollLoop(ctx, svc, cfg.PollInterval)
}

go func() {
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = server.Shutdown(shutdownCtx)
}()

log.Printf("kelos bridge listening on %s using %s backend", cfg.ListenAddr, cfg.Backend)
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("listen: %v", err)
}
}

func newBackend(cfg config.Config) (kelos.Backend, error) {
switch cfg.Backend {
case "memory":
return kelos.NewMemoryBackend(cfg.SeedWorkspaces...), nil
case "kubectl":
return kelos.NewKubectlBackend(cfg.Namespace, cfg.KubectlPath), nil
default:
return nil, errors.New("unsupported backend: " + cfg.Backend)
}
}

func pollLoop(ctx context.Context, svc *service.Service, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := svc.ReconcileNotifications(ctx); err != nil {
log.Printf("reconcile notifications: %v", err)
}
}
}
}
4 changes: 4 additions & 0 deletions bridge/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module github.com/openclaw/openclaw-kelos/bridge

go 1.26.0

191 changes: 191 additions & 0 deletions bridge/internal/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package config

import (
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"time"

"github.com/openclaw/openclaw-kelos/bridge/internal/kelos"
)

type Config struct {
ListenAddr string
Namespace string
AuthToken string
Backend string
KubectlPath string
SeedWorkspaces []string
PollInterval time.Duration
AllowedWorkspaces map[string]struct{}
DefaultTask TaskDefaults
Presets map[string]Preset
Notification NotificationConfig
}

type TaskDefaults struct {
Type string
CredentialType kelos.CredentialType
CredentialSecret string
}

type Preset struct {
Type string `json:"type,omitempty"`
Model string `json:"model,omitempty"`
AgentConfig string `json:"agentConfig,omitempty"`
TimeoutSeconds int64 `json:"timeoutSeconds,omitempty"`
CredentialType kelos.CredentialType `json:"credentialType,omitempty"`
CredentialSecret string `json:"credentialSecret,omitempty"`
AllowedWorkspaces []string `json:"allowedWorkspaces,omitempty"`
}

type NotificationConfig struct {
Endpoint string
BearerToken string
Timeout time.Duration
}

func LoadFromEnv() (Config, error) {
pollInterval, err := durationEnv("BRIDGE_POLL_INTERVAL", 15*time.Second)
if err != nil {
return Config{}, err
}

notificationTimeout, err := durationEnv("BRIDGE_NOTIFICATION_TIMEOUT", 5*time.Second)
if err != nil {
return Config{}, err
}

credentialType, err := parseCredentialType(envOrDefault("BRIDGE_DEFAULT_CREDENTIAL_TYPE", string(kelos.CredentialTypeOAuth)))
if err != nil {
return Config{}, err
}

presets, err := parsePresets(os.Getenv("BRIDGE_PRESETS_JSON"))
if err != nil {
return Config{}, err
}

cfg := Config{
ListenAddr: envOrDefault("BRIDGE_LISTEN_ADDR", ":8080"),
Namespace: envOrDefault("BRIDGE_NAMESPACE", "default"),
AuthToken: strings.TrimSpace(os.Getenv("BRIDGE_AUTH_TOKEN")),
Backend: envOrDefault("BRIDGE_BACKEND", "memory"),
KubectlPath: envOrDefault("BRIDGE_KUBECTL_PATH", "kubectl"),
SeedWorkspaces: parseCSV(os.Getenv("BRIDGE_SEED_WORKSPACES")),
PollInterval: pollInterval,
AllowedWorkspaces: parseSet(os.Getenv("BRIDGE_ALLOWED_WORKSPACES")),
DefaultTask: TaskDefaults{
Type: envOrDefault("BRIDGE_DEFAULT_TASK_TYPE", "codex"),
CredentialType: credentialType,
CredentialSecret: envOrDefault("BRIDGE_DEFAULT_CREDENTIAL_SECRET", "kelos-credentials"),
},
Presets: presets,
Notification: NotificationConfig{
Endpoint: strings.TrimSpace(os.Getenv("BRIDGE_NOTIFICATION_ENDPOINT")),
BearerToken: strings.TrimSpace(os.Getenv("BRIDGE_NOTIFICATION_BEARER_TOKEN")),
Timeout: notificationTimeout,
},
}

if cfg.AuthToken == "" {
return Config{}, errors.New("BRIDGE_AUTH_TOKEN is required")
}

if cfg.DefaultTask.Type == "" {
return Config{}, errors.New("BRIDGE_DEFAULT_TASK_TYPE must not be empty")
}

if cfg.DefaultTask.CredentialSecret == "" {
return Config{}, errors.New("BRIDGE_DEFAULT_CREDENTIAL_SECRET must not be empty")
}

return cfg, nil
}

func envOrDefault(key, fallback string) string {
value := strings.TrimSpace(os.Getenv(key))
if value == "" {
return fallback
}
return value
}

func durationEnv(key string, fallback time.Duration) (time.Duration, error) {
value := strings.TrimSpace(os.Getenv(key))
if value == "" {
return fallback, nil
}

parsed, err := time.ParseDuration(value)
if err != nil {
return 0, fmt.Errorf("%s: %w", key, err)
}

return parsed, nil
}

func parsePresets(raw string) (map[string]Preset, error) {
if strings.TrimSpace(raw) == "" {
return map[string]Preset{}, nil
}

var presets map[string]Preset
if err := json.Unmarshal([]byte(raw), &presets); err != nil {
return nil, fmt.Errorf("BRIDGE_PRESETS_JSON: %w", err)
}

for name, preset := range presets {
credentialType, err := parseCredentialType(string(preset.CredentialType))
if err != nil {
return nil, fmt.Errorf("preset %q: %w", name, err)
}
preset.CredentialType = credentialType
presets[name] = preset
}

return presets, nil
}

func parseCredentialType(value string) (kelos.CredentialType, error) {
switch strings.TrimSpace(value) {
case "", string(kelos.CredentialTypeOAuth):
return kelos.CredentialTypeOAuth, nil
case string(kelos.CredentialTypeAPIKey):
return kelos.CredentialTypeAPIKey, nil
default:
return "", errors.New("credential type must be api-key or oauth")
}
}

func parseCSV(raw string) []string {
if strings.TrimSpace(raw) == "" {
return nil
}

parts := strings.Split(raw, ",")
values := make([]string, 0, len(parts))
for _, part := range parts {
part = strings.TrimSpace(part)
if part != "" {
values = append(values, part)
}
}

return values
}

func parseSet(raw string) map[string]struct{} {
values := parseCSV(raw)
if len(values) == 0 {
return map[string]struct{}{}
}

set := make(map[string]struct{}, len(values))
for _, value := range values {
set[value] = struct{}{}
}
return set
}
Loading