Skip to content

Commit

Permalink
refactor: break down blobindexer.go indexBlobsAtSlot & indexBlobsInBa…
Browse files Browse the repository at this point in the history
…tches logic for improved readability
  • Loading branch information
Monika-Bitfly committed Feb 7, 2025
1 parent 4a229d2 commit ed92d0e
Showing 1 changed file with 156 additions and 111 deletions.
267 changes: 156 additions & 111 deletions backend/pkg/blobindexer/blobindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -180,70 +181,7 @@ func (bi *BlobIndexer) indexBlobsAtSlot(slot uint64) (int, error) {
return 0, nil
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
defer cancel()

g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(4)
for _, d := range blobSidecar.Data {
d := d
versionedBlobHash := fmt.Sprintf("%#x", utils.VersionedBlobHash(d.KzgCommitment).Bytes())
key := fmt.Sprintf("%s/blobs/%s", bi.networkID, versionedBlobHash)

if bi.writtenBlobsCache.Contains(key) {
continue
}

g.Go(func() error {
select {
case <-gCtx.Done():
return gCtx.Err()
default:
}

if enableCheckingBeforePutting {
tS3HeadObj := time.Now()
_, err = bi.S3Client.HeadObject(gCtx, &s3.HeadObjectInput{
Bucket: &utils.Config.BlobIndexer.S3.Bucket,
Key: &key,
})
metrics.TaskDuration.WithLabelValues("blobindexer_check_blob").Observe(time.Since(tS3HeadObj).Seconds())
if err != nil {
// Only put the object if it does not exist yet
var httpResponseErr *awshttp.ResponseError
if errors.As(err, &httpResponseErr) && (httpResponseErr.HTTPStatusCode() == http.StatusNotFound || httpResponseErr.HTTPStatusCode() == 403) {
return nil
}
return fmt.Errorf("error getting headObject: %s (%v/%v): %w", key, d.SignedBlockHeader.Message.Slot, d.Index, err)
}
}

tS3PutObj := time.Now()
_, putErr := bi.S3Client.PutObject(gCtx, &s3.PutObjectInput{
Bucket: &utils.Config.BlobIndexer.S3.Bucket,
Key: &key,
Body: bytes.NewReader(d.Blob),
Metadata: map[string]string{
"blob_index": fmt.Sprintf("%d", d.Index),
"block_slot": fmt.Sprintf("%d", d.SignedBlockHeader.Message.Slot),
"block_proposer": fmt.Sprintf("%d", d.SignedBlockHeader.Message.ProposerIndex),
"block_state_root": d.SignedBlockHeader.Message.StateRoot.String(),
"block_parent_root": d.SignedBlockHeader.Message.ParentRoot.String(),
"block_body_root": d.SignedBlockHeader.Message.BodyRoot.String(),
"kzg_commitment": d.KzgCommitment.String(),
"kzg_proof": d.KzgProof.String(),
},
})
metrics.TaskDuration.WithLabelValues("blobindexer_put_blob").Observe(time.Since(tS3PutObj).Seconds())
if putErr != nil {
return fmt.Errorf("error putting object: %s (%v/%v): %w", key, d.SignedBlockHeader.Message.Slot, d.Index, putErr)
}
bi.writtenBlobsCache.Add(key, true)

return nil
})
}
err = g.Wait()
err = bi.processBlobs(blobSidecar.Data)
if err != nil {
return len(blobSidecar.Data), fmt.Errorf("error indexing blobs at slot %v: %w", slot, err)
}
Expand All @@ -258,22 +196,17 @@ func (bi *BlobIndexer) GetIndexerStatus() (*BlobIndexerStatus, error) {
}()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

key := fmt.Sprintf("%s/blob-indexer-status.json", bi.networkID)
obj, err := bi.S3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &utils.Config.BlobIndexer.S3.Bucket,
Key: &key,
})
if err != nil {
// If the object that you request doesn’t exist, the error that Amazon S3 returns depends on whether you also have the s3:ListBucket permission. If you have the s3:ListBucket permission on the bucket, Amazon S3 returns an HTTP status code 404 (Not Found) error. If you don’t have the s3:ListBucket permission, Amazon S3 returns an HTTP status code 403 ("access denied") error.
var httpResponseErr *awshttp.ResponseError
if errors.As(err, &httpResponseErr) && (httpResponseErr.HTTPStatusCode() == 404 || httpResponseErr.HTTPStatusCode() == 403) {
return &BlobIndexerStatus{}, nil
}
return nil, err
return handleGetObjectError(err)
}
status := &BlobIndexerStatus{}
err = json.NewDecoder(obj.Body).Decode(status)
return status, err

