Skip to content

Commit

Permalink
Stream message proposal flags, allow sourcing/mirroring to preserve T…
Browse files Browse the repository at this point in the history
…TL headers (#6376)

This adds new flags to the stream message proposals so that we can
distinguish how a message entered a stream: either through a direct
publish or through a sourcing/mirroring arrangement.

Continue to reject TTL'd messages if they are published to a stream with
TTLs disabled, but allow us to retain them in a sourcing/mirroring
scenario.

Add tests to prove that sourcing/mirroring configurations do the right
thing both when TTLs are disabled and when they are enabled.

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
derekcollison authored Jan 15, 2025
2 parents 74f005d + 1f04395 commit 3616832
Show file tree
Hide file tree
Showing 8 changed files with 337 additions and 57 deletions.
66 changes: 41 additions & 25 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2979,7 +2979,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
}
}

subject, reply, hdr, msg, lseq, ts, err := decodeStreamMsg(mbuf)
subject, reply, hdr, msg, lseq, ts, sourced, err := decodeStreamMsg(mbuf)
if err != nil {
if node := mset.raftNode(); node != nil {
s.Errorf("JetStream cluster could not decode stream msg for '%s > %s' [%s]",
Expand Down Expand Up @@ -3036,7 +3036,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
mt = mset.getAndDeleteMsgTrace(lseq)
}
// Process the actual message here.
err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts, mt)
err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts, mt, sourced)

// If we have inflight make sure to clear after processing.
// TODO(dlc) - technically check on inflight != nil could cause datarace.
Expand Down Expand Up @@ -3069,7 +3069,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
if state.Msgs == 0 {
mset.store.Compact(lseq + 1)
// Retry
err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts, mt)
err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts, mt, sourced)
}
// FIXME(dlc) - We could just run a catchup with a request defining the span between what we expected
// and what we got.
Expand Down Expand Up @@ -7696,10 +7696,10 @@ func decodeConsumerAssignmentCompressed(buf []byte) (*consumerAssignment, error)

var errBadStreamMsg = errors.New("jetstream cluster bad replicated stream msg")

