Skip to content

Commit 5e9c3d7

Browse files
committed
Pass Database into consumers; StreamReader conn
Create a single Database instance for each consumer group and pass it into consumer run functions and ChainConsumerRunner instead of creating new DB clients inside each task. Update ChainConsumerRunner::new to accept a Database and use runner.database in consumers (store, indexer, rewards). Add optional StreamConnection support to StreamReader (from_connection, create_channel, try_connect_shared) so readers can be created from an existing connection and reconnect uses the shared connection when available. Adjust imports and reader creation sites accordingly.
1 parent 99d3d7c commit 5e9c3d7

5 files changed

Lines changed: 112 additions & 42 deletions

File tree

apps/daemon/src/consumers/indexer/mod.rs

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ use cacher::CacherClient;
1212
use primitives::{Chain, NFTChain};
1313
use settings::Settings;
1414
use storage::Database;
15-
use streamer::{ChainAddressPayload, ConsumerStatusReporter, FetchAssetsPayload, FetchBlocksPayload, QueueName, ShutdownReceiver, run_consumer};
15+
use streamer::{ChainAddressPayload, ConsumerStatusReporter, FetchAssetsPayload, FetchBlocksPayload, QueueName, ShutdownReceiver, StreamConnection, StreamReader, run_consumer};
1616

1717
use crate::consumers::runner::ChainConsumerRunner;
18-
use crate::consumers::{chain_providers, consumer_config, reader_for_queue};
18+
use crate::consumers::{chain_providers, consumer_config, reader_config};
1919

2020
use fetch_address_transactions_consumer::FetchAddressTransactionsConsumer;
2121
use fetch_assets_consumer::FetchAssetsConsumer;
@@ -25,23 +25,34 @@ use fetch_nft_assets_addresses_consumer::FetchNftAssetsAddressesConsumer;
2525
use fetch_token_addresses_consumer::FetchTokenAddressesConsumer;
2626

2727
pub async fn run_consumer_indexer(settings: Settings, shutdown_rx: ShutdownReceiver, reporter: Arc<dyn ConsumerStatusReporter>) -> Result<(), Box<dyn Error + Send + Sync>> {
28+
let database = Database::new(&settings.postgres.url, settings.postgres.pool);
2829
let settings = Arc::new(settings);
2930

3031
futures::future::try_join_all(vec![
31-
tokio::spawn(run_fetch_blocks(settings.clone(), shutdown_rx.clone(), reporter.clone())),
32-
tokio::spawn(run_fetch_assets(settings.clone(), shutdown_rx.clone(), reporter.clone())),
33-
tokio::spawn(run_fetch_token_associations(settings.clone(), shutdown_rx.clone(), reporter.clone())),
34-
tokio::spawn(run_fetch_coin_associations(settings.clone(), shutdown_rx.clone(), reporter.clone())),
35-
tokio::spawn(run_fetch_nft_associations(settings.clone(), shutdown_rx.clone(), reporter.clone())),
36-
tokio::spawn(run_fetch_transaction_associations(settings.clone(), shutdown_rx.clone(), reporter.clone())),
32+
tokio::spawn(run_fetch_blocks(settings.clone(), database.clone(), shutdown_rx.clone(), reporter.clone())),
33+
tokio::spawn(run_fetch_assets(settings.clone(), database.clone(), shutdown_rx.clone(), reporter.clone())),
34+
tokio::spawn(run_fetch_token_associations(settings.clone(), database.clone(), shutdown_rx.clone(), reporter.clone())),
35+
tokio::spawn(run_fetch_coin_associations(settings.clone(), database.clone(), shutdown_rx.clone(), reporter.clone())),
36+
tokio::spawn(run_fetch_nft_associations(settings.clone(), database.clone(), shutdown_rx.clone(), reporter.clone())),
37+
tokio::spawn(run_fetch_transaction_associations(
38+
settings.clone(),
39+
database.clone(),
40+
shutdown_rx.clone(),
41+
reporter.clone(),
42+
)),
3743
])
3844
.await?;
3945

4046
Ok(())
4147
}
4248

