From c9e83bdd004e4426e4f77c47dbae75b2b7d7a58e Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sat, 14 Dec 2019 19:07:21 +0100 Subject: [PATCH] make it work with zfs 0.8 The parent of this commit was based on ZoL 0.8-rcX some time in Feb-April 2019. Since then, some changes happend to the zfs send code that broke the parent commit. Changes made in this commit: - Handle FREEOBJECTS(firstobj=X, numobjs=u64max-X) Apparently, this wasn't a thing in April 19, cannot remember exactly. - fixup from_ivset_guid field when squashing => f00ab3f22cc2c7f62cfd56be842945667b1d558f (from_ivset_guid was added to stream format) - 'show' subcommand to dump LSM's internal state - cargo fmt --- bindings/build.rs | 7 +- bindings/src/lib.rs | 2 - zquash/src/dmu_stream.rs | 74 +++++++++++++- zquash/src/fletcher4.rs | 1 - zquash/src/lib.rs | 2 +- zquash/src/lsm/lsm.rs | 8 +- zquash/src/lsm/lsm_srv.rs | 173 ++++++++++++++++++++++----------- zquash/src/lsm/object_merge.rs | 5 +- zquash/src/main.rs | 24 ++++- zquash/src/split_tree.rs | 1 - 10 files changed, 216 insertions(+), 81 deletions(-) diff --git a/bindings/build.rs b/bindings/build.rs index 105ad8a..0bb4930 100644 --- a/bindings/build.rs +++ b/bindings/build.rs @@ -8,7 +8,12 @@ fn main() { let helpers = Path::new(&dir).join("./helpers"); // compile helpers lib - cc::Build::new().file(helpers.join("helpers.c")).flag("-Werror").flag("-Wall").flag("-Wpedantic").compile("helpers"); + cc::Build::new() + .file(helpers.join("helpers.c")) + .flag("-Werror") + .flag("-Wall") + .flag("-Wpedantic") + .compile("helpers"); let clang_args = [format!("-I{}", helpers.display())]; let bindings = bindgen::Builder::default() diff --git a/bindings/src/lib.rs b/bindings/src/lib.rs index 61d438c..f63532a 100644 --- a/bindings/src/lib.rs +++ b/bindings/src/lib.rs @@ -1,5 +1,3 @@ - - mod generated; pub use generated::*; diff --git a/zquash/src/dmu_stream.rs b/zquash/src/dmu_stream.rs index b26a625..9708881 100644 --- a/zquash/src/dmu_stream.rs +++ b/zquash/src/dmu_stream.rs @@ -47,7 +47,6 @@ impl ReplayRecordExt for dmu_replay_record { } } - #[derive(Clone)] pub struct RecordWithPayload { pub drr: dmu_replay_record, @@ -64,6 +63,79 @@ impl fmt::Debug for RecordWithPayload { } } +#[derive(Debug)] +pub enum DrrBeginMutateNvlistError { + DecodeOuterNvlist, + LookupCryptKeydata, + MutationCallbackError(E), + PackingFailedMutationNotCommitted, +} + +impl RecordWithPayload { + pub unsafe fn drr_begin_mutate_crypt_keydata( + &mut self, + cb: F, + ) -> Result> + where + F: FnOnce(*mut nvpair_sys::nvlist_t) -> Result, + { + assert_eq!(self.drr.drr_type, dmu_replay_record_DRR_BEGIN); + + use const_cstr::const_cstr; + let buf = self.payload.as_mut_ptr(); + let mut deser: *mut nvpair_sys::nvlist_t = std::ptr::null_mut(); + if nvpair_sys::nvlist_unpack(buf as *mut i8, self.payload.len(), &mut deser, 0) != 0 { + return Err(DrrBeginMutateNvlistError::DecodeOuterNvlist); + } + + let mut crypt_keydata: *mut nvpair_sys::nvlist_t = std::ptr::null_mut(); + if nvpair_sys::nvlist_lookup_nvlist( + deser, + const_cstr!("crypt_keydata").as_ptr(), + &mut crypt_keydata, + ) != 0 + { + nvpair_sys::nvlist_free(deser); + return Err(DrrBeginMutateNvlistError::LookupCryptKeydata); + } + + let cbres = cb(crypt_keydata); + + if cbres.is_ok() { + // commit mutations by replacing self.payload with updated one + let mut packed_allocated_buf: *mut i8 = std::ptr::null_mut(); + let mut packed_allocated_buf_size: usize = 0; + if nvpair_sys::nvlist_pack( + deser, + &mut packed_allocated_buf, + &mut packed_allocated_buf_size, + nvpair_sys::NV_ENCODE_NATIVE, + 0, + ) != 0 + { + nvpair_sys::nvlist_free(deser); + return Err(DrrBeginMutateNvlistError::PackingFailedMutationNotCommitted); + } + let buf_as_slice = std::slice::from_raw_parts( + packed_allocated_buf as *const u8, + packed_allocated_buf_size, + ); + assert_eq!(self.payload.len(), self.drr.drr_payloadlen as usize); + self.payload = Vec::from(buf_as_slice); // copy + use std::convert::TryInto; + self.drr.drr_payloadlen = packed_allocated_buf_size.try_into().unwrap(); + libc::free(packed_allocated_buf as *mut std::ffi::c_void); + } + + nvpair_sys::nvlist_free(deser); + + match cbres { + Ok(o) => Ok(o), + Err(e) => Err(DrrBeginMutateNvlistError::MutationCallbackError(e)), + } + } +} + pub struct Record<'r> { pub header: dmu_replay_record, pub payload_len: u64, diff --git a/zquash/src/fletcher4.rs b/zquash/src/fletcher4.rs index 1558473..8a931d7 100644 --- a/zquash/src/fletcher4.rs +++ b/zquash/src/fletcher4.rs @@ -63,5 +63,4 @@ impl Fletcher4 { pub fn checksum(&self) -> [u64; 4] { [self.a, self.b, self.c, self.d] } - } diff --git a/zquash/src/lib.rs b/zquash/src/lib.rs index ce65c4b..05349fe 100644 --- a/zquash/src/lib.rs +++ b/zquash/src/lib.rs @@ -4,4 +4,4 @@ extern crate derive_more; pub mod dmu_stream; pub mod fletcher4; pub mod lsm; -pub mod split_tree; \ No newline at end of file +pub mod split_tree; diff --git a/zquash/src/lsm/lsm.rs b/zquash/src/lsm/lsm.rs index 3f1d7a6..e391f9e 100644 --- a/zquash/src/lsm/lsm.rs +++ b/zquash/src/lsm/lsm.rs @@ -1,5 +1,3 @@ - - extern crate bincode; extern crate serde; @@ -231,9 +229,7 @@ pub struct SortedLSMWriter { } impl SortedLSMWriter { - pub fn new( - path: &std::path::Path - ) -> SortedLSMWriter { + pub fn new(path: &std::path::Path) -> SortedLSMWriter { SortedLSMWriter { file: std::io::BufWriter::new(std::fs::File::create(path).unwrap()), last_entry: None, @@ -280,5 +276,3 @@ mod tests { std::fs::remove_file(&pathout); } } - - diff --git a/zquash/src/lsm/lsm_srv.rs b/zquash/src/lsm/lsm_srv.rs index 88b9354..fa65677 100644 --- a/zquash/src/lsm/lsm_srv.rs +++ b/zquash/src/lsm/lsm_srv.rs @@ -155,26 +155,23 @@ impl Iterator for ObjectRangeIter { let mut stream = self.stream.borrow_mut(); let (end_record, _) = stream.peek().unwrap(); - if end_record.drr_type == dmu_replay_record_DRR_END { - return None; - } - if end_record.drr_type == dmu_replay_record_DRR_FREEOBJECTS - && unsafe { - end_record.drr_u.drr_freeobjects.drr_firstobj > 0 - && end_record.drr_u.drr_freeobjects.drr_numobjs == 0 - } - { - // the trailing FREEOBJECTS record + assert!( + ObjectsWithinObjectRangeIterator::is_follow(&end_record), + "{:?}", + drr_debug(end_record) + ); + assert!( + !ObjectsWithinObjectRangeIterator::should_consume(end_record), + "{:?}", + drr_debug(end_record) + ); + if Self::is_follow(end_record) { return None; } + // INVARIANT: all follows of ObjectsWithinObjectRangeIterator::is_follow covered let (or_record, payload) = stream.next().expect("expecting a record"); - assert_eq!( - or_record.drr_type, - dmu_replay_record_DRR_OBJECT_RANGE, - "{:?}", - drr_debug(or_record) - ); + use itertools::Itertools; let object_range_drr = RecordWithPayload { drr: or_record, @@ -240,7 +237,7 @@ impl Iterator for ObjectsWithinObjectRangeIterator { } assert!( - self.should_consume(next_record) + Self::should_consume(next_record) " unexpected record type {:?}", drr_debug(next_record) @@ -261,6 +258,22 @@ impl Iterator for ObjectsWithinObjectRangeIterator { } } +impl ObjectRangeIter { + fn is_follow(drr: &dmu_replay_record) -> bool { + // END RECORD + if drr.drr_type == dmu_replay_record_DRR_END { + return true; + } + + // Trailing FREEOBJECTS + if LSMKey::is_trailing_freeobjects(drr) { + return true; + } + + return false; + } +} + impl ObjectsWithinObjectRangeIterator { // returns `true` iff drr is a follow-element of an object range stream, // i.e. the a stream element not consumed by this iterator @@ -270,20 +283,15 @@ impl ObjectsWithinObjectRangeIterator { return true; } - // Trailing FREEOBJECTS is not preceded by OBJECT_RANGE - if drr.drr_type == dmu_replay_record_DRR_FREEOBJECTS - && unsafe { - drr.drr_u.drr_freeobjects.drr_firstobj > 0 - && drr.drr_u.drr_freeobjects.drr_numobjs == 0 - } - { + // we are nested within ObjectRangeIter + if ObjectRangeIter::is_follow(drr) { return true; } return false; } - fn should_consume(&self, drr: &dmu_replay_record) -> bool { + fn should_consume(drr: &dmu_replay_record) -> bool { if drr.drr_type == dmu_replay_record_DRR_WRITE || drr.drr_type == dmu_replay_record_DRR_FREE { let _obj_id = unsafe { LSMKey(*drr).lower_obj_id() }.unwrap(); @@ -303,7 +311,7 @@ impl ObjectsWithinObjectRangeIterator { None } else { assert!( - self.should_consume(drr) + Self::should_consume(drr) " unexpected record type {:?}", drr_debug(drr) ); // FIXME @@ -406,6 +414,16 @@ unsafe fn symbolic_dump_consume_lsm_reader( } } +pub unsafe fn show(config: &LSMSrvConfig, loaded_stream: &str) { + let mut r = lsm::LSMReader::>::open(&sorted_stream_path( + config, + (*loaded_stream).to_owned(), + )); + for (k, _) in r { + println!("{:?}", drr_debug(&k.0)); + } +} + pub unsafe fn merge_streams( config: &LSMSrvConfig, streams_newest_to_oldest: &[&str], @@ -442,14 +460,36 @@ pub unsafe fn merge_streams( let begin = begins_oldest_to_newest.pop_front().unwrap(); let begin = begins_oldest_to_newest.into_iter().fold(begin, |mut b, r| { unsafe { + use const_cstr::const_cstr; assert_eq!(r.drr.drr_type, dmu_replay_record_DRR_BEGIN); assert_eq!( dbg!(&b).drr.drr_u.drr_begin.drr_toguid, dbg!(&r).drr.drr_u.drr_begin.drr_fromguid ); // TODO error let fromguid = b.drr.drr_u.drr_begin.drr_fromguid; + let from_ivset_guid = { + let mut from_ivset_guid: u64 = 0; + b.drr_begin_mutate_crypt_keydata(&mut |crypt_keydata| -> Result<(), ()> { + nvpair_sys::nvlist_lookup_uint64( + crypt_keydata, + const_cstr!("from_ivset_guid").as_ptr(), + &mut from_ivset_guid, + ); + Err(()) // abort mutation + }); + from_ivset_guid + }; b = r; b.drr.drr_u.drr_begin.drr_fromguid = fromguid; + b.drr_begin_mutate_crypt_keydata(&mut |crypt_keydata| -> Result<(), ()> { + nvpair_sys::nvlist_add_uint64( + crypt_keydata, + const_cstr!("from_ivset_guid").as_ptr(), + from_ivset_guid, + ); + Ok(()) + }) + .expect("mutate ivset guid"); b } }); @@ -682,6 +722,7 @@ pub unsafe fn merge_streams( }; r }; + dbg!(drr_debug(&free)); target.insert(LSMKey(free), Vec::new()); } End => panic!("merger emitted End record, unsupported"), @@ -749,6 +790,7 @@ use serde::{Deserialize, Serialize, Serializer}; /// FREEOBJECTS, OBJECT by object_id /// FREE, WRITE by (object_id, offset_in_object) /// FREEOBJECTS (firstobj=X, numobjs=0) +/// FREEOBJECTS (firstobj=X, numobjs=u64max-X) /// END #[derive(Serialize, Deserialize)] struct LSMKey(dmu_replay_record); @@ -797,6 +839,25 @@ impl LSMKey { _ => unimplemented!(), // TODO } } + + fn is_trailing_freeobjects(drr: &dmu_replay_record) -> bool { + if !drr.drr_type == dmu_replay_record_DRR_FREEOBJECTS { + return false; + } + unsafe { + let (firstobj, numobjs) = unsafe { + let fos = drr.drr_u.drr_freeobjects; + (fos.drr_firstobj, fos.drr_numobjs) + }; + if firstobj == 0 && numobjs == 0 { + return true; + } + if numobjs == std::u64::MAX - firstobj { + return true; + } + } + return false; + } } impl PartialEq for LSMKey { @@ -854,43 +915,39 @@ impl Ord for LSMKey { } // handle all special cases that do not fit into OBJECT_RANGE - // 1. FREEOBJECTS(firstobj=0, numobjs=0) - { - let is_freeobjects_0_0 = |drr: &dmu_replay_record| { - if drr.drr_type == dmu_replay_record_DRR_FREEOBJECTS - && unsafe { - drr.drr_u.drr_freeobjects.drr_firstobj == 0 - && drr.drr_u.drr_freeobjects.drr_numobjs == 0 + let order_for_records_after_last_object_range_and_its_frees_and_writes = + |drr: &dmu_replay_record| { + if drr.drr_type == dmu_replay_record_DRR_FREEOBJECTS { + let (firstobj, numobj) = { + let fos = drr.drr_u.drr_freeobjects; + (fos.drr_firstobj, fos.drr_numobjs) + }; + // FREEOBJECTS(firstobj=X, numobjs=0) + // < + // FREEOBJECTS (firstobj=X, numobjs=u64max-X) + if (numobj == 0) { + (1, 0) + } else if (numobj == std::u64::MAX - firstobj) { + (1, numobj) + } else { + (0, 0) // indeterminate } - { - 0 + } else if drr.drr_type == dmu_replay_record_DRR_END { + (2, 0) } else { - 1 + (0, 0) // indeterminate } }; - if is_freeobjects_0_0(&self.0).cmp(&is_freeobjects_0_0(&o.0)) != Ordering::Equal { - return is_freeobjects_0_0(&self.0).cmp(&is_freeobjects_0_0(&o.0)); - } else if is_freeobjects_0_0(&self.0) == 0 { - return Ordering::Equal; - } - } - // 1. FREEOBJECTS(firstobj=X, numobjs=0) { - let is_freeobjects_x_0 = |drr: &dmu_replay_record| { - if drr.drr_type == dmu_replay_record_DRR_FREEOBJECTS - && unsafe { drr.drr_u.drr_freeobjects.drr_numobjs == 0 } - { - let fo = drr.drr_u.drr_freeobjects.drr_firstobj; - assert!(fo > 0); // see below - fo - } else { - assert_ne!(drr.drr_type, dmu_replay_record_DRR_END); - // all other records lower than FREEOBJECTS(firstobj=X, numobjs=0) - 0 - } - }; - if is_freeobjects_x_0(&self.0).cmp(&is_freeobjects_x_0(&o.0)) != Ordering::Equal { - return is_freeobjects_x_0(&self.0).cmp(&is_freeobjects_x_0(&o.0)); + let cmp = + order_for_records_after_last_object_range_and_its_frees_and_writes(&self.0) + .cmp( + &order_for_records_after_last_object_range_and_its_frees_and_writes( + &o.0, + ), + ); + if cmp != Ordering::Equal { + return cmp; } } diff --git a/zquash/src/lsm/object_merge.rs b/zquash/src/lsm/object_merge.rs index 273e056..e0bb614 100644 --- a/zquash/src/lsm/object_merge.rs +++ b/zquash/src/lsm/object_merge.rs @@ -112,7 +112,6 @@ impl Knowledge { } } } - } } @@ -250,7 +249,6 @@ where out.push_back(dbg!(winner).clone()); Some(()) - } fn repeat_find_action(&mut self, out: &mut VecDeque>) { @@ -452,5 +450,4 @@ mod helper_tests { ] ); } - -} \ No newline at end of file +} diff --git a/zquash/src/main.rs b/zquash/src/main.rs index 2ebaadc..08f1080 100644 --- a/zquash/src/main.rs +++ b/zquash/src/main.rs @@ -37,10 +37,13 @@ enum Command { b_to_c: String, target: String, }, - write { + dump { stream: String, target: Option, }, + show { + stream: String, + }, } fn main() -> CliResult { @@ -84,7 +87,7 @@ fn main() -> CliResult { let cfg = lsm::LSMSrvConfig { root_dir: root }; unsafe { lsm::merge_streams(&cfg, &[a_to_b.as_ref(), b_to_c.as_ref()], target)? }; } - Command::write { stream, target } => { + Command::dump { stream, target } => { dotenv::dotenv().ok(); let root: PathBuf = std::env::var("ZS_LSM_ROOT") @@ -92,9 +95,9 @@ fn main() -> CliResult { .into(); let mut target: Box = match target { - Some(p) => { - Box::new(File::create(p.clone()).context(format!("create output file {:?}", p))?) - } + Some(p) => Box::new( + File::create(p.clone()).context(format!("create output file {:?}", p))?, + ), None => Box::new(std::io::stdout()), }; @@ -102,6 +105,17 @@ fn main() -> CliResult { unsafe { lsm::write_stream(&cfg, stream, &mut target) }?; } + Command::show { stream } => { + dotenv::dotenv().ok(); + + let root: PathBuf = std::env::var("ZS_LSM_ROOT") + .context("ZS_LSM_ROOT not set")? + .into(); + + let cfg = lsm::LSMSrvConfig { root_dir: root }; + + unsafe { lsm::show(&cfg, &stream) }; + } } Ok(()) diff --git a/zquash/src/split_tree.rs b/zquash/src/split_tree.rs index 6a8a313..17e1d6a 100644 --- a/zquash/src/split_tree.rs +++ b/zquash/src/split_tree.rs @@ -265,7 +265,6 @@ impl SplitTree { .last() .expect("always at least one split") } - } #[cfg(test)]