Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions buf.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
version: v2
deps:
- name: buf.build/googleapis/googleapis
commit: 72c8614f3bd0466ea67931ef2c43d608
digest: b5:13efeea24e633fd45327390bdee941207a8727e96cf01affb84c1e4100fd8f48a42bbd508df11930cd2884629bafad685df1ac3111bc78cdaefcd38c9371c6b1
commit: 004180b77378443887d3b55cabc00384
digest: b5:e8f475fe3330f31f5fd86ac689093bcd274e19611a09db91f41d637cb9197881ce89882b94d13a58738e53c91c6e4bae7dc1feba85f590164c975a89e25115dc
2 changes: 2 additions & 0 deletions migrations/033_fct_peer_head_last_30m.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Roll back migration 033: remove the Distributed entry point first so
-- readers fail fast, then drop the underlying replicated local table.
DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_peer_head_last_30m
    ON CLUSTER '{cluster}';

DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_peer_head_last_30m_local
    ON CLUSTER '{cluster}';
27 changes: 27 additions & 0 deletions migrations/033_fct_peer_head_last_30m.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- Migration 033 (up): replicated local table holding the aggregated
-- peer-head distribution snapshot, plus a Distributed wrapper over it.
-- `${NETWORK_NAME}` is substituted by the migration runner; '{cluster}',
-- '{installation}', '{shard}', '{database}', '{table}' and '{replica}'
-- are ClickHouse server-side macros.
CREATE TABLE `${NETWORK_NAME}`.fct_peer_head_last_30m_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`head_slot` UInt32 COMMENT 'The head slot reported by peers' CODEC(DoubleDelta, ZSTD(1)),
`head_root` String COMMENT 'The head block root reported by peers (unknown if not available)' CODEC(ZSTD(1)),
`peer_count` UInt32 COMMENT 'Total number of peers at this head' CODEC(ZSTD(1)),
`count_by_client` Map(String, UInt32) COMMENT 'Peer count breakdown by client implementation' CODEC(ZSTD(1)),
`count_by_country` Map(String, UInt32) COMMENT 'Peer count breakdown by country' CODEC(ZSTD(1)),
`count_by_continent` Map(String, UInt32) COMMENT 'Peer count breakdown by continent code' CODEC(ZSTD(1)),
`count_by_fork_digest` Map(String, UInt32) COMMENT 'Peer count breakdown by fork digest' CODEC(ZSTD(1)),
`count_by_platform` Map(String, UInt32) COMMENT 'Peer count breakdown by platform (os)' CODEC(ZSTD(1)),
`count_by_finalized_epoch` Map(String, UInt32) COMMENT 'Peer count breakdown by finalized epoch' CODEC(ZSTD(1))
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}',
'{replica}',
-- Version column: on merge, rows sharing the sorting key keep the one
-- with the newest updated_date_time.
`updated_date_time`
) PARTITION BY toStartOfMonth(updated_date_time)
-- Sorting key doubles as the ReplacingMergeTree deduplication key:
-- one logical row per (head_slot, head_root).
ORDER BY (`head_slot`, `head_root`)
SETTINGS
-- Rebuild (rather than drop) projections when deduplicating merges run.
deduplicate_merge_projection_mode = 'rebuild'
COMMENT 'Aggregated peer head distribution from the last 30 minutes of handle_status events';

-- Query-facing Distributed table; shards by cityHash64(head_slot, head_root)
-- so every row for one observed head routes to the same shard.
CREATE TABLE `${NETWORK_NAME}`.fct_peer_head_last_30m ON CLUSTER '{cluster}' AS `${NETWORK_NAME}`.fct_peer_head_last_30m_local ENGINE = Distributed(
'{cluster}',
'${NETWORK_NAME}',
fct_peer_head_last_30m_local,
cityHash64(`head_slot`, `head_root`)
);
2 changes: 2 additions & 0 deletions migrations/034_int_peer_head.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Roll back migration 034: drop the Distributed wrapper before the
-- replicated local table it points at.
DROP TABLE IF EXISTS `${NETWORK_NAME}`.int_peer_head
    ON CLUSTER '{cluster}';