return decodeBlobIndexerStatus(obj.Body)
}

func (bi *BlobIndexer) putIndexerStatus(status BlobIndexerStatus) error {
Expand All @@ -283,6 +216,7 @@ func (bi *BlobIndexer) putIndexerStatus(status BlobIndexerStatus) error {
}()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

key := fmt.Sprintf("%s/blob-indexer-status.json", bi.networkID)
contentType := "application/json"
body, err := json.Marshal(&status)
Expand All @@ -308,6 +242,96 @@ func (bi *BlobIndexer) putIndexerStatus(status BlobIndexerStatus) error {
return nil
}

// handleGetObjectError maps an S3 GetObject failure for the indexer-status
// object to a usable result: a missing object yields a fresh, empty
// BlobIndexerStatus, while any other failure is propagated unchanged.
// S3 reports a missing object as 404 when the caller holds s3:ListBucket
// and as 403 when it does not, so both status codes mean "not stored yet".
func handleGetObjectError(err error) (*BlobIndexerStatus, error) {
	var respErr *awshttp.ResponseError
	if !errors.As(err, &respErr) {
		return nil, err
	}
	switch respErr.HTTPStatusCode() {
	case http.StatusNotFound, http.StatusForbidden:
		return &BlobIndexerStatus{}, nil
	}
	return nil, err
}

// decodeBlobIndexerStatus decodes a JSON-encoded BlobIndexerStatus from body.
// The parameter is the minimal io.Reader interface (callers typically pass an
// S3 object body, which satisfies it); closing the reader remains the
// caller's responsibility. On a decode failure it returns a nil status and a
// wrapped error instead of a partially-populated struct.
func decodeBlobIndexerStatus(body io.Reader) (*BlobIndexerStatus, error) {
	status := &BlobIndexerStatus{}
	if err := json.NewDecoder(body).Decode(status); err != nil {
		return nil, fmt.Errorf("decoding blob indexer status: %w", err)
	}
	return status, nil
}

// processBlobs uploads all given blob sidecars to S3, running up to 4 uploads
// concurrently under a shared 20-second deadline. It returns the first error
// encountered by any upload, or nil when all succeed.
func (bi *BlobIndexer) processBlobs(blobs []constypes.BlobSidecarsData) error {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	group, groupCtx := errgroup.WithContext(ctx)
	group.SetLimit(4)

	for _, sidecar := range blobs {
		sidecar := sidecar // per-iteration copy for the closure (pre-Go 1.22 loop semantics)
		group.Go(func() error {
			return bi.processBlob(groupCtx, sidecar)
		})
	}

	return group.Wait()
}

// processBlob writes a single blob sidecar to S3 under a key derived from its
// versioned blob hash. Blobs already written by this process (tracked in
// writtenBlobsCache) are skipped; when enableCheckingBeforePutting is set, a
// HeadObject round-trip additionally skips blobs that already exist in the
// bucket.
func (bi *BlobIndexer) processBlob(ctx context.Context, blob constypes.BlobSidecarsData) error {
	versionedBlobHash := fmt.Sprintf("%#x", utils.VersionedBlobHash(blob.KzgCommitment).Bytes())
	key := fmt.Sprintf("%s/blobs/%s", bi.networkID, versionedBlobHash)

	if bi.writtenBlobsCache.Contains(key) {
		return nil
	}

	if enableCheckingBeforePutting {
		exists, err := bi.checkBlobExists(ctx, key, blob)
		if err != nil {
			return err
		}
		if exists {
			// Only put the object if it does not exist yet; cache the key so
			// we do not issue another HeadObject for the same blob.
			bi.writtenBlobsCache.Add(key, true)
			return nil
		}
	}

	return bi.putBlob(ctx, key, blob)
}

// checkBlobExists reports whether the object at key already exists in the
// bucket. A 404 response means it does not; S3 answers 403 instead of 404
// when the caller lacks the s3:ListBucket permission, so 403 is treated the
// same way. Any other HeadObject failure is returned as an error.
func (bi *BlobIndexer) checkBlobExists(ctx context.Context, key string, blob constypes.BlobSidecarsData) (bool, error) {
	tS3HeadObj := time.Now()
	_, err := bi.S3Client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: &utils.Config.BlobIndexer.S3.Bucket,
		Key:    &key,
	})
	metrics.TaskDuration.WithLabelValues("blobindexer_check_blob").Observe(time.Since(tS3HeadObj).Seconds())
	if err != nil {
		var httpResponseErr *awshttp.ResponseError
		if errors.As(err, &httpResponseErr) && (httpResponseErr.HTTPStatusCode() == http.StatusNotFound || httpResponseErr.HTTPStatusCode() == http.StatusForbidden) {
			return false, nil
		}
		return false, fmt.Errorf("error getting headObject: %s (%v/%v): %w", key, blob.SignedBlockHeader.Message.Slot, blob.Index, err)
	}
	return true, nil
}

