@@ -6,7 +6,7 @@ use std::{
66 time:: { Duration , SystemTime } ,
77} ;
88
9- use n0_error:: { Result , StdResultExt , StackResultExt , format_err} ;
9+ use n0_error:: { Result , StdResultExt , format_err} ;
1010use pkarr:: { SignedPacket , Timestamp } ;
1111use redb:: {
1212 Database , MultimapTableDefinition , ReadableTable , TableDefinition , backends:: InMemoryBackend ,
@@ -128,90 +128,90 @@ impl Actor {
128128 tokio:: pin!( timeout) ;
129129 for _ in 0 ..self . options . max_batch_size {
130130 tokio:: select! {
131- _ = self . cancel. cancelled( ) => {
132- drop( tables) ;
133- transaction. commit( ) . e( ) ?;
134- return Ok ( ( ) ) ;
135- }
136- _ = & mut timeout => break ,
137- Some ( msg) = self . recv. recv( ) => {
138- match msg {
139- Message :: Get { key, res } => {
140- match get_packet( & tables. signed_packets, & key) {
141- Ok ( packet) => {
142- trace!( "get {key}: {}" , packet. is_some( ) ) ;
143- res. send( packet) . ok( ) ;
144- } ,
145- Err ( err) => {
146- warn!( "get {key} failed: {err:#}" ) ;
147- return Err ( err) . std_context( format!( "get packet for {key} failed" ) )
131+ _ = self . cancel. cancelled( ) => {
132+ drop( tables) ;
133+ transaction. commit( ) . e( ) ?;
134+ return Ok ( ( ) ) ;
135+ }
136+ _ = & mut timeout => break ,
137+ Some ( msg) = self . recv. recv( ) => {
138+ match msg {
139+ Message :: Get { key, res } => {
140+ match get_packet( & tables. signed_packets, & key) {
141+ Ok ( packet) => {
142+ trace!( "get {key}: {}" , packet. is_some( ) ) ;
143+ res. send( packet) . ok( ) ;
144+ } ,
145+ Err ( err) => {
146+ warn!( "get {key} failed: {err:#}" ) ;
147+ return Err ( err) . std_context( format!( "get packet for {key} failed" ) )
148+ }
148149 }
149150 }
150- }
151- Message :: Upsert { packet, res } => {
152- let key = PublicKeyBytes :: from_signed_packet( & packet) ;
153- trace!( "upsert {}" , key) ;
154- let replaced = match get_packet( & tables. signed_packets, & key) ? { Some ( existing) => {
155- if existing. more_recent_than( & packet) {
156- res. send( false ) . ok( ) ;
157- continue ;
151+ Message :: Upsert { packet, res } => {
152+ let key = PublicKeyBytes :: from_signed_packet( & packet) ;
153+ trace!( "upsert {}" , key) ;
154+ let replaced = match get_packet( & tables. signed_packets, & key) ? { Some ( existing) => {
155+ if existing. more_recent_than( & packet) {
156+ res. send( false ) . ok( ) ;
157+ continue ;
158+ } else {
159+ // remove the old packet from the update time index
160+ tables. update_time. remove( & existing. timestamp( ) . to_bytes( ) , key. as_bytes( ) ) . e( ) ?;
161+ true
162+ }
163+ } _ => {
164+ false
165+ } } ;
166+ let value = packet. serialize( ) ;
167+ tables. signed_packets
168+ . insert( key. as_bytes( ) , & value[ ..] ) . e( ) ?;
169+ tables. update_time
170+ . insert( & packet. timestamp( ) . to_bytes( ) , key. as_bytes( ) ) . e( ) ?;
171+ if replaced {
172+ self . metrics. store_packets_updated. inc( ) ;
158173 } else {
159- // remove the old packet from the update time index
160- tables. update_time. remove( & existing. timestamp( ) . to_bytes( ) , key. as_bytes( ) ) . e( ) ?;
161- true
174+ self . metrics. store_packets_inserted. inc( ) ;
162175 }
163- } _ => {
164- false
165- } } ;
166- let value = packet. serialize( ) ;
167- tables. signed_packets
168- . insert( key. as_bytes( ) , & value[ ..] ) . e( ) ?;
169- tables. update_time
170- . insert( & packet. timestamp( ) . to_bytes( ) , key. as_bytes( ) ) . e( ) ?;
171- if replaced {
172- self . metrics. store_packets_updated. inc( ) ;
173- } else {
174- self . metrics. store_packets_inserted. inc( ) ;
176+ res. send( true ) . ok( ) ;
175177 }
176- res. send( true ) . ok( ) ;
177- }
178- Message :: Remove { key, res } => {
179- trace!( "remove {}" , key) ;
180- let updated = match tables. signed_packets. remove( key. as_bytes( ) ) . e( ) ? { Some ( row) => {
181- let packet = SignedPacket :: deserialize( row. value( ) ) . e( ) ?;
182- tables. update_time. remove( & packet. timestamp( ) . to_bytes( ) , key. as_bytes( ) ) . e( ) ?;
183- self . metrics. store_packets_removed. inc( ) ;
184- true
185- } _ => {
186- false
187- } } ;
188- res. send( updated) . ok( ) ;
189- }
190- Message :: Snapshot { res } => {
191- trace!( "snapshot" ) ;
192- res. send( Snapshot :: new( & self . db) ?) . ok( ) ;
193- }
194- Message :: CheckExpired { key, time } => {
195- trace!( "check expired {} at {}" , key, fmt_time( time) ) ;
196- match get_packet( & tables. signed_packets, & key) ? { Some ( packet) => {
197- let expired = Timestamp :: now( ) - expiry_us;
198- if packet. timestamp( ) < expired {
199- tables. update_time. remove( & time. to_bytes( ) , key. as_bytes( ) ) . e( ) ?;
200- let _ = tables. signed_packets. remove( key. as_bytes( ) ) . e( ) ?;
201- self . metrics. store_packets_expired. inc( ) ;
202- debug!( "removed expired packet {key}" ) ;
203- } else {
204- debug!( "packet {key} is no longer expired, removing obsolete expiry entry" ) ;
178+ Message :: Remove { key, res } => {
179+ trace!( "remove {}" , key) ;
180+ let updated = match tables. signed_packets. remove( key. as_bytes( ) ) . e( ) ? { Some ( row) => {
181+ let packet = SignedPacket :: deserialize( row. value( ) ) . e( ) ?;
182+ tables. update_time. remove( & packet. timestamp( ) . to_bytes( ) , key. as_bytes( ) ) . e( ) ?;
183+ self . metrics. store_packets_removed. inc( ) ;
184+ true
185+ } _ => {
186+ false
187+ } } ;
188+ res. send( updated) . ok( ) ;
189+ }
190+ Message :: Snapshot { res } => {
191+ trace!( "snapshot" ) ;
192+ res. send( Snapshot :: new( & self . db) ?) . ok( ) ;
193+ }
194+ Message :: CheckExpired { key, time } => {
195+ trace!( "check expired {} at {}" , key, fmt_time( time) ) ;
196+ match get_packet( & tables. signed_packets, & key) ? { Some ( packet) => {
197+ let expired = Timestamp :: now( ) - expiry_us;
198+ if packet. timestamp( ) < expired {
199+ tables. update_time. remove( & time. to_bytes( ) , key. as_bytes( ) ) . e( ) ?;
200+ let _ = tables. signed_packets. remove( key. as_bytes( ) ) . e( ) ?;
201+ self . metrics. store_packets_expired. inc( ) ;
202+ debug!( "removed expired packet {key}" ) ;
203+ } else {
204+ debug!( "packet {key} is no longer expired, removing obsolete expiry entry" ) ;
205+ tables. update_time. remove( & time. to_bytes( ) , key. as_bytes( ) ) . e( ) ?;
206+ }
207+ } _ => {
208+ debug!( "expired packet {key} not found, remove from expiry table" ) ;
205209 tables. update_time. remove( & time. to_bytes( ) , key. as_bytes( ) ) . e( ) ?;
206- }
207- } _ => {
208- debug!( "expired packet {key} not found, remove from expiry table" ) ;
209- tables. update_time. remove( & time. to_bytes( ) , key. as_bytes( ) ) . e( ) ?;
210- } }
210+ } }
211+ }
211212 }
212213 }
213214 }
214- }
215215 }
216216 drop ( tables) ;
217217 transaction. commit ( ) . e ( ) ?;
0 commit comments