diff --git a/README.md b/README.md
index 232a44a8..c4b798b6 100644
--- a/README.md
+++ b/README.md
@@ -58,7 +58,7 @@ In order to disperse to the EigenDA network in production, or at high throughput
 - [Metrics](#metrics)
 - [Flags](#flags)
 - [Resources](#resources)
-
+
 ## Deployment Guide
@@ -170,6 +170,8 @@ In the event that the EigenDA disperser or network is down, the proxy will retur
 
 This behavior is turned on by default, but configurable via the `--eigenda.confirmation-timeout` flag (currently 15 minutes by default). If a blob is not confirmed within this time, the proxy will return a 503 status code. This should be set long enough to accommodate the disperser's batching interval (typically 10 minutes), signature gathering, and onchain submission.
 
+This behavior can be modified to write to a secondary storage target when the EigenDA write fails by setting the `--eigenda.enable-write-on-eigenda-failure` flag to `true`. This flag only works with OP Stack-based rollups.
+
 ## Blob Lifecycle
 
 > Warning: the below diagrams describe EigenDA V2 interactions. EigenDA V1 is very similar, but has slight discrepancies.
diff --git a/e2e/server_test.go b/e2e/server_test.go
index 10ccf135..4070773b 100644
--- a/e2e/server_test.go
+++ b/e2e/server_test.go
@@ -267,3 +267,79 @@ func TestProxyReadFallback(t *testing.T) {
 	requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.S3BackendType)
 	requireDispersalRetrievalEigenDA(t, ts.Metrics.HTTPServerRequestsTotal, commitments.Standard)
 }
+
+// Ensures that when the EigenDA write fails and enable-write-on-eigenda-failure is true,
+// the data is written to the secondary storage and the write succeeds.
+func TestProxyRedundantWriteOnEigenDAFailure(t *testing.T) {
+	if !runIntegrationTests || runTestnetIntegrationTests {
+		t.Skip("Skipping test as INTEGRATION env var not set")
+	}
+
+	t.Parallel()
+
+	// Setup server with S3 as secondary and simulate EigenDA failure
+	testCfg := e2e.TestConfig(useMemory())
+	testCfg.UseS3Fallback = true
+	testCfg.UseWriteFallback = true
+	testCfg.SimulateEigenDAFailure = true
+
+	tsConfig := e2e.TestSuiteConfig(testCfg)
+	ts, kill := e2e.CreateTestSuite(tsConfig)
+	defer kill()
+
+	cfg := &client.Config{
+		URL: ts.Address(),
+	}
+	daClient := client.New(cfg)
+
+	// Write data while EigenDA is "failing"
+	expectedBlob := e2e.RandBytes(1_000_000)
+	t.Log("Setting input data on proxy server...")
+	blobInfo, err := daClient.SetData(ts.Ctx, expectedBlob)
+	require.NoError(t, err)
+
+	// Try to read data - should succeed because it was written to secondary
+	t.Log("Getting input data from proxy server...")
+	actualBlob, err := daClient.GetData(ts.Ctx, blobInfo)
+	require.NoError(t, err)
+	require.Equal(t, expectedBlob, actualBlob)
+
+	// Verify metrics show secondary write and read
+	requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.S3BackendType)
+}
+
+// Ensures that when the EigenDA write fails and enable-write-on-eigenda-failure is false,
+// the data is not written to the secondary storage and the write fails.
+func TestProxyRedundantWriteDisabled(t *testing.T) {
+	if !runIntegrationTests || runTestnetIntegrationTests {
+		t.Skip("Skipping test as INTEGRATION env var not set")
+	}
+
+	t.Parallel()
+
+	// Setup server with S3 as secondary and simulate EigenDA failure
+	testCfg := e2e.TestConfig(useMemory())
+	testCfg.UseS3Fallback = true
+	testCfg.UseWriteFallback = false
+	testCfg.SimulateEigenDAFailure = true
+
+	tsConfig := e2e.TestSuiteConfig(testCfg)
+	ts, kill := e2e.CreateTestSuite(tsConfig)
+	defer kill()
+
+	cfg := &client.Config{
+		URL: ts.Address(),
+	}
+	daClient := client.New(cfg)
+
+	// Write data while EigenDA is "failing" - the write itself should fail
+	expectedBlob := e2e.RandBytes(1_000_000)
+	t.Log("Setting input data on proxy server...")
+	blobInfo, err := daClient.SetData(ts.Ctx, expectedBlob)
+	require.Error(t, err)
+
+	// Try to read data - should fail because it was not written to secondary
+	t.Log("Getting input data from proxy server...")
+	_, err = daClient.GetData(ts.Ctx, blobInfo)
+	require.Error(t, err)
+}
diff --git a/e2e/setup.go b/e2e/setup.go
index 7ddd5041..beb1f0fc 100644
--- a/e2e/setup.go
+++ b/e2e/setup.go
@@ -113,17 +113,23 @@ type Cfg struct {
 	UseS3Caching       bool
 	UseRedisCaching    bool
 	UseS3Fallback      bool
+	// enable writing to secondary storage on EigenDA failure
+	UseWriteFallback bool
+	// simulate EigenDA failure
+	SimulateEigenDAFailure bool
 }
 
 func TestConfig(useMemory bool) *Cfg {
 	return &Cfg{
-		UseMemory:          useMemory,
-		Expiration:         14 * 24 * time.Hour,
-		UseKeccak256ModeS3: false,
-		UseS3Caching:       false,
-		UseRedisCaching:    false,
-		UseS3Fallback:      false,
-		WriteThreadCount:   0,
+		UseMemory:              useMemory,
+		Expiration:             14 * 24 * time.Hour,
+		UseKeccak256ModeS3:     false,
+		UseS3Caching:           false,
+		UseRedisCaching:        false,
+		UseS3Fallback:          false,
+		WriteThreadCount:       0,
+		UseWriteFallback:       false,
+		SimulateEigenDAFailure: false,
 	}
 }
 
@@ -210,10 +216,11 @@ func TestSuiteConfig(testCfg *Cfg) server.CLIConfig {
 		},
 		MemstoreEnabled: testCfg.UseMemory,
 		MemstoreConfig: memstore.Config{
-			BlobExpiration:   testCfg.Expiration,
-			MaxBlobSizeBytes: maxBlobLengthBytes,
+			BlobExpiration:         testCfg.Expiration,
+			MaxBlobSizeBytes:       maxBlobLengthBytes,
+			SimulateEigenDAFailure: testCfg.SimulateEigenDAFailure,
 		},
-
+		UseWriteFallback: testCfg.UseWriteFallback,
 		StorageConfig: store.Config{
 			AsyncPutWorkers: testCfg.WriteThreadCount,
 		},
diff --git a/flags/eigendaflags/cli.go b/flags/eigendaflags/cli.go
index c11cd2c9..76d5f200 100644
--- a/flags/eigendaflags/cli.go
+++ b/flags/eigendaflags/cli.go
@@ -29,7 +29,8 @@ var (
 	EthRPCURLFlagName      = withFlagPrefix("eth-rpc")
 	SvcManagerAddrFlagName = withFlagPrefix("svc-manager-addr")
 	// Flags that are proxy specific, and not used by the eigenda-client
-	PutRetriesFlagName = withFlagPrefix("put-retries")
+	PutRetriesFlagName          = withFlagPrefix("put-retries")
+	EnableWriteFallbackFlagName = withFlagPrefix("enable-write-on-eigenda-failure")
 )
 
 func withFlagPrefix(s string) string {
@@ -163,6 +164,14 @@ func CLIFlags(envPrefix, category string) []cli.Flag {
 			EnvVars:  []string{withEnvPrefix(envPrefix, "PUT_RETRIES")},
 			Category: category,
 		},
+		&cli.BoolFlag{
+			Name: EnableWriteFallbackFlagName,
+			Usage: "Enable writing to secondary storage when the EigenDA write fails. " +
+				"DANGER: incompatible with Nitro stack!!! Default is false.",
+			Value:    false,
+			EnvVars:  []string{withEnvPrefix(envPrefix, "WRITE_ON_EIGENDA_FAILURE")},
+			Category: category,
+		},
 	}
 }
diff --git a/server/config.go b/server/config.go
index 7ed2e93b..6c21e3c9 100644
--- a/server/config.go
+++ b/server/config.go
@@ -15,11 +15,12 @@ import (
 )
 
 type Config struct {
-	EdaClientConfig clients.EigenDAClientConfig
-	MemstoreConfig  memstore.Config
-	StorageConfig   store.Config
-	VerifierConfig  verify.Config
-	PutRetries      uint
+	EdaClientConfig  clients.EigenDAClientConfig
+	MemstoreConfig   memstore.Config
+	StorageConfig    store.Config
+	VerifierConfig   verify.Config
+	PutRetries       uint
+	UseWriteFallback bool
 
 	MemstoreEnabled bool
 }
@@ -28,12 +29,13 @@ type Config struct {
 func ReadConfig(ctx *cli.Context) Config {
 	edaClientConfig := eigendaflags.ReadConfig(ctx)
 	return Config{
-		EdaClientConfig: edaClientConfig,
-		VerifierConfig:  verify.ReadConfig(ctx, edaClientConfig),
-		PutRetries:      ctx.Uint(eigendaflags.PutRetriesFlagName),
-		MemstoreEnabled: ctx.Bool(memstore.EnabledFlagName),
-		MemstoreConfig:  memstore.ReadConfig(ctx),
-		StorageConfig:   store.ReadConfig(ctx),
+		EdaClientConfig:  edaClientConfig,
+		VerifierConfig:   verify.ReadConfig(ctx, edaClientConfig),
+		PutRetries:       ctx.Uint(eigendaflags.PutRetriesFlagName),
+		UseWriteFallback: ctx.Bool(eigendaflags.EnableWriteFallbackFlagName),
+		MemstoreEnabled:  ctx.Bool(memstore.EnabledFlagName),
+		MemstoreConfig:   memstore.ReadConfig(ctx),
+		StorageConfig:    store.ReadConfig(ctx),
 	}
 }
 
diff --git a/server/load_store.go b/server/load_store.go
index cb75a6ae..f9dabd9d 100644
--- a/server/load_store.go
+++ b/server/load_store.go
@@ -136,5 +136,5 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr
 	}
 
 	log.Info("Creating storage router", "eigenda backend type", eigenDA != nil, "s3 backend type", s3Store != nil)
-	return store.NewManager(eigenDA, s3Store, log, secondary)
+	return store.NewManager(eigenDA, s3Store, log, secondary, cfg.EigenDAConfig.UseWriteFallback)
 }
diff --git a/store/generated_key/memstore/memstore.go b/store/generated_key/memstore/memstore.go
index dce04571..e76cdb0e 100644
--- a/store/generated_key/memstore/memstore.go
+++ b/store/generated_key/memstore/memstore.go
@@ -30,6 +30,8 @@ type Config struct {
 	// artificial latency added for memstore backend to mimic eigenda's latency
 	PutLatency time.Duration
 	GetLatency time.Duration
+	// SimulateEigenDAFailure forces Put and Get operations to fail, simulating EigenDA failures
+	SimulateEigenDAFailure bool
 }
 
 /*
@@ -108,6 +110,12 @@ func (e *MemStore) pruneExpired() {
 func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) {
 	time.Sleep(e.config.GetLatency)
 	e.reads++
+
+	// Simulate EigenDA failure if configured
+	if e.config.SimulateEigenDAFailure {
+		return nil, fmt.Errorf("simulated EigenDA failure")
+	}
+
 	e.RLock()
 	defer e.RUnlock()
 
@@ -147,6 +155,11 @@ func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) {
 	e.Lock()
 	defer e.Unlock()
 
+	// Simulate EigenDA failure if configured
+	if e.config.SimulateEigenDAFailure {
+		return nil, fmt.Errorf("simulated EigenDA failure")
+	}
+
 	commitment, err := e.verifier.Commit(encodedVal)
 	if err != nil {
 		return nil, err
diff --git a/store/manager.go b/store/manager.go
index 93721d89..e277a3c9 100644
--- a/store/manager.go
+++ b/store/manager.go
@@ -9,6 +9,7 @@ import (
 	"github.com/Layr-Labs/eigenda-proxy/commitments"
 	"github.com/Layr-Labs/eigenda-proxy/common"
+	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/log"
 )
 
@@ -27,16 +28,18 @@
 type Manager struct {
 	// secondary storage backends (caching and fallbacks)
 	secondary ISecondary
+	// enables writing to secondary storage when the EigenDA write fails
+	useWriteFallback bool
 }
 
 // NewManager ... Init
-func NewManager(eigenda common.GeneratedKeyStore, s3 common.PrecomputedKeyStore, l log.Logger,
-	secondary ISecondary) (IManager, error) {
+func NewManager(eigenda common.GeneratedKeyStore, s3 common.PrecomputedKeyStore, l log.Logger, secondary ISecondary, useWriteFallback bool) (IManager, error) {
 	return &Manager{
-		log:       l,
-		eigenda:   eigenda,
-		s3:        s3,
-		secondary: secondary,
+		log:              l,
+		eigenda:          eigenda,
+		s3:               s3,
+		secondary:        secondary,
+		useWriteFallback: useWriteFallback,
 	}, nil
 }
 
@@ -124,6 +127,24 @@ func (m *Manager) Put(ctx context.Context, cm commitments.CommitmentMode, key, v
 	}
 
 	if err != nil {
+		log.Error("Failed to write to EigenDA backend", "err", err)
+
+		// don't fall back on an oversized blob: that would hide misuse/misconfiguration of the proxy
+		if errors.Is(err, common.ErrProxyOversizedBlob) {
+			return nil, err
+		}
+
+		// the write to EigenDA failed, which shouldn't happen if the backend is functioning properly;
+		// fall back to secondary storage, keyed by the payload's hash, to keep the op-batcher alive
+		if m.useWriteFallback && m.secondary.Enabled() && !m.secondary.AsyncWriteEntry() {
+			redundantErr := m.secondary.HandleRedundantWrites(ctx, value, value)
+			if redundantErr != nil {
+				log.Error("Failed to write to redundant backends", "err", redundantErr)
+				return nil, redundantErr
+			}
+
+			return crypto.Keccak256(value), nil
+		}
 		return nil, err
 	}
 
diff --git a/store/secondary.go b/store/secondary.go
index 9814095c..72cc21eb 100644
--- a/store/secondary.go
+++ b/store/secondary.go
@@ -1,6 +1,7 @@
 package store
 
 import (
+	"bytes"
 	"context"
 	"errors"
 	"net/http"
@@ -8,8 +9,10 @@ import (
 
 	"github.com/Layr-Labs/eigenda-proxy/common"
 	"github.com/Layr-Labs/eigenda-proxy/metrics"
+	verifypackage "github.com/Layr-Labs/eigenda-proxy/verify"
 	"github.com/ethereum-optimism/optimism/op-service/retry"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/rlp"
 )
 
@@ -151,6 +154,14 @@ func (sm *SecondaryManager) MultiSourceRead(ctx context.Context, commitment []by
 	}
 
 	key := crypto.Keccak256(commitment)
+
+	// check whether the commitment is an RLP-encoded certificate; if not, assume it is
+	// already the storage key (e.g. the payload hash returned by a fallback write)
+	var cert verifypackage.Certificate
+	err := rlp.DecodeBytes(commitment, &cert)
+	if err != nil {
+		key = commitment
+	}
+
 	for _, src := range sources {
 		cb := sm.m.RecordSecondaryRequest(src.BackendType().String(), http.MethodGet)
 		data, err := src.Get(ctx, key)
@@ -168,7 +179,14 @@ func (sm *SecondaryManager) MultiSourceRead(ctx context.Context, commitment []by
 
 		// verify cert:data using provided verification function
 		sm.verifyLock.Lock()
-		err = verify(ctx, commitment, data)
+
+		if bytes.Equal(key, commitment) {
+			err = src.Verify(ctx, commitment, data)
+		} else {
+			// verify cert:data using EigenDA verification checks
+			err = verify(ctx, commitment, data)
+		}
+
 		if err != nil {
 			cb(Failed)
 			log.Warn("Failed to verify blob", "err", err, "backend", src.BackendType())
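
Reviewer note: a minimal sketch (not part of this diff) of how the new fallback flag is threaded into the storage manager, per the `store.NewManager` signature introduced above. The helper name and variables are illustrative only; the backends and logger are assumed to be constructed as in `server/load_store.go`.

```go
package server

import (
	"github.com/Layr-Labs/eigenda-proxy/common"
	"github.com/Layr-Labs/eigenda-proxy/store"
	"github.com/ethereum/go-ethereum/log"
)

// newManagerWithFallback is a hypothetical helper, shown only to illustrate the
// new useWriteFallback argument added to store.NewManager in this diff.
func newManagerWithFallback(
	eigenDA common.GeneratedKeyStore,
	s3Store common.PrecomputedKeyStore,
	l log.Logger,
	secondary store.ISecondary,
	useWriteFallback bool, // value of --eigenda.enable-write-on-eigenda-failure
) (store.IManager, error) {
	// With useWriteFallback=true, a failed (non-oversized) EigenDA Put falls through
	// to the enabled synchronous secondary backends and returns crypto.Keccak256(value)
	// as the commitment instead of an error, keeping the op-batcher alive.
	return store.NewManager(eigenDA, s3Store, l, secondary, useWriteFallback)
}
```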