Skip to content

Commit 029db8d

Browse files
authored
Merge pull request #7 from initia-labs/feat/pruning
Feat: Add Prunner.
2 parents de88fe6 + f6a82ed commit 029db8d

File tree

11 files changed

+433
-1108
lines changed

11 files changed

+433
-1108
lines changed
+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package prunner_cmd
2+
3+
import (
4+
"github.com/initia-labs/core-indexer/informative-indexer/prunner"
5+
"github.com/spf13/cobra"
6+
"os"
7+
"strconv"
8+
)
9+
10+
const (
11+
FlagDBConnectionString = "db"
12+
FlagBackupBucketName = "backup-bucket-name"
13+
FlagBackupFilePrefix = "backup-file-prefix"
14+
FlagPruningKeepBlock = "pruning-keep-block"
15+
FlagPruningBlockInterval = "pruning-block-interval"
16+
FlagPruningInterval = "pruning-interval"
17+
FlagChain = "chain"
18+
FlagEnvironment = "environment"
19+
FlagCommitSHA = "commit-sha"
20+
)
21+
22+
func PruneCmd() *cobra.Command {
23+
cmd := &cobra.Command{
24+
Use: "prune",
25+
Short: "Pruning and backup data",
26+
Args: cobra.ExactArgs(0),
27+
RunE: func(cmd *cobra.Command, args []string) error {
28+
dbConnectionString, _ := cmd.Flags().GetString(FlagDBConnectionString)
29+
backupBucketName, _ := cmd.Flags().GetString(FlagBackupBucketName)
30+
filePrefix, _ := cmd.Flags().GetString(FlagBackupFilePrefix)
31+
pruningKeepBlock, _ := cmd.Flags().GetUint64(FlagPruningKeepBlock)
32+
pruningBlockInterval, _ := cmd.Flags().GetUint64(FlagPruningBlockInterval)
33+
pruningInterval, _ := cmd.Flags().GetUint64(FlagPruningInterval)
34+
chain, _ := cmd.Flags().GetString(FlagChain)
35+
environment, _ := cmd.Flags().GetString(FlagEnvironment)
36+
commitSHA, _ := cmd.Flags().GetString(FlagCommitSHA)
37+
38+
p, err := prunner.NewPrunner(&prunner.PrunnerConfig{
39+
DBConnectionString: dbConnectionString,
40+
BackupBucketName: backupBucketName,
41+
BackupFilePrefix: filePrefix,
42+
PruningKeepBlock: int64(pruningKeepBlock),
43+
PruningBlockInterval: int64(pruningBlockInterval),
44+
PruningInterval: int64(pruningInterval),
45+
Chain: chain,
46+
Environment: environment,
47+
CommitSHA: commitSHA,
48+
})
49+
50+
if err != nil {
51+
return err
52+
}
53+
54+
p.Prune()
55+
56+
return nil
57+
},
58+
}
59+
60+
pruningKeepBlock, err := strconv.ParseInt(os.Getenv("PRUNING_KEEP_BLOCK"), 10, 64)
61+
if err != nil {
62+
pruningKeepBlock = 500000
63+
}
64+
65+
pruningBlockInterval, err := strconv.ParseInt(os.Getenv("PRUNING_BLOCK_INTERVAL"), 10, 64)
66+
if err != nil {
67+
pruningBlockInterval = 100000
68+
}
69+
70+
pruningInterval, err := strconv.ParseInt(os.Getenv("PRUNING_INTERVAL"), 10, 64)
71+
{
72+
if err != nil {
73+
pruningInterval = 1
74+
}
75+
}
76+
77+
cmd.Flags().String(FlagDBConnectionString, os.Getenv("DB_CONNECTION_STRING"), "Database connection string")
78+
cmd.Flags().String(FlagBackupBucketName, os.Getenv("BACKUP_BUCKET_NAME"), "Name of the backup bucket")
79+
cmd.Flags().String(FlagBackupFilePrefix, os.Getenv("BACKUP_FILE_PREFIX"), "Prefix for backup files")
80+
cmd.Flags().Uint64(FlagPruningKeepBlock, uint64(pruningKeepBlock), "Number of blocks to keep in the db")
81+
cmd.Flags().Uint64(FlagPruningBlockInterval, uint64(pruningBlockInterval), "Interval for pruning blocks, specified in block height")
82+
cmd.Flags().Uint64(FlagPruningInterval, uint64(pruningInterval), "Pruning interval specified in days")
83+
cmd.Flags().String(FlagChain, os.Getenv("CHAIN"), "Chain ID to prune")
84+
cmd.Flags().String(FlagEnvironment, os.Getenv("ENVIRONMENT"), "Environment")
85+
cmd.Flags().String(FlagCommitSHA, os.Getenv("COMMIT_SHA"), "Commit SHA")
86+
87+
return cmd
88+
}

informative-indexer/cmd/root.go

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/spf13/cobra"
77

88
flusher "github.com/initia-labs/core-indexer/informative-indexer/cmd/flusher"
9+
prunner "github.com/initia-labs/core-indexer/informative-indexer/cmd/prunner"
910
sweeper "github.com/initia-labs/core-indexer/informative-indexer/cmd/sweeper"
1011
)
1112

