Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Persistence Layer on top of PubSub #33

Merged
merged 28 commits into from
Aug 19, 2019
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
43ebd02
removed bootstrapping functionality
aschmahmann May 12, 2019
f57d48c
start pubsub validator changes to implement LWW pubsub
aschmahmann May 22, 2019
e2a5fca
Merge branch 'master' into feat/persistence
aschmahmann May 31, 2019
ff89016
Implement persistence --temporary
aschmahmann Jun 14, 2019
90c980a
Added message protobuf that was missing from previous commit
aschmahmann Jun 21, 2019
3ae7b1f
Use changes from ongoing pubsub PRs
aschmahmann Jul 3, 2019
6fb9435
removed unused protobufs
aschmahmann Jul 3, 2019
f3f8dd4
records past EOL should fail
aschmahmann Jul 3, 2019
9e9d778
reorder imports
aschmahmann Jul 3, 2019
a24054d
Improved the get-latest protocol (protobufs for request and response,…
aschmahmann Aug 9, 2019
092d0e1
better context cancel in get-latest protocol
aschmahmann Aug 9, 2019
3bf24fa
restore bootstrapping ... for now
aschmahmann Aug 9, 2019
e9b0864
In get-latest tests wait a bit after connecting hosts so they have ti…
aschmahmann Aug 9, 2019
f85f2bc
Changed get-latest protocol to have responses with a status code.
aschmahmann Aug 9, 2019
9756263
get-latest responds with ERR message even when sender sends an incorr…
aschmahmann Aug 9, 2019
a4d4d6c
Removed ERR from protobuf and we just reset the stream when we encoun…
aschmahmann Aug 9, 2019
97ac549
protobuf Makefile supports spaces in path name
aschmahmann Aug 9, 2019
b0eeb77
fixed potential goroutine leak. switched order of protobuf fields.
aschmahmann Aug 11, 2019
4fbc97b
Small Makefile refactor
aschmahmann Aug 11, 2019
aa737d1
changed get-latest protocol to be called fetch. some refactoring
aschmahmann Aug 12, 2019
ded823d
renamed protocol files to match protocol rename
aschmahmann Aug 12, 2019
76d8fca
made function passed into the fetch protocol a typedef
aschmahmann Aug 14, 2019
1913974
renames in the fetch protobufs
aschmahmann Aug 15, 2019
fdbeaec
Makefile is more MSYS friendly but you still need a weird GOPATH
aschmahmann Aug 15, 2019
b303f91
Updated go.mod to use unreleased version of pubsub. Refactored Fetch …
aschmahmann Aug 16, 2019
0787cb5
rebroadcast initial delay using timer
aschmahmann Aug 16, 2019
41f6fb8
Added Error status code to Fetch protobuf. Currently unused.
aschmahmann Aug 16, 2019
981b38c
use go-libp2p-pubsub v0.1.1. Fix `Fetch` function to be a pointer rec…
aschmahmann Aug 19, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions getlatest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package namesys

import (
"context"
"errors"
"time"

ggio "github.com/gogo/protobuf/io"
"github.com/gogo/protobuf/proto"

"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

pb "github.com/libp2p/go-libp2p-pubsub-router/pb"
)

type getLatestProtocol struct {
ctx context.Context
host host.Host
}

func newGetLatestProtocol(ctx context.Context, host host.Host, getLocal func(key string) ([]byte, error)) *getLatestProtocol {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably make getLocal part of the type, given its usage pattern.
Also, better to typedef the function type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typedef 👍 (or an interface would also be fine)
part of the type 👎

If you'd prefer passing in an interface we can do that, but there's no reason to tie this to any particular implementation of "get stuff". We use a data store for this in the rest of this library, but data store has put + get, and we only need get.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's better to be part of the type i think...
no need for an interface, just a function typedef will do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, although we'll need to revisit this if/when we expose this function publicly since an interface will likely be more appropriate.

p := &getLatestProtocol{ctx, host}

host.SetStreamHandler(PSGetLatestProto, func(s network.Stream) {
p.receive(s, getLocal)
})

return p
}

func (p *getLatestProtocol) receive(s network.Stream, getLocal func(key string) ([]byte, error)) {
defer helpers.FullClose(s)

msg := &pb.RequestLatest{}
if err := readMsg(p.ctx, s, msg); err != nil {
log.Infof("error reading request from %s: %s", s.Conn().RemotePeer(), err)
s.Reset()
return
}

response, err := getLocal(msg.Identifier)
var respProto pb.RespondLatest

if err != nil {
respProto = pb.RespondLatest{Status: pb.RespondLatest_NOT_FOUND}
} else {
respProto = pb.RespondLatest{Data: response}
}

if err := writeMsg(p.ctx, s, &respProto); err != nil {
return
}
}

func (p getLatestProtocol) Get(ctx context.Context, pid peer.ID, key string) ([]byte, error) {
peerCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

s, err := p.host.NewStream(peerCtx, pid, PSGetLatestProto)
if err != nil {
return nil, err
}
defer helpers.FullClose(s)

msg := &pb.RequestLatest{Identifier: key}

if err := writeMsg(ctx, s, msg); err != nil {
return nil, err
}
s.Close()

response := &pb.RespondLatest{}
if err := readMsg(ctx, s, response); err != nil {
return nil, err
}

switch response.Status {
case pb.RespondLatest_SUCCESS:
return response.Data, nil
case pb.RespondLatest_NOT_FOUND:
return nil, nil
default:
return nil, errors.New("get-latest: received unknown status code")
}
}

func writeMsg(ctx context.Context, s network.Stream, msg proto.Message) error {
done := make(chan error)
go func() {
wc := ggio.NewDelimitedWriter(s)

if err := wc.WriteMsg(msg); err != nil {
done <- err
return
}

done <- nil
}()

var retErr error
select {
case retErr = <-done:
case <-ctx.Done():
retErr = ctx.Err()
}

if retErr != nil {
s.Reset()
log.Infof("error writing response to %s: %s", s.Conn().RemotePeer(), retErr)
}
return retErr
}

func readMsg(ctx context.Context, s network.Stream, msg proto.Message) error {
done := make(chan error)
go func() {
r := ggio.NewDelimitedReader(s, 1<<20)
if err := r.ReadMsg(msg); err != nil {
done <- err
return
}
done <- nil
}()

select {
case err := <-done:
return err
case <-ctx.Done():
s.Reset()
return ctx.Err()
}
}
109 changes: 109 additions & 0 deletions getlatest_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package namesys

import (
"bytes"
"context"
"errors"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/host"
)

func connect(t *testing.T, a, b host.Host) {
pinfo := a.Peerstore().PeerInfo(a.ID())
err := b.Connect(context.Background(), pinfo)
if err != nil {
t.Fatal(err)
}
}

type datastore struct {
data map[string][]byte
}

func (d *datastore) Lookup(key string) ([]byte, error) {
v, ok := d.data[key]
if !ok {
return nil, errors.New("key not found")
}
return v, nil
}

func TestGetLatestProtocolTrip(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := newNetHosts(ctx, t, 2)
connect(t, hosts[0], hosts[1])

// wait for hosts to get connected
time.Sleep(time.Millisecond * 100)

d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
h1 := newGetLatestProtocol(ctx, hosts[0], d1.Lookup)

d2 := &datastore{map[string][]byte{"key": []byte("value2")}}
h2 := newGetLatestProtocol(ctx, hosts[1], d2.Lookup)

getLatest(t, ctx, h1, h2, "key", []byte("value2"))
getLatest(t, ctx, h2, h1, "key", []byte("value1"))
}

func TestGetLatestProtocolNotFound(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := newNetHosts(ctx, t, 2)
connect(t, hosts[0], hosts[1])

// wait for hosts to get connected
time.Sleep(time.Millisecond * 100)

d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
h1 := newGetLatestProtocol(ctx, hosts[0], d1.Lookup)

d2 := &datastore{make(map[string][]byte)}
h2 := newGetLatestProtocol(ctx, hosts[1], d2.Lookup)

getLatest(t, ctx, h1, h2, "key", nil)
getLatest(t, ctx, h2, h1, "key", []byte("value1"))
}

func TestGetLatestProtocolRepeated(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := newNetHosts(ctx, t, 2)
connect(t, hosts[0], hosts[1])

// wait for hosts to get connected
time.Sleep(time.Millisecond * 100)

d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
h1 := newGetLatestProtocol(ctx, hosts[0], d1.Lookup)

d2 := &datastore{make(map[string][]byte)}
h2 := newGetLatestProtocol(ctx, hosts[1], d2.Lookup)

for i := 0; i < 10; i++ {
getLatest(t, ctx, h1, h2, "key", nil)
getLatest(t, ctx, h2, h1, "key", []byte("value1"))
}
}

func getLatest(t *testing.T, ctx context.Context,
requester *getLatestProtocol, responder *getLatestProtocol, key string, expected []byte) {
data, err := requester.Get(ctx, responder.host.ID(), key)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(data, expected) {
t.Fatalf("expected: %v, received: %v", string(expected), string(data))
}

if (data == nil && expected != nil) || (data != nil && expected == nil) {
t.Fatalf("expected []byte{} or nil and received the opposite")
}
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module github.com/libp2p/go-libp2p-pubsub-router

require (
github.com/gogo/protobuf v1.2.1
github.com/ipfs/go-cid v0.0.2
github.com/ipfs/go-datastore v0.0.5
github.com/ipfs/go-ipfs-ds-help v0.0.1
Expand All @@ -13,3 +14,5 @@ require (
github.com/libp2p/go-libp2p-routing-helpers v0.1.0
github.com/libp2p/go-libp2p-swarm v0.1.0
)

replace github.com/libp2p/go-libp2p-pubsub v0.1.0 => github.com/aschmahmann/go-libp2p-pubsub v0.0.4-0.20190807152749-d7996289bbcd
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now uses a version of libp2p/pubsub from master. However, there hasn't been a release yet so it's referencing v0.1.1-0.20190807100218-9f04364996b4

5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/aschmahmann/go-libp2p-pubsub v0.0.4-0.20190807152749-d7996289bbcd h1:P1//443gNhGLDpMpzwnaNDgKoHpXrlt+iIEVgwlTHoI=
github.com/aschmahmann/go-libp2p-pubsub v0.0.4-0.20190807152749-d7996289bbcd/go.mod h1:ekhyliBSJ0aBg62+j9rECrxW+UUPAj1SLS3jN7JMAGc=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
Expand Down Expand Up @@ -86,6 +88,7 @@ github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8
github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco=
github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ=
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g=
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo=
Expand All @@ -95,8 +98,6 @@ github.com/libp2p/go-libp2p-peer v0.2.0 h1:EQ8kMjaCUwt/Y5uLgjT8iY2qg0mGUT0N1zUje
github.com/libp2p/go-libp2p-peer v0.2.0/go.mod h1:RCffaCvUyW2CJmG2gAWVqwePwW7JMgxjsHm7+J5kjWY=
github.com/libp2p/go-libp2p-peerstore v0.1.0 h1:MKh7pRNPHSh1fLPj8u/M/s/napdmeNpoi9BRy9lPN0E=
github.com/libp2p/go-libp2p-peerstore v0.1.0/go.mod h1:2CeHkQsr8svp4fZ+Oi9ykN1HBb6u0MOvdJ7YIsmcwtY=
github.com/libp2p/go-libp2p-pubsub v0.1.0 h1:SmQeMa7IUv5vadh0fYgYsafWCBA1sCy5d/68kIYqGcU=
github.com/libp2p/go-libp2p-pubsub v0.1.0/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uzw0t1Bp9R1znpR/Q=
github.com/libp2p/go-libp2p-record v0.1.0 h1:wHwBGbFzymoIl69BpgwIu0O6ta3TXGcMPvHUAcodzRc=
github.com/libp2p/go-libp2p-record v0.1.0/go.mod h1:ujNc8iuE5dlKWVy6wuL6dd58t0n7xI4hAIl8pE6wu5Q=
github.com/libp2p/go-libp2p-routing-helpers v0.1.0 h1:BaFvpyv8TyhCN7TihawTiKuzeu8/Pyw7ZnMA4IvqIN8=
Expand Down
17 changes: 17 additions & 0 deletions pb/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)

ifeq ($(OS),Windows_NT)
GOPATH_DELIMITER = ;
else
GOPATH_DELIMITER = :
endif

all: $(GO)

%.pb.go: %.proto
protoc --proto_path="$(GOPATH)/src$(GOPATH_DELIMITER)." --gogofast_out=. $<

clean:
rm -f *.pb.go
rm -f *.go
Loading