Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions cluster/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Copyright 2025 Dima Krasner

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cluster

import (
"log/slog"
"os"

"github.com/dimkr/tootik/logcontext"
)

func init() {
opts := slog.HandlerOptions{
Level: slog.LevelDebug,
AddSource: true,
}

slog.SetDefault(slog.New(logcontext.NewHandler(slog.NewJSONHandler(os.Stderr, &opts))))
}
27 changes: 15 additions & 12 deletions cmd/tootik/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/dimkr/tootik/httpsig"
"github.com/dimkr/tootik/icon"
"github.com/dimkr/tootik/inbox"
"github.com/dimkr/tootik/logcontext"
"github.com/dimkr/tootik/migrations"
"github.com/dimkr/tootik/outbox"
_ "github.com/mattn/go-sqlite3"
Expand Down Expand Up @@ -141,7 +142,7 @@ func main() {
opts.AddSource = true
}

slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, &opts)))
slog.SetDefault(slog.New(logcontext.NewHandler(slog.NewJSONHandler(os.Stderr, &opts))))
slog.SetLogLoggerLevel(slog.Level(*logLevel))

var blockList *fed.BlockList
Expand Down Expand Up @@ -188,7 +189,7 @@ func main() {
wg.Go(func() {
select {
case <-sigs:
slog.Info("Received termination signal")
slog.InfoContext(ctx, "Received termination signal")
cancel()
case <-ctx.Done():
}
Expand Down Expand Up @@ -385,8 +386,8 @@ func main() {
},
} {
wg.Go(func() {
if err := svc.Listener.ListenAndServe(ctx); err != nil {
slog.Error("Listener has failed", "listener", svc.Name, "error", err)
if err := svc.Listener.ListenAndServe(logcontext.Add(ctx, "listener", svc.Name)); err != nil {
slog.ErrorContext(ctx, "Listener has failed", "listener", svc.Name, "error", err)
}
cancel()
})
Expand Down Expand Up @@ -419,8 +420,8 @@ func main() {
},
} {
wg.Go(func() {
if err := queue.Queue.Process(ctx); err != nil {
slog.Error("Failed to process queue", "queue", queue.Name, "error", err)
if err := queue.Queue.Process(logcontext.Add(ctx, "queue", queue.Name)); err != nil {
slog.ErrorContext(ctx, "Failed to process queue", "queue", queue.Name, "error", err)
}
cancel()
})
Expand Down Expand Up @@ -498,17 +499,19 @@ func main() {
t := time.NewTicker(job.Interval)
defer t.Stop()

jobCtx := logcontext.Add(ctx, "job", job.Name)

for {
slog.Info("Running periodic job", "job", job.Name)
slog.InfoContext(ctx, "Running periodic job", "job", job.Name)
start := time.Now()
if err := job.Runner.Run(ctx); err != nil {
slog.Error("Periodic job has failed", "job", job.Name, "error", err)
if err := job.Runner.Run(jobCtx); err != nil {
slog.ErrorContext(ctx, "Periodic job has failed", "job", job.Name, "error", err)
break
}
slog.Info("Done running periodic job", "job", job.Name, "duration", time.Since(start).String())
slog.InfoContext(ctx, "Done running periodic job", "job", job.Name, "duration", time.Since(start).String())

select {
case <-ctx.Done():
case <-jobCtx.Done():
return

case <-t.C:
Expand All @@ -518,6 +521,6 @@ func main() {
}

<-ctx.Done()
slog.Info("Shutting down")
slog.InfoContext(ctx, "Shutting down")
wg.Wait()
}
6 changes: 3 additions & 3 deletions fed/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,21 @@ import (
func (l *Listener) handleActivity(w http.ResponseWriter, r *http.Request, prefix string) {
activityID := fmt.Sprintf("https://%s/%s/%s", l.Domain, prefix, r.PathValue("hash"))

slog.Info("Fetching activity", "activity", activityID)
slog.InfoContext(r.Context(), "Fetching activity", "activity", activityID)

var raw string
var activity ap.Activity
if err := l.DB.QueryRowContext(r.Context(), `select json(activity), json(activity) as raw from outbox where cid = ?`, activityID).Scan(&raw, &activity); errors.Is(err, sql.ErrNoRows) {
w.WriteHeader(http.StatusNotFound)
return
} else if err != nil {
slog.Warn("Failed to fetch activity", "activity", activityID, "error", err)
slog.WarnContext(r.Context(), "Failed to fetch activity", "activity", activityID, "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

if !activity.IsPublic() {
slog.Warn("Refused attempt to fetch a non-public activity", "activity", activityID)
slog.WarnContext(r.Context(), "Refused attempt to fetch a non-public activity", "activity", activityID)
w.WriteHeader(http.StatusNotFound)
return
}
Expand Down
30 changes: 15 additions & 15 deletions fed/apgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (l *Listener) handleAPGatewayPost(w http.ResponseWriter, r *http.Request) {

m := inboxRegex.FindStringSubmatch(resource)
if m == nil {
slog.Info("Invalid resource", "resource", resource)
slog.InfoContext(r.Context(), "Invalid resource", "resource", resource)
w.WriteHeader(http.StatusNotFound)
return
}
Expand All @@ -52,25 +52,25 @@ func (l *Listener) handleAPGatewayPost(w http.ResponseWriter, r *http.Request) {
var actor ap.Actor
var rsaPrivKeyPem, ed25519PrivKeyMultibase string
if err := l.DB.QueryRowContext(r.Context(), `select json(actor), rsaprivkey, ed25519privkey from persons where cid = ? and ed25519privkey is not null`, receiver).Scan(&actor, &rsaPrivKeyPem, &ed25519PrivKeyMultibase); errors.Is(err, sql.ErrNoRows) {
slog.Debug("Receiving user does not exist", "receiver", receiver)
slog.DebugContext(r.Context(), "Receiving user does not exist", "receiver", receiver)
w.WriteHeader(http.StatusNotFound)
return
} else if err != nil {
slog.Warn("Failed to check if receiving user exists", "receiver", receiver, "error", err)
slog.WarnContext(r.Context(), "Failed to check if receiving user exists", "receiver", receiver, "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

rsaPrivKey, err := data.ParseRSAPrivateKey(rsaPrivKeyPem)
if err != nil {
slog.Warn("Failed to parse RSA private key", "receiver", receiver, "error", err)
slog.WarnContext(r.Context(), "Failed to parse RSA private key", "receiver", receiver, "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

ed25519PrivKey, err := data.DecodeEd25519PrivateKey(ed25519PrivKeyMultibase)
if err != nil {
slog.Warn("Failed to decode Ed25519 private key", "receiver", receiver, "error", err)
slog.WarnContext(r.Context(), "Failed to decode Ed25519 private key", "receiver", receiver, "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -84,7 +84,7 @@ func (l *Listener) handleAPGatewayPost(w http.ResponseWriter, r *http.Request) {
func (l *Listener) handleApGatewayFollowers(w http.ResponseWriter, r *http.Request, did string) {
_, sender, err := l.verifyRequest(r, nil, ap.InstanceActor, l.ActorKeys)
if err != nil {
slog.Warn("Failed to verify followers request", "error", err)
slog.WarnContext(r.Context(), "Failed to verify followers request", "error", err)
w.WriteHeader(http.StatusUnauthorized)
return
}
Expand All @@ -97,7 +97,7 @@ func (l *Listener) handleApGatewayFollowers(w http.ResponseWriter, r *http.Reque

rows, err := l.DB.QueryContext(r.Context(), `SELECT follower FROM follows WHERE followed = 'https://' || ? || '/.well-known/apgateway/' || ? || '/actor' AND follower LIKE 'https://' || ? || '/%' AND accepted = 1`, l.Domain, did, u.Host)
if err != nil {
slog.Warn("Failed to fetch followers", "did", did, "host", u.Host, "error", err)
slog.WarnContext(r.Context(), "Failed to fetch followers", "did", did, "host", u.Host, "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -107,7 +107,7 @@ func (l *Listener) handleApGatewayFollowers(w http.ResponseWriter, r *http.Reque
for rows.Next() {
var follower string
if err := rows.Scan(&follower); err != nil {
slog.Warn("Failed to fetch followers", "did", did, "host", u.Host, "error", err)
slog.WarnContext(r.Context(), "Failed to fetch followers", "did", did, "host", u.Host, "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -116,7 +116,7 @@ func (l *Listener) handleApGatewayFollowers(w http.ResponseWriter, r *http.Reque
defer rows.Close()

if err := rows.Err(); err != nil {
slog.Warn("Failed to fetch followers", "did", did, "host", u.Host, "error", err)
slog.WarnContext(r.Context(), "Failed to fetch followers", "did", did, "host", u.Host, "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -128,12 +128,12 @@ func (l *Listener) handleApGatewayFollowers(w http.ResponseWriter, r *http.Reque
"orderedItems": items,
})
if err != nil {
slog.Warn("Failed to fetch followers", "did", did, "host", u.Host, "error", err)
slog.WarnContext(r.Context(), "Failed to fetch followers", "did", did, "host", u.Host, "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

slog.Info("Received followers request", "sender", sender.ID, "did", did, "host", u.Host, "count", len(items.OrderedMap))
slog.InfoContext(r.Context(), "Received followers request", "sender", sender.ID, "did", did, "host", u.Host, "count", len(items.OrderedMap))

w.Header().Set("Content-Type", `application/activity+json; charset=utf-8`)
w.Write([]byte(collection))
Expand All @@ -149,14 +149,14 @@ func (l *Listener) handleAPGatewayGet(w http.ResponseWriter, r *http.Request) {

id := portableObjectRegex.FindString(resource)
if id == "" {
slog.Info("Invalid resource", "resource", resource)
slog.InfoContext(r.Context(), "Invalid resource", "resource", resource)
w.WriteHeader(http.StatusNotFound)
return
}

id = "ap://" + id

slog.Info("Fetching object", "id", id)
slog.InfoContext(r.Context(), "Fetching object", "id", id)

var raw string
if err := l.DB.QueryRowContext(
Expand All @@ -180,11 +180,11 @@ func (l *Listener) handleAPGatewayGet(w http.ResponseWriter, r *http.Request) {
id,
ap.Public,
).Scan(&raw); errors.Is(err, sql.ErrNoRows) {
slog.Info("Notifying about missing object", "id", id)
slog.InfoContext(r.Context(), "Notifying about missing object", "id", id)
w.WriteHeader(http.StatusNotFound)
return
} else if err != nil {
slog.Warn("Failed to fetch object", "id", id, "error", err)
slog.WarnContext(r.Context(), "Failed to fetch object", "id", id, "error", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand Down
Loading
Loading