44package main
55
66import (
7+ "bytes"
78 "context"
9+ "encoding/json"
810 "errors"
911 "fmt"
1012 "github.com/grafana/go-cache-plugin/lib/otel"
@@ -25,22 +27,24 @@ import (
2527)
2628
2729var flags struct {
28- CacheDir string `flag:"cache-dir,default=$GOCACHE_DIR,Local cache directory (required)"`
29- LogFile string `flag:"log-file,default=trace.log,File used for logs"`
30- S3Bucket string `flag:"bucket,default=$GOCACHE_S3_BUCKET,S3 bucket name (required if no --local flag provided)"`
31- S3Region string `flag:"region,default=$GOCACHE_S3_REGION,S3 region"`
32- S3Endpoint string `flag:"s3-endpoint-url,default=$GOCACHE_S3_ENDPOINT_URL,S3 custom endpoint URL (if unset, use AWS default)"`
33- S3PathStyle bool `flag:"s3-path-style,default=$GOCACHE_S3_PATH_STYLE,S3 path-style URLs (optional)"`
34- LocalCache bool `flag:"local,default=false,Runs cache in local mode (no S3)"`
35- KeyPrefix string `flag:"prefix,default=$GOCACHE_KEY_PREFIX,S3 key prefix (optional)"`
36- MinUploadSize int64 `flag:"min-upload-size,default=$GOCACHE_MIN_SIZE,Minimum object size to upload to S3 (in bytes)"`
37- Concurrency int `flag:"c,default=$GOCACHE_CONCURRENCY,Maximum number of concurrent requests"`
38- S3Concurrency int `flag:"u,default=$GOCACHE_S3_CONCURRENCY,Maximum concurrency for upload to S3"`
39- PrintMetrics bool `flag:"metrics,default=$GOCACHE_METRICS,Print summary metrics to stderr at exit"`
40- Expiration time.Duration `flag:"expiry,default=$GOCACHE_EXPIRY,Cache expiration period (optional)"`
41- Verbose bool `flag:"v,default=$GOCACHE_VERBOSE,Enable verbose logging"`
42- DebugLog int `flag:"debug,default=$GOCACHE_DEBUG,Enable detailed per-request debug logging (noisy)"`
43- TracingParams string `flag:"tracing,default=runId:runAttempt:jobName:stepName,Tracing params"`
30+ CacheDir string `flag:"cache-dir,default=$GOCACHE_DIR,Local cache directory (required)"`
31+ LogFile string `flag:"log-file,default=trace.log,File used for logs"`
32+ S3Bucket string `flag:"bucket,default=$GOCACHE_S3_BUCKET,S3 bucket name (required if no --local flag provided)"`
33+ S3Region string `flag:"region,default=$GOCACHE_S3_REGION,S3 region"`
34+ S3Endpoint string `flag:"s3-endpoint-url,default=$GOCACHE_S3_ENDPOINT_URL,S3 custom endpoint URL (if unset, use AWS default)"`
35+ S3PathStyle bool `flag:"s3-path-style,default=$GOCACHE_S3_PATH_STYLE,S3 path-style URLs (optional)"`
36+ LocalCache bool `flag:"local-cache,default=false,Runs in no cache mode (no S3)"`
37+ KeyPrefix string `flag:"prefix,default=$GOCACHE_KEY_PREFIX,S3 key prefix (optional)"`
38+ MinUploadSize int64 `flag:"min-upload-size,default=$GOCACHE_MIN_SIZE,Minimum object size to upload to S3 (in bytes)"`
39+ Concurrency int `flag:"c,default=$GOCACHE_CONCURRENCY,Maximum number of concurrent requests"`
40+ S3Concurrency int `flag:"u,default=$GOCACHE_S3_CONCURRENCY,Maximum concurrency for upload to S3"`
41+ PrintMetrics bool `flag:"metrics,default=$GOCACHE_METRICS,Print summary metrics to stderr at exit"`
42+ Expiration time.Duration `flag:"expiry,default=$GOCACHE_EXPIRY,Cache expiration period (optional)"`
43+ Verbose bool `flag:"v,default=$GOCACHE_VERBOSE,Enable verbose logging"`
44+ DebugLog int `flag:"debug,default=$GOCACHE_DEBUG,Enable detailed per-request debug logging (noisy)"`
45+ TracingEnabled bool `flag:"tracing,default=false,Enable tracing (optional)"`
46+ TracingContext string `flag:"context,default=runId:runAttempt:jobName:stepName:stepNumber,Tracing params"`
47+ OtelCollectorAddress string `flag:"otel-collector,default,OTEL collector address (optional)"`
4448}
4549
4650const (
@@ -66,11 +70,12 @@ func runDirect(env *command.Env) error {
6670}
6771
6872var serveFlags struct {
69- Plugin string `flag:"plugin,default=$GOCACHE_PLUGIN,Plugin service addr (or port) (required)"`
70- HTTP string `flag:"http,default=$GOCACHE_HTTP,HTTP service address ([host]:port)"`
71- ModProxy bool `flag:"modproxy,default=$GOCACHE_MODPROXY,Enable a Go module proxy (requires --http)"`
72- RevProxy string `flag:"revproxy,default=$GOCACHE_REVPROXY,Reverse proxy these hosts (comma-separated; requires --http)"`
73- SumDB string `flag:"sumdb,default=$GOCACHE_SUMDB,SumDB servers to proxy for (comma-separated)"`
73+ Plugin string `flag:"plugin,default=$GOCACHE_PLUGIN,Plugin service addr (or port) (required)"`
74+ HTTP string `flag:"http,default=$GOCACHE_HTTP,HTTP service address ([host]:port)"`
75+ ModProxy bool `flag:"modproxy,default=$GOCACHE_MODPROXY,Enable a Go module proxy (requires --http)"`
76+ ModNoCache bool `flag:"mod-nocache,default=false,Disable the local module cache (requires --modproxy)"`
77+ RevProxy string `flag:"revproxy,default=$GOCACHE_REVPROXY,Reverse proxy these hosts (comma-separated; requires --http)"`
78+ SumDB string `flag:"sumdb,default=$GOCACHE_SUMDB,SumDB servers to proxy for (comma-separated)"`
7479}
7580
7681var logger * log.Logger
@@ -83,7 +88,7 @@ func runServe(env *command.Env) error {
8388 return env .Usagef ("you must provide a --plugin addr (or port)" )
8489 }
8590
86- otel . Init ( context . Background (), otel. Config { Mode : otel . ModeStdout , LogFile : flags .LogFile } )
91+ log . Printf ( "Otel exporter initialized with address: %s" , flags .OtelCollectorAddress )
8792 // Initialize the cache server. Unlike a direct server, only close down and
8893 // wait for cache cleanup when the whole process exits.
8994 s , s3c , err := initCacheServer (env )
@@ -176,8 +181,11 @@ func runServe(env *command.Env) error {
176181
177182// runConnect implements a direct cache proxy by connecting to a remote server.
178183func runConnect (env * command.Env , plugin string ) error {
179- addr := plugin
180184
185+ ctx := env .Context ()
186+ shutdownTracer , reportSpan , err := initTracing (ctx )
187+
188+ addr := plugin
181189 // If the caller has not specified a host/port, then likely this is an older usage which only specifies port
182190 if ! strings .Contains (plugin , ":" ) {
183191 port , err := strconv .Atoi (plugin )
@@ -197,23 +205,61 @@ func runConnect(env *command.Env, plugin string) error {
197205
198206 out := taskgroup .Go (func () error {
199207 defer conn .(* net.TCPConn ).CloseWrite () // let the server finish
200- return copy (conn , os .Stdin )
208+ return copy (conn , os .Stdin , reportSpan )
201209 })
202- if rerr := copy (os .Stdout , conn ); rerr != nil {
210+ if rerr := copy (os .Stdout , conn , reportSpan ); rerr != nil {
203211 vprintf ("read responses: %v" , err )
204212 }
205213 out .Wait ()
206214 conn .Close ()
207- vprintf ("connection closed (%v elapsed)" , time .Since (start ))
215+
216+ shutdownTracer (context .Background ())
217+ println ("@@@@@ - connection closed @@@@@" )
218+ println (fmt .Sprintf ("connection closed (%v elapsed)" , time .Since (start )))
208219 return nil
209220}
210221
222+ func initTracing (ctx context.Context ) (func (context.Context ) error , func ([]byte ), error ) {
223+ if ! flags .TracingEnabled {
224+ return func (context.Context ) error { return nil }, func ([]byte ) {}, nil
225+ }
226+
227+ var shutdown func (context.Context ) error
228+ var err error
229+ if flags .OtelCollectorAddress != "" {
230+ shutdown , err = otel .SetupOtelTraceProvider (ctx , flags .OtelCollectorAddress )
231+ } else if flags .LogFile != "" {
232+ log .Printf ("Otel Collector address not specified, starting with the logging reporter, log file: %s" , flags .LogFile )
233+ shutdown , err = otel .SetupLoggingProvider (ctx , flags .LogFile )
234+ } else {
235+ log .Printf ("please specify either --otel-collector or --log-file to setup tracing or disable tracing" )
236+ return nil , nil , errors .New ("otel exporter not initialized" )
237+ }
238+ if err != nil {
239+ return nil , nil , err
240+ }
241+
242+ tracingContext := otel .NewTracedFromString (flags .TracingContext )
243+
244+ spanner := otel .NewAwesomeSpanner (tracingContext )
245+
246+ spanReporter := func (buffer []byte ) {
247+ id , err := parseId (buffer [:])
248+ if err != nil {
249+ log .Printf ("failed to parse id from buffer: %v" , err )
250+ }
251+ spanner .ProcessId (ctx , id )
252+ }
253+
254+ return shutdown , spanReporter , err
255+ }
256+
211257// copy emulates the base case of io.Copy, but does not attempt to use the
212258// io.ReaderFrom or io.WriterTo implementations.
213259//
214260// TODO(creachadair): For some reason io.Copy does not work correctly when r is
215261// a pipe (e.g., stdin) and w is a TCP socket. Figure out why.
216- func copy (w io.Writer , r io.Reader ) error {
262+ func copy (w io.Writer , r io.Reader , reportTrace func ([] byte ) ) error {
217263 var buf [4096 ]byte
218264 for {
219265 nr , err := r .Read (buf [:])
@@ -223,6 +269,8 @@ func copy(w io.Writer, r io.Reader) error {
223269 } else if nw < nr {
224270 return fmt .Errorf ("wrote %d < %d bytes: %w" , nw , nr , io .ErrShortWrite )
225271 }
272+
273+ reportTrace (buf [:])
226274 }
227275 if err == io .EOF {
228276 return nil
@@ -231,3 +279,21 @@ func copy(w io.Writer, r io.Reader) error {
231279 }
232280 }
233281}
282+
283+ func parseId (buf []byte ) (string , error ) {
284+ map1 := make (map [string ]any )
285+
286+ err := json .Unmarshal (buf , & map1 )
287+ if err != nil {
288+ slice , _ , found := bytes .Cut (buf , []byte {'\n' })
289+ if found {
290+ err2 := json .Unmarshal (slice , & map1 )
291+ if err2 != nil {
292+ return "" , err
293+ }
294+ }
295+ }
296+
297+ value := map1 ["ID" ]
298+ return fmt .Sprintf ("map1 = %v" , value ), nil
299+ }
0 commit comments