diff --git a/src/mariadb.rs b/src/mariadb.rs
index ab8fc93..0c1bf2d 100644
--- a/src/mariadb.rs
+++ b/src/mariadb.rs
@@ -42,6 +42,7 @@ pub(crate) fn docker(options: &Benchmark) -> DockerParams {
         image: "mariadb",
         pre_args: "--ulimit nofile=65536:65536 -p 127.0.0.1:3306:3306 -e MARIADB_ROOT_PASSWORD=mariadb -e MARIADB_DATABASE=bench".to_string(),
         post_args: match options.optimised {
+            // Optimised configuration
             true => format!(
                 "--max-connections=1024 \
                 --innodb-buffer-pool-size={buffer_pool_gb}G \
@@ -64,6 +65,10 @@ pub(crate) fn docker(options: &Benchmark) -> DockerParams {
                 --innodb-adaptive-hash-index=ON \
                 --innodb-use-native-aio=1 \
                 --innodb-doublewrite=OFF \
+                --log-bin=mariadb-bin \
+                --binlog-format=ROW \
+                --server-id=1 \
+                --binlog-row-image=MINIMAL \
                 --sync_binlog={} \
                 --innodb-flush-log-at-trx-commit={}",
                 if options.sync {
@@ -77,8 +82,13 @@ pub(crate) fn docker(options: &Benchmark) -> DockerParams {
                     "0"
                 }
             ),
+            // Default configuration
             false => format!(
                 "--max-connections=1024 \
+                --log-bin=mariadb-bin \
+                --binlog-format=ROW \
+                --server-id=1 \
+                --binlog-row-image=FULL \
                 --sync_binlog={} \
                 --innodb-flush-log-at-trx-commit={}",
                 if options.sync {
@@ -432,6 +442,56 @@ impl MariadbClient {
         }
     }
 
+    /// Helper function to execute a statement and verify the affected rows count
+    /// Retries on deadlock errors (MySQL/MariaDB error 1213)
+    async fn exec_and_verify(
+        &self,
+        stm: String,
+        params: Vec<mysql_async::Value>,
+        expected_count: usize,
+        operation: &str,
+    ) -> Result<()> {
+        const MAX_RETRIES: u32 = 5;
+        const INITIAL_BACKOFF_MS: u64 = 10;
+
+        let mut attempt = 0;
+        loop {
+            let mut conn = self.conn.lock().await;
+            match conn.exec_iter(&stm, params.clone()).await {
+                Ok(result) => {
+                    let affected = result.affected_rows();
+                    drop(result);
+                    drop(conn);
+
+                    if affected != expected_count as u64 {
+                        return Err(anyhow::anyhow!(
+                            "{}: expected {} rows affected, got {}",
+                            operation,
+                            expected_count,
+                            affected
+                        ));
+                    }
+                    return Ok(());
+                }
+                Err(e) => {
+                    drop(conn);
+
+                    let error_msg = e.to_string();
+                    let is_deadlock = error_msg.contains("1213") || error_msg.contains("Deadlock");
+
+                    if is_deadlock && attempt < MAX_RETRIES {
+                        attempt += 1;
+                        let backoff_ms = INITIAL_BACKOFF_MS * (1 << (attempt - 1));
+                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff_ms)).await;
+                        continue;
+                    }
+
+                    return Err(e.into());
+                }
+            }
+        }
+    }
+
     async fn batch_create<T>(&self, key_vals: Vec<(T, Value)>) -> Result<()>
     where
         T: ToValue + Sync,
@@ -439,6 +499,7 @@ impl MariadbClient {
         if key_vals.is_empty() {
             return Ok(());
         }
+        let expected_count = key_vals.len();
         let columns = self
             .columns
             .0
@@ -446,7 +507,7 @@ impl MariadbClient {
             .map(|(name, _)| MySqlDialect::escape_field(name.clone()))
             .collect::<Vec<_>>()
             .join(", ");
-        let placeholders = (0..key_vals.len())
+        let placeholders = (0..expected_count)
             .map(|_| {
                 let value_placeholders =
                     (0..self.columns.0.len()).map(|_| "?").collect::<Vec<_>>().join(", ");
@@ -478,12 +539,13 @@ impl MariadbClient {
                             serde_json::to_string(v).unwrap().as_bytes().to_vec(),
                         ),
                     });
+                } else {
+                    return Err(anyhow::anyhow!("Missing value for column {}", name));
                 }
             }
         }
     }
