Skip to content

Commit 40b6c96

Browse files
committed
Integrate latest flume api, fix bug with contacts update.
1 parent 1666fb1 commit 40b6c96

File tree

8 files changed

+49
-30
lines changed

8 files changed

+49
-30
lines changed

native/Cargo.lock

+11-5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ private-box = "0.4.5"
2222
base64 = "0.10.0"
2323
itertools = "0.8.0"
2424
node_napi = { git = "https://github.com/sunrise-choir/node-napi" }
25-
flumedb = { git = "https://github.com/sunrise-choir/flumedb-rs" }
25+
flumedb = { git = "https://github.com/sunrise-choir/flumedb-rs", version = "0.1.1" }
2626

2727
[dependencies.rusqlite]
2828
version = "0.16.0"

native/benches/bench.rs

+8-9
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ extern crate ssb_sql_napi;
1414

1515
use base64::{decode, encode};
1616
use flumedb::flume_log::FlumeLog;
17-
use flumedb::offset_log::OffsetCodec;
1817
use flumedb::offset_log::OffsetLogIter;
1918
use itertools::Itertools;
2019
use private_box::SecretKey;
@@ -30,9 +29,9 @@ fn create_test_db(num_entries: usize, offset_filename: &str, db_filename: &str)
3029

3130
let file = std::fs::File::open(offset_filename.to_string()).unwrap();
3231

