Skip to content

Commit

Permalink
fix: Read data_bytes properly when pruning (farcasterxyz#1828)
Browse files Browse the repository at this point in the history
## Motivation

When pruning, read the data_bytes properly to avoid warnings


## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [X] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [X] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [X] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.
- [X] All [commits have been
signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits)

<!-- start pr-codex -->

---

## PR-Codex overview
This PR fixes data handling issues in various modules. It ensures proper
data decoding and storage consistency.

### Detailed summary
- Improved error handling for missing message data
- Updated data decoding and encoding methods
- Enhanced logging for data retrieval and storage

> The following files were skipped due to too many changes:
`apps/hubble/src/addon/src/store/store.rs`,
`apps/hubble/src/addon/src/store/cast_store.rs`

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
adityapk00 authored Mar 20, 2024
1 parent 7060319 commit 0b52328
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 130 deletions.
5 changes: 5 additions & 0 deletions .changeset/popular-suits-joke.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

fix: Read data_bytes properly when pruning
86 changes: 35 additions & 51 deletions apps/hubble/src/addon/src/store/cast_store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use super::{hub_error_to_js_throw, make_cast_id_key, make_fid_key, make_user_key, message, store::{Store, StoreDef}, utils::{encode_messages_to_js_object, get_page_options, get_store}, HubError, MessagesPage, PageOptions, RootPrefix, StoreEventHandler, UserPostfix, PAGE_SIZE_MAX, TS_HASH_LENGTH, TRUE_VALUE, HASH_LENGTH, bytes_compare};
use crate::protos::{CastRemoveBody, message_data};
use super::{
bytes_compare, hub_error_to_js_throw, make_cast_id_key, make_fid_key, make_user_key, message,
store::{Store, StoreDef},
utils::{encode_messages_to_js_object, get_page_options, get_store},
HubError, MessagesPage, PageOptions, RootPrefix, StoreEventHandler, UserPostfix, HASH_LENGTH,
PAGE_SIZE_MAX, TRUE_VALUE, TS_HASH_LENGTH,
};
use crate::protos::{message_data, CastRemoveBody};
use crate::{
db::{RocksDB, RocksDbTransactionBatch},
protos::{self, Message, MessageType},
Expand Down Expand Up @@ -121,7 +127,9 @@ impl StoreDef for CastStoreDef {
// Compare message types to enforce that RemoveWins in case of LWW ties.
if (a_type == MessageType::CastRemove as u8) && (b_type == MessageType::CastAdd as u8) {
return 1;
} else if (a_type == MessageType::CastAdd as u8) && (b_type == MessageType::CastRemove as u8) {
} else if (a_type == MessageType::CastAdd as u8)
&& (b_type == MessageType::CastRemove as u8)
{
return -1;
}

Expand Down Expand Up @@ -176,7 +184,9 @@ impl StoreDef for CastStoreDef {
fn make_add_key(&self, message: &protos::Message) -> Result<Vec<u8>, HubError> {
let hash = match message.data.as_ref().unwrap().body.as_ref() {
Some(message_data::Body::CastAddBody(_)) => message.hash.as_ref(),
Some(message_data::Body::CastRemoveBody(cast_remove_body)) => cast_remove_body.target_hash.as_ref(),
Some(message_data::Body::CastRemoveBody(cast_remove_body)) => {
cast_remove_body.target_hash.as_ref()
}
_ => {
return Err(HubError {
code: "bad_request.validation_failure".to_string(),
Expand All @@ -193,7 +203,9 @@ impl StoreDef for CastStoreDef {
fn make_remove_key(&self, message: &protos::Message) -> Result<Vec<u8>, HubError> {
let hash = match message.data.as_ref().unwrap().body.as_ref() {
Some(message_data::Body::CastAddBody(_)) => message.hash.as_ref(),
Some(message_data::Body::CastRemoveBody(cast_remove_body)) => cast_remove_body.target_hash.as_ref(),
Some(message_data::Body::CastRemoveBody(cast_remove_body)) => {
cast_remove_body.target_hash.as_ref()
}
_ => {
return Err(HubError {
code: "bad_request.validation_failure".to_string(),
Expand Down Expand Up @@ -300,10 +312,7 @@ impl CastStoreDef {
}

// Generates unique keys used to store or fetch CastAdd messages in the adds set index
pub fn make_cast_adds_key(
fid: u32,
hash: &Vec<u8>,
) -> Vec<u8> {
pub fn make_cast_adds_key(fid: u32, hash: &Vec<u8>) -> Vec<u8> {
let mut key = Vec::with_capacity(5 + 1 + 20);

key.extend_from_slice(&make_user_key(fid));
Expand Down Expand Up @@ -334,10 +343,7 @@ impl CastStoreDef {
}

// Generates unique keys used to store or fetch CastRemove messages in the removes set index
pub fn make_cast_removes_key(
fid: u32,
hash: &Vec<u8>,
) -> Vec<u8> {
pub fn make_cast_removes_key(fid: u32, hash: &Vec<u8>) -> Vec<u8> {
let mut key = Vec::with_capacity(5 + 1 + 20);

key.extend_from_slice(&make_user_key(fid));
Expand Down Expand Up @@ -374,9 +380,11 @@ impl CastStore {
data: Some(protos::MessageData {
fid: fid as u64,
r#type: MessageType::CastAdd.into(),
body: Some(protos::message_data::Body::CastAddBody(protos::CastAddBody {
..Default::default()
})),
body: Some(protos::message_data::Body::CastAddBody(
protos::CastAddBody {
..Default::default()
},
)),
..Default::default()
}),
hash,
Expand Down Expand Up @@ -466,11 +474,7 @@ impl CastStore {
fid: u32,
page_options: &PageOptions,
) -> Result<MessagesPage, HubError> {
store.get_adds_by_fid(
fid,
page_options,
Some(|message: &Message| true),
)
store.get_adds_by_fid(fid, page_options, Some(|message: &Message| true))
}

pub fn create_cast_store(mut cx: FunctionContext) -> JsResult<JsBox<Arc<Store>>> {
Expand Down Expand Up @@ -499,11 +503,10 @@ impl CastStore {
let fid = cx.argument::<JsNumber>(0).unwrap().value(&mut cx) as u32;
let page_options = get_page_options(&mut cx, 1)?;

let messages =
match Self::get_cast_adds_by_fid(&store, fid, &page_options) {
Ok(messages) => messages,
Err(e) => return hub_error_to_js_throw(&mut cx, e),
};
let messages = match Self::get_cast_adds_by_fid(&store, fid, &page_options) {
Ok(messages) => messages,
Err(e) => return hub_error_to_js_throw(&mut cx, e),
};

let channel = cx.channel();
let (deferred, promise) = cx.promise();
Expand All @@ -519,11 +522,7 @@ impl CastStore {
fid: u32,
page_options: &PageOptions,
) -> Result<MessagesPage, HubError> {
store.get_removes_by_fid(
fid,
page_options,
Some(|message: &Message| true),
)
store.get_removes_by_fid(fid, page_options, Some(|message: &Message| true))
}

pub fn js_get_cast_removes_by_fid(mut cx: FunctionContext) -> JsResult<JsPromise> {
Expand All @@ -532,11 +531,7 @@ impl CastStore {
let fid = cx.argument::<JsNumber>(0).unwrap().value(&mut cx) as u32;
let page_options = get_page_options(&mut cx, 1)?;

let messages = match Self::get_cast_removes_by_fid(
&store,
fid,
&page_options,
) {
let messages = match Self::get_cast_removes_by_fid(&store, fid, &page_options) {
Ok(messages) => messages,
Err(e) => return hub_error_to_js_throw(&mut cx, e),
};
Expand All @@ -563,12 +558,10 @@ impl CastStore {
store
.db()
.for_each_iterator_by_prefix_unbounded(&prefix, page_options, |key, value| {

let ts_hash_offset = prefix.len();
let fid_offset = ts_hash_offset + TS_HASH_LENGTH;

let fid =
u32::from_be_bytes(key[fid_offset..fid_offset + 4].try_into().unwrap());
let fid = u32::from_be_bytes(key[fid_offset..fid_offset + 4].try_into().unwrap());
let ts_hash = key[ts_hash_offset..ts_hash_offset + TS_HASH_LENGTH]
.try_into()
.unwrap();
Expand Down Expand Up @@ -629,11 +622,7 @@ impl CastStore {

let page_options = get_page_options(&mut cx, 2)?;

let messages = match Self::get_casts_by_parent(
&store,
&target,
&page_options,
) {
let messages = match Self::get_casts_by_parent(&store, &target, &page_options) {
Ok(messages) => messages,
Err(e) => return hub_error_to_js_throw(&mut cx, e),
};
Expand Down Expand Up @@ -663,8 +652,7 @@ impl CastStore {
let ts_hash_offset = prefix.len();
let fid_offset = ts_hash_offset + TS_HASH_LENGTH;

let fid =
u32::from_be_bytes(key[fid_offset..fid_offset + 4].try_into().unwrap());
let fid = u32::from_be_bytes(key[fid_offset..fid_offset + 4].try_into().unwrap());
let ts_hash = key[ts_hash_offset..ts_hash_offset + TS_HASH_LENGTH]
.try_into()
.unwrap();
Expand Down Expand Up @@ -703,11 +691,7 @@ impl CastStore {
let mention = mention.value(&mut cx) as u32;
let page_options = get_page_options(&mut cx, 1)?;

let messages = match Self::get_casts_by_mention(
&store,
mention,
&page_options,
) {
let messages = match Self::get_casts_by_mention(&store, mention, &page_options) {
Ok(messages) => messages,
Err(e) => return hub_error_to_js_throw(&mut cx, e),
};
Expand Down
43 changes: 37 additions & 6 deletions apps/hubble/src/addon/src/store/message.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{store::HubError, PageOptions, PAGE_SIZE_MAX};
use crate::{
db::{RocksDB, RocksDbTransactionBatch},
protos::{self, CastId, Message as MessageProto, MessageType},
protos::{self, CastId, Message as MessageProto, MessageData, MessageType},
};
use prost::Message as _;
use std::convert::TryFrom;
Expand Down Expand Up @@ -277,7 +277,7 @@ pub fn get_message(
// println!("get_message key: {:?}", key);

match db.get(&key)? {
Some(bytes) => match MessageProto::decode(bytes.as_slice()) {
Some(bytes) => match message_decode(bytes.as_slice()) {
Ok(message) => Ok(Some(message)),
Err(_) => Err(HubError {
code: "db.internal_error".to_string(),
Expand All @@ -301,7 +301,7 @@ pub fn get_many_messages(

for key in primary_keys {
if let Ok(Some(value)) = db.get(&key) {
match MessageProto::decode(value.as_slice()) {
match message_decode(value.as_slice()) {
Ok(message) => {
messages.push(message);
}
Expand Down Expand Up @@ -336,7 +336,7 @@ where
let mut last_key = vec![];

db.for_each_iterator_by_prefix_unbounded(prefix, page_options, |key, value| {
match MessageProto::decode(value) {
match message_decode(value) {
Ok(message) => {
if filter(&message) {
messages.push(message);
Expand Down Expand Up @@ -368,6 +368,38 @@ where
})
}

pub fn message_encode(message: &MessageProto) -> Vec<u8> {
if message.data_bytes.is_some() && message.data_bytes.as_ref().unwrap().len() > 0 {
// Clone the message
let mut cloned = message.clone();
cloned.data = None;

cloned.encode_to_vec()
} else {
message.encode_to_vec()
}
}

pub fn message_decode(bytes: &[u8]) -> Result<MessageProto, HubError> {
if let Ok(mut msg) = MessageProto::decode(bytes) {
if msg.data.is_none()
&& msg.data_bytes.is_some()
&& msg.data_bytes.as_ref().unwrap().len() > 0
{
if let Ok(msg_data) = MessageData::decode(msg.data_bytes.as_ref().unwrap().as_slice()) {
msg.data = Some(msg_data);
}
}

Ok(msg)
} else {
Err(HubError {
code: "db.internal_error".to_string(),
message: "could not decode message".to_string(),
})
}
}

pub fn put_message_transaction(
txn: &mut RocksDbTransactionBatch,
message: &MessageProto,
Expand All @@ -380,8 +412,7 @@ pub fn put_message_transaction(
as u8,
Some(&ts_hash),
);
txn.put(primary_key, message.encode_to_vec());
// println!("put_message_transaction primary_key: {:?}", primary_key);
txn.put(primary_key, message_encode(&message));

let by_signer_key = make_message_by_signer_key(
message.data.as_ref().unwrap().fid as u32,
Expand Down
61 changes: 20 additions & 41 deletions apps/hubble/src/addon/src/store/store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
bytes_compare, delete_message_transaction, get_message, hub_error_to_js_throw,
make_message_primary_key, message, put_message_transaction,
make_message_primary_key, message, message_decode, put_message_transaction,
utils::{self, encode_messages_to_js_object, get_page_options, get_store, vec_to_u8_24},
MessagesPage, StoreEventHandler, TS_HASH_LENGTH,
};
Expand All @@ -18,9 +18,9 @@ use once_cell::sync::Lazy;
use prost::Message as _;
use rocksdb;
use slog::{o, warn};
use std::clone::Clone;
use std::string::ToString;
use std::sync::{Arc, Mutex};
use std::{clone::Clone, fmt::Display};
use threadpool::ThreadPool;

#[derive(Debug)]
Expand All @@ -45,6 +45,12 @@ impl HubError {
}
}

impl Display for HubError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.code, self.message)
}
}

/** Convert RocksDB errors */
impl From<rocksdb::Error> for HubError {
fn from(e: rocksdb::Error) -> HubError {
Expand Down Expand Up @@ -699,40 +705,12 @@ impl Store {
let prune_size_limit = self.store_def.get_prune_size_limit();

let prefix = &make_message_primary_key(fid, self.store_def.postfix(), None);
self.db
.for_each_iterator_by_prefix_unbounded(prefix, &PageOptions::default(), |_key, value| {
self.db.for_each_iterator_by_prefix_unbounded(
prefix,
&PageOptions::default(),
|_key, value| {
// Value is a message, so try to decode it
let message = match protos::Message::decode(value) {
Ok(message) => message,
Err(e) => {
return Err(HubError {
code: "bad_request.internal_error".to_string(),
message: e.to_string(),
})
}
};

if message.data.is_none() {
// This shouldn't happen, but if it does, skip it
if message.data_bytes.is_none() {
warn!(self.logger, "Missing message_data: Message data and data_bytes are both missing"; "full_message" => format!("{:?}", message));
return Ok(false); // Continue the iteration
}

// Try to interpret the message data
let bytes = message.data_bytes.as_ref().unwrap().as_slice();
let data = match protos::MessageData::decode(bytes) {
Ok(data) => data,
Err(e) => {
warn!(self.logger, "Missing message_data: : Failed to decode message data"; "full_message" => format!("{:?}", message), "error" => e.to_string());
return Ok(false); // Continue the iteration
}
};

// Print the message data
warn!(self.logger, "Missing message_data: Message data is missing, but data_bytes is present"; "full_message" => format!("{:?}", message), "data" => format!("{:?}", data));
return Ok(false); // Continue the iteration
}
let message = message_decode(value)?;

if count <= (prune_size_limit as u64) * units {
return Ok(true); // Stop the iteration, nothing left to prune
Expand Down Expand Up @@ -762,7 +740,8 @@ impl Store {
pruned_events.push(hub_event);

Ok(false) // Continue the iteration
})?;
},
)?;

Ok(pruned_events)
}
Expand Down Expand Up @@ -792,8 +771,9 @@ impl Store {
pub fn js_merge(mut cx: FunctionContext) -> JsResult<JsPromise> {
let store = get_store(&mut cx)?;

let message_bytes = cx.argument::<JsBuffer>(0);
let message = protos::Message::decode(message_bytes.unwrap().as_slice(&cx));
let message_bytes_result = cx.argument::<JsBuffer>(0);
let message_bytes = message_bytes_result.unwrap().as_slice(&cx).to_vec();
let message = Message::decode(message_bytes.as_slice());

// let pool = store.pool.clone();
// pool.lock().unwrap().execute(move || {
Expand Down Expand Up @@ -829,13 +809,12 @@ impl Store {
let store = get_store(&mut cx)?;

let message_bytes = cx.argument::<JsBuffer>(0);
let message = protos::Message::decode(message_bytes.unwrap().as_slice(&cx));
let message = Message::decode(message_bytes.unwrap().as_slice(&cx));

let result = if message.is_err() {
let e = message.unwrap_err();
Err(HubError {
code: "bad_request.validation_failure".to_string(),
message: e.to_string(),
message: message.unwrap_err().to_string(),
})
} else {
let m = message.unwrap();
Expand Down
Loading

0 comments on commit 0b52328

Please sign in to comment.