Production-ready indexer for multiple blockchains, built around four cooperating workers: Regular (real-time), Catchup (historical backfill), Rescanner (failed/missed blocks), and Manual (explicitly requested blocks).
The indexer is designed for multi-chain environments: each chain is indexed independently and emits its own events.
Currently Supported:
- Ethereum
- BSC (Binance Smart Chain)
- TRON
- Polygon
- Arbitrum
- Optimism
Roadmap:
- Bitcoin
- Litecoin
- Dogecoin
- Solana
```mermaid
flowchart TB
    subgraph Workers ["Workers"]
        direction LR
        R[RegularWorker]
        C[CatchupWorker]
        M[ManualWorker]
        Re[RescannerWorker]
    end

    BW[BaseWorker]

    subgraph Storage ["Storage & Messaging"]
        direction LR
        KV[(KV Store)]
        NATS[(NATS Events)]
        FChan[(failedChan)]
    end

    Redis[(Redis ZSET)]

    %% Workers to BaseWorker
    R --> BW
    C --> BW
    M --> BW
    Re --> BW

    %% BaseWorker connections
    BW --> KV
    BW --> NATS
    BW --> FChan

    %% Feedback: failedChan -> Rescanner
    FChan -.-> Re

    %% ManualWorker special connection
    M -.-> Redis
```
Logic Flow:
- RegularWorker: real-time indexing, reorg handling, error reporting
- CatchupWorker: backfills gaps, tracks progress, cleans up ranges
- ManualWorker: consumes Redis ranges, concurrent-safe backfill
- RescannerWorker: retries failed blocks, updates KV on success
```bash
git clone https://github.com/fystack/multichain-indexer.git
cd multichain-indexer
go mod download
cp configs/config.example.yaml configs/config.yaml
go build -o indexer cmd/indexer/main.go
```
```bash
# Index EVM & TRON in real-time
./indexer index --chains=ethereum_mainnet,tron_mainnet

# Add catchup worker for historical gaps
./indexer index --chains=ethereum_mainnet,tron_mainnet --catchup

# Add manual worker for missing blocks
./indexer index --chains=ethereum_mainnet,tron_mainnet --manual

# For help
./indexer --help
```

BaseWorker:
- Shared logic for all worker types
- Rate limiting, logging, bloom filter, KV store integration, infrastructure management
- Sends error blocks to `failedChan` and stores them in `<chain>/failed_blocks/<block>` (sketched below)
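
The failure path can be pictured as a dual write: durable (KV, survives restarts) plus in-process (channel, fast retry). A minimal sketch with simplified, illustrative types; the real `BaseWorker` carries much more state:

```go
package worker

import "fmt"

// KV is an illustrative interface; the real store is BadgerDB/Consul/in-memory.
type KV interface {
	Set(key, value string)
}

type BaseWorker struct {
	kv         KV
	failedChan chan uint64
}

// reportFailure records a failed block durably in KV and signals the
// Rescanner through failedChan for a fast in-process retry.
func (w *BaseWorker) reportFailure(chain string, height uint64, cause error) {
	key := fmt.Sprintf("%s/failed_blocks/%d", chain, height)
	w.kv.Set(key, cause.Error())
	select {
	case w.failedChan <- height:
	default: // channel full: the Rescanner will still find the block in KV
	}
}
```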
RegularWorker:
- Continuously processes the latest blocks from RPC
- Saves progress to `<chain>/latest_block`
- For EVM chains, handles reorgs with a rollback window (see the sketch below)
- On block failure → BaseWorker stores it for retry
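
The rollback window bounds how far back the worker searches for a fork point. A sketch under assumed interfaces; `BlockStore`, `ChainClient`, and `findCommonAncestor` are illustrative names, not the indexer's actual API:

```go
package worker

import "context"

// Illustrative interfaces; the indexer's real types differ.
type BlockStore interface {
	HashAt(height uint64) string // hash we indexed at this height
}

type ChainClient interface {
	HashAt(ctx context.Context, height uint64) string // node's canonical hash
}

// findCommonAncestor walks back from head until the stored hash matches
// the node's canonical hash, bounded by the rollback window. Blocks above
// the returned height are rolled back and re-indexed.
func findCommonAncestor(ctx context.Context, store BlockStore, rpc ChainClient, head, window uint64) (uint64, bool) {
	for h := head; h > 0 && h+window > head; h-- {
		if store.HashAt(h) == rpc.HashAt(ctx, h) {
			return h, true
		}
	}
	return 0, false // fork deeper than the window: escalate to a rescan
}
```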
CatchupWorker:
- Processes historical blocks in ranges `[start, end]`
- Uses KV key `<chain>/catchup_progress/<start>-<end>` to track progress (sketched below)
- Deletes the key when a range is completed
- Integrates failed blocks from the Rescanner
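
Persisting progress per range is what makes restarts cheap: a crashed worker resumes from the last recorded block instead of re-reading the whole range. A sketch assuming a simple `KV` interface and a `processBlock` callback, neither of which is the real API:

```go
package worker

import (
	"fmt"
	"strconv"
)

// KV is an illustrative interface over BadgerDB/Consul/in-memory stores.
type KV interface {
	Get(key string) (string, bool)
	Set(key, value string)
	Delete(key string)
}

// catchupRange backfills [start, end], persisting progress after every
// block so a restart resumes where it left off instead of starting over.
func catchupRange(kv KV, processBlock func(uint64) error, chain string, start, end uint64) error {
	key := fmt.Sprintf("%s/catchup_progress/%d-%d", chain, start, end)
	next := start
	if v, ok := kv.Get(key); ok {
		if last, err := strconv.ParseUint(v, 10, 64); err == nil {
			next = last + 1 // resume after the last persisted block
		}
	}
	for b := next; b <= end; b++ {
		if err := processBlock(b); err != nil {
			return err // the failed block reaches the Rescanner via BaseWorker
		}
		kv.Set(key, strconv.FormatUint(b, 10))
	}
	kv.Delete(key) // range complete: drop the progress key
	return nil
}
```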
ManualWorker:
- Handles explicit missing blocks (due to RPC errors, reorg skips, or manual intervention)
- Missing ranges are stored in a Redis ZSET:
  - Member format: `"start-end"`
  - Score = `start` (to sort ranges by block number)
  - Large ranges are split into smaller ones (default 5 blocks) for finer-grained retries
- Concurrency-safe with Redis locks (`SETNX` + `EX` via Lua)
- Workflow (a sketch of the claim step follows this list):
  - Claim an unprocessed range (`GetNextRange`)
  - Process all blocks in `[start, end]`
  - Update progress with `SetRangeProcessed`
  - On full success → `RemoveRange`
  - On partial timeout → reinsert the remaining `[current, end]`
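
A minimal sketch of the claim step, assuming the go-redis client and the `processing:<chain>:<start>-<end>` key layout from the table below. A single `SET NX EX` conveys the idea; the indexer performs the equivalent atomically inside a Lua script:

```go
package worker

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

// claimRange takes the per-range lock before processing. Only one worker
// can win the SETNX, and the TTL releases the lock automatically if that
// worker dies mid-range.
func claimRange(ctx context.Context, rdb *redis.Client, chain string, start, end uint64) (bool, error) {
	key := fmt.Sprintf("processing:%s:%d-%d", chain, start, end)
	return rdb.SetNX(ctx, key, "1", 5*time.Minute).Result()
}
```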
RescannerWorker:
- Re-processes failed blocks from KV (`<chain>/failed_blocks/<block>`) or `failedChan` (see the retry sketch below)
- Updates KV when a retry succeeds
- Removes blocks after the maximum number of retry attempts
- Skips the chain head block to reduce reorg risk
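
The retry policy can be sketched as a bounded loop over the failure record; `FailedBlock`, `failKV`, and `maxRetries` are illustrative names, not the real types:

```go
package worker

import (
	"fmt"
	"strconv"
)

// failKV is an illustrative slice of the KV store's interface.
type failKV interface {
	Set(key, value string)
	Delete(key string)
}

// FailedBlock is an illustrative shape for the KV failure record.
type FailedBlock struct {
	Height   uint64
	Attempts int
}

const maxRetries = 5 // illustrative; the real limit is configurable

// rescan retries one failed block. Success or an exhausted retry budget
// both clear the KV record; otherwise the attempt count is bumped so the
// next pass knows how close the block is to being dropped.
func rescan(kv failKV, processBlock func(uint64) error, chain string, fb FailedBlock) {
	key := fmt.Sprintf("%s/failed_blocks/%d", chain, fb.Height)
	if err := processBlock(fb.Height); err == nil {
		kv.Delete(key) // success: clear the failure record
		return
	}
	fb.Attempts++
	if fb.Attempts >= maxRetries {
		kv.Delete(key) // give up after the maximum number of attempts
		return
	}
	kv.Set(key, strconv.Itoa(fb.Attempts))
}
```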
| Key | Purpose |
|---|---|
| `<chain>/latest_block` | RegularWorker progress |
| `<chain>/catchup_progress/<start>-<end>` | CatchupWorker progress per range |
| `<chain>/failed_blocks/<block>` | Failed-block metadata for retry |
| `<chain_type>/<address>` | Public key store |
| `missing_blocks:<chain>` | Redis ZSET of missing ranges |
| `processing:<chain>:<start>-<end>` | Redis lock key for concurrent claims |
| `processed:<chain>:<start>-<end>` | Last processed block in range |
Start required services before running the indexer (docker-compose provided):
- NATS server (events)
- Consul (KV) or Badger (embedded)
- PostgreSQL (wallet address repo)
- Redis (for Bloom filter or ManualWorker)
```bash
docker-compose up -d
```

- Chains: configurable (`start_block`, `batch_size`, `poll_interval`)
- KVStore: BadgerDB / in-memory / Consul
- Bloom Filter: Redis or in-memory
- Event Emitter: NATS streaming
- RPC Providers: failover + rate limiting

See `configs/config.example.yaml` for details.
- Multi-chain support: independent workers per chain
- Auto-catchup: detect gaps → backfill → clean up
- Failed block recovery: persisted + retryable
- Manual backfill: Redis-driven, safe for concurrency
- State persistence: KV + BlockStore → restart-safe
The chain names passed to `--chains` must match the names defined in `configs/config.yaml`.
Example: `ethereum_mainnet`, `tron_mainnet`.
```bash
# Real-time only
./indexer index --chains=ethereum_mainnet,tron_mainnet

# Real-time + catchup (fill historical gaps)
./indexer index --chains=ethereum_mainnet,tron_mainnet --catchup

# Add manual worker to process missing blocks from Redis
./indexer index --chains=ethereum_mainnet,tron_mainnet --manual

# Debug mode (extra logs)
./indexer index --chains=ethereum_mainnet,tron_mainnet --debug

# NATS JetStream transaction monitoring (using NATS CLI)
nats stream add transfer --subjects="transfer.event.*" --storage=file --retention=workqueue
nats consumer add transfer transaction-consumer --filter="transfer.event.dispatch" --deliver=all --ack=explicit
nats consumer sub transfer transaction-consumer

# Initialize bloom filter and kvstore
./wallet-kv-load run --config configs/config.yaml --batch 10000 --debug

# Migrate from Badger to Consul (edit migrate.yaml first)
./kv-migrate run --config configs/config.yaml --dry-run
```

```yaml
chains:
  ethereum_mainnet: # <- this is the chain name
    type: "evm"
    nodes:
      - url: "https://eth-mainnet.g.alchemy.com/v2/${API_KEY}"
        auth:
          type: "header"
          key: "Authorization"
          value: "Bearer ${API_KEY}"
      - url: "https://rpc.ankr.com/eth"
    start_block: 21500000
    poll_interval: "6s"
    client:
      timeout: "20s"
      max_retries: 3
      retry_delay: "5s"
      throttle:
        rps: 8
        burst: 16

  tron_mainnet:
    type: "tron"
    nodes:
      - url: "https://api.trongrid.io"
        auth:
          type: "header"
          key: "TRON-PRO-API-KEY"
          value: "${TRON_API_KEY}"
      - url: "https://tron-rpc.publicnode.com"
    start_block: 75144237
    poll_interval: "8s"
    client:
      timeout: "20s"
      max_retries: 5
      retry_delay: "10s"
      throttle:
        rps: 5
        burst: 8
```

The indexer publishes transaction events to NATS JetStream. Here's how to consume them:
- Stream Name: `transfer`
- Subjects: `transfer.event.*`
- Transaction Topic: `transfer.event.dispatch`
- Storage: FileStorage with WorkQueue retention policy
```bash
# Create stream (if not exists)
nats stream add transfer --subjects="transfer.event.*" --storage=file --retention=workqueue

# Create consumer
nats consumer add transfer my-consumer --filter="transfer.event.dispatch" --deliver=all --ack=explicit

# Consume transactions
nats consumer sub transfer my-consumer

# Get stream info
nats stream info transfer
```

```go
package main
import (
	"context"
	"log"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	// Connect to the local NATS server
	nc, err := nats.Connect("nats://localhost:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := jetstream.New(nc)
	if err != nil {
		log.Fatal(err)
	}

	// Look up the durable consumer created above
	consumer, err := js.Consumer(context.Background(), "transfer", "my-consumer")
	if err != nil {
		log.Fatal(err)
	}

	// Consume messages, acknowledging each one after handling
	cc, err := consumer.Consume(func(msg jetstream.Msg) {
		log.Printf("Transaction: %s", string(msg.Data()))
		msg.Ack()
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cc.Stop()

	select {} // Keep running
}
```