Skip to content

Commit 4e912a9

Browse files
committed
services: improve Notary service locking
1. Allow parallel handling of notary requests with different main transaction. 2. Move expired request removal to PostBlock. Signed-off-by: Anna Shaleva <[email protected]>
1 parent c00829e commit 4e912a9

File tree

1 file changed

+42
-17
lines changed

1 file changed

+42
-17
lines changed

pkg/services/notary/notary.go

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ type (
5252
// started is a status bool to protect from double start/shutdown.
5353
started atomic.Bool
5454

55-
// reqMtx protects requests list.
55+
// reqMtx protects the request list from concurrent requests addition/removal.
56+
// Use per-request locks instead of this one to perform request-changing operations.
5657
reqMtx sync.RWMutex
5758
// requests represents a map of main transactions which needs to be completed
5859
// with the associated fallback transactions grouped by the main transaction hash
@@ -89,6 +90,7 @@ const defaultTxChannelCapacity = 100
8990
type (
9091
// request represents Notary service request.
9192
request struct {
93+
lock sync.RWMutex
9294
// isSent indicates whether the main transaction was successfully sent to the network.
9395
isSent bool
9496
main *transaction.Transaction
@@ -117,6 +119,7 @@ type (
117119
)
118120

119121
// isMainCompleted denotes whether all signatures for the main transaction were collected.
122+
// The caller is supposed to hold the request lock.
120123
func (r request) isMainCompleted() bool {
121124
if r.witnessInfo == nil {
122125
return false
@@ -254,12 +257,14 @@ func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) {
254257
zap.String("verification error", validationErr.Error()))
255258
}
256259
n.reqMtx.Lock()
257-
defer n.reqMtx.Unlock()
258260
r, exists := n.requests[payload.MainTransaction.Hash()]
259261
if exists {
262+
r.lock.Lock() // RLock doesn't fit here since we modify r.minNotValidBefore below.
260263
if slices.ContainsFunc(r.fallbacks, func(fb *transaction.Transaction) bool {
261264
return fb.Hash().Equals(payload.FallbackTransaction.Hash())
262265
}) {
266+
r.lock.Unlock()
267+
n.reqMtx.Unlock()
263268
return // then we already have processed this request
264269
}
265270
r.minNotValidBefore = min(r.minNotValidBefore, nvbFallback)
@@ -270,8 +275,10 @@ func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) {
270275
main: payload.MainTransaction.Copy(),
271276
minNotValidBefore: nvbFallback,
272277
}
278+
r.lock.Lock()
273279
n.requests[payload.MainTransaction.Hash()] = r
274280
}
281+
n.reqMtx.Unlock()
275282
if r.witnessInfo == nil && validationErr == nil {
276283
r.witnessInfo = newInfo
277284
}
@@ -282,8 +289,14 @@ func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) {
282289
// the copy.
283290
r.fallbacks = append(r.fallbacks, payload.FallbackTransaction.Copy())
284291
if exists && r.isMainCompleted() || validationErr != nil {
292+
r.lock.Unlock()
285293
return
286294
}
295+
296+
// @roman-khimov, I tried to work with r.copy() here in order not to hold r.lock during
297+
// witness verification. However, the race is possible in the end of this method if main
298+
// transaction will be completed by another thread, which leads to the situation when
299+
// two main transactions with different set of witnesses will be submitted.
287300
mainHash := hash.NetSha256(uint32(n.Network), r.main).BytesBE()
288301
for i, w := range payload.MainTransaction.Scripts {
289302
if len(w.InvocationScript) == 0 || // check that signature for this witness was provided
@@ -338,6 +351,7 @@ func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) {
338351
zap.Error(err))
339352
}
340353
}
354+
r.lock.Unlock()
341355
}
342356

343357
// OnRequestRemoval is a callback which is called after fallback transaction is removed
@@ -348,20 +362,20 @@ func (n *Notary) OnRequestRemoval(pld *payload.P2PNotaryRequest) {
348362
}
349363