33-
OffsetLogIter::<u32, std::fs::File>::new(file)
32+
OffsetLogIter::<u32>::new(file)
3433
.take(num_entries)
35-
.map(|data| (data.id, data.data_buffer))
34+
.map(|data| (data.offset, data.data))
3635
.chunks(NUM_ENTRIES as usize)
3736
.into_iter()
3837
.for_each(|chunk| {
@@ -53,8 +52,8 @@ fn flume_view_sql_insert_piets_entire_log(c: &mut Criterion) {
5352
let mut view = FlumeViewSql::new(db_filename, keys, "").unwrap();
5453

5554
let file = std::fs::File::open(offset_filename.to_string()).unwrap();
56-
OffsetLogIter::<u32, std::fs::File>::new(file)
57-
.map(|data| (data.id, data.data_buffer))
55+
OffsetLogIter::<u32>::new(file)
56+
.map(|data| (data.offset, data.data))
5857
.chunks(NUM_ENTRIES as usize)
5958
.into_iter()
6059
.for_each(|chunk| {
@@ -85,8 +84,8 @@ fn flume_view_sql_insert_piets_entire_log_with_decryption(c: &mut Criterion) {
8584
let mut view = FlumeViewSql::new(db_filename, keys, "").unwrap();
8685

8786
let file = std::fs::File::open(offset_filename.to_string()).unwrap();
88-
OffsetLogIter::<u32, std::fs::File>::new(file)
89-
.map(|data| (data.id, data.data_buffer))
87+
OffsetLogIter::<u32>::new(file)
88+
.map(|data| (data.offset, data.data))
9089
.chunks(NUM_ENTRIES as usize)
9190
.into_iter()
9291
.for_each(|chunk| {
@@ -111,9 +110,9 @@ fn flume_view_sql_insert(c: &mut Criterion) {
111110

112111
//TODO: this is ok for a benchmark but uses lots of memory.
113112
//Better would be to create a transaction and then append in a for_each loop.
114-
OffsetLogIter::<u32, std::fs::File>::new(file)
113+
OffsetLogIter::<u32>::new(file)
115114
.take(NUM_ENTRIES as usize)
116-
.map(|data| (data.id, data.data_buffer))
115+
.map(|data| (data.offset, data.data))
117116
.chunks(NUM_ENTRIES as usize)
118117
.into_iter()
119118
.for_each(|chunk| {

native/src/flume_view_sql/contacts.rs

+23-10
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ pub fn insert_or_update_contacts(
2525
is_decrypted: bool,
2626
) {
2727
if let Value::String(contact) = &message.value.content["contact"] {
28+
//Ok what should this do:
29+
// - if the record already exists
30+
// - delete it if the new state is zero (this should only happen when record already
31+
// exists because you can't unfollow someone you already don't follow.
32+
// - update it if the new state is 1 or -1
33+
// - else create the new record. State should be a 1 or a -1
2834
let is_blocking = message.value.content["blocking"].as_bool().unwrap_or(false);
2935
let is_following = message.value.content["following"]
3036
.as_bool()
@@ -38,20 +44,27 @@ pub fn insert_or_update_contacts(
3844
};
3945

4046
let author_id = find_or_create_author(&connection, &message.value.author).unwrap();
41-
let mut insert_contacts_stmt = connection
42-
.prepare_cached("REPLACE INTO contacts_raw (author_id, contact_author_id, state, is_decrypted) VALUES (?, ?, ?, ?)")
43-
.unwrap();
4447
let contact_author_id = find_or_create_author(&connection, contact).unwrap();
4548

46-
insert_contacts_stmt
47-
.execute(&[
48-
&author_id,
49-
&contact_author_id,
50-
&state,
51-
&is_decrypted as &ToSql,
52-
])
49+
let mut stmt = connection.prepare_cached("SELECT id FROM contacts_raw WHERE author_id = ? AND contact_author_id = ? AND is_decrypted = ?").unwrap();
50+
51+
stmt.query_row(&[&author_id, &contact_author_id, &is_decrypted as &ToSql], |row| row.get(0))
52+
.and_then(|id: i64|{
53+
//Row exists so update
54+
connection
55+
.prepare_cached("UPDATE contacts_raw SET state = ? WHERE id = ?")
56+
.map(|mut stmt| stmt.execute(&[&state, &id]))
57+
})
58+
.or_else(|_| {
59+
//Row didn't exist so insert
60+
connection
61+
.prepare_cached("INSERT INTO contacts_raw (author_id, contact_author_id, is_decrypted, state) VALUES (?, ?, ?, ?)")
62+
.map(|mut stmt| stmt.execute(&[&author_id, &contact_author_id, &is_decrypted as &ToSql, &state]))
63+
})
64+
.unwrap()
5365
.unwrap();
5466
}
67+
5568
}
5669

5770
pub fn create_contacts_indices(connection: &Connection) -> Result<usize, Error> {

native/src/flume_view_sql/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ fn append_item(
258258

259259
let message_key_id = find_or_create_key(&connection, &message.key).unwrap();
260260

261+
// votes are a kind of backlink, but we want to put them in their own table.
261262
match &message.value.content["type"] {
262263
Value::String(type_string) if type_string == "vote" => {
263264
insert_or_update_votes(connection, &message);

native/src/flume_view_sql/queries.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,9 @@ mod test {
103103

104104
let file = std::fs::File::open(offset_filename.to_string()).unwrap();
105105

106-
OffsetLogIter::<u32, std::fs::File>::new(file)
106+
OffsetLogIter::<u32>::new(file)
107107
.take(num_entries)
108-
.map(|data| (data.id, data.data_buffer))
108+
.map(|data| (data.offset, data.data))
109109
.chunks(1000 as usize)
110110
.into_iter()
111111
.for_each(|chunk| {

native/src/flume_view_sql/votes.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub fn insert_or_update_votes(connection: &Connection, message: &SsbMessage) {
2323

2424
if value.as_i64().unwrap() == 1 {
2525
connection
26-
.prepare_cached("REPLACE INTO votes_raw (link_from_author_id, link_to_key_id) VALUES (?, ?)")
26+
.prepare_cached("INSERT INTO votes_raw (link_from_author_id, link_to_key_id) VALUES (?, ?)")
2727
.unwrap()
2828
.execute(&[&author_id, &link_to_key_id])
2929
.unwrap();

native/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,10 @@ impl SsbQuery {
7474
n => n as usize,
7575
};
7676

77-
OffsetLogIter::<u32, std::fs::File>::with_starting_offset(file, latest)
77+
OffsetLogIter::<u32>::with_starting_offset(file, latest)
7878
.skip(num_to_skip)
7979
.take(items_to_take)
80-
.map(|data| (data.id + latest, data.data_buffer)) //TODO log_latest might not be the right thing
80+
.map(|data| (data.offset + latest, data.data)) //TODO log_latest might not be the right thing
8181
.chunks(1000)
8282
.into_iter()
8383
.for_each(|chunk| {

0 commit comments

Comments
 (0)