Skip to content

Commit

Permalink
feat: Support explict modes for read fallback and caching
Browse files Browse the repository at this point in the history
  • Loading branch information
ethenotethan committed Aug 25, 2024
1 parent 1838c6e commit 9435cd2
Show file tree
Hide file tree
Showing 11 changed files with 459 additions and 126 deletions.
8 changes: 5 additions & 3 deletions e2e/optimism_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ func TestOptimismKeccak256Commitment(gt *testing.T) {
gt.Skip("Skipping test as INTEGRATION or TESTNET env var not set")
}

proxyTS, shutDown := e2e.CreateTestSuite(gt, useMemory(), true)
testCfg := e2e.TestConfig(useMemory())
testCfg.UseKeccak256ModeS3 = true
proxyTS, shutDown := e2e.CreateTestSuite(gt, testCfg)
defer shutDown()

t := actions.NewDefaultTesting(gt)
Expand Down Expand Up @@ -174,7 +176,7 @@ func TestOptimismAltDACommitment(gt *testing.T) {
gt.Skip("Skipping test as INTEGRATION or TESTNET env var not set")
}

proxyTS, shutDown := e2e.CreateTestSuite(gt, useMemory(), false)
proxyTS, shutDown := e2e.CreateTestSuite(gt, e2e.TestConfig(useMemory()))
defer shutDown()

