Skip to content

Commit

Permalink
docs(integration/object_store): add example for datafusion (#5543)
Browse files Browse the repository at this point in the history
  • Loading branch information
meteorgan authored Jan 15, 2025
1 parent 781682c commit a8b793b
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 2 deletions.
2 changes: 2 additions & 0 deletions integrations/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,5 @@ tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread"] }
anyhow = "1.0.86"
libtest-mimic = "0.7.3"
uuid = "1.11.0"
datafusion = "44.0.0"
url = "2.5.2"
83 changes: 81 additions & 2 deletions integrations/object_store/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,21 @@ This crate can help you to access 30 more storage services with the same object_

## Examples

`opendal_store_opendal` depends on the `opendal` crate. Please make sure to always use the latest versions of both.

latest `object_store_opendal` ![Crate](https://img.shields.io/crates/v/object_store_opendal.svg)

latest `opendal` ![Crate](https://img.shields.io/crates/v/opendal.svg)

### 1. using `object_store` API to access S3

Add the following dependencies to your `Cargo.toml` with correct version:

```toml
[dependencies]
object_store = "0.11.0"
object_store_opendal = "0.47.0"
opendal = { version = "0.49.0", features = ["services-s3"] }
object_store_opendal = "xxx" # see the latest version above
opendal = { version = "xxx", features = ["services-s3"] } # see the latest version above
```

Build `OpendalStore` via `opendal::Operator`:
Expand Down Expand Up @@ -78,6 +86,77 @@ async fn main() {
}
```

### 2. querying data in a S3 bucket using DataFusion

Add the following dependencies to your `Cargo.toml` with correct version:

```toml
[dependencies]
object_store = "0.11.0"
object_store_opendal = "xxx" # see the latest version above
opendal = { version = "xxx", features = ["services-s3"] } # see the latest version above
datafusion = "44.0.0"
url = "2.5.2"
```

Build `OpendalStore` via `opendal::Operator` and register it to `DataFusion`:

```rust
use datafusion::error::DataFusionError;
use datafusion::error::Result;
use datafusion::prelude::*;
use opendal::services::S3;
use opendal::Operator;
use std::sync::Arc;
use url::Url;


#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();

// Configure OpenDAL for S3
let region = "my_region";
let bucket_name = "my_bucket";
let builder = S3::default()
.endpoint("my_endpoint")
.bucket(bucket_name)
.region(region)
.access_key_id("my_access_key")
.secret_access_key("my_secret_key");
let op = Operator::new(builder)
.map_err(|err| DataFusionError::External(Box::new(err)))?
.finish();
let store = object_store_opendal::OpendalStore::new(op);

// Register the object store
let path = format!("s3://{bucket_name}");
let s3_url = Url::parse(&path).unwrap();
ctx.register_object_store(&s3_url, Arc::new(store));

// Register CSV file as a table
let path = format!("s3://{bucket_name}/csv/data.csv");
ctx.register_csv("trips", &path, CsvReadOptions::default())
.await?;

// Execute the query
let df = ctx.sql("SELECT * FROM trips LIMIT 10").await?;
// Print the results
df.show().await?;

// Dynamic query using the file path directly
let ctx = ctx.enable_url_table();
let df = ctx
.sql(format!(r#"SELECT * FROM '{}' LIMIT 10"#, &path).as_str())
.await?;
// Print the results
df.show().await?;

Ok(())
}
```


## WASM support

To build with `wasm32-unknown-unknown` target, you need to enable the `send_wrapper` feature:
Expand Down
52 changes: 52 additions & 0 deletions integrations/object_store/examples/datafusion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use datafusion::error::DataFusionError;
use datafusion::error::Result;
use datafusion::prelude::*;
use opendal::services::S3;
use opendal::Operator;
use std::sync::Arc;
use url::Url;

/// This example demonstrates querying data in a S3 bucket using DataFusion and `object_store_opendal`
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();

// Configure OpenDAL for S3
let region = "my_region";
let bucket_name = "my_bucket";
let builder = S3::default()
.endpoint("my_endpoint")
.bucket(bucket_name)
.region(region)
.access_key_id("my_access_key")
.secret_access_key("my_secret_key");
let op = Operator::new(builder)
.map_err(|err| DataFusionError::External(Box::new(err)))?
.finish();
let store = object_store_opendal::OpendalStore::new(op);

// Register the object store
let path = format!("s3://{bucket_name}");
let s3_url = Url::parse(&path).unwrap();
ctx.register_object_store(&s3_url, Arc::new(store));

// Register CSV file as a table
let path = format!("s3://{bucket_name}/csv/data.csv");
ctx.register_csv("trips", &path, CsvReadOptions::default())
.await?;

// Execute the query
let df = ctx.sql("SELECT * FROM trips LIMIT 10").await?;
// Print the results
df.show().await?;

// Dynamic query using the file path directly
let ctx = ctx.enable_url_table();
let df = ctx
.sql(format!(r#"SELECT * FROM '{}' LIMIT 10"#, &path).as_str())
.await?;
// Print the results
df.show().await?;

Ok(())
}

0 comments on commit a8b793b

Please sign in to comment.