func decodeStreamMsg(buf []byte) (subject, reply string, hdr, msg []byte, lseq uint64, ts int64, err error) {
func decodeStreamMsg(buf []byte) (subject, reply string, hdr, msg []byte, lseq uint64, ts int64, sourced bool, err error) {
var le = binary.LittleEndian
if len(buf) < 26 {
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, false, errBadStreamMsg
}
lseq = le.Uint64(buf)
buf = buf[8:]
Expand All @@ -7708,44 +7708,49 @@ func decodeStreamMsg(buf []byte) (subject, reply string, hdr, msg []byte, lseq u
sl := int(le.Uint16(buf))
buf = buf[2:]
if len(buf) < sl {
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, false, errBadStreamMsg
}
subject = string(buf[:sl])
buf = buf[sl:]
if len(buf) < 2 {
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, false, errBadStreamMsg
}
rl := int(le.Uint16(buf))
buf = buf[2:]
if len(buf) < rl {
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, false, errBadStreamMsg
}
reply = string(buf[:rl])
buf = buf[rl:]
if len(buf) < 2 {
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, false, errBadStreamMsg
}
hl := int(le.Uint16(buf))
buf = buf[2:]
if len(buf) < hl {
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, false, errBadStreamMsg
}
if hdr = buf[:hl]; len(hdr) == 0 {
hdr = nil
}
buf = buf[hl:]
if len(buf) < 4 {
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, false, errBadStreamMsg
}
ml := int(le.Uint32(buf))
buf = buf[4:]
if len(buf) < ml {
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg
return _EMPTY_, _EMPTY_, nil, nil, 0, 0, false, errBadStreamMsg
}
if msg = buf[:ml]; len(msg) == 0 {
msg = nil
}
return subject, reply, hdr, msg, lseq, ts, nil
buf = buf[ml:]
if len(buf) > 0 {
flags, _ := binary.Uvarint(buf)
sourced = flags&msgFlagFromSourceOrMirror != 0
}
return subject, reply, hdr, msg, lseq, ts, sourced, nil
}

// Helper to return if compression allowed.
Expand All @@ -7755,16 +7760,21 @@ func (mset *stream) compressAllowed() bool {
return mset.compressOK
}

func encodeStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64) []byte {
return encodeStreamMsgAllowCompress(subject, reply, hdr, msg, lseq, ts, false)
// Bit flags carried in the uvarint that trails an encoded stream message.
// They are written by encodeStreamMsgAllowCompress and read back by
// decodeStreamMsg.
const (
	// msgFlagFromSourceOrMirror is set when the message is encoded with
	// sourced == true, i.e. it entered the stream through a
	// sourcing/mirroring arrangement rather than a publish.
	msgFlagFromSourceOrMirror = uint64(1) << iota
)

// encodeStreamMsg encodes a stream message for replication with compression
// disabled. The sourced argument is passed through to
// encodeStreamMsgAllowCompress, which records it in the message flags.
func encodeStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, sourced bool) []byte {
	// This entry point never opts in to compression.
	const compressOK = false
	return encodeStreamMsgAllowCompress(subject, reply, hdr, msg, lseq, ts, sourced, compressOK)
}

// Threshold for compression.
// TODO(dlc) - Eventually make configurable.
const compressThreshold = 8192 // 8k

// If allowed and contents over the threshold we will compress.
func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, compressOK bool) []byte {
func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, sourced bool, compressOK bool) []byte {
// Clip the subject, reply, header and msgs down. Operate on
// uint64 lengths to avoid overflowing.
slen := min(uint64(len(subject)), math.MaxUint16)
Expand All @@ -7775,7 +7785,12 @@ func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq u

shouldCompress := compressOK && total > compressThreshold
elen := int(1 + 8 + 8 + total)
elen += (2 + 2 + 2 + 4) // Encoded lengths, 4bytes
elen += (2 + 2 + 2 + 4 + 8) // Encoded lengths, 4bytes, flags are up to 8 bytes

var flags uint64
if sourced {
flags |= msgFlagFromSourceOrMirror
}

buf := make([]byte, 1, elen)
buf[0] = byte(streamMsgOp)
Expand All @@ -7791,6 +7806,7 @@ func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq u
buf = append(buf, hdr[:hlen]...)
buf = le.AppendUint32(buf, uint32(mlen))
buf = append(buf, msg[:mlen]...)
buf = binary.AppendUvarint(buf, flags)

// Check if we should compress.
if shouldCompress {
Expand Down Expand Up @@ -7904,7 +7920,7 @@ func (mset *stream) checkAllowMsgCompress(peers []string) {
const streamLagWarnThreshold = 10_000

// processClusteredInboundMsg will propose the inbound message to the underlying raft group.
func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg []byte, mt *msgTrace) (retErr error) {
func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg []byte, mt *msgTrace, sourced bool) (retErr error) {
// For possible error response.
var response []byte

Expand All @@ -7922,7 +7938,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// We also invoke this in clustering mode for message tracing when not
// performing message delivery.
if node == nil || mt.traceOnly() {
return mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0, mt)
return mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0, mt, sourced)
}

// If message tracing (with message delivery), we will need to send the
Expand Down Expand Up @@ -8050,7 +8066,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
}

// TTL'd messages are rejected entirely if TTLs are not enabled on the stream.
if ttl, _ := getMessageTTL(hdr); ttl != 0 && !allowTTL {
if ttl, _ := getMessageTTL(hdr); !sourced && ttl != 0 && !allowTTL {
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.Error = NewJSMessageTTLDisabledError()
Expand Down Expand Up @@ -8190,7 +8206,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
}
}

esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano(), compressOK)
esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano(), sourced, compressOK)
var mtKey uint64
if mt != nil {
mtKey = mset.clseq
Expand Down Expand Up @@ -8711,7 +8727,7 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) {
}
}

subj, _, hdr, msg, seq, ts, err := decodeStreamMsg(mbuf)
subj, _, hdr, msg, seq, ts, _, err := decodeStreamMsg(mbuf)
if err != nil {
return 0, errCatchupBadMsg
}
Expand Down Expand Up @@ -9181,7 +9197,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
sendDR := func() {
if dr.Num == 1 {
// Send like a normal skip msg.
sendEM(encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, dr.First, 0))
sendEM(encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, dr.First, 0, false))
} else {
// We have a run, send a gap record. We send these without reply or tracking.
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, encodeDeleteRange(&dr))
Expand Down Expand Up @@ -9261,7 +9277,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
sendDR()
}
// Send the normal message now.
sendEM(encodeStreamMsgAllowCompress(sm.subj, _EMPTY_, sm.hdr, sm.msg, sm.seq, sm.ts, compressOk))
sendEM(encodeStreamMsgAllowCompress(sm.subj, _EMPTY_, sm.hdr, sm.msg, sm.seq, sm.ts, false, compressOk))
} else {
if drOk {
if dr.First == 0 {
Expand All @@ -9271,7 +9287,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
}
} else {
// Skip record for deleted msg.
sendEM(encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, seq, 0))
sendEM(encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, seq, 0, false))
}
}

Expand Down
6 changes: 3 additions & 3 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6801,7 +6801,7 @@ func TestJetStreamClusterCatchupLoadNextMsgTooManyDeletes(t *testing.T) {
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)
require_Equal(t, entryOp(msg.Data[0]), streamMsgOp)
subj, _, _, _, seq, ts, err := decodeStreamMsg(msg.Data[1:])
subj, _, _, _, seq, ts, _, err := decodeStreamMsg(msg.Data[1:])
require_NoError(t, err)
require_Equal(t, seq, 5)
require_Equal(t, subj, _EMPTY_)
Expand Down Expand Up @@ -6856,7 +6856,7 @@ func TestJetStreamClusterCatchupMustStallWhenBehindOnApplies(t *testing.T) {
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)
require_Equal(t, entryOp(msg.Data[0]), streamMsgOp)
subj, _, _, _, seq, _, err := decodeStreamMsg(msg.Data[1:])
subj, _, _, _, seq, _, _, err := decodeStreamMsg(msg.Data[1:])
require_NoError(t, err)
require_Equal(t, seq, 1)
require_Equal(t, subj, "foo")
Expand Down Expand Up @@ -6893,7 +6893,7 @@ func TestJetStreamClusterCatchupMustStallWhenBehindOnApplies(t *testing.T) {
msg, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_Equal(t, entryOp(msg.Data[0]), streamMsgOp)
subj, _, _, _, seq, _, err = decodeStreamMsg(msg.Data[1:])
subj, _, _, _, seq, _, _, err = decodeStreamMsg(msg.Data[1:])
require_NoError(t, err)
require_Equal(t, seq, 1)
require_Equal(t, subj, "foo")
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7063,7 +7063,7 @@ func TestJetStreamClusterLeaderAbortsCatchupOnFollowerError(t *testing.T) {
}

// Now send a message with a wrong sequence and expect to receive an error.
em := encodeStreamMsg("foo", _EMPTY_, nil, []byte("fail"), 102, time.Now().UnixNano())
em := encodeStreamMsg("foo", _EMPTY_, nil, []byte("fail"), 102, time.Now().UnixNano(), false)
leader.sendInternalMsgLocked(sreqSubj, syncRepl.Subject, nil, em)
msg = natsNexMsg(t, syncRepl, time.Second)
if len(msg.Data) == 0 {
Expand Down
Loading

0 comments on commit 3616832

Please sign in to comment.