Add support for subject delete markers by MaxAge #6378

Merged · 1 commit · Jan 20, 2025
86 changes: 81 additions & 5 deletions server/filestore.go
@@ -174,6 +174,7 @@ type fileStore struct {
tombs []uint64
ld *LostStreamData
scb StorageUpdateHandler
sdmcb SubjectDeleteMarkerUpdateHandler
ageChk *time.Timer
syncTmr *time.Timer
cfg FileStreamInfo
@@ -2131,7 +2132,11 @@ func (fs *fileStore) expireMsgsOnRecover() {
break
}
// Can we remove whole block here?
if mb.last.ts <= minAge {
// TODO(nat): We can't do this with LimitsTTL as we have no way to know
// if we're throwing away real messages or other tombstones without
// loading them, so in this case we'll fall through to the "slow path".
// There might be a better way of handling this though.
if !mb.fs.cfg.SubjectDeleteMarkers && mb.last.ts <= minAge {
purged += mb.msgs
bytes += mb.bytes
deleteEmptyBlock(mb)
@@ -2198,7 +2203,17 @@ func (fs *fileStore) expireMsgsOnRecover() {
// Update fss
// Make sure we have fss loaded.
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)
if fs.removePerSubject(sm.subj) && fs.cfg.SubjectDeleteMarkers {
// Need to release the mb lock here in case we need to write a new
// tombstone into the same mb in subjectDeleteMarkerIfNeeded. However
// at this point fs.mu is held, so nothing else should happen here.
// No need to process the callbacks from subjectDeleteMarkerIfNeeded
// here as none will have been registered yet (we haven't yet returned
// from newFileStore*).
mb.mu.Unlock()
fs.subjectDeleteMarkerIfNeeded(sm, JSAppliedLimitMaxAge)
mb.mu.Lock()
}
}
// Make sure we have a proper next first sequence.
if needNextFirst {
@@ -3612,6 +3627,13 @@ func (fs *fileStore) RegisterStorageUpdates(cb StorageUpdateHandler) {
}
}

// RegisterSubjectDeleteMarkerUpdates registers a callback that fires when new subject delete markers (tombstones) are written.
func (fs *fileStore) RegisterSubjectDeleteMarkerUpdates(cb SubjectDeleteMarkerUpdateHandler) {
fs.mu.Lock()
fs.sdmcb = cb
fs.mu.Unlock()
}
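
Reviewer note: a minimal sketch of how a caller might use this new hook, assuming SubjectDeleteMarkerUpdateHandler is declared as func(seq uint64, subj string) to match the tcb(seq, sm.subj) call site further down in this file:

	fs.RegisterSubjectDeleteMarkerUpdates(func(seq uint64, subj string) {
		// Fires after a subject delete marker for subj has been stored
		// at sequence seq, once the filestore lock has been released.
		fmt.Printf("subject delete marker written: subject=%q seq=%d\n", subj, seq)
	})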

// Helper to get hash key for specific message block.
// Lock should be held
func (fs *fileStore) hashKeyForBlock(index uint32) []byte {
@@ -4325,10 +4347,10 @@ func (fs *fileStore) EraseMsg(seq uint64) (bool, error) {
}

// Convenience function to remove per subject tracking at the filestore level.
// Lock should be held.
func (fs *fileStore) removePerSubject(subj string) {
// Lock should be held. Returns true if we deleted the last message on the subject.
func (fs *fileStore) removePerSubject(subj string) bool {
if len(subj) == 0 || fs.psim == nil {
return
return false
}
// We do not update sense of fblk here but will do so when we resolve during lookup.
bsubj := stringToBytes(subj)
@@ -4339,9 +4361,11 @@ func (fs *fileStore) removePerSubject(subj string) {
} else if info.total == 0 {
if _, ok = fs.psim.Delete(bsubj); ok {
fs.tsl -= len(subj)
return true
}
}
}
return false
}

// Remove a message, optionally rewriting the mb file.
@@ -5295,6 +5319,52 @@ func (fs *fileStore) cancelAgeChk() {
}
}

// Lock must be held so that nothing else can interleave and write a
// new message on this subject before we get the chance to write the
// delete marker. If the delete marker is written successfully then
// this function returns a callback func to call scb and sdmcb after
// the lock has been released.
func (fs *fileStore) subjectDeleteMarkerIfNeeded(sm *StoreMsg, reason string) func() {
// If the deleted message was itself a delete marker then
// don't write out more of them or we'll churn endlessly.
if len(getHeader(JSAppliedLimit, sm.hdr)) != 0 {
return nil
}
if !fs.cfg.SubjectDeleteMarkers {
return nil
}
if _, ok := fs.psim.Find(stringToBytes(sm.subj)); ok {
// There are still messages left with this subject,
// therefore it wasn't the last message deleted.
return nil
}
// Build the subject delete marker. If no TTL is specified then we'll
// default to 15 minutes, by which time every possible condition should
// have cleared (e.g. ordered consumer timeouts, client timeouts,
// route/gateway interruptions, even device/client restarts).
var ttl int64 = 60 * 15
if fs.cfg.SubjectDeleteMarkerTTL != _EMPTY_ {
ttl, _ = parseMessageTTL(fs.cfg.SubjectDeleteMarkerTTL)
}
var _hdr [128]byte
hdr := fmt.Appendf(_hdr[:0], "NATS/1.0\r\n%s: %s\r\n%s: %s\r\n\r\n", JSAppliedLimit, reason, JSMessageTTL, time.Duration(ttl)*time.Second)
seq, ts := fs.state.LastSeq+1, time.Now().UnixNano()
// Store it in the stream and then prepare the callbacks
// to return to the caller.
if err := fs.storeRawMsg(sm.subj, hdr, nil, seq, ts, ttl); err != nil {
return nil
}
cb, tcb := fs.scb, fs.sdmcb
return func() {
if cb != nil {
cb(1, int64(fileStoreMsgSize(sm.subj, hdr, nil)), seq, sm.subj)
}
if tcb != nil {
tcb(seq, sm.subj)
}
}
}
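
For illustration: with the 15-minute default, the marker stored above carries a header block like the one below. JSMessageTTL is Nats-TTL (the test changes in this PR confirm that); JSAppliedLimit rendering as Nats-Applied-Limit is an assumption here, and "15m0s" is simply how Go's time.Duration prints 900 seconds, which is why the default-TTL test below expects that exact string.

	NATS/1.0
	Nats-Applied-Limit: MaxAge
	Nats-TTL: 15m0s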

// Will expire msgs that are too old.
func (fs *fileStore) expireMsgs() {
// We need to delete one by one here and can not optimize for the time being.
@@ -5316,9 +5386,15 @@ func (fs *fileStore) expireMsgs() {
continue
}
}
// Remove the message and then, if LimitsTTL is enabled, try and work out
// if it was the last message of that particular subject that we just deleted.
fs.mu.Lock()
fs.removeMsgViaLimits(sm.seq)
cbs := fs.subjectDeleteMarkerIfNeeded(sm, JSAppliedLimitMaxAge)
fs.mu.Unlock()
if cbs != nil {
cbs()
}
// Recalculate in case we are expiring a bunch.
minAge = time.Now().UnixNano() - maxAge
}
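Reviewer note on the locking pattern in expireMsgs above: subjectDeleteMarkerIfNeeded runs with fs.mu held, so no new message can land on the subject before the marker is written, but the captured scb/sdmcb handlers only fire after the lock is dropped. A distilled sketch of the idiom, with hypothetical names:

	mu.Lock()
	cbs := doWorkAndCaptureCallback() // snapshot handlers and their args under the lock
	mu.Unlock()
	if cbs != nil {
		cbs() // invoke outside the lock so handlers can safely re-enter the store
	}
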
123 changes: 123 additions & 0 deletions server/filestore_test.go
@@ -8562,3 +8562,126 @@ func TestFileStoreDontSpamCompactWhenMostlyTombstones(t *testing.T) {
fmb.bytes /= 2
require_True(t, fmb.shouldCompactInline())
}

func TestFileStoreSubjectDeleteMarkers(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
StreamConfig{
Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage,
MaxAge: time.Second, AllowMsgTTL: true,
SubjectDeleteMarkers: true, SubjectDeleteMarkerTTL: "1s",
},
)
require_NoError(t, err)
defer fs.Stop()

// Store three messages that will expire because of MaxAge.
var seq uint64
for i := 0; i < 3; i++ {
seq, _, err = fs.StoreMsg("test", nil, nil, 0)
require_NoError(t, err)
}

// The last message should be gone after MaxAge has passed.
time.Sleep(time.Second * 2)
sm, err := fs.LoadMsg(seq, nil)
require_Error(t, err)
require_Equal(t, sm, nil)

// We should have replaced it with a tombstone.
sm, err = fs.LoadMsg(seq+1, nil)
require_NoError(t, err)
require_Equal(t, bytesToString(getHeader(JSAppliedLimit, sm.hdr)), JSAppliedLimitMaxAge)
require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s")

time.Sleep(time.Second * 2)

// The tombstone itself only has a TTL of 1 second, so it should also
// be gone by now. No further tombstones should have been published.
var ss StreamState
fs.FastState(&ss)
require_Equal(t, ss.FirstSeq, sm.seq+1)
require_Equal(t, ss.LastSeq, sm.seq)
require_Equal(t, ss.Msgs, 0)
}

func TestFileStoreSubjectDeleteMarkersDefaultTTL(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
StreamConfig{
Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage,
MaxAge: time.Second, AllowMsgTTL: true,
SubjectDeleteMarkers: true,
},
)
require_NoError(t, err)
defer fs.Stop()

// Store three messages that will expire because of MaxAge.
var seq uint64
for i := 0; i < 3; i++ {
seq, _, err = fs.StoreMsg("test", nil, nil, 0)
require_NoError(t, err)
}

// The last message should be gone after MaxAge has passed.
time.Sleep(time.Second * 2)
sm, err := fs.LoadMsg(seq, nil)
require_Error(t, err)
require_Equal(t, sm, nil)

// We should have replaced it with a tombstone.
sm, err = fs.LoadMsg(seq+1, nil)
require_NoError(t, err)
require_Equal(t, bytesToString(getHeader(JSAppliedLimit, sm.hdr)), JSAppliedLimitMaxAge)
require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "15m0s")
}

func TestFileStoreSubjectDeleteMarkersOnRestart(t *testing.T) {
storeDir := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: storeDir},
StreamConfig{
Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage,
MaxAge: time.Second, AllowMsgTTL: true,
SubjectDeleteMarkers: true, SubjectDeleteMarkerTTL: "1s",
},
)
require_NoError(t, err)
defer fs.Stop()

// Store three messages that will expire because of MaxAge.
var seq uint64
for i := 0; i < 3; i++ {
seq, _, err = fs.StoreMsg("test", nil, nil, 0)
require_NoError(t, err)
}

// Stop the store so that the expiry happens while it's technically
// offline. Then wait for at least MaxAge and then restart, which should
// hit the expireMsgsOnRecover path instead.
require_NoError(t, fs.Stop())
time.Sleep(time.Second * 2)
fs, err = newFileStore(
FileStoreConfig{StoreDir: storeDir},
StreamConfig{
Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage,
MaxAge: time.Second, AllowMsgTTL: true,
SubjectDeleteMarkers: true, SubjectDeleteMarkerTTL: "1s",
},
)
require_NoError(t, err)
defer fs.Stop()

// The last message should be gone after MaxAge has passed.
sm, err := fs.LoadMsg(seq, nil)
require_Error(t, err)
require_Equal(t, sm, nil)

// We should have replaced it with a tombstone.
sm, err = fs.LoadMsg(seq+1, nil)
require_NoError(t, err)
require_Equal(t, bytesToString(getHeader(JSAppliedLimit, sm.hdr)), JSAppliedLimitMaxAge)
require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s")
}
6 changes: 3 additions & 3 deletions server/jetstream_cluster_4_test.go
@@ -5369,7 +5369,7 @@ func TestJetStreamClusterMessageTTLWhenSourcing(t *testing.T) {
})

hdr := nats.Header{}
hdr.Add("Nats-TTL", "1s")
hdr.Add(JSMessageTTL, "1s")

_, err := js.PublishMsg(&nats.Msg{
Subject: "test",
@@ -5436,7 +5436,7 @@ func TestJetStreamClusterMessageTTLWhenMirroring(t *testing.T) {
})

hdr := nats.Header{}
hdr.Add("Nats-TTL", "1s")
hdr.Add(JSMessageTTL, "1s")

_, err := js.PublishMsg(&nats.Msg{
Subject: "test",
@@ -5486,7 +5486,7 @@ func TestJetStreamClusterMessageTTLDisabled(t *testing.T) {
Header: nats.Header{},
}

msg.Header.Set("Nats-TTL", "1s")
msg.Header.Set(JSMessageTTL, "1s")
_, err := js.PublishMsg(msg)
require_Error(t, err)

9 changes: 7 additions & 2 deletions server/jetstream_helpers_test.go
@@ -1265,7 +1265,7 @@ func jsClientConnectURL(t testing.TB, url string, opts ...nats.Option) (*nats.Co
}

// jsStreamCreate is for sending a stream create for fields that nats.go does not know about yet.
func jsStreamCreate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) *StreamConfig {
func jsStreamCreate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) (*StreamConfig, error) {
t.Helper()

j, err := json.Marshal(cfg)
@@ -1276,8 +1276,13 @@ func jsStreamCreate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) *StreamConfi

var resp JSApiStreamUpdateResponse
require_NoError(t, json.Unmarshal(msg.Data, &resp))

if resp.Error != nil {
return nil, resp.Error
}

require_NotNil(t, resp.StreamInfo)
return &resp.Config
return &resp.Config, nil
}
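
Since jsStreamCreate now propagates API errors instead of tripping the require_NotNil check, call sites gain an explicit error to handle. A sketch of the updated calling convention (hypothetical stream config):

	cfg, err := jsStreamCreate(t, nc, &StreamConfig{
		Name:     "TEST",
		Subjects: []string{"test"},
		Storage:  FileStorage,
	})
	require_NoError(t, err)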

// jsStreamUpdate is for sending a stream update for fields that nats.go does not know about yet.