Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
472 changes: 331 additions & 141 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "hyperdrive_lib"
authors = ["Sybil Technologies AG"]
version = "1.9.1"
version = "1.9.2"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://hyperware.ai"
Expand Down
2 changes: 1 addition & 1 deletion hyperdrive/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "hyperdrive"
authors = ["Sybil Technologies AG"]
version = "1.9.1"
version = "1.9.2"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://hyperware.ai"
Expand Down
74 changes: 38 additions & 36 deletions hyperdrive/packages/app-store/chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,43 +733,45 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow
} else {
if message.source().process == "eth:distro:sys" {
let eth_result = serde_json::from_slice::<eth::EthSubResult>(message.body())?;
if let Ok(eth::EthSub { id, result }) = eth_result {
if let Ok(eth::SubscriptionResult::Log(ref log)) =
serde_json::from_value::<eth::SubscriptionResult>(result)
{
// Determine which subscription this is from
// Note: log is Box<eth::Log>, we need to dereference it
let log_ref: &eth::Log = &**log;
let context = if id == SUBSCRIPTION_NUMBER {
LogContext::Hypermap(log_ref.clone())
} else if id == BINDINGS_SUBSCRIPTION {
LogContext::Bindings(log_ref.clone())
} else {
return Ok(false); // Unknown subscription
};
// delay handling of ETH RPC subscriptions by DELAY_MS
// to allow hns to have a chance to process block
timer::set_timer(DELAY_MS, Some(serde_json::to_vec(&context)?));
match eth_result {
Ok(eth::EthSub { id, result }) => {
if let Ok(eth::SubscriptionResult::Log(ref log)) =
serde_json::from_value::<eth::SubscriptionResult>(result)
{
// Determine which subscription this is from
// Note: log is Box<eth::Log>, we need to dereference it
let log_ref: &eth::Log = &**log;
let context = if id == SUBSCRIPTION_NUMBER {
LogContext::Hypermap(log_ref.clone())
} else if id == BINDINGS_SUBSCRIPTION {
LogContext::Bindings(log_ref.clone())
} else {
return Ok(false); // Unknown subscription
};
// delay handling of ETH RPC subscriptions by DELAY_MS
// to allow hns to have a chance to process block
timer::set_timer(DELAY_MS, Some(serde_json::to_vec(&context)?));
}
}
Err(err) => {
if err.id == SUBSCRIPTION_NUMBER {
let _ = state.hypermap.provider.unsubscribe(SUBSCRIPTION_NUMBER);
state.hypermap.provider.subscribe_loop(
SUBSCRIPTION_NUMBER,
app_store_filter(state),
1,
0,
);
} else if err.id == BINDINGS_SUBSCRIPTION {
let _ = state.bindings.provider.unsubscribe(BINDINGS_SUBSCRIPTION);
state.bindings.provider.subscribe_loop(
BINDINGS_SUBSCRIPTION,
bindings_filter(&state.bindings),
1,
0,
);
}
}
} else {
// unsubscribe to make sure we have cleaned up after ourselves;
// drop Result since we don't care if no subscription exists,
// just being diligent in case it does!
let _ = state.hypermap.provider.unsubscribe(SUBSCRIPTION_NUMBER);
let _ = state.bindings.provider.unsubscribe(BINDINGS_SUBSCRIPTION);
// re-subscribe if error
state.hypermap.provider.subscribe_loop(
SUBSCRIPTION_NUMBER,
app_store_filter(state),
1,
0,
);
state.bindings.provider.subscribe_loop(
BINDINGS_SUBSCRIPTION,
bindings_filter(&state.bindings),
1,
0,
);
}
} else {
let req = serde_json::from_slice::<ChainRequest>(message.body())?;
Expand Down
1 change: 1 addition & 0 deletions hyperdrive/packages/app-store/downloads/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type ManualDownloads = HashMap<(PackageId, String), ManualDownloadStatus>;
#[derive(Debug, Serialize, Deserialize)]
pub struct State {
// persisted metadata about which packages we are mirroring
#[serde(default)]
mirroring: HashSet<PackageId>,
// note, pending auto_updates are not persisted.
}
Expand Down
6 changes: 6 additions & 0 deletions hyperdrive/packages/hypermap-cacher/binding-cacher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ const DEFAULT_NODES: &[&str] = &[
#[cfg(feature = "simulation-mode")]
const DEFAULT_NODES: &[&str] = &["fake.os"];

fn default_nodes() -> Vec<String> {
DEFAULT_NODES.iter().map(|s| s.to_string()).collect()
}

// Internal representation of LogsMetadata, similar to WIT but for Rust logic.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct LogsMetadataInternal {
Expand Down Expand Up @@ -105,7 +109,9 @@ struct State {
block_batch_size: u64,
is_cache_timer_live: bool,
drive_path: String,
#[serde(default)]
is_providing: bool,
#[serde(default = "default_nodes")]
nodes: Vec<String>,
#[serde(skip)]
is_starting: bool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ const DEFAULT_NODES: &[&str] = &[
#[cfg(feature = "simulation-mode")]
const DEFAULT_NODES: &[&str] = &["fake.os"];

fn default_nodes() -> Vec<String> {
DEFAULT_NODES.iter().map(|s| s.to_string()).collect()
}

// Internal representation of LogsMetadata, similar to WIT but for Rust logic.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct LogsMetadataInternal {
Expand Down Expand Up @@ -102,7 +106,9 @@ struct State {
block_batch_size: u64,
is_cache_timer_live: bool,
drive_path: String,
#[serde(default)]
is_providing: bool,
#[serde(default = "default_nodes")]
nodes: Vec<String>,
#[serde(skip)]
is_starting: bool,
Expand Down
89 changes: 87 additions & 2 deletions hyperdrive/src/eth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ type ActiveSubscriptions = Arc<DashMap<Address, HashMap<u64, ActiveSub>>>;

type ResponseChannels = Arc<DashMap<u64, ProcessMessageSender>>;

type LocalToRemoteSubs = Arc<DashMap<Address, HashMap<u64, u64>>>;

#[derive(Debug)]
enum ActiveSub {
Local((tokio::sync::mpsc::Sender<bool>, JoinHandle<()>)),
Expand Down Expand Up @@ -295,8 +297,12 @@ struct ModuleState {
providers: Providers,
/// the set of active subscriptions we are currently maintaining
active_subscriptions: ActiveSubscriptions,
/// map local subscription ids to remote ids (per requester)
local_to_remote_subs: LocalToRemoteSubs,
/// the set of response channels we have open for outstanding request tasks
response_channels: ResponseChannels,
/// last request time per node for rate limiting
last_request_by_node: Arc<DashMap<String, Instant>>,
/// our sender for kernel event loop
send_to_loop: MessageSender,
/// our sender for terminal prints
Expand Down Expand Up @@ -386,7 +392,9 @@ pub async fn provider(
access_settings,
providers: Arc::new(DashMap::new()),
active_subscriptions: Arc::new(DashMap::new()),
local_to_remote_subs: Arc::new(DashMap::new()),
response_channels: Arc::new(DashMap::new()),
last_request_by_node: Arc::new(DashMap::new()),
send_to_loop,
print_tx,
request_cache: Arc::new(Mutex::new(IndexMap::new())),
Expand Down Expand Up @@ -703,6 +711,21 @@ async fn handle_eth_action(
}
}

if km.source.node != *state.our {
let node_name = km.source.node.clone();
let prev = state
.last_request_by_node
.get(&node_name)
.map(|entry| *entry);
if let Some(prev) = prev {
let elapsed = Instant::now().duration_since(prev);
if elapsed < Duration::from_secs(1) {
tokio::time::sleep(Duration::from_secs(1) - elapsed).await;
}
}
state.last_request_by_node.insert(node_name, Instant::now());
}

verbose_print(
&state.print_tx,
&format!(
Expand Down Expand Up @@ -741,8 +764,30 @@ async fn handle_eth_action(
}
EthAction::UnsubscribeLogs(sub_id) => {
// Remove the subscription from the map first, releasing the guard
let mut resolved_sub_id = sub_id;
let mut had_mapping = false;
let sub = {
let Some(mut sub_map) = state.active_subscriptions.get_mut(&km.source) else {
if let Some(mut map) = state.local_to_remote_subs.get_mut(&km.source) {
map.remove(&sub_id);
let should_remove = map.is_empty();
drop(map);
if should_remove {
state.local_to_remote_subs.remove(&km.source);
}
kernel_message(
&state.our,
km.id,
km.rsvp.unwrap_or(km.source.clone()),
None,
false,
None,
EthResponse::Ok,
&state.send_to_loop,
)
.await;
return Ok(());
}
verbose_print(
&state.print_tx,
&format!(
Expand All @@ -761,12 +806,32 @@ async fn handle_eth_action(
.await;
return Ok(());
};
sub_map.remove(&sub_id)
let mut sub = sub_map.remove(&sub_id);
if sub.is_none() {
if let Some(remote_id) = state
.local_to_remote_subs
.get(&km.source)
.and_then(|map| map.get(&sub_id).copied())
{
had_mapping = true;
resolved_sub_id = remote_id;
sub = sub_map.remove(&remote_id);
}
}
sub
}; // Guard is released here

if let Some(sub) = sub {
if let Some(mut map) = state.local_to_remote_subs.get_mut(&km.source) {
map.remove(&sub_id);
let should_remove = map.is_empty();
drop(map);
if should_remove {
state.local_to_remote_subs.remove(&km.source);
}
}
// Now we can safely call close without holding the guard
sub.close(sub_id, state, false).await;
sub.close(resolved_sub_id, state, false).await;
verbose_print(
&state.print_tx,
&format!("eth: closed subscription {} for {}", sub_id, km.source.node),
Expand All @@ -783,6 +848,26 @@ async fn handle_eth_action(
&state.send_to_loop,
)
.await;
} else if had_mapping {
if let Some(mut map) = state.local_to_remote_subs.get_mut(&km.source) {
map.remove(&sub_id);
let should_remove = map.is_empty();
drop(map);
if should_remove {
state.local_to_remote_subs.remove(&km.source);
}
}
kernel_message(
&state.our,
km.id,
km.rsvp.unwrap_or(km.source.clone()),
None,
false,
None,
EthResponse::Ok,
&state.send_to_loop,
)
.await;
} else {
verbose_print(
&state.print_tx,
Expand Down
16 changes: 16 additions & 0 deletions hyperdrive/src/eth/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub async fn create_new_subscription(
let our = state.our.clone();
let send_to_loop = state.send_to_loop.clone();
let active_subscriptions = state.active_subscriptions.clone();
let local_to_remote_subs = state.local_to_remote_subs.clone();
let providers = state.providers.clone();
let response_channels = state.response_channels.clone();
let print_tx = state.print_tx.clone();
Expand Down Expand Up @@ -71,6 +72,7 @@ pub async fn create_new_subscription(
let send_to_loop = send_to_loop.clone();
let print_tx = print_tx.clone();
let active_subscriptions = active_subscriptions.clone();
let local_to_remote_subs = local_to_remote_subs.clone();
let (close_sender, close_receiver) = tokio::sync::mpsc::channel(1);
match maybe_raw_sub {
Ok((rx, _chain_id)) => {
Expand Down Expand Up @@ -124,6 +126,10 @@ pub async fn create_new_subscription(
let (keepalive_err_sender, keepalive_err_receiver) =
tokio::sync::mpsc::channel(1);
response_channels.insert(keepalive_km_id, keepalive_err_sender);
local_to_remote_subs
.entry(target.clone())
.or_insert(HashMap::new())
.insert(sub_id, remote_sub_id);
subs.insert(
remote_sub_id,
ActiveSub::Remote {
Expand All @@ -140,6 +146,7 @@ pub async fn create_new_subscription(
&target,
&send_to_loop,
&active_subscriptions,
&local_to_remote_subs,
&response_channels,
)
.await;
Expand Down Expand Up @@ -456,6 +463,7 @@ async fn maintain_remote_subscription(
target: &Address,
send_to_loop: &MessageSender,
active_subscriptions: &ActiveSubscriptions,
local_to_remote_subs: &LocalToRemoteSubs,
response_channels: &ResponseChannels,
) -> EthSubError {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
Expand Down Expand Up @@ -547,6 +555,14 @@ async fn maintain_remote_subscription(
.and_modify(|sub_map| {
sub_map.remove(&remote_sub_id);
});
if let Some(mut map) = local_to_remote_subs.get_mut(target) {
map.remove(&sub_id);
let should_remove = map.is_empty();
drop(map);
if should_remove {
local_to_remote_subs.remove(target);
}
}
response_channels.remove(&keepalive_km_id);
e
}
2 changes: 1 addition & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "lib"
authors = ["Sybil Technologies AG"]
version = "1.9.1"
version = "1.9.2"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://hyperware.ai"
Expand Down