Reply to "wants" messages with "has" messages #12

Merged · 7 commits · Jul 1, 2022
Changes from 6 commits
44 changes: 21 additions & 23 deletions cmd/ssb-test/main.go
@@ -19,8 +19,6 @@ import (
"github.com/planetary-social/go-ssb/service/domain/feeds/formats"
"github.com/planetary-social/go-ssb/service/domain/identity"
"github.com/planetary-social/go-ssb/service/domain/invites"
"github.com/planetary-social/go-ssb/service/domain/network"
"github.com/planetary-social/go-ssb/service/domain/refs"
"github.com/planetary-social/go-ssb/service/domain/transport/boxstream"
"github.com/sirupsen/logrus"
)
@@ -33,27 +31,27 @@ func main() {
}

var (
myPatchwork = refs.MustNewIdentity("@qFtLJ6P5Eh9vKxnj7Rsh8SkE6B6Z36DVLP7ZOKNeQ/Y=.ed25519")
myPatchworkConnect = commands.Connect{
Remote: myPatchwork.Identity(),
Address: network.NewAddress("127.0.0.1:8008"),
}

//localGoSSB = refs.MustNewIdentity("@ln1Bdt8lEy4/F/szWlFVAIAIdCBKmzH2MNEVad8BWus=.ed25519")
//localGoSSBConnect = commands.Connect{
// Remote: localGoSSB.Identity(),
// Address: network.NewAddress("127.0.0.1:8008"),
//}

mainnetPub = invites.MustNewInviteFromString("one.planetary.pub:8008:@CIlwTOK+m6v1hT2zUVOCJvvZq7KE/65ErN6yA2yrURY=.ed25519~KVvak/aZeQJQUrn1imLIvwU+EVTkCzGW8TJWTmK8lOk=")

//soapdog = refs.MustNewIdentity("@qv10rF4IsmxRZb7g5ekJ33EakYBpdrmV/vtP1ij5BS4=.ed25519")

//pub = refs.MustNewIdentity("@CIlwTOK+m6v1hT2zUVOCJvvZq7KE/65ErN6yA2yrURY=.ed25519")
//hubConnect = commands2.Connect{
// Remote: pub.Identity(),
// Address: network2.NewAddress("one.planetary.pub:8008"),
//}
//myPatchwork = refs.MustNewIdentity("@qFtLJ6P5Eh9vKxnj7Rsh8SkE6B6Z36DVLP7ZOKNeQ/Y=.ed25519")
Contributor Author:
Not important; this whole file is just a test program, not used in production.

//myPatchworkConnect = commands.Connect{
// Remote: myPatchwork.Identity(),
// Address: network.NewAddress("127.0.0.1:8008"),
//}

//localGoSSB = refs.MustNewIdentity("@ln1Bdt8lEy4/F/szWlFVAIAIdCBKmzH2MNEVad8BWus=.ed25519")
//localGoSSBConnect = commands.Connect{
// Remote: localGoSSB.Identity(),
// Address: network.NewAddress("127.0.0.1:8008"),
//}

//mainnetPub = invites.MustNewInviteFromString("one.planetary.pub:8008:@CIlwTOK+m6v1hT2zUVOCJvvZq7KE/65ErN6yA2yrURY=.ed25519~KVvak/aZeQJQUrn1imLIvwU+EVTkCzGW8TJWTmK8lOk=")

//soapdog = refs.MustNewIdentity("@qv10rF4IsmxRZb7g5ekJ33EakYBpdrmV/vtP1ij5BS4=.ed25519")

//pub = refs.MustNewIdentity("@CIlwTOK+m6v1hT2zUVOCJvvZq7KE/65ErN6yA2yrURY=.ed25519")
//hubConnect = commands2.Connect{
// Remote: pub.Identity(),
// Address: network2.NewAddress("one.planetary.pub:8008"),
//}
)

var (
1 change: 1 addition & 0 deletions di/inject_adapters.go
@@ -74,6 +74,7 @@ var blobsAdaptersSet = wire.NewSet(
newFilesystemStorage,
wire.Bind(new(blobReplication.BlobStorage), new(*blobs.FilesystemStorage)),
wire.Bind(new(queries.BlobStorage), new(*blobs.FilesystemStorage)),
wire.Bind(new(blobReplication.BlobSizeRepository), new(*blobs.FilesystemStorage)),
)

func newFilesystemStorage(logger logging.Logger, config Config) (*blobs.FilesystemStorage, error) {
2 changes: 1 addition & 1 deletion di/wire_gen.go

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions service/adapters/blobs/storage.go
@@ -13,6 +13,7 @@ import (
"github.com/boreq/errors"
"github.com/planetary-social/go-ssb/logging"
"github.com/planetary-social/go-ssb/service/domain/blobs"
"github.com/planetary-social/go-ssb/service/domain/blobs/replication"
"github.com/planetary-social/go-ssb/service/domain/refs"
)

@@ -93,6 +94,9 @@ func (f FilesystemStorage) Size(id refs.Blob) (blobs.Size, error) {
name := f.pathStorage(id)
fi, err := os.Stat(name)
if err != nil {
if os.IsNotExist(err) {
return blobs.Size{}, replication.ErrBlobNotFound
}
return blobs.Size{}, errors.Wrap(err, "stat failed")
}
return blobs.NewSize(fi.Size())
16 changes: 16 additions & 0 deletions service/adapters/blobs/storage_test.go
@@ -9,6 +9,7 @@ import (
"github.com/planetary-social/go-ssb/logging"
"github.com/planetary-social/go-ssb/service/adapters/blobs"
blobsdomain "github.com/planetary-social/go-ssb/service/domain/blobs"
blobReplication "github.com/planetary-social/go-ssb/service/domain/blobs/replication"
"github.com/planetary-social/go-ssb/service/domain/refs"
"github.com/stretchr/testify/require"
)
@@ -25,6 +26,10 @@ func TestStorage(t *testing.T) {
err = storage.Store(id, r)
require.NoError(t, err)

size, err := storage.Size(id)
require.NoError(t, err)
require.EqualValues(t, len(data), size.InBytes())

rc, err := storage.Get(id)
require.NoError(t, err)
defer rc.Close()
Expand All @@ -35,6 +40,17 @@ func TestStorage(t *testing.T) {
require.Equal(t, data, readData)
}

func TestSizeReturnsBlobNotFound(t *testing.T) {
directory := fixtures.Directory(t)
logger := logging.NewDevNullLogger()

storage, err := blobs.NewFilesystemStorage(directory, logger)
require.NoError(t, err)

_, err = storage.Size(fixtures.SomeRefBlob())
require.ErrorIs(t, err, blobReplication.ErrBlobNotFound)
}

func newFakeBlob(t *testing.T) (refs.Blob, io.Reader, []byte) {
buf := &bytes.Buffer{}

29 changes: 29 additions & 0 deletions service/domain/blobs/replication/common.go
@@ -0,0 +1,29 @@
package replication
Contributor Author:
Most of the code in this file was moved here from other files.


import (
"context"
"io"

"github.com/boreq/errors"
"github.com/planetary-social/go-ssb/service/domain/blobs"
"github.com/planetary-social/go-ssb/service/domain/refs"
"github.com/planetary-social/go-ssb/service/domain/transport"
)

type WantListStorage interface {
GetWantList() (blobs.WantList, error)
}

type Downloader interface {
OnHasReceived(ctx context.Context, peer transport.Peer, blob refs.Blob, size blobs.Size)
}

var ErrBlobNotFound = errors.New("blob not found")

type BlobStorage interface {
Store(id refs.Blob, r io.Reader) error

// Size returns the size of the blob. If the blob is not found it returns
// ErrBlobNotFound.
Size(id refs.Blob) (blobs.Size, error)
}
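
The doc comment above defines a sentinel-error contract for Size: callers can branch on ErrBlobNotFound with errors.Is rather than inspecting filesystem errors. A minimal sketch under that reading, using the standard library's errors and fmt for wrapping; checkSize is a hypothetical helper and not part of this PR:

package example

import (
	"errors"
	"fmt"

	"github.com/planetary-social/go-ssb/service/domain/blobs"
	"github.com/planetary-social/go-ssb/service/domain/blobs/replication"
	"github.com/planetary-social/go-ssb/service/domain/refs"
)

// checkSize is a hypothetical helper showing how a caller of the BlobStorage
// interface can treat a missing blob as "size unknown" rather than a failure.
func checkSize(storage replication.BlobStorage, id refs.Blob) (blobs.Size, bool, error) {
	size, err := storage.Size(id)
	if err != nil {
		if errors.Is(err, replication.ErrBlobNotFound) {
			// The blob is simply not stored locally; not an error for the caller.
			return blobs.Size{}, false, nil
		}
		return blobs.Size{}, false, fmt.Errorf("could not get blob size: %w", err)
	}
	return size, true, nil
}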
4 changes: 0 additions & 4 deletions service/domain/blobs/replication/downloader.go
@@ -14,10 +14,6 @@ import (
"github.com/planetary-social/go-ssb/service/domain/transport/rpc"
)

type BlobStorage interface {
Store(id refs.Blob, r io.Reader) error
}

type BlobsGetDownloader struct {
storage BlobStorage
logger logging.Logger
122 changes: 41 additions & 81 deletions service/domain/blobs/replication/manager.go
@@ -2,118 +2,78 @@ package replication

import (
"context"
"time"
"sync"

"github.com/boreq/errors"
"github.com/planetary-social/go-ssb/logging"
"github.com/planetary-social/go-ssb/service/domain/blobs"
"github.com/planetary-social/go-ssb/service/domain/messages"
"github.com/planetary-social/go-ssb/service/domain/refs"
"github.com/planetary-social/go-ssb/service/domain/transport"
"github.com/planetary-social/go-ssb/service/domain/transport/rpc"
)

type WantListStorage interface {
GetWantList() (blobs.WantList, error)
}

type Downloader interface {
OnHasReceived(ctx context.Context, peer transport.Peer, blob refs.Blob, size blobs.Size)
}

type Manager struct {
storage WantListStorage
downloader Downloader
logger logging.Logger
wantListStorage WantListStorage
blobStorage BlobSizeRepository
downloader Downloader
logger logging.Logger

// todo cleanup processes
Contributor Author:
I think this can be done later; it could even stay like this all the way to production unless we run the app for a very long time.

Collaborator:

As I understand it, the underlying process goroutines for the incoming and outgoing loops will get terminated when the connection context is cancelled anyway, right? So the only remaining improvement would be reacting to connection termination by deleting the process struct from this map (for example by starting a goroutine that awaits ctx close when creating a new process in getOrCreateProcess). Do I understand the problem correctly?
I'm actually ok with leaving it as it is if it's only about cleaning up the process struct. I imagine the map won't grow huge in our normal use case.

Contributor Author:
Correct.
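
A minimal sketch of the cleanup discussed above, assuming the Manager type from this file; it deviates from this PR in that getOrCreateProcess also receives the connection context, and deleteProcessWhenDone is a hypothetical helper:

// Hypothetical sketch, not part of this PR: pass the connection context into
// getOrCreateProcess and start a goroutine that removes the process entry
// from the map once the connection terminates.
func (m *Manager) getOrCreateProcess(ctx context.Context, id rpc.ConnectionId) *WantsProcess {
	v, ok := m.processes[id]
	if !ok {
		v = NewWantsProcess(
			m.wantListStorage,
			m.blobStorage,
			m.downloader,
			m.logger.WithField("connection_id", id),
		)
		m.processes[id] = v
		go m.deleteProcessWhenDone(ctx, id)
	}
	return v
}

func (m *Manager) deleteProcessWhenDone(ctx context.Context, id rpc.ConnectionId) {
	// Wait for the connection context to be cancelled, then drop the entry.
	<-ctx.Done()
	m.lock.Lock()
	defer m.lock.Unlock()
	delete(m.processes, id)
}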

processes map[rpc.ConnectionId]*WantsProcess
lock sync.Mutex // guards processes
}

func NewManager(storage WantListStorage, downloader Downloader, logger logging.Logger) *Manager {
func NewManager(
wantListStorage WantListStorage,
blobStorage BlobSizeRepository,
downloader Downloader,
logger logging.Logger,
) *Manager {
return &Manager{
storage: storage,
downloader: downloader,
logger: logger.New("replication_manager"),
wantListStorage: wantListStorage,
blobStorage: blobStorage,
downloader: downloader,
processes: make(map[rpc.ConnectionId]*WantsProcess),
logger: logger.New("replication_manager"),
}
}

func (r *Manager) HandleIncomingCreateWantsRequest(ctx context.Context) (<-chan messages.BlobWithSizeOrWantDistance, error) {
func (m *Manager) HandleIncomingCreateWantsRequest(ctx context.Context) (<-chan messages.BlobWithSizeOrWantDistance, error) {
connectionId, ok := rpc.GetConnectionIdFromContext(ctx)
if !ok {
return nil, errors.New("connection id not found in context")
}
r.logger.WithField("connectionId", connectionId).Debug("incoming create wants")

m.lock.Lock()
defer m.lock.Unlock()

ch := make(chan messages.BlobWithSizeOrWantDistance)
go r.sendWantListPeriodically(ctx, ch)
m.getOrCreateProcess(connectionId).AddIncoming(ctx, ch)
return ch, nil
}

func (r *Manager) HandleOutgoingCreateWantsRequest(ctx context.Context, ch <-chan messages.BlobWithSizeOrWantDistance, peer transport.Peer) error {
func (m *Manager) HandleOutgoingCreateWantsRequest(ctx context.Context, ch <-chan messages.BlobWithSizeOrWantDistance, peer transport.Peer) error {
connectionId, ok := rpc.GetConnectionIdFromContext(ctx)
if !ok {
return errors.New("connection id not found in context")
}
r.logger.WithField("connectionId", connectionId).Debug("outgoing create wants")

go r.handleOutgoing(ctx, connectionId, ch, peer)
return nil
}

func (r *Manager) handleOutgoing(ctx context.Context, id rpc.ConnectionId, ch <-chan messages.BlobWithSizeOrWantDistance, peer transport.Peer) {
for blobWithSizeOrWantDistance := range ch {
logger := r.logger.WithField("connection_id", id).WithField("blob", blobWithSizeOrWantDistance.Id().String())

if size, ok := blobWithSizeOrWantDistance.SizeOrWantDistance().Size(); ok {
logger.WithField("size", size.InBytes()).Debug("got size")
go r.downloader.OnHasReceived(ctx, peer, blobWithSizeOrWantDistance.Id(), size)
continue
}

if distance, ok := blobWithSizeOrWantDistance.SizeOrWantDistance().WantDistance(); ok {
// peer wants a blob
// todo tell it that we have it if we have it
logger.WithField("distance", distance.Int()).Debug("got distance")
continue
}
m.lock.Lock()
defer m.lock.Unlock()

panic("logic error")
}

// todo channel closed
m.getOrCreateProcess(connectionId).AddOutgoing(ctx, ch, peer)
return nil
}

func (r *Manager) sendWantListPeriodically(ctx context.Context, ch chan<- messages.BlobWithSizeOrWantDistance) {
defer close(ch)
defer r.logger.Debug("terminating sending want list")

for {
wl, err := r.storage.GetWantList()
if err != nil {
r.logger.WithError(err).Error("could not get the want list")
continue
}

for _, v := range wl.List() {
v, err := messages.NewBlobWithWantDistance(v.Id, v.Distance)
if err != nil {
r.logger.WithError(err).Error("could not create a blob with want distance")
continue
}

r.logger.WithField("blob", v.Id()).Debug("sending wants")

select {
case ch <- v:
continue
case <-ctx.Done():
return
}
}

select {
case <-ctx.Done():
return
case <-time.After(10 * time.Second): // todo change
continue
}
func (m *Manager) getOrCreateProcess(id rpc.ConnectionId) *WantsProcess {
v, ok := m.processes[id]
if !ok {
v = NewWantsProcess(
m.wantListStorage,
m.blobStorage,
m.downloader,
m.logger.WithField("connection_id", id),
)
m.processes[id] = v
}
return v
}
3 changes: 3 additions & 0 deletions service/domain/blobs/replication/manager_test.go
@@ -8,6 +8,7 @@ import (

"github.com/planetary-social/go-ssb/fixtures"
"github.com/planetary-social/go-ssb/logging"
"github.com/planetary-social/go-ssb/service/adapters/mocks"
"github.com/planetary-social/go-ssb/service/domain/blobs"
"github.com/planetary-social/go-ssb/service/domain/blobs/replication"
"github.com/planetary-social/go-ssb/service/domain/messages"
@@ -90,11 +91,13 @@ type testManager struct {

func newTestManager(t *testing.T) testManager {
wantListStorage := newWantListStorageMock()
blobStorage := mocks.NewBlobStorageMock()
downloader := newDownloaderMock()
logger := logging.NewDevNullLogger()

manager := replication.NewManager(
wantListStorage,
blobStorage,
downloader,
logger,
)