diff --git a/server/filestore.go b/server/filestore.go
index 297d57cfc7b..73dd95269d7 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -174,6 +174,7 @@ type fileStore struct {
 	tombs   []uint64
 	ld      *LostStreamData
 	scb     StorageUpdateHandler
+	sdmcb   SubjectDeleteMarkerUpdateHandler
 	ageChk  *time.Timer
 	syncTmr *time.Timer
 	cfg     FileStreamInfo
@@ -2131,7 +2132,11 @@ func (fs *fileStore) expireMsgsOnRecover() {
 			break
 		}
 		// Can we remove whole block here?
-		if mb.last.ts <= minAge {
+		// TODO(nat): We can't do this when subject delete markers are enabled, as
+		// we have no way of knowing whether we're throwing away real messages or
+		// other delete markers without loading them, so in that case we fall
+		// through to the "slow path". There might be a better way of handling this.
+		if !mb.fs.cfg.SubjectDeleteMarkers && mb.last.ts <= minAge {
 			purged += mb.msgs
 			bytes += mb.bytes
 			deleteEmptyBlock(mb)
@@ -2198,7 +2203,17 @@ func (fs *fileStore) expireMsgsOnRecover() {
 		// Update fss
 		// Make sure we have fss loaded.
 		mb.removeSeqPerSubject(sm.subj, seq)
-		fs.removePerSubject(sm.subj)
+		if fs.removePerSubject(sm.subj) && fs.cfg.SubjectDeleteMarkers {
+			// Need to release the mb lock here in case we need to write a new
+			// delete marker into the same mb in subjectDeleteMarkerIfNeeded.
+			// However, at this point fs.mu is held, so nothing else can interleave.
+			// No need to process the callbacks from subjectDeleteMarkerIfNeeded
+			// here as none will have been registered yet (we haven't yet returned
+			// from newFileStore*).
+			mb.mu.Unlock()
+			fs.subjectDeleteMarkerIfNeeded(sm, JSAppliedLimitMaxAge)
+			mb.mu.Lock()
+		}
 	}
 	// Make sure we have a proper next first sequence.
 	if needNextFirst {
@@ -3612,6 +3627,13 @@ func (fs *fileStore) RegisterStorageUpdates(cb StorageUpdateHandler) {
 	}
 }
 
+// RegisterSubjectDeleteMarkerUpdates registers a callback for newly written subject delete markers.
+func (fs *fileStore) RegisterSubjectDeleteMarkerUpdates(cb SubjectDeleteMarkerUpdateHandler) {
+	fs.mu.Lock()
+	fs.sdmcb = cb
+	fs.mu.Unlock()
+}
+
 // Helper to get hash key for specific message block.
 // Lock should be held
 func (fs *fileStore) hashKeyForBlock(index uint32) []byte {
@@ -4325,10 +4347,10 @@ func (fs *fileStore) EraseMsg(seq uint64) (bool, error) {
 }
 
 // Convenience function to remove per subject tracking at the filestore level.
-// Lock should be held.
-func (fs *fileStore) removePerSubject(subj string) {
+// Lock should be held. Returns whether we deleted the last message on the subject.
+func (fs *fileStore) removePerSubject(subj string) bool {
 	if len(subj) == 0 || fs.psim == nil {
-		return
+		return false
 	}
 	// We do not update sense of fblk here but will do so when we resolve during lookup.
 	bsubj := stringToBytes(subj)
@@ -4339,9 +4361,11 @@
 		} else if info.total == 0 {
 			if _, ok = fs.psim.Delete(bsubj); ok {
 				fs.tsl -= len(subj)
+				return true
 			}
 		}
 	}
+	return false
 }
 
 // Remove a message, optionally rewriting the mb file.
@@ -5295,6 +5319,52 @@ func (fs *fileStore) cancelAgeChk() {
 	}
 }
 
+// Lock must be held so that nothing else can interleave and write a
+// new message on this subject before we get the chance to write the
+// delete marker. If the delete marker is written successfully then
+// this function returns a callback func to call scb and sdmcb after
+// the lock has been released.
+func (fs *fileStore) subjectDeleteMarkerIfNeeded(sm *StoreMsg, reason string) func() {
+	// If the deleted message was itself a delete marker then
+	// don't write out more of them or we'll churn endlessly.
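+	//
+	// For reference, the marker built further down is a regular message on
+	// the deleted subject; given the header construction below it looks
+	// roughly like:
+	//
+	//   NATS/1.0
+	//   Nats-Applied-Limit: MaxAge
+	//   Nats-TTL: 15m0s
+	//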
+	if len(getHeader(JSAppliedLimit, sm.hdr)) != 0 {
+		return nil
+	}
+	if !fs.cfg.SubjectDeleteMarkers {
+		return nil
+	}
+	if _, ok := fs.psim.Find(stringToBytes(sm.subj)); ok {
+		// There are still messages left with this subject,
+		// therefore it wasn't the last message deleted.
+		return nil
+	}
+	// Build the subject delete marker. If no TTL is specified then we'll
+	// default to 15 minutes; by that time every possible condition should
+	// have cleared (e.g. ordered consumer timeouts, client timeouts,
+	// route/gateway interruptions, even device/client restarts).
+	var ttl int64 = 60 * 15
+	if fs.cfg.SubjectDeleteMarkerTTL != _EMPTY_ {
+		ttl, _ = parseMessageTTL(fs.cfg.SubjectDeleteMarkerTTL)
+	}
+	var _hdr [128]byte
+	hdr := fmt.Appendf(_hdr[:0], "NATS/1.0\r\n%s: %s\r\n%s: %s\r\n\r\n", JSAppliedLimit, reason, JSMessageTTL, time.Duration(ttl)*time.Second)
+	seq, ts := fs.state.LastSeq+1, time.Now().UnixNano()
+	// Store it in the stream and then prepare the callbacks
+	// to return to the caller.
+	if err := fs.storeRawMsg(sm.subj, hdr, nil, seq, ts, ttl); err != nil {
+		return nil
+	}
+	cb, tcb := fs.scb, fs.sdmcb
+	return func() {
+		if cb != nil {
+			cb(1, int64(fileStoreMsgSize(sm.subj, hdr, nil)), seq, sm.subj)
+		}
+		if tcb != nil {
+			tcb(seq, sm.subj)
+		}
+	}
+}
+
 // Will expire msgs that are too old.
 func (fs *fileStore) expireMsgs() {
 	// We need to delete one by one here and can not optimize for the time being.
@@ -5316,9 +5386,15 @@
 				continue
 			}
 		}
+		// Remove the message and then, if subject delete markers are enabled, work
+		// out whether the message we just deleted was the last one on its subject.
 		fs.mu.Lock()
 		fs.removeMsgViaLimits(sm.seq)
+		cbs := fs.subjectDeleteMarkerIfNeeded(sm, JSAppliedLimitMaxAge)
 		fs.mu.Unlock()
+		if cbs != nil {
+			cbs()
+		}
 		// Recalculate in case we are expiring a bunch.
 		minAge = time.Now().UnixNano() - maxAge
 	}
diff --git a/server/filestore_test.go b/server/filestore_test.go
index 2369891c7a9..4f9a761a0bc 100644
--- a/server/filestore_test.go
+++ b/server/filestore_test.go
@@ -8562,3 +8562,126 @@ func TestFileStoreDontSpamCompactWhenMostlyTombstones(t *testing.T) {
 	fmb.bytes /= 2
 	require_True(t, fmb.shouldCompactInline())
 }
+
+func TestFileStoreSubjectDeleteMarkers(t *testing.T) {
+	fs, err := newFileStore(
+		FileStoreConfig{StoreDir: t.TempDir()},
+		StreamConfig{
+			Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage,
+			MaxAge: time.Second, AllowMsgTTL: true,
+			SubjectDeleteMarkers: true, SubjectDeleteMarkerTTL: "1s",
+		},
+	)
+	require_NoError(t, err)
+	defer fs.Stop()
+
+	// Store three messages that will expire because of MaxAge.
+	var seq uint64
+	for i := 0; i < 3; i++ {
+		seq, _, err = fs.StoreMsg("test", nil, nil, 0)
+		require_NoError(t, err)
+	}
+
+	// The last message should be gone after MaxAge has passed.
+	time.Sleep(time.Second * 2)
+	sm, err := fs.LoadMsg(seq, nil)
+	require_Error(t, err)
+	require_Equal(t, sm, nil)
+
+	// We should have replaced it with a delete marker.
+	sm, err = fs.LoadMsg(seq+1, nil)
+	require_NoError(t, err)
+	require_Equal(t, bytesToString(getHeader(JSAppliedLimit, sm.hdr)), JSAppliedLimitMaxAge)
+	require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s")
+
+	time.Sleep(time.Second * 2)
+
+	// The marker itself only has a TTL of 1 second, so it should be
+	// gone by now as well. No further markers should have been
+	// published.
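+	// With the marker expired too, FirstSeq should sit just past the
+	// marker's sequence, LastSeq should still point at the marker, and
+	// the stream should report no messages.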
+	var ss StreamState
+	fs.FastState(&ss)
+	require_Equal(t, ss.FirstSeq, sm.seq+1)
+	require_Equal(t, ss.LastSeq, sm.seq)
+	require_Equal(t, ss.Msgs, 0)
+}
+
+func TestFileStoreSubjectDeleteMarkersDefaultTTL(t *testing.T) {
+	fs, err := newFileStore(
+		FileStoreConfig{StoreDir: t.TempDir()},
+		StreamConfig{
+			Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage,
+			MaxAge: time.Second, AllowMsgTTL: true,
+			SubjectDeleteMarkers: true,
+		},
+	)
+	require_NoError(t, err)
+	defer fs.Stop()
+
+	// Store three messages that will expire because of MaxAge.
+	var seq uint64
+	for i := 0; i < 3; i++ {
+		seq, _, err = fs.StoreMsg("test", nil, nil, 0)
+		require_NoError(t, err)
+	}
+
+	// The last message should be gone after MaxAge has passed.
+	time.Sleep(time.Second * 2)
+	sm, err := fs.LoadMsg(seq, nil)
+	require_Error(t, err)
+	require_Equal(t, sm, nil)
+
+	// We should have replaced it with a delete marker.
+	sm, err = fs.LoadMsg(seq+1, nil)
+	require_NoError(t, err)
+	require_Equal(t, bytesToString(getHeader(JSAppliedLimit, sm.hdr)), JSAppliedLimitMaxAge)
+	require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "15m0s")
+}
+
+func TestFileStoreSubjectDeleteMarkersOnRestart(t *testing.T) {
+	storeDir := t.TempDir()
+	fs, err := newFileStore(
+		FileStoreConfig{StoreDir: storeDir},
+		StreamConfig{
+			Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage,
+			MaxAge: time.Second, AllowMsgTTL: true,
+			SubjectDeleteMarkers: true, SubjectDeleteMarkerTTL: "1s",
+		},
+	)
+	require_NoError(t, err)
+	defer fs.Stop()
+
+	// Store three messages that will expire because of MaxAge.
+	var seq uint64
+	for i := 0; i < 3; i++ {
+		seq, _, err = fs.StoreMsg("test", nil, nil, 0)
+		require_NoError(t, err)
+	}
+
+	// Stop the store so that the expiry happens while it's technically
+	// offline. Then wait for at least MaxAge and restart, which should
+	// hit the expireMsgsOnRecover path instead.
+	require_NoError(t, fs.Stop())
+	time.Sleep(time.Second * 2)
+	fs, err = newFileStore(
+		FileStoreConfig{StoreDir: storeDir},
+		StreamConfig{
+			Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage,
+			MaxAge: time.Second, AllowMsgTTL: true,
+			SubjectDeleteMarkers: true, SubjectDeleteMarkerTTL: "1s",
+		},
+	)
+	require_NoError(t, err)
+	defer fs.Stop()
+
+	// The last message should be gone after MaxAge has passed.
+	sm, err := fs.LoadMsg(seq, nil)
+	require_Error(t, err)
+	require_Equal(t, sm, nil)
+
+	// We should have replaced it with a delete marker.
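+	// On this path the marker was written by expireMsgsOnRecover during
+	// store recovery, i.e. before any storage or subject delete marker
+	// callbacks could have been registered.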
+	sm, err = fs.LoadMsg(seq+1, nil)
+	require_NoError(t, err)
+	require_Equal(t, bytesToString(getHeader(JSAppliedLimit, sm.hdr)), JSAppliedLimitMaxAge)
+	require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s")
+}
diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go
index 394a8e3d70a..f4071679190 100644
--- a/server/jetstream_cluster_4_test.go
+++ b/server/jetstream_cluster_4_test.go
@@ -5369,7 +5369,7 @@ func TestJetStreamClusterMessageTTLWhenSourcing(t *testing.T) {
 	})
 
 	hdr := nats.Header{}
-	hdr.Add("Nats-TTL", "1s")
+	hdr.Add(JSMessageTTL, "1s")
 
 	_, err := js.PublishMsg(&nats.Msg{
 		Subject: "test",
@@ -5436,7 +5436,7 @@ func TestJetStreamClusterMessageTTLWhenMirroring(t *testing.T) {
 	})
 
 	hdr := nats.Header{}
-	hdr.Add("Nats-TTL", "1s")
+	hdr.Add(JSMessageTTL, "1s")
 
 	_, err := js.PublishMsg(&nats.Msg{
 		Subject: "test",
@@ -5486,7 +5486,7 @@ func TestJetStreamClusterMessageTTLDisabled(t *testing.T) {
 		Header:  nats.Header{},
 	}
 
-	msg.Header.Set("Nats-TTL", "1s")
+	msg.Header.Set(JSMessageTTL, "1s")
 
 	_, err := js.PublishMsg(msg)
 	require_Error(t, err)
diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go
index 86ed6675baa..cd6c9594a20 100644
--- a/server/jetstream_helpers_test.go
+++ b/server/jetstream_helpers_test.go
@@ -1265,7 +1265,7 @@ func jsClientConnectURL(t testing.TB, url string, opts ...nats.Option) (*nats.Co
 }
 
 // jsStreamCreate is for sending a stream create for fields that nats.go does not know about yet.
-func jsStreamCreate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) *StreamConfig {
+func jsStreamCreate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) (*StreamConfig, error) {
 	t.Helper()
 
 	j, err := json.Marshal(cfg)
@@ -1276,8 +1276,13 @@ func jsStreamCreate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) *StreamConfi
 
 	var resp JSApiStreamUpdateResponse
 	require_NoError(t, json.Unmarshal(msg.Data, &resp))
+
+	if resp.Error != nil {
+		return nil, resp.Error
+	}
+
 	require_NotNil(t, resp.StreamInfo)
-	return &resp.Config
+	return &resp.Config, nil
 }
 
 // jsStreamUpdate is for sending a stream update for fields that nats.go does not know about yet.
diff --git a/server/jetstream_test.go b/server/jetstream_test.go
index 26d6ebb6349..673e3b3282e 100644
--- a/server/jetstream_test.go
+++ b/server/jetstream_test.go
@@ -24890,7 +24890,7 @@ func TestJetStreamMessageTTL(t *testing.T) {
 	}
 
 	for i := 1; i <= 10; i++ {
-		msg.Header.Set("Nats-TTL", "1s")
+		msg.Header.Set(JSMessageTTL, "1s")
 		_, err := js.PublishMsg(msg)
 		require_NoError(t, err)
 	}
@@ -24930,7 +24930,7 @@ func TestJetStreamMessageTTLRestart(t *testing.T) {
 	}
 
 	for i := 1; i <= 10; i++ {
-		msg.Header.Set("Nats-TTL", "1s")
+		msg.Header.Set(JSMessageTTL, "1s")
 		_, err := js.PublishMsg(msg)
 		require_NoError(t, err)
 	}
@@ -24985,7 +24985,7 @@ func TestJetStreamMessageTTLRecovered(t *testing.T) {
 	}
 
 	for i := 1; i <= 10; i++ {
-		msg.Header.Set("Nats-TTL", "1s")
+		msg.Header.Set(JSMessageTTL, "1s")
 		_, err := js.PublishMsg(msg)
 		require_NoError(t, err)
 	}
@@ -25042,11 +25042,11 @@ func TestJetStreamMessageTTLInvalid(t *testing.T) {
 		Header:  nats.Header{},
 	}
 
-	msg.Header.Set("Nats-TTL", "500ms")
+	msg.Header.Set(JSMessageTTL, "500ms")
 	_, err := js.PublishMsg(msg)
 	require_Error(t, err)
 
-	msg.Header.Set("Nats-TTL", "something")
+	msg.Header.Set(JSMessageTTL, "something")
 	_, err = js.PublishMsg(msg)
 	require_Error(t, err)
 }
@@ -25096,12 +25096,12 @@ func TestJetStreamMessageTTLNeverExpire(t *testing.T) {
 
 	// The first message we publish is set to "never expire", therefore it
 	// won't age out with the MaxAge policy.
-	msg.Header.Set("Nats-TTL", "never")
+	msg.Header.Set(JSMessageTTL, "never")
 	_, err := js.PublishMsg(msg)
 	require_NoError(t, err)
 
 	// Following messages will be published as normal and will age out.
-	msg.Header.Del("Nats-TTL")
+	msg.Header.Del(JSMessageTTL)
 	for i := 1; i <= 10; i++ {
 		_, err := js.PublishMsg(msg)
 		require_NoError(t, err)
 	}
@@ -25140,7 +25140,7 @@ func TestJetStreamMessageTTLDisabled(t *testing.T) {
 		Header:  nats.Header{},
 	}
 
-	msg.Header.Set("Nats-TTL", "1s")
+	msg.Header.Set(JSMessageTTL, "1s")
 	_, err := js.PublishMsg(msg)
 	require_Error(t, err)
 }
@@ -25178,7 +25178,7 @@ func TestJetStreamMessageTTLWhenSourcing(t *testing.T) {
 	})
 
 	hdr := nats.Header{}
-	hdr.Add("Nats-TTL", "1s")
+	hdr.Add(JSMessageTTL, "1s")
 
 	_, err := js.PublishMsg(&nats.Msg{
 		Subject: "test",
@@ -25209,7 +25209,7 @@ func TestJetStreamMessageTTLWhenSourcing(t *testing.T) {
 	}
 }
 
-func TestJetStreamMessageTTLSWhenMirroring(t *testing.T) {
+func TestJetStreamMessageTTLWhenMirroring(t *testing.T) {
 	s := RunBasicJetStreamServer(t)
 	defer s.Shutdown()
 
@@ -25242,7 +25242,7 @@ func TestJetStreamMessageTTLSWhenMirroring(t *testing.T) {
 	})
 
 	hdr := nats.Header{}
-	hdr.Add("Nats-TTL", "1s")
+	hdr.Add(JSMessageTTL, "1s")
 
 	_, err := js.PublishMsg(&nats.Msg{
 		Subject: "test",
@@ -25272,3 +25272,67 @@
 		})
 	}
 }
+
+func TestJetStreamSubjectDeleteMarkers(t *testing.T) {
+	s := RunBasicJetStreamServer(t)
+	defer s.Shutdown()
+
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	jsStreamCreate(t, nc, &StreamConfig{
+		Name:                   "TEST",
+		Storage:                FileStorage,
+		Subjects:               []string{"test"},
+		MaxAge:                 time.Second,
+		AllowMsgTTL:            true,
+		SubjectDeleteMarkers:   true,
+		SubjectDeleteMarkerTTL: "1s",
+	})
+
+	sub, err := js.SubscribeSync("test")
+	require_NoError(t, err)
+
+	for i := 0; i < 3; i++ {
+		_, err = js.Publish("test", nil)
+		require_NoError(t, err)
+	}
+
+	for i := 0; i < 3; i++ {
+		msg, err := sub.NextMsg(time.Second)
+		require_NoError(t, err)
+		require_NoError(t, msg.AckSync())
+	}
+
+	msg, err := sub.NextMsg(time.Second * 10)
+	require_NoError(t, err)
+	require_Equal(t, msg.Header.Get(JSAppliedLimit), "MaxAge")
+	require_Equal(t, msg.Header.Get(JSMessageTTL), "1s")
+}
+
+func TestJetStreamSubjectDeleteMarkersWithMirror(t *testing.T) {
+	s := RunBasicJetStreamServer(t)
+	defer s.Shutdown()
+
+	nc, _ := jsClientConnect(t, s)
+	defer nc.Close()
+
+	_, err := jsStreamCreate(t, nc, &StreamConfig{
+		Name:     "Origin",
+		Storage:  FileStorage,
+		Subjects: []string{"test"},
+		MaxAge:   time.Second,
+	})
+	require_NoError(t, err)
+
+	_, err = jsStreamCreate(t, nc, &StreamConfig{
+		Name:                 "Mirror",
+		Storage:              FileStorage,
+		AllowMsgTTL:          true,
+		SubjectDeleteMarkers: true,
+		Mirror: &StreamSource{
+			Name: "Origin",
+		},
+	})
+	require_Error(t, err)
+}
diff --git a/server/jetstream_versioning.go b/server/jetstream_versioning.go
index 28a3bb17c97..fd6d3bff90d 100644
--- a/server/jetstream_versioning.go
+++ b/server/jetstream_versioning.go
@@ -45,7 +45,7 @@ func setStaticStreamMetadata(cfg *StreamConfig, _ *StreamConfig) {
 	}
 
-	// TTLs were added in v2.11 and require API level 1.
-	if cfg.AllowMsgTTL {
+	// TTLs and subject delete markers were added in v2.11 and require API level 1.
+	if cfg.AllowMsgTTL || cfg.SubjectDeleteMarkers {
 		requires(1)
 	}
diff --git a/server/memstore.go b/server/memstore.go
index 5f76fed8fd5..b9217c0fd1f 100644
--- a/server/memstore.go
+++ b/server/memstore.go
@@ -36,6 +36,7 @@ type memStore struct {
 	dmap        avl.SequenceSet
 	maxp        int64
 	scb         StorageUpdateHandler
+	sdmcb       SubjectDeleteMarkerUpdateHandler
 	ageChk      *time.Timer
 	consumers   int
 	receivedAny bool
@@ -312,6 +313,13 @@ func (ms *memStore) RegisterStorageUpdates(cb StorageUpdateHandler) {
 	ms.mu.Unlock()
 }
 
+// RegisterSubjectDeleteMarkerUpdates registers a callback for newly written subject delete markers.
+func (ms *memStore) RegisterSubjectDeleteMarkerUpdates(cb SubjectDeleteMarkerUpdateHandler) {
+	ms.mu.Lock()
+	ms.sdmcb = cb
+	ms.mu.Unlock()
+}
+
 // GetSeqFromTime looks for the first sequence number that has the message
 // with >= timestamp.
 // FIXME(dlc) - inefficient.
diff --git a/server/store.go b/server/store.go
index 2e148dc0731..ca0dc18a6c5 100644
--- a/server/store.go
+++ b/server/store.go
@@ -83,6 +83,9 @@ type StoreMsg struct {
 // For the cases where it's a single message we will also supply sequence number and subject.
 type StorageUpdateHandler func(msgs, bytes int64, seq uint64, subj string)
 
+// Used to call back into the upper layers to report on newly created subject delete markers.
+type SubjectDeleteMarkerUpdateHandler func(seq uint64, subj string)
+
 type StreamStore interface {
 	StoreMsg(subject string, hdr, msg []byte, ttl int64) (uint64, int64, error)
 	StoreRawMsg(subject string, hdr, msg []byte, seq uint64, ts int64, ttl int64) error
@@ -111,6 +114,7 @@ type StreamStore interface {
 	SyncDeleted(dbs DeleteBlocks)
 	Type() StorageType
 	RegisterStorageUpdates(StorageUpdateHandler)
+	RegisterSubjectDeleteMarkerUpdates(SubjectDeleteMarkerUpdateHandler)
 	UpdateConfig(cfg *StreamConfig) error
 	Delete() error
 	Stop() error
diff --git a/server/stream.go b/server/stream.go
index 39a212e3a4f..a6eab08c3f6 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -104,6 +104,14 @@ type StreamConfig struct {
 	// then the `NATS-TTL` header will be ignored.
 	AllowMsgTTL bool `json:"allow_msg_ttl"`
 
+	// SubjectDeleteMarkers enables leaving a delete marker behind when the
+	// last message on a subject is deleted.
+	SubjectDeleteMarkers bool `json:"subject_delete_markers,omitempty"`
+
+	// SubjectDeleteMarkerTTL sets the TTL of the delete marker messages left
+	// behind by SubjectDeleteMarkers.
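+	// Accepts the same values as the Nats-TTL header, e.g. "1s" or "15m".
+	// If left empty, markers are written with a default TTL of 15 minutes.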
+	SubjectDeleteMarkerTTL string `json:"subject_delete_marker_ttl,omitempty"`
+
 	// Metadata is additional metadata for the Stream.
 	Metadata map[string]string `json:"metadata,omitempty"`
 }
@@ -418,7 +426,7 @@ const (
 	JSMsgSize         = "Nats-Msg-Size"
 	JSResponseType    = "Nats-Response-Type"
 	JSMessageTTL      = "Nats-TTL"
-	JSMessageNoExpire = "Nats-No-Expire"
+	JSAppliedLimit    = "Nats-Applied-Limit"
 )
 
 // Headers for republished messages and direct gets.
@@ -438,6 +446,11 @@ const (
 	JSMsgRollupAll = "all"
 )
 
+// Applied limits in the Nats-Applied-Limit header.
+const (
+	JSAppliedLimitMaxAge = "MaxAge"
+)
+
 const (
 	jsCreateResponse = "create"
 )
@@ -1353,6 +1366,23 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo
 		}
 	}
 
+	if cfg.SubjectDeleteMarkers {
+		if cfg.SubjectDeleteMarkerTTL != _EMPTY_ && !cfg.AllowMsgTTL {
+			return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subject delete marker TTL cannot be set if message TTLs are disabled"))
+		}
+		ttl, err := parseMessageTTL(cfg.SubjectDeleteMarkerTTL)
+		if err != nil {
+			return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("invalid subject delete marker TTL: %s", err))
+		}
+		if ttl < 1 {
+			return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subject delete marker TTL must be at least 1 second"))
+		}
+	} else {
+		if cfg.SubjectDeleteMarkerTTL != _EMPTY_ {
+			return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subject delete marker TTL requires subject delete markers to be enabled"))
+		}
+	}
+
 	getStream := func(streamName string) (bool, StreamConfig) {
 		var exists bool
 		var cfg StreamConfig
@@ -1395,6 +1425,12 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo
 		if cfg.Mirror.FilterSubject != _EMPTY_ && len(cfg.Mirror.SubjectTransforms) != 0 {
 			return StreamConfig{}, NewJSMirrorMultipleFiltersNotAllowedError()
 		}
+		if cfg.SubjectDeleteMarkers {
+			// Subject delete markers cannot be configured on a mirror as they would
+			// result in new markers which would use up sequence numbers, diverging
+			// from the origin stream.
+			return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subject delete markers forbidden on mirrors"))
+		}
 		// Check subject filters overlap.
 		for outer, tr := range cfg.Mirror.SubjectTransforms {
 			if tr.Source != _EMPTY_ && !IsValidSubject(tr.Source) {
@@ -4055,6 +4091,9 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error {
 	}
 	// This will fire the callback but we do not require the lock since md will be 0 here.
 	mset.store.RegisterStorageUpdates(mset.storeUpdates)
+	mset.store.RegisterSubjectDeleteMarkerUpdates(func(seq uint64, subj string) {
+		mset.signalConsumers(subj, seq)
+	})
 
 	mset.mu.Unlock()
 	return nil
@@ -4217,7 +4256,7 @@ func getExpectedLastSeqPerSubjectForSubject(hdr []byte) string {
 	return string(getHeader(JSExpectedLastSubjSeqSubj, hdr))
 }
 
-// Fast lookup of the message TTL:
+// Fast lookup of the message TTL from headers:
 // - Positive return value: duration in seconds.
 // - Zero return value: no TTL or parse error.
 // - Negative return value: never expires.
@@ -4226,18 +4265,24 @@ func getMessageTTL(hdr []byte) (int64, error) {
 	if len(ttl) == 0 {
 		return 0, nil
 	}
-	sttl := bytesToString(ttl)
-	if strings.ToLower(sttl) == "never" {
+	return parseMessageTTL(bytesToString(ttl))
+}
+
+// parseMessageTTL parses a TTL value given either as a duration string or as seconds:
+// - Positive return value: duration in seconds.
+// - Zero return value: no TTL or parse error.
+// - Negative return value: never expires.
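+//
+// For example: "30s" and a bare "30" both yield 30, "never" yields -1, and
+// sub-second durations such as "500ms" are rejected with an error.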
+func parseMessageTTL(ttl string) (int64, error) {
+	if strings.ToLower(ttl) == "never" {
 		return -1, nil
 	}
-	dur, err := time.ParseDuration(sttl)
+	dur, err := time.ParseDuration(ttl)
 	if err == nil {
 		if dur < time.Second {
 			return 0, NewJSMessageTTLInvalidError()
 		}
 		return int64(dur.Seconds()), nil
 	}
-	t := parseInt64(ttl)
+	t := parseInt64(stringToBytes(ttl))
 	if t < 0 {
 		// This probably means a parse failure, hence why
 		// we have a special case "never" for returning -1.