-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Persistence Layer on top of PubSub #33
Changes from 23 commits
43ebd02
f57d48c
e2a5fca
ff89016
90c980a
3ae7b1f
6fb9435
f3f8dd4
9e9d778
a24054d
092d0e1
3bf24fa
e9b0864
f85f2bc
9756263
a4d4d6c
97ac549
b0eeb77
4fbc97b
aa737d1
ded823d
76d8fca
1913974
fdbeaec
b303f91
0787cb5
41f6fb8
981b38c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
package namesys | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"time" | ||
|
||
ggio "github.com/gogo/protobuf/io" | ||
"github.com/gogo/protobuf/proto" | ||
|
||
"github.com/libp2p/go-libp2p-core/helpers" | ||
"github.com/libp2p/go-libp2p-core/host" | ||
"github.com/libp2p/go-libp2p-core/network" | ||
"github.com/libp2p/go-libp2p-core/peer" | ||
"github.com/libp2p/go-libp2p-core/protocol" | ||
|
||
pb "github.com/libp2p/go-libp2p-pubsub-router/pb" | ||
) | ||
|
||
const FetchProtoID = protocol.ID("/libp2p/fetch/0.0.1") | ||
|
||
type fetchProtocol struct { | ||
ctx context.Context | ||
host host.Host | ||
} | ||
|
||
type getValue func(key string) ([]byte, error) | ||
|
||
func newFetchProtocol(ctx context.Context, host host.Host, getData getValue) *fetchProtocol { | ||
p := &fetchProtocol{ctx, host} | ||
|
||
host.SetStreamHandler(FetchProtoID, func(s network.Stream) { | ||
p.receive(s, getData) | ||
}) | ||
|
||
return p | ||
} | ||
|
||
func (p *fetchProtocol) receive(s network.Stream, getData getValue) { | ||
defer helpers.FullClose(s) | ||
|
||
msg := &pb.FetchRequest{} | ||
if err := readMsg(p.ctx, s, msg); err != nil { | ||
log.Infof("error reading request from %s: %s", s.Conn().RemotePeer(), err) | ||
s.Reset() | ||
return | ||
} | ||
|
||
response, err := getData(msg.Identifier) | ||
var respProto pb.FetchResponse | ||
|
||
if err != nil { | ||
respProto = pb.FetchResponse{Status: pb.FetchResponse_NOT_FOUND} | ||
} else { | ||
respProto = pb.FetchResponse{Data: response} | ||
} | ||
|
||
if err := writeMsg(p.ctx, s, &respProto); err != nil { | ||
return | ||
} | ||
} | ||
|
||
func (p fetchProtocol) Get(ctx context.Context, pid peer.ID, key string) ([]byte, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: inconsistent receiver. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. think this is fixed/better now. Just called it |
||
peerCtx, cancel := context.WithTimeout(ctx, time.Second*10) | ||
defer cancel() | ||
|
||
s, err := p.host.NewStream(peerCtx, pid, FetchProtoID) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer helpers.FullClose(s) | ||
|
||
msg := &pb.FetchRequest{Identifier: key} | ||
|
||
if err := writeMsg(ctx, s, msg); err != nil { | ||
return nil, err | ||
} | ||
s.Close() | ||
|
||
response := &pb.FetchResponse{} | ||
if err := readMsg(ctx, s, response); err != nil { | ||
return nil, err | ||
} | ||
|
||
switch response.Status { | ||
case pb.FetchResponse_OK: | ||
return response.Data, nil | ||
case pb.FetchResponse_NOT_FOUND: | ||
return nil, nil | ||
default: | ||
return nil, errors.New("get-latest: received unknown status code") | ||
} | ||
} | ||
|
||
func writeMsg(ctx context.Context, s network.Stream, msg proto.Message) error { | ||
done := make(chan error, 1) | ||
go func() { | ||
wc := ggio.NewDelimitedWriter(s) | ||
|
||
if err := wc.WriteMsg(msg); err != nil { | ||
done <- err | ||
return | ||
} | ||
|
||
done <- nil | ||
}() | ||
|
||
var retErr error | ||
select { | ||
case retErr = <-done: | ||
case <-ctx.Done(): | ||
retErr = ctx.Err() | ||
} | ||
|
||
if retErr != nil { | ||
s.Reset() | ||
log.Infof("error writing response to %s: %s", s.Conn().RemotePeer(), retErr) | ||
} | ||
return retErr | ||
} | ||
|
||
func readMsg(ctx context.Context, s network.Stream, msg proto.Message) error { | ||
done := make(chan error, 1) | ||
go func() { | ||
r := ggio.NewDelimitedReader(s, 1<<20) | ||
if err := r.ReadMsg(msg); err != nil { | ||
done <- err | ||
return | ||
} | ||
done <- nil | ||
}() | ||
|
||
select { | ||
case err := <-done: | ||
return err | ||
case <-ctx.Done(): | ||
s.Reset() | ||
return ctx.Err() | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
package namesys | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"errors" | ||
"testing" | ||
"time" | ||
|
||
"github.com/libp2p/go-libp2p-core/host" | ||
) | ||
|
||
func connect(t *testing.T, a, b host.Host) { | ||
pinfo := a.Peerstore().PeerInfo(a.ID()) | ||
err := b.Connect(context.Background(), pinfo) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
} | ||
|
||
type datastore struct { | ||
data map[string][]byte | ||
} | ||
|
||
func (d *datastore) Lookup(key string) ([]byte, error) { | ||
v, ok := d.data[key] | ||
if !ok { | ||
return nil, errors.New("key not found") | ||
} | ||
return v, nil | ||
} | ||
|
||
func TestFetchProtocolTrip(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
hosts := newNetHosts(ctx, t, 2) | ||
connect(t, hosts[0], hosts[1]) | ||
|
||
// wait for hosts to get connected | ||
time.Sleep(time.Millisecond * 100) | ||
|
||
d1 := &datastore{map[string][]byte{"key": []byte("value1")}} | ||
h1 := newFetchProtocol(ctx, hosts[0], d1.Lookup) | ||
|
||
d2 := &datastore{map[string][]byte{"key": []byte("value2")}} | ||
h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup) | ||
|
||
fetchCheck(t, ctx, h1, h2, "key", []byte("value2")) | ||
fetchCheck(t, ctx, h2, h1, "key", []byte("value1")) | ||
} | ||
|
||
func TestFetchProtocolNotFound(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
hosts := newNetHosts(ctx, t, 2) | ||
connect(t, hosts[0], hosts[1]) | ||
|
||
// wait for hosts to get connected | ||
time.Sleep(time.Millisecond * 100) | ||
|
||
d1 := &datastore{map[string][]byte{"key": []byte("value1")}} | ||
h1 := newFetchProtocol(ctx, hosts[0], d1.Lookup) | ||
|
||
d2 := &datastore{make(map[string][]byte)} | ||
h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup) | ||
|
||
fetchCheck(t, ctx, h1, h2, "key", nil) | ||
fetchCheck(t, ctx, h2, h1, "key", []byte("value1")) | ||
} | ||
|
||
func TestFetchProtocolRepeated(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
hosts := newNetHosts(ctx, t, 2) | ||
connect(t, hosts[0], hosts[1]) | ||
|
||
// wait for hosts to get connected | ||
time.Sleep(time.Millisecond * 100) | ||
|
||
d1 := &datastore{map[string][]byte{"key": []byte("value1")}} | ||
h1 := newFetchProtocol(ctx, hosts[0], d1.Lookup) | ||
|
||
d2 := &datastore{make(map[string][]byte)} | ||
h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup) | ||
|
||
for i := 0; i < 10; i++ { | ||
fetchCheck(t, ctx, h1, h2, "key", nil) | ||
fetchCheck(t, ctx, h2, h1, "key", []byte("value1")) | ||
} | ||
} | ||
|
||
func fetchCheck(t *testing.T, ctx context.Context, | ||
requester *fetchProtocol, responder *fetchProtocol, key string, expected []byte) { | ||
data, err := requester.Get(ctx, responder.host.ID(), key) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
if !bytes.Equal(data, expected) { | ||
t.Fatalf("expected: %v, received: %v", string(expected), string(data)) | ||
} | ||
|
||
if (data == nil && expected != nil) || (data != nil && expected == nil) { | ||
t.Fatalf("expected []byte{} or nil and received the opposite") | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
module github.com/libp2p/go-libp2p-pubsub-router | ||
|
||
require ( | ||
github.com/gogo/protobuf v1.2.1 | ||
github.com/ipfs/go-cid v0.0.2 | ||
github.com/ipfs/go-datastore v0.0.5 | ||
github.com/ipfs/go-ipfs-ds-help v0.0.1 | ||
|
@@ -13,3 +14,5 @@ require ( | |
github.com/libp2p/go-libp2p-routing-helpers v0.1.0 | ||
github.com/libp2p/go-libp2p-swarm v0.1.0 | ||
) | ||
|
||
replace github.com/libp2p/go-libp2p-pubsub v0.1.0 => github.com/aschmahmann/go-libp2p-pubsub v0.0.4-0.20190807152749-d7996289bbcd | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now uses a version of libp2p/pubsub from master. However, there hasn't been a release yet so it's referencing |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
PB = $(wildcard *.proto) | ||
GO = $(PB:.proto=.pb.go) | ||
|
||
ifeq ($(OS),Windows_NT) | ||
LIST_SEPARATOR = ; | ||
else | ||
LIST_SEPARATOR = : | ||
endif | ||
|
||
all: $(GO) | ||
|
||
%.pb.go: %.proto | ||
protoc --proto_path="$(GOPATH)/src$(LIST_SEPARATOR)." --gogofast_out=. $< | ||
|
||
clean: | ||
rm -f *.pb.go | ||
rm -f *.go |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/libp2p/record-fetch/1
(or something). "fetch" is too generic (does it fetch blocks? keys? everything?). Users are going to try to use it to fetch IPLD blocks and be very confused.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I figured that it would just fetch anything (i.e.
[]byte
), and if for some reason someone wanted to configure their nodes to accept/ipfs/bafyABC
via fetch and return a block that's something that would be up to them to do. This would also make it easier for people to reuse this protocol within or on top of other protocols.Is that too generic/confusing for people? Should we just restrict it to records (i.e. things with validators)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually really like this. +1