Skip to content

Commit 115b041

Browse files
authored
Merge pull request #6469 from yyforyongyu/5388-migration
channeldb: add optional migration to prune revocation logs
2 parents e86a69b + d391514 commit 115b041

File tree

20 files changed

+4181
-18
lines changed

20 files changed

+4181
-18
lines changed

channeldb/db.go

Lines changed: 112 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/lightningnetwork/lnd/channeldb/migration26"
2424
"github.com/lightningnetwork/lnd/channeldb/migration27"
2525
"github.com/lightningnetwork/lnd/channeldb/migration29"
26+
"github.com/lightningnetwork/lnd/channeldb/migration30"
2627
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
2728
"github.com/lightningnetwork/lnd/clock"
2829
"github.com/lightningnetwork/lnd/kvdb"
@@ -45,17 +46,34 @@ var (
4546
// up-to-date version of the database.
4647
type migration func(tx kvdb.RwTx) error
4748

48-
type version struct {
49+
// mandatoryVersion defines a db version that must be applied before the lnd
50+
// starts.
51+
type mandatoryVersion struct {
4952
number uint32
5053
migration migration
5154
}
5255

56+
// optionalMigration defines an optional migration function. When a migration
57+
// is optional, it usually involves a large scale of changes that might touch
58+
// millions of keys. Due to OOM concern, the update cannot be safely done
59+
// within one db transaction. Thus, for optional migrations, they must take the
60+
// db backend and construct transactions as needed.
61+
type optionalMigration func(db kvdb.Backend) error
62+
63+
// optionalVersion defines a db version that can be optionally applied. When
64+
// applying migrations, we must apply all the mandatory migrations first before
65+
// attempting optional ones.
66+
type optionalVersion struct {
67+
name string
68+
migration optionalMigration
69+
}
70+
5371
var (
54-
// dbVersions is storing all versions of database. If current version
55-
// of database don't match with latest version this list will be used
56-
// for retrieving all migration function that are need to apply to the
57-
// current db.
58-
dbVersions = []version{
72+
// dbVersions is storing all mandatory versions of database. If current
73+
// version of database don't match with latest version this list will
74+
// be used for retrieving all migration function that are need to apply
75+
// to the current db.
76+
dbVersions = []mandatoryVersion{
5977
{
6078
// The base DB version requires no migration.
6179
number: 0,
@@ -237,6 +255,19 @@ var (
237255
},
238256
}
239257

258+
// optionalVersions stores all optional migrations that are applied
259+
// after dbVersions.
260+
//
261+
// NOTE: optional migrations must be fault-tolerant and re-run already
262+
// migrated data must be noop, which means the migration must be able
263+
// to determine its state.
264+
optionalVersions = []optionalVersion{
265+
{
266+
name: "prune revocation log",
267+
migration: migration30.MigrateRevocationLog,
268+
},
269+
}
270+
240271
// Big endian is the preferred byte order, due to cursor scans over
241272
// integer keys iterating in order.
242273
byteOrder = binary.BigEndian
@@ -337,6 +368,13 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
337368
backend.Close()
338369
return nil, err
339370
}
371+
372+
// Grab the optional migration config.
373+
omc := opts.OptionalMiragtionConfig
374+
if err := chanDB.applyOptionalVersions(omc); err != nil {
375+
backend.Close()
376+
return nil, err
377+
}
340378
}
341379

342380
return chanDB, nil
@@ -1309,7 +1347,7 @@ func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
13091347
// syncVersions function is used for safe db version synchronization. It
13101348
// applies migration functions to the current database and recovers the
13111349
// previous state of db if at least one error/panic appeared during migration.
1312-
func (d *DB) syncVersions(versions []version) error {
1350+
func (d *DB) syncVersions(versions []mandatoryVersion) error {
13131351
meta, err := d.FetchMeta(nil)
13141352
if err != nil {
13151353
if err == ErrMetaNotFound {
@@ -1379,6 +1417,69 @@ func (d *DB) syncVersions(versions []version) error {
13791417
}, func() {})
13801418
}
13811419

1420+
// applyOptionalVersions takes a config to determine whether the optional
1421+
// migrations will be applied.
1422+
//
1423+
// NOTE: only support the prune_revocation_log optional migration atm.
1424+
func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error {
1425+
// TODO(yy): need to design the db to support dry run for optional
1426+
// migrations.
1427+
if d.dryRun {
1428+
log.Info("Skipped optional migrations as dry run mode is not " +
1429+
"supported yet")
1430+
return nil
1431+
}
1432+
1433+
om, err := d.fetchOptionalMeta()
1434+
if err != nil {
1435+
if err == ErrMetaNotFound {
1436+
om = &OptionalMeta{
1437+
Versions: make(map[uint64]string),
1438+
}
1439+
} else {
1440+
return err
1441+
}
1442+
}
1443+
1444+
log.Infof("Checking for optional update: prune_revocation_log=%v, "+
1445+
"db_version=%s", cfg.PruneRevocationLog, om)
1446+
1447+
// Exit early if the optional migration is not specified.
1448+
if !cfg.PruneRevocationLog {
1449+
return nil
1450+
}
1451+
1452+
// Exit early if the optional migration has already been applied.
1453+
if _, ok := om.Versions[0]; ok {
1454+
return nil
1455+
}
1456+
1457+
// Get the optional version.
1458+
version := optionalVersions[0]
1459+
log.Infof("Performing database optional migration: %s", version.name)
1460+
1461+
// Migrate the data.
1462+
if err := version.migration(d); err != nil {
1463+
log.Errorf("Unable to apply optional migration: %s, error: %v",
1464+
version.name, err)
1465+
return err
1466+
}
1467+
1468+
// Update the optional meta. Notice that unlike the mandatory db
1469+
// migrations where we perform the migration and updating meta in a
1470+
// single db transaction, we use different transactions here. Even when
1471+
// the following update is failed, we should be fine here as we would
1472+
// re-run the optional migration again, which is a noop, during next
1473+
// startup.
1474+
om.Versions[0] = version.name
1475+
if err := d.putOptionalMeta(om); err != nil {
1476+
log.Errorf("Unable to update optional meta: %v", err)
1477+
return err
1478+
}
1479+
1480+
return nil
1481+
}
1482+
13821483
// ChannelGraph returns the current instance of the directed channel graph.
13831484
func (d *DB) ChannelGraph() *ChannelGraph {
13841485
return d.graph
@@ -1390,13 +1491,15 @@ func (d *DB) ChannelStateDB() *ChannelStateDB {
13901491
return d.channelStateDB
13911492
}
13921493

1393-
func getLatestDBVersion(versions []version) uint32 {
1494+
func getLatestDBVersion(versions []mandatoryVersion) uint32 {
13941495
return versions[len(versions)-1].number
13951496
}
13961497

13971498
// getMigrationsToApply retrieves the migration function that should be
13981499
// applied to the database.
1399-
func getMigrationsToApply(versions []version, version uint32) ([]migration, []uint32) {
1500+
func getMigrationsToApply(versions []mandatoryVersion,
1501+
version uint32) ([]migration, []uint32) {
1502+
14001503
migrations := make([]migration, 0, len(versions))
14011504
migrationVersions := make([]uint32, 0, len(versions))
14021505

channeldb/log.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/lightningnetwork/lnd/channeldb/migration13"
99
"github.com/lightningnetwork/lnd/channeldb/migration16"
1010
"github.com/lightningnetwork/lnd/channeldb/migration24"
11+
"github.com/lightningnetwork/lnd/channeldb/migration30"
1112
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
1213
"github.com/lightningnetwork/lnd/kvdb"
1314
)
@@ -38,5 +39,6 @@ func UseLogger(logger btclog.Logger) {
3839
migration13.UseLogger(logger)
3940
migration16.UseLogger(logger)
4041
migration24.UseLogger(logger)
42+
migration30.UseLogger(logger)
4143
kvdb.UseLogger(logger)
4244
}

channeldb/meta.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package channeldb
22

33
import (
4+
"bytes"
5+
"fmt"
6+
47
"github.com/lightningnetwork/lnd/kvdb"
8+
"github.com/lightningnetwork/lnd/tlv"
59
)
610

711
var (
@@ -12,6 +16,10 @@ var (
1216
// dbVersionKey is a boltdb key and it's used for storing/retrieving
1317
// current database version.
1418
dbVersionKey = []byte("dbp")
19+
20+
// dbVersionKey is a boltdb key and it's used for storing/retrieving
21+
// a list of optional migrations that have been applied.
22+
optionalVersionKey = []byte("ovk")
1523
)
1624

1725
// Meta structure holds the database meta information.
@@ -80,3 +88,92 @@ func putDbVersion(metaBucket kvdb.RwBucket, meta *Meta) error {
8088
byteOrder.PutUint32(scratch, meta.DbVersionNumber)
8189
return metaBucket.Put(dbVersionKey, scratch)
8290
}
91+
92+
// OptionalMeta structure holds the database optional migration information.
93+
type OptionalMeta struct {
94+
// Versions is a set that contains the versions that have been applied.
95+
// When saved to disk, only the indexes are stored.
96+
Versions map[uint64]string
97+
}
98+
99+
func (om *OptionalMeta) String() string {
100+
s := ""
101+
for index, name := range om.Versions {
102+
s += fmt.Sprintf("%d: %s", index, name)
103+
}
104+
if s == "" {
105+
s = "empty"
106+
}
107+
return s
108+
}
109+
110+
// fetchOptionalMeta reads the optional meta from the database.
111+
func (d *DB) fetchOptionalMeta() (*OptionalMeta, error) {
112+
om := &OptionalMeta{
113+
Versions: make(map[uint64]string),
114+
}
115+
116+
err := kvdb.View(d, func(tx kvdb.RTx) error {
117+
metaBucket := tx.ReadBucket(metaBucket)
118+
if metaBucket == nil {
119+
return ErrMetaNotFound
120+
}
121+
122+
vBytes := metaBucket.Get(optionalVersionKey)
123+
// Exit early if nothing found.
124+
if vBytes == nil {
125+
return nil
126+
}
127+
128+
// Read the versions' length.
129+
r := bytes.NewReader(vBytes)
130+
vLen, err := tlv.ReadVarInt(r, &[8]byte{})
131+
if err != nil {
132+
return err
133+
}
134+
135+
// Write the version index.
136+
for i := uint64(0); i < vLen; i++ {
137+
version, err := tlv.ReadVarInt(r, &[8]byte{})
138+
if err != nil {
139+
return err
140+
}
141+
om.Versions[version] = optionalVersions[i].name
142+
}
143+
144+
return nil
145+
}, func() {})
146+
if err != nil {
147+
return nil, err
148+
}
149+
150+
return om, nil
151+
}
152+
153+
// fetchOptionalMeta writes an optional meta to the database.
154+
func (d *DB) putOptionalMeta(om *OptionalMeta) error {
155+
return kvdb.Update(d, func(tx kvdb.RwTx) error {
156+
metaBucket, err := tx.CreateTopLevelBucket(metaBucket)
157+
if err != nil {
158+
return err
159+
}
160+
161+
var b bytes.Buffer
162+
163+
// Write the total length.
164+
err = tlv.WriteVarInt(&b, uint64(len(om.Versions)), &[8]byte{})
165+
if err != nil {
166+
return err
167+
}
168+
169+
// Write the version indexes.
170+
for v := range om.Versions {
171+
err := tlv.WriteVarInt(&b, v, &[8]byte{})
172+
if err != nil {
173+
return err
174+
}
175+
}
176+
177+
return metaBucket.Put(optionalVersionKey, b.Bytes())
178+
}, func() {})
179+
}

0 commit comments

Comments
 (0)