@@ -9,6 +9,7 @@ use serde::{Deserialize, Deserializer, Serialize};
99use solana_account_decoder:: UiAccountEncoding ;
1010use solana_client:: nonblocking:: pubsub_client:: PubsubClient ;
1111use solana_client:: rpc_config:: { RpcAccountInfoConfig , RpcProgramAccountsConfig } ;
12+ use solana_metrics:: datapoint_info;
1213use solana_sdk:: account:: Account ;
1314use solana_sdk:: commitment_config:: CommitmentConfig ;
1415use solana_sdk:: pubkey:: Pubkey ;
@@ -30,6 +31,15 @@ struct PriceUpdate {
3031 price_feed : PriceFeed ,
3132}
3233
34+ #[ derive( Debug , Serialize , Deserialize ) ]
35+ struct PublisherPriceUpdate {
36+ #[ serde( rename = "type" ) ]
37+ update_type : String ,
38+ publisher : String ,
39+ feed_id : String ,
40+ price_info : PriceInfo ,
41+ }
42+
3343#[ derive( Debug , Serialize , Deserialize ) ]
3444struct PriceFeed {
3545 id : String ,
@@ -130,6 +140,10 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig)
130140 continue ;
131141 }
132142 } ;
143+ // info!(
144+ // "Price Account: {:#?}, account: {:#?} \n\n",
145+ // price_account, account
146+ // );
133147
134148 // We want to send price updates whenever the aggregate changes but sometimes the accounts can change without the aggregate changing
135149 if price_account. agg . status == PriceStatus :: Trading
@@ -162,40 +176,88 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig)
162176 } ,
163177 } ;
164178
165- let message = serde_json:: to_string ( & price_update) ?;
179+ let price_update_message = serde_json:: to_string ( & price_update) ?;
166180
167181 // Create a unique message ID
168- let message_id = format ! (
182+ let price_update_message_id = format ! (
169183 "{}:{}" ,
170184 price_update. price_feed. id, price_update. price_feed. price. slot
171185 ) ;
172186
173187 // Create headers with the Nats-Msg-Id
174- let mut headers = HeaderMap :: new ( ) ;
175- headers . insert ( "Nats-Msg-Id" , message_id . as_str ( ) ) ;
188+ let mut price_update_headers = HeaderMap :: new ( ) ;
189+ price_update_headers . insert ( "Nats-Msg-Id" , price_update_message_id . as_str ( ) ) ;
176190
177191 let jetstream_clone = jetstream. clone ( ) ;
178192 task:: spawn ( async move {
179193 match jetstream_clone
180- . publish_with_headers ( "pyth.price.updates" , headers, message. into ( ) )
194+ . publish_with_headers (
195+ "pyth.price.updates" ,
196+ price_update_headers,
197+ price_update_message. into ( ) ,
198+ )
181199 . await
182200 {
183201 Ok ( _) => debug ! (
184202 "Published price update to JetStream with ID: {}" ,
185- message_id
203+ price_update_message_id
186204 ) ,
187205 Err ( e) => warn ! ( "Failed to publish price update to JetStream: {}" , e) ,
188206 }
189207 } ) ;
190208
209+ for component in price_account. comp {
210+ let jetstream_clone = jetstream. clone ( ) ;
211+ let publisher = component. publisher . to_string ( ) ;
212+ let publisher_price_update_message_id = format ! (
213+ "{}:{}:{}" ,
214+ price_update. price_feed. id, price_update. price_feed. price. slot, publisher
215+ ) ;
216+
217+ let mut publisher_price_updates = HeaderMap :: new ( ) ;
218+ publisher_price_updates
219+ . insert ( "Nats-Msg-Id" , publisher_price_update_message_id. as_str ( ) ) ;
220+
221+ let publisher_price_update = PublisherPriceUpdate {
222+ update_type : "publisher_price_update" . to_string ( ) ,
223+ feed_id : price_account. prod . to_string ( ) ,
224+ publisher : publisher,
225+ price_info : PriceInfo {
226+ price : price_account. agg . price . to_string ( ) ,
227+ conf : price_account. agg . conf . to_string ( ) ,
228+ expo : price_account. expo ,
229+ publish_time : price_account. timestamp ,
230+ slot : update. context . slot , // Add this field
231+ } ,
232+ } ;
233+ info ! ( "Publisher price update: {:?}" , publisher_price_update) ;
234+ let publisher_price_update_message =
235+ serde_json:: to_string ( & publisher_price_update) ?;
236+
237+ task:: spawn ( async move {
238+ match jetstream_clone
239+ . publish_with_headers (
240+ "pyth.publisher.updates" ,
241+ publisher_price_updates,
242+ publisher_price_update_message. into ( ) ,
243+ )
244+ . await
245+ {
246+ Ok ( _) => debug ! (
247+ "Published publisher price update to JetStream with ID: {}" ,
248+ publisher_price_update_message_id
249+ ) ,
250+ Err ( e) => warn ! ( "Failed to publish price update to JetStream: {}" , e) ,
251+ }
252+ } ) ;
253+ }
191254 update_count += 1 ;
192255 unique_price_feeds. insert ( price_account. prod ) ;
193256 } else {
194257 debug ! ( "Skipping price update due to invalid status or slot difference" ) ;
195258 }
196259
197- // Report aggregate information every minute
198- // TODO: add this as metrics
260+ // Report aggregate information every minute and emit metrics
199261 if last_report_time. elapsed ( ) >= Duration :: from_secs ( 60 ) {
200262 info ! (
201263 "Processed {} updates from {} unique price feeds in the last minute" ,
0 commit comments