-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Persistence Layer on top of PubSub #33
Merged
Merged
Changes from all commits
Commits
Show all changes
28 commits
Select commit
Hold shift + click to select a range
43ebd02
removed bootstrapping functionality
aschmahmann f57d48c
start pubsub validator changes to implement LWW pubsub
aschmahmann e2a5fca
Merge branch 'master' into feat/persistence
aschmahmann ff89016
Implement persistence --temporary
aschmahmann 90c980a
Added message protobuf that was missing from previous commit
aschmahmann 3ae7b1f
Use changes from ongoing pubsub PRs
aschmahmann 6fb9435
removed unused protobufs
aschmahmann f3f8dd4
records past EOL should fail
aschmahmann 9e9d778
reorder imports
aschmahmann a24054d
Improved the get-latest protocol (protobufs for request and response,…
aschmahmann 092d0e1
better context cancel in get-latest protocol
aschmahmann 3bf24fa
restore bootstrapping ... for now
aschmahmann e9b0864
In get-latest tests wait a bit after connecting hosts so they have ti…
aschmahmann f85f2bc
Changed get-latest protocol to have responses with a status code.
aschmahmann 9756263
get-latest responds with ERR message even when sender sends an incorr…
aschmahmann a4d4d6c
Removed ERR from protobuf and we just reset the stream when we encoun…
aschmahmann 97ac549
protobuf Makefile supports spaces in path name
aschmahmann b0eeb77
fixed potential goroutine leak. switched order of protobuf fields.
aschmahmann 4fbc97b
Small Makefile refactor
aschmahmann aa737d1
changed get-latest protocol to be called fetch. some refactoring
aschmahmann ded823d
renamed protocol files to match protocol rename
aschmahmann 76d8fca
made function passed into the fetch protocol a typedef
aschmahmann 1913974
renames in the fetch protobufs
aschmahmann fdbeaec
Makefile is more MSYS friendly but you still need a weird GOPATH
aschmahmann b303f91
Updated go.mod to use unreleased version of pubsub. Refactored Fetch …
aschmahmann 0787cb5
rebroadcast initial delay using timer
aschmahmann 41f6fb8
Added Error status code to Fetch protobuf. Currently unused.
aschmahmann 981b38c
use go-libp2p-pubsub v0.1.1. Fix `Fetch` function to be a pointer rec…
aschmahmann File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
package namesys | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"time" | ||
|
||
ggio "github.com/gogo/protobuf/io" | ||
"github.com/gogo/protobuf/proto" | ||
|
||
"github.com/libp2p/go-libp2p-core/helpers" | ||
"github.com/libp2p/go-libp2p-core/host" | ||
"github.com/libp2p/go-libp2p-core/network" | ||
"github.com/libp2p/go-libp2p-core/peer" | ||
"github.com/libp2p/go-libp2p-core/protocol" | ||
|
||
pb "github.com/libp2p/go-libp2p-pubsub-router/pb" | ||
) | ||
|
||
const FetchProtoID = protocol.ID("/libp2p/fetch/0.0.1") | ||
|
||
type fetchProtocol struct { | ||
ctx context.Context | ||
host host.Host | ||
} | ||
|
||
type getValue func(key string) ([]byte, error) | ||
|
||
func newFetchProtocol(ctx context.Context, host host.Host, getData getValue) *fetchProtocol { | ||
p := &fetchProtocol{ctx, host} | ||
|
||
host.SetStreamHandler(FetchProtoID, func(s network.Stream) { | ||
p.receive(s, getData) | ||
}) | ||
|
||
return p | ||
} | ||
|
||
func (p *fetchProtocol) receive(s network.Stream, getData getValue) { | ||
defer helpers.FullClose(s) | ||
|
||
msg := &pb.FetchRequest{} | ||
if err := readMsg(p.ctx, s, msg); err != nil { | ||
log.Infof("error reading request from %s: %s", s.Conn().RemotePeer(), err) | ||
s.Reset() | ||
return | ||
} | ||
|
||
response, err := getData(msg.Identifier) | ||
var respProto pb.FetchResponse | ||
|
||
if err != nil { | ||
respProto = pb.FetchResponse{Status: pb.FetchResponse_NOT_FOUND} | ||
} else { | ||
respProto = pb.FetchResponse{Data: response} | ||
} | ||
|
||
if err := writeMsg(p.ctx, s, &respProto); err != nil { | ||
return | ||
} | ||
} | ||
|
||
func (p *fetchProtocol) Fetch(ctx context.Context, pid peer.ID, key string) ([]byte, error) { | ||
peerCtx, cancel := context.WithTimeout(ctx, time.Second*10) | ||
defer cancel() | ||
|
||
s, err := p.host.NewStream(peerCtx, pid, FetchProtoID) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer helpers.FullClose(s) | ||
|
||
msg := &pb.FetchRequest{Identifier: key} | ||
|
||
if err := writeMsg(ctx, s, msg); err != nil { | ||
return nil, err | ||
} | ||
s.Close() | ||
|
||
response := &pb.FetchResponse{} | ||
if err := readMsg(ctx, s, response); err != nil { | ||
return nil, err | ||
} | ||
|
||
switch response.Status { | ||
case pb.FetchResponse_OK: | ||
return response.Data, nil | ||
case pb.FetchResponse_NOT_FOUND: | ||
return nil, nil | ||
default: | ||
return nil, errors.New("fetch: received unknown status code") | ||
} | ||
} | ||
|
||
func writeMsg(ctx context.Context, s network.Stream, msg proto.Message) error { | ||
done := make(chan error, 1) | ||
go func() { | ||
wc := ggio.NewDelimitedWriter(s) | ||
|
||
if err := wc.WriteMsg(msg); err != nil { | ||
done <- err | ||
return | ||
} | ||
|
||
done <- nil | ||
}() | ||
|
||
var retErr error | ||
select { | ||
case retErr = <-done: | ||
case <-ctx.Done(): | ||
retErr = ctx.Err() | ||
} | ||
|
||
if retErr != nil { | ||
s.Reset() | ||
log.Infof("error writing response to %s: %s", s.Conn().RemotePeer(), retErr) | ||
} | ||
return retErr | ||
} | ||
|
||
func readMsg(ctx context.Context, s network.Stream, msg proto.Message) error { | ||
done := make(chan error, 1) | ||
go func() { | ||
r := ggio.NewDelimitedReader(s, 1<<20) | ||
if err := r.ReadMsg(msg); err != nil { | ||
done <- err | ||
return | ||
} | ||
done <- nil | ||
}() | ||
|
||
select { | ||
case err := <-done: | ||
return err | ||
case <-ctx.Done(): | ||
s.Reset() | ||
return ctx.Err() | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
package namesys | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"errors" | ||
"testing" | ||
"time" | ||
|
||
"github.com/libp2p/go-libp2p-core/host" | ||
) | ||
|
||
func connect(t *testing.T, a, b host.Host) { | ||
pinfo := a.Peerstore().PeerInfo(a.ID()) | ||
err := b.Connect(context.Background(), pinfo) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
} | ||
|
||
type datastore struct { | ||
data map[string][]byte | ||
} | ||
|
||
func (d *datastore) Lookup(key string) ([]byte, error) { | ||
v, ok := d.data[key] | ||
if !ok { | ||
return nil, errors.New("key not found") | ||
} | ||
return v, nil | ||
} | ||
|
||
func TestFetchProtocolTrip(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
hosts := newNetHosts(ctx, t, 2) | ||
connect(t, hosts[0], hosts[1]) | ||
|
||
// wait for hosts to get connected | ||
time.Sleep(time.Millisecond * 100) | ||
|
||
d1 := &datastore{map[string][]byte{"key": []byte("value1")}} | ||
h1 := newFetchProtocol(ctx, hosts[0], d1.Lookup) | ||
|
||
d2 := &datastore{map[string][]byte{"key": []byte("value2")}} | ||
h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup) | ||
|
||
fetchCheck(t, ctx, h1, h2, "key", []byte("value2")) | ||
fetchCheck(t, ctx, h2, h1, "key", []byte("value1")) | ||
} | ||
|
||
func TestFetchProtocolNotFound(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
hosts := newNetHosts(ctx, t, 2) | ||
connect(t, hosts[0], hosts[1]) | ||
|
||
// wait for hosts to get connected | ||
time.Sleep(time.Millisecond * 100) | ||
|
||
d1 := &datastore{map[string][]byte{"key": []byte("value1")}} | ||
h1 := newFetchProtocol(ctx, hosts[0], d1.Lookup) | ||
|
||
d2 := &datastore{make(map[string][]byte)} | ||
h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup) | ||
|
||
fetchCheck(t, ctx, h1, h2, "key", nil) | ||
fetchCheck(t, ctx, h2, h1, "key", []byte("value1")) | ||
} | ||
|
||
func TestFetchProtocolRepeated(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
hosts := newNetHosts(ctx, t, 2) | ||
connect(t, hosts[0], hosts[1]) | ||
|
||
// wait for hosts to get connected | ||
time.Sleep(time.Millisecond * 100) | ||
|
||
d1 := &datastore{map[string][]byte{"key": []byte("value1")}} | ||
h1 := newFetchProtocol(ctx, hosts[0], d1.Lookup) | ||
|
||
d2 := &datastore{make(map[string][]byte)} | ||
h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup) | ||
|
||
for i := 0; i < 10; i++ { | ||
fetchCheck(t, ctx, h1, h2, "key", nil) | ||
fetchCheck(t, ctx, h2, h1, "key", []byte("value1")) | ||
} | ||
} | ||
|
||
func fetchCheck(t *testing.T, ctx context.Context, | ||
requester *fetchProtocol, responder *fetchProtocol, key string, expected []byte) { | ||
data, err := requester.Fetch(ctx, responder.host.ID(), key) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
if !bytes.Equal(data, expected) { | ||
t.Fatalf("expected: %v, received: %v", string(expected), string(data)) | ||
} | ||
|
||
if (data == nil && expected != nil) || (data != nil && expected == nil) { | ||
t.Fatalf("expected []byte{} or nil and received the opposite") | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
PB = $(wildcard *.proto) | ||
GO = $(PB:.proto=.pb.go) | ||
|
||
all: $(GO) | ||
|
||
%.pb.go: %.proto | ||
protoc --proto_path="$(GOPATH)/src" --proto_path="." --gogofast_out=. $< | ||
|
||
clean: | ||
rm -f *.pb.go | ||
rm -f *.go |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/libp2p/record-fetch/1
(or something). "fetch" is too generic (does it fetch blocks? keys? everything?). Users are going to try to use it to fetch IPLD blocks and be very confused.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I figured that it would just fetch anything (i.e.
[]byte
), and if for some reason someone wanted to configure their nodes to accept/ipfs/bafyABC
via fetch and return a block that's something that would be up to them to do. This would also make it easier for people to reuse this protocol within or on top of other protocols.Is that too generic/confusing for people? Should we just restrict it to records (i.e. things with validators)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually really like this. +1