DROP TABLE IF EXISTS `${NETWORK_NAME}`.int_peer_head_local
    ON CLUSTER '{cluster}';
30 changes: 30 additions & 0 deletions migrations/034_int_peer_head.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- Migration 034 (up): replicated local table of per-peer head
-- observations plus its Distributed wrapper. `${NETWORK_NAME}` is
-- substituted by the migration runner; '{...}' names are ClickHouse
-- server-side macros.
CREATE TABLE `${NETWORK_NAME}`.int_peer_head_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`event_date_time` DateTime COMMENT 'Timestamp when the status was observed' CODEC(DoubleDelta, ZSTD(1)),
`peer_id_unique_key` String COMMENT 'Unique key for the peer' CODEC(ZSTD(1)),
`head_slot` UInt32 COMMENT 'The head slot reported by the peer' CODEC(DoubleDelta, ZSTD(1)),
`head_root` String COMMENT 'The head block root reported by the peer' CODEC(ZSTD(1)),
`fork_digest` LowCardinality(String) COMMENT 'The fork digest reported by the peer' CODEC(ZSTD(1)),
`finalized_epoch` Nullable(UInt32) COMMENT 'The finalized epoch reported by the peer' CODEC(ZSTD(1)),
`client` LowCardinality(String) COMMENT 'Client implementation (e.g., lighthouse, prysm)' CODEC(ZSTD(1)),
`client_version` LowCardinality(String) COMMENT 'Client version' CODEC(ZSTD(1)),
`platform` LowCardinality(String) COMMENT 'Platform/OS' CODEC(ZSTD(1)),
`country` LowCardinality(String) COMMENT 'Country name' CODEC(ZSTD(1)),
`country_code` LowCardinality(String) COMMENT 'Country code' CODEC(ZSTD(1)),
`continent_code` LowCardinality(String) COMMENT 'Continent code' CODEC(ZSTD(1))
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}',
'{replica}',
-- Version column: merges keep the row with the newest updated_date_time
-- for each (head_slot, peer_id_unique_key).
`updated_date_time`
) PARTITION BY toStartOfDay(event_date_time)
-- Sorting key doubles as the ReplacingMergeTree deduplication key:
-- one logical row per (head_slot, peer).
ORDER BY (`head_slot`, `peer_id_unique_key`)
SETTINGS
-- Rebuild (rather than drop) projections during deduplicating merges.
deduplicate_merge_projection_mode = 'rebuild'
COMMENT 'Per-peer head observations from handle_status events, enriched with metadata';

-- Query-facing Distributed table; shards by (head_slot, peer) so each
-- peer/slot pair is routed to a stable shard.
CREATE TABLE `${NETWORK_NAME}`.int_peer_head ON CLUSTER '{cluster}' AS `${NETWORK_NAME}`.int_peer_head_local ENGINE = Distributed(
'{cluster}',
'${NETWORK_NAME}',
int_peer_head_local,
cityHash64(`head_slot`, `peer_id_unique_key`)
);
25 changes: 25 additions & 0 deletions models/external/libp2p_handle_status.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
# External model registration for libp2p handle_status events.
table: libp2p_handle_status
cache:
# Cheap incremental bound refresh every 5s; full rescan daily.
incremental_scan_interval: 5s
full_scan_interval: 24h
interval:
type: datetime
# Trail real time by 60s to tolerate late-arriving events — presumably
# seconds; confirm against the framework's interval docs.
lag: 60
---
-- Returns the [min, max] availability bounds (unix seconds) of the source
-- table for the configured network.
SELECT
{{ if .cache.is_incremental_scan }}
-- Incremental scan: reuse the cached lower bound instead of rescanning
-- history. NOTE(review): this branch yields a quoted string while the
-- full-scan branch yields a number — assumed the consumer coerces both.
'{{ .cache.previous_min }}' as min,
{{ else }}
toUnixTimestamp(min(event_date_time)) as min,
{{ end }}
toUnixTimestamp(max(event_date_time)) as max
FROM {{ .self.helpers.from }}
WHERE
meta_network_name = '{{ .env.NETWORK }}'

