Skip to content

Commit a8b94ca

Browse files
graph, store: Batch stale contracts and handle max limit
Signed-off-by: Maksim Dimitrov <[email protected]>
1 parent d950b9e commit a8b94ca

File tree

3 files changed

+154
-76
lines changed

3 files changed

+154
-76
lines changed

graph/src/blockchain/mock.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,11 @@ impl ChainStore for MockChainStore {
571571
async fn clear_call_cache(&self, _from: BlockNumber, _to: BlockNumber) -> Result<(), Error> {
572572
unimplemented!()
573573
}
574-
async fn clear_stale_call_cache(&self, _ttl_days: i32) -> Result<(), Error> {
574+
async fn clear_stale_call_cache(
575+
&self,
576+
_ttl_days: i32,
577+
_ttl_max_contracts: Option<i64>,
578+
) -> Result<(), Error> {
575579
unimplemented!()
576580
}
577581
fn chain_identifier(&self) -> Result<ChainIdentifier, Error> {

graph/src/components/store/traits.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,11 @@ pub trait ChainStore: ChainHeadStore {
600600
async fn clear_call_cache(&self, from: BlockNumber, to: BlockNumber) -> Result<(), Error>;
601601

602602
/// Clears stale call cache entries for the given TTL in days.
603-
async fn clear_stale_call_cache(&self, ttl_days: i32) -> Result<(), Error>;
603+
async fn clear_stale_call_cache(
604+
&self,
605+
ttl_days: i32,
606+
ttl_max_contracts: Option<i64>,
607+
) -> Result<(), Error>;
604608

605609
/// Return the chain identifier for this store.
606610
fn chain_identifier(&self) -> Result<ChainIdentifier, Error>;

store/postgres/src/chain_store.rs

Lines changed: 144 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,10 @@ mod data {
105105
use graph::prelude::transaction_receipt::LightTransactionReceipt;
106106
use graph::prelude::web3::types::H256;
107107
use graph::prelude::{
108-
serde_json as json, BlockNumber, BlockPtr, CachedEthereumCall, Error, Logger, StoreError,
108+
info, serde_json as json, BlockNumber, BlockPtr, CachedEthereumCall, Error, Logger,
109+
StoreError,
109110
};
111+
110112
use std::collections::HashMap;
111113
use std::convert::TryFrom;
112114
use std::fmt;
@@ -1404,56 +1406,87 @@ mod data {
14041406
conn: &mut PgConnection,
14051407
logger: &Logger,
14061408
ttl_days: i32,
1409+
ttl_max_contracts: Option<i64>,
14071410
) -> Result<(), Error> {
1408-
// Delete cache entries in batches since there could be thousands of cache entries per contract
1409-
let mut total_deleted = 0;
1410-
let batch_size = 5000;
1411+
let mut total_calls: usize = 0;
1412+
let mut total_contracts: i64 = 0;
1413+
// We process contracts in batches to avoid loading too many entries into memory
1414+
// at once. Each contract can have many calls, so we also delete calls in batches.
1415+
// Note: The batch sizes were chosen based on experimentation. Potentially, they
1416+
// could be made configurable via ENV vars.
1417+
let contracts_batch_size: i64 = 2000;
1418+
let cache_batch_size: usize = 10000;
1419+
1420+
// Limits the number of contracts to process if ttl_max_contracts is set.
1421+
// Used also to adjust the final batch size, so we don't process more
1422+
// contracts than the set limit.
1423+
let remaining_contracts = |processed: i64| -> Option<i64> {
1424+
ttl_max_contracts.map(|limit| limit.saturating_sub(processed))
1425+
};
14111426

14121427
match self {
14131428
Storage::Shared => {
14141429
use public::eth_call_cache as cache;
14151430
use public::eth_call_meta as meta;
14161431

1417-
let stale_contracts = meta::table
1418-
.select(meta::contract_address)
1419-
.filter(
1420-
meta::accessed_at
1421-
.lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())),
1422-
)
1423-
.get_results::<Vec<u8>>(conn)?;
1424-
1425-
if stale_contracts.is_empty() {
1426-
return Ok(());
1427-
}
1428-
14291432
loop {
1430-
let next_batch = cache::table
1431-
.select(cache::id)
1432-
.filter(cache::contract_address.eq_any(&stale_contracts))
1433-
.limit(batch_size as i64)
1434-
.get_results::<Vec<u8>>(conn)?;
1435-
let deleted_count =
1436-
diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch)))
1437-
.execute(conn)?;
1433+
if let Some(0) = remaining_contracts(total_contracts) {
1434+
info!(
1435+
logger,
1436+
"Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)",
1437+
total_calls,
1438+
total_contracts
1439+
);
1440+
break;
1441+
}
14381442

1439-
total_deleted += deleted_count;
1443+
let batch_limit = remaining_contracts(total_contracts)
1444+
.map(|left| left.min(contracts_batch_size))
1445+
.unwrap_or(contracts_batch_size);
1446+
1447+
let stale_contracts = meta::table
1448+
.select(meta::contract_address)
1449+
.filter(
1450+
meta::accessed_at
1451+
.lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())),
1452+
)
1453+
.limit(batch_limit)
1454+
.get_results::<Vec<u8>>(conn)?;
14401455

1441-
if deleted_count < batch_size {
1456+
if stale_contracts.is_empty() {
1457+
info!(
1458+
logger,
1459+
"Finished cleaning call cache: deleted {} entries for {} contracts",
1460+
total_calls,
1461+
total_contracts
1462+
);
14421463
break;
14431464
}
1444-
}
14451465

1446-
graph::slog::info!(
1447-
logger,
1448-
"Cleaned call cache: deleted {} entries for {} contracts",
1449-
total_deleted,
1450-
stale_contracts.len()
1451-
);
1466+
loop {
1467+
let next_batch = cache::table
1468+
.select(cache::id)
1469+
.filter(cache::contract_address.eq_any(&stale_contracts))
1470+
.limit(cache_batch_size as i64)
1471+
.get_results::<Vec<u8>>(conn)?;
1472+
let deleted_count =
1473+
diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch)))
1474+
.execute(conn)?;
14521475

