diff --git a/.changeset/popular-suits-joke.md b/.changeset/popular-suits-joke.md new file mode 100644 index 0000000000..d432994904 --- /dev/null +++ b/.changeset/popular-suits-joke.md @@ -0,0 +1,5 @@ +--- +"@farcaster/hubble": patch +--- + +fix: Read data_bytes properly when pruning diff --git a/apps/hubble/src/addon/src/store/cast_store.rs b/apps/hubble/src/addon/src/store/cast_store.rs index 5faa8656c5..b754b3cc96 100644 --- a/apps/hubble/src/addon/src/store/cast_store.rs +++ b/apps/hubble/src/addon/src/store/cast_store.rs @@ -1,5 +1,11 @@ -use super::{hub_error_to_js_throw, make_cast_id_key, make_fid_key, make_user_key, message, store::{Store, StoreDef}, utils::{encode_messages_to_js_object, get_page_options, get_store}, HubError, MessagesPage, PageOptions, RootPrefix, StoreEventHandler, UserPostfix, PAGE_SIZE_MAX, TS_HASH_LENGTH, TRUE_VALUE, HASH_LENGTH, bytes_compare}; -use crate::protos::{CastRemoveBody, message_data}; +use super::{ + bytes_compare, hub_error_to_js_throw, make_cast_id_key, make_fid_key, make_user_key, message, + store::{Store, StoreDef}, + utils::{encode_messages_to_js_object, get_page_options, get_store}, + HubError, MessagesPage, PageOptions, RootPrefix, StoreEventHandler, UserPostfix, HASH_LENGTH, + PAGE_SIZE_MAX, TRUE_VALUE, TS_HASH_LENGTH, +}; +use crate::protos::{message_data, CastRemoveBody}; use crate::{ db::{RocksDB, RocksDbTransactionBatch}, protos::{self, Message, MessageType}, @@ -121,7 +127,9 @@ impl StoreDef for CastStoreDef { // Compare message types to enforce that RemoveWins in case of LWW ties. if (a_type == MessageType::CastRemove as u8) && (b_type == MessageType::CastAdd as u8) { return 1; - } else if (a_type == MessageType::CastAdd as u8) && (b_type == MessageType::CastRemove as u8) { + } else if (a_type == MessageType::CastAdd as u8) + && (b_type == MessageType::CastRemove as u8) + { return -1; } @@ -176,7 +184,9 @@ impl StoreDef for CastStoreDef { fn make_add_key(&self, message: &protos::Message) -> Result, HubError> { let hash = match message.data.as_ref().unwrap().body.as_ref() { Some(message_data::Body::CastAddBody(_)) => message.hash.as_ref(), - Some(message_data::Body::CastRemoveBody(cast_remove_body)) => cast_remove_body.target_hash.as_ref(), + Some(message_data::Body::CastRemoveBody(cast_remove_body)) => { + cast_remove_body.target_hash.as_ref() + } _ => { return Err(HubError { code: "bad_request.validation_failure".to_string(), @@ -193,7 +203,9 @@ impl StoreDef for CastStoreDef { fn make_remove_key(&self, message: &protos::Message) -> Result, HubError> { let hash = match message.data.as_ref().unwrap().body.as_ref() { Some(message_data::Body::CastAddBody(_)) => message.hash.as_ref(), - Some(message_data::Body::CastRemoveBody(cast_remove_body)) => cast_remove_body.target_hash.as_ref(), + Some(message_data::Body::CastRemoveBody(cast_remove_body)) => { + cast_remove_body.target_hash.as_ref() + } _ => { return Err(HubError { code: "bad_request.validation_failure".to_string(), @@ -300,10 +312,7 @@ impl CastStoreDef { } // Generates unique keys used to store or fetch CastAdd messages in the adds set index - pub fn make_cast_adds_key( - fid: u32, - hash: &Vec, - ) -> Vec { + pub fn make_cast_adds_key(fid: u32, hash: &Vec) -> Vec { let mut key = Vec::with_capacity(5 + 1 + 20); key.extend_from_slice(&make_user_key(fid)); @@ -334,10 +343,7 @@ impl CastStoreDef { } // Generates unique keys used to store or fetch CastRemove messages in the removes set index - pub fn make_cast_removes_key( - fid: u32, - hash: &Vec, - ) -> Vec { + pub fn make_cast_removes_key(fid: u32, hash: &Vec) -> Vec { let mut key = Vec::with_capacity(5 + 1 + 20); key.extend_from_slice(&make_user_key(fid)); @@ -374,9 +380,11 @@ impl CastStore { data: Some(protos::MessageData { fid: fid as u64, r#type: MessageType::CastAdd.into(), - body: Some(protos::message_data::Body::CastAddBody(protos::CastAddBody { - ..Default::default() - })), + body: Some(protos::message_data::Body::CastAddBody( + protos::CastAddBody { + ..Default::default() + }, + )), ..Default::default() }), hash, @@ -466,11 +474,7 @@ impl CastStore { fid: u32, page_options: &PageOptions, ) -> Result { - store.get_adds_by_fid( - fid, - page_options, - Some(|message: &Message| true), - ) + store.get_adds_by_fid(fid, page_options, Some(|message: &Message| true)) } pub fn create_cast_store(mut cx: FunctionContext) -> JsResult>> { @@ -499,11 +503,10 @@ impl CastStore { let fid = cx.argument::(0).unwrap().value(&mut cx) as u32; let page_options = get_page_options(&mut cx, 1)?; - let messages = - match Self::get_cast_adds_by_fid(&store, fid, &page_options) { - Ok(messages) => messages, - Err(e) => return hub_error_to_js_throw(&mut cx, e), - }; + let messages = match Self::get_cast_adds_by_fid(&store, fid, &page_options) { + Ok(messages) => messages, + Err(e) => return hub_error_to_js_throw(&mut cx, e), + }; let channel = cx.channel(); let (deferred, promise) = cx.promise(); @@ -519,11 +522,7 @@ impl CastStore { fid: u32, page_options: &PageOptions, ) -> Result { - store.get_removes_by_fid( - fid, - page_options, - Some(|message: &Message| true), - ) + store.get_removes_by_fid(fid, page_options, Some(|message: &Message| true)) } pub fn js_get_cast_removes_by_fid(mut cx: FunctionContext) -> JsResult { @@ -532,11 +531,7 @@ impl CastStore { let fid = cx.argument::(0).unwrap().value(&mut cx) as u32; let page_options = get_page_options(&mut cx, 1)?; - let messages = match Self::get_cast_removes_by_fid( - &store, - fid, - &page_options, - ) { + let messages = match Self::get_cast_removes_by_fid(&store, fid, &page_options) { Ok(messages) => messages, Err(e) => return hub_error_to_js_throw(&mut cx, e), }; @@ -563,12 +558,10 @@ impl CastStore { store .db() .for_each_iterator_by_prefix_unbounded(&prefix, page_options, |key, value| { - let ts_hash_offset = prefix.len(); let fid_offset = ts_hash_offset + TS_HASH_LENGTH; - let fid = - u32::from_be_bytes(key[fid_offset..fid_offset + 4].try_into().unwrap()); + let fid = u32::from_be_bytes(key[fid_offset..fid_offset + 4].try_into().unwrap()); let ts_hash = key[ts_hash_offset..ts_hash_offset + TS_HASH_LENGTH] .try_into() .unwrap(); @@ -629,11 +622,7 @@ impl CastStore { let page_options = get_page_options(&mut cx, 2)?; - let messages = match Self::get_casts_by_parent( - &store, - &target, - &page_options, - ) { + let messages = match Self::get_casts_by_parent(&store, &target, &page_options) { Ok(messages) => messages, Err(e) => return hub_error_to_js_throw(&mut cx, e), }; @@ -663,8 +652,7 @@ impl CastStore { let ts_hash_offset = prefix.len(); let fid_offset = ts_hash_offset + TS_HASH_LENGTH; - let fid = - u32::from_be_bytes(key[fid_offset..fid_offset + 4].try_into().unwrap()); + let fid = u32::from_be_bytes(key[fid_offset..fid_offset + 4].try_into().unwrap()); let ts_hash = key[ts_hash_offset..ts_hash_offset + TS_HASH_LENGTH] .try_into() .unwrap(); @@ -703,11 +691,7 @@ impl CastStore { let mention = mention.value(&mut cx) as u32; let page_options = get_page_options(&mut cx, 1)?; - let messages = match Self::get_casts_by_mention( - &store, - mention, - &page_options, - ) { + let messages = match Self::get_casts_by_mention(&store, mention, &page_options) { Ok(messages) => messages, Err(e) => return hub_error_to_js_throw(&mut cx, e), }; diff --git a/apps/hubble/src/addon/src/store/message.rs b/apps/hubble/src/addon/src/store/message.rs index 6e268fb419..71a4601a3f 100644 --- a/apps/hubble/src/addon/src/store/message.rs +++ b/apps/hubble/src/addon/src/store/message.rs @@ -1,7 +1,7 @@ use super::{store::HubError, PageOptions, PAGE_SIZE_MAX}; use crate::{ db::{RocksDB, RocksDbTransactionBatch}, - protos::{self, CastId, Message as MessageProto, MessageType}, + protos::{self, CastId, Message as MessageProto, MessageData, MessageType}, }; use prost::Message as _; use std::convert::TryFrom; @@ -277,7 +277,7 @@ pub fn get_message( // println!("get_message key: {:?}", key); match db.get(&key)? { - Some(bytes) => match MessageProto::decode(bytes.as_slice()) { + Some(bytes) => match message_decode(bytes.as_slice()) { Ok(message) => Ok(Some(message)), Err(_) => Err(HubError { code: "db.internal_error".to_string(), @@ -301,7 +301,7 @@ pub fn get_many_messages( for key in primary_keys { if let Ok(Some(value)) = db.get(&key) { - match MessageProto::decode(value.as_slice()) { + match message_decode(value.as_slice()) { Ok(message) => { messages.push(message); } @@ -336,7 +336,7 @@ where let mut last_key = vec![]; db.for_each_iterator_by_prefix_unbounded(prefix, page_options, |key, value| { - match MessageProto::decode(value) { + match message_decode(value) { Ok(message) => { if filter(&message) { messages.push(message); @@ -368,6 +368,38 @@ where }) } +pub fn message_encode(message: &MessageProto) -> Vec { + if message.data_bytes.is_some() && message.data_bytes.as_ref().unwrap().len() > 0 { + // Clone the message + let mut cloned = message.clone(); + cloned.data = None; + + cloned.encode_to_vec() + } else { + message.encode_to_vec() + } +} + +pub fn message_decode(bytes: &[u8]) -> Result { + if let Ok(mut msg) = MessageProto::decode(bytes) { + if msg.data.is_none() + && msg.data_bytes.is_some() + && msg.data_bytes.as_ref().unwrap().len() > 0 + { + if let Ok(msg_data) = MessageData::decode(msg.data_bytes.as_ref().unwrap().as_slice()) { + msg.data = Some(msg_data); + } + } + + Ok(msg) + } else { + Err(HubError { + code: "db.internal_error".to_string(), + message: "could not decode message".to_string(), + }) + } +} + pub fn put_message_transaction( txn: &mut RocksDbTransactionBatch, message: &MessageProto, @@ -380,8 +412,7 @@ pub fn put_message_transaction( as u8, Some(&ts_hash), ); - txn.put(primary_key, message.encode_to_vec()); - // println!("put_message_transaction primary_key: {:?}", primary_key); + txn.put(primary_key, message_encode(&message)); let by_signer_key = make_message_by_signer_key( message.data.as_ref().unwrap().fid as u32, diff --git a/apps/hubble/src/addon/src/store/store.rs b/apps/hubble/src/addon/src/store/store.rs index 1624bf9af4..3f1a225e26 100644 --- a/apps/hubble/src/addon/src/store/store.rs +++ b/apps/hubble/src/addon/src/store/store.rs @@ -1,6 +1,6 @@ use super::{ bytes_compare, delete_message_transaction, get_message, hub_error_to_js_throw, - make_message_primary_key, message, put_message_transaction, + make_message_primary_key, message, message_decode, put_message_transaction, utils::{self, encode_messages_to_js_object, get_page_options, get_store, vec_to_u8_24}, MessagesPage, StoreEventHandler, TS_HASH_LENGTH, }; @@ -18,9 +18,9 @@ use once_cell::sync::Lazy; use prost::Message as _; use rocksdb; use slog::{o, warn}; -use std::clone::Clone; use std::string::ToString; use std::sync::{Arc, Mutex}; +use std::{clone::Clone, fmt::Display}; use threadpool::ThreadPool; #[derive(Debug)] @@ -45,6 +45,12 @@ impl HubError { } } +impl Display for HubError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", self.code, self.message) + } +} + /** Convert RocksDB errors */ impl From for HubError { fn from(e: rocksdb::Error) -> HubError { @@ -699,40 +705,12 @@ impl Store { let prune_size_limit = self.store_def.get_prune_size_limit(); let prefix = &make_message_primary_key(fid, self.store_def.postfix(), None); - self.db - .for_each_iterator_by_prefix_unbounded(prefix, &PageOptions::default(), |_key, value| { + self.db.for_each_iterator_by_prefix_unbounded( + prefix, + &PageOptions::default(), + |_key, value| { // Value is a message, so try to decode it - let message = match protos::Message::decode(value) { - Ok(message) => message, - Err(e) => { - return Err(HubError { - code: "bad_request.internal_error".to_string(), - message: e.to_string(), - }) - } - }; - - if message.data.is_none() { - // This shouldn't happen, but if it does, skip it - if message.data_bytes.is_none() { - warn!(self.logger, "Missing message_data: Message data and data_bytes are both missing"; "full_message" => format!("{:?}", message)); - return Ok(false); // Continue the iteration - } - - // Try to interpret the message data - let bytes = message.data_bytes.as_ref().unwrap().as_slice(); - let data = match protos::MessageData::decode(bytes) { - Ok(data) => data, - Err(e) => { - warn!(self.logger, "Missing message_data: : Failed to decode message data"; "full_message" => format!("{:?}", message), "error" => e.to_string()); - return Ok(false); // Continue the iteration - } - }; - - // Print the message data - warn!(self.logger, "Missing message_data: Message data is missing, but data_bytes is present"; "full_message" => format!("{:?}", message), "data" => format!("{:?}", data)); - return Ok(false); // Continue the iteration - } + let message = message_decode(value)?; if count <= (prune_size_limit as u64) * units { return Ok(true); // Stop the iteration, nothing left to prune @@ -762,7 +740,8 @@ impl Store { pruned_events.push(hub_event); Ok(false) // Continue the iteration - })?; + }, + )?; Ok(pruned_events) } @@ -792,8 +771,9 @@ impl Store { pub fn js_merge(mut cx: FunctionContext) -> JsResult { let store = get_store(&mut cx)?; - let message_bytes = cx.argument::(0); - let message = protos::Message::decode(message_bytes.unwrap().as_slice(&cx)); + let message_bytes_result = cx.argument::(0); + let message_bytes = message_bytes_result.unwrap().as_slice(&cx).to_vec(); + let message = Message::decode(message_bytes.as_slice()); // let pool = store.pool.clone(); // pool.lock().unwrap().execute(move || { @@ -829,13 +809,12 @@ impl Store { let store = get_store(&mut cx)?; let message_bytes = cx.argument::(0); - let message = protos::Message::decode(message_bytes.unwrap().as_slice(&cx)); + let message = Message::decode(message_bytes.unwrap().as_slice(&cx)); let result = if message.is_err() { - let e = message.unwrap_err(); Err(HubError { code: "bad_request.validation_failure".to_string(), - message: e.to_string(), + message: message.unwrap_err().to_string(), }) } else { let m = message.unwrap(); diff --git a/apps/hubble/src/addon/src/store/username_proof_store.rs b/apps/hubble/src/addon/src/store/username_proof_store.rs index 3a6925b055..b537d366d5 100644 --- a/apps/hubble/src/addon/src/store/username_proof_store.rs +++ b/apps/hubble/src/addon/src/store/username_proof_store.rs @@ -97,13 +97,6 @@ impl StoreDef for UsernameProofStoreDef { } let data = message.data.as_ref().unwrap(); - if data.body.is_none() { - return Err(HubError { - code: "bad_request.validation_failure".to_string(), - message: "Message body is missing".to_string(), - }); - } - if let Some(Body::UsernameProofBody(body)) = &data.body { if body.name.len() == 0 { return Err(HubError { @@ -121,7 +114,7 @@ impl StoreDef for UsernameProofStoreDef { } else { Err(HubError { code: "bad_request.validation_failure".to_string(), - message: "Message body is missing".to_string(), + message: "Message body is missing or incorrect".to_string(), }) } } @@ -140,13 +133,6 @@ impl StoreDef for UsernameProofStoreDef { } let data = message.data.as_ref().unwrap(); - if data.body.is_none() { - return Err(HubError { - code: "bad_request.validation_failure".to_string(), - message: "Message body is missing".to_string(), - }); - } - if let Some(Body::UsernameProofBody(body)) = &data.body { if body.name.len() == 0 { return Err(HubError { @@ -161,7 +147,7 @@ impl StoreDef for UsernameProofStoreDef { } else { Err(HubError { code: "bad_request.validation_failure".to_string(), - message: "Message body is missing".to_string(), + message: "Message data body is missing or incorrect".to_string(), }) } } @@ -180,19 +166,12 @@ impl StoreDef for UsernameProofStoreDef { } let data = message.data.as_ref().unwrap(); - if data.body.is_none() { - return Err(HubError { - code: "bad_request.validation_failure".to_string(), - message: "Message body is missing".to_string(), - }); - } - let name = match &data.body { Some(Body::UsernameProofBody(body)) => &body.name, _ => { return Err(HubError { code: "bad_request.validation_failure".to_string(), - message: "Message body is missing".to_string(), + message: "Message data body is missing".to_string(), }) } }; diff --git a/apps/hubble/src/addon/src/store/verification_store.rs b/apps/hubble/src/addon/src/store/verification_store.rs index 3dfbaf6e37..080e5fb933 100644 --- a/apps/hubble/src/addon/src/store/verification_store.rs +++ b/apps/hubble/src/addon/src/store/verification_store.rs @@ -1,5 +1,6 @@ use super::{ - get_message, hub_error_to_js_throw, make_fid_key, make_ts_hash, make_user_key, read_fid_key, + get_message, hub_error_to_js_throw, make_fid_key, make_ts_hash, make_user_key, message_decode, + read_fid_key, store::{Store, StoreDef}, utils::{self, encode_messages_to_js_object, get_page_options, get_store}, HubError, MessagesPage, PageOptions, RootPrefix, StoreEventHandler, UserPostfix, FID_BYTES, @@ -444,7 +445,7 @@ impl VerificationStore { fid: u32, page_options: &PageOptions, ) -> Result { - store.get_removes_by_fid(fid, page_options, Some(|message: &Message| true)) + store.get_removes_by_fid(fid, page_options, Some(|_message: &Message| true)) } pub fn js_get_verification_removes_by_fid(mut cx: FunctionContext) -> JsResult { @@ -482,7 +483,7 @@ impl VerificationStore { return Ok(false); // Ignore non-verification messages } - let message = match Message::decode(value) { + let message = match message_decode(value) { Ok(message) => message, Err(_) => return Ok(false), // Ignore invalid messages }; diff --git a/apps/hubble/src/eth/fnameRegistryEventsProvider.ts b/apps/hubble/src/eth/fnameRegistryEventsProvider.ts index 1938a46e21..6ece03d48f 100644 --- a/apps/hubble/src/eth/fnameRegistryEventsProvider.ts +++ b/apps/hubble/src/eth/fnameRegistryEventsProvider.ts @@ -154,7 +154,7 @@ export class FNameRegistryEventsProvider { try { return await this.client.getTransfers(params); } catch (err) { - log.error(err, `Failed to get transfers ${params}`); + log.error({ err, params }, "Failed to get transfers from fname registry"); return []; } } diff --git a/apps/hubble/src/network/p2p/gossipNode.ts b/apps/hubble/src/network/p2p/gossipNode.ts index f3b2d06d3f..efe6dfd843 100644 --- a/apps/hubble/src/network/p2p/gossipNode.ts +++ b/apps/hubble/src/network/p2p/gossipNode.ts @@ -282,7 +282,7 @@ export class GossipNode extends TypedEmitter { if (result.isOk()) { log.info({ peerIdStr, addr }, "Connected to peer from DB"); } else { - log.error({ peerIdStr, addr, error: result.error }, "Failed to connect to peer from DB"); + log.warn({ peerIdStr, addr, error: result.error }, "Failed to connect to peer from DB"); } // Sleep for a bit to avoid overwhelming the network diff --git a/apps/hubble/src/storage/engine/messageDataBytes.test.ts b/apps/hubble/src/storage/engine/messageDataBytes.test.ts index 13d8745654..eeb5218218 100644 --- a/apps/hubble/src/storage/engine/messageDataBytes.test.ts +++ b/apps/hubble/src/storage/engine/messageDataBytes.test.ts @@ -8,10 +8,17 @@ import { bytesCompare, bytesDecrement, } from "@farcaster/hub-nodejs"; -import { ensureMessageData, messageDecode, messageEncode } from "../db/message.js"; +import { + ensureMessageData, + makeMessagePrimaryKeyFromMessage, + makeUserKey, + messageDecode, + messageEncode, +} from "../db/message.js"; import { jestRocksDB } from "../db/jestUtils.js"; import Engine from "./index.js"; import { blake3Truncate160 } from "../../utils/crypto.js"; +import { UserPostfix } from "../db/types.js"; const db = jestRocksDB("protobufs.messageDataBytes.test"); const network = FarcasterNetwork.TESTNET; @@ -91,9 +98,20 @@ describe("messageDataBytes", () => { const result = await engine.mergeMessage(ensureMessageData(castAddClone)); expect(result.isOk()).toBeTruthy(); + // Make sure that in the DB, the data field is erased, and only data bytes exist + const castAddKey = makeMessagePrimaryKeyFromMessage(castAddClone); + const castAddBytes = await db.get(castAddKey); + expect(castAddBytes).toBeDefined(); + + const castAddDecoded = Message.decode(castAddBytes); + expect(castAddDecoded.data).toBeUndefined(); + expect(bytesCompare(castAddDecoded.dataBytes as Uint8Array, castAddClone.dataBytes)).toEqual(0); + + // Then, get it via the engine. The castAdd should be fetched correctly and the data body should be populated const fetched = await engine.getCast(fid, castAdd.hash); expect(fetched.isOk()).toBeTruthy(); + expect(fetched._unsafeUnwrap()).toBeDefined(); expect(MessageData.toJSON(fetched._unsafeUnwrap().data)).toEqual(MessageData.toJSON(castAdd.data)); }); diff --git a/apps/hubble/src/storage/stores/castStore.ts b/apps/hubble/src/storage/stores/castStore.ts index b81a59212e..0c1152d4e2 100644 --- a/apps/hubble/src/storage/stores/castStore.ts +++ b/apps/hubble/src/storage/stores/castStore.ts @@ -7,9 +7,8 @@ import { Message, } from "@farcaster/hub-nodejs"; import { ResultAsync } from "neverthrow"; -import { makeCastIdKey, makeFidKey, makeUserKey } from "../db/message.js"; import RocksDB from "../db/rocksdb.js"; -import { RootPrefix, UserPostfix } from "../db/types.js"; +import { UserPostfix } from "../db/types.js"; import { MessagesPage, PageOptions, StorePruneOptions } from "../stores/types.js"; import { RustStoreBase } from "./rustStoreBase.js"; import StoreEventHandler from "./storeEventHandler.js";