diff --git a/di/inject_adapters.go b/di/inject_adapters.go index 9cf46cd5..6f45c845 100644 --- a/di/inject_adapters.go +++ b/di/inject_adapters.go @@ -5,6 +5,7 @@ import ( "github.com/google/wire" "github.com/planetary-social/go-ssb/logging" + "github.com/planetary-social/go-ssb/service/adapters" "github.com/planetary-social/go-ssb/service/adapters/blobs" "github.com/planetary-social/go-ssb/service/adapters/bolt" "github.com/planetary-social/go-ssb/service/adapters/mocks" @@ -25,6 +26,9 @@ var txBoltAdaptersSet = wire.NewSet( bolt.NewSocialGraphRepository, wire.Bind(new(commands.SocialGraphRepository), new(*bolt.SocialGraphRepository)), + bolt.NewWantListRepository, + wire.Bind(new(commands.WantListRepository), new(*bolt.WantListRepository)), + bolt.NewReceiveLogRepository, bolt.NewMessageRepository, bolt.NewPubRepository, @@ -80,3 +84,9 @@ var blobsAdaptersSet = wire.NewSet( func newFilesystemStorage(logger logging.Logger, config Config) (*blobs.FilesystemStorage, error) { return blobs.NewFilesystemStorage(path.Join(config.DataDirectory, "blobs"), logger) } + +//nolint:deadcode,varcheck +var adaptersSet = wire.NewSet( + adapters.NewCurrentTimeProvider, + wire.Bind(new(commands.CurrentTimeProvider), new(*adapters.CurrentTimeProvider)), +) diff --git a/di/inject_application.go b/di/inject_application.go index 7b1f89ba..f5b9b755 100644 --- a/di/inject_application.go +++ b/di/inject_application.go @@ -27,9 +27,11 @@ var commandsSet = wire.NewSet( commands.NewProcessNewLocalDiscoveryHandler, commands.NewPublishRawHandler, commands.NewEstablishNewConnectionsHandler, + commands.NewCreateWantsHandler, + commands.NewDownloadBlobHandler, + commands.NewRawMessageHandler, wire.Bind(new(replication.RawMessageHandler), new(*commands.RawMessageHandler)), - commands.NewCreateWantsHandler, ) var queriesSet = wire.NewSet( diff --git a/di/wire.go b/di/wire.go index 73788067..a8645c27 100644 --- a/di/wire.go +++ b/di/wire.go @@ -37,6 +37,9 @@ type TxTestAdapters struct { MessageRepository *bolt.MessageRepository FeedRepository *bolt.FeedRepository ReceiveLog *bolt.ReceiveLogRepository + WantList *bolt.WantListRepository + + CurrentTimeProvider *mocks.CurrentTimeProviderMock } func BuildTxTestAdapters(*bbolt.Tx) (TxTestAdapters, error) { @@ -54,6 +57,9 @@ func BuildTxTestAdapters(*bbolt.Tx) (TxTestAdapters, error) { formatsSet, wire.Value(hops), + + mocks.NewCurrentTimeProviderMock, + wire.Bind(new(commands.CurrentTimeProvider), new(*mocks.CurrentTimeProviderMock)), ) return TxTestAdapters{}, nil @@ -122,6 +128,7 @@ func BuildTransactableAdapters(*bbolt.Tx, identity.Public, Config) (commands.Ada txBoltAdaptersSet, formatsSet, extractFromConfigSet, + adaptersSet, wire.Value(hops), ) @@ -135,6 +142,7 @@ func BuildTxRepositories(*bbolt.Tx, identity.Public, logging.Logger, formats.Mes txBoltAdaptersSet, formatsSet, + adaptersSet, wire.Value(hops), ) @@ -186,6 +194,7 @@ func BuildService(context.Context, identity.Private, Config) (Service, error) { messagePubSubSet, boltAdaptersSet, blobsAdaptersSet, + adaptersSet, newBolt, ) diff --git a/di/wire_gen.go b/di/wire_gen.go index 70f8989f..75ed7905 100644 --- a/di/wire_gen.go +++ b/di/wire_gen.go @@ -15,6 +15,7 @@ import ( "github.com/google/wire" "github.com/planetary-social/go-ssb/fixtures" "github.com/planetary-social/go-ssb/logging" + "github.com/planetary-social/go-ssb/service/adapters" "github.com/planetary-social/go-ssb/service/adapters/bolt" "github.com/planetary-social/go-ssb/service/adapters/mocks" "github.com/planetary-social/go-ssb/service/adapters/pubsub" @@ -65,10 +66,14 @@ func BuildTxTestAdapters(tx *bbolt.Tx) (TxTestAdapters, error) { pubRepository := bolt.NewPubRepository(tx) blobRepository := bolt.NewBlobRepository(tx) feedRepository := bolt.NewFeedRepository(tx, socialGraphRepository, receiveLogRepository, messageRepository, pubRepository, blobRepository, scuttlebutt) + currentTimeProviderMock := mocks.NewCurrentTimeProviderMock() + wantListRepository := bolt.NewWantListRepository(tx, currentTimeProviderMock) txTestAdapters := TxTestAdapters{ - MessageRepository: messageRepository, - FeedRepository: feedRepository, - ReceiveLog: receiveLogRepository, + MessageRepository: messageRepository, + FeedRepository: feedRepository, + ReceiveLog: receiveLogRepository, + WantList: wantListRepository, + CurrentTimeProvider: currentTimeProviderMock, } return txTestAdapters, nil } @@ -156,11 +161,14 @@ func BuildTransactableAdapters(tx *bbolt.Tx, public identity.Public, config Conf pubRepository := bolt.NewPubRepository(tx) blobRepository := bolt.NewBlobRepository(tx) feedRepository := bolt.NewFeedRepository(tx, socialGraphRepository, receiveLogRepository, messageRepository, pubRepository, blobRepository, scuttlebutt) - adapters := commands.Adapters{ + currentTimeProvider := adapters.NewCurrentTimeProvider() + wantListRepository := bolt.NewWantListRepository(tx, currentTimeProvider) + commandsAdapters := commands.Adapters{ Feed: feedRepository, SocialGraph: socialGraphRepository, + WantList: wantListRepository, } - return adapters, nil + return commandsAdapters, nil } var ( @@ -183,12 +191,15 @@ func BuildTxRepositories(tx *bbolt.Tx, public identity.Public, logger logging.Lo pubRepository := bolt.NewPubRepository(tx) blobRepository := bolt.NewBlobRepository(tx) feedRepository := bolt.NewFeedRepository(tx, socialGraphRepository, receiveLogRepository, messageRepository, pubRepository, blobRepository, scuttlebutt) + currentTimeProvider := adapters.NewCurrentTimeProvider() + wantListRepository := bolt.NewWantListRepository(tx, currentTimeProvider) txRepositories := bolt.TxRepositories{ Feed: feedRepository, Graph: socialGraphRepository, ReceiveLog: receiveLogRepository, Message: messageRepository, Blob: blobRepository, + WantList: wantListRepository, } return txRepositories, nil } @@ -256,6 +267,8 @@ func BuildService(contextContext context.Context, private identity.Private, conf acceptNewPeerHandler := commands.NewAcceptNewPeerHandler(peerManager) processNewLocalDiscoveryHandler := commands.NewProcessNewLocalDiscoveryHandler(peerManager) createWantsHandler := commands.NewCreateWantsHandler(replicationManager) + currentTimeProvider := adapters.NewCurrentTimeProvider() + downloadBlobHandler := commands.NewDownloadBlobHandler(transactionProvider, currentTimeProvider) appCommands := app.Commands{ RedeemInvite: redeemInviteHandler, Follow: followHandler, @@ -265,6 +278,7 @@ func BuildService(contextContext context.Context, private identity.Private, conf AcceptNewPeer: acceptNewPeerHandler, ProcessNewLocalDiscovery: processNewLocalDiscoveryHandler, CreateWants: createWantsHandler, + DownloadBlobHandler: downloadBlobHandler, } readFeedRepository := bolt.NewReadFeedRepository(db, txRepositoriesFactory) messagePubSub := pubsub.NewMessagePubSub() @@ -325,6 +339,9 @@ type TxTestAdapters struct { MessageRepository *bolt.MessageRepository FeedRepository *bolt.FeedRepository ReceiveLog *bolt.ReceiveLogRepository + WantList *bolt.WantListRepository + + CurrentTimeProvider *mocks.CurrentTimeProviderMock } type TestAdapters struct { diff --git a/fixtures/fixtures.go b/fixtures/fixtures.go index fa601252..3182aaa8 100644 --- a/fixtures/fixtures.go +++ b/fixtures/fixtures.go @@ -90,6 +90,10 @@ func SomeTime() time.Time { return time.Unix(rand.Int63(), 0) } +func SomeDuration() time.Duration { + return time.Duration(time.Duration(SomePositiveInt32()) * time.Second) +} + func SomePublicIdentity() identity.Public { return SomePrivateIdentity().Public() } diff --git a/service/adapters/bolt/blob_repository.go b/service/adapters/bolt/blob_repository.go index 55c41eca..701578a9 100644 --- a/service/adapters/bolt/blob_repository.go +++ b/service/adapters/bolt/blob_repository.go @@ -36,28 +36,6 @@ func (r BlobRepository) Put(blob feeds.BlobsToSave) error { return nil } -func (r BlobRepository) List() ([]refs.Blob, error) { - var result []refs.Blob - - bucket := r.tx.Bucket(bucketBlobs) - if bucket == nil { - return nil, nil - } - - if err := bucket.ForEach(func(k, v []byte) error { - ref, err := refs.NewBlob(string(k)) - if err != nil { - return errors.Wrap(err, "could not create a ref") - } - result = append(result, ref) - return nil - }); err != nil { - return nil, errors.Wrap(err, "for each failed") - } - - return result, nil -} - func (r BlobRepository) createBucket(blob refs.Blob, feed refs.Feed) (*bbolt.Bucket, error) { return createBucket(r.tx, r.bucketPath(blob, feed)) } diff --git a/service/adapters/bolt/read_wantlist_repository.go b/service/adapters/bolt/read_want_list_repository.go similarity index 96% rename from service/adapters/bolt/read_wantlist_repository.go rename to service/adapters/bolt/read_want_list_repository.go index 15848862..ecb9f533 100644 --- a/service/adapters/bolt/read_wantlist_repository.go +++ b/service/adapters/bolt/read_want_list_repository.go @@ -24,7 +24,7 @@ func (b ReadWantListRepository) GetWantList() (blobs.WantList, error) { return errors.Wrap(err, "could not call the factory") } - list, err := r.Blob.List() + list, err := r.WantList.List() if err != nil { return errors.Wrap(err, "could not get blobs") } diff --git a/service/adapters/bolt/transaction.go b/service/adapters/bolt/transaction.go index 5f94e2f7..7dbe1d40 100644 --- a/service/adapters/bolt/transaction.go +++ b/service/adapters/bolt/transaction.go @@ -37,4 +37,5 @@ type TxRepositories struct { ReceiveLog *ReceiveLogRepository Message *MessageRepository Blob *BlobRepository + WantList *WantListRepository } diff --git a/service/adapters/bolt/want_list_repository.go b/service/adapters/bolt/want_list_repository.go new file mode 100644 index 00000000..15576cd4 --- /dev/null +++ b/service/adapters/bolt/want_list_repository.go @@ -0,0 +1,126 @@ +package bolt + +import ( + "time" + + "github.com/boreq/errors" + "github.com/planetary-social/go-ssb/service/app/commands" + "github.com/planetary-social/go-ssb/service/domain/refs" + "go.etcd.io/bbolt" +) + +var bucketWantList = bucketName("want_list") + +type WantListRepository struct { + tx *bbolt.Tx + currentTimeProvider commands.CurrentTimeProvider +} + +func NewWantListRepository( + tx *bbolt.Tx, + currentTimeProvider commands.CurrentTimeProvider, +) *WantListRepository { + return &WantListRepository{ + tx: tx, + currentTimeProvider: currentTimeProvider, + } +} + +func (r WantListRepository) AddToWantList(id refs.Blob, until time.Time) error { + bucket, err := r.createBucket() + if err != nil { + return errors.Wrap(err, "failed to get the bucket") + } + + key := r.toKey(id) + + value := bucket.Get(key) + if value != nil { + t, err := r.fromValue(value) + if err != nil { + return errors.Wrap(err, "failed to read the value") + } + + if t.After(until) { + return nil + } + } + + return bucket.Put(key, r.toValue(until)) +} + +func (r WantListRepository) List() ([]refs.Blob, error) { + var result []refs.Blob + var toDelete []refs.Blob + + bucket, err := r.getBucket() + if err != nil { + return nil, errors.Wrap(err, "failed to get the bucket") + } + + if bucket == nil { + return nil, nil + } + + now := r.currentTimeProvider.Get() + + if err := bucket.ForEach(func(k, v []byte) error { + id, err := r.fromKey(k) + if err != nil { + return errors.Wrap(err, "could not read the key") + } + + until, err := r.fromValue(v) + if err != nil { + return errors.Wrap(err, "could not read the value") + } + + if now.After(until) { + toDelete = append(toDelete, id) + return nil + } + + result = append(result, id) + return nil + }); err != nil { + return nil, errors.Wrap(err, "for each failed") + } + + for _, id := range toDelete { + if err := bucket.Delete(r.toKey(id)); err != nil { + return nil, errors.Wrap(err, "deletion failed") + } + } + + return result, nil +} + +func (r WantListRepository) toKey(id refs.Blob) []byte { + return []byte(id.String()) +} + +func (r WantListRepository) fromKey(key []byte) (refs.Blob, error) { + return refs.NewBlob(string(key)) +} + +func (r WantListRepository) toValue(t time.Time) []byte { + return []byte(t.Format(time.RFC3339)) +} + +func (r WantListRepository) fromValue(v []byte) (time.Time, error) { + return time.Parse(time.RFC3339, string(v)) +} + +func (r WantListRepository) createBucket() (*bbolt.Bucket, error) { + return createBucket(r.tx, r.bucketPath()) +} + +func (r WantListRepository) getBucket() (*bbolt.Bucket, error) { + return getBucket(r.tx, r.bucketPath()) +} + +func (r WantListRepository) bucketPath() []bucketName { + return []bucketName{ + bucketWantList, + } +} diff --git a/service/adapters/bolt/want_list_repository_test.go b/service/adapters/bolt/want_list_repository_test.go new file mode 100644 index 00000000..c979e734 --- /dev/null +++ b/service/adapters/bolt/want_list_repository_test.go @@ -0,0 +1,104 @@ +package bolt_test + +import ( + "testing" + "time" + + "github.com/planetary-social/go-ssb/di" + "github.com/planetary-social/go-ssb/fixtures" + "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" +) + +func TestWantListRepositoryListDoesNotReturnValuesForWhichUntilIsBeforeCurrentTime(t *testing.T) { + db := fixtures.Bolt(t) + + err := db.Update(func(tx *bbolt.Tx) error { + txadapters, err := di.BuildTxTestAdapters(tx) + require.NoError(t, err) + + until := time.Now() + afterUntil := until.Add(fixtures.SomeDuration()) + beforeUntil := until.Add(-fixtures.SomeDuration()) + + err = txadapters.WantList.AddToWantList(fixtures.SomeRefBlob(), until) + require.NoError(t, err) + + txadapters.CurrentTimeProvider.CurrentTime = beforeUntil + + l, err := txadapters.WantList.List() + require.NoError(t, err) + require.NotEmpty(t, l, "if the deadline hasn't passed the value should be returned") + + txadapters.CurrentTimeProvider.CurrentTime = afterUntil + + l, err = txadapters.WantList.List() + require.NoError(t, err) + require.Empty(t, l, "if the deadline passed the value shouldn't be returned") + + txadapters.CurrentTimeProvider.CurrentTime = beforeUntil + + l, err = txadapters.WantList.List() + require.NoError(t, err) + require.Empty(t, l, "calling list should have cleaned up values for which the deadline has passed") + + return nil + }) + require.NoError(t, err) +} + +func TestWantListRepositoryLongerUntilOverwritesShorterUntil(t *testing.T) { + db := fixtures.Bolt(t) + + err := db.Update(func(tx *bbolt.Tx) error { + txadapters, err := di.BuildTxTestAdapters(tx) + require.NoError(t, err) + + firstUntil := time.Now() + afterFirstUntil := firstUntil.Add(fixtures.SomeDuration()) + secondUntil := afterFirstUntil.Add(fixtures.SomeDuration()) + + err = txadapters.WantList.AddToWantList(fixtures.SomeRefBlob(), firstUntil) + require.NoError(t, err) + + err = txadapters.WantList.AddToWantList(fixtures.SomeRefBlob(), secondUntil) + require.NoError(t, err) + + txadapters.CurrentTimeProvider.CurrentTime = afterFirstUntil + + l, err := txadapters.WantList.List() + require.NoError(t, err) + require.NotEmpty(t, l, "if the deadline hasn't passed the value should be returned") + + return nil + }) + require.NoError(t, err) +} + +func TestWantListRepositoryShorterUntilDoesNotOverwriteShorterUntil(t *testing.T) { + db := fixtures.Bolt(t) + + err := db.Update(func(tx *bbolt.Tx) error { + txadapters, err := di.BuildTxTestAdapters(tx) + require.NoError(t, err) + + firstUntil := time.Now() + afterFirstUntil := firstUntil.Add(fixtures.SomeDuration()) + secondUntil := afterFirstUntil.Add(fixtures.SomeDuration()) + + err = txadapters.WantList.AddToWantList(fixtures.SomeRefBlob(), secondUntil) + require.NoError(t, err) + + err = txadapters.WantList.AddToWantList(fixtures.SomeRefBlob(), firstUntil) + require.NoError(t, err) + + txadapters.CurrentTimeProvider.CurrentTime = afterFirstUntil + + l, err := txadapters.WantList.List() + require.NoError(t, err) + require.NotEmpty(t, l, "if the deadline hasn't passed the value should be returned") + + return nil + }) + require.NoError(t, err) +} diff --git a/service/adapters/current_time_provider.go b/service/adapters/current_time_provider.go new file mode 100644 index 00000000..7dd0a4f4 --- /dev/null +++ b/service/adapters/current_time_provider.go @@ -0,0 +1,16 @@ +package adapters + +import ( + "time" +) + +type CurrentTimeProvider struct { +} + +func NewCurrentTimeProvider() *CurrentTimeProvider { + return &CurrentTimeProvider{} +} + +func (c CurrentTimeProvider) Get() time.Time { + return time.Now() +} diff --git a/service/adapters/mocks/blob_storage_mock.go b/service/adapters/mocks/blob_storage.go similarity index 100% rename from service/adapters/mocks/blob_storage_mock.go rename to service/adapters/mocks/blob_storage.go diff --git a/service/adapters/mocks/current_time_provider.go b/service/adapters/mocks/current_time_provider.go new file mode 100644 index 00000000..34cf267f --- /dev/null +++ b/service/adapters/mocks/current_time_provider.go @@ -0,0 +1,18 @@ +package mocks + +import "time" + +type CurrentTimeProviderMock struct { + CurrentTime time.Time +} + +func NewCurrentTimeProviderMock() *CurrentTimeProviderMock { + return &CurrentTimeProviderMock{} +} + +func (c CurrentTimeProviderMock) Get() time.Time { + if c.CurrentTime.IsZero() { + return time.Now() + } + return c.CurrentTime +} diff --git a/service/adapters/mocks/message_pubsub_mock.go b/service/adapters/mocks/message_pubsub.go similarity index 100% rename from service/adapters/mocks/message_pubsub_mock.go rename to service/adapters/mocks/message_pubsub.go diff --git a/service/adapters/mocks/message_repository_mock.go b/service/adapters/mocks/message_repository.go similarity index 100% rename from service/adapters/mocks/message_repository_mock.go rename to service/adapters/mocks/message_repository.go diff --git a/service/adapters/mocks/peer_manager_mock.go b/service/adapters/mocks/peer_manager.go similarity index 100% rename from service/adapters/mocks/peer_manager_mock.go rename to service/adapters/mocks/peer_manager.go diff --git a/service/app/application.go b/service/app/application.go index 13c95e62..ab7b87c5 100644 --- a/service/app/application.go +++ b/service/app/application.go @@ -20,7 +20,8 @@ type Commands struct { AcceptNewPeer *commands.AcceptNewPeerHandler ProcessNewLocalDiscovery *commands.ProcessNewLocalDiscoveryHandler - CreateWants *commands.CreateWantsHandler + CreateWants *commands.CreateWantsHandler + DownloadBlobHandler *commands.DownloadBlobHandler } type Queries struct { diff --git a/service/app/commands/common.go b/service/app/commands/common.go index ed6737ed..01d6baa2 100644 --- a/service/app/commands/common.go +++ b/service/app/commands/common.go @@ -1,6 +1,8 @@ package commands import ( + "time" + "github.com/planetary-social/go-ssb/service/domain/feeds" "github.com/planetary-social/go-ssb/service/domain/graph" "github.com/planetary-social/go-ssb/service/domain/identity" @@ -34,6 +36,7 @@ type TransactionProvider interface { type Adapters struct { Feed FeedRepository SocialGraph SocialGraphRepository + WantList WantListRepository } type FeedRepository interface { @@ -44,3 +47,14 @@ type FeedRepository interface { type SocialGraphRepository interface { GetSocialGraph() (*graph.SocialGraph, error) } + +type WantListRepository interface { + // AddToWantList puts the blob in the want list. If the blob can't be + // retrieved before the specified point of time it will be removed from the + // want list. + AddToWantList(id refs.Blob, until time.Time) error +} + +type CurrentTimeProvider interface { + Get() time.Time +} diff --git a/service/app/commands/handler_download_blob.go b/service/app/commands/handler_download_blob.go new file mode 100644 index 00000000..53c54e54 --- /dev/null +++ b/service/app/commands/handler_download_blob.go @@ -0,0 +1,35 @@ +package commands + +import ( + "time" + + "github.com/planetary-social/go-ssb/service/domain/refs" +) + +const temporaryWantListDuration = 1 * time.Hour + +type DownloadBlob struct { + Id refs.Blob +} + +type DownloadBlobHandler struct { + transaction TransactionProvider + currentTimeProvider CurrentTimeProvider +} + +func NewDownloadBlobHandler( + transaction TransactionProvider, + currentTimeProvider CurrentTimeProvider, +) *DownloadBlobHandler { + return &DownloadBlobHandler{ + transaction: transaction, + currentTimeProvider: currentTimeProvider, + } +} + +func (h *DownloadBlobHandler) Handle(cmd DownloadBlob) error { + until := h.currentTimeProvider.Get().Add(temporaryWantListDuration) + return h.transaction.Transact(func(adapters Adapters) error { + return adapters.WantList.AddToWantList(cmd.Id, until) + }) +}