-        let _: Vec<Row> = self.conn.lock().await.exec(stm, params).await?;
-        Ok(())
+        self.exec_and_verify(stm, params, expected_count, "batch_create").await
     }
 
     async fn batch_read<T>(&self, keys: Vec<T>) -> Result<()>
     where
         T: ToValue + Sync,
     {
@@ -497,6 +559,7 @@ impl MariadbClient {
         let stm = format!("SELECT * FROM record WHERE id IN ({placeholders})");
         let params: Vec<mysql_async::Value> = keys.iter().map(|k| k.to_value()).collect();
         let res: Vec<Row> = self.conn.lock().await.exec(stm, params).await?;
+        assert_eq!(res.len(), keys.len());
         for row in res {
             black_box(self.consume(row).unwrap());
         }
@@ -510,6 +573,7 @@ impl MariadbClient {
         if key_vals.is_empty() {
             return Ok(());
         }
+        let expected_count = key_vals.len();
        let columns = self
             .columns
             .0
@@ -551,14 +615,15 @@ impl MariadbClient {
                         serde_json::to_string(v).unwrap().as_bytes().to_vec(),
                     ),
                 });
+            } else {
+                return Err(anyhow::anyhow!("Missing value for column {}", name));
             }
         }
     }
         for (key, _) in &key_vals {
             params.push(key.to_value());
         }
-        let _: Vec<Row> = self.conn.lock().await.exec(stm, params).await?;
-        Ok(())
+        self.exec_and_verify(stm, params, expected_count, "batch_update").await
     }
 
     async fn batch_delete<T>(&self, keys: Vec<T>) -> Result<()>
     where
         T: ToValue + Sync,
     {
@@ -568,10 +633,10 @@ impl MariadbClient {
         if keys.is_empty() {
             return Ok(());
         }
+        let expected_count = keys.len();
         let placeholders = keys.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
         let stm = format!("DELETE FROM record WHERE id IN ({placeholders})");
         let params: Vec<mysql_async::Value> = keys.iter().map(|k| k.to_value()).collect();
-        let _: Vec<Row> = self.conn.lock().await.exec(stm, params).await?;
-        Ok(())
+        self.exec_and_verify(stm, params, expected_count, "batch_delete").await
     }
 }
diff --git a/src/mysql.rs b/src/mysql.rs
index 0948684..b08c145 100644
--- a/src/mysql.rs
+++ b/src/mysql.rs
@@ -25,17 +25,20 @@ fn calculate_mysql_memory() -> (u64, u64, u64) {
     let memory = Config::new();
     // Use ~100% of recommended cache allocation
     let buffer_pool_gb = memory.cache_gb;
-    // Use ~10% of buffer pool, min 1GB, max 8GB
-    let log_file_gb = (memory.cache_gb / 10).clamp(1, 8);
+    // Use ~10% of buffer pool for redo log capacity, min 1GB, max 8GB
+    // Note: In MySQL 8.0.30+, innodb_log_file_size was replaced with innodb_redo_log_capacity
+    let redo_log_gb = (memory.cache_gb / 10).clamp(1, 8);
     // Use 1 buffer pool instance per 2GB, max 64
     let buffer_pool_instances = (buffer_pool_gb / 2).clamp(1, 64);
     // Return configuration
-    (buffer_pool_gb, log_file_gb, buffer_pool_instances)
+    (buffer_pool_gb, redo_log_gb, buffer_pool_instances)
 }
 
