Skip to content

Commit

Permalink
refactor: mutex removed from ListingSchemaProvider, no longer loads o…
Browse files Browse the repository at this point in the history
…n register, and added a load_table function

Signed-off-by: Alexander Falk <[email protected]>
  • Loading branch information
Nordalf committed Feb 19, 2025
1 parent 45162d2 commit 3699cf7
Showing 1 changed file with 39 additions and 28 deletions.
67 changes: 39 additions & 28 deletions crates/core/src/data_catalog/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

use async_trait::async_trait;
use dashmap::DashMap;
Expand All @@ -13,9 +13,9 @@ use futures::TryStreamExt;
use object_store::ObjectStore;

use crate::errors::DeltaResult;
use crate::storage::*;
use crate::table::builder::ensure_table_uri;
use crate::DeltaTableBuilder;
use crate::{storage::*, DeltaTable};

const DELTA_LOG_FOLDER: &str = "_delta_log";

Expand All @@ -36,7 +36,7 @@ pub struct ListingSchemaProvider {
/// Underlying object store
store: Arc<dyn ObjectStore>,
/// A map of table names to a fully quilfied storage location
tables: Mutex<DashMap<String, Arc<dyn TableProvider>>>,
tables: DashMap<String, Arc<dyn TableProvider>>,
/// Options used to create underlying object stores
storage_options: StorageOptions,
}
Expand All @@ -54,7 +54,7 @@ impl ListingSchemaProvider {
Ok(Self {
authority: uri.to_string(),
store,
tables: Mutex::new(DashMap::new()),
tables: DashMap::new(),
storage_options,
})
}
Expand Down Expand Up @@ -84,8 +84,7 @@ impl ListingSchemaProvider {
let table_url = format!("{}/{}", self.authority, table_path);
let Ok(delta_table) = DeltaTableBuilder::from_uri(table_url)
.with_storage_options(self.storage_options.0.clone())
.load()
.await
.build()
else {
continue;
};
Expand All @@ -94,6 +93,35 @@ impl ListingSchemaProvider {
}
Ok(())
}

/// Tables are not initialized in Eddytor but have a reference setup. To initialize the delta
/// table, the `load()` function must be called on the delta table. This function helps with
/// that and ensures the DashMap is updated
pub async fn load_delta(&self, table_name: &str) -> datafusion::common::Result<()> {
if let Some(mut table) = self.tables.get_mut(&table_name.to_string()) {
if let Some(delta_table) = table.value().as_any().downcast_ref::<DeltaTable>() {
// If table has not yet been loaded, we remove it from the tables map and add it again
if delta_table.state.is_none() {
let mut delta_table = delta_table.clone();
delta_table.load().await?;
*table = Arc::from(delta_table);
//match delta_table.load().await {
// Ok(()) => {
// // Add the table back to the DashMap
// }
// Err(err) => {
// return Err(DataFusionError::Internal(format!(
// "Cannot load delta table. See stacktrace: {}",
// err.to_string()
// )))
// }
//}
}
}
}

Ok(())
}
}

// normalizes a path fragment to be a valida table name in datafusion
Expand All @@ -116,22 +144,11 @@ impl SchemaProvider for ListingSchemaProvider {
}

fn table_names(&self) -> Vec<String> {
self.tables
.lock()
.expect("Can't lock tables")
.iter()
.map(|t| t.key().clone())
.collect()
self.tables.iter().map(|t| t.key().clone()).collect()
}

async fn table(&self, name: &str) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
let Some(provider) = self
.tables
.lock()
.expect("Can't lock tables")
.get(name)
.map(|t| t.clone())
else {
let Some(provider) = self.tables.get(name).map(|t| t.clone()) else {
return Ok(None);
};
Ok(Some(provider))
Expand All @@ -142,28 +159,22 @@ impl SchemaProvider for ListingSchemaProvider {
name: String,
table: Arc<dyn TableProvider>,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
self.tables
.lock()
.expect("Can't lock tables")
.insert(name, table.clone());
self.tables.insert(name, table.clone());
Ok(Some(table))
}

fn deregister_table(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
if let Some(table) = self.tables.lock().expect("Can't lock tables").remove(name) {
if let Some(table) = self.tables.remove(name) {
return Ok(Some(table.1));
}
Ok(None)
}

fn table_exist(&self, name: &str) -> bool {
self.tables
.lock()
.expect("Can't lock tables")
.contains_key(name)
self.tables.contains_key(name)
}
}

Expand Down

0 comments on commit 3699cf7

Please sign in to comment.