@@ -268,10 +268,10 @@ struct PriceInfo {
268268
269269async fn handle_nats_messages ( jetstream : jetstream:: Context , clients : Clients ) -> Result < ( ) > {
270270 let stream_name = "PYTH_PRICE_UPDATES" ;
271- let consumer_name = "websocket_server" ;
272271
273272 let consumer_config = consumer:: pull:: Config {
274- durable_name : Some ( consumer_name. to_string ( ) ) ,
273+ deliver_policy : consumer:: DeliverPolicy :: All ,
274+ ack_policy : consumer:: AckPolicy :: None ,
275275 ..Default :: default ( )
276276 } ;
277277
@@ -280,7 +280,7 @@ async fn handle_nats_messages(jetstream: jetstream::Context, clients: Clients) -
280280 . await
281281 . context ( "Failed to create NATS consumer" ) ?;
282282
283- info ! ( stream = %stream_name, consumer = %consumer_name , "Started handling NATS messages" ) ;
283+ info ! ( stream = %stream_name, "Started handling NATS messages" ) ;
284284
285285 loop {
286286 let mut messages = consumer
@@ -336,13 +336,6 @@ async fn handle_nats_messages(jetstream: jetstream::Context, clients: Clients) -
336336 ) ;
337337 }
338338 }
339-
340- // Spawn a new task for acknowledgment
341- tokio:: spawn ( async move {
342- if let Err ( e) = msg. ack ( ) . await {
343- warn ! ( error = %e, "Failed to acknowledge NATS message" ) ;
344- }
345- } ) ;
346339 }
347340 Err ( e) => {
348341 error ! ( error = %e, "Error receiving message from NATS" ) ;
0 commit comments