Skip to content

Commit

Permalink
Rename clockDrift to timeTrim. Use timer and stop for ctx.Done
Browse files Browse the repository at this point in the history
  • Loading branch information
agouin committed Jul 18, 2022
1 parent 84c7610 commit 6e36e8b
Showing 1 changed file with 31 additions and 28 deletions.
59 changes: 31 additions & 28 deletions relayer/chains/cosmos/cosmos_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,18 @@ const (
// Delta block times will be excluded from rolling average if they exceed this.
maxConsideredDeltaBlockTimeMs = 15000

// Scenarios for how much clock drift should be added to
// Scenarios for how much time trim should be added to
// target ideal block query window.

// Clock drift addition when a block query fails
queryFailureClockDriftAdditionMs = 73
// Time trim addition when a block query fails
queryFailureTimeTrimAdditionMs = 73

// Clock drift addition when the block queries succeeds
querySuccessClockDriftAdditionMs = -11
// Time trim addition when the block queries succeeds
querySuccessTimeTrimAdditionMs = -31

// Clock drift addition when the latest block is the same as
// Time trim addition when the latest block is the same as
// the last successfully queried block
sameBlockClockDriftAdditionMs = 97
sameBlockTimeTrimAdditionMs = 97
)

// latestClientState is a map of clientID to the latest clientInfo for that client.
Expand Down Expand Up @@ -174,23 +174,23 @@ type queryCyclePersistence struct {
latestQueriedBlockTime time.Time
averageBlockTimeMs int64
minQueryLoopDuration time.Duration
clockDriftMs int64
timeTrimMs int64
}

// addClockDriftMs is used to modify the clock drift for targeting the
// addTimeTrimMs is used to modify the time trim for targeting the
// next ideal window for block queries. For example if a block query fails
// because it is too early to be queried, clock drift should be added. If the query
// succeeds, clock drift should be removed, but not as much as is added for the error
// case. Clock drift should also be added when checking the latest height and it has
// because it is too early to be queried, time trim should be added. If the query
// succeeds, time trim should be removed, but not as much as is added for the error
// case. Time trim should also be added when checking the latest height and it has
// not yet incremented.
func (p *queryCyclePersistence) addClockDriftMs(ms int64) {
p.clockDriftMs += ms
if p.clockDriftMs < 0 {
p.clockDriftMs = 0
} else if p.clockDriftMs > p.averageBlockTimeMs {
func (p *queryCyclePersistence) addTimeTrimMs(ms int64) {
p.timeTrimMs += ms
if p.timeTrimMs < 0 {
p.timeTrimMs = 0
} else if p.timeTrimMs > p.averageBlockTimeMs {
// Should not add more than the average block time as a delay
// when targeting the next available block query.
p.clockDriftMs = p.averageBlockTimeMs
p.timeTrimMs = p.averageBlockTimeMs
}
}

Expand Down Expand Up @@ -234,16 +234,16 @@ func (p *queryCyclePersistence) dynamicBlockTime(
queryDurationMs := time.Since(queryStart).Milliseconds()

// also take into account older blocks, where timeQueriedAfterBlockTime > p.averageBlockTimeMs, by using remainder.
// clock drift tolerant using clockDriftMs trim value
// time trim tolerant using timeTrimMs trim value
targetedQueryTimeFromNow := p.averageBlockTimeMs - (timeQueriedAfterBlockTime % p.averageBlockTimeMs) +
p.clockDriftMs - queryDurationMs
p.timeTrimMs - queryDurationMs

p.minQueryLoopDuration = time.Millisecond * time.Duration(targetedQueryTimeFromNow)

log.Debug("Dynamic query time",
zap.Int64("avg_block_ms", p.averageBlockTimeMs),
zap.Int64("targeted_query_ms", targetedQueryTimeFromNow),
zap.Int64("clock_drift_ms", p.clockDriftMs),
zap.Int64("clock_drift_ms", p.timeTrimMs),
)
}

Expand Down Expand Up @@ -297,13 +297,16 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui
ccp.log.Debug("Entering main query loop")

for {
if err := ccp.queryCycle(ctx, &persistence); err != nil {
return err
}
t := time.NewTimer(persistence.minQueryLoopDuration)
select {
case <-ctx.Done():
t.Stop()
return nil
case <-time.After(persistence.minQueryLoopDuration):
if err := ccp.queryCycle(ctx, &persistence); err != nil {
return err
}
case <-t.C:
continue
}
}
}
Expand Down Expand Up @@ -375,7 +378,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
)

if persistence.latestHeight == persistence.latestQueriedBlock {
persistence.addClockDriftMs(sameBlockClockDriftAdditionMs)
persistence.addTimeTrimMs(sameBlockTimeTrimAdditionMs)
persistence.minQueryLoopDuration = defaultMinQueryLoopDuration
return nil
}
Expand Down Expand Up @@ -424,11 +427,11 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
})

if err := eg.Wait(); err != nil {
persistence.addClockDriftMs(queryFailureClockDriftAdditionMs)
persistence.addTimeTrimMs(queryFailureTimeTrimAdditionMs)
ccp.log.Warn("Error querying block data", zap.Error(err))
break
}
persistence.addClockDriftMs(querySuccessClockDriftAdditionMs)
persistence.addTimeTrimMs(querySuccessTimeTrimAdditionMs)

latestHeader = ibcHeader.(cosmos.CosmosIBCHeader)

Expand Down

0 comments on commit 6e36e8b

Please sign in to comment.