diff --git a/bandwidth-comparison-topologies.ipynb b/bandwidth-comparison-topologies.ipynb new file mode 100644 index 0000000..e1c57ed --- /dev/null +++ b/bandwidth-comparison-topologies.ipynb @@ -0,0 +1,668 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "1e953690", + "metadata": {}, + "source": [ + "# Bandwidth Comparison Across Topologies\n", + "\n", + "Compare gossipsub, WFR-enhanced gossipsub, and grid topologies across bandwidth caps. Each combination runs across multiple seeds (averages recorded) and feeds into summary plots." + ] + }, + { + "cell_type": "code", + "execution_count": 38, + "id": "fc0d5e1e", + "metadata": {}, + "outputs": [], + "source": [ + "# Ensure the stdlib random module is available despite the local random.py helper.\n", + "import importlib.machinery\n", + "import importlib.util\n", + "import os\n", + "import subprocess\n", + "import sys\n", + "\n", + "def ensure_stdlib_random() -> None:\n", + " stdlib_random_path = os.path.join(os.path.dirname(os.__file__), \"random.py\")\n", + " loader = importlib.machinery.SourceFileLoader(\"random\", stdlib_random_path)\n", + " spec = importlib.util.spec_from_loader(\"random\", loader)\n", + " module = importlib.util.module_from_spec(spec)\n", + " loader.exec_module(module)\n", + " sys.modules[\"random\"] = module\n", + "\n", + "ensure_stdlib_random()\n", + "\n", + "try:\n", + " import pandas as pd\n", + " import seaborn as sns\n", + "except ModuleNotFoundError:\n", + " subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"pandas\", \"seaborn\"])\n", + " import pandas as pd\n", + " import seaborn as sns\n", + "\n", + "import numpy as np\n", + "import matplotlib.pyplot as plt\n", + "import beamsim\n", + "from concurrent.futures import ThreadPoolExecutor, as_completed\n", + "\n", + "sns.set_theme(style=\"whitegrid\")\n", + "plt.rcParams[\"figure.figsize\"] = (12, 6)\n", + "plt.rcParams[\"axes.titlesize\"] = 13" + ] + }, + { + "cell_type": "code", + "execution_count": 39, + "id": "0157592a", + "metadata": {}, + "outputs": [], + "source": [ + "DEFAULT_CONFIG = {\n", + " \"backend\": \"ns3-direct\",\n", + " \"snark1_pull\": True,\n", + " \"snark1_half_direct\": True,\n", + " \"signature_half_direct\": 0,\n", + " \"shuffle\": False,\n", + " \"random_seed\": 42,\n", + " \"group_count\": 8,\n", + " \"group_validator_count\": 1024,\n", + " \"group_local_aggregator_count\": \"10%\",\n", + " \"global_aggregator_count\": 102,\n", + " \"mesh_n\": 8,\n", + " \"non_mesh_n\": 4,\n", + " \"idontwant\": False,\n", + " \"signature_time\": \"20ms\",\n", + " \"signature_size\": 3072,\n", + " \"snark_size\": 131072,\n", + " \"snark1_threshold\": 0.9,\n", + " \"snark2_threshold\": 0.66,\n", + " \"aggregation_rate_per_sec\": 1000,\n", + " \"snark_recursion_aggregation_rate_per_sec\": 10,\n", + " \"pq_signature_verification_time\": \"30us\",\n", + " \"snark_proof_verification_time\": \"5ms\",\n", + " \"gml\": \"shadow-atlas.bin\",\n", + " \"max_bitrate\": \"50Mbps\",\n", + "}\n", + "\n", + "BANDWIDTH_CAPS = [\"25Mbps\", \"50Mbps\", \"100Mbps\", \"200Mbps\"]\n", + "\n", + "# Adjust MAX_WORKERS to tune concurrency across topology/bandwidth combinations.\n", + "MAX_WORKERS = 3\n", + "SEED_BASE = 42\n", + "NUM_SEEDS = 1\n", + "SEED_VALUES = [SEED_BASE + i for i in range(NUM_SEEDS)]\n", + "\n", + "TOPOLOGY_VARIANTS = [\n", + " {\n", + " \"name\": \"gossipsub\",\n", + " \"label\": \"gossipsub\",\n", + " \"topology\": \"gossip\",\n", + " \"config_overrides\": {},\n", + " \"wfr_overrides\": None,\n", + " },\n", + " {\n", + " \"name\": \"wfr-gossipsub\",\n", + " \"label\": \"wfr-gossipsub\",\n", + " \"topology\": \"gossip\",\n", + " \"config_overrides\": {},\n", + " \"wfr_overrides\": {\n", + " \"robust\": 4,\n", + " },\n", + " },\n", + " {\n", + " \"name\": \"grid\",\n", + " \"label\": \"grid\",\n", + " \"topology\": \"grid\",\n", + " \"config_overrides\": {},\n", + " \"wfr_overrides\": None,\n", + " },\n", + "]\n", + "\n", + "def bool_to_yaml(value: bool) -> str:\n", + " if not isinstance(value, bool):\n", + " raise TypeError(f\"Expected a bool, got {type(value)!r}\")\n", + " return str(value).lower()\n", + "\n", + "def generate_yaml_config(*, topology: str = \"gossip\", wfr: dict | None = None, **overrides) -> str:\n", + " config = {**DEFAULT_CONFIG, **overrides}\n", + " config[\"topology\"] = topology\n", + " lines = [\n", + " \"# Simulation Backend Configuration\",\n", + " f\"backend: {config['backend']}\",\n", + " \"\",\n", + " f\"snark1_pull: {bool_to_yaml(config['snark1_pull'])}\",\n", + " f\"snark1_half_direct: {bool_to_yaml(config['snark1_half_direct'])}\",\n", + " f\"signature_half_direct: {config['signature_half_direct']}\",\n", + " \"\",\n", + " \"# Network Topology Configuration\",\n", + " f\"topology: {config['topology']}\",\n", + " \"\",\n", + " \"# Whether to shuffle validators from the same group to different routers\",\n", + " f\"shuffle: {bool_to_yaml(config['shuffle'])}\",\n", + " \"\",\n", + " \"# Seed for reproducible simulation results\",\n", + " f\"random_seed: {config['random_seed']}\",\n", + " \"\",\n", + " \"# Role Assignment Configuration\",\n", + " \"roles:\",\n", + " f\" group_count: {config['group_count']}\",\n", + " f\" group_validator_count: {config['group_validator_count']}\",\n", + " f\" group_local_aggregator_count: {config['group_local_aggregator_count']}\",\n", + " f\" global_aggregator_count: {config['global_aggregator_count']}\",\n", + " \"\",\n", + " \"# Gossipsub Network Configuration\",\n", + " \"gossip:\",\n", + " f\" mesh_n: {config['mesh_n']}\",\n", + " f\" non_mesh_n: {config['non_mesh_n']}\",\n", + " f\" idontwant: {bool_to_yaml(config['idontwant'])}\",\n", + " ]\n", + " if topology == \"gossip\" and wfr:\n", + " lines.append(\" wfr:\")\n", + " if \"robust\" in wfr:\n", + " lines.append(f\" robust: {wfr['robust']}\")\n", + " lines.extend(\n", + " [\n", + " \"\",\n", + " \"# Cryptographic Constants\",\n", + " \"consts:\",\n", + " f\" signature_time: {config['signature_time']}\",\n", + " f\" signature_size: {config['signature_size']}\",\n", + " f\" snark_size: {config['snark_size']}\",\n", + " f\" snark1_threshold: {config['snark1_threshold']}\",\n", + " f\" snark2_threshold: {config['snark2_threshold']}\",\n", + " f\" aggregation_rate_per_sec: {config['aggregation_rate_per_sec']}\",\n", + " f\" snark_recursion_aggregation_rate_per_sec: {config['snark_recursion_aggregation_rate_per_sec']}\",\n", + " f\" pq_signature_verification_time: {config['pq_signature_verification_time']}\",\n", + " f\" snark_proof_verification_time: {config['snark_proof_verification_time']}\",\n", + " \"\",\n", + " \"# Network Simulation Parameters\",\n", + " \"network:\",\n", + " f\" gml: \\\"{config['gml']}\\\"\",\n", + " ]\n", + " )\n", + " if config[\"max_bitrate\"] is not None:\n", + " lines.append(f\" max_bitrate: {config['max_bitrate']}\")\n", + " return beamsim.yaml(\"\\n\".join(lines))\n", + "\n", + "def extract_metrics(items):\n", + " snark1_rows = beamsim.get_snark1_received(items)\n", + " snark1_completion = None\n", + " if snark1_rows:\n", + " snark1_completion = snark1_rows[-1][0]\n", + " else:\n", + " snark1_sent_xs, _ = beamsim.get_snark1_sent(items)\n", + " if snark1_sent_xs and len(snark1_sent_xs) > 1:\n", + " snark1_completion = snark1_sent_xs[-1]\n", + " duplicates = None\n", + " duplicates_avg = None\n", + " try:\n", + " duplicates, duplicates_avg = beamsim.get_signature_duplicates(items)\n", + " except Exception:\n", + " pass\n", + " validator_count = None\n", + " snark1_threshold = None\n", + " try:\n", + " info_row = beamsim.filter_report(items, \"info\")[0]\n", + " validator_count = info_row[2]\n", + " snark1_threshold = info_row[3]\n", + " except Exception:\n", + " pass\n", + " return snark1_completion, duplicates, duplicates_avg, validator_count, snark1_threshold\n", + "\n", + "def run_single_configuration(topo_cfg: dict, bandwidth: str, seeds: list[int]) -> dict:\n", + " snark1_times = []\n", + " duplicate_counts = []\n", + " duplicate_avgs = []\n", + " validator_count = None\n", + " snark1_threshold = None\n", + " errors = []\n", + " for seed in seeds:\n", + " overrides = {\n", + " **topo_cfg.get(\"config_overrides\", {}),\n", + " \"max_bitrate\": bandwidth,\n", + " \"random_seed\": seed,\n", + " }\n", + " try:\n", + " yaml_path = generate_yaml_config(\n", + " topology=topo_cfg[\"topology\"],\n", + " wfr=topo_cfg.get(\"wfr_overrides\"),\n", + " **overrides,\n", + " )\n", + " items = beamsim.run(\n", + " b=DEFAULT_CONFIG[\"backend\"],\n", + " t=topo_cfg[\"topology\"],\n", + " c=yaml_path,\n", + " local_aggregation_only=True,\n", + " )\n", + " snark1_completion, dup_count, dup_avg, validator_count, snark1_threshold = extract_metrics(items)\n", + " if snark1_completion is not None:\n", + " snark1_times.append(snark1_completion)\n", + " if dup_count is not None:\n", + " duplicate_counts.append(dup_count)\n", + " if dup_avg is not None:\n", + " duplicate_avgs.append(dup_avg)\n", + " except Exception as exc:\n", + " errors.append(str(exc))\n", + " num_successes = len(snark1_times)\n", + " result = {\n", + " \"topology\": topo_cfg[\"label\"],\n", + " \"topology_key\": topo_cfg[\"name\"],\n", + " \"bandwidth\": bandwidth,\n", + " \"snark1_completion_ms\": float(np.mean(snark1_times)) if snark1_times else np.nan,\n", + " \"snark1_completion_std_ms\": float(np.std(snark1_times, ddof=1)) if len(snark1_times) > 1 else (0.0 if snark1_times else np.nan),\n", + " \"duplicate_count\": float(np.mean(duplicate_counts)) if duplicate_counts else np.nan,\n", + " \"duplicate_avg\": float(np.mean(duplicate_avgs)) if duplicate_avgs else np.nan,\n", + " \"num_seeds\": num_successes,\n", + " \"total_seeds\": len(seeds),\n", + " \"validator_count\": validator_count,\n", + " \"snark1_threshold\": snark1_threshold,\n", + " \"error\": \"; \".join(errors) if errors else None,\n", + " }\n", + " return result" + ] + }, + { + "cell_type": "code", + "execution_count": 40, + "id": "520576a3", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "run: build/beamsim -c /var/folders/r9/53ggp_3x6w51pqj8_kv1_lb00000gn/T/beamsim-yaml-md5/adc588fe00488365fb54d07cc6749225 -b ns3-direct -t gossip --local-aggregation-only --report\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
topologytopology_keybandwidthsnark1_completion_mssnark1_completion_std_msduplicate_countduplicate_avgnum_seedstotal_seedsvalidator_countsnark1_thresholderror
0gossipsubgossipsub25Mbps4605.00.01027.01.116304111024921None
1gossipsubgossipsub50Mbps2980.00.01168.01.269565111024921None
2gossipsubgossipsub100Mbps2404.00.0688.00.747826111024921None
3gossipsubgossipsub200Mbps2306.00.01366.01.483170111024921None
4wfr-gossipsubwfr-gossipsub25Mbps4198.00.01061.01.139635111024921None
5wfr-gossipsubwfr-gossipsub50Mbps2660.00.0784.00.850325111024921None
6wfr-gossipsubwfr-gossipsub100Mbps2345.00.0971.01.052004111024921None
7wfr-gossipsubwfr-gossipsub200Mbps2177.00.0893.00.970652111024921None
8gridgrid25Mbps2838.00.0451.00.490217111024921None
9gridgrid50Mbps2313.00.0437.00.475000111024921None
10gridgrid100Mbps1830.00.0415.00.451087111024921None
11gridgrid200Mbps1394.00.0501.00.544565111024921None
\n", + "
" + ], + "text/plain": [ + " topology topology_key bandwidth snark1_completion_ms \\\n", + "0 gossipsub gossipsub 25Mbps 4605.0 \n", + "1 gossipsub gossipsub 50Mbps 2980.0 \n", + "2 gossipsub gossipsub 100Mbps 2404.0 \n", + "3 gossipsub gossipsub 200Mbps 2306.0 \n", + "4 wfr-gossipsub wfr-gossipsub 25Mbps 4198.0 \n", + "5 wfr-gossipsub wfr-gossipsub 50Mbps 2660.0 \n", + "6 wfr-gossipsub wfr-gossipsub 100Mbps 2345.0 \n", + "7 wfr-gossipsub wfr-gossipsub 200Mbps 2177.0 \n", + "8 grid grid 25Mbps 2838.0 \n", + "9 grid grid 50Mbps 2313.0 \n", + "10 grid grid 100Mbps 1830.0 \n", + "11 grid grid 200Mbps 1394.0 \n", + "\n", + " snark1_completion_std_ms duplicate_count duplicate_avg num_seeds \\\n", + "0 0.0 1027.0 1.116304 1 \n", + "1 0.0 1168.0 1.269565 1 \n", + "2 0.0 688.0 0.747826 1 \n", + "3 0.0 1366.0 1.483170 1 \n", + "4 0.0 1061.0 1.139635 1 \n", + "5 0.0 784.0 0.850325 1 \n", + "6 0.0 971.0 1.052004 1 \n", + "7 0.0 893.0 0.970652 1 \n", + "8 0.0 451.0 0.490217 1 \n", + "9 0.0 437.0 0.475000 1 \n", + "10 0.0 415.0 0.451087 1 \n", + "11 0.0 501.0 0.544565 1 \n", + "\n", + " total_seeds validator_count snark1_threshold error \n", + "0 1 1024 921 None \n", + "1 1 1024 921 None \n", + "2 1 1024 921 None \n", + "3 1 1024 921 None \n", + "4 1 1024 921 None \n", + "5 1 1024 921 None \n", + "6 1 1024 921 None \n", + "7 1 1024 921 None \n", + "8 1 1024 921 None \n", + "9 1 1024 921 None \n", + "10 1 1024 921 None \n", + "11 1 1024 921 None " + ] + }, + "execution_count": 40, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "future_to_meta = {}\n", + "results = []\n", + "with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:\n", + " for topo_cfg in TOPOLOGY_VARIANTS:\n", + " for bandwidth in BANDWIDTH_CAPS:\n", + " future = executor.submit(run_single_configuration, topo_cfg, bandwidth, SEED_VALUES)\n", + " future_to_meta[future] = (topo_cfg, bandwidth)\n", + " for future in as_completed(future_to_meta):\n", + " topo_cfg, bandwidth = future_to_meta[future]\n", + " try:\n", + " result = future.result()\n", + " except Exception as exc:\n", + " result = {\n", + " \"topology\": topo_cfg[\"label\"],\n", + " \"topology_key\": topo_cfg[\"name\"],\n", + " \"bandwidth\": bandwidth,\n", + " \"snark1_completion_ms\": np.nan,\n", + " \"snark1_completion_std_ms\": np.nan,\n", + " \"duplicate_count\": np.nan,\n", + " \"duplicate_avg\": np.nan,\n", + " \"num_seeds\": 0,\n", + " \"total_seeds\": len(SEED_VALUES),\n", + " \"validator_count\": None,\n", + " \"snark1_threshold\": None,\n", + " \"error\": str(exc),\n", + " }\n", + " results.append(result)\n", + "\n", + "combined_results_df = pd.DataFrame(results)\n", + "combined_results_df[\"bandwidth\"] = pd.Categorical(combined_results_df[\"bandwidth\"], categories=BANDWIDTH_CAPS, ordered=True)\n", + "combined_results_df[\"topology\"] = pd.Categorical(combined_results_df[\"topology\"], categories=[cfg[\"label\"] for cfg in TOPOLOGY_VARIANTS], ordered=True)\n", + "\n", + "numeric_columns = [\n", + " \"snark1_completion_ms\",\n", + " \"snark1_completion_std_ms\",\n", + " \"duplicate_count\",\n", + " \"duplicate_avg\",\n", + "]\n", + "for column in numeric_columns:\n", + " if column in combined_results_df:\n", + " combined_results_df[column] = pd.to_numeric(combined_results_df[column], errors=\"coerce\")\n", + "\n", + "combined_results_df.sort_values([\"topology\", \"bandwidth\"], inplace=True)\n", + "combined_results_df.reset_index(drop=True, inplace=True)\n", + "combined_results_df" + ] + }, + { + "cell_type": "code", + "execution_count": 41, + "id": "aee64e13", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "plot_df = combined_results_df[combined_results_df[\"error\"].isna()].copy()\n", + "if not plot_df.empty:\n", + " fig, axes = plt.subplots(1, 2, figsize=(14, 6), sharex=True)\n", + " sns.lineplot(\n", + " data=plot_df,\n", + " x=\"bandwidth\",\n", + " y=\"snark1_completion_ms\",\n", + " hue=\"topology\",\n", + " marker=\"o\",\n", + " ax=axes[0],\n", + " )\n", + " axes[0].set_title(\"SNARK1 completion time by bandwidth\")\n", + " axes[0].set_ylabel(\"Completion time (ms)\")\n", + " axes[0].set_xlabel(\"Bandwidth cap\")\n", + " sns.lineplot(\n", + " data=plot_df,\n", + " x=\"bandwidth\",\n", + " y=\"duplicate_avg\",\n", + " hue=\"topology\",\n", + " marker=\"o\",\n", + " ax=axes[1],\n", + " )\n", + " axes[1].set_title(\"Average duplicate signatures\")\n", + " axes[1].set_ylabel(\"Duplicates per signature\")\n", + " axes[1].set_xlabel(\"Bandwidth cap\")\n", + " axes[0].legend(title=\"Topology\", loc=\"upper left\")\n", + " axes[1].legend().remove()\n", + " plt.tight_layout()\n", + "else:\n", + " print(\"No successful runs to visualize.\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "beamsim", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.2" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/cli.hpp b/cli.hpp index b9f1665..5cb46bc 100644 --- a/cli.hpp +++ b/cli.hpp @@ -709,6 +709,9 @@ struct SimulationConfig { yaml.at({"gossip", "mesh_n"}).get(gossip_config.mesh_n); yaml.at({"gossip", "non_mesh_n"}).get(gossip_config.non_mesh_n); yaml.at({"gossip", "idontwant"}).get(gossip_config.idontwant); + if (auto yaml_wfr_robust = yaml.at({"gossip", "wfr", "robust"})) { + yaml_wfr_robust.get(gossip_config.wfr_robust.emplace()); + } auto &consts = beamsim::consts(); yaml.at({"consts", "signature_time"}).get(consts.signature_time); diff --git a/main.cpp b/main.cpp index 498371a..3ccfbc4 100644 --- a/main.cpp +++ b/main.cpp @@ -314,7 +314,8 @@ namespace beamsim::example { auto source_group = getGroupFromPeerIndices(ihave->peer_indices); // If the group has already contributed, ignore this ihave - if (snark1_received_groups_.get(source_group) and snark1_received_ihave_groups_.get(source_group)) { + if (snark1_received_groups_.get(source_group) + and snark1_received_ihave_groups_.get(source_group)) { report(simulator_, "snark1_ihave_ignored_duplicate_group", source_group); @@ -425,7 +426,8 @@ namespace beamsim::example { consts().snark_proof_verification_time, [this, message, forward] { // Smart push at global aggregators: forward and process only once per group - auto source_group = getGroupFromPeerIndices(message.peer_indices); + auto source_group = + getGroupFromPeerIndices(message.peer_indices); if (snark1_pushed_groups_.get(source_group)) { report(simulator_, "snark1_smart_push_ignored_duplicate_group", @@ -884,6 +886,12 @@ void run_simulation(const SimulationConfig &config) { .stop_on_create_snark1 = config.local_aggregation_only, }; + shared_state.gossip_config.wfr_latency = [&](beamsim::PeerIndex peer1, + beamsim::PeerIndex peer2) { + return std::chrono::milliseconds{ + routers.directWire(peer1, peer2).delay_ms}; + }; + beamsim::example::report(simulator, "info", roles.validator_count, diff --git a/src/beamsim/gossip/config.hpp b/src/beamsim/gossip/config.hpp index 1b924e8..e63798a 100644 --- a/src/beamsim/gossip/config.hpp +++ b/src/beamsim/gossip/config.hpp @@ -1,11 +1,17 @@ #pragma once #include +#include +#include namespace beamsim::gossip { struct Config { PeerIndex mesh_n = 4; PeerIndex non_mesh_n = 4; bool idontwant = false; + + std::optional wfr_robust; + std::function wfr_latency; + Time wfr_latency_threshold = std::chrono::milliseconds{1}; }; } // namespace beamsim::gossip diff --git a/src/beamsim/gossip/peer.hpp b/src/beamsim/gossip/peer.hpp index 60d036c..fd2081a 100644 --- a/src/beamsim/gossip/peer.hpp +++ b/src/beamsim/gossip/peer.hpp @@ -84,8 +84,8 @@ namespace beamsim::gossip { } dontwant_.emplace(from_peer, message_hash); dontwant_.emplace(publish.origin, message_hash); - on_gossip(publish.message, [this, publish, message_hash] { - _gossip(publish, message_hash); + on_gossip(publish.message, [this, from_peer, publish, message_hash] { + _gossip(publish, message_hash, from_peer); }); } for (auto &message_hash : gossip_message.iwant) { @@ -117,17 +117,43 @@ namespace beamsim::gossip { if (not duplicate_cache_.emplace(message_hash).second) { return; } - _gossip({topic_index, peer_.peer_index_, any_message}, message_hash); + _gossip({topic_index, peer_.peer_index_, any_message}, + message_hash, + std::nullopt); } private: - void _gossip(const Publish &publish, MessageHash message_hash) { + void _gossip(const Publish &publish, + MessageHash message_hash, + std::optional from_peer) { mcache_.emplace(message_hash, publish); history_[publish.topic_index].add(message_hash); + std::vector peers; for (auto &to_peer : views_.at(publish.topic_index).publishTo()) { + if (to_peer == from_peer) { + continue; + } if (dontwant_.contains({to_peer, message_hash})) { continue; } + peers.emplace_back(to_peer); + } + if (from_peer.has_value() and config_.wfr_robust.has_value() + and config_.wfr_latency != nullptr + and *config_.wfr_robust < peers.size()) { + random_.shuffle(peers); + peers.erase(std::remove_if(peers.begin() + *config_.wfr_robust, + peers.end(), + [&](PeerIndex to_peer) { + return config_.wfr_latency( + peer_.peer_index_, to_peer) + > config_.wfr_latency( + *from_peer, peer_.peer_index_) + + config_.wfr_latency_threshold; + }), + peers.end()); + } + for (auto &to_peer : peers) { getBatch(to_peer).publish.emplace_back(publish); } }