+/// Returns the Docker parameters required to run a MySQL instance for benchmarking,
+/// with configuration optimized based on the provided benchmark options.
 pub(crate) fn docker(options: &Benchmark) -> DockerParams {
     // Calculate memory allocation
-    let (buffer_pool_gb, log_file_gb, buffer_pool_instances) = calculate_mysql_memory();
+    let (buffer_pool_gb, redo_log_gb, buffer_pool_instances) = calculate_mysql_memory();
     // Return Docker parameters
     DockerParams {
         image: "mysql",
@@ -46,7 +49,7 @@ pub(crate) fn docker(options: &Benchmark) -> DockerParams {
                 "--max-connections=1024 \
                 --innodb-buffer-pool-size={buffer_pool_gb}G \
                 --innodb-buffer-pool-instances={buffer_pool_instances} \
-                --innodb-log-file-size={log_file_gb}G \
+                --innodb-redo-log-capacity={redo_log_gb}G \
                 --innodb-log-buffer-size=256M \
                 --innodb-flush-method=O_DIRECT \
                 --innodb-io-capacity=2000 \
@@ -61,7 +64,10 @@ pub(crate) fn docker(options: &Benchmark) -> DockerParams {
                 --join-buffer-size=32M \
                 --tmp-table-size=1G \
                 --max-heap-table-size=1G \
-                --query-cache-size=0 \
+                --log-bin=mysql-bin \
+                --binlog-format=ROW \
+                --server-id=1 \
+                --binlog-row-image=MINIMAL \
                 --sync_binlog={} \
                 --innodb-flush-log-at-trx-commit={}",
                 if options.sync {
@@ -78,6 +84,10 @@ pub(crate) fn docker(options: &Benchmark) -> DockerParams {
             // Default configuration
             false => format!(
                 "--max-connections=1024 \
+                --log-bin=mysql-bin \
+                --binlog-format=ROW \
+                --server-id=1 \
+                --binlog-row-image=FULL \
                 --sync_binlog={} \
                 --innodb-flush-log-at-trx-commit={}",
                 if options.sync {
@@ -281,7 +291,6 @@ impl BenchmarkClient for MysqlClient {
 impl MysqlClient {
     fn consume(&self, mut row: Row) -> Result<Value> {
         let mut val: Map<String, Value> = Map::new();
-
         for (i, c) in row.columns().iter().enumerate() {
             val.insert(
                 c.name_str().to_string(),
@@ -432,6 +441,56 @@ impl MysqlClient {
         }
     }
 
+    /// Helper function to execute a statement and verify the affected rows count
+    /// Retries on deadlock errors (MySQL error 1213)
+    async fn exec_and_verify(
+        &self,
+        stm: String,
+        params: Vec<mysql_async::Value>,
+        expected_count: usize,
+        operation: &str,
+    ) -> Result<()> {
+        const MAX_RETRIES: u32 = 5;
+        const INITIAL_BACKOFF_MS: u64 = 10;
+
+        let mut attempt = 0;
+        loop {
+            let mut conn = self.conn.lock().await;
+            match conn.exec_iter(&stm, params.clone()).await {
+                Ok(result) => {
+                    let affected = result.affected_rows();
+                    drop(result);
+                    drop(conn);
+
+                    if affected != expected_count as u64 {
+                        return Err(anyhow::anyhow!(
+                            "{}: expected {} rows affected, got {}",
+                            operation,
+                            expected_count,
+                            affected
+                        ));
+                    }
+                    return Ok(());
+                }
+                Err(e) => {
+                    drop(conn);
+
+                    let error_msg = e.to_string();
+                    let is_deadlock = error_msg.contains("1213") || error_msg.contains("Deadlock");
+
+                    if is_deadlock && attempt < MAX_RETRIES {
+                        attempt += 1;
+                        let backoff_ms = INITIAL_BACKOFF_MS * (1 << (attempt - 1));
+                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff_ms)).await;
+                        continue;
+                    }
+
+                    return Err(e.into());
+                }
+            }
+        }
+    }
+
     async fn batch_create<T>(&self, key_vals: Vec<(T, Value)>) -> Result<()>
     where
         T: ToValue + Sync,
@@ -439,6 +498,7 @@ impl MysqlClient {
         if key_vals.is_empty() {
             return Ok(());
         }
+        let expected_count = key_vals.len();
         let columns = self
             .columns
             .0
@@ -446,7 +506,7 @@ impl MysqlClient {
             .map(|(name, _)| MySqlDialect::escape_field(name.clone()))
             .collect::<Vec<_>>()
             .join(", ");
-        let placeholders = (0..key_vals.len())
+        let placeholders = (0..expected_count)
             .map(|_| {
                 let value_placeholders =
                     (0..self.columns.0.len()).map(|_| "?").collect::<Vec<_>>().join(", ");
@@ -478,12 +538,13 @@ impl MysqlClient {
                             serde_json::to_string(v).unwrap().as_bytes().to_vec(),
                         ),
                     });
+                } else {
+                    return Err(anyhow::anyhow!("Missing value for column {}", name));
                 }
             }
         }
     }
-        let _: Vec<Row> = self.conn.lock().await.exec(stm, params).await?;
-        Ok(())
+        self.exec_and_verify(stm, params, expected_count, "batch_create").await
     }
 
     async fn batch_read<T>(&self, keys: Vec<T>) -> Result<()>
     where
         T: ToValue + Sync,
     {
@@ -497,6 +558,7 @@ impl MysqlClient {
         let stm = format!("SELECT * FROM record WHERE id IN ({placeholders})");
         let params: Vec<mysql_async::Value> = keys.iter().map(|k| k.to_value()).collect();
         let res: Vec<Row> = self.conn.lock().await.exec(stm, params).await?;
+        assert_eq!(res.len(), keys.len());
         for row in res {
             black_box(self.consume(row).unwrap());
         }
@@ -510,6 +572,7 @@ impl MysqlClient {
         if key_vals.is_empty() {
             return Ok(());
         }
+        let expected_count = key_vals.len();
         let columns = self
             .columns
             .0
@@ -531,34 +594,35 @@ impl MysqlClient {
         for (name, _) in &self.columns.0 {
             for (key, val) in &key_vals {
                 params.push(key.to_value());
-                if let Value::Object(map) = val
-                    && let Some(v) = map.get(name)
-                {
-                    params.push(match v {
-                        Value::Null => mysql_async::Value::NULL,
-                        Value::Bool(b) => mysql_async::Value::Int(*b as i64),
-                        Value::Number(n) => {
-                            if let Some(i) = n.as_i64() {
-                                mysql_async::Value::Int(i)
-                            } else if let Some(f) = n.as_f64() {
-                                mysql_async::Value::Double(f)
-                            } else {
-                                mysql_async::Value::NULL
+                if let Value::Object(map) = val {
+                    if let Some(v) = map.get(name) {
+                        params.push(match v {
+                            Value::Null => mysql_async::Value::NULL,
+                            Value::Bool(b) => mysql_async::Value::Int(*b as i64),
+                            Value::Number(n) => {
+                                if let Some(i) = n.as_i64() {
+                                    mysql_async::Value::Int(i)
+                                } else if let Some(f) = n.as_f64() {
+                                    mysql_async::Value::Double(f)
+                                } else {
+                                    mysql_async::Value::NULL
+                                }
                             }
-                        }
-                        Value::String(s) => mysql_async::Value::Bytes(s.as_bytes().to_vec()),
-                        Value::Array(_) | Value::Object(_) => mysql_async::Value::Bytes(
-                            serde_json::to_string(v).unwrap().as_bytes().to_vec(),
-                        ),
-                    });
+                            Value::String(s) => mysql_async::Value::Bytes(s.as_bytes().to_vec()),
+                            Value::Array(_) | Value::Object(_) => mysql_async::Value::Bytes(
+                                serde_json::to_string(v).unwrap().as_bytes().to_vec(),
+                            ),
+                        });
+                    } else {
+                        return Err(anyhow::anyhow!("Missing value for column {}", name));
+                    }
                 }
             }
         }
         for (key, _) in &key_vals {
             params.push(key.to_value());
         }
-        let _: Vec<Row> = self.conn.lock().await.exec(stm, params).await?;
-        Ok(())
+        self.exec_and_verify(stm, params, expected_count, "batch_update").await
     }
 
     async fn batch_delete<T>(&self, keys: Vec<T>) -> Result<()>
     where
         T: ToValue + Sync,
     {
@@ -568,10 +632,10 @@ impl MysqlClient {
         if keys.is_empty() {
             return Ok(());
         }
+        let expected_count = keys.len();
         let placeholders = keys.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
         let stm = format!("DELETE FROM record WHERE id IN ({placeholders})");
         let params: Vec<mysql_async::Value> = keys.iter().map(|k| k.to_value()).collect();
-        let _: Vec<Row> = self.conn.lock().await.exec(stm, params).await?;
-        Ok(())
+        self.exec_and_verify(stm, params, expected_count, "batch_delete").await
     }
 }