Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-picks for 2.10.19-RC.2 #5718

Merged
merged 16 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion scripts/runTestsOnTravis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ elif [ "$1" = "srv_pkg_non_js_tests" ]; then
# by using `skip_js_tests`, MQTT tests by using `skip_mqtt_tests` and
# message tracing tests by using `skip_msgtrace_tests`.

go test -race -v -p=1 ./server/... -tags=skip_store_tests,skip_js_tests,skip_mqtt_tests,skip_msgtrace_tests -count=1 -vet=off -timeout=30m -failfast
# Also including the ldflag with the version since this includes the `TestVersionMatchesTag`.
go test -race -v -p=1 ./server/... -ldflags="-X=github.com/nats-io/nats-server/v2/server.serverVersion=$TRAVIS_TAG" -tags=skip_store_tests,skip_js_tests,skip_mqtt_tests,skip_msgtrace_tests -count=1 -vet=off -timeout=30m -failfast

elif [ "$1" = "non_srv_pkg_tests" ]; then

Expand Down
4 changes: 3 additions & 1 deletion server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5436,7 +5436,9 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) {

if !ok {
if c.kind == ROUTER && len(c.route.accName) > 0 {
acc = c.acc
if acc = c.acc; acc == nil {
return nil, nil
}
} else {
// Match correct account and sublist.
if acc, _ = c.srv.LookupAccount(string(c.pa.account)); acc == nil {
Expand Down
26 changes: 25 additions & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2898,6 +2898,28 @@ func (o *consumer) isFiltered() bool {
return false
}

// Check if we would have matched and needed an ack for this store seq.
// This is called for interest based retention streams to remove messages.
func (o *consumer) matchAck(sseq uint64) bool {
o.mu.RLock()
defer o.mu.RUnlock()

// Check if we are filtered, and if so check if this is even applicable to us.
if o.isFiltered() {
if o.mset == nil {
return false
}
var svp StoreMsg
if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil {
return false
}
if !o.isFilteredMatch(svp.subj) {
return false
}
}
return true
}

// Check if we need an ack for this store seq.
// This is called for interest based retention streams to remove messages.
func (o *consumer) needAck(sseq uint64, subj string) bool {
Expand Down Expand Up @@ -5499,7 +5521,9 @@ func (o *consumer) checkStateForInterestStream() error {
}

for seq := ss.FirstSeq; asflr > 0 && seq <= asflr; seq++ {
mset.ackMsg(o, seq)
if o.matchAck(seq) {
mset.ackMsg(o, seq)
}
}

o.mu.RLock()
Expand Down
10 changes: 9 additions & 1 deletion server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2610,6 +2610,11 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {
// This is used to see if we can selectively jump start blocks based on filter subject and a floor block index.
// Will return -1 if no matches at all.
func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) (int, int) {
if filter == _EMPTY_ {
filter = fwcs
wc = true
}

start, stop := uint32(math.MaxUint32), uint32(0)
if wc {
fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) {
Expand Down Expand Up @@ -6424,7 +6429,10 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
// Nothing found in this block. We missed, if first block (bi) check psim.
// Similar to above if start <= first seq.
// TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers.
if i == bi {
// We should not do this at all if we are already on the last block.
// Also if we are a wildcard do not check if large subject space.
const wcMaxSizeToCheck = 64 * 1024
if i == bi && i < len(fs.blks)-1 && (!wc || fs.psim.Size() < wcMaxSizeToCheck) {
nbi, lbi := fs.checkSkipFirstBlock(filter, wc)
// Nothing available.
if nbi < 0 || lbi <= bi {
Expand Down
53 changes: 53 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7248,6 +7248,28 @@ func TestFileStoreCheckSkipFirstBlockBug(t *testing.T) {
require_NoError(t, err)
}

// https://github.com/nats-io/nats-server/issues/5705
func TestFileStoreCheckSkipFirstBlockEmptyFilter(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: 128},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

msg := []byte("hello")
// Create 4 blocks, each block holds 2 msgs
for i := 0; i < 4; i++ {
fs.StoreMsg("foo.22.bar", nil, msg)
fs.StoreMsg("foo.22.baz", nil, msg)
}
require_Equal(t, fs.numMsgBlocks(), 4)

nbi, lbi := fs.checkSkipFirstBlock(_EMPTY_, false)
require_Equal(t, nbi, 0)
require_Equal(t, lbi, 3)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -7506,6 +7528,37 @@ func Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard(b *testin
}
}

func Benchmark_FileStoreLoadNextManySubjectsWithWildcardNearLastBlock(b *testing.B) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: b.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
require_NoError(b, err)
defer fs.Stop()

// Small om purpose.
msg := []byte("ok")

// Make first msg one that would match as well.
fs.StoreMsg("foo.1.baz", nil, msg)
// Add in a bunch of msgs.
// We need to make sure we have a range of subjects that could kick in a linear scan.
for i := 0; i < 1_000_000; i++ {
subj := fmt.Sprintf("foo.%d.bar", rand.Intn(100_000)+2)
fs.StoreMsg(subj, nil, msg)
}
// Make last msg one that would match as well.
fs.StoreMsg("foo.1.baz", nil, msg)

b.ResetTimer()

var smv StoreMsg
for i := 0; i < b.N; i++ {
// Make sure not first seq.
_, _, err := fs.LoadNextMsg("foo.*.baz", true, 999_990, &smv)
require_NoError(b, err)
}
}

func Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail(b *testing.B) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: b.TempDir()},
Expand Down
164 changes: 164 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math/rand"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -1839,3 +1840,166 @@ func TestJetStreamClusterAckFloorBetweenLeaderAndFollowers(t *testing.T) {
}
}
}

// https://github.com/nats-io/nats-server/pull/5600
func TestJetStreamClusterConsumerLeak(t *testing.T) {
N := 2000 // runs in under 10s, but significant enough to see the difference.
NConcurrent := 100

clusterConf := `
listen: 127.0.0.1:-1

server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}

leafnodes {
listen: 127.0.0.1:-1
}

cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}

accounts {
ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled }
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`

cl := createJetStreamClusterWithTemplate(t, clusterConf, "Leak-test", 3)
defer cl.shutdown()
cl.waitOnLeader()

s := cl.randomNonLeader()

// Create the test stream.
streamName := "LEAK_TEST_STREAM"
nc, js := jsClientConnect(t, s, nats.UserInfo("one", "p"))
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{"$SOMETHING.>"},
Storage: nats.FileStorage,
Retention: nats.InterestPolicy,
Replicas: 3,
})
if err != nil {
t.Fatalf("Error creating stream: %v", err)
}

concurrent := make(chan struct{}, NConcurrent)
for i := 0; i < NConcurrent; i++ {
concurrent <- struct{}{}
}
errors := make(chan error, N)

wg := sync.WaitGroup{}
wg.Add(N)

// Gather the stats for comparison.
before := &runtime.MemStats{}
runtime.GC()
runtime.ReadMemStats(before)

for i := 0; i < N; {
// wait for a slot to open up
<-concurrent
i++
go func() {
defer func() {
concurrent <- struct{}{}
wg.Done()
}()

nc, js := jsClientConnect(t, s, nats.UserInfo("one", "p"))
defer nc.Close()

consumerName := "sessid_" + nuid.Next()
_, err := js.AddConsumer(streamName, &nats.ConsumerConfig{
DeliverSubject: "inbox",
Durable: consumerName,
AckPolicy: nats.AckExplicitPolicy,
DeliverPolicy: nats.DeliverNewPolicy,
FilterSubject: "$SOMETHING.ELSE.subject",
AckWait: 30 * time.Second,
MaxAckPending: 1024,
})
if err != nil {
errors <- fmt.Errorf("Error on JetStream consumer creation: %v", err)
return
}

err = js.DeleteConsumer(streamName, consumerName)
if err != nil {
errors <- fmt.Errorf("Error on JetStream consumer deletion: %v", err)
}
}()
}

wg.Wait()
if len(errors) > 0 {
for err := range errors {
t.Fatalf("%v", err)
}
}

after := &runtime.MemStats{}
runtime.GC()
runtime.ReadMemStats(after)

// Before https://github.com/nats-io/nats-server/pull/5600 this test was
// adding 180Mb+ to HeapInuse. Now it's under 40Mb (ran locally on a Mac)
limit := before.HeapInuse + 100*1024*1024 // 100MB
if after.HeapInuse > before.HeapInuse+limit {
t.Fatalf("Extra memory usage too high: %v", after.HeapInuse-before.HeapInuse)
}
}

func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "wq_stream",
Subjects: []string{"something.>"},
Storage: nats.FileStorage,
Retention: nats.WorkQueuePolicy,
Replicas: 3,
})
require_NoError(t, err)

for i := 0; i < 100; i++ {
n := (i % 5) + 1
_, err := js.Publish(fmt.Sprintf("something.%d", n), nil)
require_NoError(t, err)
}

sub, err := js.PullSubscribe(
"something.5",
"wq_consumer_5",
nats.BindStream("wq_stream"),
nats.ConsumerReplicas(3),
)
require_NoError(t, err)

for {
msgs, _ := sub.Fetch(5)
if len(msgs) == 0 {
break
}
for _, msg := range msgs {
require_NoError(t, msg.AckSync())
}
}

si, err := js.StreamInfo("wq_stream")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 80)
require_Equal(t, si.State.NumDeleted, 20)
require_Equal(t, si.State.NumSubjects, 4)
}
1 change: 1 addition & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4258,6 +4258,7 @@ func TestJetStreamSnapshotsAPI(t *testing.T) {
state = mset.state()
mset.delete()

req, _ = json.Marshal(rreq)
rmsg, err = nc2.Request(strings.ReplaceAll(JSApiStreamRestoreT, JSApiPrefix, "$JS.domain.API"), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error on snapshot request: %v", err)
Expand Down
Loading