Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions migrations/045_storage_slot_expiry_by_1y.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

-- Rollback for migration 045: remove the 1-year slot-expiry tables.
-- The Distributed (query-facing) table is dropped before the replicated local
-- table so queries cannot route to a dangling local shard mid-rollback.
DROP TABLE IF EXISTS `${NETWORK_NAME}`.int_storage_slot_expiry_by_1y ON CLUSTER '{cluster}';
DROP TABLE IF EXISTS `${NETWORK_NAME}`.int_storage_slot_expiry_by_1y_local ON CLUSTER '{cluster}';
22 changes: 22 additions & 0 deletions migrations/045_storage_slot_expiry_by_1y.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- Migration 045 (up): intermediate table of 1-year storage slot expiry events.
-- One row per (block_number, address, slot_key) whose value was set one year
-- before block_number and is therefore a candidate for clearing.
CREATE TABLE `${NETWORK_NAME}`.int_storage_slot_expiry_by_1y_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`block_number` UInt32 COMMENT 'The block number where this slot expiry is recorded (1 year after it was set)' CODEC(DoubleDelta, ZSTD(1)),
`address` String COMMENT 'The contract address' CODEC(ZSTD(1)),
`slot_key` String COMMENT 'The storage slot key' CODEC(ZSTD(1)),
`effective_bytes` UInt8 COMMENT 'Number of effective bytes that were set and are now being marked for expiry (0-32)' CODEC(ZSTD(1))
-- ReplacingMergeTree deduplicates rows sharing the ORDER BY key on merge,
-- keeping the row with the greatest updated_date_time (the version column).
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}',
'{replica}',
`updated_date_time`
-- Fixed-size block-range partitions (~5M blocks each) keep parts bounded
-- and let block-range queries prune partitions.
) PARTITION BY intDiv(block_number, 5000000)
ORDER BY (block_number, address, slot_key)
SETTINGS
deduplicate_merge_projection_mode = 'rebuild'
COMMENT 'Storage slot expiries - records slots that were set 1 year ago and are now candidates for clearing';

-- Query-facing Distributed wrapper; shards rows by hash of (block_number, address).
CREATE TABLE `${NETWORK_NAME}`.int_storage_slot_expiry_by_1y ON CLUSTER '{cluster}' AS `${NETWORK_NAME}`.int_storage_slot_expiry_by_1y_local ENGINE = Distributed(
'{cluster}',
'${NETWORK_NAME}',
int_storage_slot_expiry_by_1y_local,
cityHash64(block_number, address)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Rollback for migration 046: drop Distributed table first, then the local table.
DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_storage_slot_state_with_expiry_by_1y ON CLUSTER '{cluster}';
DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_storage_slot_state_with_expiry_by_1y_local ON CLUSTER '{cluster}';
25 changes: 25 additions & 0 deletions migrations/046_storage_slot_state_with_expiry_by_1y.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- Migration 046 (up): per-block cumulative storage state under the 1-year
-- expiry policy. One row per block; delta columns are negative-or-zero
-- (expiry only removes), cumulative columns are running totals.
CREATE TABLE `${NETWORK_NAME}`.fct_storage_slot_state_with_expiry_by_1y_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`block_number` UInt32 COMMENT 'The block number' CODEC(DoubleDelta, ZSTD(1)),
`expiry_slots_delta` Int32 COMMENT 'Slots expired this block (always <= 0)' CODEC(DoubleDelta, ZSTD(1)),
`expiry_bytes_delta` Int64 COMMENT 'Bytes freed by expiry this block (always <= 0)' CODEC(DoubleDelta, ZSTD(1)),
`cumulative_expiry_slots` Int64 COMMENT 'Cumulative slots removed by expiry up to this block' CODEC(DoubleDelta, ZSTD(1)),
`cumulative_expiry_bytes` Int64 COMMENT 'Cumulative bytes freed by expiry up to this block' CODEC(DoubleDelta, ZSTD(1)),
`active_slots` Int64 COMMENT 'Cumulative count of active storage slots at this block (with 1-year expiry applied)' CODEC(DoubleDelta, ZSTD(1)),
`effective_bytes` Int64 COMMENT 'Cumulative sum of effective bytes across all active slots at this block (with 1-year expiry applied)' CODEC(DoubleDelta, ZSTD(1))
-- Newest updated_date_time wins when rows share the same block_number.
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}',
'{replica}',
`updated_date_time`
) PARTITION BY intDiv(block_number, 5000000)
-- Exactly one logical row per block, so block_number alone is the key.
ORDER BY (block_number)
SETTINGS
deduplicate_merge_projection_mode = 'rebuild'
COMMENT 'Cumulative storage slot state per block with 1-year expiry policy applied - slots unused for 1 year are cleared';

