Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ run:
gen:
go generate -v ./...

.PHONY: build
build:
go build -tags="nomsgpack,remote,exclude_graphdriver_btrfs,containers_image_openpgp" -v ./cmd/sablier

Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ module github.com/sablierapp/sablier
go 1.25.4

require (
github.com/containers/image/v5 v5.36.2
github.com/containers/podman/v5 v5.6.2
github.com/docker/docker v28.5.2+incompatible
github.com/gin-gonic/gin v1.11.0
github.com/google/go-cmp v0.7.0
github.com/lmittmann/tint v1.1.2
github.com/moby/moby/api v1.52.0
github.com/moby/moby/client v0.1.0
github.com/neilotoole/slogt v1.1.0
github.com/pkg/errors v0.9.1
github.com/samber/slog-gin v1.18.0
Expand All @@ -16,7 +18,7 @@ require (
github.com/spf13/viper v1.21.0
github.com/stretchr/testify v1.11.1
github.com/testcontainers/testcontainers-go v0.40.0
github.com/testcontainers/testcontainers-go/modules/dind v0.40.0 // Waiting for fix to be released on v0.36.1
github.com/testcontainers/testcontainers-go/modules/dind v0.40.0
github.com/testcontainers/testcontainers-go/modules/k3s v0.40.0
github.com/testcontainers/testcontainers-go/modules/valkey v0.40.0
github.com/tniswong/go.rfcx v0.0.0-20181019234604-07783c52761f
Expand All @@ -29,8 +31,6 @@ require (
k8s.io/client-go v0.34.2
)

require github.com/containers/image/v5 v5.36.2

require (
dario.cat/mergo v1.0.2 // indirect
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect
Expand Down Expand Up @@ -79,6 +79,7 @@ require (
github.com/disiqueira/gotree/v3 v3.0.2 // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect
github.com/docker/docker v28.5.1+incompatible // indirect
github.com/docker/docker-credential-helpers v0.9.3 // indirect
github.com/docker/go-connections v0.6.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
Expand Down
258 changes: 254 additions & 4 deletions go.sum

Large diffs are not rendered by default.

21 changes: 11 additions & 10 deletions pkg/provider/docker/container_inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,27 @@ import (
"fmt"
"log/slog"

"github.com/docker/docker/api/types/container"
"github.com/moby/moby/api/types/container"
"github.com/moby/moby/client"
"github.com/sablierapp/sablier/pkg/sablier"
)

func (p *Provider) InstanceInspect(ctx context.Context, name string) (sablier.InstanceInfo, error) {
spec, err := p.Client.ContainerInspect(ctx, name)
spec, err := p.Client.ContainerInspect(ctx, name, client.ContainerInspectOptions{})
if err != nil {
return sablier.InstanceInfo{}, fmt.Errorf("cannot inspect container: %w", err)
}

p.l.DebugContext(ctx, "container inspected", slog.String("container", name), slog.String("status", spec.State.Status), slog.String("health", healthStatus(spec.State.Health)))
p.l.DebugContext(ctx, "container inspected", slog.String("container", name), slog.String("status", string(spec.Container.State.Status)), slog.String("health", healthStatus(spec.Container.State.Health)))

// "created", "running", "paused", "restarting", "removing", "exited", or "dead"
switch spec.State.Status {
switch spec.Container.State.Status {
case "created", "paused", "restarting", "removing":
return sablier.NotReadyInstanceState(name, 0, p.desiredReplicas), nil
case "running":
if spec.State.Health != nil {
if spec.Container.State.Health != nil {
// // "starting", "healthy" or "unhealthy"
switch spec.State.Health.Status {
switch spec.Container.State.Health.Status {
case "healthy":
return sablier.ReadyInstanceState(name, p.desiredReplicas), nil
case "unhealthy":
Expand All @@ -36,14 +37,14 @@ func (p *Provider) InstanceInspect(ctx context.Context, name string) (sablier.In
p.l.WarnContext(ctx, "container running without healthcheck, you should define a healthcheck on your container so that Sablier properly detects when the container is ready to handle requests.", slog.String("container", name))
return sablier.ReadyInstanceState(name, p.desiredReplicas), nil
case "exited":
if spec.State.ExitCode != 0 {
return sablier.UnrecoverableInstanceState(name, fmt.Sprintf("container exited with code \"%d\"", spec.State.ExitCode), p.desiredReplicas), nil
if spec.Container.State.ExitCode != 0 {
return sablier.UnrecoverableInstanceState(name, fmt.Sprintf("container exited with code \"%d\"", spec.Container.State.ExitCode), p.desiredReplicas), nil
}
return sablier.NotReadyInstanceState(name, 0, p.desiredReplicas), nil
case "dead":
return sablier.UnrecoverableInstanceState(name, "container in \"dead\" state cannot be restarted", p.desiredReplicas), nil
default:
return sablier.UnrecoverableInstanceState(name, fmt.Sprintf("container status \"%s\" not handled", spec.State.Status), p.desiredReplicas), nil
return sablier.UnrecoverableInstanceState(name, fmt.Sprintf("container status \"%s\" not handled", spec.Container.State.Status), p.desiredReplicas), nil
}
}

Expand All @@ -52,5 +53,5 @@ func healthStatus(health *container.Health) string {
return "no healthcheck defined"
}

return health.Status
return string(health.Status)
}
2 changes: 1 addition & 1 deletion pkg/provider/docker/container_inspect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
"testing"
"time"

"github.com/docker/docker/api/types/container"
"github.com/google/go-cmp/cmp"
"github.com/moby/moby/api/types/container"
"github.com/neilotoole/slogt"
"github.com/sablierapp/sablier/pkg/provider/docker"
"github.com/sablierapp/sablier/pkg/sablier"
Expand Down Expand Up @@ -58,7 +58,7 @@
return "", err
}

return c.ID, dind.client.ContainerStart(ctx, c.ID, container.StartOptions{})

Check failure on line 61 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / lint

undefined: container.StartOptions

Check failure on line 61 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / lint

multiple-value dind.client.ContainerStart(ctx, c.ID, container.StartOptions{}) (value of type ("github.com/moby/moby/client".ContainerStartResult, error)) in single-value context

Check failure on line 61 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / Build

undefined: container.StartOptions
},
},
want: sablier.InstanceInfo{
Expand Down Expand Up @@ -88,7 +88,7 @@
return "", err
}

return c.ID, dind.client.ContainerStart(ctx, c.ID, container.StartOptions{})

Check failure on line 91 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / lint

undefined: container.StartOptions

Check failure on line 91 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / lint

multiple-value dind.client.ContainerStart(ctx, c.ID, container.StartOptions{}) (value of type ("github.com/moby/moby/client".ContainerStartResult, error)) in single-value context

Check failure on line 91 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / Build

undefined: container.StartOptions

Check failure on line 91 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / Build

multiple-value dind.client.ContainerStart(ctx, c.ID, container.StartOptions{}) (value of type ("github.com/moby/moby/client".ContainerStartResult, error)) in single-value context
},
},
want: sablier.InstanceInfo{
Expand Down Expand Up @@ -117,7 +117,7 @@
return "", err
}

err = dind.client.ContainerStart(ctx, c.ID, container.StartOptions{})

Check failure on line 120 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / lint

undefined: container.StartOptions

Check failure on line 120 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / lint

assignment mismatch: 1 variable but dind.client.ContainerStart returns 2 values

Check failure on line 120 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / Build

undefined: container.StartOptions

Check failure on line 120 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / Build

assignment mismatch: 1 variable but dind.client.ContainerStart returns 2 values
if err != nil {
return "", err
}
Expand Down Expand Up @@ -154,7 +154,7 @@
return "", err
}

err = dind.client.ContainerStart(ctx, c.ID, container.StartOptions{})

Check failure on line 157 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / lint

undefined: container.StartOptions

Check failure on line 157 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / lint

assignment mismatch: 1 variable but dind.client.ContainerStart returns 2 values

Check failure on line 157 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / Build

undefined: container.StartOptions

Check failure on line 157 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / Build

assignment mismatch: 1 variable but dind.client.ContainerStart returns 2 values
if err != nil {
return "", err
}
Expand All @@ -180,7 +180,7 @@
return "", err
}

err = dind.client.ContainerStart(ctx, c.ID, container.StartOptions{})

Check failure on line 183 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / lint

undefined: container.StartOptions

Check failure on line 183 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / lint

assignment mismatch: 1 variable but dind.client.ContainerStart returns 2 values

Check failure on line 183 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / Build

undefined: container.StartOptions

Check failure on line 183 in pkg/provider/docker/container_inspect_test.go

View workflow job for this annotation

GitHub Actions / Build

assignment mismatch: 1 variable but dind.client.ContainerStart returns 2 values
if err != nil {
return "", err
}
Expand Down
34 changes: 17 additions & 17 deletions pkg/provider/docker/container_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,29 @@ import (
"log/slog"
"strings"

"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/moby/moby/api/types/container"
"github.com/moby/moby/client"
"github.com/sablierapp/sablier/pkg/provider"
"github.com/sablierapp/sablier/pkg/sablier"
)

func (p *Provider) InstanceList(ctx context.Context, options provider.InstanceListOptions) ([]sablier.InstanceConfiguration, error) {
args := filters.NewArgs()
args.Add("label", fmt.Sprintf("%s=true", "sablier.enable"))
filters := client.Filters{}
filters.Add("label", fmt.Sprintf("%s=true", "sablier.enable"))

p.l.DebugContext(ctx, "listing containers", slog.Group("options", slog.Bool("all", options.All), slog.Any("filters", args)))
containers, err := p.Client.ContainerList(ctx, container.ListOptions{
p.l.DebugContext(ctx, "listing containers", slog.Group("options", slog.Bool("all", options.All), slog.Any("filters", filters)))
containers, err := p.Client.ContainerList(ctx, client.ContainerListOptions{
All: options.All,
Filters: args,
Filters: filters,
})
if err != nil {
return nil, fmt.Errorf("cannot list containers: %w", err)
}

p.l.DebugContext(ctx, "containers listed", slog.Int("count", len(containers)))
p.l.DebugContext(ctx, "containers listed", slog.Int("count", len(containers.Items)))

instances := make([]sablier.InstanceConfiguration, 0, len(containers))
for _, c := range containers {
instances := make([]sablier.InstanceConfiguration, 0, len(containers.Items))
for _, c := range containers.Items {
instance := containerToInstance(c)
instances = append(instances, instance)
}
Expand All @@ -54,23 +54,23 @@ func containerToInstance(c container.Summary) sablier.InstanceConfiguration {
}

func (p *Provider) InstanceGroups(ctx context.Context) (map[string][]string, error) {
args := filters.NewArgs()
args.Add("label", fmt.Sprintf("%s=true", "sablier.enable"))
filters := client.Filters{}
filters.Add("label", fmt.Sprintf("%s=true", "sablier.enable"))

p.l.DebugContext(ctx, "listing containers", slog.Group("options", slog.Bool("all", true), slog.Any("filters", args)))
containers, err := p.Client.ContainerList(ctx, container.ListOptions{
p.l.DebugContext(ctx, "listing containers", slog.Group("options", slog.Bool("all", true), slog.Any("filters", filters)))
containers, err := p.Client.ContainerList(ctx, client.ContainerListOptions{
All: true,
Filters: args,
Filters: filters,
})

if err != nil {
return nil, fmt.Errorf("cannot list containers: %w", err)
}

p.l.DebugContext(ctx, "containers listed", slog.Int("count", len(containers)))
p.l.DebugContext(ctx, "containers listed", slog.Int("count", len(containers.Items)))

groups := make(map[string][]string)
for _, c := range containers {
for _, c := range containers.Items {
groupName := c.Labels["sablier.group"]
if len(groupName) == 0 {
groupName = "default"
Expand Down
4 changes: 2 additions & 2 deletions pkg/provider/docker/container_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"fmt"
"log/slog"

"github.com/docker/docker/api/types/container"
"github.com/moby/moby/client"
)

func (p *Provider) InstanceStart(ctx context.Context, name string) error {
// TODO: InstanceStart should block until the container is ready.
p.l.DebugContext(ctx, "starting container", "name", name)
err := p.Client.ContainerStart(ctx, name, container.StartOptions{})
_, err := p.Client.ContainerStart(ctx, name, client.ContainerStartOptions{})
if err != nil {
p.l.ErrorContext(ctx, "cannot start container", slog.String("name", name), slog.Any("error", err))
return fmt.Errorf("cannot start container %s: %w", name, err)
Expand Down
13 changes: 8 additions & 5 deletions pkg/provider/docker/container_stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,27 @@ import (
"fmt"
"log/slog"

"github.com/docker/docker/api/types/container"
"github.com/moby/moby/api/types/container"
"github.com/moby/moby/client"
)

func (p *Provider) InstanceStop(ctx context.Context, name string) error {
p.l.DebugContext(ctx, "stopping container", slog.String("name", name))
err := p.Client.ContainerStop(ctx, name, container.StopOptions{})
_, err := p.Client.ContainerStop(ctx, name, client.ContainerStopOptions{})
if err != nil {
p.l.ErrorContext(ctx, "cannot stop container", slog.String("name", name), slog.Any("error", err))
return fmt.Errorf("cannot stop container %s: %w", name, err)
}

p.l.DebugContext(ctx, "waiting for container to stop", slog.String("name", name))
waitC, errC := p.Client.ContainerWait(ctx, name, container.WaitConditionNotRunning)
result := p.Client.ContainerWait(ctx, name, client.ContainerWaitOptions{
Condition: container.WaitConditionNotRunning,
})
select {
case response := <-waitC:
case response := <-result.Result:
p.l.DebugContext(ctx, "container stopped", slog.String("name", name), slog.Int64("exit_code", response.StatusCode))
return nil
case err := <-errC:
case err := <-result.Error:
p.l.ErrorContext(ctx, "cannot wait for container to stop", slog.String("name", name), slog.Any("error", err))
return fmt.Errorf("cannot wait for container %s to stop: %w", name, err)
case <-ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/docker/container_stop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"testing"

"github.com/docker/docker/api/types/container"
"github.com/moby/moby/api/types/container"
"github.com/neilotoole/slogt"
"github.com/sablierapp/sablier/pkg/provider/docker"
"gotest.tools/v3/assert"
Expand Down
7 changes: 4 additions & 3 deletions pkg/provider/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package docker
import (
"context"
"fmt"
"github.com/docker/docker/client"
"github.com/sablierapp/sablier/pkg/sablier"
"log/slog"

"github.com/moby/moby/client"
"github.com/sablierapp/sablier/pkg/sablier"
)

// Interface guard
Expand All @@ -20,7 +21,7 @@ type Provider struct {
func New(ctx context.Context, cli *client.Client, logger *slog.Logger) (*Provider, error) {
logger = logger.With(slog.String("provider", "docker"))

serverVersion, err := cli.ServerVersion(ctx)
serverVersion, err := cli.ServerVersion(ctx, client.ServerVersionOptions{})
if err != nil {
return nil, fmt.Errorf("cannot connect to docker host: %v", err)
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/provider/docker/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@ import (
"log/slog"
"strings"

"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
"github.com/moby/moby/api/types/events"
"github.com/moby/moby/client"
)

func (p *Provider) NotifyInstanceStopped(ctx context.Context, instance chan<- string) {
msgs, errs := p.Client.Events(ctx, events.ListOptions{
Filters: filters.NewArgs(
filters.Arg("scope", "local"),
filters.Arg("type", string(events.ContainerEventType)),
filters.Arg("event", "die"),
),
filters := client.Filters{}
filters.Add("scope", "local")
filters.Add("type", string(events.ContainerEventType))
filters.Add("event", "die")
result := p.Client.Events(ctx, client.EventsListOptions{
Filters: filters,
})
for {
select {
case msg, ok := <-msgs:
case msg, ok := <-result.Messages:
if !ok {
p.l.ErrorContext(ctx, "event stream closed")
close(instance)
Expand All @@ -30,7 +30,7 @@ func (p *Provider) NotifyInstanceStopped(ctx context.Context, instance chan<- st
// Send the container that has died to the channel
p.l.DebugContext(ctx, "event received", "event", msg)
instance <- strings.TrimPrefix(msg.Actor.Attributes["name"], "/")
case err, ok := <-errs:
case err, ok := <-result.Err:
if !ok {
p.l.ErrorContext(ctx, "event stream closed")
close(instance)
Expand Down
7 changes: 4 additions & 3 deletions pkg/provider/docker/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package docker_test

import (
"context"
"github.com/docker/docker/api/types/container"
"testing"
"time"

"github.com/moby/moby/api/types/container"
"github.com/neilotoole/slogt"
"github.com/sablierapp/sablier/pkg/provider/docker"
"gotest.tools/v3/assert"
"testing"
"time"
)

func TestDockerClassicProvider_NotifyInstanceStopped(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/provider/docker/testcontainers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package docker_test

import (
"context"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"testing"

"github.com/moby/moby/api/types/container"
"github.com/moby/moby/client"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/dind"
"gotest.tools/v3/assert"
"testing"
)

type dindContainer struct {
Expand Down
11 changes: 6 additions & 5 deletions pkg/provider/dockerswarm/docker_swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ import (
"log/slog"
"strings"

"github.com/docker/docker/api/types/swarm"
"github.com/moby/moby/client"
"github.com/sablierapp/sablier/pkg/sablier"

"github.com/docker/docker/client"
)

// Interface guard
Expand All @@ -26,7 +24,7 @@ type Provider struct {
func New(ctx context.Context, cli *client.Client, logger *slog.Logger) (*Provider, error) {
logger = logger.With(slog.String("provider", "swarm"))

serverVersion, err := cli.ServerVersion(ctx)
serverVersion, err := cli.ServerVersion(ctx, client.ServerVersionOptions{})
if err != nil {
return nil, fmt.Errorf("cannot connect to docker host: %w", err)
}
Expand Down Expand Up @@ -57,7 +55,10 @@ func (p *Provider) ServiceUpdateReplicas(ctx context.Context, name string, repli
p.l.DebugContext(ctx, "scaling service", "name", name, "current_replicas", service.Spec.Mode.Replicated.Replicas, "desired_replicas", p.desiredReplicas)
service.Spec.Mode.Replicated.Replicas = &replicas

response, err := p.Client.ServiceUpdate(ctx, service.ID, service.Version, service.Spec, swarm.ServiceUpdateOptions{})
response, err := p.Client.ServiceUpdate(ctx, service.ID, client.ServiceUpdateOptions{
Version: service.Version,
Spec: service.Spec,
})
if err != nil {
return fmt.Errorf("cannot update service: %w", err)
}
Expand Down
Loading
Loading