Skip to content

Perf: reduce mem usage when transaction is large #128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 30, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 84 additions & 3 deletions crates/loro-common/src/span.rs
Original file line number Diff line number Diff line change
@@ -67,6 +67,11 @@ impl CounterSpan {
}
}

#[inline(always)]
pub fn bidirectional(&self) -> bool {
(self.end - self.start).abs() == 1
}

#[inline(always)]
pub fn direction(&self) -> i32 {
if self.start < self.end {
@@ -141,6 +146,20 @@ impl CounterSpan {
self.end = new_end.min(self.start);
}
}

/// if we can merge element on the left, this method return the last atom of it
fn prev_pos(&self) -> i32 {
if self.start < self.end {
self.start - 1
} else {
self.start + 1
}
}

/// if we can merge element on the right, this method return the first atom of it
fn next_pos(&self) -> i32 {
self.end
}
}

impl HasLength for CounterSpan {
@@ -176,13 +195,32 @@ impl Sliceable for CounterSpan {
impl Mergable for CounterSpan {
#[inline]
fn is_mergable(&self, other: &Self, _: &()) -> bool {
// TODO: can use the similar logic as [DeleteSpan] to merge
self.end == other.start && self.direction() == other.direction()
match (self.bidirectional(), other.bidirectional()) {
(true, true) => self.start + 1 == other.start || self.start == other.start + 1,
(true, false) => self.start == other.prev_pos(),
(false, true) => self.next_pos() == other.start,
(false, false) => {
self.next_pos() == other.start && self.direction() == other.direction()
}
}
}

#[inline]
fn merge(&mut self, other: &Self, _: &()) {
self.end = other.end;
match (self.bidirectional(), other.bidirectional()) {
(true, true) => {
if self.start + 1 == other.start {
self.end = self.start + 2;
} else if self.start - 1 == other.start {
self.end = self.start - 2;
}
}
(true, false) => self.end = other.end,
(false, true) => self.end = self.end + self.direction(),
(false, false) => {
self.end = other.end;
}
}
}
}

@@ -444,4 +482,47 @@ mod test_id_span {
let slice: Vec<IdSpan> = id_span_vec.slice_iter(5, 14).map(|x| x.into()).collect();
assert_eq!(slice, id_spans!([0, 95, 90], [2, 2, 4], [2, 8, 6]).to_vec());
}

#[test]
fn merge() {
let mut a = CounterSpan::new(0, 2);
let b = CounterSpan::new(2, 1);
assert!(a.is_mergable(&b, &()));
a.merge(&b, &());
assert_eq!(a, CounterSpan::new(0, 3));

let mut a = CounterSpan::new(3, 2);
let b = CounterSpan::new(2, 1);
assert!(a.is_mergable(&b, &()));
a.merge(&b, &());
assert_eq!(a, CounterSpan::new(3, 1));

let mut a = CounterSpan::new(4, 2);
let b = CounterSpan::new(2, 3);
assert!(a.is_mergable(&b, &()));
a.merge(&b, &());
assert_eq!(a, CounterSpan::new(4, 1));

let mut a = CounterSpan::new(8, 9);
let b = CounterSpan::new(9, 8);
assert!(a.is_mergable(&b, &()));
a.merge(&b, &());
assert_eq!(a, CounterSpan::new(8, 10));

let mut a = CounterSpan::new(8, 9);
let b = CounterSpan::new(10, 11);
assert!(!a.is_mergable(&b, &()));

let mut a = CounterSpan::new(0, 2);
let b = CounterSpan::new(2, 4);
assert!(a.is_mergable(&b, &()));
a.merge(&b, &());
assert_eq!(a, CounterSpan::new(0, 4));

let mut a = CounterSpan::new(4, 2);
let b = CounterSpan::new(2, 0);
assert!(a.is_mergable(&b, &()));
a.merge(&b, &());
assert_eq!(a, CounterSpan::new(4, 0));
}
}
19 changes: 10 additions & 9 deletions crates/loro-internal/examples/encoding_refactored.rs
Original file line number Diff line number Diff line change
@@ -3,8 +3,8 @@ use criterion::black_box;
use loro_internal::loro::LoroDoc;

fn main() {
log_size();
// bench_decode();
// log_size();
bench_decode();
// bench_decode_updates();
}

@@ -55,17 +55,18 @@ fn log_size() {

#[allow(unused)]
fn bench_decode() {
println!("Bench decode");
let actions = bench_utils::get_automerge_actions();
{
let loro = LoroDoc::default();
let mut loro = LoroDoc::default();
let text = loro.get_text("text");
loro.start_auto_commit();

#[allow(warnings)]
for TextAction { pos, ins, del } in actions.iter() {
let mut txn = loro.txn().unwrap();
text.delete(&mut txn, *pos, *del);
text.insert(&mut txn, *pos, ins);
txn.commit().unwrap();
for _ in 0..10 {
for TextAction { pos, ins, del } in actions.iter() {
text.delete_(*pos, *del);
text.insert_(*pos, ins);
}
}
let snapshot = loro.export_snapshot();
// for _ in 0..100 {
2 changes: 2 additions & 0 deletions crates/loro-internal/src/container/list/list_op.rs
Original file line number Diff line number Diff line change
@@ -190,6 +190,8 @@ impl Mergable for DeleteSpan {
where
Self: Sized,
{
// merge continuous deletions:
// note that the previous deletions will affect the position of the later deletions
match (self.bidirectional(), other.bidirectional()) {
(true, true) => self.pos == other.pos || self.pos == other.pos + 1,
(true, false) => self.pos == other.prev_pos(),
1 change: 1 addition & 0 deletions crates/loro-internal/src/container/richtext/tracker.rs
Original file line number Diff line number Diff line change
@@ -228,6 +228,7 @@ impl Tracker {
self._checkout(from, false);
self._checkout(to, true);
// debug_log::debug_dbg!(from, to, &self);
// self.id_to_cursor.diagnose();
self.rope.get_diff()
}
}
Original file line number Diff line number Diff line change
@@ -162,6 +162,23 @@ impl IdToCursor {
.cursor
.get_insert((id.counter - list[index].counter) as usize)
}

pub fn diagnose(&self) {
let fragment_num = self.map.iter().map(|x| x.1.len()).sum::<usize>();
let insert_pieces = self
.map
.iter()
.flat_map(|x| x.1.iter())
.map(|x| match &x.cursor {
Cursor::Insert { set, len } => set.len(),
Cursor::Delete(_) => 0,
})
.sum::<usize>();
eprintln!(
"fragments:{}, insert_pieces:{}",
fragment_num, insert_pieces
);
}
}

#[derive(Debug)]
17 changes: 11 additions & 6 deletions crates/loro-internal/src/handler.rs
Original file line number Diff line number Diff line change
@@ -265,19 +265,21 @@ impl TextHandler {
.get_entity_index_for_text_insert_event_index(pos)
});

let unicode_len = s.chars().count();
txn.apply_local_op(
self.container_idx,
crate::op::RawOpContent::List(crate::container::list::list_op::ListOp::Insert {
slice: ListSlice::RawStr {
str: Cow::Borrowed(s),
unicode_len: s.chars().count(),
unicode_len,
},
pos: entity_index,
}),
EventHint::InsertText {
pos,
pos: pos as u32,
// FIXME: this is wrong
styles: vec![],
len: unicode_len as u32,
},
&self.state,
)
@@ -333,7 +335,10 @@ impl TextHandler {
signed_len: (range.end - range.start) as isize,
})),
if is_first {
EventHint::DeleteText { pos, len }
EventHint::DeleteText(DeleteSpan {
pos: pos as isize,
signed_len: len as isize,
})
} else {
EventHint::None
},
@@ -408,8 +413,8 @@ impl TextHandler {
info: flag,
}),
EventHint::Mark {
start,
end,
start: start as u32,
end: end as u32,
style: crate::container::richtext::Style {
key: key.into(),
// FIXME: style meta is incorrect
@@ -541,7 +546,7 @@ impl ListHandler {
pos: pos as isize,
signed_len: len as isize,
})),
EventHint::DeleteList { pos, len },
EventHint::DeleteList(DeleteSpan::new(pos as isize, len as isize)),
&self.state,
)
}
92 changes: 68 additions & 24 deletions crates/loro-internal/src/state/richtext_state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{ops::Range, sync::Arc};