1453-
diesel::delete(
1454-
meta::table.filter(meta::contract_address.eq_any(&stale_contracts)),
1455-
)
1456-
.execute(conn)?;
1476+
total_calls += deleted_count;
1477+
1478+
if deleted_count < cache_batch_size {
1479+
break;
1480+
}
1481+
}
1482+
1483+
let deleted_contracts = diesel::delete(
1484+
meta::table.filter(meta::contract_address.eq_any(&stale_contracts)),
1485+
)
1486+
.execute(conn)?;
1487+
1488+
total_contracts += deleted_contracts as i64;
1489+
}
14571490

14581491
Ok(())
14591492
}
@@ -1463,56 +1496,89 @@ mod data {
14631496
..
14641497
}) => {
14651498
let select_query = format!(
1466-
"SELECT contract_address FROM {} \
1467-
WHERE accessed_at < CURRENT_DATE - interval '{} days'",
1499+
"WITH stale_contracts AS (
1500+
SELECT contract_address
1501+
FROM {}
1502+
WHERE accessed_at < current_date - interval '{} days'
1503+
LIMIT $1
1504+
)
1505+
SELECT contract_address FROM stale_contracts",
14681506
call_meta.qname, ttl_days
14691507
);
14701508

1509+
let delete_cache_query = format!(
1510+
"WITH targets AS (
1511+
SELECT id
1512+
FROM {}
1513+
WHERE contract_address = ANY($1)
1514+
LIMIT {}
1515+
)
1516+
DELETE FROM {} USING targets
1517+
WHERE {}.id = targets.id",
1518+
call_cache.qname, cache_batch_size, call_cache.qname, call_cache.qname
1519+
);
1520+
1521+
let delete_meta_query = format!(
1522+
"DELETE FROM {} WHERE contract_address = ANY($1)",
1523+
call_meta.qname
1524+
);
1525+
14711526
#[derive(QueryableByName)]
14721527
struct ContractAddress {
14731528
#[diesel(sql_type = Bytea)]
14741529
contract_address: Vec<u8>,
14751530
}
14761531

