1- use std:: { marker:: PhantomData , sync:: Arc } ;
1+ use std:: {
2+ marker:: PhantomData ,
3+ sync:: {
4+ atomic:: { AtomicI64 , Ordering } ,
5+ Arc ,
6+ } ,
7+ } ;
28
39use bincode:: { deserialize, serialize} ;
10+ use log:: { error, warn} ;
411use prost:: Message ;
512use rocksdb:: { properties as RocksProperties , ColumnFamily } ;
613use serde:: de:: DeserializeOwned ;
@@ -14,7 +21,7 @@ use super::{
1421 rocks_db:: Rocks ,
1522} ;
1623use crate :: {
17- database:: write_batch:: WriteBatch ,
24+ database:: { columns :: DIRTY_COUNT , write_batch:: WriteBatch } ,
1825 errors:: { LedgerError , LedgerResult } ,
1926 metrics:: {
2027 maybe_enable_rocksdb_perf, report_rocksdb_read_perf,
3441 pub column_options : Arc < LedgerColumnOptions > ,
3542 pub read_perf_status : PerfSamplingStatus ,
3643 pub write_perf_status : PerfSamplingStatus ,
44+ // We are caching the column item counts since they are expensive to obtain.
45+ // `-1` indicates that they are "dirty" //
46+ // // We are using an i64 to make this work even though the counts are usize,
47+ // // however if we had 50,000 transactions/sec and 50ms slots for 100 years then:
48+ // //
49+ // // slots: 200 * 3600 * 24 * 365 * 100 = 630,720,000,000
50+ // // txs: 50,000 * 3600 * 24 * 365 * 100 = 157,680,000,000,000
51+ // // i64::MAX = 9,223,372,036,854,775,807
52+ pub entry_counter : AtomicI64 ,
3753}
3854
3955impl < C : Column + ColumnName > LedgerColumn < C > {
@@ -260,6 +276,39 @@ where
260276 pub fn flush ( & self ) -> LedgerResult < ( ) > {
261277 self . backend . flush_cf ( self . handle ( ) )
262278 }
279+
280+ pub fn count_column_using_cache ( & self ) -> LedgerResult < i64 > {
281+ let cached = self . entry_counter . load ( Ordering :: Relaxed ) ;
282+ if cached != DIRTY_COUNT {
283+ return Ok ( cached) ;
284+ }
285+
286+ self
287+ . iter ( IteratorMode :: Start )
288+ . map ( Iterator :: count)
289+ . map ( |val| if val > i64:: MAX as usize {
290+ // NOTE: this value is only used for metrics/diagnostics and
291+ // aside from the fact that we will never encounter this case,
292+ // it is good enough to return i64::MAX
293+ error ! ( "Column {} count is too large: {} for metrics, returning max." , C :: NAME , val) ;
294+ i64:: MAX
295+ } else { val as i64 } )
296+ . inspect ( |updated| self . entry_counter . store ( * updated, Ordering :: Relaxed ) )
297+ }
298+
299+ /// Increases entries counter if it's not [`DIRTY_COUNT`]
300+ /// Otherwise just skips it until it is set
301+ #[ inline( always) ]
302+ pub fn try_increase_entry_counter ( & self , by : u64 ) {
303+ try_increase_entry_counter ( & self . entry_counter , by) ;
304+ }
305+
306+ /// Decreases entries counter if it's not [`DIRTY_COUNT`]
307+ /// Otherwise just skips it until it is set
308+ #[ inline( always) ]
309+ pub fn try_decrease_entry_counter ( & self , by : u64 ) {
310+ try_decrease_entry_counter ( & self . entry_counter , by) ;
311+ }
263312}
264313
265314impl < C > LedgerColumn < C >
@@ -490,3 +539,68 @@ where
490539 } )
491540 }
492541}
542+
543+ /// Increases entries counter if it's not [`DIRTY_COUNT`]
544+ /// Otherwise just skips it until it is set
545+ pub fn try_increase_entry_counter ( entry_counter : & AtomicI64 , by : u64 ) {
546+ loop {
547+ let prev = entry_counter. load ( Ordering :: Acquire ) ;
548+ if prev == DIRTY_COUNT {
549+ return ;
550+ }
551+
552+ // In case value changed to [`DIRTY_COUNT`] in between
553+ if entry_counter
554+ . compare_exchange (
555+ prev,
556+ prev + by as i64 ,
557+ Ordering :: AcqRel ,
558+ Ordering :: Relaxed ,
559+ )
560+ . is_ok ( )
561+ {
562+ return ;
563+ }
564+ }
565+ }
566+
567+ /// Decreases entries counter if it's not [`DIRTY_COUNT`]
568+ /// Otherwise just skips it until it is set
569+ pub fn try_decrease_entry_counter ( entry_counter : & AtomicI64 , by : u64 ) {
570+ loop {
571+ let prev = entry_counter. load ( Ordering :: Acquire ) ;
572+ if prev == DIRTY_COUNT {
573+ return ;
574+ }
575+
576+ let new = prev - by as i64 ;
577+ if new >= 0 {
578+ // In case value changed to [`DIRTY_COUNT`] in between
579+ if entry_counter
580+ . compare_exchange (
581+ prev,
582+ new,
583+ Ordering :: AcqRel ,
584+ Ordering :: Relaxed ,
585+ )
586+ . is_ok ( )
587+ {
588+ return ;
589+ }
590+ } else {
591+ warn ! ( "Negative entry counter!" ) ;
592+ // In case value fixed to valid one in between
593+ if entry_counter
594+ . compare_exchange (
595+ prev,
596+ DIRTY_COUNT ,
597+ Ordering :: AcqRel ,
598+ Ordering :: Relaxed ,
599+ )
600+ . is_ok ( )
601+ {
602+ return ;
603+ }
604+ }
605+ }
606+ }
0 commit comments