use fxhash::FxHashMap;
use generic_btree::rle::HasLength;
use generic_btree::rle::{HasLength, Mergeable};
use loro_common::{Counter, LoroResult, LoroValue, PeerID, ID};
use loro_preload::{CommonArena, EncodedRichtextState, TempArena, TextRanges};

@@ -87,6 +87,49 @@ enum UndoItem {
},
}

impl Mergeable for UndoItem {
fn can_merge(&self, rhs: &Self) -> bool {
match (self, rhs) {
(UndoItem::Insert { index, len }, UndoItem::Insert { index: r_index, .. }) => {
*index + *len == *r_index
}
(
UndoItem::Delete { index, content },
UndoItem::Delete {
index: r_i,
content: r_c,
},
) => *r_i + r_c.rle_len() as u32 == *index && r_c.can_merge(content),
_ => false,
}
}

fn merge_right(&mut self, rhs: &Self) {
match (self, rhs) {
(UndoItem::Insert { len, .. }, UndoItem::Insert { len: r_len, .. }) => {
*len += *r_len;
}
(
UndoItem::Delete { content, index },
UndoItem::Delete {
content: r_c,
index: r_i,
},
) => {
if *r_i + r_c.rle_len() as u32 == *index {
content.merge_right(r_c);
*index = *r_i
}
}
_ => unreachable!(),
}
}

fn merge_left(&mut self, _: &Self) {
unreachable!()
}
}

