forked from OffchainLabs/nitro
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paths3_storage_service.go
148 lines (127 loc) · 5.09 KB
/
s3_storage_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// Copyright 2022, Offchain Labs, Inc.
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE
package das
import (
"bytes"
"context"
"fmt"
"io"
"math"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsConfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
flag "github.com/spf13/pflag"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/das/dastree"
"github.com/offchainlabs/nitro/util/pretty"
)
// S3Uploader abstracts the AWS SDK's manager.Uploader so that uploads can be
// mocked in tests. Implemented by *manager.Uploader.
type S3Uploader interface {
Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*manager.Uploader)) (*manager.UploadOutput, error)
}
// S3Downloader abstracts the AWS SDK's manager.Downloader so that downloads
// can be mocked in tests. Implemented by *manager.Downloader.
type S3Downloader interface {
Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (n int64, err error)
}
// S3StorageServiceConfig holds the koanf/CLI configuration for the S3-backed
// DAS storage service. AccessKey/SecretKey are optional; when empty, the AWS
// SDK's default credential chain is used (see buildS3Client).
type S3StorageServiceConfig struct {
Enable bool `koanf:"enable"`
AccessKey string `koanf:"access-key"`
Bucket string `koanf:"bucket"`
ObjectPrefix string `koanf:"object-prefix"`
Region string `koanf:"region"`
SecretKey string `koanf:"secret-key"`
DiscardAfterTimeout bool `koanf:"discard-after-timeout"`
}
var DefaultS3StorageServiceConfig = S3StorageServiceConfig{}
// S3ConfigAddOptions registers the S3 storage service flags on f, each one
// prefixed with prefix and defaulted from DefaultS3StorageServiceConfig.
func S3ConfigAddOptions(prefix string, f *flag.FlagSet) {
	defaults := DefaultS3StorageServiceConfig
	f.Bool(prefix+".enable", defaults.Enable, "enable storage/retrieval of sequencer batch data from an AWS S3 bucket")
	f.String(prefix+".access-key", defaults.AccessKey, "S3 access key")
	f.String(prefix+".bucket", defaults.Bucket, "S3 bucket")
	f.String(prefix+".object-prefix", defaults.ObjectPrefix, "prefix to add to S3 objects")
	f.String(prefix+".region", defaults.Region, "S3 region")
	f.String(prefix+".secret-key", defaults.SecretKey, "S3 secret key")
	f.Bool(prefix+".discard-after-timeout", defaults.DiscardAfterTimeout, "discard data after its expiry timeout")
}
// S3StorageService is a StorageService backed by a single AWS S3 bucket.
// Objects are keyed by objectPrefix + the encoded dastree hash of the value.
type S3StorageService struct {
client *s3.Client // used directly only for health checks (HeadBucket)
bucket string
objectPrefix string
uploader S3Uploader
downloader S3Downloader
discardAfterTimeout bool // when true, Put stamps objects with an expiry time
}
// NewS3StorageService builds an S3-backed StorageService from config,
// constructing the underlying S3 client plus its upload/download managers.
func NewS3StorageService(config S3StorageServiceConfig) (StorageService, error) {
	s3Client, err := buildS3Client(config.AccessKey, config.SecretKey, config.Region)
	if err != nil {
		return nil, err
	}
	service := &S3StorageService{
		client:              s3Client,
		bucket:              config.Bucket,
		objectPrefix:        config.ObjectPrefix,
		uploader:            manager.NewUploader(s3Client),
		downloader:          manager.NewDownloader(s3Client),
		discardAfterTimeout: config.DiscardAfterTimeout,
	}
	return service, nil
}
// buildS3Client creates an S3 client for region, preferring explicit static
// credentials when both accessKey and secretKey are set, and otherwise
// falling back to the AWS SDK's default credential chain.
func buildS3Client(accessKey, secretKey, region string) (*s3.Client, error) {
	// remain backward compatible with accessKey and secretKey credentials provided via cli flags
	staticCreds := func(options *awsConfig.LoadOptions) error {
		if accessKey != "" && secretKey != "" {
			options.Credentials = credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")
		}
		return nil
	}
	cfg, err := awsConfig.LoadDefaultConfig(context.TODO(), awsConfig.WithRegion(region), staticCreds)
	if err != nil {
		return nil, err
	}
	return s3.NewFromConfig(cfg), nil
}
// GetByHash downloads the object stored under key (prefixed and encoded) and
// returns its bytes. On download failure the error is returned alongside
// whatever bytes were buffered.
func (s3s *S3StorageService) GetByHash(ctx context.Context, key common.Hash) ([]byte, error) {
	log.Trace("das.S3StorageService.GetByHash", "key", pretty.PrettyHash(key), "this", s3s)
	objectKey := s3s.objectPrefix + EncodeStorageServiceKey(key)
	buffer := manager.NewWriteAtBuffer([]byte{})
	_, err := s3s.downloader.Download(ctx, buffer, &s3.GetObjectInput{
		Bucket: aws.String(s3s.bucket),
		Key:    aws.String(objectKey),
	})
	return buffer.Bytes(), err
}
// Put uploads value to S3 under the dastree hash of its contents. When
// discard-after-timeout is enabled and timeout fits in an int64, the object's
// Expires field is set to the timeout interpreted as a Unix timestamp.
// Upload errors are logged and returned.
func (s3s *S3StorageService) Put(ctx context.Context, value []byte, timeout uint64) error {
	logPut("das.S3StorageService.Store", value, timeout, s3s)
	input := s3.PutObjectInput{
		Bucket: aws.String(s3s.bucket),
		Key:    aws.String(s3s.objectPrefix + EncodeStorageServiceKey(dastree.Hash(value))),
		Body:   bytes.NewReader(value),
	}
	if s3s.discardAfterTimeout && timeout <= math.MaxInt64 {
		// #nosec G115
		expiry := time.Unix(int64(timeout), 0)
		input.Expires = &expiry
	}
	if _, err := s3s.uploader.Upload(ctx, &input); err != nil {
		log.Error("das.S3StorageService.Store", "err", err)
		return err
	}
	return nil
}
// Sync is a no-op: uploads complete (or fail) synchronously in Put, so there
// is nothing to flush.
func (s3s *S3StorageService) Sync(ctx context.Context) error {
return nil
}
// Close is a no-op: the underlying S3 client holds no resources that need
// explicit release.
func (s3s *S3StorageService) Close(ctx context.Context) error {
return nil
}
// ExpirationPolicy reports how stored data ages out: DiscardAfterDataTimeout
// when discard-after-timeout is configured, KeepForever otherwise.
func (s3s *S3StorageService) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) {
	policy := daprovider.KeepForever
	if s3s.discardAfterTimeout {
		policy = daprovider.DiscardAfterDataTimeout
	}
	return policy, nil
}
// String identifies this service by its bucket name, for logging.
func (s3s *S3StorageService) String() string {
return fmt.Sprintf("S3StorageService(:%s)", s3s.bucket)
}
// HealthCheck verifies the bucket exists and is reachable with the configured
// credentials by issuing a HeadBucket request; any error means unhealthy.
func (s3s *S3StorageService) HealthCheck(ctx context.Context) error {
_, err := s3s.client.HeadBucket(ctx, &s3.HeadBucketInput{Bucket: aws.String(s3s.bucket)})
return err
}