{{ if .cache.is_incremental_scan }}
-- Only scan rows at or after the previously observed max bound.
AND event_date_time >= fromUnixTimestamp({{ .cache.previous_max }})
{{ else }}
-- Full scan: optionally clamp how far back the model looks via env.
AND event_date_time >= fromUnixTimestamp({{ default "0" .env.EXTERNAL_MODEL_MIN_TIMESTAMP }})
{{ end }}
25 changes: 25 additions & 0 deletions models/external/libp2p_synthetic_heartbeat.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
# External model registration for libp2p synthetic heartbeat events.
table: libp2p_synthetic_heartbeat
cache:
# Cheap incremental bound refresh every 5s; full rescan daily.
incremental_scan_interval: 5s
full_scan_interval: 24h
interval:
type: datetime
# Trail real time by 60s to tolerate late-arriving events — presumably
# seconds; confirm against the framework's interval docs.
lag: 60
---
-- Returns the [min, max] availability bounds (unix seconds) of the source
-- table for the configured network.
SELECT
{{ if .cache.is_incremental_scan }}
-- Incremental scan: reuse the cached lower bound instead of rescanning
-- history. NOTE(review): this branch yields a quoted string while the
-- full-scan branch yields a number — assumed the consumer coerces both.
'{{ .cache.previous_min }}' as min,
{{ else }}
toUnixTimestamp(min(event_date_time)) as min,
{{ end }}
toUnixTimestamp(max(event_date_time)) as max
FROM {{ .self.helpers.from }}
WHERE
meta_network_name = '{{ .env.NETWORK }}'

{{ if .cache.is_incremental_scan }}
-- Only scan rows at or after the previously observed max bound.
AND event_date_time >= fromUnixTimestamp({{ .cache.previous_max }})
{{ else }}
-- Full scan: optionally clamp how far back the model looks via env.
AND event_date_time >= fromUnixTimestamp({{ default "0" .env.EXTERNAL_MODEL_MIN_TIMESTAMP }})
{{ end }}
58 changes: 58 additions & 0 deletions models/transformations/fct_peer_head_last_30m.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
---
table: fct_peer_head_last_30m
type: scheduled
schedule: "@every 30s"
tags:
- libp2p
- peer
- head
dependencies:
- "{{transformation}}.int_peer_head"
---
-- Rebuilds the "peer head distribution over the last N minutes" snapshot:
-- one row per (head_slot, head_root) with peer counts broken down by
-- client, country, continent, fork digest, platform and finalized epoch.
INSERT INTO `{{ .self.database }}`.`{{ .self.table }}`
WITH
-- Lookback window in minutes, env-overridable (defaults to 30).
lookback_minutes AS (
SELECT {{ default "30" .env.PEER_HEAD_LOOKBACK_MINUTES }} AS minutes
),

-- Latest observation per peer within the lookback window. FINAL forces
-- ReplacingMergeTree dedup at read time; argMax picks every column from
-- the peer's most recent event so one consistent row survives per peer.
latest_peer_head AS (
SELECT
peer_id_unique_key,
argMax(head_slot, event_date_time) AS head_slot,
argMax(head_root, event_date_time) AS head_root,
argMax(fork_digest, event_date_time) AS fork_digest,
argMax(finalized_epoch, event_date_time) AS finalized_epoch,
argMax(client, event_date_time) AS client,
argMax(platform, event_date_time) AS platform,
argMax(continent_code, event_date_time) AS continent_code,
argMax(country, event_date_time) AS country
FROM `{{ .self.database }}`.int_peer_head FINAL
-- NOTE(review): window is anchored at NOW() while updated_date_time
-- below uses the task start time; assumed to differ by at most seconds
-- under the 30s schedule — confirm.
WHERE event_date_time >= NOW() - INTERVAL (SELECT minutes FROM lookback_minutes) MINUTE
GROUP BY peer_id_unique_key
)

