Skip to content

Commit 1615a34

Browse files
authoredFeb 20, 2024··
Merge pull request #994 from ripienaar/tracing
Support tracing messages for server 2.11
2 parents 9571e6e + 7a3d656 commit 1615a34

File tree

3 files changed

+377
-3
lines changed

3 files changed

+377
-3
lines changed
 

‎cli/trace_command.go

+374
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,374 @@
1+
// Copyright 2019-2023 The NATS Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package cli
15+
16+
import (
17+
"context"
18+
"errors"
19+
"fmt"
20+
"github.com/choria-io/fisk"
21+
"github.com/choria-io/fisk/units"
22+
"github.com/nats-io/jsm.go"
23+
"github.com/nats-io/jsm.go/api/server/tracing"
24+
"github.com/nats-io/nats-server/v2/server"
25+
"github.com/nats-io/nats.go"
26+
"os"
27+
"strings"
28+
"sync"
29+
"time"
30+
)
31+
32+
type traceCmd struct {
33+
subject string
34+
deliver bool
35+
showTs bool
36+
header map[string]string
37+
payload units.Base2Bytes
38+
}
39+
40+
// traceStats accumulates egress counts, per server kind, while a trace is
// being rendered.
type traceStats struct {
	// Mutex guards egress.
	sync.Mutex
	// egress maps a server kind name to the number of egress events counted
	// for that kind. It is allocated lazily by incEgressKind.
	egress map[string]int
}
44+
45+
func (s *traceStats) incEgressKind(kind int) {
46+
s.Lock()
47+
if s.egress == nil {
48+
s.egress = map[string]int{}
49+
}
50+
51+
kinds := jsm.ServerKindString(kind)
52+
53+
if _, ok := s.egress[kinds]; !ok {
54+
s.egress[kinds] = 0
55+
}
56+
57+
s.egress[kinds]++
58+
59+
s.Unlock()
60+
}
61+
62+
func configureTraceCommand(app commandHost) {
63+
c := &traceCmd{
64+
header: make(map[string]string),
65+
}
66+
67+
trace := app.Command("trace", "Trace message delivery within an NATS network").Action(c.traceAction)
68+
trace.Arg("subject", "The subject to publish to").Required().StringVar(&c.subject)
69+
trace.Arg("payload", "The message body to send").BytesVar(&c.payload)
70+
trace.Flag("deliver", "Deliver the message to the final destination").UnNegatableBoolVar(&c.deliver)
71+
trace.Flag("timestamp", "Show event timestamps").Short('T').UnNegatableBoolVar(&c.showTs)
72+
trace.Flag("header", "Adds headers to the trace message").Short('H').StringMapVar(&c.header)
73+
}
74+
75+
func init() {
76+
registerCommand("trace", 0, configureTraceCommand)
77+
}
78+
79+
func (c *traceCmd) traceAction(_ *fisk.ParseContext) error {
80+
nc, _, err := prepareHelper("", natsOpts()...)
81+
if err != nil {
82+
return err
83+
}
84+
85+
msg := nats.NewMsg(c.subject)
86+
for k, v := range c.header {
87+
msg.Header.Set(k, v)
88+
}
89+
msg.Data, err = c.payload.MarshalText()
90+
if err != nil {
91+
return err
92+
}
93+
94+
deliver := ""
95+
if c.deliver {
96+
deliver = "with delivery to the final destination"
97+
}
98+
99+
fmt.Printf("Tracing message route to subject %s %s\n\n", c.subject, deliver)
100+
101+
lctx, cancel := context.WithCancel(ctx)
102+
defer cancel()
103+
104+
wg := sync.WaitGroup{}
105+
var traces chan *nats.Msg
106+
if opts.Trace {
107+
traces = make(chan *nats.Msg, 10)
108+
wg.Add(1)
109+
go func(ctx context.Context, wg *sync.WaitGroup) {
110+
defer wg.Done()
111+
112+
for {
113+
select {
114+
case msg := <-traces:
115+
fmt.Printf(">>> %s\n", string(msg.Data))
116+
case <-ctx.Done():
117+
return
118+
}
119+
}
120+
121+
}(lctx, &wg)
122+
}
123+
124+
event, err := tracing.TraceMsg(nc, msg, c.deliver, opts.Timeout, traces)
125+
if (event == nil || !errors.Is(err, nats.ErrTimeout)) && err != nil {
126+
return err
127+
}
128+
129+
cancel()
130+
wg.Wait()
131+
132+
ingress := event.Ingress()
133+
ts := event.Server.Time
134+
if ingress != nil {
135+
ts = ingress.Timestamp
136+
}
137+
138+
stat := &traceStats{}
139+
140+
c.renderTrace(event, ts, stat, 0)
141+
142+
fmt.Println()
143+
fmt.Printf("Legend: Client: %s Router: %s Gateway: %s Leafnode: %s JetStream: %s Error: --X\n",
144+
c.arrowForKind(server.CLIENT),
145+
c.arrowForKind(server.ROUTER),
146+
c.arrowForKind(server.GATEWAY),
147+
c.arrowForKind(server.LEAF),
148+
c.arrowForKind(server.JETSTREAM),
149+
)
150+
fmt.Println()
151+
if len(stat.egress) > 0 {
152+
cols := newColumns("Egress Count:")
153+
stat.Lock()
154+
cols.AddMapInts(stat.egress, true, true)
155+
stat.Unlock()
156+
cols.Frender(os.Stdout)
157+
}
158+
159+
if v, ok := stat.egress[jsm.ServerKindString(server.JETSTREAM)]; ok && v > 1 {
160+
fmt.Printf("WARNING: The message was received by %d streams", v)
161+
fmt.Println()
162+
}
163+
164+
return nil
165+
}
166+
167+
func (c *traceCmd) renderTrace(event *server.MsgTraceEvent, ts time.Time, stat *traceStats, indent int) {
168+
if event == nil {
169+
return
170+
}
171+
172+
mu := sync.Mutex{}
173+
prefix := strings.Repeat(" ", indent*4)
174+
175+
printf := func(format string, a ...any) {
176+
mu.Lock()
177+
fmt.Printf(prefix+format+"\n", a...)
178+
mu.Unlock()
179+
}
180+
181+
printlines := func(lines []string) {
182+
for _, line := range lines {
183+
printf(line)
184+
}
185+
}
186+
187+
printlines(c.renderIngress(event, event.Ingress(), ts))
188+
printlines(c.renderSubjectMapping(event, event.SubjectMapping(), ts))
189+
printlines(c.renderServices(event, event.ServiceImports(), ts))
190+
printlines(c.renderStreams(event, event.StreamExports(), ts))
191+
printlines(c.renderJetStream(event, event.JetStream(), stat, ts))
192+
193+
egresses := event.Egresses()
194+
if len(egresses) == 0 && event.Ingress() != nil && event.Ingress().Kind == server.CLIENT && event.Ingress().Error == "" {
195+
printlines([]string{"--X No active interest"})
196+
}
197+
for _, egress := range egresses {
198+
printlines(c.renderEgress(event, egress, ts, stat))
199+
c.renderTrace(egress.Link, egress.Timestamp, stat, indent+1)
200+
}
201+
}
202+
203+
func (c *traceCmd) renderEgress(event *server.MsgTraceEvent, egress *server.MsgTraceEgress, ts time.Time, stat *traceStats) []string {
204+
if egress.Error != "" && event.Server.ID == "" {
205+
return []string{fmt.Sprintf("!!! Egress Error: %v", egress.Error)}
206+
}
207+
208+
parts := []string{c.arrowForKind(egress.Kind)}
209+
210+
if egress.Name == "" {
211+
parts = append(parts, fmt.Sprintf("%s %s", jsm.ServerKindString(egress.Kind), jsm.ServerCidString(egress.Kind, egress.CID)))
212+
} else {
213+
parts = append(parts, fmt.Sprintf("%s %q %s", jsm.ServerKindString(egress.Kind), egress.Name, jsm.ServerCidString(egress.Kind, egress.CID)))
214+
}
215+
216+
if egress.Account != "" {
217+
parts = append(parts, c.pairs("account", egress.Account)...)
218+
}
219+
220+
if egress.Subscription != "" {
221+
parts = append(parts, c.pairs("subject", egress.Subscription)...)
222+
}
223+
if egress.Queue != "" {
224+
parts = append(parts, c.pairs("queue", egress.Queue)...)
225+
}
226+
227+
lines := []string{strings.Join(parts, " ")}
228+
if egress.Error != "" {
229+
lines = append(lines, fmt.Sprintf(" --X Error: %s", egress.Error))
230+
}
231+
232+
stat.incEgressKind(egress.Kind)
233+
234+
return c.tsPrefixLines(lines, ts, egress.Timestamp)
235+
}
236+
237+
func (c *traceCmd) renderSubjectMapping(event *server.MsgTraceEvent, mapping *server.MsgTraceSubjectMapping, ts time.Time) []string {
238+
if mapping == nil {
239+
return nil
240+
}
241+
242+
return []string{c.tsPrefix(fmt.Sprintf(" Mapping subject:%q", mapping.MappedTo), ts, mapping.Timestamp)}
243+
}
244+
245+
func (c *traceCmd) renderJetStream(event *server.MsgTraceEvent, jetstream *server.MsgTraceJetStream, stat *traceStats, ts time.Time) []string {
246+
if jetstream == nil {
247+
return nil
248+
}
249+
250+
if jetstream.Error != "" && jetstream.Stream == "" {
251+
return []string{fmt.Sprintf("!!! Expected JetStream event: %v", jetstream.Error)}
252+
}
253+
254+
action := "stored"
255+
if jetstream.NoInterest {
256+
action = "no interest"
257+
}
258+
259+
parts := []string{fmt.Sprintf("%s JetStream action:%q stream:%q subject:%q", c.arrowForKind(server.JETSTREAM), action, jetstream.Stream, jetstream.Subject)}
260+
261+
lines := []string{strings.Join(parts, " ")}
262+
if jetstream.Error != "" {
263+
lines = append(lines, fmt.Sprintf("--X Error: %s", jetstream.Error))
264+
}
265+
266+
stat.incEgressKind(server.JETSTREAM)
267+
268+
return c.tsPrefixLines(lines, ts, jetstream.Timestamp)
269+
}
270+
271+
func (c *traceCmd) renderStreams(event *server.MsgTraceEvent, streams []*server.MsgTraceStreamExport, ts time.Time) []string {
272+
res := []string{}
273+
for _, stream := range streams {
274+
res = append(res, c.tsPrefix(fmt.Sprintf(" Stream Export subject:%q account:%q", stream.To, stream.Account), ts, stream.Timestamp))
275+
}
276+
return res
277+
}
278+
279+
func (c *traceCmd) renderServices(event *server.MsgTraceEvent, services []*server.MsgTraceServiceImport, ts time.Time) []string {
280+
res := []string{}
281+
for _, svc := range services {
282+
res = append(res, c.tsPrefix(fmt.Sprintf(" Service Import from:%q to:%q account:%q", svc.From, svc.To, svc.Account), ts, svc.Timestamp))
283+
}
284+
return res
285+
}
286+
287+
func (c *traceCmd) renderIngress(event *server.MsgTraceEvent, ingress *server.MsgTraceIngress, ts time.Time) []string {
288+
if ingress == nil {
289+
return nil
290+
}
291+
292+
if ingress.Error != "" && event.Server.ID == "" {
293+
return []string{fmt.Sprintf(" !!! Ingress Error: %v", ingress.Error)}
294+
}
295+
296+
var parts []string
297+
298+
if ingress.Name == "" {
299+
parts = append(parts, fmt.Sprintf("%s %s", jsm.ServerKindString(ingress.Kind), jsm.ServerCidString(ingress.Kind, ingress.CID)))
300+
} else {
301+
parts = append(parts, fmt.Sprintf("%s %q %s", jsm.ServerKindString(ingress.Kind), ingress.Name, jsm.ServerCidString(ingress.Kind, ingress.CID)))
302+
}
303+
304+
if event.Server.Cluster != "" && (ingress.Kind == server.GATEWAY || ingress.Kind == server.CLIENT) {
305+
parts = append(parts, c.pairs("cluster", event.Server.Cluster)...)
306+
}
307+
308+
parts = append(parts, c.pairs("server", event.Server.Name, "version", event.Server.Version)...)
309+
310+
lines := []string{strings.Join(parts, " ")}
311+
if ingress.Error != "" {
312+
lines = append(lines, fmt.Sprintf("--X Error: %s", ingress.Error))
313+
}
314+
315+
return c.tsPrefixLines(lines, ts, ingress.Timestamp)
316+
}
317+
318+
func (c *traceCmd) arrowForKind(kind int) string {
319+
switch kind {
320+
case server.CLIENT:
321+
return "--C"
322+
case server.JETSTREAM:
323+
return "--J"
324+
case server.ROUTER:
325+
return "-->"
326+
case server.GATEWAY:
327+
return "==>"
328+
case server.LEAF:
329+
return "~~>"
330+
default:
331+
return ""
332+
}
333+
}
334+
335+
func (c *traceCmd) tsPrefixLines(msgs []string, ots time.Time, ets time.Time) []string {
336+
res := []string{}
337+
338+
for _, line := range msgs {
339+
res = append(res, c.tsPrefix(line, ots, ets))
340+
}
341+
342+
return res
343+
}
344+
345+
func (c *traceCmd) tsPrefix(msg string, ots time.Time, ets time.Time) string {
346+
var since time.Duration
347+
if !(ots.IsZero() && ets.IsZero()) {
348+
since = ets.Sub(ots).Round(time.Microsecond)
349+
}
350+
351+
ts := since.String()
352+
if since < 0 {
353+
ts = "Skew"
354+
}
355+
356+
if c.showTs {
357+
return fmt.Sprintf("[%s] %s", ts, msg)
358+
}
359+
360+
return msg
361+
}
362+
363+
func (c *traceCmd) pairs(parts ...string) []string {
364+
if len(parts)%2 != 0 {
365+
panic("invalid pairs")
366+
}
367+
368+
var ret []string
369+
for i := 0; i < len(parts)-1; i = i + 2 {
370+
ret = append(ret, fmt.Sprintf("%s:%q", parts[i], parts[i+1]))
371+
}
372+
373+
return ret
374+
}