-- Query-facing Distributed wrapper, sharded by block_number hash.
CREATE TABLE `${NETWORK_NAME}`.fct_storage_slot_state_with_expiry_by_1y ON CLUSTER '{cluster}' AS `${NETWORK_NAME}`.fct_storage_slot_state_with_expiry_by_1y_local ENGINE = Distributed(
'{cluster}',
'${NETWORK_NAME}',
fct_storage_slot_state_with_expiry_by_1y_local,
cityHash64(block_number)
);
2 changes: 2 additions & 0 deletions migrations/047_storage_slot_delta_by_address.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Rollback for migration 047: drop Distributed table first, then the local table.
DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_storage_slot_delta_by_address ON CLUSTER '{cluster}';
DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_storage_slot_delta_by_address_local ON CLUSTER '{cluster}';
22 changes: 22 additions & 0 deletions migrations/047_storage_slot_delta_by_address.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- Migration 047 (up): per-(block, address) storage deltas. Signed values:
-- positive = slots/bytes gained in that block, negative = released.
CREATE TABLE `${NETWORK_NAME}`.fct_storage_slot_delta_by_address_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`block_number` UInt32 COMMENT 'The block number' CODEC(DoubleDelta, ZSTD(1)),
`address` String COMMENT 'The contract address (lowercase hex)' CODEC(ZSTD(1)),
`slots_delta` Int32 COMMENT 'Change in active slots for this address in this block (positive=activated, negative=deactivated)' CODEC(DoubleDelta, ZSTD(1)),
`bytes_delta` Int64 COMMENT 'Change in effective bytes for this address in this block' CODEC(DoubleDelta, ZSTD(1))
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}',
'{replica}',
`updated_date_time`
) PARTITION BY intDiv(block_number, 5000000)
-- address leads the key to serve per-address history scans efficiently.
ORDER BY (address, block_number)
SETTINGS
deduplicate_merge_projection_mode = 'rebuild'
COMMENT 'Storage slot deltas per block per address - tracks slot and byte changes for each contract';

-- Query-facing Distributed wrapper, sharded by address so each address's
-- rows stay on one shard.
CREATE TABLE `${NETWORK_NAME}`.fct_storage_slot_delta_by_address ON CLUSTER '{cluster}' AS `${NETWORK_NAME}`.fct_storage_slot_delta_by_address_local ENGINE = Distributed(
'{cluster}',
'${NETWORK_NAME}',
fct_storage_slot_delta_by_address_local,
cityHash64(address)
);
2 changes: 2 additions & 0 deletions migrations/048_storage_slot_state_by_address.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Rollback for migration 048: drop Distributed table first, then the local table.
DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_storage_slot_state_by_address ON CLUSTER '{cluster}';
DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_storage_slot_state_by_address_local ON CLUSTER '{cluster}';
22 changes: 22 additions & 0 deletions migrations/048_storage_slot_state_by_address.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- Migration 048 (up): latest cumulative storage state per address (one logical
-- row per contract; ReplacingMergeTree collapses older versions on merge).
CREATE TABLE `${NETWORK_NAME}`.fct_storage_slot_state_by_address_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`address` String COMMENT 'The contract address (lowercase hex)' CODEC(ZSTD(1)),
`last_block_number` UInt32 COMMENT 'The last block number where this address had storage changes' CODEC(DoubleDelta, ZSTD(1)),
`active_slots` Int64 COMMENT 'Current count of active storage slots for this address' CODEC(DoubleDelta, ZSTD(1)),
`effective_bytes` Int64 COMMENT 'Current sum of effective bytes for this address' CODEC(DoubleDelta, ZSTD(1))
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}',
'{replica}',
`updated_date_time`
-- No natural time dimension here, so partition by address hash into 16
-- buckets to keep merge scope (and Replacing dedup) bounded per bucket.
) PARTITION BY cityHash64(`address`) % 16
ORDER BY (address)
SETTINGS
deduplicate_merge_projection_mode = 'rebuild'
COMMENT 'Current storage slot state per address - single row per contract with latest cumulative values';

