Skip to content
140 changes: 117 additions & 23 deletions cmd/go-cache-plugin/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,38 @@ import (
"syscall"
"time"

"github.com/grafana/go-cache-plugin/lib/otel"

"github.com/creachadair/command"
"github.com/creachadair/gocache"
"github.com/creachadair/taskgroup"
)

var flags struct {
CacheDir string `flag:"cache-dir,default=$GOCACHE_DIR,Local cache directory (required)"`
S3Bucket string `flag:"bucket,default=$GOCACHE_S3_BUCKET,S3 bucket name (required)"`
S3Region string `flag:"region,default=$GOCACHE_S3_REGION,S3 region"`
S3Endpoint string `flag:"s3-endpoint-url,default=$GOCACHE_S3_ENDPOINT_URL,S3 custom endpoint URL (if unset, use AWS default)"`
S3PathStyle bool `flag:"s3-path-style,default=$GOCACHE_S3_PATH_STYLE,S3 path-style URLs (optional)"`
KeyPrefix string `flag:"prefix,default=$GOCACHE_KEY_PREFIX,S3 key prefix (optional)"`
MinUploadSize int64 `flag:"min-upload-size,default=$GOCACHE_MIN_SIZE,Minimum object size to upload to S3 (in bytes)"`
Concurrency int `flag:"c,default=$GOCACHE_CONCURRENCY,Maximum number of concurrent requests"`
S3Concurrency int `flag:"u,default=$GOCACHE_S3_CONCURRENCY,Maximum concurrency for upload to S3"`
PrintMetrics bool `flag:"metrics,default=$GOCACHE_METRICS,Print summary metrics to stderr at exit"`
Expiration time.Duration `flag:"expiry,default=$GOCACHE_EXPIRY,Cache expiration period (optional)"`
Verbose bool `flag:"v,default=$GOCACHE_VERBOSE,Enable verbose logging"`
DebugLog int `flag:"debug,default=$GOCACHE_DEBUG,Enable detailed per-request debug logging (noisy)"`
CacheDir string `flag:"cache-dir,default=$GOCACHE_DIR,Local cache directory (required)"`
S3Bucket string `flag:"bucket,default=$GOCACHE_S3_BUCKET,S3 bucket name (required if no --local flag provided)"`
S3Region string `flag:"region,default=$GOCACHE_S3_REGION,S3 region"`
S3Endpoint string `flag:"s3-endpoint-url,default=$GOCACHE_S3_ENDPOINT_URL,S3 custom endpoint URL (if unset, use AWS default)"`
S3PathStyle bool `flag:"s3-path-style,default=$GOCACHE_S3_PATH_STYLE,S3 path-style URLs (optional)"`
LocalOnlyCache bool `flag:"local-only,default=$GOCACHE_LOCAL_ONLY,Runs in no cache mode (no S3)"`
KeyPrefix string `flag:"prefix,default=$GOCACHE_KEY_PREFIX,S3 key prefix (optional)"`
MinUploadSize int64 `flag:"min-upload-size,default=$GOCACHE_MIN_SIZE,Minimum object size to upload to S3 (in bytes)"`
Concurrency int `flag:"c,default=$GOCACHE_CONCURRENCY,Maximum number of concurrent requests"`
S3Concurrency int `flag:"u,default=$GOCACHE_S3_CONCURRENCY,Maximum concurrency for upload to S3"`
PrintMetrics bool `flag:"metrics,default=$GOCACHE_METRICS,Print summary metrics to stderr at exit"`
Expiration time.Duration `flag:"expiry,default=$GOCACHE_EXPIRY,Cache expiration period (optional)"`
Verbose bool `flag:"v,default=$GOCACHE_VERBOSE,Enable verbose logging"`
DebugLog int `flag:"debug,default=$GOCACHE_DEBUG,Enable detailed per-request debug logging (noisy)"`
TracingEnabled bool `flag:"tracing,default=$GOCACHE_ENABLE_TRACING,Enable tracing (optional)"`
OtelCollectorAddress string `flag:"otel-collector,default=$GOCACHE_TRACING_OTEL_COLLECTOR,OTEL collector address (optional)"`
TracesLogFile string `flag:"traces-log-file,default=$GOCACHE_TRACING_TRACE_FILE,File used to write traces"`
TraceId string `flag:"traceId,default=$GOCAHE_TRACING_TRACE_ID,Trace Id (optional)"`
ParentSpanId string `flag:"parentSpanId,default=$GOCACHE_TRACING_PARENT_SPAN_ID,Parent Span Id (optional)"`
RunId string `flag:"runId,default=$RUN_ID,Run ID (optional)"`
RunAttempt string `flag:"runAttempt,default=$RUN_ATTEMPT,Run attempt (optional)"`
JobName string `flag:"jobName,default=$JOB_NAME,Job name (optional)"`
StepName string `flag:"stepName,default=$STEP_NAME,Step name (optional)"`
StepNumber string `flag:"stepNumber,default=$STEP_NUMBER,Step number (optional)"`
}

