Skip to content

Commit

Permalink
Implement a CSI for kubernetes that prepopulates a mounted emptyDir with
Browse files Browse the repository at this point in the history
a cache
  • Loading branch information
airhorns authored and angelini committed May 8, 2024
1 parent 7b769bd commit 3d9d580
Show file tree
Hide file tree
Showing 10 changed files with 634 additions and 41 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ cached: export DL_TOKEN=$(DEV_SHARED_READER_TOKEN)
cached: internal/pb/cache.pb.go internal/pb/cache_grpc.pb.go
go run cmd/cached/main.go --upstream-host $(GRPC_HOST) --upstream-port $(GRPC_PORT) --port $(GRPC_CACHED_PORT) --staging-path tmp/cache-stage

cached-csi: export DL_ENV=dev
cached-csi: export DL_TOKEN=$(DEV_SHARED_READER_TOKEN)
cached-csi: internal/pb/cache.pb.go internal/pb/cache_grpc.pb.go
go run cmd/cached/main.go --upstream-host $(GRPC_HOST) --upstream-port $(GRPC_PORT) --staging-path tmp/cache-stage --csi-socket unix://tmp/csi.sock

client-update: export DL_TOKEN=$(DEV_TOKEN_PROJECT_1)
client-update: export DL_SKIP_SSL_VERIFICATION=1
client-update:
Expand Down
22 changes: 16 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ module github.com/gadget-inc/dateilager
go 1.22

require (
github.com/container-storage-interface/spec v1.9.0
github.com/dgraph-io/ristretto v0.1.1
github.com/gadget-inc/fsdiff v0.4.4
github.com/gobwas/glob v0.2.3
github.com/golang/protobuf v1.5.4
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/jackc/pgx/v5 v5.5.0
github.com/jackc/puddle/v2 v2.2.1
github.com/klauspost/compress v1.16.5
github.com/kubernetes-csi/csi-test v2.2.0+incompatible
github.com/minio/sha256-simd v1.0.0
github.com/o1egl/paseto v1.0.0
github.com/spf13/cobra v1.6.0
Expand All @@ -23,9 +26,9 @@ require (
go.uber.org/zap v1.23.0
golang.org/x/oauth2 v0.15.0
golang.org/x/sync v0.5.0
golang.org/x/sys v0.15.0
golang.org/x/sys v0.19.0
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
google.golang.org/protobuf v1.33.0
)

require (
Expand All @@ -40,16 +43,21 @@ require (
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/glog v1.1.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/klauspost/cpuid/v2 v2.1.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.33.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
Expand All @@ -60,12 +68,14 @@ require (
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
73 changes: 59 additions & 14 deletions go.sum

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions internal/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
DurationMS = DurationKey("dl.duration_ms")
CloneToProject = Int64Key("dl.clone_to_project")
CachePath = StringKey("dl.cache_path")
VolumeID = StringKey("dl.volume_id")
TargetPath = StringKey("dl.target_path")
)

var (
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/gadget-inc/dateilager/internal/files"
"github.com/gadget-inc/dateilager/internal/key"
"github.com/gadget-inc/dateilager/internal/logger"
Expand All @@ -20,6 +21,9 @@ import (

type Cached struct {
pb.UnimplementedCacheServer
csi.UnimplementedIdentityServer
csi.UnimplementedNodeServer

Client *client.Client
StagingPath string
// the current directory holding a fully formed downloaded cache
Expand Down
209 changes: 209 additions & 0 deletions pkg/api/cachedcsi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package api

import (
"context"
"fmt"
"math"
"os"
"path"
"path/filepath"
"syscall"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/gadget-inc/dateilager/internal/key"
"github.com/gadget-inc/dateilager/internal/logger"
"github.com/gadget-inc/dateilager/pkg/version"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
DriverName = "com.gadget.dateilager.cached"
)

// GetPluginInfo returns metadata of the plugin
func (c *Cached) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
resp := &csi.GetPluginInfoResponse{
Name: DriverName,
VendorVersion: version.Version,
}

return resp, nil
}

// GetPluginCapabilities returns available capabilities of the plugin
func (c *Cached) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
resp := &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{},
}

return resp, nil
}

// Probe returns the health and readiness of the plugin
func (c *Cached) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
ready := true
if c.currentDir == "" {
ready = false
logger.Warn(ctx, "csi probe failed as daemon hasn't prepared cache yet", key.Version.Field(c.currentVersion))
}

return &csi.ProbeResponse{
Ready: &wrappers.BoolValue{
Value: ready,
},
}, nil
}

// NodeGetCapabilities returns the supported capabilities of the node server
// this driver has no capabilities like expansion or staging, because we only use it for node local volumes
func (c *Cached) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
nscaps := []*csi.NodeServiceCapability{}

return &csi.NodeGetCapabilitiesResponse{
Capabilities: nscaps,
}, nil
}

// NodeGetInfo returns the supported capabilities of the node server. This
// Usually, a CSI driver would return some interesting stuff about the node here for the controller to use to place volumes, but because we're only supporting node local volumes, we return something very basic
func (c *Cached) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
return &csi.NodeGetInfoResponse{
NodeId: first(os.Getenv("NODE_NAME"), "dev"),
MaxVolumesPerNode: 110,
}, nil
}

