Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core)!: implement write returns metadata #5562

Merged
merged 18 commits into from
Feb 18, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add one chaos test for block_write and multipart_write
meteorgan committed Feb 17, 2025
commit ee8e2e54192e4a7f1b9023d1da156da2d4e32eaf
49 changes: 48 additions & 1 deletion core/src/raw/oio/write/block_write.rs
Original file line number Diff line number Diff line change
@@ -275,7 +275,18 @@ mod tests {
}

impl BlockWrite for Arc<Mutex<TestWrite>> {
async fn write_once(&self, _: u64, _: Buffer) -> Result<Metadata> {
async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
sleep(Duration::from_nanos(50)).await;

if thread_rng().gen_bool(1.0 / 10.0) {
return Err(
Error::new(ErrorKind::Unexpected, "I'm a crazy monkey!").set_temporary()
);
}

let mut this = self.lock().unwrap();
this.length = size;
this.content = Some(body);
Ok(Metadata::default())
}

@@ -355,4 +366,40 @@ mod tests {
"content must be the same"
);
}

#[tokio::test]
async fn test_block_writer_with_retry_when_write_once_error() {
let mut rng = thread_rng();

for _ in 1..100 {
let mut w = BlockWriter::new(TestWrite::new(), Some(Executor::new()), 8);

let size = rng.gen_range(1..1024);
let mut bs = vec![0; size];
rng.fill_bytes(&mut bs);

loop {
match w.write(bs.clone().into()).await {
Ok(_) => break,
Err(_) => continue,
}
}

loop {
match w.close().await {
Ok(_) => break,
Err(_) => continue,
}
}

let inner = w.w.lock().unwrap();
assert_eq!(size as u64, inner.length, "length must be the same");
assert!(inner.content.is_some());
assert_eq!(
bs,
inner.content.clone().unwrap().to_bytes(),
"content must be the same"
);
}
}
}
47 changes: 45 additions & 2 deletions core/src/raw/oio/write/multipart_write.rs
Original file line number Diff line number Diff line change
@@ -324,6 +324,7 @@ mod tests {
upload_id: String,
part_numbers: Vec<usize>,
length: u64,
content: Option<Buffer>,
}

impl TestWrite {
@@ -332,15 +333,26 @@ mod tests {
upload_id: uuid::Uuid::new_v4().to_string(),
part_numbers: Vec::new(),
length: 0,
content: None,
};

Arc::new(Mutex::new(v))
}
}

impl MultipartWrite for Arc<Mutex<TestWrite>> {
async fn write_once(&self, size: u64, _: Buffer) -> Result<Metadata> {
self.lock().await.length += size;
async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
sleep(Duration::from_nanos(50)).await;

if thread_rng().gen_bool(1.0 / 10.0) {
return Err(
Error::new(ErrorKind::Unexpected, "I'm a crazy monkey!").set_temporary()
);
}

let mut this = self.lock().await;
this.length = size;
this.content = Some(body);
Ok(Metadata::default().with_content_length(size))
}

@@ -473,4 +485,35 @@ mod tests {
let actual_size = w.w.lock().await.length;
assert_eq!(actual_size, total_size);
}

#[tokio::test]
async fn test_multipart_writer_with_retry_when_write_once_error() {
let mut rng = thread_rng();

for _ in 0..100 {
let mut w = MultipartWriter::new(TestWrite::new(), None, 200);
let size = rng.gen_range(1..1024);
let mut bs = vec![0; size];
rng.fill_bytes(&mut bs);

loop {
match w.write(bs.clone().into()).await {
Ok(_) => break,
Err(_) => continue,
}
}

loop {
match w.close().await {
Ok(_) => break,
Err(_) => continue,
}
}

let inner = w.w.lock().await;
assert_eq!(inner.length, size as u64);
assert!(inner.content.is_some());
assert_eq!(inner.content.clone().unwrap().to_bytes(), bs);
}
}
}