Skip to content

Commit f981ac3

Browse files
[FIXED] Handle recreating a file-based stream as memory-based on meta recovery (#6069)
Consider the following scenario: 1. create a file-based R3 stream, 2. delete the stream, 3. create a memory-based R3 stream, 4. add a consumer. Because `js.processStreamAssignment(sa)` was called even while we were recovering, on meta recovery we would first create the file-based stream and then fail to create the memory-based stream, since we can't change storage types. This left us stranded with two nodes having a memory-based stream and one node with a file-based stream. This change makes stream additions consistent with the other collecting of state into `ru *recoveryUpdates` before applying, waiting until recovery is finished to apply them. Signed-off-by: Maurice van Veen <[email protected]> Co-authored-by: Neil Twigg <[email protected]>
2 parents f404ea2 + a4884a1 commit f981ac3

File tree

2 files changed

+84
-3
lines changed

2 files changed

+84
-3
lines changed

Diff for: server/jetstream_cluster.go

+14-3
Original file line numberDiff line numberDiff line change
@@ -1136,6 +1136,7 @@ func (js *jetStream) isMetaRecovering() bool {
11361136
type recoveryUpdates struct {
11371137
removeStreams map[string]*streamAssignment
11381138
removeConsumers map[string]map[string]*consumerAssignment
1139+
addStreams map[string]*streamAssignment
11391140
updateStreams map[string]*streamAssignment
11401141
updateConsumers map[string]map[string]*consumerAssignment
11411142
}
@@ -1343,6 +1344,7 @@ func (js *jetStream) monitorCluster() {
13431344
ru := &recoveryUpdates{
13441345
removeStreams: make(map[string]*streamAssignment),
13451346
removeConsumers: make(map[string]map[string]*consumerAssignment),
1347+
addStreams: make(map[string]*streamAssignment),
13461348
updateStreams: make(map[string]*streamAssignment),
13471349
updateConsumers: make(map[string]map[string]*consumerAssignment),
13481350
}
@@ -1381,6 +1383,10 @@ func (js *jetStream) monitorCluster() {
13811383
for _, sa := range ru.removeStreams {
13821384
js.processStreamRemoval(sa)
13831385
}
1386+
// Process stream additions.
1387+
for _, sa := range ru.addStreams {
1388+
js.processStreamAssignment(sa)
1389+
}
13841390
// Process pending updates.
13851391
for _, sa := range ru.updateStreams {
13861392
js.processUpdateStreamAssignment(sa)
@@ -1637,6 +1643,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
16371643
key := sa.recoveryKey()
16381644
ru.removeStreams[key] = sa
16391645
delete(ru.updateConsumers, key)
1646+
delete(ru.addStreams, key)
16401647
delete(ru.updateStreams, key)
16411648
} else {
16421649
js.processStreamRemoval(sa)
@@ -1661,6 +1668,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
16611668
if isRecovering {
16621669
key := sa.recoveryKey()
16631670
ru.updateStreams[key] = sa
1671+
delete(ru.addStreams, key)
16641672
delete(ru.removeStreams, key)
16651673
} else {
16661674
js.processUpdateStreamAssignment(sa)
@@ -1945,9 +1953,10 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
19451953
}
19461954
if isRecovering {
19471955
js.setStreamAssignmentRecovering(sa)
1948-
delete(ru.removeStreams, sa.recoveryKey())
1949-
}
1950-
if js.processStreamAssignment(sa) {
1956+
key := sa.recoveryKey()
1957+
ru.addStreams[key] = sa
1958+
delete(ru.removeStreams, key)
1959+
} else if js.processStreamAssignment(sa) {
19511960
didRemoveStream = true
19521961
}
19531962
case removeStreamOp:
@@ -1960,6 +1969,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
19601969
js.setStreamAssignmentRecovering(sa)
19611970
key := sa.recoveryKey()
19621971
ru.removeStreams[key] = sa
1972+
delete(ru.addStreams, key)
19631973
delete(ru.updateStreams, key)
19641974
delete(ru.updateConsumers, key)
19651975
} else {
@@ -2031,6 +2041,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
20312041
js.setStreamAssignmentRecovering(sa)
20322042
key := sa.recoveryKey()
20332043
ru.updateStreams[key] = sa
2044+
delete(ru.addStreams, key)
20342045
delete(ru.removeStreams, key)
20352046
} else {
20362047
js.processUpdateStreamAssignment(sa)

Diff for: server/jetstream_cluster_1_test.go

+70
Original file line numberDiff line numberDiff line change
@@ -6527,6 +6527,7 @@ func TestJetStreamClusterMetaRecoveryUpdatesDeletesConsumers(t *testing.T) {
65276527
ru := &recoveryUpdates{
65286528
removeStreams: make(map[string]*streamAssignment),
65296529
removeConsumers: make(map[string]map[string]*consumerAssignment),
6530+
addStreams: make(map[string]*streamAssignment),
65306531
updateStreams: make(map[string]*streamAssignment),
65316532
updateConsumers: make(map[string]map[string]*consumerAssignment),
65326533
}
@@ -6544,6 +6545,75 @@ func TestJetStreamClusterMetaRecoveryUpdatesDeletesConsumers(t *testing.T) {
65446545
require_Len(t, len(ru.updateConsumers), 0)
65456546
}
65466547

6548+
func TestJetStreamClusterMetaRecoveryRecreateFileStreamAsMemory(t *testing.T) {
6549+
c := createJetStreamClusterExplicit(t, "R3S", 3)
6550+
defer c.shutdown()
6551+
6552+
js := c.leader().getJetStream()
6553+
6554+
createFileStream := []*Entry{
6555+
{EntryNormal, encodeAddStreamAssignment(&streamAssignment{
6556+
Config: &StreamConfig{Name: "TEST", Storage: FileStorage},
6557+
})},
6558+
}
6559+
6560+
deleteFileStream := []*Entry{
6561+
{EntryNormal, encodeDeleteStreamAssignment(&streamAssignment{
6562+
Config: &StreamConfig{Name: "TEST", Storage: FileStorage},
6563+
})},
6564+
}
6565+
6566+
createMemoryStream := []*Entry{
6567+
{EntryNormal, encodeAddStreamAssignment(&streamAssignment{
6568+
Config: &StreamConfig{Name: "TEST", Storage: MemoryStorage},
6569+
})},
6570+
}
6571+
6572+
createConsumer := []*Entry{
6573+
{EntryNormal, encodeAddConsumerAssignment(&consumerAssignment{
6574+
Stream: "TEST",
6575+
Config: &ConsumerConfig{Name: "consumer"},
6576+
})},
6577+
}
6578+
6579+
// Need to be recovering so that we accumulate recoveryUpdates.
6580+
js.setMetaRecovering()
6581+
ru := &recoveryUpdates{
6582+
removeStreams: make(map[string]*streamAssignment),
6583+
removeConsumers: make(map[string]map[string]*consumerAssignment),
6584+
addStreams: make(map[string]*streamAssignment),
6585+
updateStreams: make(map[string]*streamAssignment),
6586+
updateConsumers: make(map[string]map[string]*consumerAssignment),
6587+
}
6588+
6589+
// We created a file-based stream first, but deleted it shortly after.
6590+
_, _, _, err := js.applyMetaEntries(createFileStream, ru)
6591+
require_NoError(t, err)
6592+
require_Len(t, len(ru.addStreams), 1)
6593+
require_Len(t, len(ru.removeStreams), 0)
6594+
6595+
// Now push another recovery entry that deletes the stream.
6596+
// The file-based stream should not have been created.
6597+
_, _, _, err = js.applyMetaEntries(deleteFileStream, ru)
6598+
require_NoError(t, err)
6599+
require_Len(t, len(ru.addStreams), 0)
6600+
require_Len(t, len(ru.removeStreams), 1)
6601+
6602+
// Now stage a memory-based stream to be created.
6603+
_, _, _, err = js.applyMetaEntries(createMemoryStream, ru)
6604+
require_NoError(t, err)
6605+
require_Len(t, len(ru.addStreams), 1)
6606+
require_Len(t, len(ru.removeStreams), 0)
6607+
require_Len(t, len(ru.updateConsumers), 0)
6608+
6609+
// Also create a consumer on that memory-based stream.
6610+
_, _, _, err = js.applyMetaEntries(createConsumer, ru)
6611+
require_NoError(t, err)
6612+
require_Len(t, len(ru.addStreams), 1)
6613+
require_Len(t, len(ru.removeStreams), 0)
6614+
require_Len(t, len(ru.updateConsumers), 1)
6615+
}
6616+
65476617
//
65486618
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
65496619
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.

0 commit comments

Comments
 (0)