@@ -2524,67 +2524,73 @@ impl Coordinator {
25242524 }
25252525 }
25262526
2527+ /// Inner method that performs the actual real-time recency timestamp determination.
2528+ /// This is called by both the old peek sequencing code (via `determine_real_time_recent_timestamp`)
2529+ /// and the new command handler for `Command::DetermineRealTimeRecentTimestamp`.
2530+ pub ( crate ) async fn determine_real_time_recent_timestamp (
2531+ & self ,
2532+ source_ids : impl Iterator < Item = GlobalId > ,
2533+ real_time_recency_timeout : Duration ,
2534+ ) -> Result < Option < BoxFuture < ' static , Result < Timestamp , StorageError < Timestamp > > > > , AdapterError >
2535+ {
2536+ let item_ids = source_ids. map ( |gid| self . catalog . resolve_item_id ( & gid) ) ;
2537+
2538+ // Find all dependencies transitively because we need to ensure that
2539+ // RTR queries determine the timestamp from the sources' (i.e.
2540+ // storage objects that ingest data from external systems) remap
2541+ // data. We "cheat" a little bit and filter out any IDs that aren't
2542+ // user objects because we know they are not a RTR source.
2543+ let mut to_visit = VecDeque :: from_iter ( item_ids. filter ( CatalogItemId :: is_user) ) ;
2544+ // If none of the sources are user objects, we don't need to provide
2545+ // a RTR timestamp.
2546+ if to_visit. is_empty ( ) {
2547+ return Ok ( None ) ;
2548+ }
2549+
2550+ let mut timestamp_objects = BTreeSet :: new ( ) ;
2551+
2552+ while let Some ( id) = to_visit. pop_front ( ) {
2553+ timestamp_objects. insert ( id) ;
2554+ to_visit. extend (
2555+ self . catalog ( )
2556+ . get_entry ( & id)
2557+ . uses ( )
2558+ . into_iter ( )
2559+ . filter ( |id| !timestamp_objects. contains ( id) && id. is_user ( ) ) ,
2560+ ) ;
2561+ }
2562+ let timestamp_objects = timestamp_objects
2563+ . into_iter ( )
2564+ . flat_map ( |item_id| self . catalog ( ) . get_entry ( & item_id) . global_ids ( ) )
2565+ . collect ( ) ;
2566+
2567+ let r = self
2568+ . controller
2569+ . determine_real_time_recent_timestamp ( timestamp_objects, real_time_recency_timeout)
2570+ . await ?;
2571+
2572+ Ok ( Some ( r) )
2573+ }
2574+
25272575 /// Checks to see if the session needs a real time recency timestamp and if so returns
25282576 /// a future that will return the timestamp.
2529- pub ( super ) async fn determine_real_time_recent_timestamp (
2577+ pub ( crate ) async fn determine_real_time_recent_timestamp_if_needed (
25302578 & self ,
25312579 session : & Session ,
2532- source_ids : impl Iterator < Item = CatalogItemId > ,
2580+ source_ids : impl Iterator < Item = GlobalId > ,
25332581 ) -> Result < Option < BoxFuture < ' static , Result < Timestamp , StorageError < Timestamp > > > > , AdapterError >
25342582 {
25352583 let vars = session. vars ( ) ;
25362584
2537- // Ideally this logic belongs inside of
2538- // `mz-adapter::coord::timestamp_selection::determine_timestamp`. However, including the
2539- // logic in there would make it extremely difficult and inconvenient to pull the waiting off
2540- // of the main coord thread.
2541- let r = if vars. real_time_recency ( )
2585+ if vars. real_time_recency ( )
25422586 && vars. transaction_isolation ( ) == & IsolationLevel :: StrictSerializable
25432587 && !session. contains_read_timestamp ( )
25442588 {
2545- // Find all dependencies transitively because we need to ensure that
2546- // RTR queries determine the timestamp from the sources' (i.e.
2547- // storage objects that ingest data from external systems) remap
2548- // data. We "cheat" a little bit and filter out any IDs that aren't
2549- // user objects because we know they are not a RTR source.
2550- let mut to_visit = VecDeque :: from_iter ( source_ids. filter ( CatalogItemId :: is_user) ) ;
2551- // If none of the sources are user objects, we don't need to provide
2552- // a RTR timestamp.
2553- if to_visit. is_empty ( ) {
2554- return Ok ( None ) ;
2555- }
2556-
2557- let mut timestamp_objects = BTreeSet :: new ( ) ;
2558-
2559- while let Some ( id) = to_visit. pop_front ( ) {
2560- timestamp_objects. insert ( id) ;
2561- to_visit. extend (
2562- self . catalog ( )
2563- . get_entry ( & id)
2564- . uses ( )
2565- . into_iter ( )
2566- . filter ( |id| !timestamp_objects. contains ( id) && id. is_user ( ) ) ,
2567- ) ;
2568- }
2569- let timestamp_objects = timestamp_objects
2570- . into_iter ( )
2571- . flat_map ( |item_id| self . catalog ( ) . get_entry ( & item_id) . global_ids ( ) )
2572- . collect ( ) ;
2573-
2574- let r = self
2575- . controller
2576- . determine_real_time_recent_timestamp (
2577- timestamp_objects,
2578- * vars. real_time_recency_timeout ( ) ,
2579- )
2580- . await ?;
2581-
2582- Some ( r)
2589+ self . determine_real_time_recent_timestamp ( source_ids, * vars. real_time_recency_timeout ( ) )
2590+ . await
25832591 } else {
2584- None
2585- } ;
2586-
2587- Ok ( r)
2592+ Ok ( None )
2593+ }
25882594 }
25892595
25902596 #[ instrument]
0 commit comments