Skip to content

Commit b47a5e3

Browse files
authoredAug 19, 2019
Merge pull request #33 from aschmahmann/feat/persistence
Add Persistence Layer on top of PubSub
2 parents 1b619f8 + 981b38c commit b47a5e3

9 files changed

+1137
-42
lines changed
 

‎fetch.go

+140
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package namesys
2+
3+
import (
4+
"context"
5+
"errors"
6+
"time"
7+
8+
ggio "github.com/gogo/protobuf/io"
9+
"github.com/gogo/protobuf/proto"
10+
11+
"github.com/libp2p/go-libp2p-core/helpers"
12+
"github.com/libp2p/go-libp2p-core/host"
13+
"github.com/libp2p/go-libp2p-core/network"
14+
"github.com/libp2p/go-libp2p-core/peer"
15+
"github.com/libp2p/go-libp2p-core/protocol"
16+
17+
pb "github.com/libp2p/go-libp2p-pubsub-router/pb"
18+
)
19+
20+
const FetchProtoID = protocol.ID("/libp2p/fetch/0.0.1")
21+
22+
type fetchProtocol struct {
23+
ctx context.Context
24+
host host.Host
25+
}
26+
27+
type getValue func(key string) ([]byte, error)
28+
29+
func newFetchProtocol(ctx context.Context, host host.Host, getData getValue) *fetchProtocol {
30+
p := &fetchProtocol{ctx, host}
31+
32+
host.SetStreamHandler(FetchProtoID, func(s network.Stream) {
33+
p.receive(s, getData)
34+
})
35+
36+
return p
37+
}
38+
39+
func (p *fetchProtocol) receive(s network.Stream, getData getValue) {
40+
defer helpers.FullClose(s)
41+
42+
msg := &pb.FetchRequest{}
43+
if err := readMsg(p.ctx, s, msg); err != nil {
44+
log.Infof("error reading request from %s: %s", s.Conn().RemotePeer(), err)
45+
s.Reset()
46+
return
47+
}
48+
49+
response, err := getData(msg.Identifier)
50+
var respProto pb.FetchResponse
51+
52+
if err != nil {
53+
respProto = pb.FetchResponse{Status: pb.FetchResponse_NOT_FOUND}
54+
} else {
55+
respProto = pb.FetchResponse{Data: response}
56+
}
57+
58+
if err := writeMsg(p.ctx, s, &respProto); err != nil {
59+
return
60+
}
61+
}
62+
63+
func (p *fetchProtocol) Fetch(ctx context.Context, pid peer.ID, key string) ([]byte, error) {
64+
peerCtx, cancel := context.WithTimeout(ctx, time.Second*10)
65+
defer cancel()
66+
67+
s, err := p.host.NewStream(peerCtx, pid, FetchProtoID)
68+
if err != nil {
69+
return nil, err
70+
}
71+
defer helpers.FullClose(s)
72+
73+
msg := &pb.FetchRequest{Identifier: key}
74+
75+
if err := writeMsg(ctx, s, msg); err != nil {
76+
return nil, err
77+
}
78+
s.Close()
79+
80+
response := &pb.FetchResponse{}
81+
if err := readMsg(ctx, s, response); err != nil {
82+
return nil, err
83+
}
84+
85+
switch response.Status {
86+
case pb.FetchResponse_OK:
87+
return response.Data, nil
88+
case pb.FetchResponse_NOT_FOUND:
89+
return nil, nil
90+
default:
91+
return nil, errors.New("fetch: received unknown status code")
92+
}
93+
}
94+
95+
func writeMsg(ctx context.Context, s network.Stream, msg proto.Message) error {
96+
done := make(chan error, 1)
97+
go func() {
98+
wc := ggio.NewDelimitedWriter(s)
99+
100+
if err := wc.WriteMsg(msg); err != nil {
101+
done <- err
102+
return
103+
}
104+
105+
done <- nil
106+
}()
107+
108+
var retErr error
109+
select {
110+
case retErr = <-done:
111+
case <-ctx.Done():
112+
retErr = ctx.Err()
113+
}
114+
115+
if retErr != nil {
116+
s.Reset()
117+
log.Infof("error writing response to %s: %s", s.Conn().RemotePeer(), retErr)
118+
}
119+
return retErr
120+
}
121+
122+
func readMsg(ctx context.Context, s network.Stream, msg proto.Message) error {
123+
done := make(chan error, 1)
124+
go func() {
125+
r := ggio.NewDelimitedReader(s, 1<<20)
126+
if err := r.ReadMsg(msg); err != nil {
127+
done <- err
128+
return
129+
}
130+
done <- nil
131+
}()
132+
133+
select {
134+
case err := <-done:
135+
return err
136+
case <-ctx.Done():
137+
s.Reset()
138+
return ctx.Err()
139+
}
140+
}

