Skip to content

Commit 98f84bc

Browse files
committed
change proxy as discussed in a meet
Signed-off-by: Sandor Szücs <[email protected]>
1 parent c23f184 commit 98f84bc

File tree

1 file changed

+95
-43
lines changed

1 file changed

+95
-43
lines changed

proxy/proxy.go

+95-43
Original file line numberDiff line numberDiff line change
@@ -862,13 +862,43 @@ func WithParams(p Params) *Proxy {
862862
}
863863
}
864864

865-
var frURL *url.URL
865+
log := &logging.DefaultLog{}
866+
867+
var (
868+
frURL *url.URL
869+
frChannel = make(chan struct{}, 10240)
870+
)
866871
if p.FlightRecorder != nil {
867872
var err error
868873
frURL, err = url.Parse(p.FlightRecorderTargetURL)
869874
if err != nil {
870875
p.FlightRecorder.Stop()
871876
p.FlightRecorder = nil
877+
} else {
878+
go func() {
879+
d := 7 * 24 * time.Hour
880+
last := time.Now()
881+
882+
for {
883+
select {
884+
case <-frChannel:
885+
// range through all notifications until 1ms there is no notification
886+
d = time.Millisecond
887+
continue
888+
case <-quit:
889+
p.FlightRecorder.Stop()
890+
return
891+
case <-time.After(d):
892+
if time.Since(last) >= time.Hour {
893+
writeTrace(p.FlightRecorder, frURL, log, tr)
894+
}
895+
last = time.Now()
896+
897+
// reset d
898+
d = 7 * 24 * time.Hour
899+
}
900+
}
901+
}()
872902
}
873903
}
874904

@@ -891,7 +921,7 @@ func WithParams(p Params) *Proxy {
891921
maxLoops: p.MaxLoopbacks,
892922
breakers: p.CircuitBreakers,
893923
limiters: p.RateLimiters,
894-
log: &logging.DefaultLog{},
924+
log: log,
895925
defaultHTTPStatus: defaultHTTPStatus,
896926
tracing: newProxyTracing(p.OpenTracing),
897927
accessLogDisabled: p.AccessLogDisabled,
@@ -906,57 +936,81 @@ func WithParams(p Params) *Proxy {
906936
}
907937
}
908938