-- Query-facing Distributed wrapper, sharded by address.
CREATE TABLE `${NETWORK_NAME}`.fct_storage_slot_state_by_address ON CLUSTER '{cluster}' AS `${NETWORK_NAME}`.fct_storage_slot_state_by_address_local ENGINE = Distributed(
'{cluster}',
'${NETWORK_NAME}',
fct_storage_slot_state_by_address_local,
cityHash64(address)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Rollback for migration 049: drop Distributed table first, then the local table.
DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_storage_slot_expiry_delta_by_address_1y ON CLUSTER '{cluster}';
DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_storage_slot_expiry_delta_by_address_1y_local ON CLUSTER '{cluster}';
22 changes: 22 additions & 0 deletions migrations/049_storage_slot_expiry_delta_by_address_1y.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- Migration 049 (up): per-(block, address) expiry deltas under the 1-year
-- policy. Both delta columns are negative-or-zero: expiry only removes.
CREATE TABLE `${NETWORK_NAME}`.fct_storage_slot_expiry_delta_by_address_1y_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`block_number` UInt32 COMMENT 'The block number where expiries occurred' CODEC(DoubleDelta, ZSTD(1)),
`address` String COMMENT 'The contract address (lowercase hex)' CODEC(ZSTD(1)),
`expiry_slots_delta` Int32 COMMENT 'Slots expired this block for this address (always <= 0)' CODEC(DoubleDelta, ZSTD(1)),
`expiry_bytes_delta` Int64 COMMENT 'Bytes freed by expiry this block for this address (always <= 0)' CODEC(DoubleDelta, ZSTD(1))
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}',
'{replica}',
`updated_date_time`
) PARTITION BY intDiv(block_number, 5000000)
-- address leads the key to serve per-address history scans efficiently.
ORDER BY (address, block_number)
SETTINGS
deduplicate_merge_projection_mode = 'rebuild'
COMMENT 'Storage slot expiry deltas per block per address with 1-year expiry policy - tracks slots cleared due to inactivity';

-- Query-facing Distributed wrapper, sharded by address.
CREATE TABLE `${NETWORK_NAME}`.fct_storage_slot_expiry_delta_by_address_1y ON CLUSTER '{cluster}' AS `${NETWORK_NAME}`.fct_storage_slot_expiry_delta_by_address_1y_local ENGINE = Distributed(
'{cluster}',
'${NETWORK_NAME}',
fct_storage_slot_expiry_delta_by_address_1y_local,
cityHash64(address)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Rollback for migration 050: drop Distributed table first, then the local table.
DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_storage_slot_state_with_expiry_by_address_1y ON CLUSTER '{cluster}';
DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_storage_slot_state_with_expiry_by_address_1y_local ON CLUSTER '{cluster}';
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- Migration 050 (up): latest cumulative per-address storage state with the
-- 1-year expiry policy applied (one logical row per contract).
CREATE TABLE `${NETWORK_NAME}`.fct_storage_slot_state_with_expiry_by_address_1y_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`address` String COMMENT 'The contract address (lowercase hex)' CODEC(ZSTD(1)),
`cumulative_expiry_slots` Int64 COMMENT 'Cumulative slots removed by expiry for this address' CODEC(DoubleDelta, ZSTD(1)),
`cumulative_expiry_bytes` Int64 COMMENT 'Cumulative bytes freed by expiry for this address' CODEC(DoubleDelta, ZSTD(1)),
`active_slots` Int64 COMMENT 'Current count of active storage slots for this address (with 1-year expiry applied)' CODEC(DoubleDelta, ZSTD(1)),
`effective_bytes` Int64 COMMENT 'Current sum of effective bytes for this address (with 1-year expiry applied)' CODEC(DoubleDelta, ZSTD(1))
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}',
'{replica}',
`updated_date_time`
-- No time dimension; 16 address-hash buckets bound merge scope per bucket
-- (same scheme as fct_storage_slot_state_by_address in migration 048).
) PARTITION BY cityHash64(`address`) % 16
ORDER BY (address)
SETTINGS
deduplicate_merge_projection_mode = 'rebuild'
COMMENT 'Current storage slot state per address with 1-year expiry policy applied - single row per contract with latest cumulative values';

