Skip to content

Commit 990cf81

Browse files
committed
likenft-indexer: batch BookNFT eth_getLogs calls to reduce Alchemy CU spend
Batch per-contract acquire-book-nft-events tasks into groups of 50 (configurable via TASK_ACQUIRE_BOOKNFT_BATCH_SIZE). The inner handler already supports multiple addresses, so this change is purely in the task wrapping and enqueuing layer. - Add TaskAcquireBookNFTBatchSize config field - Change lifecycle task payload to accept []string with backward compat - Batch lifecycles before enqueuing, one asynq task per batch - Enqueue inside WithEnqueueing callback to avoid orphaned lifecycles - Guard against negative fetch count when queue is full - Add per-address fallback when batched eth_getLogs exceeds response limits
1 parent 93da984 commit 990cf81

5 files changed

Lines changed: 292 additions & 53 deletions

File tree

likenft-indexer/cmd/worker/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type EnvConfig struct {
2727
EvmEventQueryToBlockPadding uint64 `envconfig:"EVM_EVENT_QUERY_TO_BLOCK_PADDING" default:"10"`
2828

2929
TaskAcquireBookNFTMaxQueueLength int `envconfig:"TASK_ACQUIRE_BOOKNFT_MAX_QUEUE_LENGTH" default:"500"`
30+
TaskAcquireBookNFTBatchSize int `envconfig:"TASK_ACQUIRE_BOOKNFT_BATCH_SIZE" default:"50"`
3031

3132
// The block height weight is the multiplier for the block height to be added to the score
3233
TaskAcquireBookNFTNextProcessingBlockHeightWeight float64 `envconfig:"TASK_ACQUIRE_BOOKNFT_NEXT_PROCESSING_BLOCK_HEIGHT_WEIGHT" default:"0.00000001"`

likenft-indexer/cmd/worker/task/acquire_book_nft_events.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"log/slog"
9+
"strings"
810

911
appcontext "likenft-indexer/cmd/worker/context"
12+
"likenft-indexer/cmd/worker/config"
1013
"likenft-indexer/ent"
1114
"likenft-indexer/ent/schema/typeutil"
1215
"likenft-indexer/internal/database"
@@ -104,6 +107,22 @@ func HandleAcquireBookNFTEventsTask(ctx context.Context, t *asynq.Task) error {
104107
cfg.EvmEventQueryNumberOfBlocksLimit,
105108
)
106109

110+
if err != nil && len(addresses) > 1 {
111+
var errCannotConvertLog *contractevmeventacquirer.ErrCannotConvertLog
112+
if isResponseTooLargeError(err) || errors.As(err, &errCannotConvertLog) {
113+
mylogger.Warn("batched acquire failed, falling back to per-address queries",
114+
"addressCount", len(addresses), "err", err)
115+
if fallbackErr := acquirePerAddress(
116+
ctx, logger, cfg, evmEventQueryClient, evmEventRepository,
117+
evmEventQueryClient, evmClient, nftClassRepository,
118+
addresses, uint64(latestEventsBlockHeight),
119+
); fallbackErr != nil {
120+
return fallbackErr
121+
}
122+
continue
123+
}
124+
}
125+
107126
if err != nil {
108127
mylogger.Error("acquirer.Acquire", "err", err)
109128
var errCannotConvertLog *contractevmeventacquirer.ErrCannotConvertLog
@@ -130,6 +149,74 @@ func HandleAcquireBookNFTEventsTask(ctx context.Context, t *asynq.Task) error {
130149
return nil
131150
}
132151

152+
// isResponseTooLargeError checks if an RPC error indicates the eth_getLogs
153+
// response exceeded the provider's size limit.
154+
// Known error messages:
155+
// - Alchemy: "Log response size exceeded."
156+
// - Geth/others: "query returned more than 10000 results"
157+
func isResponseTooLargeError(err error) bool {
158+
msg := strings.ToLower(err.Error())
159+
return strings.Contains(msg, "log response size exceeded") ||
160+
strings.Contains(msg, "query returned more than")
161+
}
162+
163+
// acquirePerAddress retries event acquisition one address at a time.
164+
// Used as a fallback when a batched FilterLogs call exceeds response limits.
165+
func acquirePerAddress(
166+
ctx context.Context,
167+
logger *slog.Logger,
168+
cfg *config.EnvConfig,
169+
abiManager contractevmeventacquirer.ABIManager,
170+
evmEventRepository database.EVMEventRepository,
171+
evmEventQueryClient contractevmeventacquirer.EvmEventQueryClient,
172+
evmClient contractevmeventacquirer.EvmClient,
173+
nftClassRepository database.NFTClassRepository,
174+
addresses []string,
175+
fromBlock uint64,
176+
) error {
177+
mylogger := logger.WithGroup("acquirePerAddress")
178+
179+
for _, addr := range addresses {
180+
singleAcquirer := contractevmeventacquirer.NewContractEvmEventsAcquirer(
181+
abiManager,
182+
evmEventRepository,
183+
evmEventQueryClient,
184+
evmClient,
185+
cfg.EvmEventQueryToBlockPadding,
186+
contractevmeventacquirer.ContractEvmEventsAcquirerContractTypeBookNFT,
187+
[]string{addr},
188+
)
189+
190+
newBlockHeight, _, err := singleAcquirer.Acquire(
191+
ctx,
192+
logger,
193+
fromBlock,
194+
cfg.EvmEventQueryNumberOfBlocksLimit,
195+
)
196+
if err != nil {
197+
mylogger.Error("acquirer.Acquire", "addr", addr, "err", err)
198+
var errCannotConvertLog *contractevmeventacquirer.ErrCannotConvertLog
199+
if errors.As(err, &errCannotConvertLog) {
200+
if disableErr := nftClassRepository.DisableForIndexing(
201+
ctx, errCannotConvertLog.Log.Address.Hex(), err.Error(),
202+
); disableErr != nil {
203+
mylogger.Error("DisableForIndexing", "addr", addr, "err", disableErr)
204+
}
205+
continue
206+
}
207+
return err
208+
}
209+
210+
if updateErr := nftClassRepository.UpdateNFTClassesLatestEventBlockNumber(
211+
ctx, []string{addr}, typeutil.Uint64(newBlockHeight),
212+
); updateErr != nil {
213+
mylogger.Error("UpdateNFTClassesLatestEventBlockNumber", "addr", addr, "err", updateErr)
214+
}
215+
}
216+
217+
return nil
218+
}
219+
133220
func init() {
134221
Tasks.Register(task.DefineTask(
135222
TypeAcquireBookNFTEventsTaskPayload,

likenft-indexer/cmd/worker/task/acquire_book_nft_events_with_lifecycle.go

Lines changed: 77 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,25 @@ import (
1717
const TypeAcquireBookNFTEventsTaskPayloadWithLifecyclePayload = "acquire-book-nft-events-with-lifecycle"
1818

1919
type AcquireBookNFTEventsTaskPayloadWithLifecyclePayload struct {
20-
ContractAddress string
20+
// Deprecated: use ContractAddresses instead. Kept for backward compatibility
21+
// with in-flight tasks during rolling deploy.
22+
ContractAddress string `json:"ContractAddress,omitempty"`
23+
ContractAddresses []string `json:"ContractAddresses,omitempty"`
2124
}
2225

23-
func NewTypeAcquireBookNFTEventsTaskPayloadWithLifecycle(contractAddress string) (*asynq.Task, error) {
26+
func (p *AcquireBookNFTEventsTaskPayloadWithLifecyclePayload) GetAddresses() []string {
27+
if len(p.ContractAddresses) > 0 {
28+
return p.ContractAddresses
29+
}
30+
if p.ContractAddress != "" {
31+
return []string{p.ContractAddress}
32+
}
33+
return nil
34+
}
35+
36+
func NewTypeAcquireBookNFTEventsTaskPayloadWithLifecycle(contractAddresses []string) (*asynq.Task, error) {
2437
payload, err := json.Marshal(AcquireBookNFTEventsTaskPayloadWithLifecyclePayload{
25-
ContractAddress: contractAddress,
38+
ContractAddresses: contractAddresses,
2639
})
2740
if err != nil {
2841
return nil, err
@@ -50,39 +63,72 @@ func handlerWithLifecycle(
5063
return fmt.Errorf("json.Unmarshal: %v", err)
5164
}
5265

53-
task, err := NewAcquireBookNFTEventsTask(
54-
[]string{p.ContractAddress},
55-
)
56-
if err != nil {
57-
return fmt.Errorf("NewAcquireBookNFTEventsTask: %v", err)
66+
addresses := p.GetAddresses()
67+
if len(addresses) == 0 {
68+
return fmt.Errorf("no contract addresses in payload: %w", asynq.SkipRetry)
5869
}
59-
lifecycle, err := nftclassacquirebooknftevent.MakeNFTClassAcquireBookNFTEventLifecycleFromAddress(
60-
ctx,
61-
nftClassAcquireBookNFTEventsRepository,
62-
p.ContractAddress,
63-
nftclassacquirebooknftevent.MakeCalculateNextProcessingScoreFn(
64-
config.TaskAcquireBookNFTNextProcessingBlockHeightWeight,
65-
config.TaskAcquireBookNFTNextProcessingTimeFloor,
66-
config.TaskAcquireBookNFTNextProcessingTimeCeiling,
67-
config.TaskAcquireBookNFTNextProcessingTimeWeight,
68-
),
69-
nftclassacquirebooknftevent.MakeCalculateTimeoutScoreFn(
70-
config.TaskAcquireBookNFTInProgressTimeoutSeconds,
71-
),
72-
nftclassacquirebooknftevent.MakeCalculateRetryScoreFn(
73-
config.TaskAcquireBookNFTRetryInitialTimeoutSeconds,
74-
config.TaskAcquireBookNFTRetryExponentialBackoffCoeff,
75-
config.TaskAcquireBookNFTRetryMaxTimeoutSeconds,
76-
),
70+
71+
calculateNextScoreFn := nftclassacquirebooknftevent.MakeCalculateNextProcessingScoreFn(
72+
config.TaskAcquireBookNFTNextProcessingBlockHeightWeight,
73+
config.TaskAcquireBookNFTNextProcessingTimeFloor,
74+
config.TaskAcquireBookNFTNextProcessingTimeCeiling,
75+
config.TaskAcquireBookNFTNextProcessingTimeWeight,
76+
)
77+
calculateTimeoutScoreFn := nftclassacquirebooknftevent.MakeCalculateTimeoutScoreFn(
78+
config.TaskAcquireBookNFTInProgressTimeoutSeconds,
7779
)
80+
calculateRetryScoreFn := nftclassacquirebooknftevent.MakeCalculateRetryScoreFn(
81+
config.TaskAcquireBookNFTRetryInitialTimeoutSeconds,
82+
config.TaskAcquireBookNFTRetryExponentialBackoffCoeff,
83+
config.TaskAcquireBookNFTRetryMaxTimeoutSeconds,
84+
)
85+
86+
// Build lifecycle objects, tracking which addresses have valid lifecycles
87+
var lifecycles []nftclassacquirebooknftevent.NFTClassAcquireBookNFTEventLifecycle
88+
var validAddresses []string
89+
for _, addr := range addresses {
90+
lifecycle, err := nftclassacquirebooknftevent.MakeNFTClassAcquireBookNFTEventLifecycleFromAddress(
91+
ctx,
92+
nftClassAcquireBookNFTEventsRepository,
93+
addr,
94+
calculateNextScoreFn,
95+
calculateTimeoutScoreFn,
96+
calculateRetryScoreFn,
97+
)
98+
if err != nil {
99+
mylogger.Error("MakeNFTClassAcquireBookNFTEventLifecycleFromAddress", "addr", addr, "err", err)
100+
continue
101+
}
102+
lifecycles = append(lifecycles, lifecycle)
103+
validAddresses = append(validAddresses, addr)
104+
}
105+
106+
if len(lifecycles) == 0 {
107+
return fmt.Errorf("no valid lifecycles for batch: %w", asynq.SkipRetry)
108+
}
109+
110+
// Create innerTask with only addresses that have valid lifecycles
111+
innerTask, err := NewAcquireBookNFTEventsTask(validAddresses)
78112
if err != nil {
79-
return fmt.Errorf("p.ToLifecycle: %v", err)
113+
return fmt.Errorf("NewAcquireBookNFTEventsTask: %v", err)
80114
}
81115

82-
if err := lifecycle.WithEnqueued(ctx, func(nftClass *ent.NFTClass) error {
83-
return handler(ctx, task)
84-
}); err != nil {
85-
mylogger.Error("lifecycle.WithEnqueued", "err", err)
116+
// Run the inner handler once, then transition all lifecycles.
117+
// The first lifecycle executes the handler; the rest reuse its result.
118+
// NOTE: This relies on WithEnqueued calling the callback synchronously.
119+
var handlerErr error
120+
handlerExecuted := false
121+
122+
for _, lifecycle := range lifecycles {
123+
if err := lifecycle.WithEnqueued(ctx, func(nftClass *ent.NFTClass) error {
124+
if !handlerExecuted {
125+
handlerExecuted = true
126+
handlerErr = handler(ctx, innerTask)
127+
}
128+
return handlerErr
129+
}); err != nil {
130+
mylogger.Error("lifecycle.WithEnqueued", "err", err)
131+
}
86132
}
87133

88134
return nil
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package task
2+
3+
import "testing"
4+
5+
func TestAcquireBookNFTEventsTaskPayloadWithLifecyclePayload_GetAddresses(t *testing.T) {
6+
tests := []struct {
7+
name string
8+
payload AcquireBookNFTEventsTaskPayloadWithLifecyclePayload
9+
expected []string
10+
}{
11+
{
12+
name: "new field takes precedence",
13+
payload: AcquireBookNFTEventsTaskPayloadWithLifecyclePayload{
14+
ContractAddress: "0xOLD",
15+
ContractAddresses: []string{"0xA", "0xB"},
16+
},
17+
expected: []string{"0xA", "0xB"},
18+
},
19+
{
20+
name: "falls back to old single address",
21+
payload: AcquireBookNFTEventsTaskPayloadWithLifecyclePayload{
22+
ContractAddress: "0xOLD",
23+
},
24+
expected: []string{"0xOLD"},
25+
},
26+
{
27+
name: "returns nil when both empty",
28+
payload: AcquireBookNFTEventsTaskPayloadWithLifecyclePayload{},
29+
expected: nil,
30+
},
31+
{
32+
name: "empty slice falls back to old address",
33+
payload: AcquireBookNFTEventsTaskPayloadWithLifecyclePayload{
34+
ContractAddress: "0xOLD",
35+
ContractAddresses: []string{},
36+
},
37+
expected: []string{"0xOLD"},
38+
},
39+
}
40+
41+
for _, tt := range tests {
42+
t.Run(tt.name, func(t *testing.T) {
43+
got := tt.payload.GetAddresses()
44+
if tt.expected == nil {
45+
if got != nil {
46+
t.Fatalf("got %v, want nil", got)
47+
}
48+
return
49+
}
50+
if len(got) != len(tt.expected) {
51+
t.Fatalf("got %v, want %v", got, tt.expected)
52+
}
53+
for i := range got {
54+
if got[i] != tt.expected[i] {
55+
t.Fatalf("got[%d] = %q, want %q", i, got[i], tt.expected[i])
56+
}
57+
}
58+
})
59+
}
60+
}

0 commit comments

Comments
 (0)