Skip to content
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 81 additions & 68 deletions crates/cli/src/docker_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,89 +227,102 @@ pub async fn handle_docker_init(config_path: String, output_dir: String) -> Resu
}
};

// setup pbs service
if metrics_enabled {
targets.push(PrometheusTargetConfig {
targets: vec![format!("cb_pbs:{metrics_port}")],
labels: PrometheusLabelsConfig { job: "pbs".to_owned() },
});
let mut pbs_configs = vec![cb_config.pbs];
if let Some(extended_pbses) = cb_config.extended_pbses {
pbs_configs.extend(extended_pbses);
}

let mut pbs_envs = IndexMap::from([get_env_val(CONFIG_ENV, CONFIG_DEFAULT)]);
let mut pbs_volumes = vec![config_volume.clone()];
let mut general_pbs_envs = IndexMap::from([get_env_val(CONFIG_ENV, CONFIG_DEFAULT)]);
let mut general_pbs_volumes = vec![config_volume.clone()];

if let Some(mux_config) = cb_config.muxes {
for mux in mux_config.muxes.iter() {
if let Some((env_name, actual_path, internal_path)) = mux.loader_env() {
let (key, val) = get_env_val(&env_name, &internal_path);
pbs_envs.insert(key, val);
pbs_volumes.push(Volumes::Simple(format!("{}:{}:ro", actual_path, internal_path)));
general_pbs_envs.insert(key, val);
general_pbs_volumes
.push(Volumes::Simple(format!("{}:{}:ro", actual_path, internal_path)));
}
}
}

if let Some((key, val)) = chain_spec_env.clone() {
pbs_envs.insert(key, val);
}
if metrics_enabled {
let (key, val) = get_env_uval(METRICS_PORT_ENV, metrics_port as u64);
pbs_envs.insert(key, val);
}
if log_to_file {
let (key, val) = get_env_val(LOGS_DIR_ENV, LOGS_DIR_DEFAULT);
for (pbs_id, pbs_config) in pbs_configs.iter().enumerate() {
let pbs_container_name =
if pbs_id == 0 { "cb_pbs".to_owned() } else { format!("cb_pbs_ext_{}", pbs_id) };
// setup pbs service
if metrics_enabled {
targets.push(PrometheusTargetConfig {
targets: vec![format!("{}:{metrics_port}", pbs_container_name.clone())],
labels: PrometheusLabelsConfig { job: "pbs".to_owned() },
});
}

let mut pbs_envs = general_pbs_envs.clone();
let mut pbs_volumes = general_pbs_volumes.clone();

if let Some((key, val)) = chain_spec_env.clone() {
pbs_envs.insert(key, val);
}
if metrics_enabled {
let (key, val) = get_env_uval(METRICS_PORT_ENV, metrics_port as u64);
pbs_envs.insert(key, val);
}
if log_to_file {
let (key, val) = get_env_val(LOGS_DIR_ENV, LOGS_DIR_DEFAULT);
pbs_envs.insert(key, val);
}
if !builder_events_modules.is_empty() {
let env = builder_events_modules.join(",");
let (k, v) = get_env_val(BUILDER_URLS_ENV, &env);
pbs_envs.insert(k, v);
}

// ports
let host_endpoint =
SocketAddr::from((pbs_config.pbs_config.host, pbs_config.pbs_config.port));
let ports = Ports::Short(vec![format!("{}:{}", host_endpoint, pbs_config.pbs_config.port)]);
warnings.push(format!("pbs has an exported port on {}", pbs_config.pbs_config.port));

// inside the container expose on 0.0.0.0
let container_endpoint =
SocketAddr::from((Ipv4Addr::UNSPECIFIED, pbs_config.pbs_config.port));
let (key, val) = get_env_val(PBS_ENDPOINT_ENV, &container_endpoint.to_string());
pbs_envs.insert(key, val);
}
if !builder_events_modules.is_empty() {
let env = builder_events_modules.join(",");
let (k, v) = get_env_val(BUILDER_URLS_ENV, &env);
pbs_envs.insert(k, v);
}

// ports
let host_endpoint =
SocketAddr::from((cb_config.pbs.pbs_config.host, cb_config.pbs.pbs_config.port));
let ports = Ports::Short(vec![format!("{}:{}", host_endpoint, cb_config.pbs.pbs_config.port)]);
warnings.push(format!("pbs has an exported port on {}", cb_config.pbs.pbs_config.port));

// inside the container expose on 0.0.0.0
let container_endpoint =
SocketAddr::from((Ipv4Addr::UNSPECIFIED, cb_config.pbs.pbs_config.port));
let (key, val) = get_env_val(PBS_ENDPOINT_ENV, &container_endpoint.to_string());
pbs_envs.insert(key, val);

// volumes
pbs_volumes.extend(chain_spec_volume.clone());
pbs_volumes.extend(get_log_volume(&cb_config.logs, PBS_MODULE_NAME));

// networks
let pbs_networs = if metrics_enabled {
Networks::Simple(vec![METRICS_NETWORK.to_owned()])
} else {
Networks::default()
};
// volumes
pbs_volumes.extend(chain_spec_volume.clone());
pbs_volumes.extend(get_log_volume(&cb_config.logs, PBS_MODULE_NAME));

let pbs_service = Service {
container_name: Some("cb_pbs".to_owned()),
image: Some(cb_config.pbs.docker_image),
ports,
networks: pbs_networs,
volumes: pbs_volumes,
environment: Environment::KvPair(pbs_envs),
healthcheck: Some(Healthcheck {
test: Some(HealthcheckTest::Single(format!(
"curl -f http://localhost:{}{}{}",
cb_config.pbs.pbs_config.port, BUILDER_API_PATH, GET_STATUS_PATH
))),
interval: Some("30s".into()),
timeout: Some("5s".into()),
retries: 3,
start_period: Some("5s".into()),
disable: false,
}),
..Service::default()
};
// networks
let pbs_networs = if metrics_enabled {
Networks::Simple(vec![METRICS_NETWORK.to_owned()])
} else {
Networks::default()
};

services.insert("cb_pbs".to_owned(), Some(pbs_service));
let pbs_service = Service {
container_name: Some(pbs_container_name.clone()),
image: Some(pbs_config.docker_image.clone()),
ports,
networks: pbs_networs,
volumes: pbs_volumes,
environment: Environment::KvPair(pbs_envs),
healthcheck: Some(Healthcheck {
test: Some(HealthcheckTest::Single(format!(
"curl -f http://localhost:{}{}{}",
pbs_config.pbs_config.port, BUILDER_API_PATH, GET_STATUS_PATH
))),
interval: Some("30s".into()),
timeout: Some("5s".into()),
retries: 3,
start_period: Some("5s".into()),
disable: false,
}),
..Service::default()
};

services.insert(pbs_container_name.clone(), Some(pbs_service));
}

// setup signer service
if let Some(SignerConfig::Local { docker_image, loader, store }) = cb_config.signer {
Expand Down
9 changes: 7 additions & 2 deletions crates/common/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@ pub use utils::*;
pub struct CommitBoostConfig {
pub chain: Chain,
pub relays: Vec<RelayConfig>,
#[serde(skip)]
pub pbs: StaticPbsConfig,
#[serde(flatten)]
pub muxes: Option<PbsMuxes>,
pub modules: Option<Vec<StaticModuleConfig>>,
pub signer: Option<SignerConfig>,
pub metrics: Option<MetricsConfig>,
pub logs: Option<LogsSettings>,
#[serde(rename = "pbs")]
pub extended_pbses: Option<Vec<StaticPbsConfig>>,
}

impl CommitBoostConfig {
Expand Down Expand Up @@ -74,12 +77,13 @@ impl CommitBoostConfig {
let config = CommitBoostConfig {
chain,
relays: helper_config.relays,
pbs: helper_config.pbs,
pbs: Default::default(),
muxes: helper_config.muxes,
modules: helper_config.modules,
signer: helper_config.signer,
metrics: helper_config.metrics,
logs: helper_config.logs,
extended_pbses: helper_config.pbses,
};

Ok(config)
Expand Down Expand Up @@ -111,7 +115,8 @@ struct ChainConfig {
struct HelperConfig {
chain: ChainLoader,
relays: Vec<RelayConfig>,
pbs: StaticPbsConfig,
#[serde(rename = "pbs")]
pbses: Option<Vec<StaticPbsConfig>>,
#[serde(flatten)]
muxes: Option<PbsMuxes>,
modules: Option<Vec<StaticModuleConfig>>,
Expand Down
4 changes: 2 additions & 2 deletions crates/common/src/config/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
types::{Chain, Jwt, ModuleId},
};

#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug,Clone, Deserialize, Serialize)]
pub enum ModuleKind {
#[serde(alias = "commit")]
Commit,
Expand All @@ -24,7 +24,7 @@ pub enum ModuleKind {
}

/// Static module config from config file
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug,Clone, Deserialize, Serialize)]
pub struct StaticModuleConfig {
/// Unique id of the module
pub id: ModuleId,
Expand Down
57 changes: 43 additions & 14 deletions crates/common/src/config/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,25 @@ impl PbsMuxes {
self,
chain: Chain,
default_pbs: &PbsConfig,
extended_pbses: &Option<Vec<PbsConfig>>,
) -> eyre::Result<HashMap<BlsPublicKey, RuntimeMuxConfig>> {
let mut muxes = self.muxes;

for mux in muxes.iter_mut() {
ensure!(!mux.relays.is_empty(), "mux config {} must have at least one relay", mux.id);

if let Some(loader) = &mux.loader {
let extra_keys = loader.load(&mux.id, chain, default_pbs.rpc_url.clone()).await?;
mux.validator_pubkeys.extend(extra_keys);
// TODO: this need to be handled with tokio::task::JoinSet for performance
if let Some(extended_pbses) = extended_pbses {
for pbs in extended_pbses {
let extra_keys = loader.load(&mux.id, chain, pbs.rpc_url.clone()).await?;
mux.validator_pubkeys.extend(extra_keys);
}
} else {
let extra_keys =
loader.load(&mux.id, chain, default_pbs.rpc_url.clone()).await?;
mux.validator_pubkeys.extend(extra_keys);
}
}

ensure!(
Expand Down Expand Up @@ -80,20 +90,39 @@ impl PbsMuxes {
relay_clients.push(RelayClient::new(config)?);
}

let config = PbsConfig {
timeout_get_header_ms: mux
.timeout_get_header_ms
.unwrap_or(default_pbs.timeout_get_header_ms),
late_in_slot_time_ms: mux
.late_in_slot_time_ms
.unwrap_or(default_pbs.late_in_slot_time_ms),
..default_pbs.clone()
let configs_with_extended_pbs = if let Some(extended_pbses) = extended_pbses {
extended_pbses
.iter()
.map(|pbs_config| PbsConfig {
timeout_get_header_ms: mux
.timeout_get_header_ms
.unwrap_or(pbs_config.timeout_get_header_ms),
late_in_slot_time_ms: mux
.late_in_slot_time_ms
.unwrap_or(pbs_config.late_in_slot_time_ms),
..default_pbs.clone()
})
.collect()
} else {
vec![PbsConfig {
timeout_get_header_ms: mux
.timeout_get_header_ms
.unwrap_or(default_pbs.timeout_get_header_ms),
late_in_slot_time_ms: mux
.late_in_slot_time_ms
.unwrap_or(default_pbs.late_in_slot_time_ms),
..default_pbs.clone()
}]
};
let config = Arc::new(config);

let runtime_config = RuntimeMuxConfig { id: mux.id, config, relays: relay_clients };
for pubkey in mux.validator_pubkeys.iter() {
configs.insert(*pubkey, runtime_config.clone());
for config in configs_with_extended_pbs {
let config = Arc::new(config);

let runtime_config =
RuntimeMuxConfig { id: mux.id.clone(), config, relays: relay_clients.clone() };
for pubkey in mux.validator_pubkeys.iter() {
configs.insert(*pubkey, runtime_config.clone());
}
}
}

Expand Down
Loading