43-
async fn run_fetch_blocks(settings: Arc<Settings>, shutdown_rx: ShutdownReceiver, reporter: Arc<dyn ConsumerStatusReporter>) -> Result<(), Box<dyn Error + Send + Sync>> {
44-
ChainConsumerRunner::new((*settings).clone(), QueueName::FetchBlocks, shutdown_rx, reporter)
49+
async fn run_fetch_blocks(
50+
settings: Arc<Settings>,
51+
database: Database,
52+
shutdown_rx: ShutdownReceiver,
53+
reporter: Arc<dyn ConsumerStatusReporter>,
54+
) -> Result<(), Box<dyn Error + Send + Sync>> {
55+
ChainConsumerRunner::new((*settings).clone(), database, QueueName::FetchBlocks, shutdown_rx, reporter)
4556
.await?
4657
.run(|runner, chain| async move {
4758
let queue = QueueName::FetchBlocks;
@@ -64,10 +75,17 @@ async fn run_fetch_blocks(settings: Arc<Settings>, shutdown_rx: ShutdownReceiver
6475
.await
6576
}
6677

67-
async fn run_fetch_assets(settings: Arc<Settings>, shutdown_rx: ShutdownReceiver, reporter: Arc<dyn ConsumerStatusReporter>) -> Result<(), Box<dyn Error + Send + Sync>> {
68-
let database = Database::new(&settings.postgres.url, settings.postgres.pool);
78+
async fn run_fetch_assets(
79+
settings: Arc<Settings>,
80+
database: Database,
81+
shutdown_rx: ShutdownReceiver,
82+
reporter: Arc<dyn ConsumerStatusReporter>,
83+
) -> Result<(), Box<dyn Error + Send + Sync>> {
6984
let queue = QueueName::FetchAssets;
70-
let (name, stream_reader) = reader_for_queue(&settings, &queue).await?;
85+
let name = queue.to_string();
86+
let connection = StreamConnection::new(&settings.rabbitmq.url, name.clone()).await?;
87+
let config = reader_config(&settings.rabbitmq, name.clone());
88+
let stream_reader = StreamReader::from_connection(&connection, config).await?;
7189
let cacher = CacherClient::new(&settings.redis.url).await;
7290
let consumer = FetchAssetsConsumer {
7391
providers: chain_providers(&settings, &name),
@@ -79,10 +97,11 @@ async fn run_fetch_assets(settings: Arc<Settings>, shutdown_rx: ShutdownReceiver
7997

8098
async fn run_fetch_token_associations(
8199
settings: Arc<Settings>,
100+
database: Database,
82101
shutdown_rx: ShutdownReceiver,
83102
reporter: Arc<dyn ConsumerStatusReporter>,
84103
) -> Result<(), Box<dyn Error + Send + Sync>> {
85-
ChainConsumerRunner::new((*settings).clone(), QueueName::FetchTokenAssociations, shutdown_rx, reporter)
104+
ChainConsumerRunner::new((*settings).clone(), database, QueueName::FetchTokenAssociations, shutdown_rx, reporter)
86105
.await?
87106
.run(|runner, chain| async move {
88107
let queue = QueueName::FetchTokenAssociations;
@@ -107,10 +126,11 @@ async fn run_fetch_token_associations(
107126

108127
async fn run_fetch_coin_associations(
109128
settings: Arc<Settings>,
129+
database: Database,
110130
shutdown_rx: ShutdownReceiver,
111131
reporter: Arc<dyn ConsumerStatusReporter>,
112132
) -> Result<(), Box<dyn Error + Send + Sync>> {
113-
ChainConsumerRunner::new((*settings).clone(), QueueName::FetchCoinAssociations, shutdown_rx, reporter)
133+
ChainConsumerRunner::new((*settings).clone(), database, QueueName::FetchCoinAssociations, shutdown_rx, reporter)
114134
.await?
115135
.run(|runner, chain| async move {
116136
let queue = QueueName::FetchCoinAssociations;
@@ -132,9 +152,14 @@ async fn run_fetch_coin_associations(
132152
.await
133153
}
134154

135-
async fn run_fetch_nft_associations(settings: Arc<Settings>, shutdown_rx: ShutdownReceiver, reporter: Arc<dyn ConsumerStatusReporter>) -> Result<(), Box<dyn Error + Send + Sync>> {
155+
async fn run_fetch_nft_associations(
156+
settings: Arc<Settings>,
157+
database: Database,
158+
shutdown_rx: ShutdownReceiver,
159+
reporter: Arc<dyn ConsumerStatusReporter>,
160+
) -> Result<(), Box<dyn Error + Send + Sync>> {
136161
let chains: Vec<Chain> = NFTChain::all().into_iter().map(Into::into).collect();
137-
ChainConsumerRunner::new((*settings).clone(), QueueName::FetchNftAssociations, shutdown_rx, reporter)
162+
ChainConsumerRunner::new((*settings).clone(), database, QueueName::FetchNftAssociations, shutdown_rx, reporter)
138163
.await?
139164
.run_for_chains(chains, |runner, chain| async move {
140165
FetchNftAssetsAddressesConsumer::run(
@@ -154,10 +179,11 @@ async fn run_fetch_nft_associations(settings: Arc<Settings>, shutdown_rx: Shutdo
154179

155180
async fn run_fetch_transaction_associations(
156181
settings: Arc<Settings>,
182+
database: Database,
157183
shutdown_rx: ShutdownReceiver,
158184
reporter: Arc<dyn ConsumerStatusReporter>,
159185
) -> Result<(), Box<dyn Error + Send + Sync>> {
160-
ChainConsumerRunner::new((*settings).clone(), QueueName::FetchAddressTransactions, shutdown_rx, reporter)
186+
ChainConsumerRunner::new((*settings).clone(), database, QueueName::FetchAddressTransactions, shutdown_rx, reporter)
161187
.await?
162188
.run(|runner, chain| async move {
163189
let queue = QueueName::FetchAddressTransactions;

apps/daemon/src/consumers/rewards/mod.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,24 @@ use streamer::{ConsumerStatusReporter, QueueName, RewardsNotificationPayload, Re
2020
use crate::consumers::{consumer_config, producer_for_queue, reader_for_queue};
2121

2222
pub async fn run_consumer_rewards(settings: Settings, shutdown_rx: ShutdownReceiver, reporter: Arc<dyn ConsumerStatusReporter>) -> Result<(), Box<dyn Error + Send + Sync>> {
23+
let database = Database::new(&settings.postgres.url, settings.postgres.pool);
2324
let settings = Arc::new(settings);
2425

2526
futures::future::try_join_all(vec![
26-
tokio::spawn(run_rewards_events(settings.clone(), shutdown_rx.clone(), reporter.clone())),
27-
tokio::spawn(run_rewards_redemptions(settings.clone(), shutdown_rx.clone(), reporter.clone())),
27+
tokio::spawn(run_rewards_events(settings.clone(), database.clone(), shutdown_rx.clone(), reporter.clone())),
28+
tokio::spawn(run_rewards_redemptions(settings.clone(), database.clone(), shutdown_rx.clone(), reporter.clone())),
2829
])
2930
.await?;
3031

3132
Ok(())
3233
}
3334

34-
async fn run_rewards_events(settings: Arc<Settings>, shutdown_rx: ShutdownReceiver, reporter: Arc<dyn ConsumerStatusReporter>) -> Result<(), Box<dyn Error + Send + Sync>> {
35-
let database = Database::new(&settings.postgres.url, settings.postgres.pool);
35+
async fn run_rewards_events(
36+
settings: Arc<Settings>,
37+
database: Database,
38+
shutdown_rx: ShutdownReceiver,
39+
reporter: Arc<dyn ConsumerStatusReporter>,
40+
) -> Result<(), Box<dyn Error + Send + Sync>> {
3641
let queue = QueueName::RewardsEvents;
3742
let (name, stream_reader) = reader_for_queue(&settings, &queue).await?;
3843
let stream_producer = producer_for_queue(&settings, &name).await?;
@@ -41,8 +46,12 @@ async fn run_rewards_events(settings: Arc<Settings>, shutdown_rx: ShutdownReceiv
4146
run_consumer::<RewardsNotificationPayload, rewards_consumer::RewardsConsumer, usize>(&name, stream_reader, queue, None, consumer, consumer_config, shutdown_rx, reporter).await
4247
}
4348

44-
async fn run_rewards_redemptions(settings: Arc<Settings>, shutdown_rx: ShutdownReceiver, reporter: Arc<dyn ConsumerStatusReporter>) -> Result<(), Box<dyn Error + Send + Sync>> {
45-
let database = Database::new(&settings.postgres.url, settings.postgres.pool);
49+
async fn run_rewards_redemptions(
50+
settings: Arc<Settings>,
51+
database: Database,
52+
shutdown_rx: ShutdownReceiver,
53+
reporter: Arc<dyn ConsumerStatusReporter>,
54+
) -> Result<(), Box<dyn Error + Send + Sync>> {
4655
let config = ConfigCacher::new(database.clone());
4756
let retry_config = rewards_redemption_consumer::RedemptionRetryConfig {
4857
max_retries: config.get_i64(ConfigKey::RedemptionRetryMaxRetries)? as u32,

apps/daemon/src/consumers/runner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ pub struct ChainConsumerRunner {
2525
impl ChainConsumerRunner {
2626
pub async fn new(
2727
settings: Settings,
28+
database: Database,
2829
queue: streamer::QueueName,
2930
shutdown_rx: ShutdownReceiver,
3031
reporter: Arc<dyn ConsumerStatusReporter>,
3132
) -> Result<Self, Box<dyn Error + Send + Sync>> {
32-
let database = Database::new(&settings.postgres.url, settings.postgres.pool);
3333
let connection = StreamConnection::new(&settings.rabbitmq.url, queue.to_string()).await?;
3434
let cacher = CacherClient::new(&settings.redis.url).await;
3535
let config = consumer_config(&settings.consumer);

apps/daemon/src/consumers/store/mod.rs

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,33 +26,38 @@ use store_charts_consumer::StoreChartsConsumer;
2626
use store_prices_consumer::StorePricesConsumer;
2727

2828
pub async fn run_consumer_store(settings: Settings, shutdown_rx: ShutdownReceiver, reporter: Arc<dyn ConsumerStatusReporter>) -> Result<(), Box<dyn Error + Send + Sync>> {
29+
let database = Database::new(&settings.postgres.url, settings.postgres.pool);
2930
let settings = Arc::new(settings);
3031

3132
futures::future::try_join_all(vec![
32-
tokio::spawn(run_store_transactions(settings.clone(), shutdown_rx.clone(), reporter.clone())),
33-
tokio::spawn(run_store_prices(settings.clone(), shutdown_rx.clone(), reporter.clone())),
34-
tokio::spawn(run_store_charts(settings.clone(), shutdown_rx.clone(), reporter.clone())),
33+
tokio::spawn(run_store_transactions(settings.clone(), database.clone(), shutdown_rx.clone(), reporter.clone())),
34+
tokio::spawn(run_store_prices(settings.clone(), database.clone(), shutdown_rx.clone(), reporter.clone())),
35+
tokio::spawn(run_store_charts(settings.clone(), database.clone(), shutdown_rx.clone(), reporter.clone())),
3536
tokio::spawn(run_device_stream(settings.clone(), shutdown_rx.clone(), reporter.clone())),
3637
])
3738
.await?;
3839

3940
Ok(())
4041
}
4142

42-
async fn run_store_transactions(settings: Arc<Settings>, shutdown_rx: ShutdownReceiver, reporter: Arc<dyn ConsumerStatusReporter>) -> Result<(), Box<dyn Error + Send + Sync>> {
43-
ChainConsumerRunner::new((*settings).clone(), QueueName::StoreTransactions, shutdown_rx, reporter)
43+
async fn run_store_transactions(
44+
settings: Arc<Settings>,
45+
database: Database,
46+
shutdown_rx: ShutdownReceiver,
47+
reporter: Arc<dyn ConsumerStatusReporter>,
48+
) -> Result<(), Box<dyn Error + Send + Sync>> {
49+
ChainConsumerRunner::new((*settings).clone(), database, QueueName::StoreTransactions, shutdown_rx, reporter)
4450
.await?
4551
.run(|runner, chain| async move {
4652
let queue = QueueName::StoreTransactions;
4753
let name = format!("{}.{}", queue, chain.as_ref());
4854
let stream_reader = runner.stream_reader().await?;
4955
let stream_producer = runner.stream_producer().await?;
50-
let database = Database::new(&runner.settings.postgres.url, runner.settings.postgres.pool);
5156
let consumer = StoreTransactionsConsumer {
52-
database: database.clone(),
53-
config_cacher: ConfigCacher::new(database.clone()),
57+
database: runner.database.clone(),
58+
config_cacher: ConfigCacher::new(runner.database.clone()),
5459
stream_producer,
55-
pusher: Pusher::new(database.clone()),
60+
pusher: Pusher::new(runner.database.clone()),
5661
config: StoreTransactionsConsumerConfig {},
5762
};
5863
run_consumer::<TransactionsPayload, StoreTransactionsConsumer, usize>(
@@ -70,8 +75,12 @@ async fn run_store_transactions(settings: Arc<Settings>, shutdown_rx: ShutdownRe
7075
.await
7176
}
7277

73-
async fn run_store_prices(settings: Arc<Settings>, shutdown_rx: ShutdownReceiver, reporter: Arc<dyn ConsumerStatusReporter>) -> Result<(), Box<dyn Error + Send + Sync>> {
74-
let database = Database::new(&settings.postgres.url, settings.postgres.pool);
78+
async fn run_store_prices(
79+
settings: Arc<Settings>,
80+
database: Database,
81+
shutdown_rx: ShutdownReceiver,
82+
reporter: Arc<dyn ConsumerStatusReporter>,
83+
) -> Result<(), Box<dyn Error + Send + Sync>> {
7584
let queue = QueueName::StorePrices;
7685
let (name, stream_reader) = reader_for_queue(&settings, &queue).await?;
7786
let cacher_client = CacherClient::new(&settings.redis.url).await;
@@ -82,8 +91,12 @@ async fn run_store_prices(settings: Arc<Settings>, shutdown_rx: ShutdownReceiver
8291
run_consumer::<PricesPayload, StorePricesConsumer, usize>(&name, stream_reader, queue, None, consumer, consumer_config(&settings.consumer), shutdown_rx, reporter).await
8392
}
8493

85-
async fn run_store_charts(settings: Arc<Settings>, shutdown_rx: ShutdownReceiver, reporter: Arc<dyn ConsumerStatusReporter>) -> Result<(), Box<dyn Error + Send + Sync>> {
86-
let database = Database::new(&settings.postgres.url, settings.postgres.pool);
94+
async fn run_store_charts(
95+
settings: Arc<Settings>,
96+
database: Database,
97+
shutdown_rx: ShutdownReceiver,
98+
reporter: Arc<dyn ConsumerStatusReporter>,
99+
) -> Result<(), Box<dyn Error + Send + Sync>> {
87100
let queue = QueueName::StoreCharts;
88101
let (name, stream_reader) = reader_for_queue(&settings, &queue).await?;
89102
let cacher_client = CacherClient::new(&settings.redis.url).await;

crates/streamer/src/stream_reader.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,17 @@ impl StreamReaderConfig {
2727
pub struct StreamReader {
2828
config: StreamReaderConfig,
2929
channel: Channel,
30+
connection: Option<StreamConnection>,
3031
}
3132

3233
impl StreamReader {
3334
pub async fn new(config: StreamReaderConfig) -> Result<Self, Box<dyn Error + Send + Sync>> {
3435
let channel = with_retry(&config.retry, &config.name, || Self::try_connect(&config)).await?;
35-
Ok(Self { config, channel })
36+
Ok(Self {
37+
config,
38+
channel,
39+
connection: None,
40+
})
3641
}
3742

3843
pub async fn from_connection(connection: &StreamConnection, config: StreamReaderConfig) -> Result<Self, Box<dyn Error + Send + Sync>> {
@@ -41,8 +46,12 @@ impl StreamReader {
4146
name: connection.name().to_string(),
4247
..config
4348
};
44-
let channel = with_retry(&config.retry, &config.name, || Self::try_connect(&config)).await?;
45-
Ok(Self { config, channel })
49+
let channel = Self::create_channel(connection, config.prefetch).await?;
50+
Ok(Self {
51+
config,
52+
channel,
53+
connection: Some(connection.clone()),
54+
})
4655
}
4756

4857
async fn try_connect(config: &StreamReaderConfig) -> Result<Channel, Box<dyn Error + Send + Sync>> {
@@ -58,6 +67,19 @@ impl StreamReader {
5867
Ok(())
5968
}
6069

70+
async fn create_channel(connection: &StreamConnection, prefetch: u16) -> Result<Channel, Box<dyn Error + Send + Sync>> {
71+
let channel = connection.create_channel().await?;
72+
channel.basic_qos(prefetch, BasicQosOptions { global: false }).await?;
73+
Ok(channel)
74+
}
75+
76+
async fn try_connect_shared(&self) -> Result<Channel, Box<dyn Error + Send + Sync>> {
77+
if let Some(ref conn) = self.connection {
78+
return Self::create_channel(conn, self.config.prefetch).await;
79+
}
80+
Self::try_connect(&self.config).await
81+
}
82+
6183
async fn reconnect(&mut self, shutdown_rx: &ShutdownReceiver) -> Result<bool, Box<dyn Error + Send + Sync>> {
6284
let mut delay = self.config.retry.delay;
6385
let mut attempt: u32 = 0;
@@ -66,7 +88,7 @@ impl StreamReader {
6688
return Ok(false);
6789
}
6890
attempt += 1;
69-
match Self::try_connect(&self.config).await {
91+
match self.try_connect_shared().await {
7092
Ok(channel) => {
7193
self.channel = channel;
7294
info_with_fields!("rabbitmq reconnected", connection = self.config.name.as_str(), attempt = attempt);

0 commit comments

Comments
 (0)