@@ -1,6 +1,7 @@
 package gtshred
 
 import (
+	"context"
 	"fmt"
 	"sync"
 	"time"
@@ -16,22 +17,37 @@ const (
 	maxBlockSize = 128 * 1024 * 1024 // 128MB maximum block size (matches Solana)
 )
 
+type ShredGroupWithTimestamp struct {
+	ShredGroup
+	Timestamp time.Time
+}
+
 type Processor struct {
-	groups          map[string]*ShredGroup
-	mu              sync.Mutex
-	cb              ProcessorCallback
-	completedBlocks map[string]time.Time
+	// cb is the callback to call when a block is fully reassembled
+	cb ProcessorCallback
+
+	// groups is a cache of shred groups currently being processed.
+	groups   map[string]*ShredGroupWithTimestamp
+	groupsMu sync.RWMutex
+
+	// completedBlocks is a cache of block hashes that have been fully reassembled and should no longer be processed.
+	completedBlocks   map[string]time.Time
+	completedBlocksMu sync.RWMutex
+
+	// cleanupInterval is the interval at which stale groups are cleaned up and completed blocks are removed
 	cleanupInterval time.Duration
 }
 
+// ProcessorCallback is the interface for processor callbacks.
 type ProcessorCallback interface {
 	ProcessBlock(height uint64, blockHash []byte, block []byte) error
 }
 
-func NewProcessor(cb ProcessorCallback, cleanupInterval time.Duration) *Processor {
+// NewProcessor creates a new Processor with the given callback and cleanup interval.
+func NewProcessor(ctx context.Context, cb ProcessorCallback, cleanupInterval time.Duration) *Processor {
 	p := &Processor{
 		cb:              cb,
-		groups:          make(map[string]*ShredGroup),
+		groups:          make(map[string]*ShredGroupWithTimestamp),
 		completedBlocks: make(map[string]time.Time),
 		cleanupInterval: cleanupInterval,
 	}
@@ -43,6 +59,8 @@ func NewProcessor(cb ProcessorCallback, cleanupInterval time.Duration) *Processo
 
 	for {
 		select {
+		case <-ctx.Done():
+			return
 		case now := <-ticker.C:
 			p.cleanupStaleGroups(now)
 		}
@@ -58,28 +76,42 @@ func (p *Processor) CollectShred(shred *gturbine.Shred) error {
 		return fmt.Errorf("nil shred")
 	}
 
+	p.completedBlocksMu.RLock()
 	// Skip shreds from already processed blocks
-	if _, completed := p.completedBlocks[string(shred.BlockHash)]; completed {
+	_, completed := p.completedBlocks[string(shred.BlockHash)]
+	p.completedBlocksMu.RUnlock()
+	if completed {
 		return nil
 	}
 
-	p.mu.Lock()
-	defer p.mu.Unlock()
+	// Take read lock on groups to check if group exists, and get it if it does.
+	p.groupsMu.RLock()
 	group, ok := p.groups[shred.GroupID]
+	p.groupsMu.RUnlock()
+
 	if !ok {
-		group := &ShredGroup{
-			DataShreds:          make([]*gturbine.Shred, shred.TotalDataShreds),
-			RecoveryShreds:      make([]*gturbine.Shred, shred.TotalRecoveryShreds),
-			TotalDataShreds:     shred.TotalDataShreds,
-			TotalRecoveryShreds: shred.TotalRecoveryShreds,
-			GroupID:             shred.GroupID,
-			BlockHash:           shred.BlockHash,
-			Height:              shred.Height,
-			OriginalSize:        shred.FullDataSize,
+		// If the group doesn't exist, create it and add the shred
+		group := &ShredGroupWithTimestamp{
+			ShredGroup: ShredGroup{
+				DataShreds:          make([]*gturbine.Shred, shred.TotalDataShreds),
+				RecoveryShreds:      make([]*gturbine.Shred, shred.TotalRecoveryShreds),
+				TotalDataShreds:     shred.TotalDataShreds,
+				TotalRecoveryShreds: shred.TotalRecoveryShreds,
+				GroupID:             shred.GroupID,
+				BlockHash:           shred.BlockHash,
+				Height:              shred.Height,
+				OriginalSize:        shred.FullDataSize,
+			},
+			Timestamp: time.Now(), // Record the time the group was created consumer side.
 		}
+
 		group.DataShreds[shred.Index] = shred
 
+		// Take write lock to add the group
+		p.groupsMu.Lock()
 		p.groups[shred.GroupID] = group
+		p.groupsMu.Unlock()
+
 		return nil
 	}
 
@@ -111,19 +143,51 @@ func (p *Processor) CollectShred(shred *gturbine.Shred) error {
 }
 
 func (p *Processor) cleanupStaleGroups(now time.Time) {
-	p.mu.Lock()
-	defer p.mu.Unlock()
+	var deleteHashes []string
 
+	p.completedBlocksMu.RLock()
 	for hash, completedAt := range p.completedBlocks {
 		if now.Sub(completedAt) > p.cleanupInterval {
+			deleteHashes = append(deleteHashes, hash)
+		}
+	}
+	p.completedBlocksMu.RUnlock()
+
+	if len(deleteHashes) != 0 {
+		// Take write lock once for all deletions
+		p.completedBlocksMu.Lock()
+		for _, hash := range deleteHashes {
 			delete(p.completedBlocks, hash)
-			// Find and reset any groups with this block hash
-			for id, group := range p.groups {
-				if string(group.BlockHash) == hash {
-					group.Reset()
-					delete(p.groups, id)
-				}
+		}
+		p.completedBlocksMu.Unlock()
+	}
+
+	var deleteGroups []string
+
+	// Take read lock on groups to check for groups to delete (stale or duplicate blockhash)
+	p.groupsMu.RLock()
+	for id, group := range p.groups {
+		for _, hash := range deleteHashes {
+			// Check if group is associated with a completed block
+			if string(group.BlockHash) == hash {
+				deleteGroups = append(deleteGroups, id)
 			}
 		}
+
+		// Check if group is stale
+		if now.Sub(group.Timestamp) > p.cleanupInterval {
+			deleteGroups = append(deleteGroups, id)
+		}
+	}
+	p.groupsMu.RUnlock()
+
+	if len(deleteGroups) != 0 {
+		// Take write lock once for all deletions
+		p.groupsMu.Lock()
+		for _, id := range deleteGroups {
+			p.groups[id].Reset() // TODO: is this necessary?
+			delete(p.groups, id)
+		}
+		p.groupsMu.Unlock()
 	}
 }
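
For reference, a minimal usage sketch of the new constructor signature (assuming, as the hunks above suggest, that NewProcessor itself starts the background cleanup loop; the noopCallback type and usageSketch function below are hypothetical, for illustration only):

package gtshred

import (
	"context"
	"time"
)

// noopCallback is a hypothetical ProcessorCallback used only for this sketch.
type noopCallback struct{}

func (noopCallback) ProcessBlock(height uint64, blockHash []byte, block []byte) error {
	return nil
}

// usageSketch shows how a caller might drive the new API: the context passed to
// NewProcessor bounds the lifetime of the cleanup loop, so cancelling it stops
// the background goroutine instead of leaving it running for the process lifetime.
func usageSketch() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stopping the processor is now just cancelling the context

	p := NewProcessor(ctx, noopCallback{}, 10*time.Second)

	// Shreds arriving from the network are fed in via p.CollectShred(shred);
	// when a group is fully reassembled, the callback's ProcessBlock is invoked.
	_ = p
}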