forked from OffchainLabs/nitro
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfallback_storage_service.go
110 lines (100 loc) · 3.43 KB
/
fallback_storage_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Copyright 2021-2022, Offchain Labs, Inc.
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md
package das
import (
"context"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/das/dastree"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/pretty"
)
// FallbackStorageService wraps a primary StorageService and a read-only backup.
// GetByHash consults the primary first and falls back to the backup when the
// primary lookup returns an error. Fields are initialized positionally by
// NewFallbackStorageService, so their order must not change without updating it.
type FallbackStorageService struct {
// Embedded primary store; methods not overridden on FallbackStorageService
// are promoted from it.
StorageService
// Reader queried by GetByHash when the primary lookup fails.
backup daprovider.DASReader
// Health checker consulted by HealthCheck after the primary reports healthy.
backupHealthChecker DataAvailabilityServiceHealthChecker
// How long to retain data copied in from the backup (MaxUint64 means forever).
backupRetentionSeconds uint64
// If true, a failed write of backup data into the primary is not reported.
ignoreRetentionWriteErrors bool
// If true, GetByHash returns ErrNotFound for a key that is already being
// fetched from the backup, instead of fetching it again.
preventRecursiveGets bool
// Keys with a backup fetch currently in flight; only used when
// preventRecursiveGets is set.
currentlyFetching map[[32]byte]bool
// Guards currentlyFetching.
currentlyFetchingMutex sync.RWMutex
}
// NewFallbackStorageService builds a StorageService that combines a "primary"
// StorageService with a read-only "backup". Puts go to the primary.
// GetByHash is tried against the primary first; if that lookup fails, the
// backup is tried, and a valid result from the backup is Put into the primary.
func NewFallbackStorageService(
	primary StorageService,
	backup daprovider.DASReader,
	backupHealthChecker DataAvailabilityServiceHealthChecker,
	backupRetentionSeconds uint64, // how long to retain data that we copy in from the backup (MaxUint64 means forever)
	ignoreRetentionWriteErrors bool, // if true, don't return error if write of retention data to primary fails
	preventRecursiveGets bool, // if true, return NotFound on simultaneous calls to Gets that miss in primary (prevents infinite recursion)
) *FallbackStorageService {
	// Keyed fields keep the literal correct even if the struct layout changes.
	// currentlyFetchingMutex is left at its zero value, which is ready to use.
	service := &FallbackStorageService{
		StorageService:             primary,
		backup:                     backup,
		backupHealthChecker:        backupHealthChecker,
		backupRetentionSeconds:     backupRetentionSeconds,
		ignoreRetentionWriteErrors: ignoreRetentionWriteErrors,
		preventRecursiveGets:       preventRecursiveGets,
		currentlyFetching:          make(map[[32]byte]bool),
	}
	return service
}
// GetByHash returns the data stored under key.
//
// The primary StorageService is queried first. If it returns any error, the
// backup reader is queried instead. Backup data that passes dastree.ValidHash
// is written into the primary with an expiry of now + backupRetentionSeconds
// (saturating). If that write fails and ignoreRetentionWriteErrors is false,
// the write error is returned; otherwise the backup data is returned.
//
// When preventRecursiveGets is set, a call for a key that already has a
// backup fetch in flight returns ErrNotFound without querying either store.
func (f *FallbackStorageService) GetByHash(ctx context.Context, key common.Hash) ([]byte, error) {
	log.Trace("das.FallbackStorageService.GetByHash", "key", pretty.PrettyHash(key), "this", f)
	if f.preventRecursiveGets {
		f.currentlyFetchingMutex.RLock()
		inFlight := f.currentlyFetching[key]
		f.currentlyFetchingMutex.RUnlock()
		if inFlight {
			// This is a recursive call, so return not-found
			return nil, ErrNotFound
		}
	}

	data, err := f.StorageService.GetByHash(ctx, key)
	if err == nil {
		return data, nil
	}

	// Primary lookup failed: mark the key as in flight so that a nested call
	// for the same key short-circuits above. Only the call that set the mark
	// is responsible for clearing it.
	doDelete := false
	if f.preventRecursiveGets {
		f.currentlyFetchingMutex.Lock()
		if !f.currentlyFetching[key] {
			f.currentlyFetching[key] = true
			doDelete = true
		}
		f.currentlyFetchingMutex.Unlock()
	}
	log.Trace("das.FallbackStorageService.GetByHash trying fallback")
	data, err = f.backup.GetByHash(ctx, key)
	if doDelete {
		f.currentlyFetchingMutex.Lock()
		delete(f.currentlyFetching, key)
		f.currentlyFetchingMutex.Unlock()
	}
	if err != nil {
		return nil, err
	}

	// Only data matching the requested hash is copied into the primary.
	if dastree.ValidHash(key, data) {
		// #nosec G115
		expiry := arbmath.SaturatingUAdd(uint64(time.Now().Unix()), f.backupRetentionSeconds)
		putErr := f.StorageService.Put(ctx, data, expiry)
		if putErr != nil && !f.ignoreRetentionWriteErrors {
			// Return the Put error itself; err is nil here, so returning it
			// would report (nil, nil) to the caller.
			return nil, putErr
		}
	}
	return data, nil
}
// String returns a description of this service that embeds the primary
// StorageService's own String output.
func (f *FallbackStorageService) String() string {
	return "FallbackStorageService(storageService:" + f.StorageService.String() + ")"
}
// HealthCheck checks the primary StorageService first and returns its error
// if it has one; otherwise it returns the result of the backup health checker.
func (f *FallbackStorageService) HealthCheck(ctx context.Context) error {
	if primaryErr := f.StorageService.HealthCheck(ctx); primaryErr != nil {
		return primaryErr
	}
	return f.backupHealthChecker.HealthCheck(ctx)
}