‎fetch_test.go

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package namesys
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"errors"
7+
"testing"
8+
"time"
9+
10+
"github.com/libp2p/go-libp2p-core/host"
11+
)
12+
13+
func connect(t *testing.T, a, b host.Host) {
14+
pinfo := a.Peerstore().PeerInfo(a.ID())
15+
err := b.Connect(context.Background(), pinfo)
16+
if err != nil {
17+
t.Fatal(err)
18+
}
19+
}
20+
21+
type datastore struct {
22+
data map[string][]byte
23+
}
24+
25+
func (d *datastore) Lookup(key string) ([]byte, error) {
26+
v, ok := d.data[key]
27+
if !ok {
28+
return nil, errors.New("key not found")
29+
}
30+
return v, nil
31+
}
32+
33+
func TestFetchProtocolTrip(t *testing.T) {
34+
ctx, cancel := context.WithCancel(context.Background())
35+
defer cancel()
36+
37+
hosts := newNetHosts(ctx, t, 2)
38+
connect(t, hosts[0], hosts[1])
39+
40+
// wait for hosts to get connected
41+
time.Sleep(time.Millisecond * 100)
42+
43+
d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
44+
h1 := newFetchProtocol(ctx, hosts[0], d1.Lookup)
45+
46+
d2 := &datastore{map[string][]byte{"key": []byte("value2")}}
47+
h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup)
48+
49+
fetchCheck(t, ctx, h1, h2, "key", []byte("value2"))
50+
fetchCheck(t, ctx, h2, h1, "key", []byte("value1"))
51+
}
52+
53+
func TestFetchProtocolNotFound(t *testing.T) {
54+
ctx, cancel := context.WithCancel(context.Background())
55+
defer cancel()
56+
57+
hosts := newNetHosts(ctx, t, 2)
58+
connect(t, hosts[0], hosts[1])
59+
60+
// wait for hosts to get connected
61+
time.Sleep(time.Millisecond * 100)
62+
63+
d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
64+
h1 := newFetchProtocol(ctx, hosts[0], d1.Lookup)
65+
66+
d2 := &datastore{make(map[string][]byte)}
67+
h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup)
68+
69+
fetchCheck(t, ctx, h1, h2, "key", nil)
70+
fetchCheck(t, ctx, h2, h1, "key", []byte("value1"))
71+
}
72+
73+
func TestFetchProtocolRepeated(t *testing.T) {
74+
ctx, cancel := context.WithCancel(context.Background())
75+
defer cancel()
76+
77+
hosts := newNetHosts(ctx, t, 2)
78+
connect(t, hosts[0], hosts[1])
79+
80+
// wait for hosts to get connected
81+
time.Sleep(time.Millisecond * 100)
82+
83+
d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
84+
h1 := newFetchProtocol(ctx, hosts[0], d1.Lookup)
85+
86+
d2 := &datastore{make(map[string][]byte)}
87+
h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup)
88+
89+
for i := 0; i < 10; i++ {
90+
fetchCheck(t, ctx, h1, h2, "key", nil)
91+
fetchCheck(t, ctx, h2, h1, "key", []byte("value1"))
92+
}
93+
}
94+
95+
func fetchCheck(t *testing.T, ctx context.Context,
96+
requester *fetchProtocol, responder *fetchProtocol, key string, expected []byte) {
97+
data, err := requester.Fetch(ctx, responder.host.ID(), key)
98+
if err != nil {
99+
t.Fatal(err)
100+
}
101+
102+
if !bytes.Equal(data, expected) {
103+
t.Fatalf("expected: %v, received: %v", string(expected), string(data))
104+
}
105+
106+
if (data == nil && expected != nil) || (data != nil && expected == nil) {
107+
t.Fatalf("expected []byte{} or nil and received the opposite")
108+
}
109+
}

