Commit 0d28540

expire stale groups. use rwmutexes where appropriate

1 parent 902e5e4 commit 0d28540

6 files changed: +120 -42 lines
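The API change threaded through every file below is that NewProcessor now takes a context.Context and owns a cleanup goroutine that exits when the context is cancelled. A minimal caller-side sketch of the new signature follows; the myCallback type is hypothetical, and the gtshred import path is assumed from the repository layout.

package main

import (
	"context"
	"time"

	"github.com/gordian-engine/gordian/gturbine/gtshred"
)

// myCallback is a hypothetical ProcessorCallback implementation;
// only the ProcessBlock signature is fixed by the gtshred package.
type myCallback struct{}

func (myCallback) ProcessBlock(height uint64, blockHash []byte, block []byte) error {
	// Hand the fully reassembled block to the application here.
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // cancelling ctx stops the processor's cleanup goroutine

	// Groups that sit incomplete longer than the cleanup interval are expired.
	p := gtshred.NewProcessor(ctx, myCallback{}, time.Minute)
	_ = p // feed incoming shreds via p.CollectShred(...)
}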

gturbine/gtshred/benchmark_test.go (+10 -9)

@@ -1,6 +1,7 @@
 package gtshred
 
 import (
+	"context"
 	"crypto/rand"
 	"testing"
 	"time"
@@ -18,10 +19,10 @@ func BenchmarkShredProcessing(b *testing.B) {
 		size      int
 		chunkSize uint32
 	}{
-		{"8MB", 8 << 20, 1 << 18},   // 256KB chunks
-		{"16MB", 16 << 20, 1 << 19}, // 512KB chunks
-		{"32MB", 32 << 20, 1 << 20}, // 1MB chunks
-		{"64MB", 64 << 20, 1 << 21}, // 2MB chunks
+		{"8MB", 8 << 20, 1 << 18},     // 256KB chunks
+		{"16MB", 16 << 20, 1 << 19},   // 512KB chunks
+		{"32MB", 32 << 20, 1 << 20},   // 1MB chunks
+		{"64MB", 64 << 20, 1 << 21},   // 2MB chunks
 		{"128MB", 128 << 20, 1 << 22}, // 4MB chunks
 	}
 
@@ -35,7 +36,7 @@ func BenchmarkShredProcessing(b *testing.B) {
 	}
 
 	// Create processor with noop callback
-	p := NewProcessor(&noopCallback{}, time.Minute)
+	p := NewProcessor(context.Background(), &noopCallback{}, time.Minute)
 
 	// Reset timer before main benchmark loop
 	b.ResetTimer()
@@ -56,7 +57,7 @@ func BenchmarkShredProcessing(b *testing.B) {
 
 	b.StopTimer()
 	// Reset processor state between iterations
-	p.groups = make(map[string]*ShredGroup)
+	p.groups = make(map[string]*ShredGroupWithTimestamp)
 	p.completedBlocks = make(map[string]time.Time)
 	b.StartTimer()
 }
@@ -87,7 +88,7 @@ func BenchmarkShredReconstruction(b *testing.B) {
 	for _, pattern := range patterns {
 		b.Run(pattern.name, func(b *testing.B) {
 			// Create processor
-			p := NewProcessor(&noopCallback{}, time.Minute)
+			p := NewProcessor(context.Background(), &noopCallback{}, time.Minute)
 
 			b.ResetTimer()
 
@@ -121,10 +122,10 @@ func BenchmarkShredReconstruction(b *testing.B) {
 			}
 
 			b.StopTimer()
-			p.groups = make(map[string]*ShredGroup)
+			p.groups = make(map[string]*ShredGroupWithTimestamp)
 			p.completedBlocks = make(map[string]time.Time)
 			b.StartTimer()
 		}
 	})
 }
-}
+}

gturbine/gtshred/process_shred_test.go (+2 -1)

@@ -2,6 +2,7 @@ package gtshred
 
 import (
 	"bytes"
+	"context"
 	"crypto/rand"
 	"crypto/sha256"
 	"testing"
@@ -85,7 +86,7 @@ func TestProcessorShredding(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			var cb = new(testProcessorCallback)
 
-			p := NewProcessor(cb, time.Minute)
+			p := NewProcessor(context.Background(), cb, time.Minute)
 
 			block := makeRandomBlock(tc.blockSize)
 			group, err := NewShredGroup(block, TestHeight, DefaultDataShreds, DefaultRecoveryShreds, DefaultChunkSize)

gturbine/gtshred/processor.go (+91 -26)

@@ -1,6 +1,7 @@
 package gtshred
 
 import (
+	"context"
 	"fmt"
 	"sync"
 	"time"
@@ -16,22 +17,38 @@ const (
 	maxBlockSize = 128 * 1024 * 1024 // 128MB maximum block size (matches Solana)
 )
 
+// ShredGroupWithTimestamp is a ShredGroup with a timestamp for tracking when the group was created (when the first shred was received).
+type ShredGroupWithTimestamp struct {
+	*ShredGroup
+	Timestamp time.Time
+}
+
 type Processor struct {
-	groups          map[string]*ShredGroup
-	mu              sync.Mutex
-	cb              ProcessorCallback
-	completedBlocks map[string]time.Time
+	// cb is the callback to call when a block is fully reassembled
+	cb ProcessorCallback
+
+	// groups is a cache of shred groups currently being processed.
+	groups   map[string]*ShredGroupWithTimestamp
+	groupsMu sync.RWMutex
+
+	// completedBlocks is a cache of block hashes that have been fully reassembled and should no longer be processed.
+	completedBlocks   map[string]time.Time
+	completedBlocksMu sync.RWMutex
+
+	// cleanupInterval is the interval at which stale groups are cleaned up and completed blocks are removed
 	cleanupInterval time.Duration
 }
 
+// ProcessorCallback is the interface for processor callbacks.
 type ProcessorCallback interface {
 	ProcessBlock(height uint64, blockHash []byte, block []byte) error
 }
 
-func NewProcessor(cb ProcessorCallback, cleanupInterval time.Duration) *Processor {
+// NewProcessor creates a new Processor with the given callback and cleanup interval.
+func NewProcessor(ctx context.Context, cb ProcessorCallback, cleanupInterval time.Duration) *Processor {
 	p := &Processor{
 		cb:              cb,
-		groups:          make(map[string]*ShredGroup),
+		groups:          make(map[string]*ShredGroupWithTimestamp),
 		completedBlocks: make(map[string]time.Time),
 		cleanupInterval: cleanupInterval,
 	}
@@ -43,6 +60,8 @@ func NewProcessor(cb ProcessorCallback, cleanupInterval time.Duration) *Processo
 
 	for {
 		select {
+		case <-ctx.Done():
+			return
 		case now := <-ticker.C:
 			p.cleanupStaleGroups(now)
 		}
@@ -58,28 +77,42 @@ func (p *Processor) CollectShred(shred *gturbine.Shred) error {
 		return fmt.Errorf("nil shred")
 	}
 
+	p.completedBlocksMu.RLock()
 	// Skip shreds from already processed blocks
-	if _, completed := p.completedBlocks[string(shred.BlockHash)]; completed {
+	_, completed := p.completedBlocks[string(shred.BlockHash)]
+	p.completedBlocksMu.RUnlock()
+	if completed {
 		return nil
 	}
 
-	p.mu.Lock()
-	defer p.mu.Unlock()
+	// Take read lock on groups to check if group exists, and get it if it does.
+	p.groupsMu.RLock()
 	group, ok := p.groups[shred.GroupID]
+	p.groupsMu.RUnlock()
+
 	if !ok {
-		group := &ShredGroup{
-			DataShreds:          make([]*gturbine.Shred, shred.TotalDataShreds),
-			RecoveryShreds:      make([]*gturbine.Shred, shred.TotalRecoveryShreds),
-			TotalDataShreds:     shred.TotalDataShreds,
-			TotalRecoveryShreds: shred.TotalRecoveryShreds,
-			GroupID:             shred.GroupID,
-			BlockHash:           shred.BlockHash,
-			Height:              shred.Height,
-			OriginalSize:        shred.FullDataSize,
+		// If the group doesn't exist, create it and add the shred
+		group := &ShredGroupWithTimestamp{
+			ShredGroup: &ShredGroup{
+				DataShreds:          make([]*gturbine.Shred, shred.TotalDataShreds),
+				RecoveryShreds:      make([]*gturbine.Shred, shred.TotalRecoveryShreds),
+				TotalDataShreds:     shred.TotalDataShreds,
+				TotalRecoveryShreds: shred.TotalRecoveryShreds,
+				GroupID:             shred.GroupID,
+				BlockHash:           shred.BlockHash,
+				Height:              shred.Height,
+				OriginalSize:        shred.FullDataSize,
+			},
			Timestamp: time.Now(), // Record the time the group was created consumer side.
 		}
+
 		group.DataShreds[shred.Index] = shred
 
+		// Take write lock to add the group
+		p.groupsMu.Lock()
 		p.groups[shred.GroupID] = group
+		p.groupsMu.Unlock()
+
 		return nil
 	}
 
@@ -111,19 +144,51 @@ func (p *Processor) CollectShred(shred *gturbine.Shred) error {
 }
 
 func (p *Processor) cleanupStaleGroups(now time.Time) {
-	p.mu.Lock()
-	defer p.mu.Unlock()
+	var deleteHashes []string
 
+	p.completedBlocksMu.RLock()
 	for hash, completedAt := range p.completedBlocks {
 		if now.Sub(completedAt) > p.cleanupInterval {
+			deleteHashes = append(deleteHashes, hash)
+		}
+	}
+	p.completedBlocksMu.RUnlock()
+
+	if len(deleteHashes) != 0 {
+		// Take write lock once for all deletions
+		p.completedBlocksMu.Lock()
+		for _, hash := range deleteHashes {
 			delete(p.completedBlocks, hash)
-			// Find and reset any groups with this block hash
-			for id, group := range p.groups {
-				if string(group.BlockHash) == hash {
-					group.Reset()
-					delete(p.groups, id)
-				}
+		}
+		p.completedBlocksMu.Unlock()
+	}
+
+	var deleteGroups []string
+
+	// Take read lock on groups to check for groups to delete (stale or duplicate blockhash)
+	p.groupsMu.RLock()
+	for id, group := range p.groups {
+		for _, hash := range deleteHashes {
+			// Check if group is associated with a completed block
+			if string(group.BlockHash) == hash {
+				deleteGroups = append(deleteGroups, id)
 			}
 		}
+
+		// Check if group is stale
+		if now.Sub(group.Timestamp) > p.cleanupInterval {
+			deleteGroups = append(deleteGroups, id)
+		}
+	}
+	p.groupsMu.RUnlock()
+
+	if len(deleteGroups) != 0 {
+		// Take write lock once for all deletions
+		p.groupsMu.Lock()
+		for _, id := range deleteGroups {
+			p.groups[id].Reset() // TODO: is this necessary?
+			delete(p.groups, id)
+		}
+		p.groupsMu.Unlock()
 	}
 }
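The rewritten cleanupStaleGroups above is the clearest expression of the "use rwmutexes where appropriate" half of the commit message: scan under the read lock, collect keys, then take the write lock once for the whole batch of deletions. A distilled sketch of that two-phase pattern on a standalone cache; the expiringSet type is illustrative, not from this repository.

package example

import (
	"sync"
	"time"
)

// expiringSet is an illustrative reduction of the processor's
// completedBlocks cache: a map of keys to insertion times.
type expiringSet struct {
	mu  sync.RWMutex
	m   map[string]time.Time
	ttl time.Duration
}

func (s *expiringSet) cleanup(now time.Time) {
	// Phase 1: scan under the read lock, so concurrent readers
	// (the hot CollectShred path) are never blocked by the scan.
	var stale []string
	s.mu.RLock()
	for k, t := range s.m {
		if now.Sub(t) > s.ttl {
			stale = append(stale, k)
		}
	}
	s.mu.RUnlock()

	if len(stale) == 0 {
		return
	}

	// Phase 2: take the write lock once for the whole batch,
	// rather than once per key.
	s.mu.Lock()
	for _, k := range stale {
		delete(s.m, k)
	}
	s.mu.Unlock()
}

Entries added between the two phases simply survive until the next tick; the diff accepts that slack in exchange for keeping the write lock held only briefly.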

gturbine/gtshred/processor_test.go (+2 -1)

@@ -1,6 +1,7 @@
 package gtshred
 
 import (
+	"context"
 	"testing"
 	"time"
 )
@@ -9,7 +10,7 @@ func TestProcessorMemoryCleanup(t *testing.T) {
 	// Create processor with short cleanup interval for testing
 	var cb = new(testProcessorCallback)
 	cleanupInterval := 100 * time.Millisecond
-	p := NewProcessor(cb, cleanupInterval)
+	p := NewProcessor(context.Background(), cb, cleanupInterval)
 
 	// Create a test block and shred group
 	block := []byte("test block data")

gturbine/gtshred/shred_group.go (+13 -4)

@@ -3,6 +3,7 @@ package gtshred
 import (
 	"crypto/sha256"
 	"fmt"
+	"sync"
 
 	"github.com/google/uuid"
 	"github.com/gordian-engine/gordian/gturbine"
@@ -19,6 +20,8 @@ type ShredGroup struct {
 	BlockHash    []byte
 	Height       uint64 // Added to struct level
 	OriginalSize int
+
+	mu sync.Mutex
 }
 
 // NewShredGroup creates a new ShredGroup from a block of data
@@ -116,15 +119,15 @@ func NewShredGroup(block []byte, height uint64, dataShreds, recoveryShreds int,
 	return group, nil
 }
 
-// IsFull checks if enough shreds are available for reconstruction
-// NOTE: we'd like shredgroup to know the data threshold as a property on the shredgroup
-func (g *ShredGroup) IsFull() bool {
+// isFull checks if enough shreds are available to attempt reconstruction.
+func (g *ShredGroup) isFull() bool {
 	valid := 0
 	for _, s := range g.DataShreds {
 		if s != nil {
 			valid++
 		}
 	}
+
 	for _, s := range g.RecoveryShreds {
 		if s != nil {
 			valid++
@@ -136,6 +139,8 @@ func (g *ShredGroup) IsFull() bool {
 
 // ReconstructBlock attempts to reconstruct the original block from available shreds
 func (g *ShredGroup) ReconstructBlock(encoder *gtencoding.Encoder) ([]byte, error) {
+	g.mu.Lock()
+	defer g.mu.Unlock()
 
 	// Extract data bytes for erasure coding
 	allBytes := make([][]byte, len(g.DataShreds)+len(g.RecoveryShreds))
@@ -178,6 +183,7 @@ func (g *ShredGroup) ReconstructBlock(encoder *gtencoding.Encoder) ([]byte, erro
 	}
 
 	// Verify reconstructed block hash
+	// TODO hasher should be interface.
 	computedHash := sha256.Sum256(reconstructed)
 	if string(computedHash[:]) != string(g.BlockHash) {
 		return nil, fmt.Errorf("block hash mismatch after reconstruction")
@@ -203,6 +209,9 @@ func (g *ShredGroup) CollectShred(shred *gturbine.Shred) (bool, error) {
 		return false, fmt.Errorf("block hash mismatch")
 	}
 
+	g.mu.Lock()
+	defer g.mu.Unlock()
+
 	switch shred.Type {
 	case gturbine.DataShred:
 		// Validate shred index
@@ -222,7 +231,7 @@ func (g *ShredGroup) CollectShred(shred *gturbine.Shred) (bool, error) {
 		return false, fmt.Errorf("invalid shred type: %d", shred.Type)
 	}
 
-	return g.IsFull(), nil
+	return g.isFull(), nil
 }
 
 // Reset clears the ShredGroup data while maintaining allocated memory
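With the mutex added above, a ShredGroup now serializes CollectShred and ReconstructBlock internally rather than relying on the processor holding a single global lock. A rough sketch of what that enables, fanning shreds out across goroutines; collectAll is a hypothetical helper, and the gtencoding/gtshred import paths are assumed from the repository layout.

package shredexample

import (
	"sync"
	"sync/atomic"

	"github.com/gordian-engine/gordian/gturbine"
	"github.com/gordian-engine/gordian/gturbine/gtencoding"
	"github.com/gordian-engine/gordian/gturbine/gtshred"
)

// collectAll is a hypothetical helper, not part of this commit. The
// group's internal mutex is what makes the concurrent CollectShred
// calls below safe without any caller-side locking.
func collectAll(group *gtshred.ShredGroup, shreds []*gturbine.Shred, enc *gtencoding.Encoder) ([]byte, error) {
	var (
		wg   sync.WaitGroup
		full atomic.Bool
	)
	for _, s := range shreds {
		wg.Add(1)
		go func(s *gturbine.Shred) {
			defer wg.Done()
			// CollectShred reports whether enough shreds have arrived.
			if ok, err := group.CollectShred(s); err == nil && ok {
				full.Store(true)
			}
		}(s)
	}
	wg.Wait()

	if !full.Load() {
		return nil, nil // not enough shreds yet; keep waiting
	}
	// ReconstructBlock takes the same mutex, so it cannot interleave
	// with a late CollectShred on another goroutine.
	return group.ReconstructBlock(enc)
}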

gturbine/turbine_test.go (+2 -1)

@@ -2,6 +2,7 @@ package gturbine_test
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"net"
 	"sync"
@@ -65,7 +66,7 @@ func newTestNode(t *testing.T, basePort int) *testNode {
 
 	cb := &testBlockHandler{}
 
-	processor := gtshred.NewProcessor(cb, time.Minute)
+	processor := gtshred.NewProcessor(context.Background(), cb, time.Minute)
 
 	shredHandler := &testShredHandler{}
 	node := &testNode{
