Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature flagged follow change notification #59

Merged
merged 1 commit into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions cmd/notification-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 86 additions & 0 deletions service/adapters/apns/apns.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package apns

import (
"encoding/json"
"time"

"github.com/boreq/errors"
"github.com/google/uuid"
"github.com/planetary-social/go-notification-service/internal/logging"
"github.com/planetary-social/go-notification-service/service/config"
"github.com/planetary-social/go-notification-service/service/domain"
"github.com/planetary-social/go-notification-service/service/domain/notifications"
"github.com/sideshow/apns2"
"github.com/sideshow/apns2/certificate"
Expand Down Expand Up @@ -73,3 +78,84 @@ func (a *APNS) SendNotification(notification notifications.Notification) error {

return nil
}

func (a *APNS) SendFollowChangeNotification(followChange domain.FollowChange, apnsToken domain.APNSToken) error {
if apnsToken.Hex() == "" {
return errors.New("invalid APNs token")
}
n, err := a.buildFollowChangeNotification(followChange, apnsToken)
if err != nil {
return err
}
resp, err := a.client.Push(n)
//a.metrics.ReportCallToAPNS(resp.StatusCode, err)
if err != nil {
return errors.Wrap(err, "error pushing the follow change notification")
}

if resp.StatusCode == 200 {
a.logger.Debug().
WithField("uuid", n.ApnsID).
WithField("response.reason", resp.Reason).
WithField("response.statusCode", resp.StatusCode).
WithField("host", a.client.Host).
Message("sent a follow change notification")
} else {
a.logger.Error().
WithField("uuid", n.ApnsID).
WithField("response.reason", resp.Reason).
WithField("response.statusCode", resp.StatusCode).
WithField("host", a.client.Host).
Message("failed to send a follow change notification")
}

return nil
}

func (a *APNS) buildFollowChangeNotification(followChange domain.FollowChange, apnsToken domain.APNSToken) (*apns2.Notification, error) {
payload, err := followChangePayload(followChange)
if err != nil {
return nil, errors.Wrap(err, "error creating a payload")
}

n := &apns2.Notification{
PushType: apns2.PushTypeAlert,
ApnsID: uuid.New().String(),
DeviceToken: apnsToken.Hex(),
Topic: a.cfg.APNSTopic(),
Payload: payload,
Priority: apns2.PriorityLow,
}

return n, nil
}

func followChangePayload(followChange domain.FollowChange) ([]byte, error) {
alertMessage := ""
if followChange.ChangeType == "unfollowed" {
alertMessage = followChange.FriendlyFollower + " has unfollowed you!"
} else {
alertMessage = followChange.FriendlyFollower + " is a new follower!"
}

payload := map[string]interface{}{
"aps": map[string]interface{}{
"alert": alertMessage,
"sound": "default",
"badge": 1,
},
"data": map[string]interface{}{
"changeType": followChange.ChangeType,
"at": followChange.At.Format(time.RFC3339),
"follower": followChange.Follower.Hex(),
"friendlyFollower": followChange.FriendlyFollower,
},
}

payloadBytes, err := json.Marshal(payload)
if err != nil {
return nil, err
}

return payloadBytes, nil
}
7 changes: 7 additions & 0 deletions service/adapters/apns/apns_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/planetary-social/go-notification-service/internal"
"github.com/planetary-social/go-notification-service/internal/logging"
"github.com/planetary-social/go-notification-service/service/config"
"github.com/planetary-social/go-notification-service/service/domain"
"github.com/planetary-social/go-notification-service/service/domain/notifications"
)

Expand Down Expand Up @@ -34,6 +35,12 @@ func (a *APNSMock) SendNotification(notification notifications.Notification) err
return nil
}

func (a *APNSMock) SendFollowChangeNotification(followChange domain.FollowChange, token domain.APNSToken) error {
notification := notifications.Notification{}

return a.SendNotification(notification)
}

func (a *APNSMock) SentNotifications() []notifications.Notification {
a.sentNotificationsLock.Lock()
defer a.sentNotificationsLock.Unlock()
Expand Down
1 change: 1 addition & 0 deletions service/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Queries struct {

type APNS interface {
SendNotification(notification notifications.Notification) error
SendFollowChangeNotification(followChange domain.FollowChange, apnsToken domain.APNSToken) error
}

type EventOrError struct {
Expand Down
61 changes: 52 additions & 9 deletions service/app/follow_change_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,53 +4,96 @@ import (
"context"
"time"

"github.com/boreq/errors"
"github.com/planetary-social/go-notification-service/internal/logging"
)

type FollowChangePuller struct {
externalFollowChangeSubscriber ExternalFollowChangeSubscriber
apns APNS
queries Queries
logger logging.Logger
metrics Metrics
counter int
}

func NewFollowChangePuller(
externalFollowChangeSubscriber ExternalFollowChangeSubscriber,
apns APNS,
queries Queries,
logger logging.Logger,
metrics Metrics,
) *FollowChangePuller {
return &FollowChangePuller{
externalFollowChangeSubscriber: externalFollowChangeSubscriber,
apns: apns,
queries: queries,
logger: logger.New("followChangePuller"),
metrics: metrics,
counter: 0,
}
}

// Listens for messages from the follow-change pubsub and for each of them, if
// they belong to one of our users, we send a notification
func (f *FollowChangePuller) Run(ctx context.Context) error {
go f.storeMetricsLoop(ctx)

ch, err := f.externalFollowChangeSubscriber.Subscribe(ctx)
if err != nil {
return err
return errors.Wrap(err, "error subscribing to follow changes")
}

for followChange := range ch {
f.logger.Debug().Message(followChange.String())
for {
select {
case followChange, ok := <-ch:
if !ok {
return nil // Channel closed, exit gracefully
}

f.counter += 1
}
f.logger.Debug().Message(followChange.String())

// TODO: for the moment, just send notifications to Daniel
if followChange.Followee.Hex() != "89ef92b9ebe6dc1e4ea398f6477f227e95429627b0a33dc89b640e137b256be5" {
f.logger.Debug().WithField("followee", followChange.Followee.Hex()).Message("ignoring follow change")
continue
}

tokens, err := f.queries.GetTokens.Handle(ctx, followChange.Followee)
if err != nil {
// Not one of our users, ignore
continue
}

for _, token := range tokens {
if err := f.apns.SendFollowChangeNotification(*followChange, token); err != nil {
f.logger.Error().
WithField("token", token.Hex()).
WithField("followee", followChange.Followee.Hex()).
WithError(err).
Message("error sending follow change notification")
continue
}
}

return nil
f.counter += 1
case <-ctx.Done():
f.logger.Debug().Message("context canceled, shutting down FollowChangePuller")
return nil
}
}
}

func (f *FollowChangePuller) storeMetricsLoop(ctx context.Context) {
for {
f.storeMetrics()
ticker := time.NewTicker(storeMetricsEvery)
defer ticker.Stop()

for {
select {
case <-time.After(storeMetricsEvery):
case <-ticker.C:
f.storeMetrics()
case <-ctx.Done():
f.logger.Debug().Message("context canceled, stopping metrics loop")
return
}
}
Expand Down
Loading