func (c *Cached) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "NodePublishVolume Volume ID must be provided")
}

if req.TargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "NodePublishVolume Target Path must be provided")
}

if req.VolumeCapability == nil {
return nil, status.Error(codes.InvalidArgument, "NodePublishVolume Volume Capability must be provided")
}

targetPath := req.GetTargetPath()
volumeID := req.GetVolumeId()
volumeAttributes := req.GetVolumeContext()

var cachePath string
var targetPermissions os.FileMode

if suffix, exists := volumeAttributes["placeCacheAtPath"]; exists {
// running in suffix mode, desired outcome:
// - the mount point is writable by the pod
// - the cache is mounted at the suffix, and is not writable
cachePath = path.Join(targetPath, suffix)
targetPermissions = 0777
} else {
// running in unsuffixed mode, desired outcome:
// - the mount point *is* the cache, and is not writable by the pod
cachePath = targetPath
targetPermissions = 0755
}

if err := os.MkdirAll(targetPath, targetPermissions); err != nil {
return nil, fmt.Errorf("failed to create target directory %s: %s", targetPath, err)
}

if err := os.Chmod(targetPath, targetPermissions); err != nil {
return nil, fmt.Errorf("failed to change ownership of target directory %s: %s", targetPath, err)
}

version, err := c.WriteCache(cachePath)
if err != nil {
return nil, err
}

logger.Info(ctx, "volume published", key.VolumeID.Field(volumeID), key.TargetPath.Field(targetPath), key.Version.Field(version))

return &csi.NodePublishVolumeResponse{}, nil
}

func (s *Cached) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume Volume ID must be provided")
}

if req.TargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume Target Path must be provided")
}

targetPath := req.GetTargetPath()

// Clean up directory
if err := os.RemoveAll(targetPath); err != nil {
return nil, fmt.Errorf("failed to remove directory %s: %s", targetPath, err)
}

logger.Info(ctx, "volume unpublished and data removed", key.TargetPath.Field(targetPath))
return &csi.NodeUnpublishVolumeResponse{}, nil
}

// NodeGetVolumeStats returns the volume capacity statistics available for the given volume.
func (c *Cached) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats Volume ID must be provided")
}

volumePath := req.VolumePath
if volumePath == "" {
return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats Volume Path must be provided")
}

usedBytes, err := getFolderSize(volumePath)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to retrieve used size statistics for volume path %s: %v", volumePath, err)
}

var stat syscall.Statfs_t
err = syscall.Statfs(volumePath, &stat)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to retrieve total size statistics for volume path %s: %v", volumePath, err)
}

// Calculate free space in bytes
freeBytes := stat.Bavail * uint64(stat.Bsize)
if freeBytes > math.MaxInt64 {
return nil, status.Errorf(codes.Internal, "total size statistics for volume path too big for int64: %d", freeBytes)
}
signedFreeBytes := int64(freeBytes)

return &csi.NodeGetVolumeStatsResponse{
Usage: []*csi.VolumeUsage{
{
Available: signedFreeBytes,
Total: signedFreeBytes + usedBytes,
Used: usedBytes,
Unit: csi.VolumeUsage_BYTES,
},
},
}, nil
}

func first(one, two string) string {
if one == "" {
return two
}
return one
}

func getFolderSize(path string) (int64, error) {
var totalSize int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
totalSize += info.Size()
}
return nil
})
return totalSize, err
}
Loading

0 comments on commit 3d9d580

Please sign in to comment.