Skip to content

Commit 5131850

Browse files
Abdullahsab3rtyler
authored andcommitted
Rust API documentation
Signed-off-by: Abdullahsab3 <[email protected]>
1 parent a70559e commit 5131850

13 files changed

+966
-314
lines changed

docs/src/rust/check_constraints.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
66
// --8<-- [start:add_constraint]
77
let table = deltalake::open_table("../rust/tests/data/simple_table").await?;
88
let ops = DeltaOps(table);
9-
ops.with_constraint("id_gt_0", "id > 0").await?;
9+
ops.add_constraint().with_constraint("id_gt_0", "id > 0").await?;
1010
// --8<-- [end:add_constraint]
1111

1212
// --8<-- [start:add_data]
13-
let table = deltalake::open_table("../rust/tests/data/simple_table").await?;
14-
let schema = table.get_state().arrow_schema()?;
13+
let mut table = deltalake::open_table("../rust/tests/data/simple_table").await?;
14+
let schema = table.snapshot()?.arrow_schema()?;
1515
let invalid_values: Vec<Arc<dyn Array>> = vec![
1616
Arc::new(Int32Array::from(vec![-10]))
1717
];
1818
let batch = RecordBatch::try_new(schema, invalid_values)?;
19-
table.write(vec![batch]).await?;
19+
let mut writer = RecordBatchWriter::for_table(&table)?;
20+
writer.write(batch).await?;
21+
writer.flush_and_commit(&mut table).await?;
2022
// --8<-- [end:add_data]
2123

2224
Ok(())

docs/src/rust/read_cdf.rs

+26-4
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,37 @@
11
#[tokio::main]
22
async fn main() -> Result<(), Box<dyn std::error::Error>> {
33

4-
let table = deltalake::open_table("../rust/tests/data/cdf-table").await?;
4+
let table = deltalake::open_table("tmp/some-table").await?;
5+
let ctx = SessionContext::new();
56
let ops = DeltaOps(table);
6-
let cdf = ops.load_cdf()
7+
let cdf = ops
8+
.load_cdf()
79
.with_starting_version(0)
810
.with_ending_version(4)
911
.build()
1012
.await?;
1113

12-
arrow_cast::pretty::print_batches(&cdf)?;
14+
let batches = collect_batches(
15+
cdf.properties().output_partitioning().partition_count(),
16+
&cdf,
17+
ctx,
18+
).await?;
19+
arrow_cast::pretty::print_batches(&batches)?;
20+
1321

1422
Ok(())
15-
}
23+
}
24+
25+
async fn collect_batches(
26+
num_partitions: usize,
27+
stream: &impl ExecutionPlan,
28+
ctx: SessionContext,
29+
) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
30+
let mut batches = vec![];
31+
for p in 0..num_partitions {
32+
let data: Vec<RecordBatch> =
33+
collect_sendable_stream(stream.execute(p, ctx.task_ctx())?).await?;
34+
batches.extend_from_slice(&data);
35+
}
36+
Ok(batches)
37+
}

docs/usage/appending-overwriting-delta-lake-table.md

+58-14
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,30 @@ Suppose you have a Delta table with the following contents:
1818

1919
Append two additional rows of data to the table:
2020

21-
```python
22-
from deltalake import write_deltalake, DeltaTable
23-
24-
df = pd.DataFrame({"num": [8, 9], "letter": ["dd", "ee"]})
25-
write_deltalake("tmp/some-table", df, mode="append")
26-
```
21+
=== "Python"
22+
23+
```python
24+
from deltalake import write_deltalake, DeltaTable
25+
26+
df = pd.DataFrame({"num": [8, 9], "letter": ["dd", "ee"]})
27+
write_deltalake("tmp/some-table", df, mode="append")
28+
```
29+
30+
=== "Rust"
31+
```rust
32+
let table = open_table("tmp/some-table").await?;
33+
DeltaOps(table).write(RecordBatch::try_new(
34+
Arc::new(Schema::new(vec![
35+
Field::new("num", DataType::Int32, false),
36+
Field::new("letter", DataType::Utf8, false),
37+
])),
38+
vec![
39+
Arc::new(Int32Array::from(vec![8, 9])),
40+
Arc::new(StringArray::from(vec![
41+
"dd", "ee"
42+
])),
43+
])).with_save_mode(SaveMode::Append).await?;
44+
```
2745

2846
Here are the updated contents of the Delta table:
2947

@@ -44,12 +62,27 @@ Now let's see how to perform an overwrite transaction.
4462
## Delta Lake overwrite transactions
4563

