From c7e00c6eaeb4463877e78b9a75ecdbd6ad9bc14b Mon Sep 17 00:00:00 2001
From: Aarsh Shah <aarshkshah1992@gmail.com>
Date: Mon, 2 Sep 2019 20:48:14 +0800
Subject: [PATCH] 1. Expire non-provider records older than MaxAge 2. Original
 publisher should republish PutValue records 3. Peers who receive a record
 will republish hourly
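
Non-provider records (e.g. public key and IPNS records) now expire
maxNonProvRecordAge (36h) after they are received. Peers holding such a
record re-publish it roughly every hour (with jitter) while it is due
for a refresh but not yet expired, and the original publisher
independently re-publishes its own PutValue records every 24h so they
survive churn among the K closest peers.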

---
 dht.go                                |   5 +
 go.mod                                |   1 +
 handlers.go                           |   6 +-
 non_prov_records.go                   | 193 ++++++++++++++++++++++++++
 non_prov_records_test.go              | 166 ++++++++++++++++++++++
 records.go => pk_records.go           |   9 --
 records_test.go => pk_records_test.go |   0
 providers/providers.go                |   7 +-
 providers/providers_test.go           |   2 +-
 routing.go                            |  24 ++++
 10 files changed, 399 insertions(+), 14 deletions(-)
 create mode 100644 non_prov_records.go
 create mode 100644 non_prov_records_test.go
 rename records.go => pk_records.go (87%)
 rename records_test.go => pk_records_test.go (100%)

diff --git a/dht.go b/dht.go
index ce2151dfb..63d3e54b3 100644
--- a/dht.go
+++ b/dht.go
@@ -53,6 +53,8 @@ type IpfsDHT struct {
 	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
 	providers    *providers.ProviderManager
 
+	nonProvRecordsManager *NonProvRecordsManager
+
 	birth time.Time // When this peer started up
 
 	Validator record.Validator
@@ -98,6 +100,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
 	})
 
 	dht.proc.AddChild(dht.providers.Process())
+	dht.proc.AddChild(dht.nonProvRecordsManager.Process())
 	dht.Validator = cfg.Validator
 
 	if !cfg.Client {
@@ -105,6 +108,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
 			h.SetStreamHandler(p, dht.handleNewStream)
 		}
 	}
+
 	return dht, nil
 }
 
@@ -156,6 +160,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
 	}
 
 	dht.ctx = dht.newContextWithLocalTags(ctx)
+	dht.nonProvRecordsManager = NewNonProvRecordsManager(ctx, dht, dstore)
 
 	return dht
 }
diff --git a/go.mod b/go.mod
index 55b7713a3..7eb2086d5 100644
--- a/go.mod
+++ b/go.mod
@@ -23,6 +23,7 @@ require (
 	github.com/mr-tron/base58 v1.1.2
 	github.com/multiformats/go-multiaddr v0.0.4
 	github.com/multiformats/go-multiaddr-dns v0.0.3
+	github.com/multiformats/go-multihash v0.0.5
 	github.com/multiformats/go-multistream v0.1.0
 	github.com/stretchr/testify v1.3.0
 	github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
diff --git a/handlers.go b/handlers.go
index 95f60e674..a0c730520 100644
--- a/handlers.go
+++ b/handlers.go
@@ -121,7 +121,7 @@ func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) {
 		recordIsBad = true
 	}
 
-	if time.Since(recvtime) > MaxRecordAge {
+	if time.Since(recvtime) > maxNonProvRecordAge {
 		logger.Debug("old record found, tossing.")
 		recordIsBad = true
 	}
@@ -396,3 +396,7 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
 func convertToDsKey(s []byte) ds.Key {
 	return ds.NewKey(base32.RawStdEncoding.EncodeToString(s))
 }
+
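+// convertToOriginalKey reverses convertToDsKey, recovering the original record key
+// from its base32-encoded datastore representation.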
+func convertToOriginalKey(k string) ([]byte, error) {
+	return base32.RawStdEncoding.DecodeString(k)
+}
diff --git a/non_prov_records.go b/non_prov_records.go
new file mode 100644
index 000000000..b72a0611e
--- /dev/null
+++ b/non_prov_records.go
@@ -0,0 +1,193 @@
+package dht
+
+import (
+	"context"
+	"math/rand"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/gogo/protobuf/proto"
+	ds "github.com/ipfs/go-datastore"
+	"github.com/ipfs/go-datastore/query"
+	u "github.com/ipfs/go-ipfs-util"
+	"github.com/jbenet/goprocess"
+	"github.com/jbenet/goprocess/context"
+	"github.com/libp2p/go-libp2p-kad-dht/providers"
+	recpb "github.com/libp2p/go-libp2p-record/pb"
+)
+
+// vars for cleaning up expired records
+var nonProvRecordCleanupInterval = time.Hour
+
+// maxNonProvRecordAge specifies the maximum time that any node will hold onto a record
+// from the time it's received. This does not apply to any other forms of validity that
+// the record may contain.
+// For example, a record may contain an ipns entry with an EOL saying it's valid
+// until the year 2020 (a great time in the future). For that record to stick around
+// it must be rebroadcast more frequently than once every 'maxNonProvRecordAge'.
+var maxNonProvRecordAge = time.Hour * 36
+
+// vars for republishing records
+var nonProvRecordRePublishInterval = 1 * time.Hour
+var nonProvRecordRePublishAge = 1 * time.Hour
+var enableRepublishJitter = true
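+
+// Note: nonProvRecordRePublishAge must stay below maxNonProvRecordAge, since the
+// republish filter only matches records that are due for a refresh but have not yet expired.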
+
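+// NonProvRecordsManager expires non-provider records that have outlived
+// maxNonProvRecordAge and re-publishes those that are due for a refresh.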
+type NonProvRecordsManager struct {
+	dht *IpfsDHT
+	ctx context.Context
+
+	proc   goprocess.Process
+	dstore ds.Batching
+
+	cleanupInterval time.Duration // scan interval for expiring records
+
+	rePublishInterval time.Duration // scan interval for republishing records
+}
+
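+// NewNonProvRecordsManager creates the manager and starts its expiry and republish procs.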
+func NewNonProvRecordsManager(ctx context.Context, dht *IpfsDHT, dstore ds.Batching) *NonProvRecordsManager {
+	m := new(NonProvRecordsManager)
+	m.dht = dht
+	m.ctx = ctx
+	m.dstore = dstore
+	m.proc = goprocessctx.WithContext(ctx)
+
+	// expire records beyond maxage
+	m.cleanupInterval = nonProvRecordCleanupInterval
+	m.proc.Go(m.expire)
+
+	// republish records older than prescribed age
+	m.rePublishInterval = nonProvRecordRePublishInterval
+	m.proc.Go(m.rePublish)
+
+	return m
+}
+
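+// Process returns the goprocess that owns the expiry and republish loops.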
+func (m *NonProvRecordsManager) Process() goprocess.Process {
+	return m.proc
+}
+
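+// rePublish periodically scans the datastore for non-provider records that are due
+// for a refresh and re-publishes them to the network via PutValue.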
+func (m *NonProvRecordsManager) rePublish(proc goprocess.Process) {
+	for {
+		// add up to 15 minutes of jitter: this minimizes the probability of all peers
+		// re-publishing together, since the first peer to re-publish resets the receivedAt
+		// time on the record held by the other peers among the K closest to the key,
+		// sparing those peers a redundant republish of their own
+		var d time.Duration
+		if enableRepublishJitter {
+			d = time.Duration(rand.Intn(16)) * time.Minute
+		}
+
+		select {
+		case <-proc.Closing():
+			return
+		case <-time.After(m.rePublishInterval + d):
+		}
+
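+		// match records that are old enough to need a republish but have not yet expired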
+		tFnc := func(t time.Time) bool {
+			return time.Since(t) > nonProvRecordRePublishAge && time.Since(t) < maxNonProvRecordAge
+		}
+
+		res, err := m.dstore.Query(query.Query{Filters: []query.Filter{&nonProvRecordFilter{tFnc}}})
+		if err != nil {
+			logger.Errorf("republish records proc: failed to run query against datastore, error is %s", err)
+			continue
+		}
+
+		var wg sync.WaitGroup
+		// semaphore to rate-limit number of concurrent PutValue calls
+		semaphore := make(chan struct{}, 5)
+		for {
+			e, ok := res.NextSync()
+			if !ok {
+				break
+			}
+
+			semaphore <- struct{}{}
+			wg.Add(1)
+			go func(e query.Result) {
+				defer func() {
+					<-semaphore
+					wg.Done()
+				}()
+
+				// unmarshal record
+				rec := new(recpb.Record)
+				if err := proto.Unmarshal(e.Value, rec); err != nil {
+					logger.Debugf("republish records proc: failed to unmarshal DHT record from datastore, error is %s", err)
+					return
+				}
+
+				// re-publish the record to the network, bounded by a timeout
+				putCtx, cancel := context.WithTimeout(m.ctx, 2*time.Minute)
+				defer cancel()
+
+				// do not use e.Key here: it is the datastore-transformed (base32) version
+				// of the key; rec.GetKey() is the original key sent by the peer that put
+				// this record to the DHT
+				if err := m.dht.PutValue(putCtx, string(rec.GetKey()), rec.Value); err != nil {
+					logger.Debugf("republish records proc: failed to re-publish to the network, error is %s", err)
+				}
+			}(e)
+		}
+		wg.Wait()
+	}
+}
+
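+// expire periodically deletes non-provider records that have outlived maxNonProvRecordAge.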
+func (m *NonProvRecordsManager) expire(proc goprocess.Process) {
+	for {
+		select {
+		case <-proc.Closing():
+			return
+		case <-time.After(m.cleanupInterval):
+		}
+
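+		// match records that have outlived the maximum allowed age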
+		tFnc := func(t time.Time) bool {
+			return time.Since(t) > maxNonProvRecordAge
+		}
+
+		res, err := m.dstore.Query(query.Query{Filters: []query.Filter{&nonProvRecordFilter{tFnc}}})
+		if err != nil {
+			logger.Errorf("expire records proc: failed to run query against datastore, error is %s", err)
+			continue
+		}
+
+		for {
+			e, ok := res.NextSync()
+			if !ok {
+				break
+			}
+			if err := m.dstore.Delete(ds.RawKey(e.Key)); err != nil {
+				logger.Errorf("expire records proc: failed to delete key %s from datastore, error is %s", e.Key, err)
+			}
+		}
+	}
+}
+
+type timeFilterFnc = func(t time.Time) bool
+
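+// nonProvRecordFilter matches non-provider records whose received time satisfies tFnc.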
+type nonProvRecordFilter struct {
+	tFnc timeFilterFnc
+}
+
+func (f *nonProvRecordFilter) Filter(e query.Entry) bool {
+	// unmarshal record
+	rec := new(recpb.Record)
+	if err := proto.Unmarshal(e.Value, rec); err != nil {
+		logger.Debugf("non-prov record filter: failed to unmarshal DHT record from datastore, error is %s", err)
+		return false
+	}
+
+	// should not be a provider record
+	if strings.HasPrefix(e.Key, providers.ProvidersKeyPrefix) {
+		return false
+	}
+
+	// parse received time
+	t, err := u.ParseRFC3339(rec.TimeReceived)
+	if err != nil {
+		logger.Debugf("non-prov record filter: failed to parse time in DHT record, error is %s", err)
+		return false
+	}
+
+	// apply the time filter fnc to the received time
+	return f.tFnc(t)
+}
diff --git a/non_prov_records_test.go b/non_prov_records_test.go
new file mode 100644
index 000000000..c11172239
--- /dev/null
+++ b/non_prov_records_test.go
@@ -0,0 +1,166 @@
+package dht
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/ipfs/go-cid"
+	ds "github.com/ipfs/go-datastore"
+	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
+	recpb "github.com/libp2p/go-libp2p-record/pb"
+	"github.com/multiformats/go-multihash"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestExpireNonProviderRecords(t *testing.T) {
+	// short sweep duration for testing
+	sVal := nonProvRecordCleanupInterval
+	defer func() { nonProvRecordCleanupInterval = sVal }()
+	nonProvRecordCleanupInterval = 10 * time.Millisecond
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// TEST expiry does not happen if age(record) < MaxAge
+
+	dhtA := setupDHT(ctx, t, false)
+	dhtB := setupDHT(ctx, t, false)
+	connect(t, ctx, dhtA, dhtB)
+
+	// dhtA puts non-provider record with current time which WILL get stored on B
+	key1 := "/v/key1"
+	value1 := []byte("v1")
+	assert.NoError(t, dhtA.PutValue(ctx, key1, value1))
+
+	// the cleanup sweep will not delete it, as the record is younger than maxNonProvRecordAge
+	time.Sleep(100 * time.Millisecond)
+
+	// get & verify it's present on B
+	_, err := dhtB.datastore.Get(convertToDsKey([]byte(key1)))
+	assert.NoError(t, err)
+
+	// cleanup
+	dhtA.Close()
+	dhtA.host.Close()
+	dhtB.Close()
+	dhtB.host.Close()
+
+	// TEST expiry happens if age(record) > MaxAge
+
+	mVal := maxNonProvRecordAge
+	maxNonProvRecordAge = 50 * time.Millisecond
+	defer func() { maxNonProvRecordAge = mVal }()
+
+	dhtA = setupDHT(ctx, t, false)
+	dhtB = setupDHT(ctx, t, false)
+	connect(t, ctx, dhtA, dhtB)
+	defer func() {
+		dhtA.Close()
+		dhtA.host.Close()
+		dhtB.Close()
+		dhtB.host.Close()
+	}()
+
+	// dhtA puts non-provider record with current time
+	assert.NoError(t, dhtA.PutValue(ctx, key1, value1))
+
+	// dhtA adds provider record with current time
+	mh, err := multihash.Sum([]byte("data"), multihash.SHA2_256, -1)
+	assert.NoError(t, err)
+	c := cid.NewCidV0(mh)
+	assert.NoError(t, dhtA.Provide(ctx, c, true))
+
+	// sweep will remove non-provider record on B now
+	time.Sleep(1 * time.Second)
+
+	// verify non-provider record is absent on B
+	_, err = dhtB.datastore.Get(convertToDsKey([]byte(key1)))
+	assert.Equal(t, ds.ErrNotFound, err)
+
+	// but.... provider record is still available
+	m, err := getTestProvRecord(t, ctx, dhtB, c)
+	assert.NoError(t, err)
+	assert.NotEmpty(t, m.ProviderPeers)
+}
+
+func TestRepublishNonProvRecords(t *testing.T) {
+	// short scan duration for re-publish
+	sVal := nonProvRecordRePublishInterval
+	nonProvRecordRePublishInterval = 10 * time.Millisecond
+	jVal := enableRepublishJitter
+	enableRepublishJitter = false
+	defer func() {
+		enableRepublishJitter = jVal
+		nonProvRecordRePublishInterval = sVal
+	}()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// TEST re-publish does not happen if age(record) < republishAge
+
+	dhtA := setupDHT(ctx, t, false)
+	dhtB := setupDHT(ctx, t, false)
+	connect(t, ctx, dhtA, dhtB)
+
+	// dhtA puts non-provider record with current time which WILL get stored on B
+	key1 := "/v/key1"
+	value1 := []byte("v1")
+	assert.NoError(t, dhtA.PutValue(ctx, key1, value1))
+
+	// dhtA DELETES it locally
+	assert.NoError(t, dhtA.datastore.Delete(convertToDsKey([]byte(key1))))
+
+	// B will not re-publish it, since the record is younger than nonProvRecordRePublishAge
+	time.Sleep(2 * time.Second)
+
+	// get on dhtA & verify it's absent
+	_, err := dhtA.datastore.Get(convertToDsKey([]byte(key1)))
+	assert.Equal(t, ds.ErrNotFound, err)
+
+	// cleanup
+	dhtA.Close()
+	dhtA.host.Close()
+	dhtB.Close()
+	dhtB.host.Close()
+
+	// TEST re-publish happens if age(record) > republishAge
+
+	mVal := nonProvRecordRePublishAge
+	nonProvRecordRePublishAge = 100 * time.Millisecond
+	defer func() { nonProvRecordRePublishAge = mVal }()
+
+	dhtA = setupDHT(ctx, t, false)
+	dhtB = setupDHT(ctx, t, false)
+	connect(t, ctx, dhtA, dhtB)
+	defer func() {
+		dhtA.Close()
+		dhtA.host.Close()
+		dhtB.Close()
+		dhtB.host.Close()
+	}()
+
+	// dhtA puts non-provider record with current time
+	assert.NoError(t, dhtA.PutValue(ctx, key1, value1))
+
+	// dhtA DELETES it locally
+	assert.NoError(t, dhtA.datastore.Delete(convertToDsKey([]byte(key1))))
+
+	// it will be re-published by B
+	time.Sleep(2 * time.Second)
+
+	// get on dhtA & verify key is present (because it SHOULD have been re-published by B)
+	v, err := dhtA.datastore.Get(convertToDsKey([]byte(key1)))
+	assert.NoError(t, err)
+	rec := new(recpb.Record)
+	assert.NoError(t, proto.Unmarshal(v, rec))
+	assert.Equal(t, value1, rec.Value)
+}
+
+func getTestProvRecord(t *testing.T, ctx context.Context, d *IpfsDHT, c cid.Cid) (*pb.Message, error) {
+	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, c.Bytes(), 0)
+	return d.handleGetProviders(ctx, "test peer", pmes)
+}
diff --git a/records.go b/pk_records.go
similarity index 87%
rename from records.go
rename to pk_records.go
index 5f641b056..7a1ee0ca0 100644
--- a/records.go
+++ b/pk_records.go
@@ -3,7 +3,6 @@ package dht
 import (
 	"context"
 	"fmt"
-	"time"
 
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-core/routing"
@@ -11,14 +10,6 @@ import (
 	ci "github.com/libp2p/go-libp2p-core/crypto"
 )
 
-// MaxRecordAge specifies the maximum time that any node will hold onto a record
-// from the time its received. This does not apply to any other forms of validity that
-// the record may contain.
-// For example, a record may contain an ipns entry with an EOL saying its valid
-// until the year 2020 (a great time in the future). For that record to stick around
-// it must be rebroadcasted more frequently than once every 'MaxRecordAge'
-const MaxRecordAge = time.Hour * 36
-
 type pubkrs struct {
 	pubk ci.PubKey
 	err  error
diff --git a/records_test.go b/pk_records_test.go
similarity index 100%
rename from records_test.go
rename to pk_records_test.go
diff --git a/providers/providers.go b/providers/providers.go
index ec44cc511..c72125152 100644
--- a/providers/providers.go
+++ b/providers/providers.go
@@ -74,10 +74,11 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)
 	return pm
 }
 
-const providersKeyPrefix = "/providers/"
+// ProvidersKeyPrefix is the prefix used for all provider record keys in the datastore.
+const ProvidersKeyPrefix = "/providers/"
 
 func mkProvKey(k cid.Cid) string {
-	return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes())
+	return ProvidersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes())
 }
 
 func (pm *ProviderManager) Process() goprocess.Process {
@@ -284,7 +285,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
 
 			// Now, kick off a GC of the datastore.
 			q, err := pm.dstore.Query(dsq.Query{
-				Prefix: providersKeyPrefix,
+				Prefix: ProvidersKeyPrefix,
 			})
 			if err != nil {
 				log.Error("provider record GC query failed: ", err)
diff --git a/providers/providers_test.go b/providers/providers_test.go
index 58756c55c..08423ddf6 100644
--- a/providers/providers_test.go
+++ b/providers/providers_test.go
@@ -185,7 +185,7 @@ func TestProvidesExpire(t *testing.T) {
 		t.Fatal("providers map not cleaned up")
 	}
 
-	res, err := ds.Query(dsq.Query{Prefix: providersKeyPrefix})
+	res, err := ds.Query(dsq.Query{Prefix: ProvidersKeyPrefix})
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/routing.go b/routing.go
index 9435d1d98..f26b8e314 100644
--- a/routing.go
+++ b/routing.go
@@ -27,6 +27,8 @@ import (
 // results will wait for the channel to drain.
 var asyncQueryBuffer = 10
 
+// putValueRepublishInterval is how often the original publisher re-publishes a record it has put
+var putValueRepublishInterval = 24 * time.Hour
+
 // This file implements the Routing interface for the IpfsDHT struct.
 
 // Basic Put/Get
@@ -98,6 +100,28 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
 		}(p)
 	}
 	wg.Wait()
+
+	// the original publisher should keep re-publishing the record because the network
+	// isn't steady/stable: the K closest peers we just published to can become
+	// unavailable or may no longer be among the K closest
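+	// Note: each scheduled re-publish below goes through dht.PutValue and so spawns
+	// another republish goroutine for the same key; these loops will need to be
+	// deduplicated per key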
+	go func() {
+		for {
+			select {
+			case <-dht.proc.Closing():
+				return
+			case <-time.After(putValueRepublishInterval):
+				// TODO: We cannot re-use the original context here as it may have expired.
+				// But is it fair to use this one?
+				ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+				if err := dht.PutValue(ctx, key, value, opts...); err != nil {
+					logger.Errorf("putValue republish proc: failed to republish key %s, error is %+v", key, err)
+				} else {
+					logger.Debugf("putValue republish proc: successfully republished key %s", key)
+				}
+				cancel()
+			}
+		}
+	}()
+
 	return nil
 }