@@ -126,6 +126,7 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig)
126126 . await ?;
127127
128128 let mut update_count = 0 ;
129+ let mut duration_count = 0 ;
129130 let mut unique_price_feeds = HashSet :: new ( ) ;
130131 let mut last_report_time = Instant :: now ( ) ;
131132 let jetstream_clone = jetstream. clone ( ) ;
@@ -158,7 +159,6 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig)
158159 msg_id_counter += 1 ;
159160 let mut headers = HeaderMap :: new ( ) ;
160161 headers. insert ( "Nats-Msg-Id" , msg_id. as_str ( ) ) ;
161- info ! ( "body: {:#?},msg_id: {}" , body. len( ) , msg_id) ;
162162 if let Err ( e) = jetstream_clone
163163 . publish_with_headers ( "pyth.publisher.updates" , headers, body. into ( ) )
164164 . await
@@ -172,6 +172,8 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig)
172172
173173 while let Some ( update) = notif. next ( ) . await {
174174 debug ! ( "Received price update" ) ;
175+ let start_time = Instant :: now ( ) ;
176+
175177 let account: Account = match update. value . account . decode ( ) {
176178 Some ( account) => account,
177179 _none => {
@@ -248,11 +250,7 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig)
248250 Err ( e) => warn ! ( "Failed to publish price update to JetStream: {}" , e) ,
249251 }
250252 } ) ;
251- info ! (
252- "update: {}, comp: {}" ,
253- update. context. slot,
254- price_account. comp. len( )
255- ) ;
253+
256254 for component in price_account. comp {
257255 let publisher = component. publisher . to_string ( ) ;
258256 let publisher_price_update = PublisherPriceUpdate {
@@ -261,28 +259,35 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig)
261259 price : price_account. agg . price . to_string ( ) ,
262260 slot : update. context . slot , // Add this field
263261 } ;
264- if publisher_price_update. feed_id == "7jAVut34sgRj6erznsYvLYvjc9GJwXTpN88ThZSDJ65G"
265- && publisher == "6DNocjFJjocPLZnKBZyEJAC5o2QaiT5Mx8AkphfxDm5i"
266- {
267- info ! ( "publisher_price_update: {:#?}" , publisher_price_update) ;
268- }
262+
269263 let key = ( publisher_price_update. feed_id . clone ( ) , publisher) ;
270264 let mut buf = publisher_buffer. lock ( ) . await ;
271265 buf. insert ( key, publisher_price_update) ;
272266 }
267+
268+ let end_time = Instant :: now ( ) ;
269+ let duration = end_time. duration_since ( start_time) ;
270+ duration_count += duration. as_micros ( ) ;
273271 update_count += 1 ;
274272 unique_price_feeds. insert ( price_account. prod ) ;
275273 } else {
276274 debug ! ( "Skipping price update due to invalid status or slot difference" ) ;
277275 }
278276
279277 // Report aggregate information every minute and emit metrics
280- if last_report_time. elapsed ( ) >= Duration :: from_secs ( 60 ) {
278+ if last_report_time. elapsed ( ) >= Duration :: from_secs ( 1 ) {
281279 info ! (
282- "Processed {} updates from {} unique price feeds in the last minute " ,
280+ "Processed {} updates from {} unique price feeds in the last 1 secs " ,
283281 update_count,
284282 unique_price_feeds. len( )
285283 ) ;
284+ info ! (
285+ "last slot: {}, comp: {}" ,
286+ update. context. slot,
287+ price_account. comp. len( )
288+ ) ;
289+ info ! ( "Average duration: {:?}" , duration_count / update_count) ;
290+ duration_count = 0 ;
286291 update_count = 0 ;
287292 unique_price_feeds. clear ( ) ;
288293 last_report_time = Instant :: now ( ) ;
0 commit comments