‎go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ require (
1919
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
2020
github.com/klauspost/compress v1.17.6
2121
github.com/mattn/go-isatty v0.0.20
22-
github.com/nats-io/jsm.go v0.1.1-0.20240219122351-c1d05e68494f
22+
github.com/nats-io/jsm.go v0.1.1-0.20240220104900-2f16a88ec1ae
2323
github.com/nats-io/jwt/v2 v2.5.4
2424
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240217230419-4b3317b980ba
2525
github.com/nats-io/nats.go v1.33.1

‎go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
7676
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
7777
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
7878
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
79-
github.com/nats-io/jsm.go v0.1.1-0.20240219122351-c1d05e68494f h1:m82V3oI3pt6oJzBWbO0I2xgxM1U/irPS0GLAID5tqPw=
80-
github.com/nats-io/jsm.go v0.1.1-0.20240219122351-c1d05e68494f/go.mod h1:eZEYD+Sbv7Nd8Xr4kOt/reSwVRtNASzaFxU+oV9zp40=
79+
github.com/nats-io/jsm.go v0.1.1-0.20240220104900-2f16a88ec1ae h1:9Ja37hWVWUjgOEkq76tFVRZFhoSSh7ULZx7uoQ1jylI=
80+
github.com/nats-io/jsm.go v0.1.1-0.20240220104900-2f16a88ec1ae/go.mod h1:reLcqOlK/VTTB9S/FMSab6y581aeLIk2wGK20giiW9Q=
8181
github.com/nats-io/jwt/v2 v2.5.4 h1:Bz+drKl2GbE30fxTOtb0NYl1BQ5RwZ+Zcqkg3mR5bbI=
8282
github.com/nats-io/jwt/v2 v2.5.4/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
8383
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240217230419-4b3317b980ba h1:idIfFiRzXv2wHFpxHH4nSoPtg7UVr4vQPB1NraU0D4c=

0 commit comments

Comments
 (0)
Please sign in to comment.