41 changes: 18 additions & 23 deletions examples/substrate-openclaw/README.md
@@ -1,24 +1,23 @@
# kagent agents and agentharness on substrate

Follow these instructions to install substrate on a kind cluster. This feature allows you to run AgentHarness (OpenClaw) and declerative Go agents in substrate.
# kagent agents and AgentHarness on Substrate

Follow these instructions to install Substrate on a kind cluster. This feature allows you to run AgentHarness (OpenClaw) and declarative Go SandboxAgents on Agent Substrate.

## 1. Install Substrate on your Kind cluster

This assumes you've configured kind cluster using `make create-kind-cluster`.
This assumes you've configured a kind cluster using `make create-kind-cluster`.

Create the substrate-values.yaml file:
Create a `substrate-values.yaml` file:

```yaml
atelet:
extraArgs:
- --localhost-registry-replacement=kind-registry:5000
```

Then install substrate and kagent:
Then install the Substrate platform and kagent:

```bash
export ATEOM_VERSION=v0.0.6
export ATEOM_VERSION=v0.0.7

helm upgrade --install substrate-crds \
oci://ghcr.io/kagent-dev/substrate/helm/substrate-crds
@@ -36,15 +35,19 @@ make helm-install KAGENT_HELM_EXTRA_ARGS="\
--set substrateWorkerPool.ateomImage=ghcr.io/kagent-dev/substrate/ateom-gvisor:${ATEOM_VERSION}"
```

## kagent AgentHarness with substrate runtime
When `substrateWorkerPool.create=true`, the kagent chart installs a namespace-scoped `WorkerPool` with:

- `spec.sandboxClass: gvisor`
- label `kagent.dev/worker-pool: kagent-default` (matches generated `ActorTemplate` selectors)
- controller default `workerPool` name set to that pool when `create=true`

kagent generates a per-harness `ActorTemplate` and uses an existing `WorkerPool`.
## 2. AgentHarness with Substrate runtime

The generated `ActorTemplate` uses `controller.substrate.pauseImage`, `controller.substrate.runscAMD64URL`, `controller.substrate.runscAMD64SHA256`, `controller.substrate.runscARM64URL`, and `controller.substrate.runscARM64SHA256` from the Helm values Override them with `--set` or a values file when you need to pin a different gVisor build.
kagent generates a per-harness `ActorTemplate` and schedules actors onto an existing `WorkerPool`.

Create a harness. If `snapshotsConfig` is omitted, kagent defaults it to `gs://ate-snapshots/<namespace>/<agentharnessname>`.

- **Worker pool** — reference an existing pool (`workerPoolRef`) or configure a controller default WorkerPool
- **Worker pool** — reference an existing pool (`workerPoolRef`) or configure a controller default WorkerPool. The target pool must carry label `kagent.dev/worker-pool: <pool-name>`. The kagent Helm-managed pool gets this label automatically; externally owned pools must add it manually.
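
The label requirement above can be illustrated with a minimal Go sketch. It assumes plain `matchLabels` semantics (every selector pair must be present on the pool) and uses only the standard library; `matchesSelector` and the `external` pool labels are hypothetical names for illustration, not part of kagent or Substrate.

```go
package main

import "fmt"

// workerPoolLabelKey is the label key named in the README text.
const workerPoolLabelKey = "kagent.dev/worker-pool"

// matchesSelector reports whether every key/value pair in matchLabels is
// present in labels. It models matchLabels only, not matchExpressions.
func matchesSelector(labels, matchLabels map[string]string) bool {
	for k, want := range matchLabels {
		if got, ok := labels[k]; !ok || got != want {
			return false
		}
	}
	return true
}

func main() {
	// Selector of the kind a generated ActorTemplate carries.
	selector := map[string]string{workerPoolLabelKey: "kagent-default"}

	// A Helm-managed pool carries the label; this external pool (hypothetical) does not.
	helmManaged := map[string]string{workerPoolLabelKey: "kagent-default"}
	external := map[string]string{"team": "platform"}

	fmt.Println("helm-managed pool selected:", matchesSelector(helmManaged, selector))
	fmt.Println("external pool selected:", matchesSelector(external, selector))
}
```

Under this assumption, an externally owned pool is selected only once the `kagent.dev/worker-pool` label is added to it.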

```yaml
apiVersion: kagent.dev/v1alpha2
@@ -83,23 +86,15 @@ metadata:
kagent.dev/agent-harness: peterj-claw
spec:
pauseImage: gcr.io/gke-release/pause@sha256:bcbd57ba5653580ec647b16d8163cdd1112df3609129b01f912a8032e48265da
runsc:
amd64:
url: gs://gvisor/releases/nightly/2026-06-02/x86_64/runsc
sha256Hash: efd12935f6654c91a1389710eb8dfa4d12b6b9be00db87526dc2eb584ad00119
arm64:
url: gs://gvisor/releases/nightly/2026-05-19/aarch64/runsc
sha256Hash: 1ba2366ae2efceba166046f51a4104f9261c9cb72c6db8f5b3fe2dc57dea86b9
workerPoolRef:
name: peterj-claw-wp
namespace: kagent
sandboxClass: gvisor
workerSelector:
matchLabels:
kagent.dev/worker-pool: kagent-default
snapshotsConfig:
location: gs://ate-snapshots/kagent/peterj-claw
containers:
- name: openclaw
image: ghcr.io/kagent-dev/nemoclaw/sandbox-base@sha256:d52bee415dc4c0dba7164f9eabe727574c056d4f211781f20af249707883a3b4
ports:
- containerPort: 80
command:
- /bin/sh
- -c
6 changes: 4 additions & 2 deletions go/api/httpapi/substrate.go
@@ -28,7 +28,8 @@ type SubstrateActorTemplateEntry struct {
Phase string `json:"phase,omitempty"`
GoldenActorID string `json:"goldenActorId,omitempty"`
GoldenSnapshot string `json:"goldenSnapshot,omitempty"`
WorkerPoolRef string `json:"workerPoolRef,omitempty"`
SandboxClass string `json:"sandboxClass,omitempty"`
WorkerSelector string `json:"workerSelector,omitempty"`
HarnessName string `json:"harnessName,omitempty"`
ManagedByKagent bool `json:"managedByKagent"`
}
@@ -42,7 +43,8 @@ type SubstrateActorEntry struct {
AteomPodNamespace string `json:"ateomPodNamespace,omitempty"`
AteomPodName string `json:"ateomPodName,omitempty"`
AteomPodIP string `json:"ateomPodIp,omitempty"`
LastSnapshot string `json:"lastSnapshot,omitempty"`
LatestSnapshot string `json:"latestSnapshot,omitempty"`
WorkerPoolName string `json:"workerPoolName,omitempty"`
InProgressSnapshot string `json:"inProgressSnapshot,omitempty"`
Version int64 `json:"version,omitempty"`
}
61 changes: 58 additions & 3 deletions go/core/internal/controller/reconciler/reconciler.go
@@ -14,11 +14,13 @@ import (
"strings"
"time"

atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1"
"github.com/hashicorp/go-multierror"
reconcilerutils "github.com/kagent-dev/kagent/go/core/internal/controller/reconciler/utils"
"github.com/kagent-dev/kagent/go/core/internal/controller/translator"
"github.com/kagent-dev/kagent/go/core/pkg/egress"
"github.com/kagent-dev/kagent/go/core/pkg/sandboxbackend"
"github.com/kagent-dev/kagent/go/core/pkg/sandboxbackend/substrate"
"github.com/kagent-dev/kmcp/api/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -150,11 +152,19 @@ func (a *kagentReconciler) ReconcileKagentSandboxAgent(ctx context.Context, req
}

err := a.reconcileSandboxAgent(ctx, sandboxAgent)
if errors.Is(err, substrate.ErrActorTemplateReconcilePending) {
// A spec-drift recreate is mid-flight; report not-ready without failing
// the resource and return the sentinel so the controller requeues.
if statusErr := a.reconcileSandboxAgentStatus(ctx, sandboxAgent, nil, true); statusErr != nil {
return statusErr
}
return err
}
if err != nil {
reconcileLog.Error(err, "failed to reconcile sandboxagent", "sandboxagent", req.NamespacedName)
}

return a.reconcileSandboxAgentStatus(ctx, sandboxAgent, err)
return a.reconcileSandboxAgentStatus(ctx, sandboxAgent, err, false)
}

func (a *kagentReconciler) handleDeletedAgentResource(ctx context.Context, req ctrl.Request, resourceName string) error {
@@ -239,14 +249,18 @@ func (a *kagentReconciler) reconcileSandboxAgent(ctx context.Context, sa *v1alph
})
}

func (a *kagentReconciler) reconcileSandboxAgentStatus(ctx context.Context, sa *v1alpha2.SandboxAgent, reconcileErr error) error {
func (a *kagentReconciler) reconcileSandboxAgentStatus(ctx context.Context, sa *v1alpha2.SandboxAgent, reconcileErr error, actorTemplatePending bool) error {
deployedCondition := metav1.Condition{
Type: v1alpha2.AgentConditionTypeReady,
Status: metav1.ConditionUnknown,
ObservedGeneration: sa.Generation,
}

if a.sandboxBackend == nil {
if actorTemplatePending {
deployedCondition.Status = metav1.ConditionFalse
deployedCondition.Reason = "ActorTemplateRecreating"
deployedCondition.Message = "waiting for ActorTemplate golden actor deletion and recreate"
} else if a.sandboxBackend == nil {
deployedCondition.Status = metav1.ConditionUnknown
deployedCondition.Reason = "SandboxBackendNotConfigured"
deployedCondition.Message = "Sandbox backend is not configured"
@@ -924,12 +938,31 @@ func (r *kagentReconciler) GetOwnedResourceTypes() []client.Object {
// Function initially copied from https://github.com/open-telemetry/opentelemetry-operator/blob/e6d96f006f05cff0bc3808da1af69b6b636fbe88/internal/controllers/common.go#L141-L192
func (a *kagentReconciler) reconcileDesiredObjects(ctx context.Context, owner metav1.Object, desiredObjects []client.Object, ownedObjects map[types.UID]client.Object) error {
var errs []error
actorTemplatePending := false
for _, desired := range desiredObjects {
l := reconcileLog.WithValues(
"object_name", desired.GetName(),
"object_kind", desired.GetObjectKind(),
)

// Substrate ActorTemplate.spec is immutable, so delegate to the sandbox backend to handle spec drift.

Review comment (Contributor): We should separate these controllers in a follow-up; this is too specific to combine with the existing behavior, IMO.

if _, ok := desired.(*atev1alpha1.ActorTemplate); ok {
if r, ok := a.sandboxBackend.(actorTemplateReconciler); ok {
if err := r.ReconcileActorTemplate(ctx, desired); err != nil {
if errors.Is(err, substrate.ErrActorTemplateReconcilePending) {
actorTemplatePending = true
pruneOwnedActorTemplate(ownedObjects, desired)
continue
}
l.Error(err, "failed to reconcile ActorTemplate")
errs = append(errs, err)
continue
}
pruneOwnedActorTemplate(ownedObjects, desired)
continue
}
}

// existing is an object the controller runtime will hydrate for us
// we obtain the existing object by deep copying the desired object because it's the most convenient way
existing := desired.DeepCopyObject().(client.Object)
@@ -951,6 +984,9 @@ func (a *kagentReconciler) reconcileDesiredObjects(ctx context.Context, owner me
if len(errs) > 0 {
return fmt.Errorf("failed to create objects for %s: %w", owner.GetName(), errors.Join(errs...))
}
if actorTemplatePending {
return substrate.ErrActorTemplateReconcilePending
}

// Prune owned objects in the cluster that should not be present after the reconciliation.
err := a.deleteObjects(ctx, ownedObjects)
@@ -961,6 +997,25 @@ func (a *kagentReconciler) reconcileDesiredObjects(ctx context.Context, owner me
return nil
}

// actorTemplateReconciler is implemented by sandbox backends that own substrate
// ActorTemplate objects and need immutable-spec aware create/recreate semantics.
type actorTemplateReconciler interface {
ReconcileActorTemplate(ctx context.Context, desired client.Object) error
}

// pruneOwnedActorTemplate removes the live ActorTemplate matching desired from the
// prune set so it is not garbage collected.
func pruneOwnedActorTemplate(owned map[types.UID]client.Object, desired client.Object) {
for uid, obj := range owned {
if _, ok := obj.(*atev1alpha1.ActorTemplate); !ok {
continue
}
if obj.GetName() == desired.GetName() && obj.GetNamespace() == desired.GetNamespace() {
delete(owned, uid)
}
}
}
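
The pruning step can be tried in isolation with a minimal, standard-library sketch. The `object` struct and string UIDs below are simplified stand-ins for `client.Object` and `types.UID`, assumed purely for illustration. Deleting map entries while ranging over the map is permitted by the Go specification.

```go
package main

import "fmt"

// object is a simplified stand-in for client.Object (an assumption for this sketch).
type object struct {
	Kind      string
	Namespace string
	Name      string
}

// pruneOwned removes ActorTemplate entries matching desired's name and
// namespace from the owned set, so a later delete pass leaves them alone.
func pruneOwned(owned map[string]object, desired object) {
	for uid, obj := range owned {
		if obj.Kind != "ActorTemplate" {
			continue
		}
		if obj.Name == desired.Name && obj.Namespace == desired.Namespace {
			delete(owned, uid)
		}
	}
}

func main() {
	owned := map[string]object{
		"uid-1": {Kind: "ActorTemplate", Namespace: "kagent", Name: "my-claw"},
		"uid-2": {Kind: "ActorTemplate", Namespace: "kagent", Name: "other"},
		"uid-3": {Kind: "Service", Namespace: "kagent", Name: "my-claw"},
	}
	pruneOwned(owned, object{Kind: "ActorTemplate", Namespace: "kagent", Name: "my-claw"})
	fmt.Println("remaining owned objects:", len(owned))
}
```

Only the matching ActorTemplate leaves the set; the same-named Service stays eligible for the normal prune pass.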

// modified version of controllerutil.CreateOrUpdate to support proto based objects like istio
func createOrUpdate(ctx context.Context, c client.Client, obj client.Object, f controllerutil.MutateFn) (controllerutil.OperationResult, error) {
key := client.ObjectKeyFromObject(obj)
4 changes: 4 additions & 0 deletions go/core/internal/controller/sandboxagent_controller.go
@@ -18,6 +18,7 @@ package controller

import (
"context"
"errors"
"fmt"
"reflect"

@@ -86,6 +87,9 @@ func (r *SandboxAgentController) Reconcile(ctx context.Context, req ctrl.Request
}

if err := r.Reconciler.ReconcileKagentSandboxAgent(ctx, req); err != nil {
if errors.Is(err, substrate.ErrActorTemplateReconcilePending) {
return ctrl.Result{RequeueAfter: agentHarnessNotReadyRequeue}, nil
}
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
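
The sentinel-error handling in this hunk can be sketched without controller-runtime. In the sketch below, `errPending` stands in for `substrate.ErrActorTemplateReconcilePending`, `result` for `ctrl.Result`, and `requeueDelay` for `agentHarnessNotReadyRequeue`; all three are illustrative assumptions, and the 5-second value is arbitrary.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errPending stands in for substrate.ErrActorTemplateReconcilePending.
var errPending = errors.New("actor template reconcile pending")

// result is a simplified stand-in for ctrl.Result.
type result struct {
	RequeueAfter time.Duration
}

// requeueDelay is a hypothetical value; the real constant is not shown in the diff.
const requeueDelay = 5 * time.Second

// handle converts the pending sentinel into a delayed requeue with no error,
// and passes every other error through unchanged.
func handle(err error) (result, error) {
	if errors.Is(err, errPending) {
		return result{RequeueAfter: requeueDelay}, nil
	}
	return result{}, err
}

func main() {
	// errors.Is also matches when the sentinel is wrapped with %w.
	wrapped := fmt.Errorf("reconcile sandboxagent: %w", errPending)
	res, err := handle(wrapped)
	fmt.Println(res.RequeueAfter, err)

	res, err = handle(errors.New("boom"))
	fmt.Println(res.RequeueAfter, err)
}
```

Returning a nil error alongside `RequeueAfter` keeps the pending state from being treated as a failure while still scheduling another reconcile.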
41 changes: 33 additions & 8 deletions go/core/internal/httpserver/handlers/substrate.go
@@ -13,6 +13,7 @@ import (
"github.com/kagent-dev/kagent/go/core/internal/httpserver/errors"
"github.com/kagent-dev/kagent/go/core/pkg/auth"
"github.com/kagent-dev/kagent/go/core/pkg/sandboxbackend/substrate"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilvalidation "k8s.io/apimachinery/pkg/util/validation"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
@@ -137,20 +138,15 @@ func (h *SubstrateHandler) listSubstrateCRs(ctx context.Context, namespace strin
Phase: string(tmpl.Status.Phase),
GoldenActorID: tmpl.Status.GoldenActorID,
GoldenSnapshot: tmpl.Status.GoldenSnapshot,
SandboxClass: string(tmpl.Spec.SandboxClass),
WorkerSelector: labelSelectorString(ctx, tmpl.Spec.WorkerSelector),
ManagedByKagent: tmpl.Labels["app.kubernetes.io/managed-by"] == "kagent",
}
if harness := strings.TrimSpace(tmpl.Labels[substrate.HarnessLabelKey]); harness != "" {
entry.HarnessName = harness
} else if agentName := substrate.SandboxAgentNameFromLabels(tmpl.Labels); agentName != "" {
entry.HarnessName = agentName
}
if ref := tmpl.Spec.WorkerPoolRef; ref.Name != "" {
wpNS := ref.Namespace
if wpNS == "" {
wpNS = tmpl.Namespace
}
entry.WorkerPoolRef = wpNS + "/" + ref.Name
}
templates = append(templates, entry)
}

@@ -215,12 +211,41 @@ func actorEntryFromPB(a *ateapipb.Actor) api.SubstrateActorEntry {
AteomPodNamespace: a.GetAteomPodNamespace(),
AteomPodName: a.GetAteomPodName(),
AteomPodIP: a.GetAteomPodIp(),
LastSnapshot: a.GetLastSnapshot(),
LatestSnapshot: snapshotInfoString(a.GetLatestSnapshotInfo()),
WorkerPoolName: a.GetWorkerPoolName(),
InProgressSnapshot: a.GetInProgressSnapshot(),
Version: a.GetVersion(),
}
}

// snapshotInfoString renders a SnapshotInfo as a single location string.
func snapshotInfoString(s *ateapipb.SnapshotInfo) string {
if s == nil {
return ""
}
if ext := s.GetExternal(); ext != nil {
return ext.GetSnapshotUriPrefix()
}
if loc := s.GetLocal(); loc != nil {
return loc.GetSnapshotPrefix()
}
return ""
}

// labelSelectorString renders a metav1.LabelSelector as a compact human-readable
// string (e.g. "kagent.dev/worker-pool=kagent-default") for UI display.
func labelSelectorString(ctx context.Context, sel *metav1.LabelSelector) string {
if sel == nil {
return ""
}
s, err := metav1.LabelSelectorAsSelector(sel)
if err != nil {
ctrllog.FromContext(ctx).Info("invalid ActorTemplate workerSelector", "error", err)
return "<invalid selector>"
}
return s.String()
}
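
To show the kind of string `labelSelectorString` is documented to produce, here is a standard-library approximation for the `matchLabels`-only case. `renderMatchLabels` is a hypothetical helper, not the apimachinery implementation, and it assumes pairs are rendered as `key=value`, sorted by key, and joined with commas.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// renderMatchLabels renders a matchLabels map as sorted, comma-separated
// key=value pairs. It approximates the display string for simple selectors
// and does not handle matchExpressions.
func renderMatchLabels(matchLabels map[string]string) string {
	keys := make([]string, 0, len(matchLabels))
	for k := range matchLabels {
		keys = append(keys, k)
	}
	sort.Strings(keys) // stable output regardless of map iteration order

	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, k+"="+matchLabels[k])
	}
	return strings.Join(parts, ",")
}

func main() {
	fmt.Println(renderMatchLabels(map[string]string{
		"kagent.dev/worker-pool": "kagent-default",
	}))
}
```

For the single-label selector used in the test for this handler, the sketch yields the same `key=value` shape that the assertion on `WorkerSelector` expects.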

func workerEntryFromPB(w *ateapipb.Worker) api.SubstrateWorkerEntry {
return api.SubstrateWorkerEntry{
WorkerNamespace: w.GetWorkerNamespace(),
8 changes: 6 additions & 2 deletions go/core/internal/httpserver/handlers/substrate_test.go
@@ -15,7 +15,6 @@ import (
"github.com/kagent-dev/kagent/go/core/pkg/sandboxbackend/substrate"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -96,7 +95,10 @@ func TestHandleGetSubstrateStatus(t *testing.T) {
},
},
Spec: atev1alpha1.ActorTemplateSpec{
WorkerPoolRef: corev1.ObjectReference{Name: "default-wp", Namespace: "kagent"},
SandboxClass: atev1alpha1.SandboxClassGvisor,
WorkerSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{substrate.WorkerPoolLabelKey: "default-wp"},
},
},
Status: atev1alpha1.ActorTemplateStatus{Phase: atev1alpha1.PhaseReady, GoldenActorID: "golden-1"},
},
@@ -135,6 +137,8 @@ func TestHandleGetSubstrateStatus(t *testing.T) {
require.Equal(t, "Ready", wrapped.Data.ActorTemplates[0].Phase)
require.True(t, wrapped.Data.ActorTemplates[0].ManagedByKagent)
require.Equal(t, "my-claw", wrapped.Data.ActorTemplates[0].HarnessName)
require.Equal(t, "gvisor", wrapped.Data.ActorTemplates[0].SandboxClass)
require.Equal(t, substrate.WorkerPoolLabelKey+"=default-wp", wrapped.Data.ActorTemplates[0].WorkerSelector)
require.Len(t, wrapped.Data.Actors, 1)
require.Equal(t, "Running", wrapped.Data.Actors[0].Status)
require.Len(t, wrapped.Data.Workers, 1)
15 changes: 1 addition & 14 deletions go/core/pkg/app/app.go
@@ -151,10 +151,6 @@ type Config struct {
DefaultWorkerPoolNamespace string
DefaultWorkerPoolName string
PauseImage string
RunscAMD64URL string
RunscAMD64SHA256 string
RunscARM64URL string
RunscARM64SHA256 string
}
}

@@ -215,11 +211,6 @@ func (cfg *Config) SetFlags(commandLine *flag.FlagSet) {
commandLine.StringVar(&cfg.Substrate.DefaultWorkerPoolNamespace, "substrate-default-workerpool-namespace", kagentNamespace, "Default Agent Substrate WorkerPool namespace when spec.substrate.workerPoolRef is unset.")
commandLine.StringVar(&cfg.Substrate.DefaultWorkerPoolName, "substrate-default-workerpool-name", "", "Default Agent Substrate WorkerPool name when spec.substrate.workerPoolRef is unset.")
commandLine.StringVar(&cfg.Substrate.PauseImage, "substrate-pause-image", "gcr.io/gke-release/pause@sha256:bcbd57ba5653580ec647b16d8163cdd1112df3609129b01f912a8032e48265da", "Pause image for generated ActorTemplates.")
// Please note: the 2026-06-13 nightly breaks checkpoint, so don't jump straight to the latest. See https://github.com/kagent-dev/kagent/pull/2035.
commandLine.StringVar(&cfg.Substrate.RunscAMD64URL, "substrate-runsc-amd64-url", "gs://gvisor/releases/nightly/2026-06-02/x86_64/runsc", "gVisor runsc URL for amd64.")
commandLine.StringVar(&cfg.Substrate.RunscAMD64SHA256, "substrate-runsc-amd64-sha256", "efd12935f6654c91a1389710eb8dfa4d12b6b9be00db87526dc2eb584ad00119", "gVisor runsc sha256 for amd64.")
commandLine.StringVar(&cfg.Substrate.RunscARM64URL, "substrate-runsc-arm64-url", "gs://gvisor/releases/nightly/2026-05-19/aarch64/runsc", "gVisor runsc URL for arm64.")
commandLine.StringVar(&cfg.Substrate.RunscARM64SHA256, "substrate-runsc-arm64-sha256", "1ba2366ae2efceba166046f51a4104f9261c9cb72c6db8f5b3fe2dc57dea86b9", "gVisor runsc sha256 for arm64.")
commandLine.StringVar(&agent_translator.DefaultServiceAccountName, "default-service-account-name", "", "Global default ServiceAccount name for agent pods. When set, agents without an explicit serviceAccountName will use this instead of creating a per-agent ServiceAccount.")

commandLine.Var(&MapValue{Target: &agent_translator.DefaultAgentPodLabels}, "default-agent-pod-labels", "Comma-separated key=value pairs of labels to apply to all agent pod templates (e.g. 'team=platform,env=prod'). Per-agent labels take precedence.")
@@ -805,11 +796,7 @@ func substrateAppConfig(cfg *Config) substrate.Config {

func substrateLifecycleFromConfig(kubeClient client.Client, cfg *Config, ate *substrate.Client) *substrate.Lifecycle {
return substrate.NewLifecycle(kubeClient, substrate.LifecycleDefaults{
PauseImage: cfg.Substrate.PauseImage,
RunscAMD64URL: cfg.Substrate.RunscAMD64URL,
RunscAMD64SHA256: cfg.Substrate.RunscAMD64SHA256,
RunscARM64URL: cfg.Substrate.RunscARM64URL,
RunscARM64SHA256: cfg.Substrate.RunscARM64SHA256,
PauseImage: cfg.Substrate.PauseImage,
// ImageRegistry/ImageRepository mirror the declarative-agent image config
// (--image-registry/--image-repository) so digest-pinned acp-sandbox
// workload images resolve against the same (possibly private/mirrored)
5 changes: 4 additions & 1 deletion go/core/pkg/sandboxbackend/substrate/agent_actor.go
@@ -81,7 +81,10 @@ func (b *SandboxAgentActorBackend) EnsureSessionActor(ctx context.Context, sa *v

switch actor.GetStatus() {
case ateapipb.Actor_STATUS_RUNNING, ateapipb.Actor_STATUS_RESUMING:
case ateapipb.Actor_STATUS_SUSPENDED, ateapipb.Actor_STATUS_UNSPECIFIED:
case ateapipb.Actor_STATUS_SUSPENDED, ateapipb.Actor_STATUS_UNSPECIFIED,
ateapipb.Actor_STATUS_PAUSED, ateapipb.Actor_STATUS_PAUSING:
// PAUSED/PAUSING keep a node-local snapshot; ResumeActor brings them back
// the same as a suspended actor.
_, err = b.client.ResumeActor(ctx, actorID)
if err != nil {
return sandboxbackend.EnsureResult{}, wrapResumeActorError(actorID, err)
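
The status handling changed in this hunk amounts to a small decision table, sketched below. `actorStatus` is a simplified stand-in for the `ateapipb` status enum that models only the values named in the diff. The `default` branch is an assumption, since the hunk does not show how other statuses are handled.

```go
package main

import "fmt"

// actorStatus is a simplified stand-in for the ateapipb actor status enum.
type actorStatus int

const (
	statusUnspecified actorStatus = iota
	statusRunning
	statusResuming
	statusSuspended
	statusPaused
	statusPausing
)

// needsResume reports whether the switch in the diff would call ResumeActor
// for the given status.
func needsResume(s actorStatus) bool {
	switch s {
	case statusRunning, statusResuming:
		// Already running or on its way; nothing to do.
		return false
	case statusSuspended, statusUnspecified, statusPaused, statusPausing:
		// PAUSED/PAUSING are now resumed the same way as SUSPENDED.
		return true
	default:
		// Assumption: statuses outside the diff are not resumed here.
		return false
	}
}

func main() {
	fmt.Println("paused needs resume:", needsResume(statusPaused))
	fmt.Println("running needs resume:", needsResume(statusRunning))
}
```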