Skip to content

Commit

Permalink
Subscribe to vanish_requests stream for deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Oct 11, 2024
1 parent 7aaa885 commit e71645a
Show file tree
Hide file tree
Showing 13 changed files with 268 additions and 19 deletions.
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
ci: tools test lint generate fmt tidy check_repository_unchanged

.PHONY: check_repository_unchanged
check_repository_unchanged:
check_repository_unchanged:
_tools/check_repository_unchanged.sh

.PHONY: generate
Expand All @@ -17,10 +17,10 @@ fmt:
test:
go test -race ./...

.PHONY: recreate-emulator
recreate-emulator:
docker compose -f ./docker-compose-integration.yml rm -f -s -v
docker compose -f ./docker-compose-integration.yml up -d
.PHONY: start-services
start-services:
docker compose rm -f -s -v
docker compose up -d

.PHONY: test-integration
test-integration:
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ You may not be able to build it using older compilers.
You have two options when it comes to using Firestore: a local emulator using
Docker or a real Firestore project that we setup for development.

##### Using Firestore emulator
##### Using Firestore emulator, nosrelay and Redis

1. Start the docker daemon.
2. Run `make recreate-emulator` to start the Firestore emulator using Docker compose.
2. Run `make start-services` to start the Firestore emulator, nosrelay and Redis using Docker compose.
3. Run the following command changing `NOTIFICATIONS_APNS_CERTIFICATE_PATH` and `NOTIFICATIONS_APNS_CERTIFICATE_PASSWORD`:

```
Expand All @@ -137,6 +137,7 @@ FIRESTORE_EMULATOR_HOST=localhost:8200 \
NOTIFICATIONS_FIRESTORE_PROJECT_ID=test-project-id \
NOTIFICATIONS_APNS_TOPIC=com.verse.Nos \
NOTIFICATIONS_ENVIRONMENT=DEVELOPMENT \
REDIS_URL=redis://localhost:6379 \
go run ./cmd/notification-service
```

