Skip to content

Commit 84ad6cf

Browse files
committed
WIP log based debugging, catch situation where current snapshot is being deleted
1 parent dc3d23c commit 84ad6cf

File tree

2 files changed

+43
-15
lines changed

2 files changed

+43
-15
lines changed

src/fs_store.rs

+26-5
Original file line numberDiff line numberDiff line change
@@ -157,28 +157,40 @@ impl FsStore {
157157
// Load all the data we have into a doc
158158
match Chunks::load(&self.root, id) {
159159
Ok(Some(chunks)) => {
160+
println!("hmm...");
160161
let doc = chunks
161162
.to_doc()
162163
.map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?;
163164

164165
// Write the snapshot
165166
let output_chunk_name = SavedChunkName::new_snapshot(doc.get_heads());
166167
let chunk = doc.save();
167-
write_chunk(&self.root, &paths, &chunk, output_chunk_name)?;
168+
println!("Going to write: {:#?}", output_chunk_name);
169+
write_chunk(&self.root, &paths, &chunk, output_chunk_name.clone())?;
168170

169171
// Remove all the old data
170172
for incremental in chunks.incrementals.keys() {
171173
let path = paths.chunk_path(&self.root, incremental);
174+
println!("Removing {:?}", path);
172175
std::fs::remove_file(&path)
173176
.map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
174177
}
178+
let just_wrote = paths.chunk_path(&self.root, &output_chunk_name);
175179
for snapshot in chunks.snapshots.keys() {
176180
let path = paths.chunk_path(&self.root, snapshot);
181+
println!("Removing Snap {:?}", path);
182+
183+
if path == just_wrote {
184+
tracing::error!("Somehow trying to delete the same path we just wrote to. Not today Satan");
185+
continue;
186+
}
187+
177188
std::fs::remove_file(&path)
178189
.map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
179190
}
180191
}
181192
Ok(None) => {
193+
println!("No existing files,and compaction requested first");
182194
let output_chunk_name = SavedChunkName {
183195
hash: uuid::Uuid::new_v4().as_bytes().to_vec(),
184196
chunk_type: ChunkType::Snapshot,
@@ -187,6 +199,7 @@ impl FsStore {
187199
write_chunk(&self.root, &paths, full_doc, output_chunk_name)?;
188200
}
189201
Err(e) => {
202+
println!("Error loading chunks for {:?} {}", self.root, id);
190203
tracing::error!(e=%e, "Error loading chunks");
191204
}
192205
}
@@ -219,6 +232,10 @@ fn write_chunk(
219232
// Move the temporary file into a snapshot in the document data directory
220233
// with a name based on the hash of the heads of the document
221234
let output_path = paths.chunk_path(root, &name);
235+
236+
tracing::warn!("Renaming: {:?}", temp_save);
237+
tracing::warn!("To: {:?}", output_path);
238+
222239
std::fs::rename(&temp_save_path, &output_path)
223240
.map_err(|e| Error(ErrorKind::RenameTempFile(temp_save_path, output_path, e)))?;
224241

@@ -286,13 +303,13 @@ impl DocIdPaths {
286303
}
287304
}
288305

289-
#[derive(Debug, Hash, PartialEq, Eq)]
306+
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
290307
enum ChunkType {
291308
Snapshot,
292309
Incremental,
293310
}
294311

295-
#[derive(Debug, Hash, PartialEq, Eq)]
312+
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
296313
struct SavedChunkName {
297314
hash: Vec<u8>,
298315
chunk_type: ChunkType,
@@ -355,7 +372,7 @@ impl Chunks {
355372
fn load(root: &Path, doc_id: &DocumentId) -> Result<Option<Self>, Error> {
356373
let doc_id_hash = DocIdPaths::from(doc_id);
357374
let level2_path = doc_id_hash.level2_path(root);
358-
tracing::debug!(
375+
tracing::warn!(
359376
root=%root.display(),
360377
doc_id=?doc_id,
361378
doc_path=%level2_path.display(),
@@ -439,7 +456,11 @@ impl Chunks {
439456
for chunk in self.incrementals.values() {
440457
bytes.extend(chunk);
441458
}
442-
automerge::Automerge::load(&bytes)
459+
460+
automerge::Automerge::load_with_options(
461+
&bytes,
462+
automerge::LoadOptions::new().on_partial_load(automerge::OnPartialLoad::Ignore),
463+
)
443464
}
444465
}
445466

src/repo.rs

+17-10
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,7 @@ pub(crate) struct DocumentInfo {
557557
change_observers: Vec<RepoFutureResolver<Result<(), RepoError>>>,
558558
/// Counter of local saves since last compact,
559559
/// used to make decisions about full or incremental saves.
560-
saves_since_last_compact: usize,
560+
patches_since_last_compact: usize,
561561
///
562562
allowable_changes_until_compaction: usize,
563563
/// Last heads obtained from the automerge doc.
@@ -580,7 +580,7 @@ impl DocumentInfo {
580580
handle_count,
581581
sync_states: Default::default(),
582582
change_observers: Default::default(),
583-
saves_since_last_compact: 0,
583+
patches_since_last_compact: 0,
584584
allowable_changes_until_compaction: 10,
585585
last_heads,
586586
}
@@ -600,7 +600,7 @@ impl DocumentInfo {
600600
| DocState::Error
601601
| DocState::LoadPending { .. }
602602
| DocState::Bootstrap { .. } => {
603-
assert_eq!(self.saves_since_last_compact, 0);
603+
assert_eq!(self.patches_since_last_compact, 0);
604604
DocState::PendingRemoval(vec![])
605605
}
606606
DocState::Sync(ref mut storage_fut) => DocState::PendingRemoval(mem::take(storage_fut)),
@@ -704,14 +704,18 @@ impl DocumentInfo {
704704
let count = {
705705
let doc = self.document.read();
706706
let changes = doc.automerge.get_changes(&self.last_heads);
707-
println!("last: {:?}, current: {:?}", self.last_heads, doc.automerge.get_heads());
707+
println!(
708+
"last: {:?}, current: {:?}",
709+
self.last_heads,
710+
doc.automerge.get_heads()
711+
);
708712
//self.last_heads = doc.automerge.get_heads();
709713
changes.len()
710714
};
711715
let has_patches = count > 0;
712716
println!("Has patches: {:?}", has_patches);
713-
self.saves_since_last_compact = self
714-
.saves_since_last_compact
717+
self.patches_since_last_compact = self
718+
.patches_since_last_compact
715719
.checked_add(count)
716720
.unwrap_or(0);
717721
has_patches
@@ -735,14 +739,14 @@ impl DocumentInfo {
735739
return;
736740
}
737741
let should_compact =
738-
self.saves_since_last_compact > self.allowable_changes_until_compaction;
742+
self.patches_since_last_compact > self.allowable_changes_until_compaction;
739743
let (storage_fut, new_heads) = if should_compact {
740744
println!("We decided to Compact the document");
741745
let (to_save, new_heads) = {
742746
let doc = self.document.read();
743747
(doc.automerge.save(), doc.automerge.get_heads())
744748
};
745-
self.saves_since_last_compact = 0;
749+
self.patches_since_last_compact = 0;
746750
println!("Since compact is zero");
747751
(storage.compact(document_id.clone(), to_save), new_heads)
748752
} else {
@@ -754,8 +758,11 @@ impl DocumentInfo {
754758
doc.automerge.get_heads(),
755759
)
756760
};
757-
self.saves_since_last_compact.checked_add(1).unwrap_or(0);
758-
println!("Saves since last compact {}", self.saves_since_last_compact);
761+
self.patches_since_last_compact.checked_add(1).unwrap_or(0);
762+
println!(
763+
"Saves since last compact {}",
764+
self.patches_since_last_compact
765+
);
759766
(storage.append(document_id.clone(), to_save), new_heads)
760767
};
761768
match self.state {

0 commit comments

Comments
 (0)