Skip to content

Commit

Permalink
cmd/bsky-webhook: add optional support for a secrets service
Browse files Browse the repository at this point in the history
- When --secrets-url is set, the program will attempt to fetch its webhook URL
  and app key from the specified setec server, overriding the values set on the
  command line. Include environment hooks.

- Plumb a context to handle signals.

- Plumb an HTTP client (currently only the default) so we can stub in a proxy.

- Update configuration docs in README.
  • Loading branch information
creachadair committed Nov 14, 2024
1 parent 2d72fd0 commit 632f63c
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 35 deletions.
22 changes: 13 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ environment variables. Those without defaults are required.

Here's the complete table based on the provided Go code:

| Command-line flag | Environment variable | Default value | Description |
| ------------------ | -------------------- | --------------------------------------- | ------------------------------------------------------------------------------------ |
| `-addr` | `JETSTREAM_ADDRESS` | Rotation of all public jetsream servers | The [jetstream](https://github.com/bluesky-social/jetstream) hostname to connect to. |
| `-bsky-handle` | `BSKY_HANDLE` | none | The Bluesky handle of the account that will make API requests. |
| `-bsky-app-password` | `BSKY_APP_PASSWORD` | none | The Bluesky app password for authentication. |
| `-slack-webhook-url` | `SLACK_WEBHOOK_URL` | none | The Slack webhook URL for sending notifications. |
| `-bsky-server-url` | `BSKY_SERVER_URL` | "https://bsky.network" | The Bluesky PDS server to send API requests to URL. |
| `-watch-word` | `WATCH_WORD` | "tailscale" | The word to watch out for; may support multiple words in the future. |

| Command-line flag | Environment variable | Default value | Description |
|----------------------|----------------------|-----------------------------------------|-------------------------------------------------------------------------|
| `-addr` | `JETSTREAM_ADDRESS` | Rotation of all public jetsream servers | The [jetstream][jetstream] hostname to connect to. |
| `-bsky-handle` | `BSKY_HANDLE` | none | The Bluesky handle of the account that will make API requests. |
| `-bsky-app-password` | `BSKY_APP_PASSWORD` | none | The Bluesky app password for authentication. |
| `-slack-webhook-url` | `SLACK_WEBHOOK_URL` | none | The Slack webhook URL for sending notifications. |
| `-bsky-server-url` | `BSKY_SERVER_URL` | "https://bsky.network" | The Bluesky PDS server to send API requests to URL. |
| `-watch-word` | `WATCH_WORD` | "tailscale" | The word to watch out for; may support multiple words in the future. |
| `-secrets-url` | `SECRETS_URL` | none | The address of a [setec][setec] server to fetch secrets from (optional) |
| `-secret-prefix` | `SECRET_PREFIX` | "" | A prefix to prepend to secret names fetched from setec (optional) |

[jetstream]: https://github.com/bluesky-social/jetstream
[setec]: https://github.com/tailscale/setec
75 changes: 54 additions & 21 deletions cmd/bsky-webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@ import (
"net/http"
"net/url"
"os"
"os/signal"
"path"
"strings"
"syscall"
"time"

"github.com/bluesky-social/jetstream/pkg/models"
"github.com/gorilla/websocket"
"github.com/karalabe/go-bluesky"
bluesky "github.com/karalabe/go-bluesky"
"github.com/klauspost/compress/zstd"
"github.com/tailscale/setec/client/setec"
)

var (
Expand All @@ -36,6 +40,11 @@ var (
"https://bsky.network"), "bluesky PDS server URL")
watchWord = flag.String("watch-word", envOr("WATCH_WORD", "tailscale"),
"the word to watch out for. may be multiple words in future (required)")

secretsURL = flag.String("secrets-url", envOr("SECRETS_URL", ""),
"the URL of a secrets server (if empty, no server is used)")
secretPrefix = flag.String("secrets-prefix", envOr("SECRET_PREFIX", ""),
"the prefix to prepend to secret names fetched from --secrets-url")
)

