Skip to content

Commit

Permalink
Change the want list to work in the on demand mode
Browse files Browse the repository at this point in the history
After this change the Swift code can request blobs to be downloaded and
the blobs remain in the want list for a specific amount of time. This
replace the previous approach of downloading all blobs right away and
copies the current behaviour exhibited by go-ssb. The new code behaves
slightly better as the want list isn't cleared when application exists
and is persisted.
  • Loading branch information
boreq committed Jul 4, 2022
1 parent 5a6dbb7 commit 5b682bc
Show file tree
Hide file tree
Showing 19 changed files with 365 additions and 30 deletions.
10 changes: 10 additions & 0 deletions di/inject_adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/google/wire"
"github.com/planetary-social/go-ssb/logging"
"github.com/planetary-social/go-ssb/service/adapters"
"github.com/planetary-social/go-ssb/service/adapters/blobs"
"github.com/planetary-social/go-ssb/service/adapters/bolt"
"github.com/planetary-social/go-ssb/service/adapters/mocks"
Expand All @@ -25,6 +26,9 @@ var txBoltAdaptersSet = wire.NewSet(
bolt.NewSocialGraphRepository,
wire.Bind(new(commands.SocialGraphRepository), new(*bolt.SocialGraphRepository)),

bolt.NewWantListRepository,
wire.Bind(new(commands.WantListRepository), new(*bolt.WantListRepository)),

bolt.NewReceiveLogRepository,
bolt.NewMessageRepository,
bolt.NewPubRepository,
Expand Down Expand Up @@ -80,3 +84,9 @@ var blobsAdaptersSet = wire.NewSet(
func newFilesystemStorage(logger logging.Logger, config Config) (*blobs.FilesystemStorage, error) {
return blobs.NewFilesystemStorage(path.Join(config.DataDirectory, "blobs"), logger)
}

//nolint:deadcode,varcheck
var adaptersSet = wire.NewSet(
adapters.NewCurrentTimeProvider,
wire.Bind(new(commands.CurrentTimeProvider), new(*adapters.CurrentTimeProvider)),
)
4 changes: 3 additions & 1 deletion di/inject_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ var commandsSet = wire.NewSet(
commands.NewProcessNewLocalDiscoveryHandler,
commands.NewPublishRawHandler,
commands.NewEstablishNewConnectionsHandler,
commands.NewCreateWantsHandler,
commands.NewDownloadBlobHandler,

commands.NewRawMessageHandler,
wire.Bind(new(replication.RawMessageHandler), new(*commands.RawMessageHandler)),
commands.NewCreateWantsHandler,
)

var queriesSet = wire.NewSet(
Expand Down
9 changes: 9 additions & 0 deletions di/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type TxTestAdapters struct {
MessageRepository *bolt.MessageRepository
FeedRepository *bolt.FeedRepository
ReceiveLog *bolt.ReceiveLogRepository
WantList *bolt.WantListRepository

CurrentTimeProvider *mocks.CurrentTimeProviderMock
}

func BuildTxTestAdapters(*bbolt.Tx) (TxTestAdapters, error) {
Expand All @@ -54,6 +57,9 @@ func BuildTxTestAdapters(*bbolt.Tx) (TxTestAdapters, error) {

formatsSet,
wire.Value(hops),

mocks.NewCurrentTimeProviderMock,
wire.Bind(new(commands.CurrentTimeProvider), new(*mocks.CurrentTimeProviderMock)),
)

return TxTestAdapters{}, nil
Expand Down Expand Up @@ -122,6 +128,7 @@ func BuildTransactableAdapters(*bbolt.Tx, identity.Public, Config) (commands.Ada
txBoltAdaptersSet,
formatsSet,
extractFromConfigSet,
adaptersSet,

wire.Value(hops),
)
Expand All @@ -135,6 +142,7 @@ func BuildTxRepositories(*bbolt.Tx, identity.Public, logging.Logger, formats.Mes

txBoltAdaptersSet,
formatsSet,
adaptersSet,

wire.Value(hops),
)
Expand Down Expand Up @@ -186,6 +194,7 @@ func BuildService(context.Context, identity.Private, Config) (Service, error) {
messagePubSubSet,
boltAdaptersSet,
blobsAdaptersSet,
adaptersSet,

newBolt,
)
Expand Down
27 changes: 22 additions & 5 deletions di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions fixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func SomeTime() time.Time {
return time.Unix(rand.Int63(), 0)
}

func SomeDuration() time.Duration {
return time.Duration(time.Duration(SomePositiveInt32()) * time.Second)
}

func SomePublicIdentity() identity.Public {
return SomePrivateIdentity().Public()
}
Expand Down
22 changes: 0 additions & 22 deletions service/adapters/bolt/blob_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,6 @@ func (r BlobRepository) Put(blob feeds.BlobsToSave) error {
return nil
}

func (r BlobRepository) List() ([]refs.Blob, error) {
var result []refs.Blob

bucket := r.tx.Bucket(bucketBlobs)
if bucket == nil {
return nil, nil
}

if err := bucket.ForEach(func(k, v []byte) error {
ref, err := refs.NewBlob(string(k))
if err != nil {
return errors.Wrap(err, "could not create a ref")
}
result = append(result, ref)
return nil
}); err != nil {
return nil, errors.Wrap(err, "for each failed")
}

return result, nil
}

func (r BlobRepository) createBucket(blob refs.Blob, feed refs.Feed) (*bbolt.Bucket, error) {
return createBucket(r.tx, r.bucketPath(blob, feed))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (b ReadWantListRepository) GetWantList() (blobs.WantList, error) {
return errors.Wrap(err, "could not call the factory")
}

list, err := r.Blob.List()
list, err := r.WantList.List()
if err != nil {
return errors.Wrap(err, "could not get blobs")
}
Expand Down
1 change: 1 addition & 0 deletions service/adapters/bolt/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ type TxRepositories struct {
ReceiveLog *ReceiveLogRepository
Message *MessageRepository
Blob *BlobRepository
WantList *WantListRepository
}
126 changes: 126 additions & 0 deletions service/adapters/bolt/want_list_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package bolt

import (
"time"

"github.com/boreq/errors"
"github.com/planetary-social/go-ssb/service/app/commands"
"github.com/planetary-social/go-ssb/service/domain/refs"
"go.etcd.io/bbolt"
)

var bucketWantList = bucketName("want_list")

type WantListRepository struct {
tx *bbolt.Tx
currentTimeProvider commands.CurrentTimeProvider
}

func NewWantListRepository(
tx *bbolt.Tx,
currentTimeProvider commands.CurrentTimeProvider,
) *WantListRepository {
return &WantListRepository{
tx: tx,
currentTimeProvider: currentTimeProvider,
}
}

func (r WantListRepository) AddToWantList(id refs.Blob, until time.Time) error {
bucket, err := r.createBucket()
if err != nil {
return errors.Wrap(err, "failed to get the bucket")
}

key := r.toKey(id)

value := bucket.Get(key)
if value != nil {
t, err := r.fromValue(value)
if err != nil {
return errors.Wrap(err, "failed to read the value")
}

if t.After(until) {
return nil
}
}

return bucket.Put(key, r.toValue(until))
}

func (r WantListRepository) List() ([]refs.Blob, error) {
var result []refs.Blob
var toDelete []refs.Blob

bucket, err := r.getBucket()
if err != nil {
return nil, errors.Wrap(err, "failed to get the bucket")
}

if bucket == nil {
return nil, nil
}

now := r.currentTimeProvider.Get()

if err := bucket.ForEach(func(k, v []byte) error {
id, err := r.fromKey(k)
if err != nil {
return errors.Wrap(err, "could not read the key")
}

until, err := r.fromValue(v)
if err != nil {
return errors.Wrap(err, "could not read the value")
}

if now.After(until) {
toDelete = append(toDelete, id)
return nil
}

result = append(result, id)
return nil
}); err != nil {
return nil, errors.Wrap(err, "for each failed")
}

for _, id := range toDelete {
if err := bucket.Delete(r.toKey(id)); err != nil {
return nil, errors.Wrap(err, "deletion failed")
}
}

return result, nil
}

func (r WantListRepository) toKey(id refs.Blob) []byte {
return []byte(id.String())
}

func (r WantListRepository) fromKey(key []byte) (refs.Blob, error) {
return refs.NewBlob(string(key))
}

func (r WantListRepository) toValue(t time.Time) []byte {
return []byte(t.Format(time.RFC3339))
}

func (r WantListRepository) fromValue(v []byte) (time.Time, error) {
return time.Parse(time.RFC3339, string(v))
}

func (r WantListRepository) createBucket() (*bbolt.Bucket, error) {
return createBucket(r.tx, r.bucketPath())
}

func (r WantListRepository) getBucket() (*bbolt.Bucket, error) {
return getBucket(r.tx, r.bucketPath())
}

func (r WantListRepository) bucketPath() []bucketName {
return []bucketName{
bucketWantList,
}
}
Loading

0 comments on commit 5b682bc

Please sign in to comment.