1477-
let all_stale_contracts: Vec<Vec<u8>> = sql_query(select_query)
1478-
.load::<ContractAddress>(conn)?
1479-
.into_iter()
1480-
.map(|row| row.contract_address)
1481-
.collect();
1482-
1483-
if all_stale_contracts.is_empty() {
1484-
graph::slog::info!(logger, "Cleaned call cache: no stale entries found");
1485-
return Ok(());
1486-
}
1487-
14881532
loop {
1489-
let delete_cache_query = format!(
1490-
"DELETE FROM {} WHERE id IN (
1491-
SELECT id FROM {}
1492-
WHERE contract_address = ANY($1)
1493-
LIMIT {}
1494-
)",
1495-
call_cache.qname, call_cache.qname, batch_size
1496-
);
1533+
if let Some(0) = remaining_contracts(total_contracts) {
1534+
info!(
1535+
logger,
1536+
"Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)",
1537+
total_calls,
1538+
total_contracts
1539+
);
1540+
break;
1541+
}
14971542

1498-
let deleted_count = sql_query(delete_cache_query)
1499-
.bind::<Array<Bytea>, _>(&all_stale_contracts)
1500-
.execute(conn)?;
1543+
let batch_limit = remaining_contracts(total_contracts)
1544+
.map(|left| left.min(contracts_batch_size))
1545+
.unwrap_or(contracts_batch_size);
1546+
1547+
let stale_contracts: Vec<Vec<u8>> = sql_query(&select_query)
1548+
.bind::<BigInt, _>(batch_limit)
1549+
.load::<ContractAddress>(conn)?
1550+
.into_iter()
1551+
.map(|r| r.contract_address)
1552+
.collect();
1553+
1554+
if stale_contracts.is_empty() {
1555+
info!(
1556+
logger,
1557+
"Finished cleaning call cache: deleted {} entries for {} contracts",
1558+
total_calls,
1559+
total_contracts
1560+
);
1561+
break;
1562+
}
15011563

1502-
total_deleted += deleted_count;
1564+
loop {
1565+
let deleted_count = sql_query(&delete_cache_query)
1566+
.bind::<Array<Bytea>, _>(&stale_contracts)
1567+
.execute(conn)?;
15031568

1504-
if deleted_count < batch_size {
1505-
break;
1569+
total_calls += deleted_count;
1570+
1571+
if deleted_count < cache_batch_size {
1572+
break;
1573+
}
15061574
}
1507-
}
15081575

1509-
let delete_meta_query = format!(
1510-
"DELETE FROM {} WHERE contract_address = ANY($1)",
1511-
call_meta.qname
1512-
);
1513-
sql_query(delete_meta_query)
1514-
.bind::<Array<Bytea>, _>(&all_stale_contracts)
1515-
.execute(conn)?;
1576+
let deleted_contracts = sql_query(&delete_meta_query)
1577+
.bind::<Array<Bytea>, _>(&stale_contracts)
1578+
.execute(conn)?;
1579+
1580+
total_contracts += deleted_contracts as i64;
1581+
}
15161582

15171583
Ok(())
15181584
}
@@ -2629,10 +2695,14 @@ impl ChainStoreTrait for ChainStore {
26292695
Ok(())
26302696
}
26312697

2632-
async fn clear_stale_call_cache(&self, ttl_days: i32) -> Result<(), Error> {
2698+
async fn clear_stale_call_cache(
2699+
&self,
2700+
ttl_days: i32,
2701+
ttl_max_contracts: Option<i64>,
2702+
) -> Result<(), Error> {
26332703
let conn = &mut *self.get_conn()?;
26342704
self.storage
2635-
.clear_stale_call_cache(conn, &self.logger, ttl_days)
2705+
.clear_stale_call_cache(conn, &self.logger, ttl_days, ttl_max_contracts)
26362706
}
26372707

26382708
async fn transaction_receipts_in_block(

0 commit comments

Comments
 (0)