Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NIT-3122] Make get_logs call in small chunks #729

Merged
merged 2 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions assertions/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type Manager struct {
delegatedStaking bool
autoDeposit bool
autoAllowanceApproval bool
maxGetLogBlocks uint64
}

type assertionChainData struct {
Expand Down Expand Up @@ -192,6 +193,14 @@ func WithAverageBlockCreationTime(t time.Duration) Opt {
}
}

// WithMaxGetLogBlocks overrides the default maximum number of blocks to get
// logs for in a single call.
func WithMaxGetLogBlocks(n uint64) Opt {
return func(m *Manager) {
m.maxGetLogBlocks = n
}
}

// WithMinimumGapToParentAssertion overrides the default minimum gap (in duration)
// to parent assertion creation time.
//
Expand Down Expand Up @@ -238,6 +247,7 @@ func NewManager(
rivalHandler: nil, // Must be set after construction if mode > DefensiveMode
autoDeposit: true,
autoAllowanceApproval: true,
maxGetLogBlocks: 1000,
}
for _, o := range opts {
o(m)
Expand Down
36 changes: 21 additions & 15 deletions assertions/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,28 @@ func (m *Manager) syncAssertions(ctx context.Context) {
return
}
if fromBlock != toBlock {
filterOpts := &bind.FilterOpts{
Start: fromBlock,
End: &toBlock,
Context: ctx,
}
_, err = retry.UntilSucceeds(ctx, func() (bool, error) {
innerErr := m.processAllAssertionsInRange(ctx, filterer, filterOpts)
if innerErr != nil {
log.Error("Could not process assertions in range", "err", innerErr)
return false, innerErr
for startBlock := fromBlock; startBlock <= toBlock; startBlock = startBlock + m.maxGetLogBlocks {
endBlock := startBlock + m.maxGetLogBlocks
if endBlock > toBlock {
endBlock = toBlock
}
filterOpts := &bind.FilterOpts{
Start: startBlock,
End: &endBlock,
Context: ctx,
}
_, err = retry.UntilSucceeds(ctx, func() (bool, error) {
innerErr := m.processAllAssertionsInRange(ctx, filterer, filterOpts)
if innerErr != nil {
log.Error("Could not process assertions in range", "err", innerErr)
return false, innerErr
}
return true, nil
})
if err != nil {
log.Error("Could not check for assertion added event")
return
}
return true, nil
})
if err != nil {
log.Error("Could not check for assertion added event")
return
}
fromBlock = toBlock
}
Expand Down
61 changes: 35 additions & 26 deletions challenge-manager/chain-watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type Watcher struct {
// Track all if empty / nil.
trackChallengeParentAssertionHashes []protocol.AssertionHash
maxLookbackBlocks uint64
maxGetLogBlocks uint64
}

// New initializes a watcher service for frequently scanning the chain
Expand All @@ -102,6 +103,7 @@ func New(
averageTimeForBlockCreation time.Duration,
trackChallengeParentAssertionHashes []protocol.AssertionHash,
maxLookbackBlocks uint64,
maxGetLogBlocks uint64,
) (*Watcher, error) {
return &Watcher{
chain: chain,
Expand All @@ -118,6 +120,7 @@ func New(
evilEdgesByLevel: threadsafe.NewMap(threadsafe.MapWithMetric[protocol.ChallengeLevel, *threadsafe.Set[protocol.EdgeId]]("evilEdgesByLevel")),
trackChallengeParentAssertionHashes: trackChallengeParentAssertionHashes,
maxLookbackBlocks: maxLookbackBlocks,
maxGetLogBlocks: maxGetLogBlocks,
}, nil
}

Expand Down Expand Up @@ -211,33 +214,39 @@ func (w *Watcher) Start(ctx context.Context) {
log.Error("Could not initialize edge challenge manager filterer", "err", err)
return
}
filterOpts := &bind.FilterOpts{
Start: fromBlock,
End: &toBlock,
Context: ctx,
}
for startBlock := fromBlock; startBlock <= toBlock; startBlock = startBlock + w.maxGetLogBlocks {
endBlock := startBlock + w.maxGetLogBlocks
if endBlock > toBlock {
endBlock = toBlock
}
filterOpts := &bind.FilterOpts{
Start: startBlock,
End: &endBlock,
Context: ctx,
}

// Checks for different events right away before we start polling.
_, err = retry.UntilSucceeds(ctx, func() (bool, error) {
return true, w.checkForEdgeAdded(ctx, filterer, filterOpts)
})
if err != nil {
log.Error("Could not check for edge added", "err", err)
return
}
_, err = retry.UntilSucceeds(ctx, func() (bool, error) {
return true, w.checkForEdgeConfirmedByOneStepProof(ctx, filterer, filterOpts)
})
if err != nil {
log.Error("Could not check for edge confirmed by osp", "err", err)
return
}
_, err = retry.UntilSucceeds(ctx, func() (bool, error) {
return true, w.checkForEdgeConfirmedByTime(ctx, filterer, filterOpts)
})
if err != nil {
log.Error("Could not check for edge confirmed by time", "err", err)
return
// Checks for different events right away before we start polling.
_, err = retry.UntilSucceeds(ctx, func() (bool, error) {
return true, w.checkForEdgeAdded(ctx, filterer, filterOpts)
})
if err != nil {
log.Error("Could not check for edge added", "err", err)
return
}
_, err = retry.UntilSucceeds(ctx, func() (bool, error) {
return true, w.checkForEdgeConfirmedByOneStepProof(ctx, filterer, filterOpts)
})
if err != nil {
log.Error("Could not check for edge confirmed by osp", "err", err)
return
}
_, err = retry.UntilSucceeds(ctx, func() (bool, error) {
return true, w.checkForEdgeConfirmedByTime(ctx, filterer, filterOpts)
})
if err != nil {
log.Error("Could not check for edge confirmed by time", "err", err)
return
}
}

fromBlock = toBlock
Expand Down
2 changes: 2 additions & 0 deletions challenge-manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func setupEdgeTrackersForBisection(
avgBlockTime,
nil,
100,
10,
)
require.NoError(t, err)
honestWatcher.SetEdgeManager(honestValidator)
Expand Down Expand Up @@ -220,6 +221,7 @@ func setupEdgeTrackersForBisection(
avgBlockTime,
nil,
100,
10,
)
require.NoError(t, err)
evilWatcher.SetEdgeManager(evilValidator)
Expand Down
16 changes: 16 additions & 0 deletions challenge-manager/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type stackParams struct {
enableFastConfirmation bool
assertionManagerOverride *assertions.Manager
maxLookbackBlocks int64
maxGetLogBlocks int64
delegatedStaking bool
autoDeposit bool
autoAllowanceApproval bool
Expand All @@ -56,6 +57,7 @@ var defaultStackParams = stackParams{
enableFastConfirmation: false,
assertionManagerOverride: nil,
maxLookbackBlocks: blocksPerInterval(time.Second*12, 21*24*time.Hour), // Default to 3 weeks worth of blocks.
maxGetLogBlocks: 1000,
delegatedStaking: false,
autoDeposit: true,
autoAllowanceApproval: true,
Expand Down Expand Up @@ -158,6 +160,14 @@ func StackWithSyncMaxLookbackBlocks(maxLookback int64) StackOpt {
}
}

// StackWithSyncMaxGetLogBlocks specifies the max size chunks of blocks to use when using get logs rpc for
// when syncing the chain watcher.
func StackWithSyncMaxGetLogBlocks(maxGetLog int64) StackOpt {
return func(p *stackParams) {
p.maxGetLogBlocks = maxGetLog
}
}

// StackWithDelegatedStaking specifies that the challenge manager will call
// the `newStake` function in the rollup contract on startup to await funding from another account
// such that it becomes a delegated staker.
Expand Down Expand Up @@ -218,6 +228,10 @@ func NewChallengeStack(
if err != nil {
return nil, err
}
maxGetLogBlocks, err := safecast.ToUint64(params.maxGetLogBlocks)
if err != nil {
return nil, err
}

// Create the chain watcher.
watcher, err := watcher.New(
Expand All @@ -229,6 +243,7 @@ func NewChallengeStack(
params.avgBlockTime,
params.trackChallengeParentAssertionHashes,
maxLookbackBlocks,
maxGetLogBlocks,
)
if err != nil {
return nil, err
Expand All @@ -254,6 +269,7 @@ func NewChallengeStack(
assertions.WithPollingInterval(params.pollInterval),
assertions.WithPostingInterval(params.postInterval),
assertions.WithMinimumGapToParentAssertion(params.minGapToParent),
assertions.WithMaxGetLogBlocks(maxGetLogBlocks),
}
if apiDB != nil {
amOpts = append(amOpts, assertions.WithAPIDB(apiDB))
Expand Down
Loading