Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 72 additions & 7 deletions src/mariadb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub(crate) fn docker(options: &Benchmark) -> DockerParams {
image: "mariadb",
pre_args: "--ulimit nofile=65536:65536 -p 127.0.0.1:3306:3306 -e MARIADB_ROOT_PASSWORD=mariadb -e MARIADB_DATABASE=bench".to_string(),
post_args: match options.optimised {
// Optimised configuration
true => format!(
"--max-connections=1024 \
--innodb-buffer-pool-size={buffer_pool_gb}G \
Expand All @@ -64,6 +65,10 @@ pub(crate) fn docker(options: &Benchmark) -> DockerParams {
--innodb-adaptive-hash-index=ON \
--innodb-use-native-aio=1 \
--innodb-doublewrite=OFF \
--log-bin=mariadb-bin \
--binlog-format=ROW \
--server-id=1 \
--binlog-row-image=MINIMAL \
--sync_binlog={} \
--innodb-flush-log-at-trx-commit={}",
if options.sync {
Expand All @@ -77,8 +82,13 @@ pub(crate) fn docker(options: &Benchmark) -> DockerParams {
"0"
}
),
// Default configuration
false => format!(
"--max-connections=1024 \
--log-bin=mariadb-bin \
--binlog-format=ROW \
--server-id=1 \
--binlog-row-image=FULL \
--sync_binlog={} \
--innodb-flush-log-at-trx-commit={}",
if options.sync {
Expand Down Expand Up @@ -432,21 +442,72 @@ impl MariadbClient {
}
}

/// Helper function to execute a statement and verify the affected rows count
/// Retries on deadlock errors (MySQL/MariaDB error 1213)
async fn exec_and_verify(
&self,
stm: String,
params: Vec<mysql_async::Value>,
expected_count: usize,
operation: &str,
) -> Result<()> {
const MAX_RETRIES: u32 = 5;
const INITIAL_BACKOFF_MS: u64 = 10;

let mut attempt = 0;
loop {
let mut conn = self.conn.lock().await;
match conn.exec_iter(&stm, params.clone()).await {
Ok(result) => {
let affected = result.affected_rows();
drop(result);
drop(conn);

if affected != expected_count as u64 {
return Err(anyhow::anyhow!(
"{}: expected {} rows affected, got {}",
operation,
expected_count,
affected
));
}
return Ok(());
}
Err(e) => {
drop(conn);

let error_msg = e.to_string();
let is_deadlock = error_msg.contains("1213") || error_msg.contains("Deadlock");

if is_deadlock && attempt < MAX_RETRIES {
attempt += 1;
let backoff_ms = INITIAL_BACKOFF_MS * (1 << (attempt - 1));
tokio::time::sleep(tokio::time::Duration::from_millis(backoff_ms)).await;
continue;
}

return Err(e.into());
}
}
}
}

async fn batch_create<T>(&self, key_vals: Vec<(T, Value)>) -> Result<()>
where
T: ToValue + Sync,
{
if key_vals.is_empty() {
return Ok(());
}
let expected_count = key_vals.len();
let columns = self
.columns
.0
.iter()
.map(|(name, _)| MySqlDialect::escape_field(name.clone()))
.collect::<Vec<String>>()
.join(", ");
let placeholders = (0..key_vals.len())
let placeholders = (0..expected_count)
.map(|_| {
let value_placeholders =
(0..self.columns.0.len()).map(|_| "?").collect::<Vec<_>>().join(", ");
Expand Down Expand Up @@ -478,12 +539,13 @@ impl MariadbClient {
serde_json::to_string(v).unwrap().as_bytes().to_vec(),
),
});
} else {
return Err(anyhow::anyhow!("Missing value for column {}", name));
}
}
}
}
let _: Vec<Row> = self.conn.lock().await.exec(stm, params).await?;
Ok(())
self.exec_and_verify(stm, params, expected_count, "batch_create").await
}

async fn batch_read<T>(&self, keys: Vec<T>) -> Result<()>
Expand All @@ -497,6 +559,7 @@ impl MariadbClient {
let stm = format!("SELECT * FROM record WHERE id IN ({placeholders})");
let params: Vec<mysql_async::Value> = keys.iter().map(|k| k.to_value()).collect();
let res: Vec<Row> = self.conn.lock().await.exec(stm, params).await?;
assert_eq!(res.len(), keys.len());
for row in res {
black_box(self.consume(row).unwrap());
}
Expand All @@ -510,6 +573,7 @@ impl MariadbClient {
if key_vals.is_empty() {
return Ok(());
}
let expected_count = key_vals.len();
let columns = self
.columns
.0
Expand Down Expand Up @@ -551,14 +615,15 @@ impl MariadbClient {
serde_json::to_string(v).unwrap().as_bytes().to_vec(),
),
});
} else {
return Err(anyhow::anyhow!("Missing value for column {}", name));
}
}
}
for (key, _) in &key_vals {
params.push(key.to_value());
}
let _: Vec<Row> = self.conn.lock().await.exec(stm, params).await?;
Ok(())
self.exec_and_verify(stm, params, expected_count, "batch_update").await
}

async fn batch_delete<T>(&self, keys: Vec<T>) -> Result<()>
Expand All @@ -568,10 +633,10 @@ impl MariadbClient {
if keys.is_empty() {
return Ok(());
}
let expected_count = keys.len();
let placeholders = keys.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
let stm = format!("DELETE FROM record WHERE id IN ({placeholders})");
let params: Vec<mysql_async::Value> = keys.iter().map(|k| k.to_value()).collect();
let _: Vec<Row> = self.conn.lock().await.exec(stm, params).await?;
Ok(())
self.exec_and_verify(stm, params, expected_count, "batch_delete").await
}
}
Loading
Loading