diff --git a/datafusion-testing b/datafusion-testing
deleted file mode 160000
index 3462eaa78745..000000000000
--- a/datafusion-testing
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 3462eaa787459957e38df267a4a21f5bea605807
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index 18a1318dd40d..fd186fbfe7dd 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -347,12 +347,5 @@ mod tests {
 
             Ok(RecordBatch::try_new(schema, new_columns).unwrap())
         }
-
-        fn map_partial_batch(
-            &self,
-            batch: RecordBatch,
-        ) -> datafusion_common::Result<RecordBatch> {
-            self.map_batch(batch)
-        }
     }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 888f3ad9e3b9..b5534d6b3d1c 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -224,6 +224,327 @@ mod tests {
         )
     }
 
+    #[tokio::test]
+    async fn test_pushdown_with_missing_column_in_file() {
+        let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+
+        let file_schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
+
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, true),
+            Field::new("c2", DataType::Int32, true),
+        ]));
+
+        let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();
+
+        // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
+        // the default behavior is to fill in missing columns with nulls.
+        // Thus this predicate will come back as false.
+        let filter = col("c2").eq(lit(1_i32));
+        let rt = RoundTrip::new()
+            .with_schema(table_schema.clone())
+            .with_predicate(filter.clone())
+            .with_pushdown_predicate()
+            .round_trip(vec![batch.clone()])
+            .await;
+        let total_rows = rt
+            .batches
+            .unwrap()
+            .iter()
+            .map(|b| b.num_rows())
+            .sum::<usize>();
+        assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        let metric = get_value(&metrics, "pushdown_rows_pruned");
+        assert_eq!(metric, 3, "Expected all rows to be pruned");
+
+        // If we excplicitly allow nulls the rest of the predicate should work
+        let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
+        let rt = RoundTrip::new()
+            .with_schema(table_schema.clone())
+            .with_predicate(filter.clone())
+            .with_pushdown_predicate()
+            .round_trip(vec![batch.clone()])
+            .await;
+        let batches = rt.batches.unwrap();
+        #[rustfmt::skip]
+        let expected = [
+            "+----+----+",
+            "| c1 | c2 |",
+            "+----+----+",
+            "| 1  |    |",
+            "+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &batches);
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        let metric = get_value(&metrics, "pushdown_rows_pruned");
+        assert_eq!(metric, 2, "Expected all rows to be pruned");
+    }
+
+    #[tokio::test]
+    async fn test_pushdown_with_missing_column_in_file_multiple_types() {
+        let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+
+        let file_schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
+
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, true),
+            Field::new("c2", DataType::Utf8, true),
+        ]));
+
+        let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();
+
+        // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
+        // the default behavior is to fill in missing columns with nulls.
+        // Thus this predicate will come back as false.
+        let filter = col("c2").eq(lit("abc"));
+        let rt = RoundTrip::new()
+            .with_schema(table_schema.clone())
+            .with_predicate(filter.clone())
+            .with_pushdown_predicate()
+            .round_trip(vec![batch.clone()])
+            .await;
+        let total_rows = rt
+            .batches
+            .unwrap()
+            .iter()
+            .map(|b| b.num_rows())
+            .sum::<usize>();
+        assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        let metric = get_value(&metrics, "pushdown_rows_pruned");
+        assert_eq!(metric, 3, "Expected all rows to be pruned");
+
+        // If we excplicitly allow nulls the rest of the predicate should work
+        let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
+        let rt = RoundTrip::new()
+            .with_schema(table_schema.clone())
+            .with_predicate(filter.clone())
+            .with_pushdown_predicate()
+            .round_trip(vec![batch.clone()])
+            .await;
+        let batches = rt.batches.unwrap();
+        #[rustfmt::skip]
+        let expected = [
+            "+----+----+",
+            "| c1 | c2 |",
+            "+----+----+",
+            "| 1  |    |",
+            "+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &batches);
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        let metric = get_value(&metrics, "pushdown_rows_pruned");
+        assert_eq!(metric, 2, "Expected all rows to be pruned");
+    }
+
+    #[tokio::test]
+    async fn test_pushdown_with_missing_middle_column() {
+        let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+        let c3 = Arc::new(Int32Array::from(vec![7, 8, 9]));
+
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, true),
+            Field::new("c3", DataType::Int32, true),
+        ]));
+
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, true),
+            Field::new("c2", DataType::Utf8, true),
+            Field::new("c3", DataType::Int32, true),
+        ]));
+
+        let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap();
+
+        // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
+        // the default behavior is to fill in missing columns with nulls.
+        // Thus this predicate will come back as false.
+        let filter = col("c2").eq(lit("abc"));
+        let rt = RoundTrip::new()
+            .with_schema(table_schema.clone())
+            .with_predicate(filter.clone())
+            .with_pushdown_predicate()
+            .round_trip(vec![batch.clone()])
+            .await;
+        let total_rows = rt
+            .batches
+            .unwrap()
+            .iter()
+            .map(|b| b.num_rows())
+            .sum::<usize>();
+        assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        let metric = get_value(&metrics, "pushdown_rows_pruned");
+        assert_eq!(metric, 3, "Expected all rows to be pruned");
+
+        // If we excplicitly allow nulls the rest of the predicate should work
+        let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
+        let rt = RoundTrip::new()
+            .with_schema(table_schema.clone())
+            .with_predicate(filter.clone())
+            .with_pushdown_predicate()
+            .round_trip(vec![batch.clone()])
+            .await;
+        let batches = rt.batches.unwrap();
+        #[rustfmt::skip]
+        let expected = [
+            "+----+----+----+",
+            "| c1 | c2 | c3 |",
+            "+----+----+----+",
+            "| 1  |    | 7  |",
+            "+----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &batches);
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        let metric = get_value(&metrics, "pushdown_rows_pruned");
+        assert_eq!(metric, 2, "Expected all rows to be pruned");
+    }
+
+    #[tokio::test]
+    async fn test_pushdown_with_file_column_order_mismatch() {
+        let c3 = Arc::new(Int32Array::from(vec![7, 8, 9]));
+
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("c3", DataType::Int32, true),
+            Field::new("c3", DataType::Int32, true),
+        ]));
+
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, true),
+            Field::new("c2", DataType::Utf8, true),
+            Field::new("c3", DataType::Int32, true),
+        ]));
+
+        let batch =
+            RecordBatch::try_new(file_schema.clone(), vec![c3.clone(), c3]).unwrap();
+
+        // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
+        // the default behavior is to fill in missing columns with nulls.
+        // Thus this predicate will come back as false.
+        let filter = col("c2").eq(lit("abc"));
+        let rt = RoundTrip::new()
+            .with_schema(table_schema.clone())
+            .with_predicate(filter.clone())
+            .with_pushdown_predicate()
+            .round_trip(vec![batch.clone()])
+            .await;
+        let total_rows = rt
+            .batches
+            .unwrap()
+            .iter()
+            .map(|b| b.num_rows())
+            .sum::<usize>();
+        assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        let metric = get_value(&metrics, "pushdown_rows_pruned");
+        assert_eq!(metric, 3, "Expected all rows to be pruned");
+
+        // If we excplicitly allow nulls the rest of the predicate should work
+        let filter = col("c2").is_null().and(col("c3").eq(lit(7_i32)));
+        let rt = RoundTrip::new()
+            .with_schema(table_schema.clone())
+            .with_predicate(filter.clone())
+            .with_pushdown_predicate()
+            .round_trip(vec![batch.clone()])
+            .await;
+        let batches = rt.batches.unwrap();
+        #[rustfmt::skip]
+        let expected = [
+            "+----+----+----+",
+            "| c1 | c2 | c3 |",
+            "+----+----+----+",
+            "|    |    | 7  |",
+            "+----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &batches);
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        let metric = get_value(&metrics, "pushdown_rows_pruned");
+        assert_eq!(metric, 2, "Expected all rows to be pruned");
+    }
+
+    #[tokio::test]
+    async fn test_pushdown_with_missing_column_nested_conditions() {
+        // Create test data with c1 and c3 columns
+        let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
+        let c3: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50]));
+
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, true),
+            Field::new("c3", DataType::Int32, true),
+        ]));
+
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, true),
+            Field::new("c2", DataType::Int32, true),
+            Field::new("c3", DataType::Int32, true),
+        ]));
+
+        let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap();
+
+        // Test with complex nested AND/OR:
+        // (c1 = 1 OR c2 = 5) AND (c3 = 10 OR c2 IS NULL)
+        // Should return 1 row where c1=1 AND c3=10 (since c2 IS NULL is always true)
+        let filter = col("c1")
+            .eq(lit(1_i32))
+            .or(col("c2").eq(lit(5_i32)))
+            .and(col("c3").eq(lit(10_i32)).or(col("c2").is_null()));
+
+        let rt = RoundTrip::new()
+            .with_schema(table_schema.clone())
+            .with_predicate(filter.clone())
+            .with_pushdown_predicate()
+            .round_trip(vec![batch.clone()])
+            .await;
+
+        let batches = rt.batches.unwrap();
+        #[rustfmt::skip]
+        let expected = [
+            "+----+----+----+",
+            "| c1 | c2 | c3 |",
+            "+----+----+----+",
+            "| 1  |    | 10 |",
+            "+----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &batches);
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        let metric = get_value(&metrics, "pushdown_rows_pruned");
+        assert_eq!(metric, 4, "Expected 4 rows to be pruned");
+
+        // Test a more complex nested condition:
+        // (c1 < 3 AND c2 IS NOT NULL) OR (c3 > 20 AND c2 IS NULL)
+        // First part should return 0 rows (c2 IS NOT NULL is always false)
+        // Second part should return rows where c3 > 20 (3 rows: where c3 is 30, 40, 50)
+        let filter = col("c1")
+            .lt(lit(3_i32))
+            .and(col("c2").is_not_null())
+            .or(col("c3").gt(lit(20_i32)).and(col("c2").is_null()));
+
+        let rt = RoundTrip::new()
+            .with_schema(table_schema)
+            .with_predicate(filter.clone())
+            .with_pushdown_predicate()
+            .round_trip(vec![batch])
+            .await;
+
+        let batches = rt.batches.unwrap();
+        #[rustfmt::skip]
+        let expected = [
+            "+----+----+----+",
+            "| c1 | c2 | c3 |",
+            "+----+----+----+",
+            "| 3  |    | 30 |",
+            "| 4  |    | 40 |",
+            "| 5  |    | 50 |",
+            "+----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &batches);
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        let metric = get_value(&metrics, "pushdown_rows_pruned");
+        assert_eq!(metric, 2, "Expected 2 rows to be pruned");
+    }
+
     #[tokio::test]
     async fn evolved_schema() {
         let c1: ArrayRef =
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 3c623f558e43..fed5e6d5d59c 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -107,6 +107,7 @@ impl FileOpener for ParquetOpener {
 
         let projected_schema =
             SchemaRef::from(self.table_schema.project(&self.projection)?);
+        let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory);
         let schema_adapter = self
             .schema_adapter_factory
             .create(projected_schema, Arc::clone(&self.table_schema));
@@ -173,7 +174,7 @@ impl FileOpener for ParquetOpener {
                     builder.metadata(),
                     reorder_predicates,
                     &file_metrics,
-                    Arc::clone(&schema_mapping),
+                    &schema_adapter_factory,
                 );
 
                 match row_filter {
diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index 39fcecf37c6d..da6bf114d71d 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -64,7 +64,7 @@ use std::collections::BTreeSet;
 use std::sync::Arc;
 
 use arrow::array::BooleanArray;
-use arrow::datatypes::{DataType, Schema};
+use arrow::datatypes::{DataType, Schema, SchemaRef};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
 use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
@@ -72,12 +72,10 @@ use parquet::arrow::ProjectionMask;
 use parquet::file::metadata::ParquetMetaData;
 
 use datafusion_common::cast::as_boolean_array;
-use datafusion_common::tree_node::{
-    Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
-};
-use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
-use datafusion_datasource::schema_adapter::SchemaMapper;
-use datafusion_physical_expr::expressions::{Column, Literal};
+use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
+use datafusion_common::Result;
+use datafusion_datasource::schema_adapter::{SchemaAdapterFactory, SchemaMapper};
+use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::utils::reassign_predicate_columns;
 use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
 
@@ -102,8 +100,6 @@ pub(crate) struct DatafusionArrowPredicate {
     /// Path to the columns in the parquet schema required to evaluate the
     /// expression
     projection_mask: ProjectionMask,
-    /// Columns required to evaluate the expression in the arrow schema
-    projection: Vec<usize>,
     /// how many rows were filtered out by this predicate
     rows_pruned: metrics::Count,
     /// how many rows passed this predicate
@@ -111,34 +107,24 @@ pub(crate) struct DatafusionArrowPredicate {
     /// how long was spent evaluating this predicate
     time: metrics::Time,
     /// used to perform type coercion while filtering rows
-    schema_mapping: Arc<dyn SchemaMapper>,
+    schema_mapper: Arc<dyn SchemaMapper>,
 }
 
 impl DatafusionArrowPredicate {
     /// Create a new `DatafusionArrowPredicate` from a `FilterCandidate`
     pub fn try_new(
         candidate: FilterCandidate,
-        schema: &Schema,
         metadata: &ParquetMetaData,
         rows_pruned: metrics::Count,
         rows_matched: metrics::Count,
         time: metrics::Time,
-        schema_mapping: Arc<dyn SchemaMapper>,
     ) -> Result<Self> {
-        let schema = Arc::new(schema.project(&candidate.projection)?);
-        let physical_expr = reassign_predicate_columns(candidate.expr, &schema, true)?;
-
-        // ArrowPredicate::evaluate is passed columns in the order they appear in the file
-        // If the predicate has multiple columns, we therefore must project the columns based
-        // on the order they appear in the file
-        let projection = match candidate.projection.len() {
-            0 | 1 => vec![],
-            2.. => remap_projection(&candidate.projection),
-        };
+        let projected_schema = Arc::clone(&candidate.filter_schema);
+        let physical_expr =
+            reassign_predicate_columns(candidate.expr, &projected_schema, true)?;
 
         Ok(Self {
             physical_expr,
-            projection,
             projection_mask: ProjectionMask::roots(
                 metadata.file_metadata().schema_descr(),
                 candidate.projection,
@@ -146,7 +132,7 @@ impl DatafusionArrowPredicate {
             rows_pruned,
             rows_matched,
             time,
-            schema_mapping,
+            schema_mapper: candidate.schema_mapper,
         })
     }
 }
@@ -156,12 +142,8 @@ impl ArrowPredicate for DatafusionArrowPredicate {
         &self.projection_mask
     }
 
-    fn evaluate(&mut self, mut batch: RecordBatch) -> ArrowResult<BooleanArray> {
-        if !self.projection.is_empty() {
-            batch = batch.project(&self.projection)?;
-        };
-
-        let batch = self.schema_mapping.map_partial_batch(batch)?;
+    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+        let batch = self.schema_mapper.map_batch(batch)?;
 
         // scoped timer updates on drop
         let mut timer = self.time.timer();
@@ -194,9 +176,22 @@ impl ArrowPredicate for DatafusionArrowPredicate {
 /// See the module level documentation for more information.
 pub(crate) struct FilterCandidate {
     expr: Arc<dyn PhysicalExpr>,
+    /// Estimate for the total number of bytes that will need to be processed
+    /// to evaluate this filter. This is used to estimate the cost of evaluating
+    /// the filter and to order the filters when `reorder_predicates` is true.
+    /// This is generated by summing the compressed size of all columns that the filter references.
     required_bytes: usize,
+    /// Can this filter use an index (e.g. a page index) to prune rows?
     can_use_index: bool,
+    /// The projection to read from the file schema to get the columns
+    /// required to pass thorugh a `SchemaMapper` to the table schema
+    /// upon which we then evaluate the filter expression.
     projection: Vec<usize>,
+    ///  A `SchemaMapper` used to map batches read from the file schema to
+    /// the filter's projection of the table schema.
+    schema_mapper: Arc<dyn SchemaMapper>,
+    /// The projected table schema that this filter references
+    filter_schema: SchemaRef,
 }
 
 /// Helper to build a `FilterCandidate`.
@@ -220,41 +215,40 @@ pub(crate) struct FilterCandidate {
 /// but old files do not have the columns.
 ///
 /// When a file is missing a column from the table schema, the value of the
-/// missing column is filled in with `NULL`  via a `SchemaAdapter`.
+/// missing column is filled in by a `SchemaAdapter` (by default as `NULL`).
 ///
 /// When a predicate is pushed down to the parquet reader, the predicate is
-/// evaluated in the context of the file schema. If the predicate references a
-/// column that is in the table schema but not in the file schema, the column
-/// reference must be rewritten to a literal expression that represents the
-/// `NULL` value that would be produced by the `SchemaAdapter`.
-///
-/// For example, if:
-/// * The table schema is `id, name, address`
-/// * The file schema is  `id, name` (missing the `address` column)
-/// * predicate is `address = 'foo'`
-///
-/// When evaluating the predicate as a filter on the parquet file, the predicate
-/// must be rewritten to `NULL = 'foo'` as the `address` column will be filled
-/// in with `NULL` values during the rest of the evaluation.
-struct FilterCandidateBuilder<'a> {
+/// evaluated in the context of the file schema.
+/// For each predicate we build a filter schema which is the projection of the table
+/// schema that contains only the columns that this filter references.
+/// If any columns from the file schema are missing from a particular file they are
+/// added by the `SchemaAdapter`, by default as `NULL`.
+struct FilterCandidateBuilder {
     expr: Arc<dyn PhysicalExpr>,
-    /// The schema of this parquet file
-    file_schema: &'a Schema,
+    /// The schema of this parquet file.
+    /// Columns may have different types from the table schema and there may be
+    /// columns in the file schema that are not in the table schema or columns that
+    /// are in the table schema that are not in the file schema.
+    file_schema: SchemaRef,
     /// The schema of the table (merged schema) -- columns may be in different
     /// order than in the file and have columns that are not in the file schema
-    table_schema: &'a Schema,
+    table_schema: SchemaRef,
+    /// A `SchemaAdapterFactory` used to map the file schema to the table schema.
+    schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
 }
 
-impl<'a> FilterCandidateBuilder<'a> {
+impl FilterCandidateBuilder {
     pub fn new(
         expr: Arc<dyn PhysicalExpr>,
-        file_schema: &'a Schema,
-        table_schema: &'a Schema,
+        file_schema: Arc<Schema>,
+        table_schema: Arc<Schema>,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
     ) -> Self {
         Self {
             expr,
             file_schema,
             table_schema,
+            schema_adapter_factory,
         }
     }
 
@@ -266,20 +260,32 @@ impl<'a> FilterCandidateBuilder<'a> {
     /// * `Ok(None)` if the expression cannot be used as an ArrowFilter
     /// * `Err(e)` if an error occurs while building the candidate
     pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
-        let Some((required_indices, rewritten_expr)) =
-            pushdown_columns(self.expr, self.file_schema, self.table_schema)?
+        let Some(required_indices_into_table_schema) =
+            pushdown_columns(&self.expr, &self.table_schema)?
         else {
             return Ok(None);
         };
 
-        let required_bytes = size_of_columns(&required_indices, metadata)?;
-        let can_use_index = columns_sorted(&required_indices, metadata)?;
+        let projected_table_schema = Arc::new(
+            self.table_schema
+                .project(&required_indices_into_table_schema)?,
+        );
+
+        let (schema_mapper, projection_into_file_schema) = self
+            .schema_adapter_factory
+            .create(Arc::clone(&projected_table_schema), self.table_schema)
+            .map_schema(&self.file_schema)?;
+
+        let required_bytes = size_of_columns(&projection_into_file_schema, metadata)?;
+        let can_use_index = columns_sorted(&projection_into_file_schema, metadata)?;
 
         Ok(Some(FilterCandidate {
-            expr: rewritten_expr,
+            expr: self.expr,
             required_bytes,
             can_use_index,
-            projection: required_indices.into_iter().collect(),
+            projection: projection_into_file_schema,
+            schema_mapper: Arc::clone(&schema_mapper),
+            filter_schema: Arc::clone(&projected_table_schema),
         }))
     }
 }
@@ -294,33 +300,29 @@ struct PushdownChecker<'schema> {
     /// Does the expression reference any columns that are in the table
     /// schema but not in the file schema?
     projected_columns: bool,
-    // the indices of all the columns found within the given expression which exist inside the given
-    // [`file_schema`]
-    required_column_indices: BTreeSet<usize>,
-    file_schema: &'schema Schema,
+    // Indices into the table schema of the columns required to evaluate the expression
+    required_columns: BTreeSet<usize>,
     table_schema: &'schema Schema,
 }
 
 impl<'schema> PushdownChecker<'schema> {
-    fn new(file_schema: &'schema Schema, table_schema: &'schema Schema) -> Self {
+    fn new(table_schema: &'schema Schema) -> Self {
         Self {
             non_primitive_columns: false,
             projected_columns: false,
-            required_column_indices: BTreeSet::default(),
-            file_schema,
+            required_columns: BTreeSet::default(),
             table_schema,
         }
     }
 
     fn check_single_column(&mut self, column_name: &str) -> Option<TreeNodeRecursion> {
-        if let Ok(idx) = self.file_schema.index_of(column_name) {
-            self.required_column_indices.insert(idx);
-
-            if DataType::is_nested(self.file_schema.field(idx).data_type()) {
+        if let Ok(idx) = self.table_schema.index_of(column_name) {
+            self.required_columns.insert(idx);
+            if DataType::is_nested(self.table_schema.field(idx).data_type()) {
                 self.non_primitive_columns = true;
                 return Some(TreeNodeRecursion::Jump);
             }
-        } else if self.table_schema.index_of(column_name).is_err() {
+        } else {
             // If the column does not exist in the (un-projected) table schema then
             // it must be a projected column.
             self.projected_columns = true;
@@ -336,82 +338,40 @@ impl<'schema> PushdownChecker<'schema> {
     }
 }
 
-impl TreeNodeRewriter for PushdownChecker<'_> {
+impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
     type Node = Arc<dyn PhysicalExpr>;
 
-    fn f_down(
-        &mut self,
-        node: Arc<dyn PhysicalExpr>,
-    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+    fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
         if let Some(column) = node.as_any().downcast_ref::<Column>() {
             if let Some(recursion) = self.check_single_column(column.name()) {
-                return Ok(Transformed::new(node, false, recursion));
+                return Ok(recursion);
             }
         }
 
-        Ok(Transformed::no(node))
-    }
-
-    /// After visiting all children, rewrite column references to nulls if
-    /// they are not in the file schema.
-    /// We do this because they won't be relevant if they're not in the file schema, since that's
-    /// the only thing we're dealing with here as this is only used for the parquet pushdown during
-    /// scanning
-    fn f_up(
-        &mut self,
-        expr: Arc<dyn PhysicalExpr>,
-    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
-        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
-            // if the expression is a column, is it in the file schema?
-            if self.file_schema.field_with_name(column.name()).is_err() {
-                return self
-                    .table_schema
-                    .field_with_name(column.name())
-                    .and_then(|field| {
-                        // Replace the column reference with a NULL (using the type from the table schema)
-                        // e.g. `column = 'foo'` is rewritten be transformed to `NULL = 'foo'`
-                        //
-                        // See comments on `FilterCandidateBuilder` for more information
-                        let null_value = ScalarValue::try_from(field.data_type())?;
-                        Ok(Transformed::yes(Arc::new(Literal::new(null_value)) as _))
-                    })
-                    // If the column is not in the table schema, should throw the error
-                    .map_err(|e| arrow_datafusion_err!(e));
-            }
-        }
-
-        Ok(Transformed::no(expr))
+        Ok(TreeNodeRecursion::Continue)
     }
 }
 
-type ProjectionAndExpr = (BTreeSet<usize>, Arc<dyn PhysicalExpr>);
-
 // Checks if a given expression can be pushed down into `DataSourceExec` as opposed to being evaluated
 // post-parquet-scan in a `FilterExec`. If it can be pushed down, this returns all the
 // columns in the given expression so that they can be used in the parquet scanning, along with the
 // expression rewritten as defined in [`PushdownChecker::f_up`]
 fn pushdown_columns(
-    expr: Arc<dyn PhysicalExpr>,
-    file_schema: &Schema,
+    expr: &Arc<dyn PhysicalExpr>,
     table_schema: &Schema,
-) -> Result<Option<ProjectionAndExpr>> {
-    let mut checker = PushdownChecker::new(file_schema, table_schema);
-
-    let expr = expr.rewrite(&mut checker).data()?;
-
-    Ok((!checker.prevents_pushdown()).then_some((checker.required_column_indices, expr)))
+) -> Result<Option<Vec<usize>>> {
+    let mut checker = PushdownChecker::new(table_schema);
+    expr.visit(&mut checker)?;
+    Ok((!checker.prevents_pushdown())
+        .then_some(checker.required_columns.into_iter().collect()))
 }
 
 /// creates a PushdownChecker for a single use to check a given column with the given schemes. Used
 /// to check preemptively if a column name would prevent pushdowning.
 /// effectively does the inverse of [`pushdown_columns`] does, but with a single given column
 /// (instead of traversing the entire tree to determine this)
-fn would_column_prevent_pushdown(
-    column_name: &str,
-    file_schema: &Schema,
-    table_schema: &Schema,
-) -> bool {
-    let mut checker = PushdownChecker::new(file_schema, table_schema);
+fn would_column_prevent_pushdown(column_name: &str, table_schema: &Schema) -> bool {
+    let mut checker = PushdownChecker::new(table_schema);
 
     // the return of this is only used for [`PushdownChecker::f_down()`], so we can safely ignore
     // it here. I'm just verifying we know the return type of this so nobody accidentally changes
@@ -427,14 +387,13 @@ fn would_column_prevent_pushdown(
 /// Otherwise, true.
 pub fn can_expr_be_pushed_down_with_schemas(
     expr: &datafusion_expr::Expr,
-    file_schema: &Schema,
+    _file_schema: &Schema,
     table_schema: &Schema,
 ) -> bool {
     let mut can_be_pushed = true;
     expr.apply(|expr| match expr {
         datafusion_expr::Expr::Column(column) => {
-            can_be_pushed &=
-                !would_column_prevent_pushdown(column.name(), file_schema, table_schema);
+            can_be_pushed &= !would_column_prevent_pushdown(column.name(), table_schema);
             Ok(if can_be_pushed {
                 TreeNodeRecursion::Jump
             } else {
@@ -447,41 +406,12 @@ pub fn can_expr_be_pushed_down_with_schemas(
     can_be_pushed
 }
 
-/// Computes the projection required to go from the file's schema order to the projected
-/// order expected by this filter
-///
-/// Effectively this computes the rank of each element in `src`
-fn remap_projection(src: &[usize]) -> Vec<usize> {
-    let len = src.len();
-
-    // Compute the column mapping from projected order to file order
-    // i.e. the indices required to sort projected schema into the file schema
-    //
-    // e.g. projection: [5, 9, 0] -> [2, 0, 1]
-    let mut sorted_indexes: Vec<_> = (0..len).collect();
-    sorted_indexes.sort_unstable_by_key(|x| src[*x]);
-
-    // Compute the mapping from schema order to projected order
-    // i.e. the indices required to sort file schema into the projected schema
-    //
-    // Above we computed the order of the projected schema according to the file
-    // schema, and so we can use this as the comparator
-    //
-    // e.g. sorted_indexes [2, 0, 1] -> [1, 2, 0]
-    let mut projection: Vec<_> = (0..len).collect();
-    projection.sort_unstable_by_key(|x| sorted_indexes[*x]);
-    projection
-}
-
 /// Calculate the total compressed size of all `Column`'s required for
 /// predicate `Expr`.
 ///
 /// This value represents the total amount of IO required to evaluate the
 /// predicate.
-fn size_of_columns(
-    columns: &BTreeSet<usize>,
-    metadata: &ParquetMetaData,
-) -> Result<usize> {
+fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usize> {
     let mut total_size = 0;
     let row_groups = metadata.row_groups();
     for idx in columns {
@@ -498,10 +428,7 @@ fn size_of_columns(
 ///
 /// Sorted columns may be queried more efficiently in the presence of
 /// a PageIndex.
-fn columns_sorted(
-    _columns: &BTreeSet<usize>,
-    _metadata: &ParquetMetaData,
-) -> Result<bool> {
+fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool> {
     // TODO How do we know this?
     Ok(false)
 }
@@ -522,12 +449,12 @@ fn columns_sorted(
 /// `a = 1` and `c = 3`.
 pub fn build_row_filter(
     expr: &Arc<dyn PhysicalExpr>,
-    file_schema: &Schema,
-    table_schema: &Schema,
+    file_schema: &SchemaRef,
+    table_schema: &SchemaRef,
     metadata: &ParquetMetaData,
     reorder_predicates: bool,
     file_metrics: &ParquetFileMetrics,
-    schema_mapping: Arc<dyn SchemaMapper>,
+    schema_adapter_factory: &Arc<dyn SchemaAdapterFactory>,
 ) -> Result<Option<RowFilter>> {
     let rows_pruned = &file_metrics.pushdown_rows_pruned;
     let rows_matched = &file_metrics.pushdown_rows_matched;
@@ -541,8 +468,13 @@ pub fn build_row_filter(
     let mut candidates: Vec<FilterCandidate> = predicates
         .into_iter()
         .map(|expr| {
-            FilterCandidateBuilder::new(Arc::clone(expr), file_schema, table_schema)
-                .build(metadata)
+            FilterCandidateBuilder::new(
+                Arc::clone(expr),
+                Arc::clone(file_schema),
+                Arc::clone(table_schema),
+                Arc::clone(schema_adapter_factory),
+            )
+            .build(metadata)
         })
         .collect::<Result<Vec<_>, _>>()?
         .into_iter()
@@ -568,12 +500,10 @@ pub fn build_row_filter(
         .map(|candidate| {
             DatafusionArrowPredicate::try_new(
                 candidate,
-                file_schema,
                 metadata,
                 rows_pruned.clone(),
                 rows_matched.clone(),
                 time.clone(),
-                Arc::clone(&schema_mapping),
             )
             .map(|pred| Box::new(pred) as _)
         })
@@ -584,19 +514,17 @@ pub fn build_row_filter(
 #[cfg(test)]
 mod test {
     use super::*;
-    use datafusion_datasource::schema_adapter::{
-        DefaultSchemaAdapterFactory, SchemaAdapterFactory,
-    };
+    use datafusion_common::ScalarValue;
 
     use arrow::datatypes::{Field, Fields, TimeUnit::Nanosecond};
-    use datafusion_expr::{cast, col, lit, Expr};
+    use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
+    use datafusion_expr::{col, Expr};
     use datafusion_physical_expr::planner::logical2physical;
     use datafusion_physical_plan::metrics::{Count, Time};
 
     use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
     use parquet::arrow::parquet_to_arrow_schema;
     use parquet::file::reader::{FileReader, SerializedFileReader};
-    use rand::prelude::*;
 
     // We should ignore predicate that read non-primitive columns
     #[test]
@@ -616,51 +544,19 @@ mod test {
         let expr = col("int64_list").is_not_null();
         let expr = logical2physical(&expr, &table_schema);
 
-        let candidate = FilterCandidateBuilder::new(expr, &table_schema, &table_schema)
-            .build(metadata)
-            .expect("building candidate");
-
-        assert!(candidate.is_none());
-    }
-
-    // If a column exists in the table schema but not the file schema it should be rewritten to a null expression
-    #[test]
-    fn test_filter_candidate_builder_rewrite_missing_column() {
-        let testdata = datafusion_common::test_util::parquet_test_data();
-        let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
-            .expect("opening file");
-
-        let reader = SerializedFileReader::new(file).expect("creating reader");
-
-        let metadata = reader.metadata();
-
-        let table_schema =
-            parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
-                .expect("parsing schema");
-
-        let file_schema = Schema::new(vec![
-            Field::new("bigint_col", DataType::Int64, true),
-            Field::new("float_col", DataType::Float32, true),
-        ]);
-
-        // The parquet file with `file_schema` just has `bigint_col` and `float_col` column, and don't have the `int_col`
-        let expr = col("bigint_col").eq(cast(col("int_col"), DataType::Int64));
-        let expr = logical2physical(&expr, &table_schema);
-        let expected_candidate_expr =
-            col("bigint_col").eq(cast(lit(ScalarValue::Int32(None)), DataType::Int64));
-        let expected_candidate_expr =
-            logical2physical(&expected_candidate_expr, &table_schema);
-
-        let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema)
-            .build(metadata)
-            .expect("building candidate");
+        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
+        let table_schema = Arc::new(table_schema.clone());
 
-        assert!(candidate.is_some());
+        let candidate = FilterCandidateBuilder::new(
+            expr,
+            table_schema.clone(),
+            table_schema,
+            schema_adapter_factory,
+        )
+        .build(metadata)
+        .expect("building candidate");
 
-        assert_eq!(
-            candidate.unwrap().expr.to_string(),
-            expected_candidate_expr.to_string()
-        );
+        assert!(candidate.is_none());
     }
 
     #[test]
@@ -682,42 +578,43 @@ mod test {
             false,
         )]);
 
-        let table_ref = Arc::new(table_schema.clone());
-        let schema_adapter =
-            DefaultSchemaAdapterFactory.create(Arc::clone(&table_ref), table_ref);
-        let (schema_mapping, _) = schema_adapter
-            .map_schema(&file_schema)
-            .expect("creating schema mapping");
-
-        let mut parquet_reader = parquet_reader_builder.build().expect("building reader");
-
-        // Parquet file is small, we only need 1 record batch
-        let first_rb = parquet_reader
-            .next()
-            .expect("expected record batch")
-            .expect("expected error free record batch");
-
         // Test all should fail
         let expr = col("timestamp_col").lt(Expr::Literal(
             ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
         ));
         let expr = logical2physical(&expr, &table_schema);
-        let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema)
-            .build(&metadata)
-            .expect("building candidate")
-            .expect("candidate expected");
+        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
+        let table_schema = Arc::new(table_schema.clone());
+        let candidate = FilterCandidateBuilder::new(
+            expr,
+            file_schema.clone(),
+            table_schema.clone(),
+            schema_adapter_factory,
+        )
+        .build(&metadata)
+        .expect("building candidate")
+        .expect("candidate expected");
 
         let mut row_filter = DatafusionArrowPredicate::try_new(
             candidate,
-            &file_schema,
             &metadata,
             Count::new(),
             Count::new(),
             Time::new(),
-            Arc::clone(&schema_mapping),
         )
         .expect("creating filter predicate");
 
+        let mut parquet_reader = parquet_reader_builder
+            .with_projection(row_filter.projection().clone())
+            .build()
+            .expect("building reader");
+
+        // Parquet file is small, we only need 1 record batch
+        let first_rb = parquet_reader
+            .next()
+            .expect("expected record batch")
+            .expect("expected error free record batch");
+
         let filtered = row_filter.evaluate(first_rb.clone());
         assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![false; 8])));
 
@@ -726,19 +623,23 @@ mod test {
             ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
         ));
         let expr = logical2physical(&expr, &table_schema);
-        let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema)
-            .build(&metadata)
-            .expect("building candidate")
-            .expect("candidate expected");
+        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
+        let candidate = FilterCandidateBuilder::new(
+            expr,
+            file_schema,
+            table_schema,
+            schema_adapter_factory,
+        )
+        .build(&metadata)
+        .expect("building candidate")
+        .expect("candidate expected");
 
         let mut row_filter = DatafusionArrowPredicate::try_new(
             candidate,
-            &file_schema,
             &metadata,
             Count::new(),
             Count::new(),
             Time::new(),
-            schema_mapping,
         )
         .expect("creating filter predicate");
 
@@ -746,24 +647,6 @@ mod test {
         assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![true; 8])));
     }
 
-    #[test]
-    fn test_remap_projection() {
-        let mut rng = thread_rng();
-        for _ in 0..100 {
-            // A random selection of column indexes in arbitrary order
-            let projection: Vec<_> = (0..100).map(|_| rng.gen()).collect();
-
-            // File order is the projection sorted
-            let mut file_order = projection.clone();
-            file_order.sort_unstable();
-
-            let remap = remap_projection(&projection);
-            // Applying the remapped projection to the file order should yield the original
-            let remapped: Vec<_> = remap.iter().map(|r| file_order[*r]).collect();
-            assert_eq!(projection, remapped)
-        }
-    }
-
     #[test]
     fn nested_data_structures_prevent_pushdown() {
         let table_schema = get_basic_table_schema();
@@ -803,9 +686,10 @@ mod test {
     fn basic_expr_doesnt_prevent_pushdown() {
         let table_schema = get_basic_table_schema();
 
-        let file_schema = Schema::new(vec![Field::new("str_col", DataType::Utf8, true)]);
+        let file_schema =
+            Schema::new(vec![Field::new("string_col", DataType::Utf8, true)]);
 
-        let expr = col("str_col").is_null();
+        let expr = col("string_col").is_null();
 
         assert!(can_expr_be_pushed_down_with_schemas(
             &expr,
@@ -819,13 +703,13 @@ mod test {
         let table_schema = get_basic_table_schema();
 
         let file_schema = Schema::new(vec![
-            Field::new("str_col", DataType::Utf8, true),
-            Field::new("int_col", DataType::UInt64, true),
+            Field::new("string_col", DataType::Utf8, true),
+            Field::new("bigint_col", DataType::Int64, true),
         ]);
 
-        let expr = col("str_col")
+        let expr = col("string_col")
             .is_not_null()
-            .or(col("int_col").gt(Expr::Literal(ScalarValue::UInt64(Some(5)))));
+            .or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)))));
 
         assert!(can_expr_be_pushed_down_with_schemas(
             &expr,
diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs
index e3a4ea4918c1..4164cda8cba1 100644
--- a/datafusion/datasource/src/schema_adapter.rs
+++ b/datafusion/datasource/src/schema_adapter.rs
@@ -96,19 +96,6 @@ pub trait SchemaAdapter: Send + Sync {
 pub trait SchemaMapper: Debug + Send + Sync {
     /// Adapts a `RecordBatch` to match the `table_schema`
     fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
-
-    /// Adapts a [`RecordBatch`] that does not have all the columns from the
-    /// file schema.
-    ///
-    /// This method is used, for example,  when applying a filter to a subset of
-    /// the columns as part of `DataFusionArrowPredicate` when `filter_pushdown`
-    /// is enabled.
-    ///
-    /// This method is slower than `map_batch` as it looks up columns by name.
-    fn map_partial_batch(
-        &self,
-        batch: RecordBatch,
-    ) -> datafusion_common::Result<RecordBatch>;
 }
 
 /// Default  [`SchemaAdapterFactory`] for mapping schemas.
@@ -215,11 +202,10 @@ impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
     fn create(
         &self,
         projected_table_schema: SchemaRef,
-        table_schema: SchemaRef,
+        _table_schema: SchemaRef,
     ) -> Box<dyn SchemaAdapter> {
         Box::new(DefaultSchemaAdapter {
             projected_table_schema,
-            table_schema,
         })
     }
 }
@@ -231,12 +217,6 @@ pub(crate) struct DefaultSchemaAdapter {
     /// The schema for the table, projected to include only the fields being output (projected) by the
     /// associated ParquetSource
     projected_table_schema: SchemaRef,
-    /// The entire table schema for the table we're using this to adapt.
-    ///
-    /// This is used to evaluate any filters pushed down into the scan
-    /// which may refer to columns that are not referred to anywhere
-    /// else in the plan.
-    table_schema: SchemaRef,
 }
 
 impl SchemaAdapter for DefaultSchemaAdapter {
@@ -290,7 +270,6 @@ impl SchemaAdapter for DefaultSchemaAdapter {
             Arc::new(SchemaMapping {
                 projected_table_schema: Arc::clone(&self.projected_table_schema),
                 field_mappings,
-                table_schema: Arc::clone(&self.table_schema),
             }),
             projection,
         ))
@@ -300,27 +279,12 @@ impl SchemaAdapter for DefaultSchemaAdapter {
 /// The SchemaMapping struct holds a mapping from the file schema to the table
 /// schema and any necessary type conversions.
 ///
-/// Note, because `map_batch` and `map_partial_batch` functions have different
-/// needs, this struct holds two schemas:
-///
-/// 1. The projected **table** schema
-/// 2. The full table schema
-///
 /// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which
 /// has the projected schema, since that's the schema which is supposed to come
 /// out of the execution of this query. Thus `map_batch` uses
 /// `projected_table_schema` as it can only operate on the projected fields.
 ///
-/// [`map_partial_batch`]  is used to create a RecordBatch with a schema that
-/// can be used for Parquet predicate pushdown, meaning that it may contain
-/// fields which are not in the projected schema (as the fields that parquet
-/// pushdown filters operate can be completely distinct from the fields that are
-/// projected (output) out of the ParquetSource). `map_partial_batch` thus uses
-/// `table_schema` to create the resulting RecordBatch (as it could be operating
-/// on any fields in the schema).
-///
 /// [`map_batch`]: Self::map_batch
-/// [`map_partial_batch`]: Self::map_partial_batch
 #[derive(Debug)]
 pub struct SchemaMapping {
     /// The schema of the table. This is the expected schema after conversion
@@ -332,18 +296,12 @@ pub struct SchemaMapping {
     /// They are Options instead of just plain `usize`s because the table could
     /// have fields that don't exist in the file.
     field_mappings: Vec<Option<usize>>,
-    /// The entire table schema, as opposed to the projected_table_schema (which
-    /// only contains the columns that we are projecting out of this query).
-    /// This contains all fields in the table, regardless of if they will be
-    /// projected out or not.
-    table_schema: SchemaRef,
 }
 
 impl SchemaMapper for SchemaMapping {
     /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
-    /// conversions. The produced RecordBatch has a schema that contains only the projected
-    /// columns, so if one needs a RecordBatch with a schema that references columns which are not
-    /// in the projected, it would be better to use `map_partial_batch`
+    /// conversions.
+    /// The produced RecordBatch has a schema that contains only the projected columns.
     fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
         let batch_rows = batch.num_rows();
         let batch_cols = batch.columns().to_vec();
@@ -376,54 +334,4 @@ impl SchemaMapper for SchemaMapping {
         let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
         Ok(record_batch)
     }
-
-    /// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only
-    /// contains the fields that exist in both the file schema and table schema.
-    ///
-    /// Unlike `map_batch` this method also preserves the columns that
-    /// may not appear in the final output (`projected_table_schema`) but may
-    /// appear in push down predicates
-    fn map_partial_batch(
-        &self,
-        batch: RecordBatch,
-    ) -> datafusion_common::Result<RecordBatch> {
-        let batch_cols = batch.columns().to_vec();
-        let schema = batch.schema();
-
-        // for each field in the batch's schema (which is based on a file, not a table)...
-        let (cols, fields) = schema
-            .fields()
-            .iter()
-            .zip(batch_cols.iter())
-            .flat_map(|(field, batch_col)| {
-                self.table_schema
-                    // try to get the same field from the table schema that we have stored in self
-                    .field_with_name(field.name())
-                    // and if we don't have it, that's fine, ignore it. This may occur when we've
-                    // created an external table whose fields are a subset of the fields in this
-                    // file, then tried to read data from the file into this table. If that is the
-                    // case here, it's fine to ignore because we don't care about this field
-                    // anyways
-                    .ok()
-                    // but if we do have it,
-                    .map(|table_field| {
-                        // try to cast it into the correct output type. we don't want to ignore this
-                        // error, though, so it's propagated.
-                        cast(batch_col, table_field.data_type())
-                            // and if that works, return the field and column.
-                            .map(|new_col| (new_col, table_field.clone()))
-                    })
-            })
-            .collect::<Result<Vec<_>, _>>()?
-            .into_iter()
-            .unzip::<_, _, Vec<_>, Vec<_>>();
-
-        // Necessary to handle empty batches
-        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
-
-        let schema =
-            Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()));
-        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
-        Ok(record_batch)
-    }
 }