diff --git a/README.md b/README.md
index 0b736d7..ef37e2e 100644
--- a/README.md
+++ b/README.md
@@ -33,6 +33,7 @@ just dev
 | [Blob Inclusion](notebooks/01-blob-inclusion.ipynb) | Blob inclusion patterns per block and epoch |
 | [Blob Flow](notebooks/02-blob-flow.ipynb) | Blob flow across validators, builders, and relays |
 | [Column Propagation](notebooks/03-column-propagation.ipynb) | Column propagation timing across 128 data columns |
+| [Client Versions](notebooks/04-client-versions.ipynb) | Consensus client version distribution |
 
 ## Architecture
 
@@ -41,7 +42,8 @@ pipeline.yaml # Central config: dates, queries, notebooks
 queries/ # ClickHouse query modules -> Parquet
 ├── blob_inclusion.py # fetch_blobs_per_slot(), fetch_blocks_blob_epoch(), ...
 ├── blob_flow.py # fetch_proposer_blobs()
-└── column_propagation.py # fetch_col_first_seen()
+├── column_propagation.py # fetch_col_first_seen()
+└── client_versions.py # fetch_client_hourly(), fetch_client_version_dist(), ...
 scripts/
 ├── pipeline.py # Coordinator: config loading, hash computation, staleness
 ├── fetch_data.py # CLI: ClickHouse -> notebooks/data/*.parquet
diff --git a/notebooks/04-client-versions.ipynb b/notebooks/04-client-versions.ipynb
new file mode 100644
index 0000000..a2c4165
--- /dev/null
+++ b/notebooks/04-client-versions.ipynb
@@ -0,0 +1,232 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": "Analysis of the consensus client versions observed on peers connected to Xatu nodes on Ethereum mainnet."
+  },
+  {
+   "cell_type": "code",
+   "metadata": {
+    "tags": [
+     "parameters"
+    ]
+   },
+   "source": [
+    "target_date = None  # Set via papermill, or auto-detect from manifest"
+   ],
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "metadata": {},
+   "source": [
+    "import pandas as pd\n",
+    "import plotly.express as px\n",
+    "import plotly.graph_objects as go\n",
+    "\n",
+    "from loaders import load_parquet"
+   ],
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "metadata": {},
+   "source": [
+    "# Load pre-aggregated data\n",
+    "df_hourly = load_parquet(\"client_hourly\", target_date)\n",
+    "df_versions = load_parquet(\"client_version_dist\", target_date)\n",
+    "df_summary = load_parquet(\"client_summary\", target_date)\n",
+    "\n",
+    "# Filter out unknown clients for visualizations\n",
+    "df_hourly_known = df_hourly[df_hourly[\"client\"] != \"unknown\"]\n",
+    "df_versions_known = df_versions[df_versions[\"client\"] != \"unknown\"]\n",
+    "df_summary_known = df_summary[df_summary[\"client\"] != \"unknown\"]\n",
+    "\n",
+    "# Compute totals from hourly data\n",
+    "total_connections = df_hourly[\"connections\"].sum()\n",
+    "impl_counts = df_hourly_known.groupby(\"client\")[\"connections\"].sum().reset_index()\n",
+    "impl_counts = impl_counts.sort_values(\"connections\", ascending=False)\n",
+    "\n",
+    "print(f\"Total connections: {total_connections:,}\")\n",
+    "print(f\"Client groups (incl. 'Others'/'unknown'): {df_summary['client'].nunique()}\")\n",
+    "print(f\"Unique peers (summed per client): {df_summary['unique_peers'].sum():,}\")"
+   ],
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Client Implementation Distribution\n",
+    "\n",
+    "Distribution of consensus client implementations observed across all connections. This shows the diversity of the Ethereum consensus client ecosystem."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "metadata": {},
+   "source": [
+    "fig = px.pie(\n",
+    "    impl_counts,\n",
+    "    values=\"connections\",\n",
+    "    names=\"client\",\n",
+    "    title=\"Client Implementation Distribution\",\n",
+    "    color_discrete_sequence=px.colors.qualitative.Set2,\n",
+    ")\n",
+    "fig.update_traces(textposition=\"inside\", textinfo=\"percent+label\")\n",
+    "fig.update_layout(height=500)\n",
+    "fig.show()"
+   ],
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Connections by Client Implementation\n",
+    "\n",
+    "Bar chart showing the number of connections per client implementation."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "metadata": {},
+   "source": [
+    "fig = px.bar(\n",
+    "    impl_counts,\n",
+    "    x=\"client\",\n",
+    "    y=\"connections\",\n",
+    "    title=\"Connections by Client Implementation\",\n",
+    "    labels={\"client\": \"Client\", \"connections\": \"Connections\"},\n",
+    "    color=\"client\",\n",
+    "    color_discrete_sequence=px.colors.qualitative.Set2,\n",
+    ")\n",
+    "fig.update_layout(\n",
+    "    showlegend=False,\n",
+    "    height=500,\n",
+    "    xaxis_tickangle=-45,\n",
+    ")\n",
+    "fig.show()"
+   ],
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Client Connections Over Time\n",
+    "\n",
+    "Stacked area chart showing how client connections are distributed across implementations over time."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "metadata": {},
+   "source": [
+    "# Pivot hourly data for stacked area chart\n",
+    "hourly_pivot = df_hourly_known.pivot(index=\"hour\", columns=\"client\", values=\"connections\").fillna(0)\n",
+    "\n",
+    "fig = go.Figure()\n",
+    "for col in hourly_pivot.columns:\n",
+    "    fig.add_trace(go.Scatter(\n",
+    "        x=hourly_pivot.index,\n",
+    "        y=hourly_pivot[col],\n",
+    "        mode=\"lines\",\n",
+    "        stackgroup=\"one\",\n",
+    "        name=col,\n",
+    "    ))\n",
+    "\n",
+    "fig.update_layout(\n",
+    "    title=\"Client Implementation Connections Over Time\",\n",
+    "    xaxis_title=\"Time\",\n",
+    "    yaxis_title=\"Connections\",\n",
+    "    height=500,\n",
+    "    legend=dict(orientation=\"h\", yanchor=\"bottom\", y=1.02, xanchor=\"right\", x=1),\n",
+    ")\n",
+    "fig.show()"
+   ],
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Version Distribution by Client\n",
+    "\n",
+    "Detailed breakdown of version distribution for each major client implementation (top 15 versions per client)."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "metadata": {},
+   "source": [
+    "# Get clients sorted by total connections (exclude \"Others\" and \"unknown\")\n",
+    "top_clients = impl_counts[~impl_counts[\"client\"].isin([\"Others\", \"unknown\"])][\"client\"].tolist()\n",
+    "\n",
+    "for i, client in enumerate(top_clients):\n",
+    "    df_client = df_versions_known[df_versions_known[\"client\"] == client]\n",
+    "\n",
+    "    if len(df_client) == 0:\n",
+    "        continue\n",
+    "\n",
+    "    fig = px.bar(\n",
+    "        df_client,\n",
+    "        x=\"version\",\n",
+    "        y=\"connections\",\n",
+    "        title=f\"{client.capitalize()} Version Distribution\",\n",
+    "        labels={\"version\": \"Version\", \"connections\": \"Connections\"},\n",
+    "        color_discrete_sequence=[px.colors.qualitative.Set2[i % len(px.colors.qualitative.Set2)]],\n",
+    "    )\n",
+    "    fig.update_layout(\n",
+    "        height=400,\n",
+    "        xaxis_tickangle=-45,\n",
+    "    )\n",
+    "    fig.show()"
+   ],
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Summary Statistics"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "metadata": {},
+   "source": [
+    "# Format summary table\n",
+    "summary_display = df_summary_known[[\"client\", \"connections\", \"unique_peers\", \"version_count\", \"top_version\"]].copy()\n",
+    "summary_display.columns = [\"Client\", \"Connections\", \"Unique Peers\", \"Versions\", \"Top Version\"]\n",
+    "summary_display = summary_display.sort_values(\"Connections\", ascending=False)\n",
+    "summary_display[\"Connections\"] = summary_display[\"Connections\"].apply(lambda x: f\"{x:,}\")\n",
+    "summary_display[\"Unique Peers\"] = summary_display[\"Unique Peers\"].apply(lambda x: f\"{x:,}\")\n",
+    "summary_display"
+   ],
+   "execution_count": null,
+   "outputs": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "name": "python",
+   "version": "3.12"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 4
+}
diff --git a/pipeline.yaml b/pipeline.yaml
index 4d82db3..d2873eb 100644
--- a/pipeline.yaml
+++ b/pipeline.yaml
@@ -70,6 +70,24 @@ queries:
     description: Column first seen timing across 128 subnets
     output_file: col_first_seen.parquet
 
+  client_hourly:
+    module: queries.client_versions
+    function: fetch_client_hourly
+    description: Hourly connection counts by client implementation
+    output_file: client_hourly.parquet
+
+  client_version_dist:
+    module: queries.client_versions
+    function: fetch_client_version_dist
+    description: Version distribution per client (top 15)
+    output_file: client_version_dist.parquet
+
+  client_summary:
+    module: queries.client_versions
+    function: fetch_client_summary
+    description: Summary statistics per client with unique peers
+    output_file: client_summary.parquet
+
 # ============================================
 # Notebook Registry
 # ============================================
@@ -119,6 +137,22 @@
         required: true
     order: 3
 
+  - id: client-versions
+    title: Client Versions
+    description: Consensus client version distribution across Ethereum mainnet
+    icon: Users
+    source: notebooks/04-client-versions.ipynb
+    schedule: daily
+    queries:
+      - client_hourly
+      - client_version_dist
+      - client_summary
+    parameters:
+      - name: target_date
+        type: date
+        required: true
+    order: 4
+
 # Schedule options: hourly, daily, weekly, manual
 # - hourly: Runs every hour, accumulating data throughout the day
 # - daily: Runs once per day at 1am UTC
diff --git a/queries/__init__.py b/queries/__init__.py
index a767c6d..7a8a5ca 100644
--- a/queries/__init__.py
+++ b/queries/__init__.py
@@ -12,6 +12,11 @@
 )
 from queries.blob_flow import fetch_proposer_blobs
 from queries.column_propagation import fetch_col_first_seen, NUM_COLUMNS
+from queries.client_versions import (
+    fetch_client_hourly,
+    fetch_client_version_dist,
+    fetch_client_summary,
+)
 
 __all__ = [
     # Blob inclusion
@@ -24,4 +29,8 @@
     # Column propagation
     "fetch_col_first_seen",
     "NUM_COLUMNS",
+    # Client versions
+    "fetch_client_hourly",
+    "fetch_client_version_dist",
+    "fetch_client_summary",
 ]
diff --git a/queries/client_versions.py b/queries/client_versions.py
new file mode 100644
index 0000000..78511a3
--- /dev/null
+++ b/queries/client_versions.py
@@ -0,0 +1,146 @@
+"""
+Fetch functions for client versions analysis.
+
+Each function executes SQL and writes directly to Parquet.
+Aggregation is pushed to ClickHouse for efficiency.
+"""
+
+from pathlib import Path
+
+# Known consensus clients to track individually (others grouped as "Others").
+# lower() is applied on both sides so casing variants collapse into one bucket.
+KNOWN_CLIENTS_SQL = """
+CASE
+    WHEN lower(remote_agent_implementation) IN (
+        'lighthouse', 'teku', 'nimbus', 'erigon', 'grandine', 'lodestar', 'prysm'
+    ) THEN lower(remote_agent_implementation)
+    WHEN remote_agent_implementation IS NULL
+        OR remote_agent_implementation = ''
+        OR remote_agent_implementation = 'unknown'
+        THEN 'unknown'
+    ELSE 'Others'
+END
+""".strip()
+
+
+def _get_date_filter(target_date: str, column: str = "event_date_time") -> str:
+    """Generate SQL date filter for a specific date."""
+    return f"{column} >= '{target_date}' AND {column} < '{target_date}'::date + INTERVAL 1 DAY"
+
+
+def fetch_client_hourly(
+    client,
+    target_date: str,
+    output_path: Path,
+    network: str = "mainnet",
+) -> int:
+    """Fetch hourly connection counts by client implementation.
+
+    Used for time series visualization and deriving total counts.
+
+    Returns row count.
+    """
+    date_filter = _get_date_filter(target_date)
+
+    query = f"""
+SELECT
+    toStartOfHour(event_date_time) AS hour,
+    {KNOWN_CLIENTS_SQL} AS client,
+    count() AS connections
+FROM default.libp2p_connected FINAL
+WHERE {date_filter}
+    AND meta_network_name = '{network}'
+GROUP BY hour, client
+ORDER BY hour, client
+"""
+
+    df = client.query_df(query)
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+    df.to_parquet(output_path, index=False)
+    return len(df)
+
+
+def fetch_client_version_dist(
+    client,
+    target_date: str,
+    output_path: Path,
+    network: str = "mainnet",
+) -> int:
+    """Fetch version distribution per client.
+
+    Returns top versions per client with connection counts.
+    Used for version distribution bar charts.
+
+    Returns row count.
+    """
+    date_filter = _get_date_filter(target_date)
+
+    query = f"""
+SELECT
+    client,
+    version,
+    connections
+FROM (
+    SELECT
+        {KNOWN_CLIENTS_SQL} AS client,
+        coalesce(remote_agent_version, 'unknown') AS version,
+        count() AS connections,
+        row_number() OVER (PARTITION BY client ORDER BY count() DESC) AS rn
+    FROM default.libp2p_connected FINAL
+    WHERE {date_filter}
+        AND meta_network_name = '{network}'
+    GROUP BY client, version
+)
+WHERE rn <= 15
+ORDER BY client, connections DESC
+"""
+
+    df = client.query_df(query)
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+    df.to_parquet(output_path, index=False)
+    return len(df)
+
+
+def fetch_client_summary(
+    client,
+    target_date: str,
+    output_path: Path,
+    network: str = "mainnet",
+) -> int:
+    """Fetch summary statistics per client.
+
+    Includes unique peer counts (properly deduplicated per client).
+    Used for summary table.
+
+    Returns row count.
+    """
+    date_filter = _get_date_filter(target_date)
+
+    query = f"""
+SELECT
+    {KNOWN_CLIENTS_SQL} AS client,
+    count() AS connections,
+    uniqExact(remote_peer_id_unique_key) AS unique_peers,
+    count(DISTINCT coalesce(remote_agent_version, 'unknown')) AS version_count,
+    argMax(coalesce(remote_agent_version, 'unknown'), cnt) AS top_version
+FROM (
+    SELECT
+        remote_agent_implementation,
+        remote_agent_version,
+        remote_peer_id_unique_key,
+        count() OVER (
+            PARTITION BY
+                {KNOWN_CLIENTS_SQL},
+                coalesce(remote_agent_version, 'unknown')
+        ) AS cnt
+    FROM default.libp2p_connected FINAL
+    WHERE {date_filter}
+        AND meta_network_name = '{network}'
+)
+GROUP BY client
+ORDER BY connections DESC
+"""
+
+    df = client.query_df(query)
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+    df.to_parquet(output_path, index=False)
+    return len(df)
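Reviewer note: the coordinator that consumes the new `pipeline.yaml` entries (`scripts/pipeline.py` / `scripts/fetch_data.py`) is not part of this diff, so the sketch below only illustrates how a registry entry could be resolved against `queries/client_versions.py`. The `clickhouse_connect` driver, the connection parameters, and the example date are assumptions, not code from this repository; the `notebooks/data` output directory is taken from the README comment on `fetch_data.py`.

```python
# Hypothetical driver sketch -- not part of this diff. Assumes the `client`
# argument expected by the fetch functions is a clickhouse_connect client,
# which is consistent with the client.query_df() calls in queries/client_versions.py.
from importlib import import_module
from pathlib import Path

import clickhouse_connect  # assumed driver

# One entry from the `queries:` registry in pipeline.yaml
entry = {
    "module": "queries.client_versions",
    "function": "fetch_client_hourly",
    "output_file": "client_hourly.parquet",
}

# Placeholder connection details
ch = clickhouse_connect.get_client(host="localhost")

# Resolve "module" + "function" to a callable and run it
fetch = getattr(import_module(entry["module"]), entry["function"])
rows = fetch(
    ch,
    target_date="2025-01-01",  # example value for the notebook's target_date parameter
    output_path=Path("notebooks/data") / entry["output_file"],
)
print(f"{entry['function']} wrote {rows} rows")
```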