diff --git a/embedded/tbtree/snapshot.go b/embedded/tbtree/snapshot.go index e74e111811..ef8974d8d0 100644 --- a/embedded/tbtree/snapshot.go +++ b/embedded/tbtree/snapshot.go @@ -442,8 +442,15 @@ func (n *innerNode) writeTo(nw, hw io.Writer, writeOpts *WriteOpts, buf []byte) buf[bi] = InnerNodeType bi++ - binary.BigEndian.PutUint16(buf[bi:], uint16(len(n.nodes))) - bi += 2 + if n.tsMutated() { + binary.BigEndian.PutUint16(buf[bi:], uint16(len(n.nodes)+1)) + bi += 2 + + bi += writeNodeRefToWithOffset(n, math.MaxInt64, math.MaxInt64, buf[bi:]) + } else { + binary.BigEndian.PutUint16(buf[bi:], uint16(len(n.nodes))) + bi += 2 + } for i, c := range n.nodes { n := writeNodeRefToWithOffset(c, offsets[i], minOffsets[i], buf[bi:]) @@ -505,11 +512,23 @@ func (l *leafNode) writeTo(nw, hw io.Writer, writeOpts *WriteOpts, buf []byte) ( buf[bi] = LeafNodeType bi++ - binary.BigEndian.PutUint16(buf[bi:], uint16(len(l.values))) - bi += 2 + if l.tsMutated() { + // NOTE: we store a marker entry to remember the highest timestamp seen - accH := int64(0) + binary.BigEndian.PutUint16(buf[bi:], uint16(len(l.values)+1)) + bi += 2 + binary.BigEndian.PutUint16(buf[bi:], 0) + bi += 2 + + binary.BigEndian.PutUint64(buf[bi:], l._ts) + bi += 8 + } else { + binary.BigEndian.PutUint16(buf[bi:], uint16(len(l.values))) + bi += 2 + } + + accH := int64(0) for _, v := range l.values { timedValue := v.timedValues[0] diff --git a/embedded/tbtree/tbtree.go b/embedded/tbtree/tbtree.go index 8fb731a479..6fc66bec90 100644 --- a/embedded/tbtree/tbtree.go +++ b/embedded/tbtree/tbtree.go @@ -240,6 +240,7 @@ type node interface { minKey() []byte ts() uint64 setTs(ts uint64) (node, error) + tsMutated() bool size() (int, error) mutated() bool offset() int64 // only valid when !mutated() @@ -825,7 +826,6 @@ func (t *TBtree) readNodeFrom(r *appendable.Reader) (node, error) { n.off = off return n, nil } - return nil, ErrReadingFileContent } @@ -837,7 +837,7 @@ func (t *TBtree) readInnerNodeFrom(r *appendable.Reader) (*innerNode, error) { n := &innerNode{ t: t, - nodes: make([]node, childCount), + nodes: make([]node, 0, childCount), _minOff: math.MaxInt64, } @@ -847,7 +847,13 @@ func (t *TBtree) readInnerNodeFrom(r *appendable.Reader) (*innerNode, error) { return nil, err } - n.nodes[c] = nref + // marker node + if nref.off == math.MaxInt64 { + n._ts = nref.ts() + continue + } + + n.nodes = append(n.nodes, nref) if n._ts < nref._ts { n._ts = nref._ts @@ -857,7 +863,6 @@ func (t *TBtree) readInnerNodeFrom(r *appendable.Reader) (*innerNode, error) { n._minOff = nref._minOff } } - return n, nil } @@ -905,7 +910,7 @@ func (t *TBtree) readLeafNodeFrom(r *appendable.Reader) (*leafNode, error) { l := &leafNode{ t: t, - values: make([]*leafValue, valueCount), + values: make([]*leafValue, 0, valueCount), } for c := 0; c < int(valueCount); c++ { @@ -914,6 +919,16 @@ func (t *TBtree) readLeafNodeFrom(r *appendable.Reader) (*leafNode, error) { return nil, err } + if ksize == 0 { + ts, err := r.ReadUint64() + if err != nil { + return nil, err + } + + l._ts = ts + continue + } + key := make([]byte, ksize) _, err = r.Read(key) if err != nil { @@ -953,13 +968,12 @@ func (t *TBtree) readLeafNodeFrom(r *appendable.Reader) (*leafNode, error) { hCount: hCount, } - l.values[c] = leafValue + l.values = append(l.values, leafValue) if l._ts < ts { l._ts = ts } } - return l, nil } @@ -1607,7 +1621,6 @@ func (t *TBtree) IncreaseTs(ts uint64) error { _, _, err := t.flushTree(t.cleanupPercentage, false, false, "increaseTs") return err } - return nil } @@ -2097,6 +2110,15 @@ func (n *innerNode) size() (int, error) { return size, nil } +func (l *innerNode) tsMutated() bool { + for _, nd := range l.nodes { + if nd.ts() >= l.ts() { + return false + } + } + return true +} + func (n *innerNode) mutated() bool { return n.mut } @@ -2265,6 +2287,10 @@ func (r *nodeRef) size() (int, error) { return n.size() } +func (r *nodeRef) tsMutated() bool { + return false +} + func (r *nodeRef) mutated() bool { return false } @@ -2600,6 +2626,15 @@ func (l *leafNode) size() (int, error) { return size, nil } +func (l *leafNode) tsMutated() bool { + for _, v := range l.values { + if v.timedValues[0].Ts >= l.ts() { + return false + } + } + return true +} + func (l *leafNode) mutated() bool { return l.mut } diff --git a/embedded/tbtree/tbtree_test.go b/embedded/tbtree/tbtree_test.go index ecd263c555..82449e6068 100644 --- a/embedded/tbtree/tbtree_test.go +++ b/embedded/tbtree/tbtree_test.go @@ -1314,6 +1314,100 @@ func TestTBTreeIncreaseTs(t *testing.T) { require.ErrorIs(t, err, ErrAlreadyClosed) } +func TestTBTreeFlushAfterIncreaseTs(t *testing.T) { + dir := t.TempDir() + t.Cleanup(func() { + os.RemoveAll(dir) + }) + + tbtree, err := Open(dir, DefaultOptions()) + require.NoError(t, err) + + t.Run("increase ts to empty tree", func(t *testing.T) { + err = tbtree.IncreaseTs(100) + require.NoError(t, err) + + _, _, err = tbtree.Flush() + require.NoError(t, err) + + err = tbtree.Close() + require.NoError(t, err) + + tbtree, err = Open(dir, DefaultOptions()) + require.NoError(t, err) + + require.Equal(t, tbtree.Ts(), uint64(100)) + }) + + insertValues := func(n int) { + for i := 0; i < n; i++ { + var buf [4]byte + binary.BigEndian.PutUint32(buf[:], uint32(i)) + + err := tbtree.Insert( + buf[:], + buf[:], + ) + require.NoError(t, err) + } + } + + t.Run("increase ts after insertions", func(t *testing.T) { + insertValues(10) + require.Equal(t, uint64(110), tbtree.Ts()) + + err := tbtree.IncreaseTs(200) + require.NoError(t, err) + + _, _, err = tbtree.Flush() + require.NoError(t, err) + + err = tbtree.Close() + require.NoError(t, err) + + tbtree, err = Open(dir, DefaultOptions()) + require.NoError(t, err) + + require.Equal(t, uint64(200), tbtree.Ts()) + + ln, isLeaf := tbtree.root.(*leafNode) + require.True(t, isLeaf) + + require.Equal(t, uint64(200), ln.ts()) + require.Len(t, ln.values, 10) + + for _, v := range ln.values { + require.Less(t, v.timedValue().Ts, ln.ts()) + } + }) + + t.Run("increase ts after more insertions", func(t *testing.T) { + insertValues(1000) + require.Equal(t, uint64(1200), tbtree.Ts()) + + err := tbtree.IncreaseTs(2500) + require.NoError(t, err) + + _, _, err = tbtree.Flush() + require.NoError(t, err) + + err = tbtree.Close() + require.NoError(t, err) + + tbtree, err = Open(dir, DefaultOptions()) + require.NoError(t, err) + + require.Equal(t, uint64(2500), tbtree.Ts()) + + nd, isInner := tbtree.root.(*innerNode) + require.True(t, isInner) + + for _, child := range nd.nodes { + require.Less(t, child.ts(), nd.ts()) + } + }) +} + func BenchmarkRandomInsertion(b *testing.B) { seed := rand.NewSource(time.Now().UnixNano()) rnd := rand.New(seed)