-- Aggregate the per-peer rows into one row per observed head.
-- (head_slot is a non-nullable UInt32 in int_peer_head, so no NULL filter
-- is needed; an ORDER BY on an INSERT...SELECT would only add sort cost,
-- as the target MergeTree orders rows by its own sorting key.)
SELECT
fromUnixTimestamp({{ .task.start }}) AS updated_date_time,
head_slot,
head_root,
count(*) AS peer_count,
-- sumMap(map(k, 1)) builds a value -> count histogram per group.
sumMap(map(client, toUInt32(1))) AS count_by_client,
sumMap(map(country, toUInt32(1))) AS count_by_country,
sumMap(map(continent_code, toUInt32(1))) AS count_by_continent,
sumMap(map(fork_digest, toUInt32(1))) AS count_by_fork_digest,
sumMap(map(platform, toUInt32(1))) AS count_by_platform,
-- Map keys must be strings; NULL finalized epochs are bucketed under '0'.
sumMap(map(toString(coalesce(finalized_epoch, toUInt32(0))), toUInt32(1))) AS count_by_finalized_epoch
FROM latest_peer_head
GROUP BY head_slot, head_root;

-- Keep only the snapshot just written: lightweight-delete every row from
-- earlier runs. Targets the local table (with ON CLUSTER) when clustered.
DELETE FROM `{{ .self.database }}`.`{{ .self.table }}{{ if .clickhouse.cluster }}{{ .clickhouse.local_suffix }}{{ end }}`
{{ if .clickhouse.cluster }}
ON CLUSTER '{{ .clickhouse.cluster }}'
{{ end }}
WHERE updated_date_time != fromUnixTimestamp({{ .task.start }});
103 changes: 103 additions & 0 deletions models/transformations/int_peer_head.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
---
table: int_peer_head
type: incremental
interval:
type: datetime
# Maximum chunk size per task — presumably seconds (1h); confirm.
max: 3600
schedules:
forwardfill: "@every 30s"
backfill: "@every 60s"
tags:
- libp2p
- peer
- head
dependencies:
- "{{external}}.libp2p_handle_status"
- "{{external}}.libp2p_synthetic_heartbeat"
---
-- Per-peer head observations: one row per (head_slot, peer) within the
-- task bounds, enriched with client/geo metadata from synthetic heartbeats.
INSERT INTO `{{ .self.database }}`.`{{ .self.table }}`
WITH
-- Extract peer head data from handle_status events.
-- For outbound requests the peer's state is in response_*;
-- for inbound (or NULL direction) it is in request_*.
peer_status AS (
SELECT
event_date_time,
peer_id_unique_key,
CASE
WHEN direction = 'outbound' THEN response_head_slot
ELSE request_head_slot
END AS head_slot,
CASE
WHEN direction = 'outbound' THEN response_head_root
ELSE request_head_root
END AS head_root,
CASE
WHEN direction = 'outbound' THEN response_fork_digest
ELSE request_fork_digest
END AS fork_digest,
CASE
WHEN direction = 'outbound' THEN response_finalized_epoch
ELSE request_finalized_epoch
END AS finalized_epoch
FROM {{ index .dep "{{external}}" "libp2p_handle_status" "helpers" "from" }}
WHERE
meta_network_name = '{{ .env.NETWORK }}'
AND event_date_time BETWEEN fromUnixTimestamp({{ .bounds.start }}) AND fromUnixTimestamp({{ .bounds.end }})
AND error IS NULL
-- Require a head slot on whichever side carries the peer's data.
-- A NULL direction falls through the CASE's ELSE, so it uses request_*;
-- the third disjunct covers it explicitly since direction != 'outbound'
-- evaluates to NULL (not true) when direction is NULL.
AND (
(direction = 'outbound' AND response_head_slot IS NOT NULL) OR
(direction != 'outbound' AND request_head_slot IS NOT NULL) OR
(direction IS NULL AND request_head_slot IS NOT NULL)
)
),