909-
func (p *Proxy) writeTraceIfTooSlow(ctx *context) {
910-
if p.flightRecorder == nil || p.flightRecorderURL == nil {
939+
func (p *Proxy) writeTraceIfTooSlow(ctx *context, span ot.Span) {
940+
took := time.Since(ctx.startServe)
941+
span.SetTag("proxy.took", took)
942+
943+
// signal too slow
944+
945+
d := p.flightRecorderPeriod
946+
if d < 1*time.Millisecond && d > took {
911947
return
912948
}
949+
}
913950

914-
d := p.flightRecorderPeriod
915-
if e, ok := ctx.StateBag()[filters.TraceName]; ok {
916-
d = e.(time.Duration)
951+
func writeTraceTo(log logging.Logger, flightRecorder *trace.FlightRecorder, w io.Writer) (int, error) {
952+
n, err := flightRecorder.WriteTo(w)
953+
if err != nil {
954+
switch err {
955+
case trace.ErrSnapshotActive:
956+
return 0, fmt.Errorf("flightRecorder already in progress")
957+
default:
958+
return 0, fmt.Errorf("failed to write FlightRecorder data: %w", err)
959+
}
960+
} else {
961+
log.Infof("FlightRecorder wrote %d bytes", n)
917962
}
918-
if d < 1*time.Microsecond {
963+
964+
return n, err
965+
}
966+
967+
func writeTrace(flightRecorder *trace.FlightRecorder, flightRecorderURL *url.URL, log logging.Logger, roundTripper http.RoundTripper) {
968+
if flightRecorder == nil || flightRecorderURL == nil {
919969
return
920970
}
921971

922-
p.log.Infof("write trace if too slow: %s > %s", time.Since(ctx.startServe), d)
923-
if time.Since(ctx.startServe) > d {
972+
log.Info("write trace")
973+
974+
switch flightRecorderURL.Scheme {
975+
case "file":
976+
fd, err := os.Open(flightRecorderURL.Path)
977+
if err != nil {
978+
log.Errorf("Failed to write file %q: %v", err, flightRecorderURL.Path)
979+
return
980+
}
981+
982+
_, err = writeTraceTo(log, flightRecorder, fd)
983+
if err != nil {
984+
log.Errorf("Failed to write trace file %q: %v", flightRecorderURL.Path, err)
985+
}
986+
987+
case "http", "https":
924988
var b bytes.Buffer
925-
_, err := p.flightRecorder.WriteTo(&b)
989+
_, err := writeTraceTo(log, flightRecorder, &b)
926990
if err != nil {
927-
p.log.Errorf("Failed to write flightrecorder data: %v", err)
991+
log.Errorf("Failed to write trace into in-memory buffer: %v", err)
928992
return
929993
}
930994

931-
switch p.flightRecorderURL.Scheme {
932-
case "file":
933-
if err := os.WriteFile(p.flightRecorderURL.Path, b.Bytes(), 0o644); err != nil {
934-
p.log.Errorf("Failed to write file trace.out: %v", err)
935-
return
936-
} else {
937-
p.log.Infof("FlightRecorder wrote %d bytes to trace file %q", b.Len(), p.flightRecorderURL.Path)
938-
}
939-
case "http", "https":
940-
req, err := http.NewRequest("PUT", p.flightRecorderURL.String(), &b)
941-
if err != nil {
942-
p.log.Errorf("Failed to create request to %q to send a trace: %v", p.flightRecorderURL.String(), err)
943-
}
995+
req, err := http.NewRequest("PUT", flightRecorderURL.String(), &b)
996+
if err != nil {
997+
log.Errorf("Failed to create request to %q to send a trace: %v", flightRecorderURL.String(), err)
998+
}
944999

945-
rsp, err := p.roundTripper.RoundTrip(req)
946-
if err != nil {
947-
p.log.Errorf("Failed to write trace to %q: %v", p.flightRecorderURL.String(), err)
948-
} else {
949-
rsp.Body.Close()
950-
}
951-
switch rsp.StatusCode {
952-
case 200, 201, 204:
953-
p.log.Infof("Successful send of a trace to %q", p.flightRecorderURL.String())
954-
default:
955-
p.log.Errorf("Failed to get successful response from %s: (%d) %s", p.flightRecorderURL.String(), rsp.StatusCode, rsp.Status)
956-
}
1000+
rsp, err := roundTripper.RoundTrip(req)
1001+
if err != nil {
1002+
log.Errorf("Failed to write trace to %q: %v", flightRecorderURL.String(), err)
1003+
} else {
1004+
rsp.Body.Close()
1005+
}
1006+
switch rsp.StatusCode {
1007+
case 200, 201, 204:
1008+
log.Infof("Successful send of a trace to %q", flightRecorderURL.String())
9571009
default:
958-
p.log.Errorf("Failed to write trace, unknown FlightRecorderURL scheme %q", p.flightRecorderURL.Scheme)
1010+
log.Errorf("Failed to get successful response from %s: (%d) %s", flightRecorderURL.String(), rsp.StatusCode, rsp.Status)
9591011
}
1012+
default:
1013+
log.Errorf("Failed to write trace, unknown FlightRecorderURL scheme %q", flightRecorderURL.Scheme)
9601014
}
9611015
}
9621016

@@ -1087,7 +1141,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
10871141
proxySpanOpts := []ot.StartSpanOption{ot.Tags{
10881142
SpanKindTag: SpanKindClient,
10891143
}}
1090-
if parentSpan := ot.SpanFromContext(req.Context()); parentSpan != nil {
1144+
parentSpan := ot.SpanFromContext(req.Context())
1145+
if parentSpan != nil {
10911146
proxySpanOpts = append(proxySpanOpts, ot.ChildOf(parentSpan.Context()))
10921147
}
10931148
ctx.proxySpan = p.tracing.tracer.StartSpan(spanName, proxySpanOpts...)
@@ -1108,7 +1163,7 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
11081163
ctx.proxySpan.LogKV("http_roundtrip", StartEvent)
11091164
req = injectClientTrace(req, ctx.proxySpan)
11101165

1111-
p.writeTraceIfTooSlow(ctx)
1166+
p.writeTraceIfTooSlow(ctx, parentSpan)
11121167

11131168
response, err := roundTripper.RoundTrip(req)
11141169
if endpointMetrics != nil {
@@ -1742,9 +1797,6 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
17421797
func (p *Proxy) Close() error {
17431798
close(p.quit)
17441799
p.registry.Close()
1745-
if p.flightRecorder != nil {
1746-
p.flightRecorder.Stop()
1747-
}
17481800

17491801
return nil
17501802
}

0 commit comments

Comments
 (0)