// putBlob uploads one blob sidecar to S3 under the given key, attaching the
// block header fields and KZG data as object metadata, and records the key in
// the written-blobs cache on success.
func (bi *BlobIndexer) putBlob(ctx context.Context, key string, blob constypes.BlobSidecarsData) error {
	header := blob.SignedBlockHeader.Message
	metadata := map[string]string{
		"blob_index":        fmt.Sprintf("%d", blob.Index),
		"block_slot":        fmt.Sprintf("%d", header.Slot),
		"block_proposer":    fmt.Sprintf("%d", header.ProposerIndex),
		"block_state_root":  header.StateRoot.String(),
		"block_parent_root": header.ParentRoot.String(),
		"block_body_root":   header.BodyRoot.String(),
		"kzg_commitment":    blob.KzgCommitment.String(),
		"kzg_proof":         blob.KzgProof.String(),
	}

	start := time.Now()
	_, err := bi.S3Client.PutObject(ctx, &s3.PutObjectInput{
		Bucket:   &utils.Config.BlobIndexer.S3.Bucket,
		Key:      &key,
		Body:     bytes.NewReader(blob.Blob),
		Metadata: metadata,
	})
	metrics.TaskDuration.WithLabelValues("blobindexer_put_blob").Observe(time.Since(start).Seconds())
	if err != nil {
		return fmt.Errorf("error putting object: %s (%v/%v): %w", key, header.Slot, blob.Index, err)
	}

	bi.writtenBlobsCache.Add(key, true)
	return nil
}