350364
n.reqMtx.Lock()
351-
defer n.reqMtx.Unlock()
352365
r, ok := n.requests[pld.MainTransaction.Hash()]
366+
n.reqMtx.Unlock()
353367
if !ok {
354368
return
355369
}
370+
371+
r.lock.Lock()
356372
for i, fb := range r.fallbacks {
357373
if fb.Hash().Equals(pld.FallbackTransaction.Hash()) {
358374
r.fallbacks = append(r.fallbacks[:i], r.fallbacks[i+1:]...)
359375
break
360376
}
361377
}
362-
if len(r.fallbacks) == 0 {
363-
delete(n.requests, r.main.Hash())
364-
}
378+
r.lock.Unlock()
365379
}
366380

367381
// PostPersist is a callback which is called after a new block event is received.
@@ -376,15 +390,24 @@ func (n *Notary) PostPersist() {
376390
}
377391

378392
n.reqMtx.Lock()
393+
// @roman-khimov, an option is to copy n.requests in order not to hold lock over n.reqMtx for the whole cycle,
394+
// but we need to take a write lock over every request anyway since it's modified by n.finalize.
395+
// We also need to clean the list of requests with 0 fallbacks, this requires n.reqMtx to be taken.
379396
defer n.reqMtx.Unlock()
380397
currHeight := n.Config.Chain.BlockHeight()
381398
for h, r := range n.requests {
399+
r.lock.Lock()
400+
if len(r.fallbacks) == 0 {
401+
delete(n.requests, r.main.Hash())
402+
continue
403+
}
382404
if !r.isSent && r.isMainCompleted() && r.minNotValidBefore > currHeight {
383405
if err := n.finalize(acc, r.main, h); err != nil {
384406
n.Config.Log.Error("failed to finalize main transaction after PostPersist, waiting for the next block to retry",
385407
zap.String("hash", r.main.Hash().StringLE()),
386408
zap.Error(err))
387409
}
410+
r.lock.Unlock()
388411
continue
389412
}
390413
if r.minNotValidBefore <= currHeight { // then at least one of the fallbacks can already be sent.
@@ -400,6 +423,7 @@ func (n *Notary) PostPersist() {
400423
}
401424
}
402425
}
426+
r.lock.Unlock()
403427
}
404428
}
405429

@@ -448,33 +472,37 @@ func (n *Notary) newTxCallbackLoop() {
448472
case tx := <-n.newTxs:
449473
isMain := tx.tx.Hash() == tx.mainHash
450474

451-
n.reqMtx.Lock()
475+
n.reqMtx.RLock()
452476
r, ok := n.requests[tx.mainHash]
453-
if !ok || isMain && (r.isSent || r.minNotValidBefore <= n.Config.Chain.BlockHeight()) {
454-
n.reqMtx.Unlock()
477+
n.reqMtx.RUnlock()
478+
if !ok {
455479
continue
456480
}
481+
r.lock.Lock()
482+
if isMain && (r.isSent || r.minNotValidBefore <= n.Config.Chain.BlockHeight()) {
483+
r.lock.Unlock()
484+
continue
485+
}
486+
457487
if !isMain {
458488
// Ensure that fallback was not already completed.
459489
var isPending = slices.ContainsFunc(r.fallbacks, func(fb *transaction.Transaction) bool {
460490
return fb.Hash() == tx.tx.Hash()
461491
})
462492
if !isPending {
463-
n.reqMtx.Unlock()
493+
r.lock.Unlock()
464494
continue
465495
}
466496
}
467497

468-
n.reqMtx.Unlock()
469498
err := n.onTransaction(tx.tx)
470499
if err != nil {
471500
n.Config.Log.Error("new transaction callback finished with error",
472501
zap.Error(err),
473502
zap.Bool("is main", isMain))
503+
r.lock.Unlock()
474504
continue
475505
}
476-
477-
n.reqMtx.Lock()
478506
if isMain {
479507
r.isSent = true
480508
} else {
@@ -484,11 +512,8 @@ func (n *Notary) newTxCallbackLoop() {
484512
break
485513
}
486514
}
487-
if len(r.fallbacks) == 0 {
488-
delete(n.requests, tx.mainHash)
489-
}
490515
}
491-
n.reqMtx.Unlock()
516+
r.lock.Unlock()
492517
case <-n.stopCh:
493518
return
494519
}

0 commit comments

Comments
 (0)