From f9d8c930cb18259b29575232f695b0598ef169ca Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 7 Jan 2025 15:45:59 +0800 Subject: [PATCH 1/7] feat(storage): Splitting table change log from HummockVersion on CN side --- src/storage/hummock_sdk/src/change_log.rs | 4 + .../compaction_group/hummock_version_ext.rs | 32 ++-- src/storage/hummock_sdk/src/time_travel.rs | 5 +- src/storage/hummock_sdk/src/version.rs | 156 ++++++++++++++---- .../event_handler/hummock_event_handler.rs | 44 ++++- .../hummock/local_version/pinned_version.rs | 60 ++++++- src/storage/src/hummock/store/version.rs | 12 +- 7 files changed, 250 insertions(+), 63 deletions(-) diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index 5a7bf0143c764..ad752c0965827 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -51,6 +51,10 @@ impl TableChangeLogCommon { } self.0.push_back(new_change_log); } + + pub fn change_log_into_iter(self) -> impl Iterator> { + self.0.into_iter() + } } pub type TableChangeLog = TableChangeLogCommon; diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index ee316f75ffd65..04900a7822418 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -36,7 +36,7 @@ use crate::level::{Level, LevelCommon, Levels, OverlappingLevel}; use crate::sstable_info::SstableInfo; use crate::table_watermark::{ReadTableWatermark, TableWatermarks}; use crate::version::{ - GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDelta, + GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon, HummockVersionStateTableInfo, IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader, }; @@ -50,7 +50,7 @@ pub struct SstDeltaInfo { pub type BranchedSstInfo = HashMap>; -impl HummockVersion { +impl HummockVersionCommon { pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels { self.levels .get(&compaction_group_id) @@ -187,7 +187,7 @@ pub fn safe_epoch_read_table_watermarks_impl( .collect() } -impl HummockVersion { +impl HummockVersionCommon { pub fn count_new_ssts_in_group_split( &self, parent_group_id: CompactionGroupId, @@ -356,7 +356,10 @@ impl HummockVersion { .all(|level| !level.table_infos.is_empty())); } - pub fn build_sst_delta_infos(&self, version_delta: &HummockVersionDelta) -> Vec { + pub fn build_sst_delta_infos( + &self, + version_delta: &HummockVersionDeltaCommon, + ) -> Vec { let mut infos = vec![]; // Skip trivial move delta for refiller @@ -459,7 +462,10 @@ impl HummockVersion { infos } - pub fn apply_version_delta(&mut self, version_delta: &HummockVersionDelta) { + pub fn apply_version_delta( + &mut self, + version_delta: &HummockVersionDeltaCommon, + ) { assert_eq!(self.id, version_delta.prev_id); let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta( @@ -930,16 +936,10 @@ impl HummockVersion { } } -impl HummockVersionCommon +impl HummockVersionCommon where T: SstableIdReader + ObjectIdReader, { - pub fn get_combined_levels(&self) -> impl Iterator> + '_ { - self.levels - .values() - .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter())) - } - pub fn get_object_ids(&self) -> HashSet { self.get_sst_infos().map(|s| s.object_id()).collect() } @@ -1094,6 +1094,14 @@ impl Levels { } } +impl HummockVersionCommon { + pub fn get_combined_levels(&self) -> impl Iterator> + '_ { + self.levels + .values() + .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter())) + } +} + pub fn build_initial_compaction_group_levels( group_id: CompactionGroupId, compaction_config: &CompactionConfig, diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index 0a651260b3789..a974449928337 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -28,7 +28,7 @@ use crate::version::{ }; use crate::{CompactionGroupId, HummockSstableId, HummockSstableObjectId}; -pub type IncompleteHummockVersion = HummockVersionCommon; +pub type IncompleteHummockVersion = HummockVersionCommon; /// Populates `SstableInfo` for `table_id`. /// `SstableInfo` not associated with `table_id` is removed. @@ -159,7 +159,8 @@ fn rewrite_levels(mut levels: PbLevels, time_travel_table_ids: &HashSet; +pub type IncompleteHummockVersionDelta = + HummockVersionDeltaCommon; /// `SStableInfo` will be stripped. impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHummockVersionDelta { diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 09e96860cc839..70bee30145b2c 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -30,7 +30,7 @@ use risingwave_pb::hummock::{ }; use tracing::warn; -use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon}; +use crate::change_log::{ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLogCommon}; use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels; use crate::compaction_group::StaticCompactionGroupId; use crate::level::LevelsCommon; @@ -217,17 +217,19 @@ impl HummockVersionStateTableInfo { } #[derive(Debug, Clone, PartialEq)] -pub struct HummockVersionCommon { +pub struct HummockVersionCommon { pub id: HummockVersionId, pub levels: HashMap>, #[deprecated] pub(crate) max_committed_epoch: u64, pub table_watermarks: HashMap>, - pub table_change_log: HashMap>, + pub table_change_log: HashMap>, pub state_table_info: HummockVersionStateTableInfo, } -pub type HummockVersion = HummockVersionCommon; +pub type HummockVersion = HummockVersionCommon; + +pub type LocalHummockVersion = HummockVersionCommon; impl Default for HummockVersion { fn default() -> Self { @@ -235,7 +237,7 @@ impl Default for HummockVersion { } } -impl HummockVersionCommon +impl HummockVersionCommon where T: for<'a> From<&'a PbSstableInfo>, PbSstableInfo: for<'a> From<&'a T>, @@ -289,7 +291,7 @@ impl HummockVersion { } } -impl From<&PbHummockVersion> for HummockVersionCommon +impl From<&PbHummockVersion> for HummockVersionCommon where T: for<'a> From<&'a PbSstableInfo>, { @@ -332,11 +334,11 @@ where } } -impl From<&HummockVersionCommon> for PbHummockVersion +impl From<&HummockVersionCommon> for PbHummockVersion where PbSstableInfo: for<'a> From<&'a T>, { - fn from(version: &HummockVersionCommon) -> Self { + fn from(version: &HummockVersionCommon) -> Self { #[expect(deprecated)] Self { id: version.id.0, @@ -361,12 +363,12 @@ where } } -impl From> for PbHummockVersion +impl From> for PbHummockVersion where PbSstableInfo: From, PbSstableInfo: for<'a> From<&'a T>, { - fn from(version: HummockVersionCommon) -> Self { + fn from(version: HummockVersionCommon) -> Self { #[expect(deprecated)] Self { id: version.id.0, @@ -433,13 +435,6 @@ impl HummockVersion { } } - pub fn table_committed_epoch(&self, table_id: TableId) -> Option { - self.state_table_info - .info() - .get(&table_id) - .map(|info| info.committed_epoch) - } - pub fn create_init_version(default_compaction_config: Arc) -> HummockVersion { #[expect(deprecated)] let mut init_version = HummockVersion { @@ -478,8 +473,17 @@ impl HummockVersion { } } +impl HummockVersionCommon { + pub fn table_committed_epoch(&self, table_id: TableId) -> Option { + self.state_table_info + .info() + .get(&table_id) + .map(|info| info.committed_epoch) + } +} + #[derive(Debug, PartialEq, Clone)] -pub struct HummockVersionDeltaCommon { +pub struct HummockVersionDeltaCommon { pub id: HummockVersionId, pub prev_id: HummockVersionId, pub group_deltas: HashMap>, @@ -488,11 +492,13 @@ pub struct HummockVersionDeltaCommon { pub trivial_move: bool, pub new_table_watermarks: HashMap, pub removed_table_ids: HashSet, - pub change_log_delta: HashMap>, + pub change_log_delta: HashMap>, pub state_table_info_delta: HashMap, } -pub type HummockVersionDelta = HummockVersionDeltaCommon; +pub type HummockVersionDelta = HummockVersionDeltaCommon; + +pub type LocalHummockVersionDelta = HummockVersionDeltaCommon; impl Default for HummockVersionDelta { fn default() -> Self { @@ -500,7 +506,7 @@ impl Default for HummockVersionDelta { } } -impl HummockVersionDeltaCommon +impl HummockVersionDeltaCommon where T: for<'a> From<&'a PbSstableInfo>, PbSstableInfo: for<'a> From<&'a T>, @@ -530,7 +536,7 @@ pub trait ObjectIdReader { fn object_id(&self) -> HummockSstableObjectId; } -impl HummockVersionDeltaCommon +impl HummockVersionDeltaCommon where T: SstableIdReader + ObjectIdReader, { @@ -583,7 +589,7 @@ impl HummockVersionDelta { } } -impl From<&PbHummockVersionDelta> for HummockVersionDeltaCommon +impl From<&PbHummockVersionDelta> for HummockVersionDeltaCommon where T: for<'a> From<&'a PbSstableInfo>, { @@ -639,11 +645,11 @@ where } } -impl From<&HummockVersionDeltaCommon> for PbHummockVersionDelta +impl From<&HummockVersionDeltaCommon> for PbHummockVersionDelta where PbSstableInfo: for<'a> From<&'a T>, { - fn from(version_delta: &HummockVersionDeltaCommon) -> Self { + fn from(version_delta: &HummockVersionDeltaCommon) -> Self { #[expect(deprecated)] Self { id: version_delta.id.0, @@ -679,11 +685,11 @@ where } } -impl From> for PbHummockVersionDelta +impl From> for PbHummockVersionDelta where PbSstableInfo: From, { - fn from(version_delta: HummockVersionDeltaCommon) -> Self { + fn from(version_delta: HummockVersionDeltaCommon) -> Self { #[expect(deprecated)] Self { id: version_delta.id.0, @@ -719,7 +725,7 @@ where } } -impl From for HummockVersionDeltaCommon +impl From for HummockVersionDeltaCommon where T: From, { @@ -1095,3 +1101,97 @@ where self.into() } } + +impl From for LocalHummockVersionDelta { + #[expect(deprecated)] + fn from(delta: HummockVersionDelta) -> Self { + Self { + id: delta.id, + prev_id: delta.prev_id, + group_deltas: delta.group_deltas, + max_committed_epoch: delta.max_committed_epoch, + trivial_move: delta.trivial_move, + new_table_watermarks: delta.new_table_watermarks, + removed_table_ids: delta.removed_table_ids, + change_log_delta: delta + .change_log_delta + .into_iter() + .map(|(k, v)| { + ( + k, + ChangeLogDeltaCommon { + truncate_epoch: v.truncate_epoch, + new_log: v.new_log.map(|new_log| EpochNewChangeLogCommon { + epochs: new_log.epochs, + new_value: Vec::new(), + old_value: Vec::new(), + }), + }, + ) + }) + .collect(), + state_table_info_delta: delta.state_table_info_delta, + } + } +} + +impl From<&HummockVersionDelta> for LocalHummockVersionDelta { + #[expect(deprecated)] + fn from(delta: &HummockVersionDelta) -> Self { + Self { + id: delta.id, + prev_id: delta.prev_id, + group_deltas: delta.group_deltas.clone(), + max_committed_epoch: delta.max_committed_epoch, + trivial_move: delta.trivial_move, + new_table_watermarks: delta.new_table_watermarks.clone(), + removed_table_ids: delta.removed_table_ids.clone(), + change_log_delta: delta + .change_log_delta + .iter() + .map(|(k, v)| { + ( + *k, + ChangeLogDeltaCommon { + truncate_epoch: v.truncate_epoch, + new_log: v.new_log.clone().map(|new_log| EpochNewChangeLogCommon { + epochs: new_log.epochs, + new_value: Vec::new(), + old_value: Vec::new(), + }), + }, + ) + }) + .collect(), + state_table_info_delta: delta.state_table_info_delta.clone(), + } + } +} + +impl From for LocalHummockVersion { + #[expect(deprecated)] + fn from(version: HummockVersion) -> Self { + Self { + id: version.id, + levels: version.levels, + max_committed_epoch: version.max_committed_epoch, + table_watermarks: version.table_watermarks, + table_change_log: version + .table_change_log + .into_iter() + .map(|(k, v)| { + let epoch_new_change_logs: Vec> = v + .change_log_into_iter() + .map(|epoch_new_change_log| EpochNewChangeLogCommon { + epochs: epoch_new_change_log.epochs, + new_value: Vec::new(), + old_value: Vec::new(), + }) + .collect(); + (k, TableChangeLogCommon::new(epoch_new_change_logs)) + }) + .collect(), + state_table_info: version.state_table_info, + } + } +} diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index fb914b226d351..b93ec9bf53c8e 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -28,6 +28,8 @@ use prometheus::{Histogram, IntGauge}; use risingwave_common::catalog::TableId; use risingwave_common::metrics::UintGauge; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo; +use risingwave_hummock_sdk::sstable_info::SstableInfo; +use risingwave_hummock_sdk::version::{HummockVersionCommon, LocalHummockVersionDelta}; use risingwave_hummock_sdk::{HummockEpoch, SyncResult}; use tokio::spawn; use tokio::sync::mpsc::error::SendError; @@ -522,27 +524,51 @@ impl HummockEventHandler { version_payload: HummockVersionUpdate, mut sst_delta_infos: Option<&mut Vec>, ) -> Option { - let newly_pinned_version = match version_payload { + match version_payload { HummockVersionUpdate::VersionDeltas(version_deltas) => { - let mut version_to_apply = (*pinned_version).clone(); + let mut version_to_apply = pinned_version.version().clone(); + let mut table_change_log_to_apply = + pinned_version.table_change_log().read().clone(); for version_delta in &version_deltas { assert_eq!(version_to_apply.id, version_delta.prev_id); + + // apply change_log + { + let mut state_table_info = version_to_apply.state_table_info.clone(); + let (changed_table_info, _is_commit_epoch) = state_table_info.apply_delta( + &version_delta.state_table_info_delta, + &version_delta.removed_table_ids, + ); + + HummockVersionCommon::::apply_change_log_delta( + &mut table_change_log_to_apply, + &version_delta.change_log_delta, + &version_delta.removed_table_ids, + &version_delta.state_table_info_delta, + &changed_table_info, + ); + } + + let local_hummock_version_delta = LocalHummockVersionDelta::from(version_delta); if let Some(sst_delta_infos) = &mut sst_delta_infos { sst_delta_infos.extend( version_to_apply - .build_sst_delta_infos(version_delta) + .build_sst_delta_infos(&local_hummock_version_delta) .into_iter(), ); } - version_to_apply.apply_version_delta(version_delta); + version_to_apply.apply_version_delta(&local_hummock_version_delta); } - version_to_apply + pinned_version.new_pin_version_with_table_change_log( + version_to_apply, + table_change_log_to_apply, + ) } - HummockVersionUpdate::PinnedVersion(version) => *version, - }; - - pinned_version.new_pin_version(newly_pinned_version) + HummockVersionUpdate::PinnedVersion(version) => { + pinned_version.new_pin_version(*version) + } + } } fn apply_version_update( diff --git a/src/storage/src/hummock/local_version/pinned_version.rs b/src/storage/src/hummock/local_version/pinned_version.rs index 25e25938398f3..6eebf8212d1a3 100644 --- a/src/storage/src/hummock/local_version/pinned_version.rs +++ b/src/storage/src/hummock/local_version/pinned_version.rs @@ -12,16 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::iter::empty; use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; use auto_enums::auto_enum; +use parking_lot::RwLock; use risingwave_common::catalog::TableId; +use risingwave_hummock_sdk::change_log::TableChangeLogCommon; use risingwave_hummock_sdk::level::{Level, Levels}; -use risingwave_hummock_sdk::version::HummockVersion; +use risingwave_hummock_sdk::sstable_info::SstableInfo; +use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion}; use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID}; use risingwave_rpc_client::HummockMetaClient; use thiserror_ext::AsReport; @@ -74,12 +77,13 @@ impl Drop for PinnedVersionGuard { #[derive(Clone)] pub struct PinnedVersion { - version: Arc, + version: Arc, guard: Arc, + table_change_log: Arc>>>, } impl Deref for PinnedVersion { - type Target = HummockVersion; + type Target = LocalHummockVersion; fn deref(&self) -> &Self::Target { &self.version @@ -88,20 +92,22 @@ impl Deref for PinnedVersion { impl PinnedVersion { pub fn new( - version: HummockVersion, + mut version: HummockVersion, pinned_version_manager_tx: UnboundedSender, ) -> Self { let version_id = version.id; + let t = std::mem::take(&mut version.table_change_log); PinnedVersion { - version: Arc::new(version), guard: Arc::new(PinnedVersionGuard::new( version_id, pinned_version_manager_tx, )), + table_change_log: Arc::new(RwLock::new(t)), + version: Arc::new(LocalHummockVersion::from(version)), } } - pub fn new_pin_version(&self, version: HummockVersion) -> Option { + pub fn new_pin_version(&self, mut version: HummockVersion) -> Option { assert!( version.id >= self.version.id, "pinning a older version {}. Current is {}", @@ -112,13 +118,41 @@ impl PinnedVersion { return None; } let version_id = version.id; + let t = std::mem::take(&mut version.table_change_log); Some(PinnedVersion { - version: Arc::new(version), guard: Arc::new(PinnedVersionGuard::new( version_id, self.guard.pinned_version_manager_tx.clone(), )), + table_change_log: Arc::new(RwLock::new(t)), + version: Arc::new(LocalHummockVersion::from(version)), + }) + } + + pub fn new_pin_version_with_table_change_log( + &self, + version: LocalHummockVersion, + table_change_log: HashMap>, + ) -> Option { + assert!( + version.id >= self.version.id, + "pinning a older version {}. Current is {}", + version.id, + self.version.id + ); + if version.id == self.version.id { + return None; + } + let version_id = version.id; + + Some(PinnedVersion { + guard: Arc::new(PinnedVersionGuard::new( + version_id, + self.guard.pinned_version_manager_tx.clone(), + )), + table_change_log: Arc::new(RwLock::new(table_change_log)), + version: Arc::new(version), }) } @@ -159,6 +193,16 @@ impl PinnedVersion { None => empty(), } } + + pub fn version(&self) -> &LocalHummockVersion { + &self.version + } + + pub fn table_change_log( + &self, + ) -> &Arc>>> { + &self.table_change_log + } } pub(crate) async fn start_pinned_version_worker( diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 8e0c7a589b203..98ca86bec7507 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -992,11 +992,15 @@ impl HummockVersionReader { key_range: TableKeyRange, options: ReadLogOptions, ) -> HummockResult { - let change_log = if let Some(change_log) = version.table_change_log.get(&options.table_id) { - change_log.filter_epoch(epoch_range).collect_vec() - } else { - Vec::new() + let change_log = { + let table_change_logs = version.table_change_log().read(); + if let Some(change_log) = table_change_logs.get(&options.table_id) { + change_log.filter_epoch(epoch_range).cloned().collect_vec() + } else { + Vec::new() + } }; + if let Some(max_epoch_change_log) = change_log.last() { let (_, max_epoch) = epoch_range; if !max_epoch_change_log.epochs.contains(&max_epoch) { From dd7a205572f2dc33e6a8a7d790ece026d05a35b6 Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 7 Jan 2025 16:44:03 +0800 Subject: [PATCH 2/7] bug fix --- .../event_handler/hummock_event_handler.rs | 54 ++++++++++--------- .../hummock/local_version/pinned_version.rs | 10 +++- 2 files changed, 37 insertions(+), 27 deletions(-) diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index b93ec9bf53c8e..71a0b03bfa393 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -520,44 +520,48 @@ impl HummockEventHandler { } fn resolve_version_update_info( - pinned_version: PinnedVersion, + mut pinned_version: PinnedVersion, version_payload: HummockVersionUpdate, mut sst_delta_infos: Option<&mut Vec>, ) -> Option { match version_payload { HummockVersionUpdate::VersionDeltas(version_deltas) => { let mut version_to_apply = pinned_version.version().clone(); - let mut table_change_log_to_apply = - pinned_version.table_change_log().read().clone(); - for version_delta in &version_deltas { - assert_eq!(version_to_apply.id, version_delta.prev_id); - - // apply change_log - { - let mut state_table_info = version_to_apply.state_table_info.clone(); - let (changed_table_info, _is_commit_epoch) = state_table_info.apply_delta( - &version_delta.state_table_info_delta, - &version_delta.removed_table_ids, - ); - - HummockVersionCommon::::apply_change_log_delta( - &mut table_change_log_to_apply, + let table_change_log_to_apply = pinned_version.take_change_log(); + { + let mut table_change_log_to_apply_guard = table_change_log_to_apply.write(); + for version_delta in &version_deltas { + assert_eq!(version_to_apply.id, version_delta.prev_id); + + // apply change-log-delta + { + let mut state_table_info = version_to_apply.state_table_info.clone(); + let (changed_table_info, _is_commit_epoch) = state_table_info + .apply_delta( + &version_delta.state_table_info_delta, + &version_delta.removed_table_ids, + ); + + HummockVersionCommon::::apply_change_log_delta( + &mut *table_change_log_to_apply_guard, &version_delta.change_log_delta, &version_delta.removed_table_ids, &version_delta.state_table_info_delta, &changed_table_info, ); - } + } - let local_hummock_version_delta = LocalHummockVersionDelta::from(version_delta); - if let Some(sst_delta_infos) = &mut sst_delta_infos { - sst_delta_infos.extend( - version_to_apply - .build_sst_delta_infos(&local_hummock_version_delta) - .into_iter(), - ); + let local_hummock_version_delta = + LocalHummockVersionDelta::from(version_delta); + if let Some(sst_delta_infos) = &mut sst_delta_infos { + sst_delta_infos.extend( + version_to_apply + .build_sst_delta_infos(&local_hummock_version_delta) + .into_iter(), + ); + } + version_to_apply.apply_version_delta(&local_hummock_version_delta); } - version_to_apply.apply_version_delta(&local_hummock_version_delta); } pinned_version.new_pin_version_with_table_change_log( diff --git a/src/storage/src/hummock/local_version/pinned_version.rs b/src/storage/src/hummock/local_version/pinned_version.rs index 6eebf8212d1a3..d6634d09bb332 100644 --- a/src/storage/src/hummock/local_version/pinned_version.rs +++ b/src/storage/src/hummock/local_version/pinned_version.rs @@ -133,7 +133,7 @@ impl PinnedVersion { pub fn new_pin_version_with_table_change_log( &self, version: LocalHummockVersion, - table_change_log: HashMap>, + table_change_log: Arc>>>, ) -> Option { assert!( version.id >= self.version.id, @@ -151,7 +151,7 @@ impl PinnedVersion { version_id, self.guard.pinned_version_manager_tx.clone(), )), - table_change_log: Arc::new(RwLock::new(table_change_log)), + table_change_log, version: Arc::new(version), }) } @@ -203,6 +203,12 @@ impl PinnedVersion { ) -> &Arc>>> { &self.table_change_log } + + pub fn take_change_log( + &mut self, + ) -> Arc>>> { + std::mem::take(&mut self.table_change_log) + } } pub(crate) async fn start_pinned_version_worker( From d7ff917da4c938aed2ba2564515f455e3dd7951f Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 15 Jan 2025 15:35:24 +0800 Subject: [PATCH 3/7] typo --- .../src/hummock/store/hummock_storage.rs | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 76cf27ee95f89..f92072b4f3936 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::future::Future; use std::ops::Bound; use std::sync::Arc; @@ -20,17 +20,15 @@ use std::sync::Arc; use arc_swap::ArcSwap; use bytes::Bytes; use itertools::Itertools; -use parking_lot::RwLock; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::is_max_epoch; use risingwave_common_service::{NotificationClient, ObserverManager}; -use risingwave_hummock_sdk::change_log::TableChangeLogCommon; use risingwave_hummock_sdk::key::{ is_empty_key_range, vnode, vnode_range, TableKey, TableKeyRange, }; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex; -use risingwave_hummock_sdk::version::HummockVersion; +use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion}; use risingwave_hummock_sdk::{HummockReadEpoch, HummockSstableObjectId, SyncResult}; use risingwave_rpc_client::HummockMetaClient; use thiserror_ext::AsReport; @@ -643,12 +641,11 @@ impl StateStoreReadLog for HummockStorage { async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult { fn next_epoch( - table_change_logs: &Arc>>>, + version: &LocalHummockVersion, epoch: u64, table_id: TableId, ) -> HummockResult> { - let guard = table_change_logs.read(); - let table_change_log = guard.get(&table_id).ok_or_else(|| { + let table_change_log = version.table_change_log.get(&table_id).ok_or_else(|| { HummockError::next_epoch(format!("table {} has been dropped", table_id)) })?; table_change_log.next_epoch(epoch).map_err(|_| { @@ -663,7 +660,7 @@ impl StateStoreReadLog for HummockStorage { // fast path let recent_versions = self.recent_versions.load(); if let Some(next_epoch) = next_epoch( - recent_versions.latest_version().table_change_log(), + recent_versions.latest_version().version(), epoch, options.table_id, )? { @@ -674,9 +671,7 @@ impl StateStoreReadLog for HummockStorage { wait_for_update( &self.version_update_notifier_tx, |version| { - if let Some(next_epoch) = - next_epoch(version.table_change_log(), epoch, options.table_id)? - { + if let Some(next_epoch) = next_epoch(version.version(), epoch, options.table_id)? { next_epoch_ret = Some(next_epoch); Ok(true) } else { From 453ab6d1102efb90ddaebb0365742809a4264e73 Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 17 Jan 2025 15:38:33 +0800 Subject: [PATCH 4/7] address comments --- src/storage/hummock_sdk/src/change_log.rs | 2 +- src/storage/hummock_sdk/src/compact_task.rs | 8 +- .../compaction_group/hummock_version_ext.rs | 2 +- src/storage/hummock_sdk/src/time_travel.rs | 2 +- src/storage/hummock_sdk/src/version.rs | 76 +++++++------------ .../event_handler/hummock_event_handler.rs | 19 +++-- .../hummock/local_version/pinned_version.rs | 33 ++++---- .../src/hummock/store/hummock_storage.rs | 10 +-- 8 files changed, 63 insertions(+), 89 deletions(-) diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index c8a1db3883192..e56c16eb21720 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -59,7 +59,7 @@ impl TableChangeLogCommon { .cloned() } - pub fn change_log_into_iter(self) -> impl Iterator> { + pub(crate) fn change_log_into_iter(self) -> impl Iterator> { self.0.into_iter() } } diff --git a/src/storage/hummock_sdk/src/compact_task.rs b/src/storage/hummock_sdk/src/compact_task.rs index 3dd5187936540..d173f6d252725 100644 --- a/src/storage/hummock_sdk/src/compact_task.rs +++ b/src/storage/hummock_sdk/src/compact_task.rs @@ -115,8 +115,8 @@ impl CompactTask { } impl From for CompactTask { - #[expect(deprecated)] fn from(pb_compact_task: PbCompactTask) -> Self { + #[expect(deprecated)] Self { input_ssts: pb_compact_task .input_ssts @@ -168,8 +168,8 @@ impl From for CompactTask { } impl From<&PbCompactTask> for CompactTask { - #[expect(deprecated)] fn from(pb_compact_task: &PbCompactTask) -> Self { + #[expect(deprecated)] Self { input_ssts: pb_compact_task .input_ssts @@ -221,8 +221,8 @@ impl From<&PbCompactTask> for CompactTask { } impl From for PbCompactTask { - #[expect(deprecated)] fn from(compact_task: CompactTask) -> Self { + #[expect(deprecated)] Self { input_ssts: compact_task .input_ssts @@ -272,8 +272,8 @@ impl From for PbCompactTask { } impl From<&CompactTask> for PbCompactTask { - #[expect(deprecated)] fn from(compact_task: &CompactTask) -> Self { + #[expect(deprecated)] Self { input_ssts: compact_task .input_ssts diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index b762828224ea9..8ea85ec57dbe3 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -936,7 +936,7 @@ impl HummockVersionCommon { } } -impl HummockVersionCommon +impl HummockVersionCommon where T: SstableIdReader + ObjectIdReader, { diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index 408085bc8096b..c351629025be9 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -28,7 +28,7 @@ use crate::version::{ }; use crate::{CompactionGroupId, HummockSstableId, HummockSstableObjectId}; -pub type IncompleteHummockVersion = HummockVersionCommon; +pub type IncompleteHummockVersion = HummockVersionCommon; /// Populates `SstableInfo` for `table_id`. /// `SstableInfo` not associated with `table_id` is removed. diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index e46f9833695c0..8148acf0dbb20 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -30,7 +30,9 @@ use risingwave_pb::hummock::{ }; use tracing::warn; -use crate::change_log::{ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLogCommon}; +use crate::change_log::{ + ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLog, TableChangeLogCommon, +}; use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels; use crate::compaction_group::StaticCompactionGroupId; use crate::level::LevelsCommon; @@ -217,7 +219,7 @@ impl HummockVersionStateTableInfo { } #[derive(Debug, Clone, PartialEq)] -pub struct HummockVersionCommon { +pub struct HummockVersionCommon { pub id: HummockVersionId, pub levels: HashMap>, #[deprecated] @@ -237,7 +239,7 @@ impl Default for HummockVersion { } } -impl HummockVersionCommon +impl HummockVersionCommon where T: for<'a> From<&'a PbSstableInfo>, PbSstableInfo: for<'a> From<&'a T>, @@ -291,7 +293,7 @@ impl HummockVersion { } } -impl From<&PbHummockVersion> for HummockVersionCommon +impl From<&PbHummockVersion> for HummockVersionCommon where T: for<'a> From<&'a PbSstableInfo>, { @@ -334,11 +336,11 @@ where } } -impl From<&HummockVersionCommon> for PbHummockVersion +impl From<&HummockVersionCommon> for PbHummockVersion where PbSstableInfo: for<'a> From<&'a T>, { - fn from(version: &HummockVersionCommon) -> Self { + fn from(version: &HummockVersionCommon) -> Self { #[expect(deprecated)] Self { id: version.id.0, @@ -363,12 +365,12 @@ where } } -impl From> for PbHummockVersion +impl From> for PbHummockVersion where PbSstableInfo: From, PbSstableInfo: for<'a> From<&'a T>, { - fn from(version: HummockVersionCommon) -> Self { + fn from(version: HummockVersionCommon) -> Self { #[expect(deprecated)] Self { id: version.id.0, @@ -471,6 +473,13 @@ impl HummockVersion { state_table_info_delta: Default::default(), } } + + pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap) { + let table_change_log = std::mem::take(&mut self.table_change_log); + let local_version = LocalHummockVersion::from(self); + + (local_version, table_change_log) + } } impl HummockVersionCommon { @@ -483,7 +492,7 @@ impl HummockVersionCommon { } #[derive(Debug, PartialEq, Clone)] -pub struct HummockVersionDeltaCommon { +pub struct HummockVersionDeltaCommon { pub id: HummockVersionId, pub prev_id: HummockVersionId, pub group_deltas: HashMap>, @@ -506,7 +515,7 @@ impl Default for HummockVersionDelta { } } -impl HummockVersionDeltaCommon +impl HummockVersionDeltaCommon where T: for<'a> From<&'a PbSstableInfo>, PbSstableInfo: for<'a> From<&'a T>, @@ -536,7 +545,7 @@ pub trait ObjectIdReader { fn object_id(&self) -> HummockSstableObjectId; } -impl HummockVersionDeltaCommon +impl HummockVersionDeltaCommon where T: SstableIdReader + ObjectIdReader, { @@ -589,7 +598,7 @@ impl HummockVersionDelta { } } -impl From<&PbHummockVersionDelta> for HummockVersionDeltaCommon +impl From<&PbHummockVersionDelta> for HummockVersionDeltaCommon where T: for<'a> From<&'a PbSstableInfo>, { @@ -645,11 +654,11 @@ where } } -impl From<&HummockVersionDeltaCommon> for PbHummockVersionDelta +impl From<&HummockVersionDeltaCommon> for PbHummockVersionDelta where PbSstableInfo: for<'a> From<&'a T>, { - fn from(version_delta: &HummockVersionDeltaCommon) -> Self { + fn from(version_delta: &HummockVersionDeltaCommon) -> Self { #[expect(deprecated)] Self { id: version_delta.id.0, @@ -685,11 +694,11 @@ where } } -impl From> for PbHummockVersionDelta +impl From> for PbHummockVersionDelta where PbSstableInfo: From, { - fn from(version_delta: HummockVersionDeltaCommon) -> Self { + fn from(version_delta: HummockVersionDeltaCommon) -> Self { #[expect(deprecated)] Self { id: version_delta.id.0, @@ -725,7 +734,7 @@ where } } -impl From for HummockVersionDeltaCommon +impl From for HummockVersionDeltaCommon where T: From, { @@ -1135,39 +1144,6 @@ impl From for LocalHummockVersionDelta { } } -impl From<&HummockVersionDelta> for LocalHummockVersionDelta { - #[expect(deprecated)] - fn from(delta: &HummockVersionDelta) -> Self { - Self { - id: delta.id, - prev_id: delta.prev_id, - group_deltas: delta.group_deltas.clone(), - max_committed_epoch: delta.max_committed_epoch, - trivial_move: delta.trivial_move, - new_table_watermarks: delta.new_table_watermarks.clone(), - removed_table_ids: delta.removed_table_ids.clone(), - change_log_delta: delta - .change_log_delta - .iter() - .map(|(k, v)| { - ( - *k, - ChangeLogDeltaCommon { - truncate_epoch: v.truncate_epoch, - new_log: EpochNewChangeLogCommon { - epochs: v.new_log.epochs.clone(), - new_value: Vec::new(), - old_value: Vec::new(), - }, - }, - ) - }) - .collect(), - state_table_info_delta: delta.state_table_info_delta.clone(), - } - } -} - impl From for LocalHummockVersion { #[expect(deprecated)] fn from(version: HummockVersion) -> Self { diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 71a0b03bfa393..bdec40cd164be 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::{HashMap, HashSet, VecDeque}; +use std::ops::Deref; use std::pin::pin; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; @@ -510,7 +511,7 @@ impl HummockEventHandler { let mut sst_delta_infos = vec![]; if let Some(new_pinned_version) = Self::resolve_version_update_info( - pinned_version.clone(), + &pinned_version, version_payload, Some(&mut sst_delta_infos), ) { @@ -520,16 +521,17 @@ impl HummockEventHandler { } fn resolve_version_update_info( - mut pinned_version: PinnedVersion, + pinned_version: &PinnedVersion, version_payload: HummockVersionUpdate, mut sst_delta_infos: Option<&mut Vec>, ) -> Option { match version_payload { HummockVersionUpdate::VersionDeltas(version_deltas) => { - let mut version_to_apply = pinned_version.version().clone(); - let table_change_log_to_apply = pinned_version.take_change_log(); + let mut version_to_apply = pinned_version.deref().clone(); { - let mut table_change_log_to_apply_guard = table_change_log_to_apply.write(); + let mut table_change_log_to_apply_guard = + pinned_version.table_change_log_write_lock(); + // let mut table_change_log_to_apply_guard = table_change_log_to_apply.write(); for version_delta in &version_deltas { assert_eq!(version_to_apply.id, version_delta.prev_id); @@ -552,7 +554,7 @@ impl HummockEventHandler { } let local_hummock_version_delta = - LocalHummockVersionDelta::from(version_delta); + LocalHummockVersionDelta::from(version_delta.clone()); if let Some(sst_delta_infos) = &mut sst_delta_infos { sst_delta_infos.extend( version_to_apply @@ -560,13 +562,14 @@ impl HummockEventHandler { .into_iter(), ); } + version_to_apply.apply_version_delta(&local_hummock_version_delta); } } - pinned_version.new_pin_version_with_table_change_log( + pinned_version.new_with_change_log( version_to_apply, - table_change_log_to_apply, + pinned_version.table_change_log().clone(), ) } HummockVersionUpdate::PinnedVersion(version) => { diff --git a/src/storage/src/hummock/local_version/pinned_version.rs b/src/storage/src/hummock/local_version/pinned_version.rs index d6634d09bb332..e1cb900b53f23 100644 --- a/src/storage/src/hummock/local_version/pinned_version.rs +++ b/src/storage/src/hummock/local_version/pinned_version.rs @@ -92,22 +92,22 @@ impl Deref for PinnedVersion { impl PinnedVersion { pub fn new( - mut version: HummockVersion, + version: HummockVersion, pinned_version_manager_tx: UnboundedSender, ) -> Self { let version_id = version.id; - let t = std::mem::take(&mut version.table_change_log); + let (local_version, table_id_to_change_logs) = version.split_change_log(); PinnedVersion { guard: Arc::new(PinnedVersionGuard::new( version_id, pinned_version_manager_tx, )), - table_change_log: Arc::new(RwLock::new(t)), - version: Arc::new(LocalHummockVersion::from(version)), + table_change_log: Arc::new(RwLock::new(table_id_to_change_logs)), + version: Arc::new(local_version), } } - pub fn new_pin_version(&self, mut version: HummockVersion) -> Option { + pub fn new_pin_version(&self, version: HummockVersion) -> Option { assert!( version.id >= self.version.id, "pinning a older version {}. Current is {}", @@ -118,19 +118,18 @@ impl PinnedVersion { return None; } let version_id = version.id; - let t = std::mem::take(&mut version.table_change_log); - + let (local_version, table_id_to_change_logs) = version.split_change_log(); Some(PinnedVersion { guard: Arc::new(PinnedVersionGuard::new( version_id, self.guard.pinned_version_manager_tx.clone(), )), - table_change_log: Arc::new(RwLock::new(t)), - version: Arc::new(LocalHummockVersion::from(version)), + table_change_log: Arc::new(RwLock::new(table_id_to_change_logs)), + version: Arc::new(local_version), }) } - pub fn new_pin_version_with_table_change_log( + pub fn new_with_change_log( &self, version: LocalHummockVersion, table_change_log: Arc>>>, @@ -144,6 +143,7 @@ impl PinnedVersion { if version.id == self.version.id { return None; } + let version_id = version.id; Some(PinnedVersion { @@ -194,20 +194,17 @@ impl PinnedVersion { } } - pub fn version(&self) -> &LocalHummockVersion { - &self.version - } - pub fn table_change_log( &self, ) -> &Arc>>> { &self.table_change_log } - pub fn take_change_log( - &mut self, - ) -> Arc>>> { - std::mem::take(&mut self.table_change_log) + pub fn table_change_log_write_lock( + &self, + ) -> parking_lot::RwLockWriteGuard<'_, HashMap>> + { + self.table_change_log.write() } } diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index f92072b4f3936..6b231f045e56a 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -659,11 +659,9 @@ impl StateStoreReadLog for HummockStorage { { // fast path let recent_versions = self.recent_versions.load(); - if let Some(next_epoch) = next_epoch( - recent_versions.latest_version().version(), - epoch, - options.table_id, - )? { + if let Some(next_epoch) = + next_epoch(recent_versions.latest_version(), epoch, options.table_id)? + { return Ok(next_epoch); } } @@ -671,7 +669,7 @@ impl StateStoreReadLog for HummockStorage { wait_for_update( &self.version_update_notifier_tx, |version| { - if let Some(next_epoch) = next_epoch(version.version(), epoch, options.table_id)? { + if let Some(next_epoch) = next_epoch(version, epoch, options.table_id)? { next_epoch_ret = Some(next_epoch); Ok(true) } else { From b91fc40a77503cb042281896dc94000d30d0f40f Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 17 Jan 2025 15:50:26 +0800 Subject: [PATCH 5/7] typo --- src/storage/hummock_sdk/src/time_travel.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index c351629025be9..9f628808909ff 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -159,8 +159,7 @@ fn rewrite_levels(mut levels: PbLevels, time_travel_table_ids: &HashSet; +pub type IncompleteHummockVersionDelta = HummockVersionDeltaCommon; /// `SStableInfo` will be stripped. impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHummockVersionDelta { From d516e3826006c5293f62fc297f07e65d96b463ff Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 20 Jan 2025 14:06:08 +0800 Subject: [PATCH 6/7] address comments --- src/storage/hummock_sdk/src/change_log.rs | 6 +++++ src/storage/hummock_sdk/src/version.rs | 27 ++++++++++++++++--- .../event_handler/hummock_event_handler.rs | 27 ++++++++----------- .../hummock/local_version/pinned_version.rs | 15 +++++------ src/storage/src/hummock/store/version.rs | 2 +- 5 files changed, 47 insertions(+), 30 deletions(-) diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index e56c16eb21720..9bb4505fb808c 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -62,6 +62,12 @@ impl TableChangeLogCommon { pub(crate) fn change_log_into_iter(self) -> impl Iterator> { self.0.into_iter() } + + pub(crate) fn change_log_iter_mut( + &mut self, + ) -> impl Iterator> { + self.0.iter_mut() + } } pub type TableChangeLog = TableChangeLogCommon; diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 8148acf0dbb20..9e875fec6225b 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::collections::hash_map::Entry; -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet, VecDeque}; use std::mem::{replace, size_of}; use std::ops::Deref; use std::sync::{Arc, LazyLock}; @@ -229,7 +229,7 @@ pub struct HummockVersionCommon { pub state_table_info: HummockVersionStateTableInfo, } -pub type HummockVersion = HummockVersionCommon; +pub type HummockVersion = HummockVersionCommon; pub type LocalHummockVersion = HummockVersionCommon; @@ -475,7 +475,26 @@ impl HummockVersion { } pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap) { - let table_change_log = std::mem::take(&mut self.table_change_log); + let table_change_log = { + let mut table_change_log = HashMap::new(); + for (table_id, log) in &mut self.table_change_log { + let mut change_log = VecDeque::new(); + for item in log.change_log_iter_mut() { + let new_value = EpochNewChangeLogCommon { + new_value: std::mem::take(&mut item.new_value), + old_value: std::mem::take(&mut item.old_value), + epochs: item.epochs.clone(), + }; + + change_log.push_back(new_value); + } + table_change_log + .insert(*table_id, TableChangeLogCommon::new(change_log.into_iter())); + } + + table_change_log + }; + let local_version = LocalHummockVersion::from(self); (local_version, table_change_log) @@ -505,7 +524,7 @@ pub struct HummockVersionDeltaCommon { pub state_table_info_delta: HashMap, } -pub type HummockVersionDelta = HummockVersionDeltaCommon; +pub type HummockVersionDelta = HummockVersionDeltaCommon; pub type LocalHummockVersionDelta = HummockVersionDeltaCommon; diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index bdec40cd164be..b584b1f55e5be 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::collections::{HashMap, HashSet, VecDeque}; -use std::ops::Deref; use std::pin::pin; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; @@ -527,12 +526,11 @@ impl HummockEventHandler { ) -> Option { match version_payload { HummockVersionUpdate::VersionDeltas(version_deltas) => { - let mut version_to_apply = pinned_version.deref().clone(); + let mut version_to_apply = (**pinned_version).clone(); { let mut table_change_log_to_apply_guard = pinned_version.table_change_log_write_lock(); - // let mut table_change_log_to_apply_guard = table_change_log_to_apply.write(); - for version_delta in &version_deltas { + for version_delta in version_deltas { assert_eq!(version_to_apply.id, version_delta.prev_id); // apply change-log-delta @@ -544,17 +542,17 @@ impl HummockEventHandler { &version_delta.removed_table_ids, ); - HummockVersionCommon::::apply_change_log_delta( - &mut *table_change_log_to_apply_guard, - &version_delta.change_log_delta, - &version_delta.removed_table_ids, - &version_delta.state_table_info_delta, - &changed_table_info, - ); + HummockVersionCommon::::apply_change_log_delta( + &mut *table_change_log_to_apply_guard, + &version_delta.change_log_delta, + &version_delta.removed_table_ids, + &version_delta.state_table_info_delta, + &changed_table_info, + ); } let local_hummock_version_delta = - LocalHummockVersionDelta::from(version_delta.clone()); + LocalHummockVersionDelta::from(version_delta); if let Some(sst_delta_infos) = &mut sst_delta_infos { sst_delta_infos.extend( version_to_apply @@ -567,10 +565,7 @@ impl HummockEventHandler { } } - pinned_version.new_with_change_log( - version_to_apply, - pinned_version.table_change_log().clone(), - ) + pinned_version.new_with_local_version(version_to_apply) } HummockVersionUpdate::PinnedVersion(version) => { pinned_version.new_pin_version(*version) diff --git a/src/storage/src/hummock/local_version/pinned_version.rs b/src/storage/src/hummock/local_version/pinned_version.rs index e1cb900b53f23..082879be992ee 100644 --- a/src/storage/src/hummock/local_version/pinned_version.rs +++ b/src/storage/src/hummock/local_version/pinned_version.rs @@ -129,11 +129,8 @@ impl PinnedVersion { }) } - pub fn new_with_change_log( - &self, - version: LocalHummockVersion, - table_change_log: Arc>>>, - ) -> Option { + /// Create a new `PinnedVersion` with the given `LocalHummockVersion`. Referring to the usage in the `hummock_event_handler`. + pub fn new_with_local_version(&self, version: LocalHummockVersion) -> Option { assert!( version.id >= self.version.id, "pinning a older version {}. Current is {}", @@ -151,7 +148,7 @@ impl PinnedVersion { version_id, self.guard.pinned_version_manager_tx.clone(), )), - table_change_log, + table_change_log: self.table_change_log.clone(), version: Arc::new(version), }) } @@ -194,10 +191,10 @@ impl PinnedVersion { } } - pub fn table_change_log( + pub fn table_change_log_read_lock( &self, - ) -> &Arc>>> { - &self.table_change_log + ) -> parking_lot::RwLockReadGuard<'_, HashMap>> { + self.table_change_log.read() } pub fn table_change_log_write_lock( diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 98ca86bec7507..acf7c6503fb7f 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -993,7 +993,7 @@ impl HummockVersionReader { options: ReadLogOptions, ) -> HummockResult { let change_log = { - let table_change_logs = version.table_change_log().read(); + let table_change_logs = version.table_change_log_read_lock(); if let Some(change_log) = table_change_logs.get(&options.table_id) { change_log.filter_epoch(epoch_range).cloned().collect_vec() } else { From b3f7f7beda2e4a107dd88f598d760b75a4f3af97 Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 20 Jan 2025 15:59:17 +0800 Subject: [PATCH 7/7] address comments --- src/storage/hummock_sdk/src/version.rs | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 9e875fec6225b..84cba3804a96c 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::collections::hash_map::Entry; -use std::collections::{BTreeSet, HashMap, HashSet, VecDeque}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::mem::{replace, size_of}; use std::ops::Deref; use std::sync::{Arc, LazyLock}; @@ -478,18 +478,14 @@ impl HummockVersion { let table_change_log = { let mut table_change_log = HashMap::new(); for (table_id, log) in &mut self.table_change_log { - let mut change_log = VecDeque::new(); - for item in log.change_log_iter_mut() { - let new_value = EpochNewChangeLogCommon { - new_value: std::mem::take(&mut item.new_value), - old_value: std::mem::take(&mut item.old_value), - epochs: item.epochs.clone(), - }; - - change_log.push_back(new_value); - } - table_change_log - .insert(*table_id, TableChangeLogCommon::new(change_log.into_iter())); + let change_log_iter = + log.change_log_iter_mut() + .map(|item| EpochNewChangeLogCommon { + new_value: std::mem::take(&mut item.new_value), + old_value: std::mem::take(&mut item.old_value), + epochs: item.epochs.clone(), + }); + table_change_log.insert(*table_id, TableChangeLogCommon::new(change_log_iter)); } table_change_log