From 33f03b9891065d695490cd23d4a41b84377a559f Mon Sep 17 00:00:00 2001 From: Alexander Falk Date: Mon, 14 Oct 2024 12:37:31 +0200 Subject: [PATCH] feat: Support register/deregister of tables in ListingSchemaProvider incl. func to load a table at a later point Signed-off-by: Alexander Falk --- crates/core/src/data_catalog/storage/mod.rs | 61 +++++++++++++++------ 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/crates/core/src/data_catalog/storage/mod.rs b/crates/core/src/data_catalog/storage/mod.rs index 3e08c5f463..4c9ec6fb55 100644 --- a/crates/core/src/data_catalog/storage/mod.rs +++ b/crates/core/src/data_catalog/storage/mod.rs @@ -13,9 +13,9 @@ use futures::TryStreamExt; use object_store::ObjectStore; use crate::errors::DeltaResult; -use crate::open_table_with_storage_options; -use crate::storage::*; use crate::table::builder::ensure_table_uri; +use crate::DeltaTableBuilder; +use crate::{storage::*, DeltaTable}; const DELTA_LOG_FOLDER: &str = "_delta_log"; @@ -36,7 +36,7 @@ pub struct ListingSchemaProvider { /// Underlying object store store: Arc, /// A map of table names to a fully quilfied storage location - tables: DashMap, + tables: DashMap>, /// Options used to create underlying object stores storage_options: StorageOptions, } @@ -73,6 +73,7 @@ impl ListingSchemaProvider { parent = p; } } + for table in tables.into_iter() { let table_name = normalize_table_name(table)?; let table_path = table @@ -80,10 +81,34 @@ impl ListingSchemaProvider { .ok_or_else(|| DataFusionError::Internal("Cannot parse file name!".to_string()))? .to_string(); if !self.table_exist(&table_name) { - let table_url = format!("{}/{table_path}", self.authority); - self.tables.insert(table_name.to_string(), table_url); + let table_url = format!("{}/{}", self.authority, table_path); + let Ok(delta_table) = DeltaTableBuilder::from_uri(table_url) + .with_storage_options(self.storage_options.0.clone()) + .build() + else { + continue; + }; + let _ = self.register_table(table_name, Arc::new(delta_table)); + } + } + Ok(()) + } + + /// Tables are not initialized but have a reference setup. To initialize the delta + /// table, the `load()` function must be called on the delta table. This function helps with + /// that and ensures the DashMap is updated + pub async fn load_delta(&self, table_name: &str) -> datafusion::common::Result<()> { + if let Some(mut table) = self.tables.get_mut(&table_name.to_string()) { + if let Some(delta_table) = table.value().as_any().downcast_ref::() { + // If table has not yet been loaded, we remove it from the tables map and add it again + if delta_table.state.is_none() { + let mut delta_table = delta_table.clone(); + delta_table.load().await?; + *table = Arc::from(delta_table); + } } } + Ok(()) } } @@ -112,31 +137,31 @@ impl SchemaProvider for ListingSchemaProvider { } async fn table(&self, name: &str) -> datafusion_common::Result>> { - let Some(location) = self.tables.get(name).map(|t| t.clone()) else { + let Some(provider) = self.tables.get(name).map(|t| t.clone()) else { return Ok(None); }; - let provider = - open_table_with_storage_options(location, self.storage_options.0.clone()).await?; - Ok(Some(Arc::new(provider) as Arc)) + Ok(Some(provider)) } fn register_table( &self, - _name: String, - _table: Arc, + name: String, + table: Arc, ) -> datafusion_common::Result>> { - Err(DataFusionError::Execution( - "schema provider does not support registering tables".to_owned(), - )) + if !self.table_exist(name.as_str()) { + self.tables.insert(name, table.clone()); + } + Ok(Some(table)) } fn deregister_table( &self, - _name: &str, + name: &str, ) -> datafusion_common::Result>> { - Err(DataFusionError::Execution( - "schema provider does not support deregistering tables".to_owned(), - )) + if let Some(table) = self.tables.remove(name) { + return Ok(Some(table.1)); + } + Ok(None) } fn table_exist(&self, name: &str) -> bool {