const (
Expand All @@ -62,11 +75,12 @@ func runDirect(env *command.Env) error {
}

var serveFlags struct {
Plugin string `flag:"plugin,default=$GOCACHE_PLUGIN,Plugin service addr (or port) (required)"`
HTTP string `flag:"http,default=$GOCACHE_HTTP,HTTP service address ([host]:port)"`
ModProxy bool `flag:"modproxy,default=$GOCACHE_MODPROXY,Enable a Go module proxy (requires --http)"`
RevProxy string `flag:"revproxy,default=$GOCACHE_REVPROXY,Reverse proxy these hosts (comma-separated; requires --http)"`
SumDB string `flag:"sumdb,default=$GOCACHE_SUMDB,SumDB servers to proxy for (comma-separated)"`
Plugin string `flag:"plugin,default=$GOCACHE_PLUGIN,Plugin service addr (or port) (required)"`
HTTP string `flag:"http,default=$GOCACHE_HTTP,HTTP service address ([host]:port)"`
ModProxy bool `flag:"modproxy,default=$GOCACHE_MODPROXY,Enable a Go module proxy (requires --http)"`
ModNoCache bool `flag:"modproxy-nocache,default=$GOCACHE_MODPROXY_NOCACHE,Disable the module cache (requires --modproxy)"`
RevProxy string `flag:"revproxy,default=$GOCACHE_REVPROXY,Reverse proxy these hosts (comma-separated; requires --http)"`
SumDB string `flag:"sumdb,default=$GOCACHE_SUMDB,SumDB servers to proxy for (comma-separated)"`
}

func noopClose(context.Context) error { return nil }
Expand Down Expand Up @@ -126,14 +140,20 @@ func runServe(env *command.Env) error {
// If an HTTP server is enabled, start it up with debug routes
// and whatever other services were requested.
if serveFlags.HTTP != "" {
otelCleanup, tracingContext, err := initModTracing(ctx, "gobuild-modcache")
if err != nil {
return fmt.Errorf("tracing: %w", err)
}

srv := &http.Server{
Addr: serveFlags.HTTP,
Handler: makeHandler(modProxy, revProxy),
Handler: makeHandler(modProxy, revProxy, tracingContext),
}
g.Go(srv.ListenAndServe)
vprintf("HTTP server listening at %q", serveFlags.HTTP)
g.Run(func() {
<-ctx.Done()
otelCleanup(ctx)
vprintf("stopping HTTP service")
srv.Shutdown(context.Background())
})
Expand Down Expand Up @@ -169,8 +189,13 @@ func runServe(env *command.Env) error {

// runConnect implements a direct cache proxy by connecting to a remote server.
func runConnect(env *command.Env, plugin string) error {
addr := plugin

ctx := env.Context()
shutdownTracer, reportSpan, err := initTracing(ctx, "gobuild-gocacheprog-connect")
if err != nil {
return err
}
addr := plugin
// If the caller has not specified a host/port, then likely this is an older usage which only specifies port
if !strings.Contains(plugin, ":") {
port, err := strconv.Atoi(plugin)
Expand All @@ -190,23 +215,90 @@ func runConnect(env *command.Env, plugin string) error {

out := taskgroup.Go(func() error {
defer conn.(*net.TCPConn).CloseWrite() // let the server finish
return copy(conn, os.Stdin)
return copy(conn, os.Stdin, reportSpan)
})
if rerr := copy(os.Stdout, conn); rerr != nil {
if rerr := copy(os.Stdout, conn, reportSpan); rerr != nil {
vprintf("read responses: %v", err)
}
out.Wait()
conn.Close()

shutdownTracer(context.Background())
vprintf("connection closed (%v elapsed)", time.Since(start))
return nil
}

func initTracing(ctx context.Context, service string) (func(context.Context) error, func([]byte), error) {
if !flags.TracingEnabled {
return func(context.Context) error { return nil }, func([]byte) {}, nil
}

tracingContext, err := initTracingContext()
if err != nil {
return nil, nil, err
}

shutdown, err := initTracingProvider(ctx, service)
if err != nil {
return nil, nil, err
}

spanner := otel.NewGoCacheSpanner(tracingContext)

spanReporter := func(buffer []byte) {
_ = spanner.ProcessCacheRequest(ctx, buffer)
}

return shutdown, spanReporter, err
}

func initModTracing(ctx context.Context, service string) (func(context.Context) error, *otel.TracingContext, error) {
if !flags.TracingEnabled {
return func(context.Context) error { return nil }, nil, nil
}

tracingContext, err := initTracingContext()
if err != nil {
return nil, nil, err
}

shutdown, err := initTracingProvider(ctx, service)
if err != nil {
return nil, nil, err
}

return shutdown, tracingContext, err
}

func initTracingProvider(ctx context.Context, service string) (func(context.Context) error, error) {
var shutdown func(context.Context) error
var err error
if flags.OtelCollectorAddress != "" {
shutdown, err = otel.SetupOtelTraceProvider(ctx, service, flags.OtelCollectorAddress)
} else if flags.TracesLogFile != "" {
log.Printf("Otel Collector address not specified, starting with the logging reporter, log file: %s", flags.TracesLogFile)
shutdown, err = otel.SetupLoggingProvider(ctx, service, flags.TracesLogFile)
} else {
log.Printf("please specify either --otel-collector or --log-file to setup tracing or disable tracing")
return nil, errors.New("otel exporter not initialized")
}
return shutdown, err
}

func initTracingContext() (*otel.TracingContext, error) {
if flags.TraceId != "" && flags.ParentSpanId != "" {
return otel.NewTracingContext(flags.TraceId, flags.ParentSpanId)
} else {
return otel.NewTracingContextFromRunData(flags.RunId, flags.RunAttempt, flags.JobName, flags.StepName, flags.StepNumber), nil
}
}

// copy emulates the base case of io.Copy, but does not attempt to use the
// io.ReaderFrom or io.WriterTo implementations.
//
// TODO(creachadair): For some reason io.Copy does not work correctly when r is
// a pipe (e.g., stdin) and w is a TCP socket. Figure out why.
func copy(w io.Writer, r io.Reader) error {
func copy(w io.Writer, r io.Reader, reportTrace func([]byte)) error {
var buf [4096]byte
for {
nr, err := r.Read(buf[:])
Expand All @@ -216,6 +308,8 @@ func copy(w io.Writer, r io.Reader) error {
} else if nw < nr {
return fmt.Errorf("wrote %d < %d bytes: %w", nw, nr, io.ErrShortWrite)
}

reportTrace(buf[:])
}
if err == io.EOF {
return nil
Expand Down
1 change: 0 additions & 1 deletion cmd/go-cache-plugin/go-cache-plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ When --http is enabled, the following options are available:

This mode bridges stdin/stdout to a cache server (see the "serve" command)
listening on the specified port.`,

Run: command.Adapt(runConnect),
},
command.HelpCommand(helpTopics),
Expand Down
74 changes: 59 additions & 15 deletions cmd/go-cache-plugin/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"strings"
"time"

"github.com/grafana/go-cache-plugin/lib/otel"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
Expand All @@ -39,8 +41,34 @@ func initCacheServer(env *command.Env) (*gocache.Server, *s3util.Client, error)
switch {
case flags.CacheDir == "":
return nil, nil, env.Usagef("you must provide a --cache-dir")
case flags.S3Bucket == "":
return nil, nil, env.Usagef("you must provide an S3 --bucket name")
case flags.LocalOnlyCache:
dirCache, err := cachedir.New(flags.CacheDir)
if err != nil {
return nil, nil, fmt.Errorf("create local cache: %w", err)
}

cacheClose := func(context.Context) error { return nil }
if flags.Expiration > 0 {
dirClose := dirCache.Cleanup(flags.Expiration)
cacheClose = func(ctx context.Context) error {
return errors.Join(dirClose(ctx))
}
}

setMetrics := func(ctx context.Context, m *expvar.Map) {}
s := &gocache.Server{
Get: dirCache.Get,
Put: dirCache.Put,
Close: cacheClose,
SetMetrics: setMetrics,
MaxRequests: flags.Concurrency,
Logf: vprintf,
LogRequests: flags.DebugLog&debugBuildCache != 0,
}
return s, nil, nil

case flags.S3Bucket == "" && !flags.LocalOnlyCache:
return nil, nil, env.Usagef("you must provide an S3 --bucket name or run with --no-cache")
}
region, err := getBucketRegion(env.Context(), flags.S3Bucket)
if err != nil {
Expand Down Expand Up @@ -112,19 +140,30 @@ func initModProxy(env *command.Env, s3c *s3util.Client) (_ http.Handler, cleanup
return nil, nil, env.Usagef("you must set --http to enable --modproxy")
}

modCachePath := filepath.Join(flags.CacheDir, "module")
if err := os.MkdirAll(modCachePath, 0755); err != nil {
return nil, nil, fmt.Errorf("create module cache: %w", err)
if s3c == nil && !serveFlags.ModNoCache {
return nil, nil, errors.New("s3 client not configured")
}
cacher := &modproxy.S3Cacher{
Local: modCachePath,
S3Client: s3c,
KeyPrefix: path.Join(flags.KeyPrefix, "module"),
MaxTasks: flags.S3Concurrency,
Logf: vprintf,
LogRequests: flags.DebugLog&debugModProxy != 0,

var cacher goproxy.Cacher = nil
var metrics = func() *expvar.Map { return &expvar.Map{} }
if s3c != nil {
modCachePath := filepath.Join(flags.CacheDir, "module")
if err := os.MkdirAll(modCachePath, 0755); err != nil {
return nil, nil, fmt.Errorf("create module cache: %w", err)
}

cacher := &modproxy.S3Cacher{
Local: modCachePath,
S3Client: s3c,
KeyPrefix: path.Join(flags.KeyPrefix, "module"),
MaxTasks: flags.S3Concurrency,
Logf: vprintf,
LogRequests: flags.DebugLog&debugModProxy != 0,
}
cleanup = func() { vprintf("close cacher (err=%v)", cacher.Close()) }
metrics = cacher.Metrics
}
cleanup = func() { vprintf("close cacher (err=%v)", cacher.Close()) }

proxy := &goproxy.Goproxy{
Fetcher: &goproxy.GoFetcher{
// As configured, the fetcher should never shell out to the go
Expand All @@ -142,7 +181,7 @@ func initModProxy(env *command.Env, s3c *s3util.Client) (_ http.Handler, cleanup
proxy.ProxiedSumDBs = strings.Split(serveFlags.SumDB, ",")
vprintf("enabling sum DB proxy for %s", strings.Join(proxy.ProxiedSumDBs, ", "))
}
expvar.Publish("modcache", cacher.Metrics())
expvar.Publish("modcache", metrics())
return http.StripPrefix("/mod", proxy), cleanup, nil
}

Expand Down Expand Up @@ -268,7 +307,7 @@ func initServerCert(env *command.Env, hosts []string) (tls.Certificate, error) {

// makeHandler returns an HTTP handler that dispatches requests to debug
// handlers or to the specified proxies, if they are defined.
func makeHandler(modProxy, revProxy http.Handler) http.HandlerFunc {
func makeHandler(modProxy, revProxy http.Handler, tracingContext *otel.TracingContext) http.HandlerFunc {
mux := http.NewServeMux()
tsweb.Debugger(mux)
return func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -289,6 +328,11 @@ func makeHandler(modProxy, revProxy http.Handler) http.HandlerFunc {
return
}
if modProxy != nil && r.Method == http.MethodGet && strings.HasPrefix(path, "/mod/") {
if tracingContext != nil {
_, span := tracingContext.SpanWithContext(r.Context(), strings.TrimPrefix(path, "/mod/"))
defer span.End()
}

modProxy.ServeHTTP(w, r)
return
}
Expand Down
Loading