// Public addresses of jetstream websocket services.
Expand All @@ -51,6 +60,10 @@ var jetstreams = []string{
// Only the DecodeAll method may be used.
var zstdDecoder *zstd.Decoder

// httpClient must be used for all HTTP requests. It is a variable so that it
// can be replaced with a proxy.
var httpClient = http.DefaultClient

func init() {
// Jetstream uses a custom zstd dictionary, so make sure we do the same.
var err error
Expand All @@ -65,20 +78,38 @@ func main() {
// TODO(creachadair): Usage text.

switch {
case *webhookURL == "":
case *webhookURL == "" && *secretsURL == "":
log.Fatal("missing slack webhook URL (SLACK_WEBHOOK_URL)")
case *bskyServerURL == "":
log.Fatal("missing Bluesky server URL (BSKY_SERVER_URL)")
case *bskyHandle == "":
log.Fatal("Missing Bluesky account handle (BSKY_HANDLE)")
case *bskyAppKey == "":
case *bskyAppKey == "" && *secretsURL == "":
log.Fatal("missing Bluesky app secret (BSKY_APP_PASSWORD)")
case *watchWord == "":
log.Fatal("missing watchword")
}

ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

if *secretsURL != "" {
webhookSecret := path.Join(*secretPrefix, "slack-webhook-url")
appKeySecret := path.Join(*bskyAppKey, "bluesky-app-key")
st, err := setec.NewStore(ctx, setec.StoreConfig{
Client: setec.Client{Server: *secretsURL, DoHTTP: httpClient.Do},
Secrets: []string{webhookSecret, appKeySecret},
})
if err != nil {
log.Fatalf("initialize secrets store: %v", err)
}
*webhookURL = st.Secret(webhookSecret).GetString()
*bskyAppKey = st.Secret(appKeySecret).GetString()
log.Printf("Fetched client secrets from %q", *secretsURL)
}

nextAddr := nextWSAddress()
for {
for ctx.Err() == nil {
wsURL := url.URL{
Scheme: "wss",
Host: nextAddr(),
Expand All @@ -87,7 +118,7 @@ func main() {
}
slog.Info("ws connecting", "url", wsURL.String())

err := websocketConnection(wsURL)
err := websocketConnection(ctx, wsURL)
slog.Error("ws connection", "url", wsURL, "err", err)

// TODO(erisa): exponential backoff
Expand Down Expand Up @@ -119,13 +150,12 @@ func nextWSAddress() func() string {
}
}

func websocketConnection(wsUrl url.URL) error {
func websocketConnection(ctx context.Context, wsUrl url.URL) error {
// add compression headers
headers := http.Header{}
headers.Add("Socket-Encoding", "zstd")

c, _, err := websocket.DefaultDialer.Dial(wsUrl.String(), headers)

if err != nil {
return fmt.Errorf("dial jetstream: %v", err)
}
Expand All @@ -135,9 +165,7 @@ func websocketConnection(wsUrl url.URL) error {
return nil
})

ctx := context.Background()

bsky, err := bluesky.Dial(ctx, *bskyServerURL)
bsky, err := bluesky.DialWithClient(ctx, *bskyServerURL, httpClient)
if err != nil {
log.Fatal("dial bsky: ", err)
}
Expand All @@ -148,7 +176,7 @@ func websocketConnection(wsUrl url.URL) error {
log.Fatal("login bsky: ", err)
}

for {
for ctx.Err() == nil {
// bail if we take too long for a read
c.SetReadDeadline(time.Now().Add(time.Second * 5))

Expand All @@ -157,15 +185,16 @@ func websocketConnection(wsUrl url.URL) error {
return err
}

err = readJetstreamMessage(jetstreamMessage, bsky)
err = readJetstreamMessage(ctx, jetstreamMessage, bsky)
if err != nil {
log.Println("error reading jetstream message: ", jetstreamMessage, err)
continue
}
}
return ctx.Err()
}

func readJetstreamMessage(jetstreamMessageEncoded []byte, bsky *bluesky.Client) error {
func readJetstreamMessage(ctx context.Context, jetstreamMessageEncoded []byte, bsky *bluesky.Client) error {
// Decompress the message
m, err := zstdDecoder.DecodeAll(jetstreamMessageEncoded, nil)
if err != nil {
Expand All @@ -189,7 +218,7 @@ func readJetstreamMessage(jetstreamMessageEncoded []byte, bsky *bluesky.Client)
jetstreamMessageStr := string(jetstreamMessage)

go func() {
profile, err := getBskyProfile(bskyMessage, bsky)
profile, err := getBskyProfile(ctx, bskyMessage, bsky)
if err != nil {
slog.Error("fetch profile", "err", err, "msg", jetstreamMessageStr)
return
Expand All @@ -201,7 +230,7 @@ func readJetstreamMessage(jetstreamMessageEncoded []byte, bsky *bluesky.Client)
imageURL = fmt.Sprintf("https://cdn.bsky.app/img/feed_fullsize/plain/%s/%s", bskyMessage.Did, bskyMessage.Commit.Record.Embed.Images[0].Image.Ref.Link)
}

err = sendToSlack(jetstreamMessageStr, bskyMessage, imageURL, *profile)
err = sendToSlack(ctx, jetstreamMessageStr, bskyMessage, imageURL, *profile)
if err != nil {
slog.Error("slack error", "err", err)
}
Expand All @@ -211,8 +240,8 @@ func readJetstreamMessage(jetstreamMessageEncoded []byte, bsky *bluesky.Client)
return nil
}

func getBskyProfile(bskyMessage BskyMessage, bsky *bluesky.Client) (*bluesky.Profile, error) {
profile, err := bsky.FetchProfile(context.Background(), bskyMessage.Did)
func getBskyProfile(ctx context.Context, bskyMessage BskyMessage, bsky *bluesky.Client) (*bluesky.Profile, error) {
profile, err := bsky.FetchProfile(ctx, bskyMessage.Did)
if err != nil {
return nil, err
}
Expand All @@ -225,7 +254,7 @@ func getBskyProfile(bskyMessage BskyMessage, bsky *bluesky.Client) (*bluesky.Pro
return profile, nil
}

func sendToSlack(jetstreamMessageStr string, bskyMessage BskyMessage, imageURL string, profile bluesky.Profile) error {
func sendToSlack(ctx context.Context, jetstreamMessageStr string, bskyMessage BskyMessage, imageURL string, profile bluesky.Profile) error {
attachments := []SlackAttachment{
{
AuthorName: fmt.Sprintf("%s (@%s)", profile.Name, profile.Handle),
Expand All @@ -246,20 +275,24 @@ func sendToSlack(jetstreamMessageStr string, bskyMessage BskyMessage, imageURL s
log.Printf("failed to marshal text: %v", err)

}
res, err := http.Post(*webhookURL, "application/json", bytes.NewBuffer(body))
req, err := http.NewRequestWithContext(ctx, "POST", *webhookURL, bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
res, err := httpClient.Do(req)
if err != nil {
slog.Error("failed to post to slack", "msg", jetstreamMessageStr)
return err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
body, err := io.ReadAll(res.Body)
if err != nil {
slog.Error("bad error code from slack and fail to read body", "statusCode", res.StatusCode, "msg", jetstreamMessageStr)
return err
}
defer res.Body.Close()

slog.Error("error code response from slack", "statusCode", res.StatusCode, "responseBody", string(body), "msg", jetstreamMessageStr)
return fmt.Errorf("slack: %s %s", res.Status, string(body))
}
Expand Down
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
module github.com/tailscale/bsky-webhook

go 1.22.6
go 1.23

require (
github.com/bluesky-social/jetstream v0.0.0-20241031234625-0ab10bd041fe
github.com/gorilla/websocket v1.5.3
github.com/karalabe/go-bluesky v0.0.0-20230506152134-dd72fcf127a8
github.com/klauspost/compress v1.17.9
github.com/tailscale/setec v0.0.0-20241107175935-3954dc4aade5
)

require (
github.com/bluesky-social/indigo v0.0.0-20241008040750-06bacb465af7 // indirect
github.com/carlmjohnson/versioninfo v0.22.5 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/goccy/go-json v0.10.2 // indirect
Expand Down Expand Up @@ -55,9 +57,13 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
go4.org/mem v0.0.0-20220726221520-4f986261bf13 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
lukechampine.com/blake3 v1.3.0 // indirect
tailscale.com v1.73.0-pre.0.20240822193108-696711cc17c4 // indirect
)
Loading

0 comments on commit 632f63c

Please sign in to comment.