Skip to content

Commit 5b80a56

Browse files
using primary and slave DBs to solve the panic problem caused by DB conflicts
Signed-off-by: yingchunliu-zte <[email protected]>
1 parent 0cecda6 commit 5b80a56

File tree

3 files changed

+322
-31
lines changed

3 files changed

+322
-31
lines changed

bucket.go

+105-13
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ type Bucket struct {
4141
//
4242
// This is non-persisted across transactions so it must be set in every Tx.
4343
FillPercent float64
44+
45+
slave *Bucket
4446
}
4547

4648
// newBucket returns a new bucket associated with a transaction.
@@ -82,10 +84,20 @@ func (b *Bucket) Cursor() *Cursor {
8284
}
8385
}
8486

85-
// Bucket retrieves a nested bucket by name.
87+
// Bucket retrieves a nested bucket by name with slave.
88+
func (b *Bucket) Bucket(name []byte) *Bucket {
89+
rb := b.bucket(name)
90+
if rb != nil && b.slave != nil {
91+
rb.slave = b.slave.bucket(name)
92+
}
93+
94+
return rb
95+
}
96+
97+
// bucket retrieves a nested bucket by name.
8698
// Returns nil if the bucket does not exist.
8799
// The bucket instance is only valid for the lifetime of the transaction.
88-
func (b *Bucket) Bucket(name []byte) *Bucket {
100+
func (b *Bucket) bucket(name []byte) *Bucket {
89101
if b.buckets != nil {
90102
if child := b.buckets[string(name)]; child != nil {
91103
return child
@@ -142,10 +154,20 @@ func (b *Bucket) openBucket(value []byte) *Bucket {
142154
return &child
143155
}
144156

145-
// CreateBucket creates a new bucket at the given key and returns the new bucket.
157+
// CreateBucket creates a new bucket at the given key and returns the new bucket with slave.
158+
func (b *Bucket) CreateBucket(key []byte) (rb *Bucket, err error) {
159+
rb, err = b.createBucket(key)
160+
if err == nil && b.slave != nil {
161+
rb.slave, err = b.slave.createBucket(key)
162+
}
163+
164+
return
165+
}
166+
167+
// createBucket creates a new bucket at the given key and returns the new bucket.
146168
// Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long.
147169
// The bucket instance is only valid for the lifetime of the transaction.
148-
func (b *Bucket) CreateBucket(key []byte) (rb *Bucket, err error) {
170+
func (b *Bucket) createBucket(key []byte) (rb *Bucket, err error) {
149171
if lg := b.tx.db.Logger(); lg != discardLogger {
150172
lg.Debugf("Creating bucket %q", key)
151173
defer func() {
@@ -199,10 +221,20 @@ func (b *Bucket) CreateBucket(key []byte) (rb *Bucket, err error) {
199221
return b.Bucket(newKey), nil
200222
}
201223

202-
// CreateBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it.
224+
// CreateBucketIfNotExists creates a new bucket with slave if it doesn't already exist and returns a reference to it.
225+
func (b *Bucket) CreateBucketIfNotExists(key []byte) (rb *Bucket, err error) {
226+
rb, err = b.createBucketIfNotExists(key)
227+
if err == nil && b.slave != nil {
228+
rb.slave, err = b.slave.createBucketIfNotExists(key)
229+
}
230+
231+
return
232+
}
233+
234+
// createBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it.
203235
// Returns an error if the bucket name is blank, or if the bucket name is too long.
204236
// The bucket instance is only valid for the lifetime of the transaction.
205-
func (b *Bucket) CreateBucketIfNotExists(key []byte) (rb *Bucket, err error) {
237+
func (b *Bucket) createBucketIfNotExists(key []byte) (rb *Bucket, err error) {
206238
if lg := b.tx.db.Logger(); lg != discardLogger {
207239
lg.Debugf("Creating bucket if not exist %q", key)
208240
defer func() {
@@ -269,8 +301,18 @@ func (b *Bucket) CreateBucketIfNotExists(key []byte) (rb *Bucket, err error) {
269301
}
270302

271303
// DeleteBucket deletes a bucket at the given key.
272-
// Returns an error if the bucket does not exist, or if the key represents a non-bucket value.
273304
func (b *Bucket) DeleteBucket(key []byte) (err error) {
305+
err = b.deleteBucket(key)
306+
if err == nil && b.slave != nil {
307+
err = b.slave.deleteBucket(key)
308+
}
309+
310+
return
311+
}
312+
313+
// deleteBucket deletes a bucket at the given key.
314+
// Returns an error if the bucket does not exist, or if the key represents a non-bucket value.
315+
func (b *Bucket) deleteBucket(key []byte) (err error) {
274316
if lg := b.tx.db.Logger(); lg != discardLogger {
275317
lg.Debugf("Deleting bucket %q", key)
276318
defer func() {
@@ -327,13 +369,23 @@ func (b *Bucket) DeleteBucket(key []byte) (err error) {
327369
return nil
328370
}
329371

372+
// MoveBucket moves a sub-bucket from the source bucket to the destination bucket with slave.
373+
func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) (err error) {
374+
err = b.moveBucket(key, dstBucket)
375+
if err == nil && b.slave != nil && dstBucket.slave != nil {
376+
err = b.slave.moveBucket(key, dstBucket.slave)
377+
}
378+
379+
return
380+
}
381+
330382
// MoveBucket moves a sub-bucket from the source bucket to the destination bucket.
331383
// Returns an error if
332384
// 1. the sub-bucket cannot be found in the source bucket;
333385
// 2. or the key already exists in the destination bucket;
334386
// 3. or the key represents a non-bucket value;
335387
// 4. the source and destination buckets are the same.
336-
func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) (err error) {
388+
func (b *Bucket) moveBucket(key []byte, dstBucket *Bucket) (err error) {
337389
lg := b.tx.db.Logger()
338390
if lg != discardLogger {
339391
lg.Debugf("Moving bucket %q", key)
@@ -445,11 +497,21 @@ func (b *Bucket) Get(key []byte) []byte {
445497
return v
446498
}
447499

448-
// Put sets the value for a key in the bucket.
500+
// Put sets the value for a key in the bucket with slave.
501+
func (b *Bucket) Put(key []byte, value []byte) (err error) {
502+
err = b.put(key, value)
503+
if err == nil && b.slave != nil {
504+
err = b.slave.put(key, value)
505+
}
506+
507+
return
508+
}
509+
510+
// put sets the value for a key in the bucket.
449511
// If the key exist then its previous value will be overwritten.
450512
// Supplied value must remain valid for the life of the transaction.
451513
// Returns an error if the bucket was created from a read-only transaction, if the key is blank, if the key is too large, or if the value is too large.
452-
func (b *Bucket) Put(key []byte, value []byte) (err error) {
514+
func (b *Bucket) put(key []byte, value []byte) (err error) {
453515
if lg := b.tx.db.Logger(); lg != discardLogger {
454516
lg.Debugf("Putting key %q", key)
455517
defer func() {
@@ -493,10 +555,20 @@ func (b *Bucket) Put(key []byte, value []byte) (err error) {
493555
return nil
494556
}
495557

558+
// Delete removes a key from the bucket with slave.
559+
func (b *Bucket) Delete(key []byte) (err error) {
560+
err = b.delete(key)
561+
if err == nil && b.slave != nil {
562+
err = b.slave.delete(key)
563+
}
564+
565+
return
566+
}
567+
496568
// Delete removes a key from the bucket.
497569
// If the key does not exist then nothing is done and a nil error is returned.
498570
// Returns an error if the bucket was created from a read-only transaction.
499-
func (b *Bucket) Delete(key []byte) (err error) {
571+
func (b *Bucket) delete(key []byte) (err error) {
500572
if lg := b.tx.db.Logger(); lg != discardLogger {
501573
lg.Debugf("Deleting key %q", key)
502574
defer func() {
@@ -539,8 +611,18 @@ func (b *Bucket) Sequence() uint64 {
539611
return b.InSequence()
540612
}
541613

542-
// SetSequence updates the sequence number for the bucket.
614+
// SetSequence updates the sequence number for the bucket with slave.
543615
func (b *Bucket) SetSequence(v uint64) error {
616+
err := b.setSequence(v)
617+
if err == nil && b.slave != nil {
618+
err = b.slave.setSequence(v)
619+
}
620+
621+
return err
622+
}
623+
624+
// SetSequence updates the sequence number for the bucket.
625+
func (b *Bucket) setSequence(v uint64) error {
544626
if b.tx.db == nil {
545627
return errors.ErrTxClosed
546628
} else if !b.Writable() {
@@ -558,8 +640,18 @@ func (b *Bucket) SetSequence(v uint64) error {
558640
return nil
559641
}
560642

561-
// NextSequence returns an autoincrementing integer for the bucket.
643+
// NextSequence returns an autoincrementing integer for the bucket with slave.
562644
func (b *Bucket) NextSequence() (uint64, error) {
645+
r, err := b.nextSequence()
646+
if err == nil && b.slave != nil {
647+
_, err = b.slave.nextSequence()
648+
}
649+
650+
return r, err
651+
}
652+
653+
// nextSequence returns an autoincrementing integer for the bucket.
654+
func (b *Bucket) nextSequence() (uint64, error) {
563655
if b.tx.db == nil {
564656
return 0, errors.ErrTxClosed
565657
} else if !b.Writable() {

db.go

+128-3
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ type DB struct {
154154
// Read only mode.
155155
// When true, Update() and Begin(true) return ErrDatabaseReadOnly immediately.
156156
readOnly bool
157+
158+
slave *DB
157159
}
158160

159161
// Path returns the path to currently open database file.
@@ -171,11 +173,113 @@ func (db *DB) String() string {
171173
return fmt.Sprintf("DB<%q>", db.path)
172174
}
173175

174-
// Open creates and opens a database at the given path with a given file mode.
176+
// Open creates and opens master database and slave database
177+
func Open(path string, mode os.FileMode, options *Options) (db *DB, err error) {
178+
openSlave := true
179+
if options != nil && options.OpenSlave != nil {
180+
openSlave = *options.OpenSlave
181+
}
182+
183+
var master *DB
184+
185+
master, err = tryOpenMasterDB(path, 0644, options, openSlave)
186+
if err != nil {
187+
return nil, err
188+
}
189+
190+
if !openSlave {
191+
return master, nil
192+
}
193+
194+
slaveDbPath := path + ".slave"
195+
err = copyFile(path, slaveDbPath)
196+
if err != nil {
197+
return nil, err
198+
}
199+
200+
var slave *DB
201+
slave, err = open(slaveDbPath, 0644, options)
202+
if err != nil {
203+
return nil, err
204+
}
205+
206+
master.slave = slave
207+
208+
return master, nil
209+
}
210+
211+
func tryOpenMasterDB(path string, mode os.FileMode, options *Options, openSlave bool) (db *DB, err error) {
212+
slaveDbPath := path + ".slave"
213+
pathBackup := path + ".backup"
214+
215+
defer func() {
216+
if e := recover(); e != nil {
217+
if openSlave {
218+
if _, err := os.Stat(slaveDbPath); err == nil {
219+
if err := os.Rename(slaveDbPath, pathBackup); err != nil {
220+
panic(fmt.Sprintf("rename %s-%s err %v: failed (%v)", slaveDbPath, pathBackup, err, e))
221+
}
222+
} else {
223+
panic(fmt.Sprintf("slave db path %s err %v, by open db %s failed (%v)", slaveDbPath, err, path, e))
224+
}
225+
}
226+
227+
panic(fmt.Sprintf("open db %s failed (%v),rename %s-%s success", path, e, slaveDbPath, pathBackup))
228+
}
229+
}()
230+
231+
if openSlave {
232+
if _, err := os.Stat(pathBackup); err == nil {
233+
if err := os.Rename(pathBackup, path); err != nil {
234+
return nil, err
235+
}
236+
}
237+
}
238+
239+
db, err = open(path, 0644, options)
240+
241+
return
242+
}
243+
244+
func copyFile(src, dst string) error {
245+
os.RemoveAll(dst)
246+
247+
srcFile, err := os.Open(src)
248+
if err != nil {
249+
return err
250+
}
251+
defer srcFile.Close()
252+
253+
tmpDst := dst + ".tmp"
254+
os.RemoveAll(tmpDst)
255+
256+
dstFile, err := os.Create(tmpDst)
257+
if err != nil {
258+
return err
259+
}
260+
defer dstFile.Close()
261+
262+
_, err = io.Copy(dstFile, srcFile)
263+
if err != nil {
264+
return err
265+
}
266+
267+
if err := dstFile.Sync(); err != nil {
268+
return err
269+
}
270+
271+
if err := os.Rename(tmpDst, dst); err != nil {
272+
return err
273+
}
274+
275+
return nil
276+
}
277+
278+
// open creates and opens a database at the given path with a given file mode.
175279
// If the file does not exist then it will be created automatically with a given file mode.
176280
// Passing in nil options will cause Bolt to open the database with the default options.
177281
// Note: For read/write transactions, ensure the owner has write permission on the created/opened database file, e.g. 0600
178-
func Open(path string, mode os.FileMode, options *Options) (db *DB, err error) {
282+
func open(path string, mode os.FileMode, options *Options) (db *DB, err error) {
179283
db = &DB{
180284
opened: true,
181285
}
@@ -664,10 +768,20 @@ func (db *DB) init() error {
664768
return nil
665769
}
666770

771+
// Close releases all database resources with slave.
772+
func (db *DB) Close() error {
773+
err := db._close()
774+
if err == nil && db.slave != nil {
775+
err = db.slave._close()
776+
}
777+
778+
return err
779+
}
780+
667781
// Close releases all database resources.
668782
// It will block waiting for any open transactions to finish
669783
// before closing the database and returning.
670-
func (db *DB) Close() error {
784+
func (db *DB) _close() error {
671785
db.rwlock.Lock()
672786
defer db.rwlock.Unlock()
673787

@@ -814,6 +928,15 @@ func (db *DB) beginTx() (*Tx, error) {
814928
}
815929

816930
func (db *DB) beginRWTx() (*Tx, error) {
931+
tx, err := db._beginRWTx()
932+
if err == nil && db.slave != nil {
933+
tx.slave, err = db.slave._beginRWTx()
934+
}
935+
936+
return tx, err
937+
}
938+
939+
func (db *DB) _beginRWTx() (*Tx, error) {
817940
// If the database was opened with Options.ReadOnly, return an error.
818941
if db.readOnly {
819942
return nil, berrors.ErrDatabaseReadOnly
@@ -1330,6 +1453,8 @@ type Options struct {
13301453

13311454
// Logger is the logger used for bbolt.
13321455
Logger Logger
1456+
1457+
OpenSlave *bool
13331458
}
13341459

13351460
func (o *Options) String() string {

0 commit comments

Comments
 (0)