Skip to content

Commit c1addd5

Browse files
committed
feat: add execute_update example for rust
1 parent 27fbc94 commit c1addd5

File tree

2 files changed

+54
-21
lines changed

2 files changed

+54
-21
lines changed

rust/bin/main.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,5 +117,34 @@ async fn main() -> Result<()> {
117117
// +---------------------------+-----+-------+------+
118118
print_batches(&result);
119119

120+
// Closes the prepared statement to notify releasing resources on server side.
121+
client.close_prepared(prepared_stmt).await?;
122+
123+
// There provides a dedicated interface `execute_update` for executing DMLs, including Insert, Delete.
124+
// This interface directly returns the affected rows which might be convenient for some use cases.
125+
//
126+
// Note, Datalayers does not support Update and the development for Delete is in progress.
127+
sql = r#"
128+
INSERT INTO rust.demo (ts, sid, value, flag) VALUES
129+
('2024-09-03T10:00:00+08:00', 1, 4.5, 0),
130+
('2024-09-03T10:05:00+08:00', 2, 11.6, 1);
131+
"#;
132+
let affected_rows = client.execute_update(sql).await?;
133+
// The output should be:
134+
// Affected rows: 2
135+
println!("Affected rows: {}", affected_rows);
136+
137+
// Checks that the data are inserted successfully.
138+
sql = "SELECT * FROM rust.demo where ts >= '2024-09-03T10:00:00+08:00'";
139+
result = client.execute(sql).await?;
140+
// The result should be:
141+
// +---------------------------+-----+-------+------+
142+
// | ts | sid | value | flag |
143+
// +---------------------------+-----+-------+------+
144+
// | 2024-09-03T10:00:00+08:00 | 1 | 4.5 | 0 |
145+
// | 2024-09-03T10:05:00+08:00 | 2 | 11.6 | 1 |
146+
// +---------------------------+-----+-------+------+
147+
print_batches(&result);
148+
120149
Ok(())
121150
}

rust/src/client.rs

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ impl Client {
6464
.handshake(&config.username, &config.password)
6565
.await
6666
.inspect_err(|e| {
67-
println!("Failed to do handshake: {}", filter_message(&e.to_string()));
67+
println!("{}", filter_message(&e.to_string()));
6868
exit(1)
6969
});
7070

@@ -83,10 +83,7 @@ impl Client {
8383
.execute(sql.to_string(), None)
8484
.await
8585
.inspect_err(|e| {
86-
println!(
87-
"Failed to execute a sql: {}",
88-
filter_message(&e.to_string())
89-
);
86+
println!("{}", filter_message(&e.to_string()));
9087
exit(1)
9188
})?;
9289
let ticket = flight_info
@@ -100,16 +97,25 @@ impl Client {
10097
Ok(batches)
10198
}
10299

100+
pub async fn execute_update(&mut self, sql: &str) -> Result<i64> {
101+
let affected_rows = self
102+
.inner
103+
.execute_update(sql.to_string(), None)
104+
.await
105+
.inspect_err(|e| {
106+
println!("{}", filter_message(&e.to_string()));
107+
exit(1)
108+
})?;
109+
Ok(affected_rows)
110+
}
111+
103112
pub async fn prepare(&mut self, sql: &str) -> Result<PreparedStatement<Channel>> {
104113
let prepared_stmt = self
105114
.inner
106115
.prepare(sql.to_string(), None)
107116
.await
108117
.inspect_err(|e| {
109-
println!(
110-
"Failed to execute a sql: {}",
111-
filter_message(&e.to_string())
112-
);
118+
println!("{}", filter_message(&e.to_string()));
113119
exit(1)
114120
})?;
115121
Ok(prepared_stmt)
@@ -124,10 +130,7 @@ impl Client {
124130
.set_parameters(binding)
125131
.context("Failed to bind a record batch to the prepared statement")?;
126132
let flight_info = prepared_stmt.execute().await.inspect_err(|e| {
127-
println!(
128-
"Failed to execute the prepared statement: {}",
129-
filter_message(&e.to_string())
130-
);
133+
println!("{}", filter_message(&e.to_string()));
131134
exit(1)
132135
})?;
133136
let ticket = flight_info
@@ -141,19 +144,20 @@ impl Client {
141144
Ok(batches)
142145
}
143146

147+
pub async fn close_prepared(&self, prepared_stmt: PreparedStatement<Channel>) -> Result<()> {
148+
prepared_stmt
149+
.close()
150+
.await
151+
.context("Failed to close a prepared statement")
152+
}
153+
144154
async fn do_get(&mut self, ticket: Ticket) -> Result<Vec<RecordBatch>> {
145155
let stream = self.inner.do_get(ticket).await.inspect_err(|e| {
146-
println!(
147-
"Failed to perform do_get: {}",
148-
filter_message(&e.to_string())
149-
);
156+
println!("{}", filter_message(&e.to_string()));
150157
exit(1)
151158
})?;
152159
let batches = stream.try_collect::<Vec<_>>().await.inspect_err(|e| {
153-
println!(
154-
"Failed to consume flight record batch stream: {}",
155-
filter_message(&e.to_string())
156-
);
160+
println!("{}", filter_message(&e.to_string()));
157161
exit(1)
158162
})?;
159163
if batches.is_empty() {

0 commit comments

Comments
 (0)