impl ContainerState for RichtextState {
// TODO: refactor
fn apply_diff_and_convert(&mut self, diff: InternalDiff, _arena: &SharedArena) -> Diff {
@@ -179,22 +222,13 @@ impl ContainerState for RichtextState {
}
}
}
self.undo_stack.push(UndoItem::Insert {
index: entity_index as u32,
len: value.rle_len() as u32,
});
entity_index += value.rle_len();
}
crate::delta::DeltaItem::Delete { len, meta: _ } => {
let (start, end) =
self.state
.get_mut()
.drain_by_entity_index(entity_index, *len, |span| {
self.undo_stack.push(UndoItem::Delete {
index: entity_index as u32,
content: span,
})
});
.drain_by_entity_index(entity_index, *len, |_| {});
if start > event_index {
for (len, styles) in self
.state
@@ -287,21 +321,12 @@ impl ContainerState for RichtextState {
}
}
}
self.undo_stack.push(UndoItem::Insert {
index: entity_index as u32,
len: value.rle_len() as u32,
});
entity_index += value.rle_len();
}
crate::delta::DeltaItem::Delete { len, meta: _ } => {
self.state
.get_mut()
.drain_by_entity_index(entity_index, *len, |span| {
self.undo_stack.push(UndoItem::Delete {
index: entity_index as u32,
content: span,
})
});
.drain_by_entity_index(entity_index, *len, |_| {});
}
}
}
@@ -324,7 +349,7 @@ impl ContainerState for RichtextState {
.insert_at_entity_index(*pos as usize, slice.clone());

if self.in_txn {
self.undo_stack.push(UndoItem::Insert {
self.push_undo(UndoItem::Insert {
index: *pos,
len: *len,
})
@@ -336,10 +361,18 @@ impl ContainerState for RichtextState {
rle::HasLength::atom_len(&del),
|span| {
if self.in_txn {
self.undo_stack.push(UndoItem::Delete {
let item = UndoItem::Delete {
index: del.start() as u32,
content: span,
})
};
match self.undo_stack.last_mut() {
Some(last) if last.can_merge(&item) => {
last.merge_right(&item);
}
_ => {
self.undo_stack.push(item);
}
}
}
},
);
@@ -427,6 +460,17 @@ impl RichtextState {
}
}

fn push_undo(&mut self, item: UndoItem) {
match self.undo_stack.last_mut() {
Some(last) if last.can_merge(&item) => {
last.merge_right(&item);
}
_ => {
self.undo_stack.push(item);
}
}
}

#[inline(always)]
pub fn len_utf8(&mut self) -> usize {
self.state.get_mut().len_utf8()
149 changes: 119 additions & 30 deletions crates/loro-internal/src/txn.rs
Original file line number Diff line number Diff line change
@@ -6,13 +6,13 @@ use std::{

use debug_log::debug_dbg;
use enum_as_inner::EnumAsInner;
use fxhash::FxHashMap;
use generic_btree::rle::{HasLength as RleHasLength, Mergeable, Sliceable as GBSliceable};
use loro_common::{ContainerType, LoroResult};
use rle::{HasLength, RleVec};
use rle::{HasLength, Mergable, RleVec, Sliceable};

use crate::{
change::{get_sys_timestamp, Change, Lamport, Timestamp},
container::{idx::ContainerIdx, richtext::Style, IntoContainerId},
container::{idx::ContainerIdx, list::list_op::DeleteSpan, richtext::Style, IntoContainerId},
delta::{Delta, MapValue, TreeDelta, TreeDiff},
event::Diff,
id::{Counter, PeerID, ID},
@@ -44,7 +44,7 @@ pub struct Transaction {
oplog: Arc<Mutex<OpLog>>,
frontiers: Frontiers,
local_ops: RleVec<[Op; 1]>, // TODO: use a more efficient data structure
event_hints: FxHashMap<Counter, EventHint>,
event_hints: Vec<EventHint>,
pub(super) arena: SharedArena,
finished: bool,
on_commit: Option<OnCommitFn>,
@@ -54,29 +54,23 @@ pub struct Transaction {
#[derive(Debug, Clone, EnumAsInner)]
pub(super) enum EventHint {
Mark {
start: usize,
end: usize,
start: u32,
end: u32,
style: Style,
},
InsertText {
/// pos is a Unicode index. If wasm, it's a UTF-16 index.
pos: usize,
pos: u32,
len: u32,
styles: Vec<Style>,
},
DeleteText {
/// pos is a Unicode index. If wasm, it's a UTF-16 index.
pos: usize,
/// len is a Unicode length. If wasm, it's a UTF-16 length.
len: usize,
},
/// pos is a Unicode index. If wasm, it's a UTF-16 index.
DeleteText(DeleteSpan),
InsertList {
pos: usize,
value: LoroValue,
},
DeleteList {
pos: usize,
len: usize,
},
DeleteList(DeleteSpan),
Map {
key: InternalString,
value: Option<LoroValue>,
@@ -85,6 +79,76 @@ pub(super) enum EventHint {
None,
}

impl generic_btree::rle::HasLength for EventHint {
fn rle_len(&self) -> usize {
match self {
EventHint::Mark { .. } => 1,
EventHint::InsertText { len, .. } => *len as usize,
EventHint::DeleteText(d) => d.len(),
EventHint::InsertList { .. } => 1,
EventHint::DeleteList(d) => d.len(),
EventHint::Map { .. } => 1,
EventHint::Tree(_) => 1,
EventHint::None => 1,
}
}
}

impl generic_btree::rle::Mergeable for EventHint {
fn can_merge(&self, rhs: &Self) -> bool {
match (self, rhs) {
(
EventHint::InsertText { pos, len, styles },
EventHint::InsertText {
pos: r_pos,
styles: r_styles,
..
},
) => *pos + *len == *r_pos && styles == r_styles,
(EventHint::DeleteText(l), EventHint::DeleteText(r)) => l.is_mergable(r, &()),
(EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.is_mergable(r, &()),
_ => false,
}
}

fn merge_right(&mut self, rhs: &Self) {
match (self, rhs) {
(EventHint::InsertText { len, .. }, EventHint::InsertText { len: r_len, .. }) => {
*len += *r_len;
}
(EventHint::DeleteText(l), EventHint::DeleteText(r)) => l.merge(r, &()),
(EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.merge(r, &()),
_ => unreachable!(),
}
}

fn merge_left(&mut self, _: &Self) {
unreachable!()
}
}

impl generic_btree::rle::Sliceable for EventHint {
fn _slice(&self, range: std::ops::Range<usize>) -> Self {
match self {
EventHint::InsertText {
pos,
len: _,
styles,
} => EventHint::InsertText {
pos: *pos + range.start as u32,
len: range.len() as u32,
styles: styles.clone(),
},
EventHint::DeleteText(d) => EventHint::DeleteText(d.slice(range.start, range.end)),
EventHint::DeleteList(d) => EventHint::DeleteList(d.slice(range.start, range.end)),
a => {
assert_eq!(a.rle_len(), range.len());
a.clone()
}
}
}
}

impl Transaction {
pub fn new(
state: Arc<Mutex<DocState>>,
@@ -268,7 +332,15 @@ impl Transaction {
let op = self.arena.convert_raw_op(&raw_op);
state.apply_local_op(&raw_op, &op)?;
drop(state);
self.event_hints.insert(raw_op.id.counter, event);
assert_eq!(event.rle_len(), op.atom_len());
match self.event_hints.last_mut() {
Some(last) if last.can_merge(&event) => {
last.merge_right(&event);
}
_ => {
self.event_hints.push(event);
}
}
self.local_ops.push(op);
self.next_counter += len as Counter;
self.next_lamport += len as Lamport;
@@ -349,40 +421,58 @@ pub(crate) struct TxnContainerDiff {
fn change_to_diff(
change: &Change,
_arena: &SharedArena,
mut event_hints: FxHashMap<Counter, EventHint>,
event_hints: Vec<EventHint>,
) -> Vec<TxnContainerDiff> {
let mut ans: Vec<TxnContainerDiff> = Vec::with_capacity(change.ops.len());
let peer = change.id.peer;
let mut lamport = change.lamport;
let mut event_hint_iter = event_hints.into_iter();
let mut o_hint = event_hint_iter.next();

for op in change.ops.iter() {
let counter = op.counter;
let Some(hint) = event_hints.remove(&counter) else {
let Some(hint) = o_hint.as_mut() else {
unreachable!()
};

let hint = match op.atom_len().cmp(&hint.rle_len()) {
std::cmp::Ordering::Less => {
let ans = hint.slice(..op.atom_len());
hint.slice_(op.atom_len()..);
ans
}
std::cmp::Ordering::Equal => match event_hint_iter.next() {
Some(n) => o_hint.replace(n).unwrap(),
None => o_hint.take().unwrap(),
},
std::cmp::Ordering::Greater => {
unreachable!()
}
};

'outer: {
let diff: Diff =
match hint {
EventHint::Mark { start, end, style } => {
Diff::Text(Delta::new().retain(start).retain_with_meta(
end - start,
Diff::Text(Delta::new().retain(start as usize).retain_with_meta(
(end - start) as usize,
crate::delta::StyleMeta { vec: vec![style] },
))
}
EventHint::InsertText { pos, styles } => {
EventHint::InsertText { pos, styles, .. } => {
let slice = op.content.as_list().unwrap().as_insert_text().unwrap().0;
Diff::Text(Delta::new().retain(pos).insert_with_meta(
Diff::Text(Delta::new().retain(pos as usize).insert_with_meta(
slice.clone(),
crate::delta::StyleMeta { vec: styles },
))
}
EventHint::DeleteText { pos, len } => {
Diff::Text(Delta::new().retain(pos).delete(len))
EventHint::DeleteText(s) => {
Diff::Text(Delta::new().retain(s.start() as usize).delete(s.len()))
}
EventHint::InsertList { pos, value } => {
Diff::List(Delta::new().retain(pos).insert(vec![value]))
}
EventHint::DeleteList { pos, len } => {
Diff::List(Delta::new().retain(pos).delete(len))
EventHint::DeleteList(s) => {
Diff::List(Delta::new().retain(s.start() as usize).delete(s.len()))
}
EventHint::Map { key, value } => {
Diff::NewMap(crate::delta::MapDelta::new().with_entry(
@@ -410,6 +500,5 @@ fn change_to_diff(
lamport += op.content_len() as Lamport;
}

debug_dbg!(&ans);
ans
}