-- Query-facing Distributed wrapper, sharded by address.
CREATE TABLE `${NETWORK_NAME}`.fct_storage_slot_state_with_expiry_by_address_1y ON CLUSTER '{cluster}' AS `${NETWORK_NAME}`.fct_storage_slot_state_with_expiry_by_address_1y_local ENGINE = Distributed(
'{cluster}',
'${NETWORK_NAME}',
fct_storage_slot_state_with_expiry_by_address_1y_local,
cityHash64(address)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Rollback for migration 051: drop Distributed table first, then the local table.
DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_address_storage_slot_top_10_by_effective_bytes ON CLUSTER '{cluster}';
DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_address_storage_slot_top_10_by_effective_bytes_local ON CLUSTER '{cluster}';
22 changes: 22 additions & 0 deletions migrations/051_storage_slot_top_10_by_effective_bytes.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- Migration 051 (up): tiny leaderboard table (top 10 addresses by effective
-- bytes). Rebuilt on a schedule by its transformation model.
CREATE TABLE `${NETWORK_NAME}`.fct_address_storage_slot_top_10_by_effective_bytes_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`rank` UInt32 COMMENT 'Rank by effective bytes (1=highest)' CODEC(DoubleDelta, ZSTD(1)),
`address` String COMMENT 'The contract address' CODEC(ZSTD(1)),
`active_slots` Int64 COMMENT 'Current count of active storage slots for this address' CODEC(DoubleDelta, ZSTD(1)),
`effective_bytes` Int64 COMMENT 'Total effective bytes of storage slots for this address' CODEC(ZSTD(1))
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}',
'{replica}',
`updated_date_time`
-- Only ~10 rows ever exist, so a single unpartitioned part is fine.
) PARTITION BY tuple()
ORDER BY (`rank`)
SETTINGS
deduplicate_merge_projection_mode = 'rebuild'
COMMENT 'Top 10 addresses by effective bytes of storage slots';

-- Query-facing Distributed wrapper, sharded by rank.
CREATE TABLE `${NETWORK_NAME}`.fct_address_storage_slot_top_10_by_effective_bytes ON CLUSTER '{cluster}' AS `${NETWORK_NAME}`.fct_address_storage_slot_top_10_by_effective_bytes_local ENGINE = Distributed(
'{cluster}',
'${NETWORK_NAME}',
fct_address_storage_slot_top_10_by_effective_bytes_local,
cityHash64(`rank`)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Rollback for migration 052: drop Distributed table first, then the local table.
DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_address_storage_slot_top_10_by_cumulative_expiry_bytes ON CLUSTER '{cluster}';
DROP TABLE IF EXISTS `${NETWORK_NAME}`.fct_address_storage_slot_top_10_by_cumulative_expiry_bytes_local ON CLUSTER '{cluster}';
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-- Migration 052 (up): tiny leaderboard table (top 10 addresses by cumulative
-- expiry bytes under the 1-year policy). Rebuilt on a schedule by its model.
CREATE TABLE `${NETWORK_NAME}`.fct_address_storage_slot_top_10_by_cumulative_expiry_bytes_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`rank` UInt32 COMMENT 'Rank by cumulative expiry bytes (1=highest)' CODEC(DoubleDelta, ZSTD(1)),
`address` String COMMENT 'The contract address' CODEC(ZSTD(1)),
`cumulative_expiry_slots` Int64 COMMENT 'Cumulative slots removed by 1-year expiry for this address' CODEC(DoubleDelta, ZSTD(1)),
`cumulative_expiry_bytes` Int64 COMMENT 'Cumulative bytes freed by 1-year expiry for this address' CODEC(ZSTD(1)),
`active_slots` Int64 COMMENT 'Current count of active storage slots for this address (with 1-year expiry applied)' CODEC(DoubleDelta, ZSTD(1)),
`effective_bytes` Int64 COMMENT 'Current sum of effective bytes for this address (with 1-year expiry applied)' CODEC(ZSTD(1))
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}',
'{replica}',
`updated_date_time`
-- Only ~10 rows ever exist, so a single unpartitioned part is fine.
) PARTITION BY tuple()
ORDER BY (`rank`)
SETTINGS
deduplicate_merge_projection_mode = 'rebuild'
COMMENT 'Top 10 addresses by cumulative expiry bytes (1-year expiry policy)';