4664
Now let's see how to overwrite the exisitng Delta table.
47-
48-
```python
49-
df = pd.DataFrame({"num": [11, 22], "letter": ["aa", "bb"]})
50-
write_deltalake("tmp/some-table", df, mode="overwrite")
51-
```
52-
65+
=== "Python"
66+
```python
67+
df = pd.DataFrame({"num": [11, 22], "letter": ["aa", "bb"]})
68+
write_deltalake("tmp/some-table", df, mode="overwrite")
69+
```
70+
71+
=== "Rust"
72+
```rust
73+
let table = open_table("tmp/some-table").await?;
74+
DeltaOps(table).write(RecordBatch::try_new(
75+
Arc::new(Schema::new(vec![
76+
Field::new("num", DataType::Int32, false),
77+
Field::new("letter", DataType::Utf8, false),
78+
])),
79+
vec![
80+
Arc::new(Int32Array::from(vec![1, 2, 3])),
81+
Arc::new(StringArray::from(vec![
82+
"a", "b", "c",
83+
])),
84+
])).with_save_mode(SaveMode::Overwrite).await?;
85+
```
5386
Here are the contents of the Delta table after the overwrite operation:
5487

5588
```
@@ -63,9 +96,20 @@ Here are the contents of the Delta table after the overwrite operation:
6396

6497
Overwriting just performs a logical delete. It doesn't physically remove the previous data from storage. Time travel back to the previous version to confirm that the old version of the table is still accessable.
6598

66-
```python
67-
dt = DeltaTable("tmp/some-table", version=1)
99+
=== "Python"
68100

101+
```python
102+
dt = DeltaTable("tmp/some-table", version=1)
103+
```
104+
105+
=== "Rust"
106+
```rust
107+
let mut table = open_table("tmp/some-table").await?;
108+
table.load_version(1).await?;
109+
```
110+
111+
112+
```
69113
+-------+----------+
70114
| num | letter |
71115
|-------+----------|

docs/usage/create-delta-lake-table.md

+47
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,53 @@ You can easily write a DataFrame to a Delta table.
2424
df.write_delta("tmp/some-table")
2525
```
2626

27+
=== "Rust"
28+
29+
```rust
30+
let delta_ops = DeltaOps::try_from_uri("tmp/some-table").await?;
31+
let mut table = delta_ops
32+
.create()
33+
.with_table_name("some-table")
34+
.with_save_mode(SaveMode::Overwrite)
35+
.with_columns(
36+
StructType::new(vec![
37+
StructField::new(
38+
"num".to_string(),
39+
DataType::Primitive(PrimitiveType::Integer),
40+
true,
41+
),
42+
StructField::new(
43+
"letter".to_string(),
44+
DataType::Primitive(PrimitiveType::String),
45+
true,
46+
),
47+
])
48+
.fields()
49+
.cloned(),
50+
)
51+
.await?;
52+
53+
let mut record_batch_writer =
54+
deltalake::writer::RecordBatchWriter::for_table(&mut table)?;
55+
record_batch_writer
56+
.write(
57+
RecordBatch::try_new(
58+
Arc::new(Schema::new(vec![
59+
Field::new("num", DataType::Int32, true),
60+
Field::new("letter", Utf8, true),
61+
])),
62+
vec![
63+
Arc::new(Int32Array::from(vec![1, 2, 3])),
64+
Arc::new(StringArray::from(vec![
65+
"a", "b", "c",
66+
])),
67+
],
68+
)?,
69+
)
70+
.await?;
71+
record_batch_writer.flush_and_commit(&mut table).await?;
72+
```
73+
2774
Here are the contents of the Delta table in storage:
2875

2976
```

docs/usage/deleting-rows-from-delta-lake-table.md

+23-5
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,29 @@ Suppose you have the following Delta table with four rows:
1717

1818
Here's how to delete all the rows where the `num` is greater than 2:
1919

20-
```python
21-
dt = DeltaTable("tmp/my-table")
22-
dt.delete("num > 2")
23-
```
24-
20+
=== "Python"
21+
22+
```python
23+
dt = DeltaTable("tmp/my-table")
24+
dt.delete("num > 2")
25+
```
26+
27+
=== "Rust"
28+
```rust
29+
let table = deltalake::open_table("./data/simple_table").await?;
30+
let (table, delete_metrics) = DeltaOps(table)
31+
.delete()
32+
.with_predicate(col("num").gt(lit(2)))
33+
.await?;
34+
```
35+
`with_predicate` expects an argument that can be translated to a Datafusion `Expression`. This can be either using the Dataframe API, or using a `SQL where` clause:
36+
```rust
37+
let table = deltalake::open_table("./data/simple_table").await?;
38+
let (table, delete_metrics) = DeltaOps(table)
39+
.delete()
40+
.with_predicate("num > 2")
41+
.await?;
42+
```
2543
Here are the contents of the Delta table after the delete operation has been performed:
2644

2745
```

0 commit comments

Comments
 (0)