Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/bsky-webhook: add optional support for a secrets service #3

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@ go run ./cmd/bsky-webhook/ -bskyHandle me.example.com -watchWord "pangolin"
## Configuration

These configuration options are available as command-line flags and
environment variables. Those without defaults are required.
environment variables. Those without defaults are required, unless
explicitly marked as optional.

Here's the complete table based on the provided Go code:

| Command-line flag | Environment variable | Default value | Description |
| ------------------ | -------------------- | --------------------------------------- | ------------------------------------------------------------------------------------ |
| `-addr` | `JETSTREAM_ADDRESS` | Rotation of all public jetsream servers | The [jetstream](https://github.com/bluesky-social/jetstream) hostname to connect to. |
| `-bsky-handle` | `BSKY_HANDLE` | none | The Bluesky handle of the account that will make API requests. |
| `-bsky-app-password` | `BSKY_APP_PASSWORD` | none | The Bluesky app password for authentication. |
| `-slack-webhook-url` | `SLACK_WEBHOOK_URL` | none | The Slack webhook URL for sending notifications. |
| `-bsky-server-url` | `BSKY_SERVER_URL` | "https://bsky.network" | The Bluesky PDS server to send API requests to URL. |
| `-watch-word` | `WATCH_WORD` | "tailscale" | The word to watch out for; may support multiple words in the future. |

| Command-line flag | Environment variable | Default value | Description |
|----------------------|----------------------|-----------------------------------------|-------------------------------------------------------------------------|
| `-addr` | `JETSTREAM_ADDRESS` | Rotation of all public jetsream servers | The [jetstream][jetstream] hostname to connect to. |
| `-bsky-handle` | `BSKY_HANDLE` | none | The Bluesky handle of the account that will make API requests. |
| `-bsky-app-password` | `BSKY_APP_PASSWORD` | none | The Bluesky app password for authentication. |
| `-slack-webhook-url` | `SLACK_WEBHOOK_URL` | none | The Slack webhook URL for sending notifications. |
| `-bsky-server-url` | `BSKY_SERVER_URL` | "https://bsky.social" | The Bluesky PDS server to send API requests to URL. |
| `-watch-word` | `WATCH_WORD` | "tailscale" | The word to watch out for; may support multiple words in the future. |
| `-secrets-url` | `SECRETS_URL` | none | The address of a [setec][setec] server to fetch secrets from (optional) |
| `-secrets-prefix` | `SECRETS_PREFIX` | "" | A prefix to prepend to secret names fetched from setec (optional) |

[jetstream]: https://github.com/bluesky-social/jetstream
[setec]: https://github.com/tailscale/setec
77 changes: 55 additions & 22 deletions cmd/bsky-webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@ import (
"net/http"
"net/url"
"os"
"os/signal"
"path"
"strings"
"syscall"
"time"

"github.com/bluesky-social/jetstream/pkg/models"
"github.com/gorilla/websocket"
"github.com/karalabe/go-bluesky"
bluesky "github.com/karalabe/go-bluesky"
"github.com/klauspost/compress/zstd"
"github.com/tailscale/setec/client/setec"
)

var (
Expand All @@ -33,9 +37,14 @@ var (
webhookURL = flag.String("slack-webhook-url", envOr("SLACK_WEBHOOK_URL", ""),
"slack webhook URL (required)")
bskyServerURL = flag.String("bsky-server-url", envOr("BSKY_SERVER_URL",
"https://bsky.network"), "bluesky PDS server URL")
"https://bsky.social"), "bluesky PDS server URL")
watchWord = flag.String("watch-word", envOr("WATCH_WORD", "tailscale"),
"the word to watch out for. may be multiple words in future (required)")

secretsURL = flag.String("secrets-url", envOr("SECRETS_URL", ""),
"the URL of a secrets server (if empty, no server is used)")
secretsPrefix = flag.String("secrets-prefix", envOr("SECRETS_PREFIX", ""),
"the prefix to prepend to secret names fetched from --secrets-url")
)