‎go.mod

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
module github.com/libp2p/go-libp2p-pubsub-router
22

33
require (
4+
github.com/gogo/protobuf v1.2.1
45
github.com/ipfs/go-cid v0.0.2
56
github.com/ipfs/go-datastore v0.0.5
67
github.com/ipfs/go-ipfs-ds-help v0.0.1
78
github.com/ipfs/go-ipfs-util v0.0.1
89
github.com/ipfs/go-log v0.0.1
910
github.com/libp2p/go-libp2p-blankhost v0.1.1
1011
github.com/libp2p/go-libp2p-core v0.0.1
11-
github.com/libp2p/go-libp2p-pubsub v0.1.0
12+
github.com/libp2p/go-libp2p-pubsub v0.1.1
1213
github.com/libp2p/go-libp2p-record v0.1.0
1314
github.com/libp2p/go-libp2p-routing-helpers v0.1.0
1415
github.com/libp2p/go-libp2p-swarm v0.1.0

‎go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ github.com/libp2p/go-libp2p-peer v0.2.0 h1:EQ8kMjaCUwt/Y5uLgjT8iY2qg0mGUT0N1zUje
9595
github.com/libp2p/go-libp2p-peer v0.2.0/go.mod h1:RCffaCvUyW2CJmG2gAWVqwePwW7JMgxjsHm7+J5kjWY=
9696
github.com/libp2p/go-libp2p-peerstore v0.1.0 h1:MKh7pRNPHSh1fLPj8u/M/s/napdmeNpoi9BRy9lPN0E=
9797
github.com/libp2p/go-libp2p-peerstore v0.1.0/go.mod h1:2CeHkQsr8svp4fZ+Oi9ykN1HBb6u0MOvdJ7YIsmcwtY=
98-
github.com/libp2p/go-libp2p-pubsub v0.1.0 h1:SmQeMa7IUv5vadh0fYgYsafWCBA1sCy5d/68kIYqGcU=
99-
github.com/libp2p/go-libp2p-pubsub v0.1.0/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uzw0t1Bp9R1znpR/Q=
98+
github.com/libp2p/go-libp2p-pubsub v0.1.1 h1:phDnQvO3H3hAgaEEQi6yt3LILqIYVXaw05bxzezrEwQ=
99+
github.com/libp2p/go-libp2p-pubsub v0.1.1/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uzw0t1Bp9R1znpR/Q=
100100
github.com/libp2p/go-libp2p-record v0.1.0 h1:wHwBGbFzymoIl69BpgwIu0O6ta3TXGcMPvHUAcodzRc=
101101
github.com/libp2p/go-libp2p-record v0.1.0/go.mod h1:ujNc8iuE5dlKWVy6wuL6dd58t0n7xI4hAIl8pE6wu5Q=
102102
github.com/libp2p/go-libp2p-routing-helpers v0.1.0 h1:BaFvpyv8TyhCN7TihawTiKuzeu8/Pyw7ZnMA4IvqIN8=

‎pb/Makefile

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
PB = $(wildcard *.proto)
2+
GO = $(PB:.proto=.pb.go)
3+
4+
all: $(GO)
5+
6+
%.pb.go: %.proto
7+
protoc --proto_path="$(GOPATH)/src" --proto_path="." --gogofast_out=. $<
8+
9+
clean:
10+
rm -f *.pb.go
11+
rm -f *.go

0 commit comments

Comments
 (0)
Please sign in to comment.