-- Query-facing Distributed wrapper, sharded by rank.
CREATE TABLE `${NETWORK_NAME}`.fct_address_storage_slot_top_10_by_cumulative_expiry_bytes ON CLUSTER '{cluster}' AS `${NETWORK_NAME}`.fct_address_storage_slot_top_10_by_cumulative_expiry_bytes_local ENGINE = Distributed(
'{cluster}',
'${NETWORK_NAME}',
fct_address_storage_slot_top_10_by_cumulative_expiry_bytes_local,
cityHash64(`rank`)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
---
# Scheduled (non-incremental) leaderboard: fully rebuilt every minute from the
# per-address expiry state table.
table: fct_address_storage_slot_top_10_by_cumulative_expiry_bytes
type: scheduled
schedule: "@every 1m"
tags:
- address
- storage
- expiry
- bytes
- top10
dependencies:
- "{{transformation}}.fct_storage_slot_state_with_expiry_by_address_1y"
---
-- Insert this run's top-10 addresses, stamped with the task start time so the
-- DELETE below can distinguish them from rows of previous runs.
-- abs() ranks by magnitude; presumably expiry byte totals are stored negative
-- (the upstream deltas are always <= 0) -- TODO confirm against migration 049.
INSERT INTO
`{{ .self.database }}`.`{{ .self.table }}`
SELECT
fromUnixTimestamp({{ .task.start }}) as updated_date_time,
-- Deterministic ranking: ties on magnitude are broken by address.
row_number() OVER (ORDER BY abs(cumulative_expiry_bytes) DESC, address ASC) as rank,
address,
cumulative_expiry_slots,
cumulative_expiry_bytes,
active_slots,
effective_bytes
-- FINAL collapses ReplacingMergeTree duplicates so each address appears once.
FROM {{ index .dep "{{transformation}}" "fct_storage_slot_state_with_expiry_by_address_1y" "helpers" "from" }} FINAL
ORDER BY rank ASC
LIMIT 10;

-- Remove rows from earlier runs (any row not stamped with this task's start).
-- NOTE(review): between the INSERT and this DELETE, old and new rows coexist,
-- so readers may briefly see duplicate ranks.
DELETE FROM
`{{ .self.database }}`.`{{ .self.table }}{{ if .clickhouse.cluster }}{{ .clickhouse.local_suffix }}{{ end }}`
{{ if .clickhouse.cluster }}
ON CLUSTER '{{ .clickhouse.cluster }}'
{{ end }}
WHERE updated_date_time != fromUnixTimestamp({{ .task.start }});
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
---
# Scheduled (non-incremental) leaderboard: fully rebuilt every minute from the
# per-address state table.
table: fct_address_storage_slot_top_10_by_effective_bytes
type: scheduled
schedule: "@every 1m"
tags:
- address
- storage
- bytes
- top10
dependencies:
- "{{transformation}}.fct_storage_slot_state_by_address"
---
-- Insert this run's top-10 addresses by effective bytes, stamped with the task
-- start time so the DELETE below can identify and drop older rows.
INSERT INTO
`{{ .self.database }}`.`{{ .self.table }}`
SELECT
fromUnixTimestamp({{ .task.start }}) as updated_date_time,
-- Deterministic ranking: ties on effective_bytes are broken by address.
row_number() OVER (ORDER BY effective_bytes DESC, address ASC) as rank,
address,
active_slots,
effective_bytes
-- FINAL collapses ReplacingMergeTree duplicates so each address appears once.
FROM {{ index .dep "{{transformation}}" "fct_storage_slot_state_by_address" "helpers" "from" }} FINAL
ORDER BY rank ASC
LIMIT 10;

-- Remove rows from earlier runs (any row not stamped with this task's start).
-- NOTE(review): between the INSERT and this DELETE, old and new rows coexist,
-- so readers may briefly see duplicate ranks.
DELETE FROM
`{{ .self.database }}`.`{{ .self.table }}{{ if .clickhouse.cluster }}{{ .clickhouse.local_suffix }}{{ end }}`
{{ if .clickhouse.cluster }}
ON CLUSTER '{{ .clickhouse.cluster }}'
{{ end }}
WHERE updated_date_time != fromUnixTimestamp({{ .task.start }});
33 changes: 33 additions & 0 deletions models/transformations/fct_storage_slot_delta_by_address.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
---
# Incremental model: processes block ranges of up to 100k blocks, tail-filling
# forward every 5s with no gap skipping (ranges are applied in order).
table: fct_storage_slot_delta_by_address
type: incremental
interval:
  type: block
  max: 100000
fill:
  direction: "tail"
  allow_gap_skipping: false
schedules:
  forwardfill: "@every 5s"
tags:
- execution
- storage
dependencies:
- "{{transformation}}.int_storage_slot_diff"
---
-- Aggregate raw slot diffs into per-(block, address) slot/byte deltas.
INSERT INTO
`{{ .self.database }}`.`{{ .self.table }}`
SELECT
now() as updated_date_time,
block_number,
-- NOTE(review): GROUP BY address below resolves to this lower() alias under
-- ClickHouse alias semantics -- confirm mixed-case upstream addresses group as one.
lower(address) as address,
-- Slots activated: from=0, to>0 (+1)
-- Slots deactivated: from>0, to=0 (-1)
-- Slots modified: from>0, to>0 (no change)
toInt32(countIf(effective_bytes_from = 0 AND effective_bytes_to > 0))
- toInt32(countIf(effective_bytes_from > 0 AND effective_bytes_to = 0)) as slots_delta,
-- Net byte change: sum of (to - from) for all changes
SUM(toInt64(effective_bytes_to) - toInt64(effective_bytes_from)) as bytes_delta
FROM {{ index .dep "{{transformation}}" "int_storage_slot_diff" "helpers" "from" }} FINAL
WHERE block_number BETWEEN {{ .bounds.start }} AND {{ .bounds.end }}
GROUP BY block_number, address
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
# Incremental model: processes block ranges of up to 10k blocks, tail-filling
# forward every 5s with no gap skipping.
table: fct_storage_slot_expiry_delta_by_address_1y
type: incremental
interval:
  type: block
  max: 10000
fill:
  direction: "tail"
  allow_gap_skipping: false
schedules:
  forwardfill: "@every 5s"
tags:
- execution
- storage
dependencies:
- "{{transformation}}.int_storage_slot_expiry_by_1y"
---
-- Aggregate per-slot expiry events into per-(block, address) deltas.
-- Deltas are negated so they carry the table's <= 0 convention
-- (expiry only removes slots/bytes).
INSERT INTO
`{{ .self.database }}`.`{{ .self.table }}`
SELECT
now() as updated_date_time,
block_number,
lower(address) as address,
-toInt32(count()) as expiry_slots_delta,
-SUM(toInt64(effective_bytes)) as expiry_bytes_delta
-- FINAL collapses ReplacingMergeTree duplicates before counting.
FROM {{ index .dep "{{transformation}}" "int_storage_slot_expiry_by_1y" "helpers" "from" }} FINAL
WHERE block_number BETWEEN {{ .bounds.start }} AND {{ .bounds.end }}
GROUP BY block_number, address
50 changes: 50 additions & 0 deletions models/transformations/fct_storage_slot_state_by_address.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
---
# Incremental cumulative model: each chunk folds its deltas into the previous
# per-address totals stored in this same table.
table: fct_storage_slot_state_by_address
type: incremental
interval:
  type: block
  max: 100000
fill:
  direction: "tail"
  allow_gap_skipping: false
schedules:
  forwardfill: "@every 5s"
tags:
- execution
- storage
- cumulative
dependencies:
- "{{transformation}}.fct_storage_slot_delta_by_address"
---
-- NOTE(review): cumulative state assumes each block range is applied exactly
-- once; re-running a range would double-apply its deltas -- confirm the
-- scheduler guarantees this for tail fill with allow_gap_skipping: false.
INSERT INTO
`{{ .self.database }}`.`{{ .self.table }}`
WITH
-- Get deltas for this chunk, aggregated per address
chunk_deltas AS (
SELECT
address,
max(block_number) as last_block_number,
SUM(slots_delta) as slots_delta,
SUM(bytes_delta) as bytes_delta
FROM {{ index .dep "{{transformation}}" "fct_storage_slot_delta_by_address" "helpers" "from" }} FINAL
WHERE block_number BETWEEN {{ .bounds.start }} AND {{ .bounds.end }}
GROUP BY address
),
-- Get previous state for addresses in this chunk
-- (argMax picks the most recently written totals per address, so stale
-- ReplacingMergeTree duplicates are ignored without needing FINAL).
prev_state AS (
SELECT
address,
argMax(active_slots, updated_date_time) as active_slots,
argMax(effective_bytes, updated_date_time) as effective_bytes
FROM `{{ .self.database }}`.`{{ .self.table }}`
WHERE address IN (SELECT address FROM chunk_deltas)
GROUP BY address
)
-- New totals = previous totals (0 for first-seen addresses) + chunk deltas.
SELECT
now() as updated_date_time,
d.address,
d.last_block_number,
COALESCE(p.active_slots, 0) + d.slots_delta as active_slots,
COALESCE(p.effective_bytes, 0) + d.bytes_delta as effective_bytes
FROM chunk_deltas d
LEFT JOIN prev_state p ON d.address = p.address
Loading
Loading