Expand All @@ -152,6 +153,7 @@ NOTIFICATIONS_FIRESTORE_CREDENTIALS_JSON_PATH="/path/to/your/credentials/file.js
NOTIFICATIONS_FIRESTORE_PROJECT_ID="nos-notification-service-dev" \
NOTIFICATIONS_APNS_TOPIC=com.verse.Nos \
NOTIFICATIONS_ENVIRONMENT=DEVELOPMENT \
REDIS_URL=redis://localhost:6379 \
go run ./cmd/notification-service
```

Expand Down
9 changes: 9 additions & 0 deletions cmd/notification-service/di/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Service struct {
metricsServer http.MetricsServer
downloader *app.Downloader
followChangePuller *app.FollowChangePuller
vanishSubscriber *app.VanishSubscriber
receivedEventSubscriber *memorypubsub.ReceivedEventSubscriber
externalFollowChangeSubscriber app.ExternalFollowChangeSubscriber
eventSavedSubscriber *firestorepubsub.EventSavedSubscriber
Expand All @@ -30,6 +31,7 @@ func NewService(
metricsServer http.MetricsServer,
downloader *app.Downloader,
followChangePuller *app.FollowChangePuller,
vanishSubscriber *app.VanishSubscriber,
receivedEventSubscriber *memorypubsub.ReceivedEventSubscriber,
externalFollowChangeSubscriber app.ExternalFollowChangeSubscriber,
eventSavedSubscriber *firestorepubsub.EventSavedSubscriber,
Expand All @@ -41,6 +43,7 @@ func NewService(
metricsServer: metricsServer,
downloader: downloader,
followChangePuller: followChangePuller,
vanishSubscriber: vanishSubscriber,
receivedEventSubscriber: receivedEventSubscriber,
externalFollowChangeSubscriber: externalFollowChangeSubscriber,
eventSavedSubscriber: eventSavedSubscriber,
Expand Down Expand Up @@ -79,10 +82,16 @@ func (s Service) Run(ctx context.Context) error {
errCh <- errors.Wrap(s.receivedEventSubscriber.Run(ctx), "received event subscriber error")
}()

runners++
go func() {
errCh <- errors.Wrap(s.followChangePuller.Run(ctx), "follow change subscriber error")
}()

runners++
go func() {
errCh <- errors.Wrap(s.vanishSubscriber.Run(ctx), "vanish subscriber error")
}()

runners++
go func() {
errCh <- errors.Wrap(s.eventSavedSubscriber.Run(ctx), "event saved subscriber error")
Expand Down
6 changes: 6 additions & 0 deletions cmd/notification-service/di/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func BuildService(context.Context, config.Config) (Service, func(), error) {
loggingSet,
adaptersSet,
followChangePullerSet,
vanishSubscriberSet,
)
return Service{}, nil, nil
}
Expand All @@ -49,6 +50,7 @@ func BuildIntegrationService(context.Context, config.Config) (IntegrationService
firestoreAdaptersSet,
downloaderSet,
followChangePullerSet,
vanishSubscriberSet,
generatorSet,
pubsubSet,
loggingSet,
Expand Down Expand Up @@ -80,6 +82,10 @@ var followChangePullerSet = wire.NewSet(
app.NewFollowChangePuller,
)

var vanishSubscriberSet = wire.NewSet(
app.NewVanishSubscriber,
)

var generatorSet = wire.NewSet(
notifications.NewGenerator,
)
11 changes: 7 additions & 4 deletions cmd/notification-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
services:
firestore_emulator:
platform: linux/amd64
image: mtlynch/firestore-emulator
environment:
- FIRESTORE_PROJECT_ID=test-project-id
- PORT=8200
ports:
- 8200:8200

redis:
platform: linux/amd64
image: redis:latest
ports:
- 6379:6379

relay:
platform: linux/amd64
image: ghcr.io/planetary-social/nosrelay:latest
ports:
- "7777:7777"
environment:
- RELAY_URL=wss://example.com
- REDIS_URL=redis://redis:6379
depends_on:
- redis
8 changes: 0 additions & 8 deletions docker-compose-integration.yml

This file was deleted.

2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
Expand Down Expand Up @@ -67,6 +68,7 @@ require (
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/puzpuzpuz/xsync/v2 v2.5.1 // indirect
github.com/redis/go-redis/v9 v9.6.1 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/tidwall/gjson v1.17.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWa
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
Expand Down Expand Up @@ -263,6 +265,8 @@ github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+Pymzi
github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/puzpuzpuz/xsync/v2 v2.5.1 h1:mVGYAvzDSu52+zaGyNjC+24Xw2bQi3kTr4QJ6N9pIIU=
github.com/puzpuzpuz/xsync/v2 v2.5.1/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU=
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
Expand Down
40 changes: 40 additions & 0 deletions service/adapters/firestore/repository_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,46 @@ func (e *EventRepository) saveUnderEvents(event domain.Event) error {
return nil
}

// DeleteByPublicKey deletes all events and associated notifications for a given public key.
func (e *EventRepository) DeleteByPublicKey(ctx context.Context, pubkey domain.PublicKey) error {
eventsQuery := e.client.Collection(collectionEvents).Where(eventFieldPublicKey, "==", pubkey.Hex())

eventsDocs := e.tx.Documents(eventsQuery)

for {
eventDoc, err := eventsDocs.Next()
if err == iterator.Done {
break
}
if err != nil {
return errors.Wrap(err, "error fetching event document")
}

notificationsCollection := eventDoc.Ref.Collection(collectionEventsNotifications)
notificationsDocs := e.tx.Documents(notificationsCollection)

for {
notificationDoc, err := notificationsDocs.Next()
if err == iterator.Done {
break
}
if err != nil {
return errors.Wrap(err, "error fetching notification document")
}

if err := e.tx.Delete(notificationDoc.Ref); err != nil {
return errors.Wrap(err, "error deleting notification document")
}
}

if err := e.tx.Delete(eventDoc.Ref); err != nil {
return errors.Wrap(err, "error deleting event document")
}
}

return nil
}

func (e *EventRepository) GetEvents(ctx context.Context, filters domain.Filters) <-chan app.EventOrError {
ch := make(chan app.EventOrError)
go e.getEvents(ctx, filters, ch)
Expand Down
27 changes: 27 additions & 0 deletions service/adapters/firestore/repository_public_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,33 @@ func (r *PublicKeyRepository) Save(registration domain.Registration) error {
return nil
}

// DeleteByPublicKey deletes a public key and all its associated APNS tokens
func (r *PublicKeyRepository) DeleteByPublicKey(ctx context.Context, publicKey domain.PublicKey) error {
pubKeyDocRef := r.client.Collection(collectionPublicKeys).Doc(publicKey.Hex())
apnsTokensCollection := pubKeyDocRef.Collection(collectionPublicKeysAPNSTokens)
apnsDocs := r.tx.Documents(apnsTokensCollection)

for {
doc, err := apnsDocs.Next()
if err == iterator.Done {
break
}
if err != nil {
return errors.Wrap(err, "error fetching APNS token document")
}

if err := r.tx.Delete(doc.Ref); err != nil {
return errors.Wrap(err, "error deleting APNS token document")
}
}

if err := r.tx.Delete(pubKeyDocRef); err != nil {
return errors.Wrap(err, "error deleting the public key document")
}

return nil
}

func (r *PublicKeyRepository) GetAPNSTokens(ctx context.Context, publicKey domain.PublicKey, savedAfter time.Time) ([]domain.APNSToken, error) {
docs := r.tx.Documents(
r.client.
Expand Down
2 changes: 2 additions & 0 deletions service/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ type RelayRepository interface {
}

type PublicKeyRepository interface {
DeleteByPublicKey(ctx context.Context, publicKey domain.PublicKey) error
GetAPNSTokens(ctx context.Context, publicKey domain.PublicKey, savedAfter time.Time) ([]domain.APNSToken, error)
}

type EventRepository interface {
Save(event domain.Event) error
DeleteByPublicKey(ctx context.Context, publicKey domain.PublicKey) error
Get(ctx context.Context, id domain.EventId) (domain.Event, error)
Exists(ctx context.Context, id domain.EventId) (bool, error)
GetEvents(ctx context.Context, filters domain.Filters) <-chan EventOrError
Expand Down
Loading

0 comments on commit e71645a

Please sign in to comment.