@@ -19,6 +20,7 @@ func Execute() {
1920
rootCmd.AddCommand(
2021
sweeper.SweepCmd(),
2122
flusher.FlushCmd(),
23+
prunner.PruneCmd(),
2224
)
2325

2426
err := rootCmd.Execute()

informative-indexer/db/db.go

+51
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"strings"
78
"time"
89

910
"github.com/getsentry/sentry-go"
@@ -94,3 +95,53 @@ func InsertFinalizeBlockEventsIgnoreConflict(ctx context.Context, dbTx Queryable
9495

9596
return BulkInsert(ctx, dbTx, "finalize_block_events", columns, values, "ON CONFLICT DO NOTHING")
9697
}
98+
99+
func GetRowCount(ctx context.Context, dbClient Queryable, table string) (int64, error) {
100+
if !isValidTableName(table) {
101+
return 0, fmt.Errorf("invalid table name: %s", table)
102+
}
103+
104+
var count int64
105+
query := fmt.Sprintf("SELECT COUNT(*) FROM %s", table)
106+
err := QueryRowWithTimeout(ctx, dbClient, query).Scan(&count)
107+
if err != nil {
108+
return 0, fmt.Errorf("failed to get row count for table %s: %w", table, err)
109+
}
110+
return count, nil
111+
}
112+
113+
func GetRowsToPruneByBlockHeight(ctx context.Context, dbClient Queryable, table string, threshold int64) (pgx.Rows, error) {
114+
if !isValidTableName(table) {
115+
return nil, fmt.Errorf("invalid table name: %s", table)
116+
}
117+
118+
var query string
119+
var t interface{}
120+
121+
if table == "transaction_events" {
122+
t = TransactionEvent{}
123+
} else if table == "finalize_block_events" {
124+
t = FinalizeBlockEvent{}
125+
}
126+
127+
columns := getColumns(t)
128+
129+
query = fmt.Sprintf("SELECT %s FROM %s WHERE block_height <= $1", strings.Join(columns, ", "), table)
130+
131+
rows, err := QueryRowsWithTimeout(ctx, dbClient, query, threshold)
132+
if err != nil {
133+
return nil, fmt.Errorf("failed to get rows to prune from table %s: %w", table, err)
134+
}
135+
return rows, err
136+
}
137+
138+
func DeleteRowsToPrune(ctx context.Context, dbClient Queryable, table string, threshold int64) error {
139+
if !isValidTableName(table) {
140+
return fmt.Errorf("invalid table name: %s", table)
141+
}
142+
143+
query := fmt.Sprintf("DELETE FROM %s WHERE block_height <= $1", table)
144+
145+
_, err := ExecWithTimeout(ctx, dbClient, query, threshold)
146+
return err
147+
}

informative-indexer/db/finalize_block_event.go

+21-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package db
22

3-
import "errors"
3+
import (
4+
"errors"
5+
"github.com/jackc/pgx/v5"
6+
)
47

58
type Mode int
69

@@ -38,3 +41,20 @@ type FinalizeBlockEvent struct {
3841
EventIndex int `json:"event_index"`
3942
Mode Mode `json:"mode"`
4043
}
44+
45+
func (f *FinalizeBlockEvent) Unmarshal(rows pgx.Rows) (map[string]interface{}, error) {
46+
err := rows.Scan(&f.BlockHeight, &f.EventKey, &f.EventValue, &f.EventIndex, &f.Mode)
47+
if err != nil {
48+
return nil, err
49+
}
50+
51+
row := map[string]interface{}{
52+
"block_height": f.BlockHeight,
53+
"event_key": f.EventKey,
54+
"event_value": f.EventValue,
55+
"event_index": f.EventIndex,
56+
"mode": f.Mode,
57+
}
58+
59+
return row, nil
60+
}
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,28 @@
11
package db
22

3+
import "github.com/jackc/pgx/v5"
4+
35
type TransactionEvent struct {
46
TransactionHash string `json:"transaction_hash"`
57
BlockHeight int64 `json:"block_height"`
68
EventKey string `json:"event_key"`
79
EventValue string `json:"event_value"`
810
EventIndex int `json:"event_index"`
911
}
12+
13+
func (t *TransactionEvent) Unmarshal(rows pgx.Rows) (map[string]interface{}, error) {
14+
err := rows.Scan(&t.TransactionHash, &t.BlockHeight, &t.EventKey, &t.EventValue, &t.EventIndex)
15+
if err != nil {
16+
return nil, err
17+
}
18+
19+
row := map[string]interface{}{
20+
"transaction_hash": t.TransactionHash,
21+
"block_height": t.BlockHeight,
22+
"event_key": t.EventKey,
23+
"event_value": t.EventValue,
24+
"event_index": t.EventIndex,
25+
}
26+
27+
return row, nil
28+
}

informative-indexer/db/util.go

+12
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,15 @@ func BulkInsert(parentCtx context.Context, dbTx Queryable, tableName string, col
8383

8484
return nil
8585
}
86+
87+
func QueryRowsWithTimeout(parentCtx context.Context, dbClient Queryable, query string, args ...interface{}) (pgx.Rows, error) {
88+
ctx, cancel := context.WithTimeout(context.Background(), QueryTimeout)
89+
defer cancel()
90+
91+
results, err := dbClient.Query(ctx, query, args...)
92+
if err != nil {
93+
return nil, err
94+
}
95+
96+
return results, err
97+
}
+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package db
2+
3+
var validTableNames = []string{
4+
"transaction_events",
5+
"finalize_block_events",
6+
}
7+
8+
func isValidTableName(tableName string) bool {
9+
for _, validTable := range validTableNames {
10+
if tableName == validTable {
11+
return true
12+
}
13+
}
14+
return false
15+
}
16+
17+
func GetValidTableNames() []string {
18+
return validTableNames
19+
}

0 commit comments

Comments
 (0)