Skip to content

Commit 0a1e5be

Browse files
authored
Merge pull request #558 from streamich/patch-log
`PatchLog` improvements
2 parents 3ec6776 + 4d06e69 commit 0a1e5be

File tree

7 files changed

+282
-107
lines changed

7 files changed

+282
-107
lines changed

src/json-crdt/file/File.ts

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import {Model} from '../model';
2-
import {PatchLog} from './PatchLog';
2+
import {PatchLog} from '../history/PatchLog';
33
import {printTree} from '../../util/print/printTree';
44
import {decodeModel, decodeNdjsonComponents, decodePatch, decodeSeqCborComponents} from './util';
55
import {Patch} from '../../json-crdt-patch';
@@ -46,9 +46,8 @@ export class File implements Printable {
4646
if (history) {
4747
const [start, patches] = history;
4848
if (start) {
49-
const startModel = decodeModel(start);
50-
log = new PatchLog(startModel);
51-
for (const patch of patches) log.push(decodePatch(patch));
49+
log = new PatchLog(() => decodeModel(start));
50+
for (const patch of patches) log.end.applyPatch(decodePatch(patch));
5251
}
5352
}
5453
if (!log) throw new Error('NO_HISTORY');
@@ -57,7 +56,7 @@ export class File implements Printable {
5756
for (const patch of frontier) {
5857
const patchDecoded = decodePatch(patch);
5958
decodedModel.applyPatch(patchDecoded);
60-
log.push(patchDecoded);
59+
log.end.applyPatch(patchDecoded);
6160
}
6261
}
6362
const file = new File(decodedModel, log);
@@ -75,7 +74,7 @@ export class File implements Printable {
7574
}
7675

7776
public static fromModel(model: Model<any>, options: FileOptions = {}): File {
78-
return new File(model, PatchLog.fromModel(model), options);
77+
return new File(model, PatchLog.fromNewModel(model), options);
7978
}
8079

8180
constructor(
@@ -88,7 +87,7 @@ export class File implements Printable {
8887
const id = patch.getId();
8988
if (!id) return;
9089
this.model.applyPatch(patch);
91-
this.log.push(patch);
90+
this.log.end.applyPatch(patch);
9291
}
9392

9493
/**
@@ -100,10 +99,10 @@ export class File implements Printable {
10099
const api = model.api;
101100
const autoflushUnsubscribe = api.autoFlush();
102101
const onPatchUnsubscribe = api.onPatch.listen((patch) => {
103-
log.push(patch);
102+
log.end.applyPatch(patch);
104103
});
105104
const onFlushUnsubscribe = api.onFlush.listen((patch) => {
106-
log.push(patch);
105+
log.end.applyPatch(patch);
107106
});
108107
return () => {
109108
autoflushUnsubscribe();
@@ -153,7 +152,7 @@ export class File implements Printable {
153152
const patchFormat = params.history ?? 'binary';
154153
switch (patchFormat) {
155154
case 'binary': {
156-
history[0] = this.log.start.toBinary();
155+
history[0] = this.log.start().toBinary();
157156
this.log.patches.forEach(({v}) => {
158157
history[1].push(v.toBinary());
159158
});
@@ -162,7 +161,7 @@ export class File implements Printable {
162161
case 'compact': {
163162
const encoder = this.options.structuralCompactEncoder;
164163
if (!encoder) throw new Error('NO_COMPACT_ENCODER');
165-
history[0] = encoder.encode(this.log.start);
164+
history[0] = encoder.encode(this.log.start());
166165
const encodeCompact = this.options.patchCompactEncoder;
167166
if (!encodeCompact) throw new Error('NO_COMPACT_PATCH_ENCODER');
168167
const list = history[1];
@@ -174,7 +173,7 @@ export class File implements Printable {
174173
case 'verbose': {
175174
const encoder = this.options.structuralVerboseEncoder;
176175
if (!encoder) throw new Error('NO_VERBOSE_ENCODER');
177-
history[0] = encoder.encode(this.log.start);
176+
history[0] = encoder.encode(this.log.start());
178177
const encodeVerbose = this.options.patchVerboseEncoder;
179178
if (!encodeVerbose) throw new Error('NO_VERBOSE_PATCH_ENCODER');
180179
const list = history[1];

src/json-crdt/file/PatchLog.ts

Lines changed: 0 additions & 61 deletions
This file was deleted.

src/json-crdt/file/__tests__/File.spec.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ test('can create File from new model', () => {
2020
}),
2121
);
2222
const file = File.fromModel(model);
23-
expect(file.log.start.view()).toBe(undefined);
23+
expect(file.log.start().view()).toBe(undefined);
2424
expect(file.model.view()).toEqual({
2525
foo: 'bar',
2626
});
27-
expect(file.log.start.clock.sid).toBe(file.model.clock.sid);
27+
expect(file.log.start().clock.sid).toBe(file.model.clock.sid);
2828
});
2929

3030
test.todo('patches are flushed and stored in memory');
@@ -56,7 +56,7 @@ describe('.toBinary()', () => {
5656
const file2 = File.fromNdjson(blob);
5757
expect(file2.model.view()).toEqual({foo: 'bar'});
5858
expect(file2.model !== file.model).toBe(true);
59-
expect(file.log.start.view()).toEqual(undefined);
59+
expect(file.log.start().view()).toEqual(undefined);
6060
expect(file.log.replayToEnd().view()).toEqual({foo: 'bar'});
6161
});
6262

@@ -66,7 +66,7 @@ describe('.toBinary()', () => {
6666
const file2 = File.fromSeqCbor(blob);
6767
expect(file2.model.view()).toEqual({foo: 'bar'});
6868
expect(file2.model !== file.model).toBe(true);
69-
expect(file.log.start.view()).toEqual(undefined);
69+
expect(file.log.start().view()).toEqual(undefined);
7070
expect(file.log.replayToEnd().view()).toEqual({foo: 'bar'});
7171
});
7272
});
@@ -78,7 +78,7 @@ describe('.toBinary()', () => {
7878
params.format === 'seq.cbor' ? File.fromSeqCbor(blob, fileEncoders) : File.fromNdjson(blob, fileEncoders);
7979
expect(file2.model.view()).toEqual(file.model.view());
8080
expect(file2.model !== file.model).toBe(true);
81-
expect(file2.log.start.view()).toEqual(undefined);
81+
expect(file2.log.start().view()).toEqual(undefined);
8282
expect(file2.log.replayToEnd().view()).toEqual(file.model.view());
8383
expect(file2.log.patches.size()).toBe(file.log.patches.size());
8484
};

src/json-crdt/file/__tests__/PatchLog.spec.ts

Lines changed: 0 additions & 29 deletions
This file was deleted.

src/json-crdt/history/PatchLog.ts

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
import {FanOutUnsubscribe} from 'thingies/es2020/fanout';
2+
import {ITimestampStruct, Patch, compare} from '../../json-crdt-patch';
3+
import {printTree} from '../../util/print/printTree';
4+
import {AvlMap} from '../../util/trees/avl/AvlMap';
5+
import {Model} from '../model';
6+
import {first, next} from '../../util/trees/util';
7+
import type {Printable} from '../../util/print/types';
8+
9+
export class PatchLog implements Printable {
10+
/**
11+
* Creates a `PatchLog` instance from a newly JSON CRDT model. Checks if
12+
* the model API buffer has any initial operations applied, if yes, it
13+
* uses them to create the initial state of the log.
14+
*
15+
* @param model A new JSON CRDT model, just created with
16+
* `Model.withLogicalClock()` or `Model.withServerClock()`.
17+
* @returns A new `PatchLog` instance.
18+
*/
19+
public static fromNewModel(model: Model<any>): PatchLog {
20+
const clock = model.clock.clone();
21+
const log = new PatchLog(() => new Model(clock));
22+
const api = model.api;
23+
if (api.builder.patch.ops.length) log.end.applyPatch(api.flush());
24+
return log;
25+
}
26+
27+
/**
28+
* Model factory function that creates a new JSON CRDT model instance, which
29+
* is used as the starting point of the log. It is called every time a new
30+
* model is needed to replay the log.
31+
*
32+
* @readonly Internally this function may be updated, but externally it is
33+
* read-only.
34+
*/
35+
public start: () => Model;
36+
37+
/**
38+
* The end of the log, the current state of the document. It is the model
39+
* instance that is used to apply new patches to the log.
40+
*
41+
* @readonly
42+
*/
43+
public readonly end: Model;
44+
45+
/**
46+
* The patches in the log, stored in an AVL tree for efficient replaying. The
47+
* collection of patches which are applied to the `start()` model to reach
48+
* the `end` model.
49+
*
50+
* @readonly
51+
*/
52+
public readonly patches = new AvlMap<ITimestampStruct, Patch>(compare);
53+
54+
private __onPatch: FanOutUnsubscribe;
55+
private __onFlush: FanOutUnsubscribe;
56+
57+
constructor(start: () => Model) {
58+
this.start = start;
59+
const end = (this.end = start());
60+
const onPatch = (patch: Patch) => {
61+
const id = patch.getId();
62+
if (!id) return;
63+
this.patches.set(id, patch);
64+
};
65+
const api = end.api;
66+
this.__onPatch = api.onPatch.listen(onPatch);
67+
this.__onFlush = api.onFlush.listen(onPatch);
68+
}
69+
70+
/**
71+
* Call this method to destroy the `PatchLog` instance. It unsubscribes patch
72+
* and flush listeners from the `end` model and clears the patch log.
73+
*/
74+
public destroy() {
75+
this.__onPatch();
76+
this.__onFlush();
77+
this.patches.clear();
78+
}
79+
80+
/**
81+
* Creates a new model instance using the `start()` factory function and
82+
* replays all patches in the log to reach the current state of the document.
83+
*
84+
* @returns A new model instance with all patches replayed.
85+
*/
86+
public replayToEnd(): Model {
87+
const clone = this.start().clone();
88+
for (let node = first(this.patches.root); node; node = next(node)) clone.applyPatch(node.v);
89+
return clone;
90+
}
91+
92+
/**
93+
* Replays the patch log until a specified timestamp, including the patch
94+
* at the given timestamp. The model returned is a new instance of `start()`
95+
* with patches replayed up to the given timestamp.
96+
*
97+
* @param ts Timestamp ID of the patch to replay to.
98+
* @returns A new model instance with patches replayed up to the given timestamp.
99+
*/
100+
public replayTo(ts: ITimestampStruct): Model {
101+
const clone = this.start().clone();
102+
for (let node = first(this.patches.root); node && compare(ts, node.k) >= 0; node = next(node))
103+
clone.applyPatch(node.v);
104+
return clone;
105+
}
106+
107+
/**
108+
* Advance the start of the log to a specified timestamp, excluding the patch
109+
* at the given timestamp. This method removes all patches from the log that
110+
* are older than the given timestamp and updates the `start()` factory
111+
* function to replay the log from the new start.
112+
*
113+
* @param ts Timestamp ID of the patch to advance to.
114+
*/
115+
public advanceTo(ts: ITimestampStruct): void {
116+
const newStartPatches: Patch[] = [];
117+
let node = first(this.patches.root);
118+
for (; node && compare(ts, node.k) >= 0; node = next(node)) newStartPatches.push(node.v);
119+
for (const patch of newStartPatches) this.patches.del(patch.getId()!);
120+
const oldStart = this.start;
121+
this.start = (): Model => {
122+
const model = oldStart();
123+
for (const patch of newStartPatches) model.applyPatch(patch);
124+
return model;
125+
};
126+
}
127+
128+
// ---------------------------------------------------------------- Printable
129+
130+
public toString(tab?: string) {
131+
const patches: Patch[] = [];
132+
this.patches.forEach(({v}) => patches.push(v));
133+
return (
134+
`log` +
135+
printTree(tab, [
136+
(tab) => `start` + printTree(tab, [(tab) => this.start().toString(tab)]),
137+
() => '',
138+
(tab) =>
139+
'history' +
140+
printTree(
141+
tab,
142+
patches.map((patch, i) => (tab) => `${i}: ${patch.toString(tab)}`),
143+
),
144+
() => '',
145+
(tab) => `end` + printTree(tab, [(tab) => this.end.toString(tab)]),
146+
])
147+
);
148+
}
149+
}

0 commit comments

Comments
 (0)