From e29f070556aa359a0e11a3f49866ecac1df217d2 Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Wed, 1 Oct 2025 16:12:58 +0300 Subject: [PATCH 1/8] graph,node,store: Add option to remove stale call_cache in graphman Signed-off-by: Maksim Dimitrov --- graph/src/blockchain/mock.rs | 3 + graph/src/components/store/traits.rs | 3 + node/src/bin/manager.rs | 17 +++- node/src/manager/commands/chain.rs | 12 +++ store/postgres/src/chain_store.rs | 129 ++++++++++++++++++++++++++- 5 files changed, 161 insertions(+), 3 deletions(-) diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index 8f0bc565e6c..8938a2162f3 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -571,6 +571,9 @@ impl ChainStore for MockChainStore { async fn clear_call_cache(&self, _from: BlockNumber, _to: BlockNumber) -> Result<(), Error> { unimplemented!() } + async fn clear_stale_call_cache(&self, _ttl_days: i32) -> Result<(), Error> { + unimplemented!() + } fn chain_identifier(&self) -> Result { unimplemented!() } diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index f29c66f4784..2a29ad611cf 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -599,6 +599,9 @@ pub trait ChainStore: ChainHeadStore { /// Clears call cache of the chain for the given `from` and `to` block number. async fn clear_call_cache(&self, from: BlockNumber, to: BlockNumber) -> Result<(), Error>; + /// Clears stale call cache entries for the given TTL in days. + async fn clear_stale_call_cache(&self, ttl_days: i32) -> Result<(), Error>; + /// Return the chain identifier for this store. fn chain_identifier(&self) -> Result; diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index aba6595f1c9..97114d08fc7 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -555,14 +555,18 @@ pub enum ChainCommand { pub enum CallCacheCommand { /// Remove the call cache of the specified chain. /// - /// Either remove entries in the range `--from` and `--to`, or remove - /// the entire cache with `--remove-entire-cache`. Removing the entire + /// Either remove entries in the range `--from` and `--to`, + /// remove the cache for contracts that have not been accessed for the specified duration --ttl_days, + /// or remove the entire cache with `--remove-entire-cache`. Removing the entire /// cache can reduce indexing performance significantly and should /// generally be avoided. Remove { /// Remove the entire cache #[clap(long, conflicts_with_all = &["from", "to"])] remove_entire_cache: bool, + /// Remove the cache for contracts that have not been accessed in the last days + #[clap(long, conflicts_with_all = &["from", "to", "remove-entire-cache"])] + ttl_days: Option, /// Starting block number #[clap(long, short, conflicts_with = "remove-entire-cache", requires = "to")] from: Option, @@ -1472,8 +1476,17 @@ async fn main() -> anyhow::Result<()> { from, to, remove_entire_cache, + ttl_days, } => { let chain_store = ctx.chain_store(&chain_name)?; + if let Some(ttl_days) = ttl_days { + return commands::chain::clear_stale_call_cache( + chain_store, + ttl_days, + ) + .await; + } + if !remove_entire_cache && from.is_none() && to.is_none() { bail!("you must specify either --from and --to or --remove-entire-cache"); } diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 905568a5637..efda09da94c 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -81,6 +81,18 @@ pub async fn clear_call_cache( Ok(()) } +pub async fn clear_stale_call_cache( + chain_store: Arc, + ttl_days: i32, +) -> Result<(), Error> { + println!( + "Removing stale entries from the call cache for `{}`", + chain_store.chain + ); + chain_store.clear_stale_call_cache(ttl_days).await?; + Ok(()) +} + pub async fn info( primary: ConnectionPool, store: Arc, diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index db83199a56c..a38ab4ebedf 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -83,6 +83,7 @@ pub use data::Storage; /// Encapuslate access to the blocks table for a chain. mod data { + use crate::diesel::dsl::IntervalDsl; use diesel::sql_types::{Array, Binary, Bool, Nullable}; use diesel::{connection::SimpleConnection, insert_into}; use diesel::{delete, prelude::*, sql_query}; @@ -104,7 +105,7 @@ mod data { use graph::prelude::transaction_receipt::LightTransactionReceipt; use graph::prelude::web3::types::H256; use graph::prelude::{ - serde_json as json, BlockNumber, BlockPtr, CachedEthereumCall, Error, StoreError, + serde_json as json, BlockNumber, BlockPtr, CachedEthereumCall, Error, Logger, StoreError, }; use std::collections::HashMap; use std::convert::TryFrom; @@ -1398,6 +1399,126 @@ mod data { } } + pub fn clear_stale_call_cache( + &self, + conn: &mut PgConnection, + logger: &Logger, + ttl_days: i32, + ) -> Result<(), Error> { + // Delete cache entries in batches since there could be thousands of cache entries per contract + let mut total_deleted = 0; + let batch_size = 5000; + + match self { + Storage::Shared => { + use public::eth_call_cache as cache; + use public::eth_call_meta as meta; + + let stale_contracts = meta::table + .select(meta::contract_address) + .filter( + meta::accessed_at + .lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())), + ) + .get_results::>(conn)?; + + if stale_contracts.is_empty() { + return Ok(()); + } + + loop { + let next_batch = cache::table + .select(cache::id) + .filter(cache::contract_address.eq_any(&stale_contracts)) + .limit(batch_size as i64) + .get_results::>(conn)?; + let deleted_count = + diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch))) + .execute(conn)?; + + total_deleted += deleted_count; + + if deleted_count < batch_size { + break; + } + } + + graph::slog::info!( + logger, + "Cleaned call cache: deleted {} entries for {} contracts", + total_deleted, + stale_contracts.len() + ); + + diesel::delete( + meta::table.filter(meta::contract_address.eq_any(&stale_contracts)), + ) + .execute(conn)?; + + Ok(()) + } + Storage::Private(Schema { + call_cache, + call_meta, + .. + }) => { + let select_query = format!( + "SELECT contract_address FROM {} \ + WHERE accessed_at < CURRENT_DATE - interval '{} days'", + call_meta.qname, ttl_days + ); + + #[derive(QueryableByName)] + struct ContractAddress { + #[diesel(sql_type = Bytea)] + contract_address: Vec, + } + + let all_stale_contracts: Vec> = sql_query(select_query) + .load::(conn)? + .into_iter() + .map(|row| row.contract_address) + .collect(); + + if all_stale_contracts.is_empty() { + graph::slog::info!(logger, "Cleaned call cache: no stale entries found"); + return Ok(()); + } + + loop { + let delete_cache_query = format!( + "DELETE FROM {} WHERE id IN ( + SELECT id FROM {} + WHERE contract_address = ANY($1) + LIMIT {} + )", + call_cache.qname, call_cache.qname, batch_size + ); + + let deleted_count = sql_query(delete_cache_query) + .bind::, _>(&all_stale_contracts) + .execute(conn)?; + + total_deleted += deleted_count; + + if deleted_count < batch_size { + break; + } + } + + let delete_meta_query = format!( + "DELETE FROM {} WHERE contract_address = ANY($1)", + call_meta.qname + ); + sql_query(delete_meta_query) + .bind::, _>(&all_stale_contracts) + .execute(conn)?; + + Ok(()) + } + } + } + pub(super) fn update_accessed_at( &self, conn: &mut PgConnection, @@ -2508,6 +2629,12 @@ impl ChainStoreTrait for ChainStore { Ok(()) } + async fn clear_stale_call_cache(&self, ttl_days: i32) -> Result<(), Error> { + let conn = &mut *self.get_conn()?; + self.storage + .clear_stale_call_cache(conn, &self.logger, ttl_days) + } + async fn transaction_receipts_in_block( &self, block_hash: &H256, From 91735b1cabf4d214016ec15911b0c9c2306b1675 Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Wed, 1 Oct 2025 16:13:12 +0300 Subject: [PATCH 2/8] docs: Update graphman usage documentation Signed-off-by: Maksim Dimitrov --- docs/graphman.md | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/docs/graphman.md b/docs/graphman.md index 31353fbabc3..1ca506eaeb5 100644 --- a/docs/graphman.md +++ b/docs/graphman.md @@ -371,21 +371,27 @@ Inspect all blocks after block `13000000`: Remove the call cache of the specified chain. -If block numbers are not mentioned in `--from` and `--to`, then all the call cache will be removed. +Either remove entries in the range `--from` and `--to`, remove stale contracts which have not been accessed for a specified duration `--ttl_days`, or remove the entire cache with `--remove-entire-cache`. Removing the entire cache can reduce indexing performance significantly and should generally be avoided. -USAGE: - graphman chain call-cache remove [OPTIONS] + Usage: graphman chain call-cache remove [OPTIONS] + + Options: + --remove-entire-cache + Remove the entire cache + + --ttl-days + Remove stale contracts based on call_meta table -OPTIONS: -f, --from Starting block number - -h, --help - Print help information - -t, --to Ending block number + -h, --help + Print help (see a summary with '-h') + + ### DESCRIPTION Remove the call cache of a specified chain. @@ -404,6 +410,12 @@ the first block number will be used as the starting block number. The `to` option is used to specify the ending block number of the block range. In the absence of `to` option, the last block number will be used as the ending block number. +#### `--remove-entire-cache` +The `--remove-entire-cache` option is used to remove the entire call cache of the specified chain. + +#### `--ttl-days ` +The `--ttl-days` option is used to remove stale contracts based on the `call_meta.accessed_at` field. For example, if `--ttl-days` is set to 7, all calls to a contract that has not been accessed in the last 7 days will be removed from the call cache. + ### EXAMPLES Remove the call cache for all blocks numbered from 10 to 20: @@ -412,5 +424,9 @@ Remove the call cache for all blocks numbered from 10 to 20: Remove all the call cache of the specified chain: - graphman --config config.toml chain call-cache ethereum remove + graphman --config config.toml chain call-cache ethereum remove --remove-entire-cache + +Remove stale contracts from the call cache that have not been accessed in the last 7 days: + + graphman --config config.toml chain call-cache ethereum remove --ttl-days 7 From d340ca8fb3cc4b2b3e3ee75f34919a4fe670867d Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Thu, 2 Oct 2025 10:33:52 +0300 Subject: [PATCH 3/8] store: Add clear_stale_call_cache test Signed-off-by: Maksim Dimitrov --- store/test-store/tests/postgres/chain_head.rs | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/store/test-store/tests/postgres/chain_head.rs b/store/test-store/tests/postgres/chain_head.rs index cf501f1438f..5a63a2f6236 100644 --- a/store/test-store/tests/postgres/chain_head.rs +++ b/store/test-store/tests/postgres/chain_head.rs @@ -490,6 +490,39 @@ fn eth_call_cache() { }) } +#[test] +/// Tests mainly query correctness. Requires data in order not to hit early returns when no stale contracts are found. +fn test_clear_stale_call_cache() { + let chain = vec![]; + run_test_async(chain, |store, _, _| async move { + let logger = LOGGER.cheap_clone(); + let address = H160([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]); + let call: [u8; 6] = [1, 2, 3, 4, 5, 6]; + let return_value: [u8; 3] = [7, 8, 9]; + + // Insert a call cache entry, otherwise it will hit an early return and won't test all queries + let call = call::Request::new(address, call.to_vec(), 0); + store + .set_call( + &logger, + call.cheap_clone(), + BLOCK_ONE.block_ptr(), + call::Retval::Value(Bytes::from(return_value)), + ) + .unwrap(); + + // Confirm the call cache entry is there + let ret = store.get_call(&call, BLOCK_ONE.block_ptr()).unwrap(); + assert!(ret.is_some()); + + // Note: The storage field is not accessible from here, so we cannot fetch the Schema for the private chain + // and manually populate the cache and meta tables or alter the accessed_at timestamp. + // We can only test that the function runs to completion without error. + let result = store.clear_stale_call_cache(7).await; + assert!(result.is_ok()); + }); +} + #[test] /// Tests only query correctness. No data is involved. fn test_transaction_receipts_in_block_function() { From d950b9edff32a11f03083ed0b6dd65bfaddd9595 Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Tue, 7 Oct 2025 22:50:59 +0300 Subject: [PATCH 4/8] node: Add max contracts option and value validations Signed-off-by: Maksim Dimitrov --- node/src/bin/manager.rs | 7 ++++++- node/src/manager/commands/chain.rs | 5 ++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 97114d08fc7..9e67a532a8c 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -565,8 +565,11 @@ pub enum CallCacheCommand { #[clap(long, conflicts_with_all = &["from", "to"])] remove_entire_cache: bool, /// Remove the cache for contracts that have not been accessed in the last days - #[clap(long, conflicts_with_all = &["from", "to", "remove-entire-cache"])] + #[clap(long, conflicts_with_all = &["from", "to", "remove-entire-cache"], value_parser = clap::value_parser!(i32).range(1..))] ttl_days: Option, + /// Limits the number of contracts to consider for cache removal when using --ttl_days + #[clap(long, conflicts_with_all = &["remove-entire-cache", "to", "from"], requires = "ttl_days", value_parser = clap::value_parser!(i64).range(1..))] + ttl_max_contracts: Option, /// Starting block number #[clap(long, short, conflicts_with = "remove-entire-cache", requires = "to")] from: Option, @@ -1477,12 +1480,14 @@ async fn main() -> anyhow::Result<()> { to, remove_entire_cache, ttl_days, + ttl_max_contracts, } => { let chain_store = ctx.chain_store(&chain_name)?; if let Some(ttl_days) = ttl_days { return commands::chain::clear_stale_call_cache( chain_store, ttl_days, + ttl_max_contracts, ) .await; } diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index efda09da94c..11622dca2da 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -84,12 +84,15 @@ pub async fn clear_call_cache( pub async fn clear_stale_call_cache( chain_store: Arc, ttl_days: i32, + ttl_max_contracts: Option, ) -> Result<(), Error> { println!( "Removing stale entries from the call cache for `{}`", chain_store.chain ); - chain_store.clear_stale_call_cache(ttl_days).await?; + chain_store + .clear_stale_call_cache(ttl_days, ttl_max_contracts) + .await?; Ok(()) } From a8b94ca90adb114fa8d0495e3b04433f1297448b Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Tue, 7 Oct 2025 22:52:49 +0300 Subject: [PATCH 5/8] graph, store: Batch stale contracts and handle max limit Signed-off-by: Maksim Dimitrov --- graph/src/blockchain/mock.rs | 6 +- graph/src/components/store/traits.rs | 6 +- store/postgres/src/chain_store.rs | 218 ++++++++++++++++++--------- 3 files changed, 154 insertions(+), 76 deletions(-) diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index 8938a2162f3..b2d9bf71df2 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -571,7 +571,11 @@ impl ChainStore for MockChainStore { async fn clear_call_cache(&self, _from: BlockNumber, _to: BlockNumber) -> Result<(), Error> { unimplemented!() } - async fn clear_stale_call_cache(&self, _ttl_days: i32) -> Result<(), Error> { + async fn clear_stale_call_cache( + &self, + _ttl_days: i32, + _ttl_max_contracts: Option, + ) -> Result<(), Error> { unimplemented!() } fn chain_identifier(&self) -> Result { diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 2a29ad611cf..2d115aeff07 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -600,7 +600,11 @@ pub trait ChainStore: ChainHeadStore { async fn clear_call_cache(&self, from: BlockNumber, to: BlockNumber) -> Result<(), Error>; /// Clears stale call cache entries for the given TTL in days. - async fn clear_stale_call_cache(&self, ttl_days: i32) -> Result<(), Error>; + async fn clear_stale_call_cache( + &self, + ttl_days: i32, + ttl_max_contracts: Option, + ) -> Result<(), Error>; /// Return the chain identifier for this store. fn chain_identifier(&self) -> Result; diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index a38ab4ebedf..45d5a7f27a9 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -105,8 +105,10 @@ mod data { use graph::prelude::transaction_receipt::LightTransactionReceipt; use graph::prelude::web3::types::H256; use graph::prelude::{ - serde_json as json, BlockNumber, BlockPtr, CachedEthereumCall, Error, Logger, StoreError, + info, serde_json as json, BlockNumber, BlockPtr, CachedEthereumCall, Error, Logger, + StoreError, }; + use std::collections::HashMap; use std::convert::TryFrom; use std::fmt; @@ -1404,56 +1406,87 @@ mod data { conn: &mut PgConnection, logger: &Logger, ttl_days: i32, + ttl_max_contracts: Option, ) -> Result<(), Error> { - // Delete cache entries in batches since there could be thousands of cache entries per contract - let mut total_deleted = 0; - let batch_size = 5000; + let mut total_calls: usize = 0; + let mut total_contracts: i64 = 0; + // We process contracts in batches to avoid loading too many entries into memory + // at once. Each contract can have many calls, so we also delete calls in batches. + // Note: The batch sizes were chosen based on experimentation. Potentially, they + // could be made configurable via ENV vars. + let contracts_batch_size: i64 = 2000; + let cache_batch_size: usize = 10000; + + // Limits the number of contracts to process if ttl_max_contracts is set. + // Used also to adjust the final batch size, so we don't process more + // contracts than the set limit. + let remaining_contracts = |processed: i64| -> Option { + ttl_max_contracts.map(|limit| limit.saturating_sub(processed)) + }; match self { Storage::Shared => { use public::eth_call_cache as cache; use public::eth_call_meta as meta; - let stale_contracts = meta::table - .select(meta::contract_address) - .filter( - meta::accessed_at - .lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())), - ) - .get_results::>(conn)?; - - if stale_contracts.is_empty() { - return Ok(()); - } - loop { - let next_batch = cache::table - .select(cache::id) - .filter(cache::contract_address.eq_any(&stale_contracts)) - .limit(batch_size as i64) - .get_results::>(conn)?; - let deleted_count = - diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch))) - .execute(conn)?; + if let Some(0) = remaining_contracts(total_contracts) { + info!( + logger, + "Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)", + total_calls, + total_contracts + ); + break; + } - total_deleted += deleted_count; + let batch_limit = remaining_contracts(total_contracts) + .map(|left| left.min(contracts_batch_size)) + .unwrap_or(contracts_batch_size); + + let stale_contracts = meta::table + .select(meta::contract_address) + .filter( + meta::accessed_at + .lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())), + ) + .limit(batch_limit) + .get_results::>(conn)?; - if deleted_count < batch_size { + if stale_contracts.is_empty() { + info!( + logger, + "Finished cleaning call cache: deleted {} entries for {} contracts", + total_calls, + total_contracts + ); break; } - } - graph::slog::info!( - logger, - "Cleaned call cache: deleted {} entries for {} contracts", - total_deleted, - stale_contracts.len() - ); + loop { + let next_batch = cache::table + .select(cache::id) + .filter(cache::contract_address.eq_any(&stale_contracts)) + .limit(cache_batch_size as i64) + .get_results::>(conn)?; + let deleted_count = + diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch))) + .execute(conn)?; - diesel::delete( - meta::table.filter(meta::contract_address.eq_any(&stale_contracts)), - ) - .execute(conn)?; + total_calls += deleted_count; + + if deleted_count < cache_batch_size { + break; + } + } + + let deleted_contracts = diesel::delete( + meta::table.filter(meta::contract_address.eq_any(&stale_contracts)), + ) + .execute(conn)?; + + total_contracts += deleted_contracts as i64; + } Ok(()) } @@ -1463,56 +1496,89 @@ mod data { .. }) => { let select_query = format!( - "SELECT contract_address FROM {} \ - WHERE accessed_at < CURRENT_DATE - interval '{} days'", + "WITH stale_contracts AS ( + SELECT contract_address + FROM {} + WHERE accessed_at < current_date - interval '{} days' + LIMIT $1 + ) + SELECT contract_address FROM stale_contracts", call_meta.qname, ttl_days ); + let delete_cache_query = format!( + "WITH targets AS ( + SELECT id + FROM {} + WHERE contract_address = ANY($1) + LIMIT {} + ) + DELETE FROM {} USING targets + WHERE {}.id = targets.id", + call_cache.qname, cache_batch_size, call_cache.qname, call_cache.qname + ); + + let delete_meta_query = format!( + "DELETE FROM {} WHERE contract_address = ANY($1)", + call_meta.qname + ); + #[derive(QueryableByName)] struct ContractAddress { #[diesel(sql_type = Bytea)] contract_address: Vec, } - let all_stale_contracts: Vec> = sql_query(select_query) - .load::(conn)? - .into_iter() - .map(|row| row.contract_address) - .collect(); - - if all_stale_contracts.is_empty() { - graph::slog::info!(logger, "Cleaned call cache: no stale entries found"); - return Ok(()); - } - loop { - let delete_cache_query = format!( - "DELETE FROM {} WHERE id IN ( - SELECT id FROM {} - WHERE contract_address = ANY($1) - LIMIT {} - )", - call_cache.qname, call_cache.qname, batch_size - ); + if let Some(0) = remaining_contracts(total_contracts) { + info!( + logger, + "Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)", + total_calls, + total_contracts + ); + break; + } - let deleted_count = sql_query(delete_cache_query) - .bind::, _>(&all_stale_contracts) - .execute(conn)?; + let batch_limit = remaining_contracts(total_contracts) + .map(|left| left.min(contracts_batch_size)) + .unwrap_or(contracts_batch_size); + + let stale_contracts: Vec> = sql_query(&select_query) + .bind::(batch_limit) + .load::(conn)? + .into_iter() + .map(|r| r.contract_address) + .collect(); + + if stale_contracts.is_empty() { + info!( + logger, + "Finished cleaning call cache: deleted {} entries for {} contracts", + total_calls, + total_contracts + ); + break; + } - total_deleted += deleted_count; + loop { + let deleted_count = sql_query(&delete_cache_query) + .bind::, _>(&stale_contracts) + .execute(conn)?; - if deleted_count < batch_size { - break; + total_calls += deleted_count; + + if deleted_count < cache_batch_size { + break; + } } - } - let delete_meta_query = format!( - "DELETE FROM {} WHERE contract_address = ANY($1)", - call_meta.qname - ); - sql_query(delete_meta_query) - .bind::, _>(&all_stale_contracts) - .execute(conn)?; + let deleted_contracts = sql_query(&delete_meta_query) + .bind::, _>(&stale_contracts) + .execute(conn)?; + + total_contracts += deleted_contracts as i64; + } Ok(()) } @@ -2629,10 +2695,14 @@ impl ChainStoreTrait for ChainStore { Ok(()) } - async fn clear_stale_call_cache(&self, ttl_days: i32) -> Result<(), Error> { + async fn clear_stale_call_cache( + &self, + ttl_days: i32, + ttl_max_contracts: Option, + ) -> Result<(), Error> { let conn = &mut *self.get_conn()?; self.storage - .clear_stale_call_cache(conn, &self.logger, ttl_days) + .clear_stale_call_cache(conn, &self.logger, ttl_days, ttl_max_contracts) } async fn transaction_receipts_in_block( From 216b5e6bbab76f46bee714c2867d2dfbdc6eaf62 Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Tue, 7 Oct 2025 22:53:26 +0300 Subject: [PATCH 6/8] test: Update tests case Signed-off-by: Maksim Dimitrov --- store/test-store/tests/postgres/chain_head.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/test-store/tests/postgres/chain_head.rs b/store/test-store/tests/postgres/chain_head.rs index 5a63a2f6236..54459f7f484 100644 --- a/store/test-store/tests/postgres/chain_head.rs +++ b/store/test-store/tests/postgres/chain_head.rs @@ -518,7 +518,7 @@ fn test_clear_stale_call_cache() { // Note: The storage field is not accessible from here, so we cannot fetch the Schema for the private chain // and manually populate the cache and meta tables or alter the accessed_at timestamp. // We can only test that the function runs to completion without error. - let result = store.clear_stale_call_cache(7).await; + let result = store.clear_stale_call_cache(7, None).await; assert!(result.is_ok()); }); } From 8e14998d2048370a498929b0a763f663bed51369 Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Tue, 7 Oct 2025 23:00:21 +0300 Subject: [PATCH 7/8] docs: Update graphman docs Signed-off-by: Maksim Dimitrov --- docs/graphman.md | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/docs/graphman.md b/docs/graphman.md index 1ca506eaeb5..8c857703dda 100644 --- a/docs/graphman.md +++ b/docs/graphman.md @@ -382,13 +382,16 @@ Either remove entries in the range `--from` and `--to`, remove stale contracts w --ttl-days Remove stale contracts based on call_meta table - -f, --from + --ttl-max-contracts + Limit the number of contracts to consider for stale contract removal + + -f, --from Starting block number - -t, --to + -t, --to Ending block number - -h, --help + -h, --help Print help (see a summary with '-h') @@ -416,6 +419,9 @@ The `--remove-entire-cache` option is used to remove the entire call cache of th #### `--ttl-days ` The `--ttl-days` option is used to remove stale contracts based on the `call_meta.accessed_at` field. For example, if `--ttl-days` is set to 7, all calls to a contract that has not been accessed in the last 7 days will be removed from the call cache. +#### `--ttl-max-contracts ` +The `--ttl-max-contracts` option is used to limit the maximum number of contracts to be removed when using the `--ttl-days` option. For example, if `--ttl-max-contracts` is set to 100, at most 100 contracts will be removed from the call cache even if more contracts meet the TTL criteria. + ### EXAMPLES Remove the call cache for all blocks numbered from 10 to 20: @@ -430,3 +436,6 @@ Remove stale contracts from the call cache that have not been accessed in the la graphman --config config.toml chain call-cache ethereum remove --ttl-days 7 +Remove stale contracts from the call cache that have not been accessed in the last 7 days, limiting the removal to a maximum of 100 contracts: + graphman --config config.toml chain call-cache ethereum remove --ttl-days 7 --ttl-max-contracts 100 + From 1fa315d6ac633742c2aa44aa2c34a963a3765d51 Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Wed, 8 Oct 2025 13:06:14 +0300 Subject: [PATCH 8/8] store: Update the cache test Signed-off-by: Maksim Dimitrov --- store/test-store/tests/postgres/chain_head.rs | 50 ++++++++++++++++--- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/store/test-store/tests/postgres/chain_head.rs b/store/test-store/tests/postgres/chain_head.rs index 54459f7f484..73baf71b008 100644 --- a/store/test-store/tests/postgres/chain_head.rs +++ b/store/test-store/tests/postgres/chain_head.rs @@ -1,6 +1,7 @@ //! Test ChainStore implementation of Store, in particular, how //! the chain head pointer gets updated in various situations +use diesel::RunQueryDsl; use graph::blockchain::{BlockHash, BlockPtr}; use graph::data::store::ethereum::call; use graph::data::store::scalar::Bytes; @@ -494,15 +495,24 @@ fn eth_call_cache() { /// Tests mainly query correctness. Requires data in order not to hit early returns when no stale contracts are found. fn test_clear_stale_call_cache() { let chain = vec![]; - run_test_async(chain, |store, _, _| async move { + + #[derive(diesel::QueryableByName)] + struct Namespace { + #[diesel(sql_type = diesel::sql_types::Text)] + namespace: String, + } + + run_test_async(chain, |chain_store, _, _| async move { let logger = LOGGER.cheap_clone(); - let address = H160([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]); + let address = H160([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3]); let call: [u8; 6] = [1, 2, 3, 4, 5, 6]; let return_value: [u8; 3] = [7, 8, 9]; + let mut conn = PRIMARY_POOL.get().unwrap(); + // Insert a call cache entry, otherwise it will hit an early return and won't test all queries let call = call::Request::new(address, call.to_vec(), 0); - store + chain_store .set_call( &logger, call.cheap_clone(), @@ -512,14 +522,38 @@ fn test_clear_stale_call_cache() { .unwrap(); // Confirm the call cache entry is there - let ret = store.get_call(&call, BLOCK_ONE.block_ptr()).unwrap(); + let ret = chain_store.get_call(&call, BLOCK_ONE.block_ptr()).unwrap(); assert!(ret.is_some()); - // Note: The storage field is not accessible from here, so we cannot fetch the Schema for the private chain - // and manually populate the cache and meta tables or alter the accessed_at timestamp. - // We can only test that the function runs to completion without error. - let result = store.clear_stale_call_cache(7, None).await; + // Now we need to update the accessed_at timestamp to be stale, so it gets deleted + // Get namespace from chains table + let namespace: String = diesel::sql_query(format!( + "SELECT namespace FROM public.chains WHERE name = '{}'", + chain_store.chain + )) + .get_result::(&mut conn) + .unwrap() + .namespace; + + // Determine the correct meta table name + let meta_table: String = match namespace.as_str() { + "public" => "eth_call_meta".to_owned(), + _ => format!("{namespace}.call_meta"), + }; + + // Update accessed_at to be 8 days ago, so it's stale for a 7 day threshold + let _ = diesel::sql_query(format!( + "UPDATE {meta_table} SET accessed_at = NOW() - INTERVAL '8 days' WHERE contract_address = $1" + )).bind::(address.as_bytes()) + .execute(&mut conn) + .unwrap(); + + let result = chain_store.clear_stale_call_cache(7, None).await; assert!(result.is_ok()); + + // Confirm the call cache entry was removed + let ret = chain_store.get_call(&call, BLOCK_ONE.block_ptr()).unwrap(); + assert!(ret.is_none()); }); }