func (bi *BlobIndexer) fetchNodeData(ctx context.Context) (*constypes.StandardBeaconHeaderResponse, *constypes.StandardBeaconHeaderResponse, *constypes.StandardSpecResponse, error) {
headHeader := &constypes.StandardBeaconHeaderResponse{}
finalizedHeader := &constypes.StandardBeaconHeaderResponse{}
Expand Down Expand Up @@ -405,47 +429,12 @@ func (bi *BlobIndexer) indexBlobsInBatches(status *BlobIndexerStatus, headHeader
batchSize := uint64(100)
for batchStart := startSlot; batchStart <= headHeader.Data.Header.Message.Slot; batchStart += batchSize {
batchStartTs := time.Now()
batchBlobsIndexed := atomic.NewInt64(0)
batchEnd := batchStart + batchSize
if batchEnd > headHeader.Data.Header.Message.Slot {
batchEnd = headHeader.Data.Header.Message.Slot
}
g, gCtx := errgroup.WithContext(context.Background())
g.SetLimit(4)
for slot := batchStart; slot <= batchEnd; slot++ {
slot := slot
g.Go(func() error {
select {
case <-gCtx.Done():
return gCtx.Err()
default:
}
numBlobs, err := bi.indexBlobsAtSlot(slot)
if err != nil {
return fmt.Errorf("error bi.IndexBlobsAtSlot(%v): %w", slot, err)
}
if numBlobs > 0 && slot <= finalizedHeader.Data.Header.Message.Slot && slot > lastIndexedFinalizedBlobSlot.Load() {
lastIndexedFinalizedBlobSlot.Store(slot)
}
batchBlobsIndexed.Add(int64(numBlobs))
return nil
})
}
err := g.Wait()
batchEnd := calculateBatchEnd(batchStart, batchSize, headHeader.Data.Header.Message.Slot)
batchBlobsIndexed, err := bi.processBatch(batchStart, batchEnd, finalizedHeader, lastIndexedFinalizedBlobSlot)
if err != nil {
return err
}
lastIndexedFinalizedSlot := getLastIndexedFinalizedSlot(batchEnd, finalizedHeader)
newBlobIndexerStatus := BlobIndexerStatus{
LastIndexedFinalizedSlot: lastIndexedFinalizedSlot,
LastIndexedFinalizedBlobSlot: lastIndexedFinalizedBlobSlot.Load(),
CurrentBlobIndexerId: bi.id,
LastUpdate: time.Now(),
BlobIndexerVersion: version.Version,
}
if status.LastIndexedFinalizedBlobSlot > newBlobIndexerStatus.LastIndexedFinalizedBlobSlot {
newBlobIndexerStatus.LastIndexedFinalizedBlobSlot = status.LastIndexedFinalizedBlobSlot
}
newBlobIndexerStatus := updateIndexerStatus(batchEnd, finalizedHeader, lastIndexedFinalizedBlobSlot, status, bi)
err = bi.putIndexerStatus(newBlobIndexerStatus)
if err != nil {
return fmt.Errorf("error updating indexer status at slot %v: %w", batchEnd, err)
Expand All @@ -471,6 +460,62 @@ func (bi *BlobIndexer) indexBlobsInBatches(status *BlobIndexerStatus, headHeader
return nil
}

// processBatch indexes the blobs of every slot in [batchStart, batchEnd] with
// up to 4 slots in flight at once. It advances lastIndexedFinalizedBlobSlot
// to the highest finalized slot that actually contained blobs and returns the
// total number of blobs indexed in the batch.
func (bi *BlobIndexer) processBatch(batchStart, batchEnd uint64, finalizedHeader *constypes.StandardBeaconHeaderResponse, lastIndexedFinalizedBlobSlot *atomic.Uint64) (*atomic.Int64, error) {
	batchBlobsIndexed := atomic.NewInt64(0)

	g, gCtx := errgroup.WithContext(context.Background())
	g.SetLimit(4)

	for slot := batchStart; slot <= batchEnd; slot++ {
		slot := slot // per-iteration copy for the closure (pre-Go 1.22 loop semantics)
		g.Go(func() error {
			// Bail out early if a sibling goroutine already failed.
			select {
			case <-gCtx.Done():
				return gCtx.Err()
			default:
			}
			numBlobs, err := bi.indexBlobsAtSlot(slot)
			if err != nil {
				return fmt.Errorf("error bi.IndexBlobsAtSlot(%v): %w", slot, err)
			}
			if numBlobs > 0 && slot <= finalizedHeader.Data.Header.Message.Slot {
				// Advance the finalized-blob high-water mark monotonically.
				// A plain Load/compare/Store here would race with sibling
				// goroutines and could leave a *lower* slot stored last.
				for {
					cur := lastIndexedFinalizedBlobSlot.Load()
					if slot <= cur || lastIndexedFinalizedBlobSlot.CompareAndSwap(cur, slot) {
						break
					}
				}
			}
			batchBlobsIndexed.Add(int64(numBlobs))
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}

	return batchBlobsIndexed, nil
}

// updateIndexerStatus builds the BlobIndexerStatus to persist after a batch:
// the finalized slot reached by batchEnd, the blob-slot watermark, and the
// identity/version/timestamp of this indexer instance. The persisted
// LastIndexedFinalizedBlobSlot never moves backwards relative to the
// previously stored status.
func updateIndexerStatus(batchEnd uint64, finalizedHeader *constypes.StandardBeaconHeaderResponse, lastIndexedFinalizedBlobSlot *atomic.Uint64, status *BlobIndexerStatus, bi *BlobIndexer) BlobIndexerStatus {
	blobSlot := lastIndexedFinalizedBlobSlot.Load()
	if prev := status.LastIndexedFinalizedBlobSlot; prev > blobSlot {
		blobSlot = prev
	}

	return BlobIndexerStatus{
		LastIndexedFinalizedSlot:     getLastIndexedFinalizedSlot(batchEnd, finalizedHeader),
		LastIndexedFinalizedBlobSlot: blobSlot,
		CurrentBlobIndexerId:         bi.id,
		LastUpdate:                   time.Now(),
		BlobIndexerVersion:           version.Version,
	}
}

// calculateBatchEnd returns the inclusive end slot of a batch starting at
// batchStart: batchStart+batchSize, clamped so it never exceeds headSlot.
func calculateBatchEnd(batchStart, batchSize, headSlot uint64) uint64 {
	if end := batchStart + batchSize; end <= headSlot {
		return end
	}
	return headSlot
}

func calculateMinBlobSlot(spec *constypes.StandardSpecResponse, headHeader *constypes.StandardBeaconHeaderResponse) uint64 {
minBlobSlotRange := *spec.Data.MinEpochsForBlobSidecarsRequests * uint64(spec.Data.SlotsPerEpoch)
minBlobSlot := uint64(0)
Expand Down

0 comments on commit ed92d0e

Please sign in to comment.