Skip to content

Commit 030873b

Browse files
authored
Add temporary view option for into_view (apache#1267)
1 parent fe0cf8c commit 030873b

File tree

4 files changed

+111
-14
lines changed

4 files changed

+111
-14
lines changed

python/datafusion/dataframe.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ def __init__(self, df: DataFrameInternal) -> None:
318318
"""
319319
self.df = df
320320

321-
def into_view(self) -> Table:
321+
def into_view(self, temporary: bool = False) -> Table:
322322
"""Convert ``DataFrame`` into a :class:`~datafusion.Table`.
323323
324324
Examples:
@@ -332,7 +332,7 @@ def into_view(self) -> Table:
332332
"""
333333
from datafusion.catalog import Table as _Table
334334

335-
return _Table(self.df.into_view())
335+
return _Table(self.df.into_view(temporary))
336336

337337
def __getitem__(self, key: str | list[str]) -> DataFrame:
338338
"""Return a new :py:class:`DataFrame` with the specified column or columns.

python/tests/test_context.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -357,10 +357,16 @@ def test_register_table_from_dataframe(ctx):
357357
assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
358358

359359

360-
def test_register_table_from_dataframe_into_view(ctx):
360+
@pytest.mark.parametrize("temporary", [True, False])
361+
def test_register_table_from_dataframe_into_view(ctx, temporary):
361362
df = ctx.from_pydict({"a": [1, 2]})
362-
table = df.into_view()
363+
table = df.into_view(temporary=temporary)
363364
assert isinstance(table, Table)
365+
if temporary:
366+
assert table.kind == "temporary"
367+
else:
368+
assert table.kind == "view"
369+
364370
ctx.register_table("view_tbl", table)
365371
result = ctx.sql("SELECT * FROM view_tbl").collect()
366372
assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]

src/dataframe.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use arrow::pyarrow::FromPyArrow;
2828
use datafusion::arrow::datatypes::Schema;
2929
use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
3030
use datafusion::arrow::util::pretty;
31+
use datafusion::catalog::TableProvider;
3132
use datafusion::common::UnnestOptions;
3233
use datafusion::config::{CsvOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions};
3334
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
@@ -49,7 +50,7 @@ use crate::expr::sort_expr::to_sort_expressions;
4950
use crate::physical_plan::PyExecutionPlan;
5051
use crate::record_batch::PyRecordBatchStream;
5152
use crate::sql::logical::PyLogicalPlan;
52-
use crate::table::PyTable;
53+
use crate::table::{PyTable, TempViewTable};
5354
use crate::utils::{
5455
get_tokio_runtime, is_ipython_env, py_obj_to_scalar_value, validate_pycapsule, wait_for_future,
5556
};
@@ -420,11 +421,15 @@ impl PyDataFrame {
420421
/// because we're working with Python bindings
421422
/// where objects are shared
422423
#[allow(clippy::wrong_self_convention)]
423-
pub fn into_view(&self) -> PyDataFusionResult<PyTable> {
424-
// Call the underlying Rust DataFrame::into_view method.
425-
// Note that the Rust method consumes self; here we clone the inner Arc<DataFrame>
426-
// so that we don't invalidate this PyDataFrame.
427-
let table_provider = self.df.as_ref().clone().into_view();
424+
pub fn into_view(&self, temporary: bool) -> PyDataFusionResult<PyTable> {
425+
let table_provider = if temporary {
426+
Arc::new(TempViewTable::new(Arc::clone(&self.df))) as Arc<dyn TableProvider>
427+
} else {
428+
// Call the underlying Rust DataFrame::into_view method.
429+
// Note that the Rust method consumes self; here we clone the DataFrame held inside the Arc
430+
// so that we don't invalidate this PyDataFrame.
431+
self.df.as_ref().clone().into_view()
432+
};
428433
Ok(PyTable::from(table_provider))
429434
}
430435

src/table.rs

Lines changed: 90 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,22 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::dataframe::PyDataFrame;
19+
use crate::dataset::Dataset;
20+
use crate::utils::table_provider_from_pycapsule;
21+
use arrow::datatypes::SchemaRef;
1822
use arrow::pyarrow::ToPyArrow;
23+
use async_trait::async_trait;
24+
use datafusion::catalog::Session;
25+
use datafusion::common::Column;
1926
use datafusion::datasource::{TableProvider, TableType};
27+
use datafusion::logical_expr::{Expr, LogicalPlanBuilder, TableProviderFilterPushDown};
28+
use datafusion::physical_plan::ExecutionPlan;
29+
use datafusion::prelude::DataFrame;
2030
use pyo3::prelude::*;
31+
use std::any::Any;
2132
use std::sync::Arc;
2233

23-
use crate::dataframe::PyDataFrame;
24-
use crate::dataset::Dataset;
25-
use crate::utils::table_provider_from_pycapsule;
26-
2734
/// This struct is used as a common method for all TableProviders,
2835
/// whether they refer to an FFI provider, an internally known
2936
/// implementation, a dataset, or a dataframe view.
@@ -104,3 +111,82 @@ impl From<Arc<dyn TableProvider>> for PyTable {
104111
Self { table }
105112
}
106113
}
114+
115+
#[derive(Clone, Debug)]
116+
pub(crate) struct TempViewTable {
117+
df: Arc<DataFrame>,
118+
}
119+
120+
/// This is nearly identical to `DataFrameTableProvider`
121+
/// except that it is for temporary tables.
122+
/// Remove when https://github.com/apache/datafusion/issues/18026
123+
/// closes.
124+
impl TempViewTable {
125+
pub(crate) fn new(df: Arc<DataFrame>) -> Self {
126+
Self { df }
127+
}
128+
}
129+
130+
#[async_trait]
131+
impl TableProvider for TempViewTable {
132+
fn as_any(&self) -> &dyn Any {
133+
self
134+
}
135+
136+
fn schema(&self) -> SchemaRef {
137+
Arc::new(self.df.schema().into())
138+
}
139+
140+
fn table_type(&self) -> TableType {
141+
TableType::Temporary
142+
}
143+
144+
async fn scan(
145+
&self,
146+
state: &dyn Session,
147+
projection: Option<&Vec<usize>>,
148+
filters: &[Expr],
149+
limit: Option<usize>,
150+
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
151+
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
152+
let plan = self.df.logical_plan().clone();
153+
let mut plan = LogicalPlanBuilder::from(plan);
154+
155+
if let Some(filter) = filter {
156+
plan = plan.filter(filter)?;
157+
}
158+
159+
let mut plan = if let Some(projection) = projection {
160+
// avoiding adding a redundant projection (e.g. SELECT * FROM view)
161+
let current_projection = (0..plan.schema().fields().len()).collect::<Vec<usize>>();
162+
if projection == &current_projection {
163+
plan
164+
} else {
165+
let fields: Vec<Expr> = projection
166+
.iter()
167+
.map(|i| {
168+
Expr::Column(Column::from(
169+
self.df.logical_plan().schema().qualified_field(*i),
170+
))
171+
})
172+
.collect();
173+
plan.project(fields)?
174+
}
175+
} else {
176+
plan
177+
};
178+
179+
if let Some(limit) = limit {
180+
plan = plan.limit(0, Some(limit))?;
181+
}
182+
183+
state.create_physical_plan(&plan.build()?).await
184+
}
185+
186+
fn supports_filters_pushdown(
187+
&self,
188+
filters: &[&Expr],
189+
) -> datafusion::common::Result<Vec<TableProviderFilterPushDown>> {
190+
Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
191+
}
192+
}

0 commit comments

Comments
 (0)