-
Notifications
You must be signed in to change notification settings - Fork 188
/
Copy pathcore_test.go
845 lines (699 loc) · 33 KB
/
core_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
package sealing
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/consensus/approvals"
"github.com/onflow/flow-go/engine/consensus/approvals/tracker"
"github.com/onflow/flow-go/model/chunks"
"github.com/onflow/flow-go/model/flow"
realmodule "github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/metrics"
module "github.com/onflow/flow-go/module/mock"
"github.com/onflow/flow-go/module/trace"
"github.com/onflow/flow-go/module/updatable_configs"
mockstate "github.com/onflow/flow-go/state/protocol/mock"
storage "github.com/onflow/flow-go/storage/mock"
"github.com/onflow/flow-go/utils/unittest"
)
// TestApprovalProcessingCore performs testing of approval processing core
// Core is responsible for delegating processing to assignment collectorTree for each separate execution result
// Core performs height based checks and decides if approval or incorporated result has to be processed at all
// or rejected as outdated or unverifiable.
// Core maintains a LRU cache of known approvals that cannot be verified at the moment/
func TestApprovalProcessingCore(t *testing.T) {
suite.Run(t, new(ApprovalProcessingCoreTestSuite))
}
// RequiredApprovalsForSealConstructionTestingValue defines the number of approvals that are
// required to construct a seal for testing purposes. Thereby, the default production value
// can be set independently without changing test behaviour.
const RequiredApprovalsForSealConstructionTestingValue = 1
type ApprovalProcessingCoreTestSuite struct {
approvals.BaseAssignmentCollectorTestSuite
sealsDB *storage.Seals
rootHeader *flow.Header
core *Core
setter realmodule.SealingConfigsSetter
}
func (s *ApprovalProcessingCoreTestSuite) TearDownTest() {
s.BaseAssignmentCollectorTestSuite.TearDownTest()
}
func (s *ApprovalProcessingCoreTestSuite) SetupTest() {
s.BaseAssignmentCollectorTestSuite.SetupTest()
s.sealsDB = &storage.Seals{}
s.rootHeader = unittest.GenesisFixture().Header
params := new(mockstate.Params)
s.State.On("Sealed").Return(unittest.StateSnapshotForKnownBlock(s.ParentBlock, nil)).Maybe()
s.State.On("Params").Return(params)
params.On("FinalizedRoot").Return(
func() *flow.Header { return s.rootHeader },
func() error { return nil },
)
metrics := metrics.NewNoopCollector()
tracer := trace.NewNoopTracer()
setter := unittest.NewSealingConfigs(flow.DefaultChunkAssignmentAlpha)
var err error
s.core, err = NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{}, s.Headers, s.State, s.sealsDB, s.Assigner, s.SigHasher, s.SealsPL, s.Conduit, setter)
require.NoError(s.T(), err)
s.setter = setter
}
// TestOnBlockFinalized_RejectOutdatedApprovals tests that approvals will be rejected as outdated
// for block that is already sealed
func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_RejectOutdatedApprovals() {
approval := unittest.ResultApprovalFixture(unittest.WithApproverID(s.VerID),
unittest.WithChunk(s.Chunks[0].Index),
unittest.WithBlockID(s.Block.ID()))
err := s.core.processApproval(approval)
require.NoError(s.T(), err)
seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(s.Block))
s.sealsDB.On("HighestInFork", mock.Anything).Return(seal, nil).Once()
err = s.core.ProcessFinalizedBlock(s.Block.ID())
require.NoError(s.T(), err)
err = s.core.processApproval(approval)
require.Error(s.T(), err)
require.True(s.T(), engine.IsOutdatedInputError(err))
}
// TestOnBlockFinalized_RejectOutdatedExecutionResult tests that incorporated result will be rejected as outdated
// if the block which is targeted by execution result is already sealed.
func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_RejectOutdatedExecutionResult() {
seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(s.Block))
s.sealsDB.On("HighestInFork", mock.Anything).Return(seal, nil).Once()
err := s.core.ProcessFinalizedBlock(s.Block.ID())
require.NoError(s.T(), err)
err = s.core.processIncorporatedResult(s.IncorporatedResult)
require.Error(s.T(), err)
require.True(s.T(), engine.IsOutdatedInputError(err))
}
// TestOnBlockFinalized_RejectUnverifiableEntries tests that core will reject both execution results
// and approvals for blocks that we have no information about.
func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_RejectUnverifiableEntries() {
s.IncorporatedResult.Result.BlockID = unittest.IdentifierFixture() // replace blockID with random one
err := s.core.processIncorporatedResult(s.IncorporatedResult)
require.Error(s.T(), err)
require.True(s.T(), engine.IsUnverifiableInputError(err))
approval := unittest.ResultApprovalFixture(unittest.WithApproverID(s.VerID),
unittest.WithChunk(s.Chunks[0].Index))
err = s.core.processApproval(approval)
require.Error(s.T(), err)
require.True(s.T(), engine.IsUnverifiableInputError(err))
}
// TestOnBlockFinalized_RejectOrphanIncorporatedResults tests that execution results incorporated in orphan blocks
// are rejected as outdated in next situation
//
// A <- B_1
// <- B_2
//
// B_1 is finalized rendering B_2 as orphan, submitting IR[ER[A], B_1] is a success, submitting IR[ER[A], B_2] is an outdated incorporated result
func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_RejectOrphanIncorporatedResults() {
blockB1 := unittest.BlockHeaderWithParentFixture(s.Block)
blockB2 := unittest.BlockHeaderWithParentFixture(s.Block)
s.Blocks[blockB1.ID()] = blockB1
s.Blocks[blockB2.ID()] = blockB2
IR1 := unittest.IncorporatedResult.Fixture(
unittest.IncorporatedResult.WithIncorporatedBlockID(blockB1.ID()),
unittest.IncorporatedResult.WithResult(s.IncorporatedResult.Result))
IR2 := unittest.IncorporatedResult.Fixture(
unittest.IncorporatedResult.WithIncorporatedBlockID(blockB2.ID()),
unittest.IncorporatedResult.WithResult(s.IncorporatedResult.Result))
s.MarkFinalized(blockB1)
seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(s.ParentBlock))
s.sealsDB.On("HighestInFork", mock.Anything).Return(seal, nil).Once()
// blockB1 becomes finalized
err := s.core.ProcessFinalizedBlock(blockB1.ID())
require.NoError(s.T(), err)
err = s.core.processIncorporatedResult(IR1)
require.NoError(s.T(), err)
err = s.core.processIncorporatedResult(IR2)
require.Error(s.T(), err)
require.True(s.T(), engine.IsOutdatedInputError(err))
}
func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_RejectOldFinalizedBlock() {
blockB1 := unittest.BlockHeaderWithParentFixture(s.Block)
blockB2 := unittest.BlockHeaderWithParentFixture(blockB1)
s.Blocks[blockB1.ID()] = blockB1
s.Blocks[blockB2.ID()] = blockB2
seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(s.Block))
// should only call it once
s.sealsDB.On("HighestInFork", mock.Anything).Return(seal, nil).Once()
s.MarkFinalized(blockB1)
s.MarkFinalized(blockB2)
// blockB1 becomes finalized
err := s.core.ProcessFinalizedBlock(blockB2.ID())
require.NoError(s.T(), err)
err = s.core.ProcessFinalizedBlock(blockB1.ID())
require.NoError(s.T(), err)
}
// TestProcessFinalizedBlock_CollectorsCleanup tests that stale collectorTree are cleaned up for
// already sealed blocks.
func (s *ApprovalProcessingCoreTestSuite) TestProcessFinalizedBlock_CollectorsCleanup() {
blockID := s.Block.ID()
numResults := uint(10)
for i := uint(0); i < numResults; i++ {
// all results incorporated in different blocks
incorporatedBlock := unittest.BlockHeaderWithParentFixture(s.IncorporatedBlock)
s.Blocks[incorporatedBlock.ID()] = incorporatedBlock
// create different incorporated results for same block ID
result := unittest.ExecutionResultFixture()
result.BlockID = blockID
result.PreviousResultID = s.IncorporatedResult.Result.ID()
incorporatedResult := unittest.IncorporatedResult.Fixture(
unittest.IncorporatedResult.WithResult(result),
unittest.IncorporatedResult.WithIncorporatedBlockID(incorporatedBlock.ID()))
err := s.core.processIncorporatedResult(incorporatedResult)
require.NoError(s.T(), err)
}
require.Equal(s.T(), uint64(numResults), s.core.collectorTree.GetSize())
candidate := unittest.BlockHeaderWithParentFixture(s.Block)
s.Blocks[candidate.ID()] = candidate
// candidate becomes new sealed and finalized block, it means that
// we will need to cleanup our tree till new height, removing all outdated collectors
seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(candidate))
s.sealsDB.On("HighestInFork", mock.Anything).Return(seal, nil).Once()
s.MarkFinalized(candidate)
err := s.core.ProcessFinalizedBlock(candidate.ID())
require.NoError(s.T(), err)
require.Equal(s.T(), uint64(0), s.core.collectorTree.GetSize())
}
// TestProcessIncorporated_ApprovalsBeforeResult tests a scenario when first we have received approvals for unknown
// execution result and after that we discovered execution result. In this scenario we should be able
// to create a seal right after discovering execution result since all approvals should be cached.(if cache capacity is big enough)
func (s *ApprovalProcessingCoreTestSuite) TestProcessIncorporated_ApprovalsBeforeResult() {
s.PublicKey.On("Verify", mock.Anything, mock.Anything, mock.Anything).Return(true, nil)
for _, chunk := range s.Chunks {
for verID := range s.AuthorizedVerifiers {
approval := unittest.ResultApprovalFixture(unittest.WithChunk(chunk.Index),
unittest.WithApproverID(verID),
unittest.WithBlockID(s.Block.ID()),
unittest.WithExecutionResultID(s.IncorporatedResult.Result.ID()))
err := s.core.processApproval(approval)
require.NoError(s.T(), err)
}
}
s.SealsPL.On("Add", mock.Anything).Return(true, nil).Once()
err := s.core.processIncorporatedResult(s.IncorporatedResult)
require.NoError(s.T(), err)
s.SealsPL.AssertCalled(s.T(), "Add", mock.Anything)
}
// TestProcessIncorporated_ApprovalsAfterResult tests a scenario when first we have discovered execution result
// and after that we started receiving approvals. In this scenario we should be able to create a seal right
// after processing last needed approval to meet `RequiredApprovalsForSealConstruction` threshold.
func (s *ApprovalProcessingCoreTestSuite) TestProcessIncorporated_ApprovalsAfterResult() {
s.PublicKey.On("Verify", mock.Anything, mock.Anything, mock.Anything).Return(true, nil)
s.SealsPL.On("Add", mock.Anything).Return(true, nil).Once()
err := s.core.processIncorporatedResult(s.IncorporatedResult)
require.NoError(s.T(), err)
for _, chunk := range s.Chunks {
for verID := range s.AuthorizedVerifiers {
approval := unittest.ResultApprovalFixture(unittest.WithChunk(chunk.Index),
unittest.WithApproverID(verID),
unittest.WithBlockID(s.Block.ID()),
unittest.WithExecutionResultID(s.IncorporatedResult.Result.ID()))
err := s.core.processApproval(approval)
require.NoError(s.T(), err)
}
}
s.SealsPL.AssertCalled(s.T(), "Add", mock.Anything)
}
// TestProcessIncorporated_ProcessingInvalidApproval tests that processing invalid approval when result is discovered
// is correctly handled in case of sentinel error
func (s *ApprovalProcessingCoreTestSuite) TestProcessIncorporated_ProcessingInvalidApproval() {
// fail signature verification for first approval
s.PublicKey.On("Verify", mock.Anything, mock.Anything, mock.Anything).Return(false, nil).Once()
// generate approvals for first chunk
approval := unittest.ResultApprovalFixture(unittest.WithChunk(s.Chunks[0].Index),
unittest.WithApproverID(s.VerID),
unittest.WithBlockID(s.Block.ID()),
unittest.WithExecutionResultID(s.IncorporatedResult.Result.ID()))
// this approval has to be cached since execution result is not known yet
err := s.core.processApproval(approval)
require.NoError(s.T(), err)
// at this point approval has to be processed, even if it's invalid
// if it's an expected sentinel error, it has to be handled internally
err = s.core.processIncorporatedResult(s.IncorporatedResult)
require.NoError(s.T(), err)
}
// TestProcessIncorporated_ApprovalVerificationException tests that processing invalid approval when result is discovered
// is correctly handled in case of exception
func (s *ApprovalProcessingCoreTestSuite) TestProcessIncorporated_ApprovalVerificationException() {
// fail signature verification with exception
s.PublicKey.On("Verify", mock.Anything, mock.Anything, mock.Anything).Return(false, fmt.Errorf("exception")).Once()
// generate approvals for first chunk
approval := unittest.ResultApprovalFixture(unittest.WithChunk(s.Chunks[0].Index),
unittest.WithApproverID(s.VerID),
unittest.WithBlockID(s.Block.ID()),
unittest.WithExecutionResultID(s.IncorporatedResult.Result.ID()))
// this approval has to be cached since execution result is not known yet
err := s.core.processApproval(approval)
require.NoError(s.T(), err)
// at this point approval has to be processed, even if it's invalid
// if it's an expected sentinel error, it has to be handled internally
err = s.core.processIncorporatedResult(s.IncorporatedResult)
require.Error(s.T(), err)
}
// TestOnBlockFinalized_EmergencySealing tests that emergency sealing kicks in to resolve sealing halt
func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_EmergencySealing() {
metrics := metrics.NewNoopCollector()
tracer := trace.NewNoopTracer()
setter, err := updatable_configs.NewSealingConfigs(
flow.DefaultRequiredApprovalsForSealConstruction,
flow.DefaultRequiredApprovalsForSealValidation,
flow.DefaultChunkAssignmentAlpha,
true, // enable emergency sealing
)
require.NoError(s.T(), err)
s.core, err = NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{}, s.Headers, s.State, s.sealsDB, s.Assigner, s.SigHasher, s.SealsPL, s.Conduit, setter)
require.NoError(s.T(), err)
s.setter = setter
s.SealsPL.On("ByID", mock.Anything).Return(nil, false).Maybe()
s.SealsPL.On("Add", mock.Anything).Run(
func(args mock.Arguments) {
seal := args.Get(0).(*flow.IncorporatedResultSeal)
require.Equal(s.T(), s.Block.ID(), seal.Seal.BlockID)
require.Equal(s.T(), s.IncorporatedResult.Result.ID(), seal.Seal.ResultID)
},
).Return(true, nil).Once()
seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(s.ParentBlock))
s.sealsDB.On("HighestInFork", mock.Anything).Return(seal, nil).Times(approvals.DefaultEmergencySealingThresholdForFinalization)
s.State.On("Sealed").Return(unittest.StateSnapshotForKnownBlock(s.ParentBlock, nil))
err = s.core.ProcessIncorporatedResult(s.IncorporatedResult)
require.NoError(s.T(), err)
lastFinalizedBlock := s.IncorporatedBlock
s.MarkFinalized(lastFinalizedBlock)
for i := 0; i < approvals.DefaultEmergencySealingThresholdForFinalization; i++ {
finalizedBlock := unittest.BlockHeaderWithParentFixture(lastFinalizedBlock)
s.Blocks[finalizedBlock.ID()] = finalizedBlock
s.MarkFinalized(finalizedBlock)
err := s.core.ProcessFinalizedBlock(finalizedBlock.ID())
require.NoError(s.T(), err)
lastFinalizedBlock = finalizedBlock
}
s.SealsPL.AssertExpectations(s.T())
}
// TestOnBlockFinalized_ProcessingOrphanApprovals tests that approvals for orphan forks are rejected as outdated entries without processing
//
// A <- B_1 <- C_1{ IER[B_1] }
// <- B_2 <- C_2{ IER[B_2] } <- D_2{ IER[C_2] }
// <- B_3 <- C_3{ IER[B_3] } <- D_3{ IER[C_3] } <- E_3{ IER[D_3] }
//
// B_1 becomes finalized rendering forks starting at B_2 and B_3 as orphans
func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_ProcessingOrphanApprovals() {
forks := make([][]*flow.Block, 3)
forkResults := make([][]*flow.ExecutionResult, len(forks))
for forkIndex := range forks {
forks[forkIndex] = unittest.ChainFixtureFrom(forkIndex+2, s.ParentBlock)
fork := forks[forkIndex]
previousResult := s.IncorporatedResult.Result
for blockIndex, block := range fork {
s.Blocks[block.ID()] = block.Header
s.IdentitiesCache[block.ID()] = s.AuthorizedVerifiers
// create and incorporate result for every block in fork except first one
if blockIndex > 0 {
// create a result
result := unittest.ExecutionResultFixture(unittest.WithPreviousResult(*previousResult))
result.BlockID = block.Header.ParentID
result.Chunks = s.Chunks
forkResults[forkIndex] = append(forkResults[forkIndex], result)
previousResult = result
// incorporate in fork
IR := unittest.IncorporatedResult.Fixture(
unittest.IncorporatedResult.WithIncorporatedBlockID(block.ID()),
unittest.IncorporatedResult.WithResult(result))
err := s.core.processIncorporatedResult(IR)
require.NoError(s.T(), err)
}
}
}
// same block sealed
seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(s.ParentBlock))
s.sealsDB.On("HighestInFork", mock.Anything).Return(seal, nil).Once()
// block B_1 becomes finalized
finalized := forks[0][0].Header
s.MarkFinalized(finalized)
err := s.core.ProcessFinalizedBlock(finalized.ID())
require.NoError(s.T(), err)
// verify will be called twice for every approval in first fork
s.PublicKey.On("Verify", mock.Anything, mock.Anything, mock.Anything).Return(true, nil).Times(len(forkResults[0]) * 2)
// try submitting approvals for each result
for _, results := range forkResults {
for _, result := range results {
executedBlockID := result.BlockID
resultID := result.ID()
approval := unittest.ResultApprovalFixture(unittest.WithChunk(0),
unittest.WithApproverID(s.VerID),
unittest.WithBlockID(executedBlockID),
unittest.WithExecutionResultID(resultID))
err := s.core.processApproval(approval)
require.NoError(s.T(), err)
}
}
}
// TestOnBlockFinalized_ExtendingUnprocessableFork tests that extending orphan fork results in non processable collectors
//
// . - X <- Y <- Z
// . /
// . <- A <- B <- C <- D <- E
// . |
// . finalized
func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_ExtendingUnprocessableFork() {
forks := make([][]*flow.Block, 2)
for forkIndex := range forks {
forks[forkIndex] = unittest.ChainFixtureFrom(forkIndex+3, s.Block)
fork := forks[forkIndex]
for _, block := range fork {
s.Blocks[block.ID()] = block.Header
s.IdentitiesCache[block.ID()] = s.AuthorizedVerifiers
}
}
finalized := forks[1][0].Header
s.MarkFinalized(finalized)
seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(s.ParentBlock))
s.sealsDB.On("HighestInFork", mock.Anything).Return(seal, nil).Once()
// finalize block B
err := s.core.ProcessFinalizedBlock(finalized.ID())
require.NoError(s.T(), err)
// create incorporated result for each block in main fork
for forkIndex, fork := range forks {
previousResult := s.IncorporatedResult.Result
for blockIndex, block := range fork {
result := unittest.ExecutionResultFixture(unittest.WithPreviousResult(*previousResult))
result.BlockID = block.Header.ParentID
result.Chunks = s.Chunks
previousResult = result
// incorporate in fork
IR := unittest.IncorporatedResult.Fixture(
unittest.IncorporatedResult.WithIncorporatedBlockID(block.ID()),
unittest.IncorporatedResult.WithResult(result))
err := s.core.processIncorporatedResult(IR)
collector := s.core.collectorTree.GetCollector(result.ID())
if forkIndex > 0 {
require.NoError(s.T(), err)
require.Equal(s.T(), approvals.VerifyingApprovals, collector.ProcessingStatus())
} else {
if blockIndex == 0 {
require.Error(s.T(), err)
require.True(s.T(), engine.IsOutdatedInputError(err))
} else {
require.Equal(s.T(), approvals.CachingApprovals, collector.ProcessingStatus())
}
}
}
}
}
// TestOnBlockFinalized_ExtendingSealedResult tests if assignment collector tree accepts collector which extends sealed result
func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_ExtendingSealedResult() {
seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(s.Block))
s.sealsDB.On("HighestInFork", mock.Anything).Return(seal, nil).Once()
unsealedBlock := unittest.BlockHeaderWithParentFixture(s.Block)
s.Blocks[unsealedBlock.ID()] = unsealedBlock
s.IdentitiesCache[unsealedBlock.ID()] = s.AuthorizedVerifiers
result := unittest.ExecutionResultFixture(unittest.WithPreviousResult(*s.IncorporatedResult.Result))
result.BlockID = unsealedBlock.ID()
s.MarkFinalized(unsealedBlock)
err := s.core.ProcessFinalizedBlock(unsealedBlock.ID())
require.NoError(s.T(), err)
incorporatedBlock := unittest.BlockHeaderWithParentFixture(unsealedBlock)
s.Blocks[incorporatedBlock.ID()] = incorporatedBlock
s.IdentitiesCache[incorporatedBlock.ID()] = s.AuthorizedVerifiers
IR := unittest.IncorporatedResult.Fixture(
unittest.IncorporatedResult.WithIncorporatedBlockID(incorporatedBlock.ID()),
unittest.IncorporatedResult.WithResult(result))
err = s.core.processIncorporatedResult(IR)
require.NoError(s.T(), err)
s.sealsDB.AssertExpectations(s.T())
}
// TestRequestPendingApprovals checks that requests are sent only for chunks
// that have not collected enough approvals yet, and are sent only to the
// verifiers assigned to those chunks. It also checks that the threshold and
// rate limiting is respected.
func (s *ApprovalProcessingCoreTestSuite) TestRequestPendingApprovals() {
s.core.requestTracker = approvals.NewRequestTracker(s.core.headers, 1, 3)
s.SealsPL.On("ByID", mock.Anything).Return(nil, false)
// n is the total number of blocks and incorporated-results we add to the
// chain and mempool
n := 100
// create blocks
unsealedFinalizedBlocks := make([]flow.Block, 0, n)
parentBlock := s.ParentBlock
for i := 0; i < n; i++ {
block := unittest.BlockWithParentFixture(parentBlock)
s.Blocks[block.ID()] = block.Header
s.IdentitiesCache[block.ID()] = s.AuthorizedVerifiers
unsealedFinalizedBlocks = append(unsealedFinalizedBlocks, *block)
parentBlock = block.Header
}
// progress latest sealed and latest finalized:
//s.LatestSealedBlock = unsealedFinalizedBlocks[0]
//s.LatestFinalizedBlock = &unsealedFinalizedBlocks[n-1]
// add an unfinalized block; it shouldn't require an approval request
unfinalizedBlock := unittest.BlockWithParentFixture(parentBlock)
s.Blocks[unfinalizedBlock.ID()] = unfinalizedBlock.Header
// we will assume that all chunks are assigned to the same two verifiers.
verifiers := make([]flow.Identifier, 0)
for nodeID := range s.AuthorizedVerifiers {
if len(verifiers) > 2 {
break
}
verifiers = append(verifiers, nodeID)
}
// the sealing Core requires approvals from both verifiers for each chunk
err := s.setter.SetRequiredApprovalsForSealingConstruction(2)
require.NoError(s.T(), err)
// populate the incorporated-results tree with:
// - 50 that have collected two signatures per chunk
// - 25 that have collected only one signature
// - 25 that have collected no signatures
//
//
// sealed unsealed/finalized
// | || |
// 1 <- 2 <- .. <- s <- s+1 <- .. <- n-t <- n
// | |
// expected reqs
prevResult := s.IncorporatedResult.Result
resultIDs := make([]flow.Identifier, 0, n)
chunkCount := 2
for i := 0; i < n-1; i++ {
// Create an incorporated result for unsealedFinalizedBlocks[i].
// By default the result will contain 17 chunks.
ir := unittest.IncorporatedResult.Fixture(
unittest.IncorporatedResult.WithResult(
unittest.ExecutionResultFixture(
unittest.WithBlock(&unsealedFinalizedBlocks[i]),
unittest.WithPreviousResult(*prevResult),
unittest.WithChunks(uint(chunkCount)),
),
),
unittest.IncorporatedResult.WithIncorporatedBlockID(
unsealedFinalizedBlocks[i+1].ID(),
),
)
prevResult = ir.Result
assignmentBuilder := chunks.NewAssignmentBuilder()
for _, chunk := range ir.Result.Chunks {
// assign the verifier to this chunk
require.NoError(s.T(), assignmentBuilder.Add(chunk.Index, verifiers))
}
s.ChunksAssignment = assignmentBuilder.Build()
err := s.core.processIncorporatedResult(ir)
require.NoError(s.T(), err)
resultIDs = append(resultIDs, ir.Result.ID())
}
// sealed block doesn't change
seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(s.ParentBlock))
s.sealsDB.On("HighestInFork", mock.Anything).Return(seal, nil)
s.State.On("Sealed").Return(unittest.StateSnapshotForKnownBlock(s.ParentBlock, nil))
// start delivering finalization events
lastProcessedIndex := 0
for ; lastProcessedIndex < int(s.core.sealingConfigsGetter.ApprovalRequestsThresholdConst()); lastProcessedIndex++ {
finalized := unsealedFinalizedBlocks[lastProcessedIndex].Header
s.MarkFinalized(finalized)
err := s.core.ProcessFinalizedBlock(finalized.ID())
require.NoError(s.T(), err)
}
require.Empty(s.T(), s.core.requestTracker.GetAllIds())
// process two more blocks, this will trigger requesting approvals for lastSealed + 1 height
// but they will be in blackout period
for i := 0; i < 2; i++ {
finalized := unsealedFinalizedBlocks[lastProcessedIndex].Header
s.MarkFinalized(finalized)
err := s.core.ProcessFinalizedBlock(finalized.ID())
require.NoError(s.T(), err)
lastProcessedIndex += 1
}
require.ElementsMatch(s.T(), s.core.requestTracker.GetAllIds(), resultIDs[:1])
// wait for the max blackout period to elapse
time.Sleep(3 * time.Second)
// our setup is for 5 verification nodes
s.Conduit.On("Publish", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil).Times(chunkCount)
// process next block
finalized := unsealedFinalizedBlocks[lastProcessedIndex].Header
s.MarkFinalized(finalized)
err = s.core.ProcessFinalizedBlock(finalized.ID())
require.NoError(s.T(), err)
// now 2 results should be pending
require.ElementsMatch(s.T(), s.core.requestTracker.GetAllIds(), resultIDs[:2])
s.Conduit.AssertExpectations(s.T())
}
// TestRepopulateAssignmentCollectorTree tests that the
// collectors tree will contain execution results and assignment collectors will be created.
//
// P <- A[ER{P}] <- B[ER{A}] <- C[ER{B}] <- D[ER{C}] <- E[ER{D}]
// | <- F[ER{A}] <- G[ER{B}] <- H[ER{G}]
// finalized
//
// collectors tree has to be repopulated with incorporated results from blocks [A, B, C, D, F, G]
// E, H shouldn't be considered since
func (s *ApprovalProcessingCoreTestSuite) TestRepopulateAssignmentCollectorTree() {
metrics := metrics.NewNoopCollector()
tracer := trace.NewNoopTracer()
assigner := &module.ChunkAssigner{}
// setup mocks
payloads := &storage.Payloads{}
expectedResults := []*flow.IncorporatedResult{s.IncorporatedResult}
blockChildren := make([]flow.Identifier, 0)
rootSnapshot := unittest.StateSnapshotForKnownBlock(s.rootHeader, nil)
s.Snapshots[s.rootHeader.ID()] = rootSnapshot
rootSnapshot.On("SealingSegment").Return(
&flow.SealingSegment{
Blocks: []*flow.Block{{
Header: s.rootHeader,
Payload: &flow.Payload{},
}},
}, nil)
s.sealsDB.On("HighestInFork", s.IncorporatedBlock.ID()).Return(
unittest.Seal.Fixture(
unittest.Seal.WithBlock(s.ParentBlock)), nil)
// the incorporated block contains the result for the sealing candidate block
incorporatedBlockPayload := unittest.PayloadFixture(
unittest.WithReceipts(
unittest.ExecutionReceiptFixture(
unittest.WithResult(s.IncorporatedResult.Result))))
payloads.On("ByBlockID", s.IncorporatedBlock.ID()).Return(&incorporatedBlockPayload, nil)
emptyPayload := flow.EmptyPayload()
payloads.On("ByBlockID", s.Block.ID()).Return(&emptyPayload, nil)
s.IdentitiesCache[s.IncorporatedBlock.ID()] = s.AuthorizedVerifiers
assigner.On("Assign", s.IncorporatedResult.Result, mock.Anything).Return(s.ChunksAssignment, nil)
// two forks
for i := 0; i < 2; i++ {
fork := unittest.ChainFixtureFrom(i+3, s.IncorporatedBlock)
prevResult := s.IncorporatedResult.Result
// create execution results for all blocks except last one, since it won't be valid by definition
for blockIndex, block := range fork {
blockID := block.ID()
// create execution result for previous block in chain
// this result will be incorporated in current block.
result := unittest.ExecutionResultFixture(
unittest.WithPreviousResult(*prevResult),
)
result.BlockID = block.Header.ParentID
// update caches
s.Blocks[blockID] = block.Header
s.IdentitiesCache[blockID] = s.AuthorizedVerifiers
blockChildren = append(blockChildren, blockID)
IR := unittest.IncorporatedResult.Fixture(
unittest.IncorporatedResult.WithResult(result),
unittest.IncorporatedResult.WithIncorporatedBlockID(blockID))
// TODO: change this test for phase 3, assigner should expect incorporated block ID, not executed
if blockIndex < len(fork)-1 {
assigner.On("Assign", result, blockID).Return(s.ChunksAssignment, nil)
expectedResults = append(expectedResults, IR)
} else {
assigner.On("Assign", result, blockID).Return(nil, fmt.Errorf("no assignment for block without valid child"))
}
payload := unittest.PayloadFixture()
payload.Results = append(payload.Results, result)
payloads.On("ByBlockID", blockID).Return(&payload, nil)
prevResult = result
}
}
// Descendants has to return all valid descendants from finalized block
finalSnapShot := unittest.StateSnapshotForKnownBlock(s.IncorporatedBlock, nil)
finalSnapShot.On("Descendants").Return(blockChildren, nil)
s.State.On("Final").Return(finalSnapShot)
core, err := NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{},
s.Headers, s.State, s.sealsDB, assigner, s.SigHasher, s.SealsPL, s.Conduit, s.setter)
require.NoError(s.T(), err)
err = core.RepopulateAssignmentCollectorTree(payloads)
require.NoError(s.T(), err)
// check collector tree, after repopulating we should have all collectors for execution results that we have
// traversed and they have to be processable.
for _, incorporatedResult := range expectedResults {
collector, err := core.collectorTree.GetOrCreateCollector(incorporatedResult.Result)
require.NoError(s.T(), err)
require.False(s.T(), collector.Created)
require.Equal(s.T(), approvals.VerifyingApprovals, collector.Collector.ProcessingStatus())
}
}
// TestRepopulateAssignmentCollectorTree_RootSealingSegment tests that the sealing
// engine will be initialized correctly when bootstrapping with a root sealing
// segment with multiple blocks, as is the case when joining the network at an epoch
// boundary.
//
// In particular, the assignment collector tree population step should ignore
// unknown block references below the root height.
func (s *ApprovalProcessingCoreTestSuite) TestRepopulateAssignmentCollectorTree_RootSealingSegment() {
metrics := metrics.NewNoopCollector()
tracer := trace.NewNoopTracer()
assigner := &module.ChunkAssigner{}
payloads := &storage.Payloads{}
// setup mocks
s.rootHeader = s.IncorporatedBlock
expectedResults := []*flow.IncorporatedResult{s.IncorporatedResult}
s.sealsDB.On("HighestInFork", s.IncorporatedBlock.ID()).Return(
unittest.Seal.Fixture(
unittest.Seal.WithBlock(s.ParentBlock)), nil)
// the incorporated block contains the result for the sealing candidate block
incorporatedBlockPayload := unittest.PayloadFixture(
unittest.WithReceipts(
unittest.ExecutionReceiptFixture(
unittest.WithResult(s.IncorporatedResult.Result))))
payloads.On("ByBlockID", s.IncorporatedBlock.ID()).Return(&incorporatedBlockPayload, nil)
// the sealing candidate block (S) is the lowest block in the segment under consideration here
// initially, this block would represent the lowest block in a node's root sealing segment,
// meaning that all earlier blocks are not known. In this case we should ignore results and seals
// referencing unknown blocks (tested here by adding such a result+seal to the candidate payload).
candidatePayload := unittest.PayloadFixture(
unittest.WithReceipts(unittest.ExecutionReceiptFixture()), // receipt referencing pre-root block
unittest.WithSeals(unittest.Seal.Fixture()), // seal referencing pre-root block
)
payloads.On("ByBlockID", s.Block.ID()).Return(&candidatePayload, nil)
s.IdentitiesCache[s.IncorporatedBlock.ID()] = s.AuthorizedVerifiers
assigner.On("Assign", s.IncorporatedResult.Result, mock.Anything).Return(s.ChunksAssignment, nil)
finalSnapShot := unittest.StateSnapshotForKnownBlock(s.rootHeader, nil)
s.Snapshots[s.rootHeader.ID()] = finalSnapShot
// root snapshot has no pending children
finalSnapShot.On("Descendants").Return(nil, nil)
// set up sealing segment
finalSnapShot.On("SealingSegment").Return(
&flow.SealingSegment{
Blocks: []*flow.Block{{
Header: s.Block,
Payload: &candidatePayload,
}, {
Header: s.ParentBlock,
Payload: &flow.Payload{},
}, {
Header: s.IncorporatedBlock,
Payload: &incorporatedBlockPayload,
}},
}, nil)
s.State.On("Final").Return(finalSnapShot)
core, err := NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{},
s.Headers, s.State, s.sealsDB, assigner, s.SigHasher, s.SealsPL, s.Conduit, s.setter)
require.NoError(s.T(), err)
err = core.RepopulateAssignmentCollectorTree(payloads)
require.NoError(s.T(), err)
// check collector tree, after repopulating we should have all collectors for execution results that we have
// traversed and they have to be processable.
for _, incorporatedResult := range expectedResults {
collector, err := core.collectorTree.GetOrCreateCollector(incorporatedResult.Result)
require.NoError(s.T(), err)
require.False(s.T(), collector.Created)
require.Equal(s.T(), approvals.VerifyingApprovals, collector.Collector.ProcessingStatus())
}
}