-- Latest known metadata per remote peer from synthetic heartbeats.
peer_metadata AS (
SELECT
remote_peer_id_unique_key,
argMax(remote_agent_implementation, event_date_time) AS client,
argMax(remote_agent_version, event_date_time) AS client_version,
argMax(remote_agent_platform, event_date_time) AS platform,
argMax(remote_geo_country, event_date_time) AS country,
argMax(remote_geo_country_code, event_date_time) AS country_code,
argMax(remote_geo_continent_code, event_date_time) AS continent_code
FROM {{ index .dep "{{external}}" "libp2p_synthetic_heartbeat" "helpers" "from" }}
WHERE
meta_network_name = '{{ .env.NETWORK }}'
-- Widen the window backwards so peers seen in this task's bounds still
-- have metadata even if their last heartbeat was up to an hour earlier.
AND event_date_time BETWEEN fromUnixTimestamp({{ .bounds.start }}) - INTERVAL 1 HOUR AND fromUnixTimestamp({{ .bounds.end }})
GROUP BY remote_peer_id_unique_key
)

-- Join status with metadata and collapse to the latest observation per
-- (head_slot, peer) via argMax keyed on event time.
SELECT
fromUnixTimestamp({{ .task.start }}) AS updated_date_time,
-- max() is the idiomatic form of argMax(x, x): the event time itself.
max(s.event_date_time) AS event_date_time,
s.peer_id_unique_key,
s.head_slot,
-- Normalize head_root: strip NUL bytes, ensure a 0x prefix, and fall
-- back to 'unknown' when nothing remains.
argMax(
if(
nullIf(replaceAll(s.head_root, '\0', ''), '') IS NULL,
'unknown',
if(
startsWith(replaceAll(s.head_root, '\0', ''), '0x'),
replaceAll(s.head_root, '\0', ''),
concat('0x', replaceAll(s.head_root, '\0', ''))
)
),
s.event_date_time
) AS head_root,
argMax(coalesce(nullIf(s.fork_digest, ''), 'unknown'), s.event_date_time) AS fork_digest,
argMax(s.finalized_epoch, s.event_date_time) AS finalized_epoch,
-- LEFT JOIN misses yield '' (ClickHouse default String value when
-- join_use_nulls is off), which nullIf maps to 'unknown'.
argMax(coalesce(nullIf(m.client, ''), 'unknown'), s.event_date_time) AS client,
argMax(coalesce(nullIf(m.client_version, ''), 'unknown'), s.event_date_time) AS client_version,
argMax(coalesce(nullIf(m.platform, ''), 'unknown'), s.event_date_time) AS platform,
argMax(coalesce(nullIf(m.country, ''), 'unknown'), s.event_date_time) AS country,
argMax(coalesce(nullIf(m.country_code, ''), 'unknown'), s.event_date_time) AS country_code,
argMax(coalesce(nullIf(m.continent_code, ''), 'unknown'), s.event_date_time) AS continent_code
FROM peer_status s
LEFT JOIN peer_metadata m ON s.peer_id_unique_key = m.remote_peer_id_unique_key
-- Defensive: the direction filter above already excludes NULL slots on
-- the relevant side, but keep the guard on the Nullable CASE result.
WHERE s.head_slot IS NOT NULL
GROUP BY s.head_slot, s.peer_id_unique_key;
Loading
Loading