// Public addresses of jetstream websocket services.
Expand All @@ -51,6 +60,10 @@ var jetstreams = []string{
// Only the DecodeAll method may be used.
var zstdDecoder *zstd.Decoder

// httpClient must be used for all HTTP requests. It is a variable so that it
// can be replaced with a proxy.
var httpClient = http.DefaultClient

func init() {
// Jetstream uses a custom zstd dictionary, so make sure we do the same.
var err error
Expand All @@ -65,20 +78,38 @@ func main() {
// TODO(creachadair): Usage text.

switch {
case *webhookURL == "":
case *webhookURL == "" && *secretsURL == "":
log.Fatal("missing slack webhook URL (SLACK_WEBHOOK_URL)")
case *bskyServerURL == "":
log.Fatal("missing Bluesky server URL (BSKY_SERVER_URL)")
case *bskyHandle == "":
log.Fatal("Missing Bluesky account handle (BSKY_HANDLE)")
case *bskyAppKey == "":
case *bskyAppKey == "" && *secretsURL == "":
log.Fatal("missing Bluesky app secret (BSKY_APP_PASSWORD)")
case *watchWord == "":
log.Fatal("missing watchword")
}

ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

if *secretsURL != "" {
webhookSecret := path.Join(*secretsPrefix, "slack-webhook-url")
appKeySecret := path.Join(*secretsPrefix, "bluesky-app-key")
st, err := setec.NewStore(ctx, setec.StoreConfig{
Client: setec.Client{Server: *secretsURL, DoHTTP: httpClient.Do},
Secrets: []string{webhookSecret, appKeySecret},
})
if err != nil {
log.Fatalf("initialize secrets store: %v", err)
}
*webhookURL = st.Secret(webhookSecret).GetString()
*bskyAppKey = st.Secret(appKeySecret).GetString()
log.Printf("Fetched client secrets from %q", *secretsURL)
}

nextAddr := nextWSAddress()
for {
for ctx.Err() == nil {
wsURL := url.URL{
Scheme: "wss",
Host: nextAddr(),
Expand All @@ -87,7 +118,7 @@ func main() {
}
slog.Info("ws connecting", "url", wsURL.String())

err := websocketConnection(wsURL)
err := websocketConnection(ctx, wsURL)
slog.Error("ws connection", "url", wsURL, "err", err)

// TODO(erisa): exponential backoff
Expand Down Expand Up @@ -119,13 +150,12 @@ func nextWSAddress() func() string {
}
}

func websocketConnection(wsUrl url.URL) error {
func websocketConnection(ctx context.Context, wsUrl url.URL) error {
// add compression headers
headers := http.Header{}
headers.Add("Socket-Encoding", "zstd")

c, _, err := websocket.DefaultDialer.Dial(wsUrl.String(), headers)

if err != nil {
return fmt.Errorf("dial jetstream: %v", err)
}
Expand All @@ -135,9 +165,7 @@ func websocketConnection(wsUrl url.URL) error {
return nil
})

ctx := context.Background()

bsky, err := bluesky.Dial(ctx, *bskyServerURL)
bsky, err := bluesky.DialWithClient(ctx, *bskyServerURL, httpClient)
if err != nil {
log.Fatal("dial bsky: ", err)
}
Expand All @@ -148,7 +176,7 @@ func websocketConnection(wsUrl url.URL) error {
log.Fatal("login bsky: ", err)
}

for {
for ctx.Err() == nil {
// bail if we take too long for a read
c.SetReadDeadline(time.Now().Add(time.Second * 5))

Expand All @@ -157,15 +185,16 @@ func websocketConnection(wsUrl url.URL) error {
return err
}

err = readJetstreamMessage(jetstreamMessage, bsky)
err = readJetstreamMessage(ctx, jetstreamMessage, bsky)
if err != nil {
log.Println("error reading jetstream message: ", jetstreamMessage, err)
continue
}
}
return ctx.Err()
}

func readJetstreamMessage(jetstreamMessageEncoded []byte, bsky *bluesky.Client) error {
func readJetstreamMessage(ctx context.Context, jetstreamMessageEncoded []byte, bsky *bluesky.Client) error {
// Decompress the message
m, err := zstdDecoder.DecodeAll(jetstreamMessageEncoded, nil)
if err != nil {
Expand All @@ -189,7 +218,7 @@ func readJetstreamMessage(jetstreamMessageEncoded []byte, bsky *bluesky.Client)
jetstreamMessageStr := string(jetstreamMessage)

go func() {
profile, err := getBskyProfile(bskyMessage, bsky)
profile, err := getBskyProfile(ctx, bskyMessage, bsky)
if err != nil {
slog.Error("fetch profile", "err", err, "msg", jetstreamMessageStr)
return
Expand All @@ -201,7 +230,7 @@ func readJetstreamMessage(jetstreamMessageEncoded []byte, bsky *bluesky.Client)
imageURL = fmt.Sprintf("https://cdn.bsky.app/img/feed_fullsize/plain/%s/%s", bskyMessage.Did, bskyMessage.Commit.Record.Embed.Images[0].Image.Ref.Link)
}

err = sendToSlack(jetstreamMessageStr, bskyMessage, imageURL, *profile)
err = sendToSlack(ctx, jetstreamMessageStr, bskyMessage, imageURL, *profile)
if err != nil {
slog.Error("slack error", "err", err)
}
Expand All @@ -211,8 +240,8 @@ func readJetstreamMessage(jetstreamMessageEncoded []byte, bsky *bluesky.Client)
return nil
}

func getBskyProfile(bskyMessage BskyMessage, bsky *bluesky.Client) (*bluesky.Profile, error) {
profile, err := bsky.FetchProfile(context.Background(), bskyMessage.Did)
func getBskyProfile(ctx context.Context, bskyMessage BskyMessage, bsky *bluesky.Client) (*bluesky.Profile, error) {
profile, err := bsky.FetchProfile(ctx, bskyMessage.Did)
if err != nil {
return nil, err
}
Expand All @@ -225,7 +254,7 @@ func getBskyProfile(bskyMessage BskyMessage, bsky *bluesky.Client) (*bluesky.Pro
return profile, nil
}

func sendToSlack(jetstreamMessageStr string, bskyMessage BskyMessage, imageURL string, profile bluesky.Profile) error {
func sendToSlack(ctx context.Context, jetstreamMessageStr string, bskyMessage BskyMessage, imageURL string, profile bluesky.Profile) error {
attachments := []SlackAttachment{
{
AuthorName: fmt.Sprintf("%s (@%s)", profile.Name, profile.Handle),
Expand All @@ -246,20 +275,24 @@ func sendToSlack(jetstreamMessageStr string, bskyMessage BskyMessage, imageURL s
log.Printf("failed to marshal text: %v", err)

}
res, err := http.Post(*webhookURL, "application/json", bytes.NewBuffer(body))
req, err := http.NewRequestWithContext(ctx, "POST", *webhookURL, bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
res, err := httpClient.Do(req)
if err != nil {
slog.Error("failed to post to slack", "msg", jetstreamMessageStr)
return err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
body, err := io.ReadAll(res.Body)
if err != nil {
slog.Error("bad error code from slack and fail to read body", "statusCode", res.StatusCode, "msg", jetstreamMessageStr)
return err
}
defer res.Body.Close()

slog.Error("error code response from slack", "statusCode", res.StatusCode, "responseBody", string(body), "msg", jetstreamMessageStr)
return fmt.Errorf("slack: %s %s", res.Status, string(body))
}
Expand Down
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
module github.com/tailscale/bsky-webhook

go 1.22.6
go 1.23

require (
github.com/bluesky-social/jetstream v0.0.0-20241031234625-0ab10bd041fe
github.com/gorilla/websocket v1.5.3
github.com/karalabe/go-bluesky v0.0.0-20230506152134-dd72fcf127a8
github.com/klauspost/compress v1.17.9
github.com/tailscale/setec v0.0.0-20241107175935-3954dc4aade5
)

require (
github.com/bluesky-social/indigo v0.0.0-20241008040750-06bacb465af7 // indirect
github.com/carlmjohnson/versioninfo v0.22.5 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/goccy/go-json v0.10.2 // indirect
Expand Down Expand Up @@ -55,9 +57,13 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
go4.org/mem v0.0.0-20220726221520-4f986261bf13 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
lukechampine.com/blake3 v1.3.0 // indirect
tailscale.com v1.73.0-pre.0.20240822193108-696711cc17c4 // indirect
)
Loading