Skip to content

Commit 3fefcf3

Browse files
[FIXED] Don't decrement pending count twice after ack (#6343)
Reverts the change made in #6297. The change did (somewhat) improve the reliability of the drifting tests if the `o.npc--` was done as a result of contention, but resulted in a regression if `o.npc--` was done twice when a message was acked that did not move the ack floor up for WorkQueue/Interest retention. This PR fixes what would otherwise have been a regression. We really should try to fix the race condition itself though (outside of this PR). Without fixing it, the pending count can become incorrect, with no way to resolve it unless all messages are consumed or we manually recalculate it. Signed-off-by: Maurice van Veen <[email protected]>
2 parents 519943f + dd27b48 commit 3fefcf3

File tree

2 files changed

+66
-12
lines changed

2 files changed

+66
-12
lines changed

server/consumer.go

+5-8
Original file line numberDiff line numberDiff line change
@@ -5836,21 +5836,18 @@ func (o *consumer) requestNextMsgSubject() string {
58365836
func (o *consumer) decStreamPending(sseq uint64, subj string) {
58375837
o.mu.Lock()
58385838

5839+
// Update our cached num pending only if we think deliverMsg has not done so.
5840+
if sseq >= o.sseq && o.isFilteredMatch(subj) {
5841+
o.npc--
5842+
}
5843+
58395844
// Check if this message was pending.
58405845
p, wasPending := o.pending[sseq]
58415846
var rdc uint64 = 1
58425847
if o.rdc != nil {
58435848
rdc = o.rdc[sseq]
58445849
}
58455850

5846-
// Update our cached num pending only if we think deliverMsg has not done so.
5847-
// Either we have not reached the message yet, or we've hit the race condition
5848-
// when there is contention at the beginning of the stream. In which case we can
5849-
// only decrement if the ack floor is still low enough to be able to detect it.
5850-
if sseq > o.asflr && (sseq >= o.sseq || !wasPending) && o.isFilteredMatch(subj) {
5851-
o.npc--
5852-
}
5853-
58545851
o.mu.Unlock()
58555852

58565853
// If it was pending process it like an ack.

server/jetstream_test.go

+61-4
Original file line numberDiff line numberDiff line change
@@ -24742,7 +24742,7 @@ func TestJetStreamWouldExceedLimits(t *testing.T) {
2474224742
require_True(t, js.wouldExceedLimits(FileStorage, int(js.config.MaxStore)+1))
2474324743
}
2474424744

24745-
func TestJetStreamConsumerDecrementPendingCountOnSkippedMsg(t *testing.T) {
24745+
func TestJetStreamConsumerDontDecrementPendingCountOnSkippedMsg(t *testing.T) {
2474624746
s := RunBasicJetStreamServer(t)
2474724747
defer s.Shutdown()
2474824748

@@ -24768,7 +24768,7 @@ func TestJetStreamConsumerDecrementPendingCountOnSkippedMsg(t *testing.T) {
2476824768
npc := o.npc
2476924769
o.mu.RUnlock()
2477024770
if npc != expected {
24771-
return fmt.Errorf("expected npc=%d, got %d", npc, expected)
24771+
return fmt.Errorf("expected npc=%d, got %d", expected, npc)
2477224772
}
2477324773
return nil
2477424774
})
@@ -24807,9 +24807,66 @@ func TestJetStreamConsumerDecrementPendingCountOnSkippedMsg(t *testing.T) {
2480724807
o.decStreamPending(2, "foo")
2480824808
requireExpected(1)
2480924809

24810-
// This is the deleted message that was skipped, and we can decrement the pending count
24811-
// because it's not pending and only as long as the ack floor hasn't moved up yet.
24810+
// This is the deleted message that was skipped, we've hit the race condition and are not able to
24811+
// fix it at this point. If we decrement then we could have decremented it twice if the message
24812+
// was removed as a result of an Ack with Interest or WorkQueue retention, instead of due to contention.
2481224813
o.decStreamPending(3, "foo")
24814+
requireExpected(1)
24815+
}
24816+
24817+
func TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor(t *testing.T) {
24818+
s := RunBasicJetStreamServer(t)
24819+
defer s.Shutdown()
24820+
24821+
nc, js := jsClientConnect(t, s)
24822+
defer nc.Close()
24823+
24824+
_, err := js.AddStream(&nats.StreamConfig{
24825+
Name: "TEST",
24826+
Subjects: []string{"foo"},
24827+
Retention: nats.WorkQueuePolicy,
24828+
})
24829+
require_NoError(t, err)
24830+
24831+
for i := 0; i < 2; i++ {
24832+
_, err = js.Publish("foo", nil)
24833+
require_NoError(t, err)
24834+
}
24835+
24836+
sub, err := js.PullSubscribe("foo", "CONSUMER")
24837+
require_NoError(t, err)
24838+
24839+
acc, err := s.lookupAccount(globalAccountName)
24840+
require_NoError(t, err)
24841+
mset, err := acc.lookupStream("TEST")
24842+
require_NoError(t, err)
24843+
o := mset.lookupConsumer("CONSUMER")
24844+
24845+
requireExpected := func(expected int64) {
24846+
t.Helper()
24847+
checkFor(t, time.Second, 10*time.Millisecond, func() error {
24848+
o.mu.RLock()
24849+
npc := o.npc
24850+
o.mu.RUnlock()
24851+
if npc != expected {
24852+
return fmt.Errorf("expected npc=%d, got %d", expected, npc)
24853+
}
24854+
return nil
24855+
})
24856+
}
24857+
24858+
// Expect 2 messages pending.
24859+
requireExpected(2)
24860+
24861+
// Fetch 2 messages and ack the last.
24862+
msgs, err := sub.Fetch(2)
24863+
require_NoError(t, err)
24864+
require_Len(t, len(msgs), 2)
24865+
msg := msgs[1]
24866+
err = msg.AckSync()
24867+
require_NoError(t, err)
24868+
24869+
// We've fetched 2 messages so should report 0 pending.
2481324870
requireExpected(0)
2481424871
}
2481524872

0 commit comments

Comments
 (0)