Skip to content

Commit 405c4dd

Browse files
[FIXED] Don't decrement pending count twice after ack
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 519943f commit 405c4dd

File tree

2 files changed

+66
-12
lines changed

2 files changed

+66
-12
lines changed

server/consumer.go

+5-8
Original file line numberDiff line numberDiff line change
@@ -5836,21 +5836,18 @@ func (o *consumer) requestNextMsgSubject() string {
58365836
func (o *consumer) decStreamPending(sseq uint64, subj string) {
58375837
o.mu.Lock()
58385838

5839+
// Update our cached num pending only if we think deliverMsg has not done so.
5840+
if sseq >= o.sseq && o.isFilteredMatch(subj) {
5841+
o.npc--
5842+
}
5843+
58395844
// Check if this message was pending.
58405845
p, wasPending := o.pending[sseq]
58415846
var rdc uint64 = 1
58425847
if o.rdc != nil {
58435848
rdc = o.rdc[sseq]
58445849
}
58455850

5846-
// Update our cached num pending only if we think deliverMsg has not done so.
5847-
// Either we have not reached the message yet, or we've hit the race condition
5848-
// when there is contention at the beginning of the stream. In which case we can
5849-
// only decrement if the ack floor is still low enough to be able to detect it.
5850-
if sseq > o.asflr && (sseq >= o.sseq || !wasPending) && o.isFilteredMatch(subj) {
5851-
o.npc--
5852-
}
5853-
58545851
o.mu.Unlock()
58555852

58565853
// If it was pending process it like an ack.

server/jetstream_test.go

+61-4
Original file line numberDiff line numberDiff line change
@@ -24742,7 +24742,7 @@ func TestJetStreamWouldExceedLimits(t *testing.T) {
2474224742
require_True(t, js.wouldExceedLimits(FileStorage, int(js.config.MaxStore)+1))
2474324743
}
2474424744

24745-
func TestJetStreamConsumerDecrementPendingCountOnSkippedMsg(t *testing.T) {
24745+
func TestJetStreamConsumerDontDecrementPendingCountOnSkippedMsg(t *testing.T) {
2474624746
s := RunBasicJetStreamServer(t)
2474724747
defer s.Shutdown()
2474824748

@@ -24768,7 +24768,7 @@ func TestJetStreamConsumerDecrementPendingCountOnSkippedMsg(t *testing.T) {
2476824768
npc := o.npc
2476924769
o.mu.RUnlock()
2477024770
if npc != expected {
24771-
return fmt.Errorf("expected npc=%d, got %d", npc, expected)
24771+
return fmt.Errorf("expected npc=%d, got %d", expected, npc)
2477224772
}
2477324773
return nil
2477424774
})
@@ -24807,9 +24807,66 @@ func TestJetStreamConsumerDecrementPendingCountOnSkippedMsg(t *testing.T) {
2480724807
o.decStreamPending(2, "foo")
2480824808
requireExpected(1)
2480924809

24810-
// This is the deleted message that was skipped, and we can decrement the pending count
24811-
// because it's not pending and only as long as the ack floor hasn't moved up yet.
24810+
// This is the deleted message that was skipped, we've hit the race condition and are not able to
24811+
// fix it at this point. If we decrement then we could have decremented it twice if the message
24812+
// was removed as a result of an Ack with Interest or WorkQueue retention, instead of due to contention.
2481224813
o.decStreamPending(3, "foo")
24814+
requireExpected(1)
24815+
}
24816+
24817+
func TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor(t *testing.T) {
24818+
s := RunBasicJetStreamServer(t)
24819+
defer s.Shutdown()
24820+
24821+
nc, js := jsClientConnect(t, s)
24822+
defer nc.Close()
24823+
24824+
_, err := js.AddStream(&nats.StreamConfig{
24825+
Name: "TEST",
24826+
Subjects: []string{"foo"},
24827+
Retention: nats.WorkQueuePolicy,
24828+
})
24829+
require_NoError(t, err)
24830+
24831+
for i := 0; i < 2; i++ {
24832+
_, err = js.Publish("foo", nil)
24833+
require_NoError(t, err)
24834+
}
24835+
24836+
sub, err := js.PullSubscribe("foo", "CONSUMER")
24837+
require_NoError(t, err)
24838+
24839+
acc, err := s.lookupAccount(globalAccountName)
24840+
require_NoError(t, err)
24841+
mset, err := acc.lookupStream("TEST")
24842+
require_NoError(t, err)
24843+
o := mset.lookupConsumer("CONSUMER")
24844+
24845+
requireExpected := func(expected int64) {
24846+
t.Helper()
24847+
checkFor(t, time.Second, 10*time.Millisecond, func() error {
24848+
o.mu.RLock()
24849+
npc := o.npc
24850+
o.mu.RUnlock()
24851+
if npc != expected {
24852+
return fmt.Errorf("expected npc=%d, got %d", expected, npc)
24853+
}
24854+
return nil
24855+
})
24856+
}
24857+
24858+
// Expect 2 messages pending.
24859+
requireExpected(2)
24860+
24861+
// Fetch 2 messages and ack the last.
24862+
msgs, err := sub.Fetch(2)
24863+
require_NoError(t, err)
24864+
require_Len(t, len(msgs), 2)
24865+
msg := msgs[1]
24866+
err = msg.AckSync()
24867+
require_NoError(t, err)
24868+
24869+
// We've fetched 2 message so should report 0 pending.
2481324870
requireExpected(0)
2481424871
}
2481524872

0 commit comments

Comments
 (0)