diff --git a/commands/es-stats.go b/commands/es-stats.go index 01b8614..f0eebe9 100644 --- a/commands/es-stats.go +++ b/commands/es-stats.go @@ -1,6 +1,7 @@ package commands import ( + "log" "os" "strconv" @@ -17,10 +18,15 @@ type EsStatsCommand struct { // Execute collects ledger staticstics for the current ES cluster func (cmd *EsStatsCommand) Execute() { + min, max, empty := cmd.ES.MinMaxSeq() + + if empty { + log.Println("ES is empty") + return + } + table := tablewriter.NewWriter(os.Stdout) table.SetHeader([]string{"From", "To", "Doc_count"}) - - min, max := cmd.ES.MinMaxSeq() buckets := cmd.esRanges(min, max) for i := 0; i < len(buckets); i++ { diff --git a/commands/export.go b/commands/export.go index 265c61f..8c39ce7 100644 --- a/commands/export.go +++ b/commands/export.go @@ -2,6 +2,7 @@ package commands import ( "bytes" + "io/ioutil" "log" "math/rand" "time" @@ -79,14 +80,18 @@ func (cmd *ExportCommand) exportBlock(i int) { } if !cmd.Config.DryRun { + ioutil.WriteFile("./bulk.json", b.Bytes(), 0644) cmd.ES.IndexWithRetries(&b, cmd.Config.RetryCount) } } func (cmd *ExportCommand) index(b *bytes.Buffer, retry int) { - indexed := cmd.ES.BulkInsert(b) + err := cmd.ES.BulkInsert(b) + + if err != nil { + log.Println(err) + log.Println("Failed, retrying") - if !indexed { if retry > cmd.Config.RetryCount { log.Fatal("Retries for bulk failed, aborting") } diff --git a/commands/fill_gaps.go b/commands/fill_gaps.go new file mode 100644 index 0000000..4e62a45 --- /dev/null +++ b/commands/fill_gaps.go @@ -0,0 +1,186 @@ +package commands + +import ( + "bufio" + "bytes" + "github.com/astroband/astrologer/db" + "github.com/astroband/astrologer/es" + "github.com/astroband/astrologer/support" + "log" + "strings" +) + +const batchSize = 1000 +const INDEX_RETRIES_COUNT = 25 + +type FillGapsCommandConfig struct { + DryRun bool + Start *int + Count *int + BatchSize *int +} + +type FillGapsCommand struct { + ES es.Adapter + DB db.Adapter + Config *FillGapsCommandConfig + + minSeq int + maxSeq int + count int +} + +func (cmd *FillGapsCommand) Execute() { + if cmd.Config.Start != nil && cmd.Config.Count != nil { + cmd.minSeq = *cmd.Config.Start + cmd.count = *cmd.Config.Count + cmd.maxSeq = cmd.minSeq + cmd.count + } else { + var empty bool + cmd.minSeq, cmd.maxSeq, empty = cmd.ES.MinMaxSeq() + + if empty { + log.Println("ES is empty") + return + } + + if cmd.Config.Start != nil { + cmd.minSeq = *cmd.Config.Start + cmd.count = cmd.maxSeq - cmd.minSeq + 1 + } + } + + log.Printf("Min seq is %d, max seq is %d\n", cmd.minSeq, cmd.maxSeq) + + var missing []int + + for i := cmd.minSeq; i < cmd.maxSeq; i += batchSize { + var to int + + if i+batchSize > cmd.maxSeq { + to = cmd.maxSeq + } else { + to = i + batchSize - 1 + } + + seqs := cmd.ES.GetLedgerSeqsInRange(i, to) + + if len(seqs) > 0 { + missing = append(missing, cmd.findMissing(seqs)...) + + if seqs[len(seqs)-1] != to { + missing = append(missing, support.MakeRangeGtLte(seqs[len(seqs)-1], to)...) + } + } else { + missing = append(missing, support.MakeRangeGteLte(i, to)...) + } + } + + cmd.exportSeqs(missing) +} + +func (cmd *FillGapsCommand) findMissing(sortedArr []int) (missing []int) { + for i := 1; i < len(sortedArr); i += 1 { + diff := sortedArr[i] - sortedArr[i-1] + if diff > 1 { + missing = append(missing, support.MakeRangeGtLt(sortedArr[i-1], sortedArr[i])...) + } + } + + // log.Println("Missing:", missing) + return +} + +func (cmd *FillGapsCommand) exportSeqs(seqs []int) { + log.Printf("Exporting %d ledgers\n", len(seqs)) + + var dbSeqs []int + batchSize := *cmd.Config.BatchSize + batchesCounter := 0 + + totalTxs := 0 + totalOps := 0 + + for i := 0; i < len(seqs); i += batchSize { + + to := i + batchSize + if to > len(seqs) { + to = len(seqs) - 1 + } + + var seqsBlock []int + + if len(seqs) == 1 { + seqsBlock = seqs + } else { + seqsBlock = seqs[i:to] + } + + pool.Submit(func() { + batchesCounter += 1 + var b bytes.Buffer + rows := cmd.DB.LedgerHeaderRowFetchBySeqs(seqsBlock) + + for n := 0; n < len(rows); n++ { + // log.Printf("Ingesting %d ledger\n", rows[n].LedgerSeq) + dbSeqs = append(dbSeqs, rows[n].LedgerSeq) + + txs := cmd.DB.TxHistoryRowForSeq(rows[n].LedgerSeq) + totalTxs += len(txs) + + for _, tx := range txs { + totalOps += len(tx.Envelope.Tx.Operations) + } + + fees := cmd.DB.TxFeeHistoryRowsForRows(txs) + + es.SerializeLedger(rows[n], txs, fees, &b) + } + + log.Printf( + "Batch %d: Bulk inserting %d docs, total size is %s\n", + batchesCounter, + countLines(b)/2, + support.ByteCountBinary(b.Len()), + ) + + if !cmd.Config.DryRun { + err := cmd.ES.BulkInsert(&b) + + if err != nil { + log.Printf("Batch %d failed with error: %s\n", err) + } else { + log.Printf("Batch %d is inserted\n", batchesCounter) + } + } + }) + } + + pool.StopWait() + diff := support.Difference(seqs, dbSeqs) + + if len(diff) > 0 { + log.Printf("DB misses next ledgers: %v", diff) + } + + log.Printf("Total txs count: %d, total ops count: %d", totalTxs, totalOps) +} + +func countLines(buf bytes.Buffer) int { + scanner := bufio.NewScanner(strings.NewReader(buf.String())) + + // Set the split function for the scanning operation. + scanner.Split(bufio.ScanLines) + + // Count the lines. + count := 0 + for scanner.Scan() { + count++ + } + + if err := scanner.Err(); err != nil { + log.Fatal("reading input:", err) + } + + return count +} diff --git a/commands/fill_gaps_test.go b/commands/fill_gaps_test.go new file mode 100644 index 0000000..c2be76a --- /dev/null +++ b/commands/fill_gaps_test.go @@ -0,0 +1,18 @@ +package commands + +import ( + "github.com/astroband/astrologer/es/mocks" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestFillGaps(t *testing.T) { + t.Error("test error") + + esClient := new(mocks.EsAdapter) + esClient.On("MinMaxSeq").Return(386, 411) + + missingSeqs := FillGaps(esClient) + + assert.NotEmpty(t, missingSeqs) +} diff --git a/config/main.go b/config/main.go index 08fb708..3ffd29c 100644 --- a/config/main.go +++ b/config/main.go @@ -22,6 +22,7 @@ var ( ingestCommand = kingpin.Command("ingest", "Start real time ingestion") _ = kingpin.Command("stats", "Print database ledger statistics") _ = kingpin.Command("es-stats", "Print ES ranges stats") + fillGapsCommand = kingpin.Command("fill-gaps", "Fill gaps") // DatabaseURL Stellar Core database URL DatabaseURL = kingpin. @@ -72,11 +73,19 @@ var ( // Verbose print data Verbose = exportCommand.Flag("verbose", "Print indexed data").Bool() - // ExportDryRun do not index data - ExportDryRun = exportCommand.Flag("dry-run", "Do not send actual data to Elastic").Bool() + // DryRun do not index data + DryRun = kingpin.Flag("dry-run", "Do not send actual data to Elastic").Bool() // ForceRecreateIndexes Allows indexes to be deleted before creation ForceRecreateIndexes = createIndexCommand.Flag("force", "Delete indexes before creation").Bool() + + FillGapsFrom = fillGapsCommand.Arg("start", "Ledger to start from").Int() + FillGapsCount = fillGapsCommand.Arg("count", "How many ledgers to check").Int() + FillGapsBatchSize = fillGapsCommand. + Flag("batch", "Ledger batch size"). + Short('b'). + Default("50"). + Int() ) func parseNumberWithSign(value string) (r NumberWithSign, err error) { diff --git a/db/ledger_header_row.go b/db/ledger_header_row.go index 415811d..09cba46 100644 --- a/db/ledger_header_row.go +++ b/db/ledger_header_row.go @@ -2,9 +2,9 @@ package db import ( "database/sql" - "log" - + "github.com/jmoiron/sqlx" "github.com/stellar/go/xdr" + "log" ) // LedgerHeaderRow is struct representing ledger in database @@ -107,6 +107,25 @@ func (db *Client) LedgerHeaderNext(seq int) *LedgerHeaderRow { return &h } +func (db *Client) LedgerHeaderRowFetchBySeqs(seqs []int) []LedgerHeaderRow { + ledgers := []LedgerHeaderRow{} + + query, args, err := sqlx.In("SELECT * FROM ledgerheaders WHERE ledgerseq IN (?) ORDER BY ledgerseq ASC;", seqs) + + if err != nil { + log.Fatal(err) + } + + query = db.rawClient.Rebind(query) + err = db.rawClient.Select(&ledgers, query, args...) + + if err != nil { + log.Fatal(err) + } + + return ledgers +} + // LedgerHeaderGaps returns gap positions in ledgerheaders func (db *Client) LedgerHeaderGaps() (r []Gap) { err := db.rawClient.Select(&r, ` diff --git a/db/main.go b/db/main.go index 813ca61..27d251a 100644 --- a/db/main.go +++ b/db/main.go @@ -40,6 +40,7 @@ func utf8Scrub(in string) string { type Adapter interface { LedgerHeaderRowCount(first int, last int) int LedgerHeaderRowFetchBatch(n int, start int, batchSize int) []LedgerHeaderRow + LedgerHeaderRowFetchBySeqs(seqs []int) []LedgerHeaderRow LedgerHeaderLastRow() *LedgerHeaderRow LedgerHeaderFirstRow() *LedgerHeaderRow LedgerHeaderNext(seq int) *LedgerHeaderRow diff --git a/es/adapter.go b/es/adapter.go index 1919aa8..7ed9171 100644 --- a/es/adapter.go +++ b/es/adapter.go @@ -3,6 +3,8 @@ package es import ( "bytes" "encoding/json" + "errors" + "github.com/elastic/go-elasticsearch/v7/esapi" "log" "math/rand" "net/http" @@ -41,7 +43,7 @@ func (es *Client) CreateIndex(name IndexName, body IndexDefinition) { fatalIfError(res, err) } -func (es *Client) searchLedgers(query map[string]interface{}) (r map[string]interface{}) { +func (es *Client) search(query map[string]interface{}, index IndexName) (r map[string]interface{}) { var buf bytes.Buffer if err := json.NewEncoder(&buf).Encode(query); err != nil { @@ -49,7 +51,7 @@ func (es *Client) searchLedgers(query map[string]interface{}) (r map[string]inte } res, err := es.rawClient.Search( - es.rawClient.Search.WithIndex("ledger"), + es.rawClient.Search.WithIndex(string(index)), es.rawClient.Search.WithBody(&buf), ) @@ -64,8 +66,7 @@ func (es *Client) searchLedgers(query map[string]interface{}) (r map[string]inte return r } -// MinMaxSeq return the minimum and maximum seqnum of ledgers stored in the ES -func (es *Client) MinMaxSeq() (min, max int) { +func (es *Client) MinMaxSeq() (min, max int, storageEmpty bool) { query := map[string]interface{}{ "aggs": map[string]interface{}{ "seq_stats": map[string]interface{}{ @@ -76,14 +77,21 @@ func (es *Client) MinMaxSeq() (min, max int) { }, } - r := es.searchLedgers(query) + r := es.search(query, ledgerHeaderIndexName) aggs := r["aggregations"].(map[string]interface{})["seq_stats"].(map[string]interface{}) - min = int(aggs["min"].(float64)) - max = int(aggs["max"].(float64)) + if aggs["min"] != nil && aggs["max"] != nil { + min = int(aggs["min"].(float64)) + max = int(aggs["max"].(float64)) + storageEmpty = false + } else { + min = 0 + max = 0 + storageEmpty = true + } - return min, max + return } // LedgerSeqRangeQuery fetches ledger ranges from ES @@ -99,21 +107,28 @@ func (es *Client) LedgerSeqRangeQuery(ranges []map[string]interface{}) map[strin }, } - r := es.searchLedgers(query) + r := es.search(query, ledgerHeaderIndexName) aggs := r["aggregations"].(map[string]interface{})["seq_ranges"].(map[string]interface{}) return aggs } -// BulkInsert sends the payload to ES using bulk operation -func (es *Client) BulkInsert(payload *bytes.Buffer) (success bool) { +func (es *Client) BulkInsert(payload *bytes.Buffer) error { res, err := es.rawClient.Bulk(bytes.NewReader(payload.Bytes())) if res != nil { defer res.Body.Close() } - return err == nil && (res == nil || !res.IsError()) + if err != nil { + return err + } + + if res.IsError() { + return errors.New(res.String()) + } + + return nil } // LedgerCountInRange counts number of ledgers from the given range persisted into ES @@ -164,13 +179,13 @@ func (es *Client) GetLedgerSeqsInRange(min, max int) (seqs []int) { "range": map[string]interface{}{ "seq": map[string]interface{}{ "gte": min, - "lt": max, + "lte": max, }, }, }, } - r := es.searchLedgers(query) + r := es.search(query, ledgerHeaderIndexName) for _, hit := range r["hits"].(map[string]interface{})["hits"].([]interface{}) { doc := hit.(map[string]interface{}) @@ -183,13 +198,17 @@ func (es *Client) GetLedgerSeqsInRange(min, max int) (seqs []int) { // IndexWithRetries performs a bulk insert into ES cluster with retries on failures func (es *Client) IndexWithRetries(payload *bytes.Buffer, retryCount int) { - isIndexed := es.BulkInsert(payload) + err := es.BulkInsert(payload) + + if err != nil { + log.Println(err) - if !isIndexed { if retryCount-1 == 0 { log.Fatal("Retries for bulk failed, aborting") } + log.Println("Failed, retrying...") + delay := time.Duration((rand.Intn(10) + 5)) time.Sleep(delay * time.Second) diff --git a/es/main.go b/es/main.go index 7e7c43e..19f8a46 100644 --- a/es/main.go +++ b/es/main.go @@ -15,14 +15,14 @@ type Indexable interface { // Adapter represents the ledger storage backend type Adapter interface { - MinMaxSeq() (min, max int) + MinMaxSeq() (min, max int, empty bool) LedgerSeqRangeQuery(ranges []map[string]interface{}) map[string]interface{} GetLedgerSeqsInRange(min, max int) []int LedgerCountInRange(min, max int) int IndexExists(name IndexName) bool CreateIndex(name IndexName, body IndexDefinition) DeleteIndex(name IndexName) - BulkInsert(payload *bytes.Buffer) (success bool) + BulkInsert(payload *bytes.Buffer) error IndexWithRetries(payload *bytes.Buffer, retriesCount int) } diff --git a/main.go b/main.go index d3e5700..46d112e 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,8 @@ import ( "github.com/astroband/astrologer/db" "github.com/astroband/astrologer/es" "gopkg.in/alecthomas/kingpin.v2" + "log" + "time" ) func main() { @@ -28,7 +30,7 @@ func main() { config := cmd.ExportCommandConfig{ Start: cfg.Start, Count: *cfg.Count, - DryRun: *cfg.ExportDryRun, + DryRun: *cfg.DryRun, RetryCount: *cfg.Retries, BatchSize: *cfg.BatchSize, } @@ -38,7 +40,20 @@ func main() { command = &cmd.IngestCommand{ES: esClient, DB: dbClient} case "es-stats": command = &cmd.EsStatsCommand{ES: esClient} + case "fill-gaps": + dbClient := db.Connect(*cfg.DatabaseUrl) + config := &cmd.FillGapsCommandConfig{ + DryRun: *cfg.DryRun, + Start: cfg.FillGapsFrom, + Count: cfg.FillGapsCount, + BatchSize: cfg.FillGapsBatchSize, + } + command = &cmd.FillGapsCommand{ES: esClient, DB: dbClient, Config: config} } + start := time.Now() command.Execute() + elapsed := time.Since(start) + + log.Printf("Command took %s", elapsed) } diff --git a/support/main.go b/support/main.go new file mode 100644 index 0000000..9711e78 --- /dev/null +++ b/support/main.go @@ -0,0 +1,85 @@ +package support + +import ( + "fmt" +) + +func Difference(a, b []int) (diff []int) { + m := make(map[int]bool) + + for _, item := range b { + m[item] = true + } + + for _, item := range a { + if _, ok := m[item]; !ok { + diff = append(diff, item) + } + } + return +} + +func Unique(arr []int) []int { + keys := make(map[int]bool) + res := []int{} + for _, entry := range arr { + if _, value := keys[entry]; !value { + keys[entry] = true + res = append(res, entry) + } + } + return res +} + +//Creates (from, to) range slice +// e.g. for 4, 7 returns [5, 6] +func MakeRangeGtLt(gt, lt int) []int { + a := make([]int, lt-gt-1) + for i := range a { + a[i] = gt + 1 + i + } + return a +} + +//Creates (from, to] range slice +// e.g. for 4, 7 returns [5, 6, 7] +func MakeRangeGtLte(gt, lte int) []int { + a := make([]int, lte-gt) + for i := range a { + a[i] = gt + 1 + i + } + return a +} + +//Creates [from, to) range slice +// e.g. for 4, 7 returns [4, 5, 6] +func MakeRangeGteLt(gte, lt int) []int { + a := make([]int, lt-gte) + for i := range a { + a[i] = gte + i + } + return a +} + +//Creates [from, to] range slice +// e.g. for 4, 7 returns [4, 5, 6, 7] +func MakeRangeGteLte(gte, lte int) []int { + a := make([]int, lte-gte+1) + for i := range a { + a[i] = gte + i + } + return a +} + +func ByteCountBinary(b int) string { + const unit = 1024 + if b < unit { + return fmt.Sprintf("%d B", b) + } + div, exp := int64(unit), 0 + for n := b / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + return fmt.Sprintf("%.1f %ciB", float64(b)/float64(div), "KMGTPE"[exp]) +}