From 56a0794a670c63c704444c054c3de3900a608d9d Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Mon, 2 Sep 2019 20:48:14 +0800 Subject: [PATCH] 1. Expire non-provider records older than MaxAge 2. Original publisher shoulld republish putvalue records --- dht.go | 9 ++++ go.mod | 1 + handlers.go | 2 +- providers/providers.go | 7 +-- providers/providers_test.go | 2 +- records.go | 74 ++++++++++++++++++++++++-- records_test.go | 102 ++++++++++++++++++++++++++++++++++-- routing.go | 24 +++++++++ 8 files changed, 208 insertions(+), 13 deletions(-) diff --git a/dht.go b/dht.go index ce2151dfb..1bb20c3e0 100644 --- a/dht.go +++ b/dht.go @@ -68,6 +68,8 @@ type IpfsDHT struct { stripedPutLocks [256]sync.Mutex protocols []protocol.ID // DHT protocols + + nonProvRecordsCleanupInterval time.Duration } // Assert that IPFS assumptions about interfaces aren't broken. These aren't a @@ -100,11 +102,18 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er dht.proc.AddChild(dht.providers.Process()) dht.Validator = cfg.Validator + // proc to expire putValue records + dht.nonProvRecordsCleanupInterval = nonProvRecordsCleanupInterval + recordExpiryProc := goprocessctx.WithContext(ctx) + recordExpiryProc.Go(dht.expireNonProviderRecords) + dht.proc.AddChild(recordExpiryProc) + if !cfg.Client { for _, p := range cfg.Protocols { h.SetStreamHandler(p, dht.handleNewStream) } } + return dht, nil } diff --git a/go.mod b/go.mod index 770abdc53..958eff7c7 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/mr-tron/base58 v1.1.2 github.com/multiformats/go-multiaddr v0.0.4 github.com/multiformats/go-multiaddr-dns v0.0.2 + github.com/multiformats/go-multihash v0.0.5 github.com/multiformats/go-multistream v0.1.0 github.com/stretchr/testify v1.3.0 github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc diff --git a/handlers.go b/handlers.go index 95f60e674..c1c425840 100644 --- a/handlers.go +++ b/handlers.go @@ -121,7 +121,7 @@ func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) { recordIsBad = true } - if time.Since(recvtime) > MaxRecordAge { + if time.Since(recvtime) > maxNonProviderRecordAge { logger.Debug("old record found, tossing.") recordIsBad = true } diff --git a/providers/providers.go b/providers/providers.go index ec44cc511..c72125152 100644 --- a/providers/providers.go +++ b/providers/providers.go @@ -74,10 +74,11 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching) return pm } -const providersKeyPrefix = "/providers/" +// prefix to be used for all provider record keys +const ProvidersKeyPrefix = "/providers/" func mkProvKey(k cid.Cid) string { - return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes()) + return ProvidersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes()) } func (pm *ProviderManager) Process() goprocess.Process { @@ -284,7 +285,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) { // Now, kick off a GC of the datastore. q, err := pm.dstore.Query(dsq.Query{ - Prefix: providersKeyPrefix, + Prefix: ProvidersKeyPrefix, }) if err != nil { log.Error("provider record GC query failed: ", err) diff --git a/providers/providers_test.go b/providers/providers_test.go index 58756c55c..08423ddf6 100644 --- a/providers/providers_test.go +++ b/providers/providers_test.go @@ -185,7 +185,7 @@ func TestProvidesExpire(t *testing.T) { t.Fatal("providers map not cleaned up") } - res, err := ds.Query(dsq.Query{Prefix: providersKeyPrefix}) + res, err := ds.Query(dsq.Query{Prefix: ProvidersKeyPrefix}) if err != nil { t.Fatal(err) } diff --git a/records.go b/records.go index 5f641b056..c04e290c0 100644 --- a/records.go +++ b/records.go @@ -3,21 +3,30 @@ package dht import ( "context" "fmt" + "strings" "time" + "github.com/gogo/protobuf/proto" + ds "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/query" + u "github.com/ipfs/go-ipfs-util" + "github.com/jbenet/goprocess" + ci "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/routing" - - ci "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-kad-dht/providers" + recpb "github.com/libp2p/go-libp2p-record/pb" ) -// MaxRecordAge specifies the maximum time that any node will hold onto a record +// maxNonProviderRecordAge specifies the maximum time that any node will hold onto a record // from the time its received. This does not apply to any other forms of validity that // the record may contain. // For example, a record may contain an ipns entry with an EOL saying its valid // until the year 2020 (a great time in the future). For that record to stick around -// it must be rebroadcasted more frequently than once every 'MaxRecordAge' -const MaxRecordAge = time.Hour * 36 +// it must be rebroadcasted more frequently than once every 'maxNonProviderRecordAge' +var maxNonProviderRecordAge = time.Hour * 12 + +var nonProvRecordsCleanupInterval = time.Hour * 1 type pubkrs struct { pubk ci.PubKey @@ -135,3 +144,58 @@ func (dht *IpfsDHT) getPublicKeyFromNode(ctx context.Context, p peer.ID) (ci.Pub logger.Debugf("Got public key from node %v itself", p) return pubk, nil } + +func (dht *IpfsDHT) expireNonProviderRecords(proc goprocess.Process) { + for { + select { + case <-proc.Closing(): + return + case <-time.After(dht.nonProvRecordsCleanupInterval): + } + + res, err := dht.datastore.Query(query.Query{Filters: []query.Filter{&expireRecordFilter{}}}) + if err != nil { + logger.Errorf("expire records proc: failed to run query against datastore, error is %+v", err) + continue + } + + for { + e, ok := res.NextSync() + if !ok { + break + } + if err := dht.datastore.Delete(ds.RawKey(e.Key)); err != nil { + logger.Errorf("expire records proc: failed to delete key %s from datastore, error is %+v", e.Key, err) + } + } + } +} + +type expireRecordFilter struct{} + +func (f *expireRecordFilter) Filter(e query.Entry) bool { + // unmarshal record + rec := new(recpb.Record) + if err := proto.Unmarshal(e.Value, rec); err != nil { + logger.Debugf("expire records filter: failed to unmarshal DHT record from datastore, error is %+v", err) + return false + } + + // should not be a provider record + if strings.HasPrefix(e.Key, providers.ProvidersKeyPrefix) { + return false + } + + // age should be greater than maxNonProviderRecordAge + t, err := u.ParseRFC3339(rec.TimeReceived) + if err != nil { + logger.Debugf("expire records filter: failed to parse time in DHT record, error is %+v", err) + return false + } + + if time.Since(t) > maxNonProviderRecordAge { + return true + } + + return false +} diff --git a/records_test.go b/records_test.go index 7083dcdd9..72ace81fb 100644 --- a/records_test.go +++ b/records_test.go @@ -3,16 +3,22 @@ package dht import ( "context" "crypto/rand" - "github.com/libp2p/go-libp2p-core/test" "testing" "time" + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p-core/test" + "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/assert" + + ds "github.com/ipfs/go-datastore" u "github.com/ipfs/go-ipfs-util" ci "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/routing" - record "github.com/libp2p/go-libp2p-record" - tnet "github.com/libp2p/go-libp2p-testing/net" + pb "github.com/libp2p/go-libp2p-kad-dht/pb" + "github.com/libp2p/go-libp2p-record" + "github.com/libp2p/go-libp2p-testing/net" ) // Check that GetPublicKey() correctly extracts a public key @@ -305,3 +311,93 @@ func TestPubkeyGoodKeyFromDHTGoodKeyDirect(t *testing.T) { t.Fatal("got incorrect public key") } } + +func TestExpireNonProviderRecords(t *testing.T) { + // short sweep duration for testing + sVal := nonProvRecordsCleanupInterval + defer func() { nonProvRecordsCleanupInterval = sVal }() + nonProvRecordsCleanupInterval = 10 * time.Millisecond + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // helper functions + putRecord := func(d *IpfsDHT, key string, value []byte) error { + rec := record.MakePutRecord(key, value) + pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0) + pmes.Record = rec + _, err := d.handlePutValue(ctx, "testpeer", pmes) + return err + } + + addProv := func(d *IpfsDHT, c cid.Cid) error { + msg, err := d.makeProvRecord(c) + pi := peer.AddrInfo{ + ID: "testpeer", + Addrs: d.host.Addrs(), + } + msg.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.AddrInfo{pi}) + assert.NoError(t, err) + + _, err = d.handleAddProvider(ctx, "testpeer", msg) + return err + } + + getProv := func(d *IpfsDHT, c cid.Cid) (*pb.Message, error) { + pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, c.Bytes(), 0) + m, err := d.handleGetProviders(ctx, "test peer", pmes) + return m, err + } + + // TEST expiry does not happen if age(record) < MaxAge + d := setupDHT(ctx, t, false) + + // put non-provider record with current time + key1 := "/v/key1" + value1 := []byte("v1") + assert.NoError(t, putRecord(d, key1, value1)) + + // sweep will not delete it + time.Sleep(100 * time.Millisecond) + + // get & verify it's present + + // we need to check the datastore for non-provider records to test the expiry Proc + // because a side-effect of handle get value is also that it deletes records which are beyond MaxAge + // & we do not want to hit that path + _, err := d.datastore.Get(convertToDsKey([]byte(key1))) + assert.NoError(t, err) + d.Close() + d.host.Close() + + // TEST expiry happens if age(record) > MaxAge + mVal := maxNonProviderRecordAge + maxNonProviderRecordAge = 50 * time.Millisecond + defer func() { maxNonProviderRecordAge = mVal }() + + d = setupDHT(ctx, t, false) + + // put non-provider record with current time + assert.NoError(t, putRecord(d, key1, value1)) + + // add provider record with current time + mh, err := multihash.Sum([]byte("data"), multihash.SHA2_256, -1) + assert.NoError(t, err) + c := cid.NewCidV0(mh) + assert.NoError(t, addProv(d, c)) + + // sweep will remove non-provider record now + time.Sleep(100 * time.Millisecond) + + // verify non-provider record is absent + _, err = d.datastore.Get(convertToDsKey([]byte(key1))) + assert.Equal(t, ds.ErrNotFound, err) + + // but.... provider record is still available + m, err := getProv(d, c) + assert.NoError(t, err) + assert.NotEmpty(t, m.ProviderPeers) + + d.Close() + d.host.Close() +} diff --git a/routing.go b/routing.go index 9435d1d98..041e57ed1 100644 --- a/routing.go +++ b/routing.go @@ -27,6 +27,8 @@ import ( // results will wait for the channel to drain. var asyncQueryBuffer = 10 +var putValueRepublishInterval = 6 * time.Hour + // This file implements the Routing interface for the IpfsDHT struct. // Basic Put/Get @@ -98,6 +100,28 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts }(p) } wg.Wait() + + // original publisher should keep re-publishing the record because the network isn't `steady`/`stable` + // and the K closet peers we just published to can become unavailable / no longer be the K closet + go func() { + for { + select { + case <-dht.proc.Closing(): + return + case <-time.After(putValueRepublishInterval): + // TODO:We can not re-use the original context here as it may have expired + // But, is it fair to use this one ? + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + if err := dht.PutValue(ctx, key, value, opts...); err != nil { + logger.Errorf("putValue republish proc: failed to republish key %s, error is %+v", key, err) + } else { + logger.Debugf("putValue republish proc: successfully republished key %s", key) + } + cancel() + } + } + }() + return nil }