t := actions.NewDefaultTesting(gt)
Expand Down Expand Up @@ -218,7 +220,7 @@ func TestOptimismAltDACommitment(gt *testing.T) {
// assert that EigenDA proxy's was written and read from

if useMemory() {
stat := proxyTS.Server.GetMemStats()
stat := proxyTS.Server.GetEigenDAStats()
require.Equal(t, 1, stat.Entries)
require.Equal(t, 1, stat.Reads)
}
Expand Down
62 changes: 55 additions & 7 deletions e2e/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ func TestOptimismClientWithS3Backend(t *testing.T) {

t.Parallel()

ts, kill := e2e.CreateTestSuite(t, useMemory(), true)
testCfg := e2e.TestConfig(useMemory())
testCfg.UseKeccak256ModeS3 = true

ts, kill := e2e.CreateTestSuite(t, testCfg)
defer kill()

daClient := op_plasma.NewDAClient(ts.Address(), false, true)
Expand All @@ -36,16 +39,19 @@ func TestOptimismClientWithS3Backend(t *testing.T) {
require.Equal(t, testPreimage, preimage)
}

/*
this test asserts that the data can be posted/read to EigenDA
with a concurrent S3 backend configured
*/
func TestOptimismClientWithEigenDABackend(t *testing.T) {
// this test asserts that the data can be posted/read to EigenDA with a concurrent S3 backend configured

if !runIntegrationTests && !runTestnetIntegrationTests {
t.Skip("Skipping test as INTEGRATION or TESTNET env var not set")
}

t.Parallel()

ts, kill := e2e.CreateTestSuite(t, useMemory(), true)
ts, kill := e2e.CreateTestSuite(t, e2e.TestConfig(useMemory()))
defer kill()

daClient := op_plasma.NewDAClient(ts.Address(), false, false)
Expand All @@ -69,7 +75,7 @@ func TestProxyClient(t *testing.T) {

t.Parallel()

ts, kill := e2e.CreateTestSuite(t, useMemory(), false)
ts, kill := e2e.CreateTestSuite(t, e2e.TestConfig(useMemory()))
defer kill()

cfg := &client.Config{
Expand All @@ -96,14 +102,14 @@ func TestProxyClientWithLargeBlob(t *testing.T) {

t.Parallel()

ts, kill := e2e.CreateTestSuite(t, useMemory(), false)
ts, kill := e2e.CreateTestSuite(t, e2e.TestConfig(useMemory()))
defer kill()

cfg := &client.Config{
URL: ts.Address(),
}
daClient := client.New(cfg)
// 2MB blob
// 4MB blob
testPreimage := []byte(e2e.RandString(4_000_000))

t.Log("Setting input data on proxy server...")
Expand All @@ -123,7 +129,7 @@ func TestProxyClientWithOversizedBlob(t *testing.T) {

t.Parallel()

ts, kill := e2e.CreateTestSuite(t, useMemory(), false)
ts, kill := e2e.CreateTestSuite(t, e2e.TestConfig(useMemory()))
defer kill()

cfg := &client.Config{
Expand All @@ -150,3 +156,45 @@ func TestProxyClientWithOversizedBlob(t *testing.T) {
require.True(t, oversizedError)

}

func TestProxyClientWithCaching(t *testing.T) {
if !runIntegrationTests && !runTestnetIntegrationTests {
t.Skip("Skipping test as INTEGRATION or TESTNET env var not set")
}

t.Parallel()

testCfg := e2e.TestConfig(useMemory())
testCfg.UseS3Caching = true

ts, kill := e2e.CreateTestSuite(t, testCfg)
defer kill()

cfg := &client.Config{
URL: ts.Address(),
}
daClient := client.New(cfg)
// 1mb blob
testPreimage := []byte(e2e.RandString(1_0000))

t.Log("Setting input data on proxy server...")
blobInfo, err := daClient.SetData(ts.Ctx, testPreimage)
require.NotEmpty(t, blobInfo)
require.NoError(t, err)

t.Log("Getting input data from proxy server...")
preimage, err := daClient.GetData(ts.Ctx, blobInfo)
require.NoError(t, err)
require.Equal(t, testPreimage, preimage)

// ensure that read was from cache
s3Stats := ts.Server.GetS3Stats()
require.Equal(t, 1, s3Stats.Reads)
require.Equal(t, 1, s3Stats.Entries)

if useMemory() { // ensure that eigenda was not read from
memStats := ts.Server.GetEigenDAStats()
require.Equal(t, 0, memStats.Reads)
require.Equal(t, 1, memStats.Entries)
}
}
56 changes: 48 additions & 8 deletions e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,45 @@ const (
holeskyDA = "disperser-holesky.eigenda.xyz:443"
)

type Cfg struct {
UseMemory bool
UseKeccak256ModeS3 bool
UseS3Caching bool
UseS3Fallback bool
}

func TestConfig(useMemory bool) *Cfg {
return &Cfg{
UseMemory: useMemory,
UseKeccak256ModeS3: false,
UseS3Caching: false,
UseS3Fallback: false,
}
}

type TestSuite struct {
Ctx context.Context
Log log.Logger
Server *server.Server
}

func CreateTestSuite(t *testing.T, useMemory bool, useS3 bool) (TestSuite, func()) {
func CreateTestSuite(t *testing.T, testCfg *Cfg) (TestSuite, func()) {
ctx := context.Background()

// load signer key from environment
pk := os.Getenv(privateKey)
if pk == "" && !useMemory {
if pk == "" && !testCfg.UseMemory {
t.Fatal("SIGNER_PRIVATE_KEY environment variable not set")
}

// load node url from environment
ethRPC := os.Getenv(ethRPC)
if ethRPC == "" && !useMemory {
if ethRPC == "" && !testCfg.UseMemory {
t.Fatal("ETHEREUM_RPC environment variable is not set")
}

var pollInterval time.Duration
if useMemory {
if testCfg.UseMemory {
pollInterval = time.Second * 1
} else {
pollInterval = time.Minute * 1
Expand All @@ -80,17 +96,19 @@ func CreateTestSuite(t *testing.T, useMemory bool, useS3 bool) (TestSuite, func(
MaxBlobLength: "16mib",
G2PowerOfTauPath: "../resources/g2.point.powerOf2",
PutBlobEncodingVersion: 0x00,
MemstoreEnabled: useMemory,
MemstoreEnabled: testCfg.UseMemory,
MemstoreBlobExpiration: 14 * 24 * time.Hour,
EthConfirmationDepth: 0,
}

if useMemory {
if testCfg.UseMemory {
eigendaCfg.ClientConfig.SignerPrivateKeyHex = "0000000000000000000100000000000000000000000000000000000000000000"
}

var cfg server.CLIConfig
if useS3 {

switch {
case testCfg.UseKeccak256ModeS3:
// generate random string
bucketName := "eigenda-proxy-test-" + RandString(10)
createS3Bucket(bucketName)
Expand All @@ -108,12 +126,34 @@ func CreateTestSuite(t *testing.T, useMemory bool, useS3 bool) (TestSuite, func(
Backup: false,
},
}
} else {

case testCfg.UseS3Caching:
bucketName := "eigenda-proxy-test-" + RandString(10)
createS3Bucket(bucketName)

eigendaCfg.CacheTargets = []string{"S3"}

cfg = server.CLIConfig{
EigenDAConfig: eigendaCfg,
S3Config: store.S3Config{
Profiling: true,
Bucket: bucketName,
Path: "",
Endpoint: "localhost:4566",
AccessKeySecret: "minioadmin",
AccessKeyID: "minioadmin",
S3CredentialType: store.S3CredentialStatic,
Backup: false,
},
}

default:
cfg = server.CLIConfig{
EigenDAConfig: eigendaCfg,
MetricsCfg: opmetrics.CLIConfig{},
}
}

store, err := server.LoadStoreRouter(
ctx,
cfg,
Expand Down
46 changes: 43 additions & 3 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
)

const (
// eigenda client flags
EigenDADisperserRPCFlagName = "eigenda-disperser-rpc"
EthRPCFlagName = "eigenda-eth-rpc"
SvcManagerAddrFlagName = "eigenda-svc-manager-addr"
Expand All @@ -27,17 +28,18 @@ const (
SignerPrivateKeyHexFlagName = "eigenda-signer-private-key-hex"
PutBlobEncodingVersionFlagName = "eigenda-put-blob-encoding-version"
DisablePointVerificationModeFlagName = "eigenda-disable-point-verification-mode"
// Kzg flags

// kzg flags
G1PathFlagName = "eigenda-g1-path"
G2TauFlagName = "eigenda-g2-tau-path"
CachePathFlagName = "eigenda-cache-path"
MaxBlobLengthFlagName = "eigenda-max-blob-length"

// Memstore flags
// memstore flags
MemstoreFlagName = "memstore.enabled"
MemstoreExpirationFlagName = "memstore.expiration"

// S3 flags
// S3 client flags
S3CredentialTypeFlagName = "s3.credential-type" // #nosec G101
S3BucketFlagName = "s3.bucket" // #nosec G101
S3PathFlagName = "s3.path"
Expand All @@ -46,6 +48,10 @@ const (
S3AccessKeySecretFlagName = "s3.access-key-secret" // #nosec G101
S3BackupFlagName = "s3.backup"
S3TimeoutFlagName = "s3.timeout"

// routing flags
FallbackTargets = "routing.fallback-targets"
CacheTargets = "routing.cache-targets"
)

const BytesPerSymbol = 31
Expand Down Expand Up @@ -81,6 +87,10 @@ type Config struct {
// Memstore
MemstoreEnabled bool
MemstoreBlobExpiration time.Duration

// routing
FallbackTargets []string
CacheTargets []string
}

func (cfg *Config) GetMaxBlobLength() (uint64, error) {
Expand Down Expand Up @@ -164,6 +174,8 @@ func ReadConfig(ctx *cli.Context) Config {
EthConfirmationDepth: ctx.Int64(EthConfirmationDepthFlagName),
MemstoreEnabled: ctx.Bool(MemstoreFlagName),
MemstoreBlobExpiration: ctx.Duration(MemstoreExpirationFlagName),
FallbackTargets: ctx.StringSlice(FallbackTargets),
CacheTargets: ctx.StringSlice(CacheTargets),
}
cfg.ClientConfig.WaitForFinalization = (cfg.EthConfirmationDepth < 0)

Expand Down Expand Up @@ -217,6 +229,22 @@ func (cfg *Config) Check() error {
return fmt.Errorf("eigenda disperser rpc url is not set")
}

if len(cfg.FallbackTargets) > 0 {
for _, t := range cfg.FallbackTargets {
if store.StringToBackend(t) == store.Unknown {
return fmt.Errorf("unknown fallback target provided: %s", t)
}
}
}

if len(cfg.CacheTargets) > 0 {
for _, t := range cfg.CacheTargets {
if store.StringToBackend(t) == store.Unknown {
return fmt.Errorf("unknown cache target provided: %s", t)
}
}
}

return nil
}

Expand Down Expand Up @@ -377,6 +405,18 @@ func CLIFlags() []cli.Flag {
Value: 25 * time.Minute,
EnvVars: []string{"MEMSTORE_EXPIRATION"},
},
&cli.StringSliceFlag{
Name: FallbackTargets,
Usage: "List of read fallback targets to rollover to if cert can't be read from EigenDA.",
Value: cli.NewStringSlice(),
EnvVars: prefixEnvVars("FALLBACK_TARGETS"),
},
&cli.StringSliceFlag{
Name: CacheTargets,
Usage: "List of caching targets to use fast reads from EigenDA.",
Value: cli.NewStringSlice(),
EnvVars: prefixEnvVars("CACHE_TARGETS"),
},
}

flags = append(flags, s3Flags()...)
Expand Down
Loading

0 comments on commit 9435cd2

Please sign in to comment.