Commit 7f92c34

Stream ingest rate limiting (#5796)
Built on top of #5575. This tries to prevent the stream `msgs` queue from becoming overwhelmed by Core NATS publishes. When the queue limit is reached and a reply subject is known, the sender receives a 429 "Too Many Requests" response; otherwise the drop is logged with a rate-limited warning. Two new configuration options are added to the JetStream block: `max_buffered_size` and `max_buffered_msgs`. If they are not configured, defaults are used.

Signed-off-by: Neil Twigg <[email protected]>
1 parent 69c57cb commit 7f92c34
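
As a quick illustration of the new options, here is a minimal sketch of a server config using them, modelled on the block in the test below. The store_dir path and the limit values are placeholders, not recommendations; leaving either option unset falls back to the defaults introduced in server/stream.go (10,000 messages / 128 MB):

    jetstream: {
        store_dir: /data/jetstream   # placeholder path
        max_buffered_msgs: 50000     # cap on queued ingest messages per stream
        max_buffered_size: 128mb     # cap on total bytes buffered per stream
    }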

File tree

3 files changed: +125 -7 lines changed

server/jetstream_test.go

+76
@@ -24253,3 +24253,79 @@ func TestJetStreamSourceRemovalAndReAdd(t *testing.T) {
 		require_Equal(t, m.Subject, fmt.Sprintf("foo.%d", i))
 	}
 }
+
+func TestJetStreamRateLimitHighStreamIngest(t *testing.T) {
+	cfgFmt := []byte(fmt.Sprintf(`
+		jetstream: {
+			enabled: true
+			store_dir: %s
+			max_buffered_size: 1kb
+			max_buffered_msgs: 1
+		}
+	`, t.TempDir()))
+
+	conf := createConfFile(t, cfgFmt)
+	s, opts := RunServerWithConfig(conf)
+	defer s.Shutdown()
+
+	require_Equal(t, opts.StreamMaxBufferedSize, 1024)
+	require_Equal(t, opts.StreamMaxBufferedMsgs, 1)
+
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"test"},
+	})
+	require_NoError(t, err)
+
+	// Create a reply inbox that we can await API requests on.
+	// This is instead of using nc.Request().
+	inbox := nc.NewRespInbox()
+	resp := make(chan *nats.Msg, 1000)
+	_, err = nc.ChanSubscribe(inbox, resp)
+	require_NoError(t, err)
+
+	// Publish a large number of messages using Core NATS without
+	// waiting for the responses from the API.
+	msg := &nats.Msg{
+		Subject: "test",
+		Reply:   inbox,
+	}
+	for i := 0; i < 1000; i++ {
+		require_NoError(t, nc.PublishMsg(msg))
+	}
+
+	// Now sort through the API responses. We're looking for one
+	// that tells us that we were rate-limited. If we don't find
+	// one then we fail the test.
+	var rateLimited bool
+	for i, msg := 0, <-resp; i < 1000; i, msg = i+1, <-resp {
+		if msg.Header.Get("Status") == "429" {
+			rateLimited = true
+			break
+		}
+	}
+	require_True(t, rateLimited)
+}
+
+func TestJetStreamRateLimitHighStreamIngestDefaults(t *testing.T) {
+	s := RunBasicJetStreamServer(t)
+	defer s.Shutdown()
+
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"test"},
+	})
+	require_NoError(t, err)
+
+	stream, err := s.globalAccount().lookupStream("TEST")
+	require_NoError(t, err)
+
+	require_Equal(t, stream.msgs.mlen, streamDefaultMaxQueueMsgs)
+	require_Equal(t, stream.msgs.msz, streamDefaultMaxQueueBytes)
+}
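
On the client side, the only signal a Core NATS publisher gets is the `Status: 429` header on the API reply, exactly as the test checks for it above. A rough sketch of how a publisher might watch for that header and back off, using the plain nats.go API (the URL, subject, and back-off delay are arbitrary placeholders):

package main

import (
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL) // placeholder URL
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	// Reply inbox on which the JetStream API responses (including any
	// 429 rate-limit replies) will arrive.
	inbox := nc.NewRespInbox()
	sub, err := nc.SubscribeSync(inbox)
	if err != nil {
		panic(err)
	}

	for i := 0; i < 1000; i++ {
		// Core NATS publish into a stream subject, carrying the reply subject.
		if err := nc.PublishMsg(&nats.Msg{Subject: "test", Reply: inbox}); err != nil {
			panic(err)
		}
		// Check for a response without blocking for long; a 429 status means
		// the stream's ingest queue is full, so back off before continuing.
		if resp, err := sub.NextMsg(time.Millisecond); err == nil {
			if resp.Header.Get("Status") == "429" {
				fmt.Println("stream ingest rate limited, backing off")
				time.Sleep(50 * time.Millisecond) // arbitrary back-off
			}
		}
	}
}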

server/opts.go

+14
@@ -331,6 +331,8 @@ type Options struct {
 	JetStreamLimits        JSLimitOpts
 	JetStreamTpm           JSTpmOpts
 	JetStreamMaxCatchup    int64
+	StreamMaxBufferedMsgs  int           `json:"-"`
+	StreamMaxBufferedSize  int64         `json:"-"`
 	StoreDir               string        `json:"-"`
 	SyncInterval           time.Duration `json:"-"`
 	SyncAlways             bool          `json:"-"`
@@ -2373,6 +2375,18 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er
 				return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)}
 			}
 			opts.JetStreamMaxCatchup = s
+		case "max_buffered_size":
+			s, err := getStorageSize(mv)
+			if err != nil {
+				return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)}
+			}
+			opts.StreamMaxBufferedSize = s
+		case "max_buffered_msgs":
+			mlen, ok := mv.(int64)
+			if !ok {
+				return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)}
+			}
+			opts.StreamMaxBufferedMsgs = int(mlen)
 		default:
 			if !tk.IsUsedVariable() {
 				err := &unknownConfigFieldErr{
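
Because `StreamMaxBufferedMsgs` and `StreamMaxBufferedSize` are exported fields on `Options`, they can also be set programmatically when running the server embedded, not only via the config file. A minimal sketch, assuming a nats-server build that includes this change (the store directory and limit values are placeholders):

package main

import (
	"log"
	"time"

	"github.com/nats-io/nats-server/v2/server"
)

func main() {
	opts := &server.Options{
		JetStream:             true,
		StoreDir:              "/tmp/js-store", // placeholder
		StreamMaxBufferedMsgs: 1000,            // placeholder limits
		StreamMaxBufferedSize: 8 * 1024 * 1024, // 8 MB
	}
	ns, err := server.NewServer(opts)
	if err != nil {
		log.Fatal(err)
	}
	go ns.Start()
	if !ns.ReadyForConnections(5 * time.Second) {
		log.Fatal("nats-server did not become ready in time")
	}
	defer ns.Shutdown()

	log.Println("embedded server running with custom stream ingest limits")
}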

server/stream.go

+35 -7
@@ -218,6 +218,12 @@ type ExternalStream struct {
 	DeliverPrefix string `json:"deliver"`
 }
 
+// For managing stream ingest.
+const (
+	streamDefaultMaxQueueMsgs  = 10_000
+	streamDefaultMaxQueueBytes = 1024 * 1024 * 128
+)
+
 // Stream is a jetstream stream of messages. When we receive a message internally destined
 // for a Stream we will direct link from the client to this structure.
 type stream struct {
@@ -576,6 +582,16 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
 	c := s.createInternalJetStreamClient()
 	ic := s.createInternalJetStreamClient()
 
+	// Work out the stream ingest limits.
+	mlen := s.opts.StreamMaxBufferedMsgs
+	msz := uint64(s.opts.StreamMaxBufferedSize)
+	if mlen == 0 {
+		mlen = streamDefaultMaxQueueMsgs
+	}
+	if msz == 0 {
+		msz = streamDefaultMaxQueueBytes
+	}
+
 	qpfx := fmt.Sprintf("[ACC:%s] stream '%s' ", a.Name, config.Name)
 	mset := &stream{
 		acc:       a,
@@ -588,12 +604,18 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
 		tier:      tier,
 		stype:     cfg.Storage,
 		consumers: make(map[string]*consumer),
-		msgs:      newIPQueue[*inMsg](s, qpfx+"messages"),
-		gets:      newIPQueue[*directGetReq](s, qpfx+"direct gets"),
-		qch:       make(chan struct{}),
-		mqch:      make(chan struct{}),
-		uch:       make(chan struct{}, 4),
-		sch:       make(chan struct{}, 1),
+		msgs: newIPQueue[*inMsg](s, qpfx+"messages",
+			ipqSizeCalculation(func(msg *inMsg) uint64 {
+				return uint64(len(msg.hdr) + len(msg.msg) + len(msg.rply) + len(msg.subj))
+			}),
+			ipqLimitByLen[*inMsg](mlen),
+			ipqLimitBySize[*inMsg](msz),
+		),
+		gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"),
+		qch:  make(chan struct{}),
+		mqch: make(chan struct{}),
+		uch:  make(chan struct{}, 4),
+		sch:  make(chan struct{}, 1),
 	}
 
 	// Start our signaling routine to process consumers.
@@ -4156,7 +4178,13 @@ func (im *inMsg) returnToPool() {
 func (mset *stream) queueInbound(ib *ipQueue[*inMsg], subj, rply string, hdr, msg []byte, si *sourceInfo, mt *msgTrace) {
 	im := inMsgPool.Get().(*inMsg)
 	im.subj, im.rply, im.hdr, im.msg, im.si, im.mt = subj, rply, hdr, msg, si, mt
-	ib.push(im)
+	if _, err := ib.push(im); err != nil {
+		mset.srv.RateLimitWarnf("Dropping messages due to excessive stream ingest rate on '%s' > '%s': %s", mset.acc.Name, mset.name(), err)
+		if rply != _EMPTY_ {
+			hdr := []byte("NATS/1.0 429 Too Many Requests\r\n\r\n")
+			mset.outq.send(newJSPubMsg(rply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
+		}
+	}
 }
 
 var dgPool = sync.Pool{
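
The actual limit enforcement lives in the ipQueue options (`ipqSizeCalculation`, `ipqLimitByLen`, `ipqLimitBySize`) and in the error now returned by `push`. As a self-contained illustration of the same mechanism, and explicitly not the server's ipQueue itself, here is a toy bounded queue whose push fails once either the entry count or the accumulated byte size would exceed its limits:

package main

import (
	"errors"
	"fmt"
)

var errQueueFull = errors.New("queue limits exceeded")

// boundedQueue is a simplified stand-in for an ipQueue configured with
// length and size limits: pushes are rejected once either the entry count
// or the accumulated byte size would exceed the configured maximums.
type boundedQueue[T any] struct {
	items   []T
	size    uint64
	maxLen  int
	maxSize uint64
	calcSz  func(T) uint64
}

func (q *boundedQueue[T]) push(v T) (int, error) {
	sz := q.calcSz(v)
	if len(q.items)+1 > q.maxLen || q.size+sz > q.maxSize {
		return len(q.items), errQueueFull
	}
	q.items = append(q.items, v)
	q.size += sz
	return len(q.items), nil
}

func main() {
	q := &boundedQueue[[]byte]{
		maxLen:  3,
		maxSize: 16,
		calcSz:  func(b []byte) uint64 { return uint64(len(b)) },
	}
	for i := 0; i < 5; i++ {
		if _, err := q.push([]byte("payload")); err != nil {
			// In queueInbound, this error path is where the server rate-logs
			// the drop and, if a reply subject is present, sends the
			// "NATS/1.0 429 Too Many Requests" header back to the publisher.
			fmt.Println("push", i, "rejected:", err)
		}
	}
}

The design choice mirrored here is that the limit is enforced at enqueue time rather than later in the stream's processing loop, so a misbehaving publisher is rejected before it can grow the in-process queue without bound.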

0 commit comments
