diff --git a/README.md b/README.md index d723897..8f6063f 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,9 @@ Call a remote API on ODK Central database events: ## Usage +The `odkhook` tool is a service that runs continually, monitoring the +ODK Central database for updates and triggering the webhook as appropriate. + ### Binary Download the binary for your platform from the @@ -17,24 +20,24 @@ Then run with: ```bash ./odkhook \ -db 'postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable' \ - -webhook 'https://your.domain.com/some/webhook' + -entityUrl 'https://your.domain.com/some/webhook' \ + -submissionUrl 'https://your.domain.com/some/webhook' ``` > [!TIP] -> By default both Entity editing and new submissions trigger the webhook. -> -> Use the -trigger flag to modify this behaviour. +> It's possible to specify a webhook for only Entities or Submissions, or both. ### Docker ```bash docker run -d ghcr.io/hotosm/odk-webhook:latest \ -db 'postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable' \ - -webhook 'https://your.domain.com/some/webhook' + -entityUrl 'https://your.domain.com/some/webhook' \ + -submissionUrl 'https://your.domain.com/some/webhook' ``` > [!NOTE] -> Alternatively, add to your docker compose stack. +> Alternatively, add the service to your docker compose stack. ### Code @@ -64,13 +67,14 @@ err = SetupWebhook( log, ctx, dbPool, - "https://your.domain.com/some/webhook", - map[string]bool{ - "entity.update.version": true, - "submission.create": true, - }, + "https://your.domain.com/some/entity/webhook", + "https://your.domain.com/some/submission/webhook", ) if err != nil { fmt.Fprintf(os.Stderr, "error setting up webhook: %v", err) } ``` + +> [!NOTE] +> To not provide a webhook for either entities or submissions, +> pass `nil` instead. diff --git a/compose.yml b/compose.yml index 724f862..6e357d7 100644 --- a/compose.yml +++ b/compose.yml @@ -15,15 +15,24 @@ services: - ./go.mod:/app/go.mod:ro - ./go.sum:/app/go.sum:ro - ./main.go:/app/main.go:ro + - ./main_test.go:/app/main_test.go:ro - ./db:/app/db:ro - ./webhook:/app/webhook:ro + - ./parser:/app/parser:ro + # environment: + # # Override to use database on host + # ODK_WEBHOOK_DB_URI: postgresql://odk:odk@host.docker.internal:5434/odk?sslmode=disable + # ODK_WEBHOOK_WEBHOOK_URL: depends_on: db: condition: service_healthy networks: - net + # This allows usage of services running directly on the host machine + extra_hosts: + - host.docker.internal:host-gateway restart: "no" - entrypoint: go test -v ./... + entrypoint: go test -v . db: image: "postgis/postgis:17-3.5-alpine" diff --git a/db/notifier_test.go b/db/notifier_test.go index 66c2b90..6390a11 100644 --- a/db/notifier_test.go +++ b/db/notifier_test.go @@ -10,21 +10,25 @@ import ( "github.com/matryer/is" ) -// NB: these tests assume you have a postgres server listening on localhost:5432 -// with username postgres and password postgres. You can trivially set this up -// with Docker with the following: +// Note: these tests assume you have a postgres server listening on db:5432 +// with username odk and password odk. // -// docker run --rm --name postgres -p 5432:5432 \ -// -e POSTGRES_PASSWORD=postgres postgres +// The easiest way to ensure this is to run the tests with docker compose: +// docker compose run --rm odkhook func TestNotifier(t *testing.T) { - dbUri := "postgresql://odk:odk@db:5432/odkhook?sslmode=disable" + dbUri := os.Getenv("ODK_WEBHOOK_DB_URI") + if len(dbUri) == 0 { + // Default + dbUri = "postgresql://odk:odk@db:5432/odkhook?sslmode=disable" + } is := is.New(t) log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) ctx := context.Background() ctx, cancel := context.WithCancel(ctx) wg := sync.WaitGroup{} + pool, err := InitPool(ctx, log, dbUri) is.NoErr(err) diff --git a/db/trigger.go b/db/trigger.go index 9cc3e89..cdf6b22 100644 --- a/db/trigger.go +++ b/db/trigger.go @@ -8,28 +8,68 @@ import ( ) // Example parsed JSON -// {"action":"entity.update.version","actorId":1,"details":{"var1":"test"},"dml_action":"INSERT"}} +// {"action":"entity.update.version","actorId":1,"details":{"entityDefId":1001,...},"dml_action":"INSERT"}} func CreateTrigger(ctx context.Context, dbPool *pgxpool.Pool, tableName string) error { // This trigger runs on the `audits` table by default, and creates a new event // in the odk-events queue when a new event is created in the table if tableName == "" { - tableName = "audits" // default table + // default table (this is configurable for easier tests mainly) + tableName = "audits" } // SQL for creating the function createFunctionSQL := ` CREATE OR REPLACE FUNCTION new_audit_log() RETURNS trigger AS $$ - DECLARE - js jsonb; - BEGIN - SELECT to_jsonb(NEW.*) INTO js; - js := jsonb_set(js, '{dml_action}', to_jsonb(TG_OP)); - PERFORM pg_notify('odk-events', js::text); - RETURN NEW; - END; + DECLARE + js jsonb; + action_type text; + result_data jsonb; + BEGIN + -- Serialize the NEW row into JSONB + SELECT to_jsonb(NEW.*) INTO js; + + -- Add the DML action (INSERT/UPDATE) + js := jsonb_set(js, '{dml_action}', to_jsonb(TG_OP)); + + -- Extract the action type from the NEW row + action_type := NEW.action; + + -- Handle different action types with a CASE statement + CASE action_type + WHEN 'entity.update.version' THEN + SELECT entity_defs.data + INTO result_data + FROM entity_defs + WHERE entity_defs.id = (NEW.details->>'entityDefId')::int; + + -- Merge the additional data into the original JSON + js := jsonb_set(js, '{data}', result_data, true); + + -- Notify the odk-events queue + PERFORM pg_notify('odk-events', js::text); + + WHEN 'submission.create' THEN + SELECT jsonb_build_object('xml', submission_defs.xml) + INTO result_data + FROM submission_defs + WHERE submission_defs.id = (NEW.details->>'submissionDefId')::int; + + -- Merge the additional data into the original JSON + js := jsonb_set(js, '{data}', result_data, true); + + -- Notify the odk-events queue + PERFORM pg_notify('odk-events', js::text); + + ELSE + -- Skip pg_notify for unsupported actions & insert as normal + RETURN NEW; + END CASE; + + RETURN NEW; + END; $$ LANGUAGE 'plpgsql'; ` diff --git a/db/trigger_test.go b/db/trigger_test.go index 594f0ab..f010d4f 100644 --- a/db/trigger_test.go +++ b/db/trigger_test.go @@ -7,19 +7,164 @@ import ( "os" "sync" "testing" + "time" "github.com/matryer/is" ) -// NB: these tests assume you have a postgres server listening on localhost:5432 -// with username postgres and password postgres. You can trivially set this up -// with Docker with the following: +// Note: these tests assume you have a postgres server listening on db:5432 +// with username odk and password odk. // -// docker run --rm --name postgres -p 5432:5432 \ -// -e POSTGRES_PASSWORD=postgres postgres +// The easiest way to ensure this is to run the tests with docker compose: +// docker compose run --rm odkhook -func TestTrigger(t *testing.T) { - dbUri := "postgresql://odk:odk@db:5432/odkhook?sslmode=disable" +func TestEntityTrigger(t *testing.T) { + dbUri := os.Getenv("ODK_WEBHOOK_DB_URI") + if len(dbUri) == 0 { + // Default + dbUri = "postgresql://odk:odk@db:5432/odkhook?sslmode=disable" + } + + is := is.New(t) + log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + wg := sync.WaitGroup{} + pool, err := InitPool(ctx, log, dbUri) + is.NoErr(err) + + // Get connection and defer close + conn, err := pool.Acquire(ctx) + is.NoErr(err) + defer conn.Release() + + // Create entity_defs table + _, err = conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs;`) + is.NoErr(err) + entityTableCreateSql := ` + CREATE TABLE entity_defs ( + id int4, + "entityId" int4, + "createdAt" timestamptz, + "current" bool, + "data" jsonb, + "creatorId" int4, + "label" text + ); + ` + _, err = conn.Exec(ctx, entityTableCreateSql) + is.NoErr(err) + + // Create audits_test table + _, err = conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test;`) + is.NoErr(err) + auditTableCreateSql := ` + CREATE TABLE audits_test ( + "actorId" int, + action varchar, + details jsonb + ); + ` + _, err = conn.Exec(ctx, auditTableCreateSql) + is.NoErr(err) + + // Insert an entity record + entityInsertSql := ` + INSERT INTO public.entity_defs ( + id, "entityId","createdAt","current","data","creatorId","label" + ) VALUES ( + 1001, + 900, + '2025-01-10 16:23:40.073', + true, + '{"status": "0", "task_id": "26", "version": "1"}', + 5, + 'Task 26 Feature 904487737' + ); + ` + _, err = conn.Exec(ctx, entityInsertSql) + is.NoErr(err) + + // Create audit trigger + err = CreateTrigger(ctx, pool, "audits_test") + is.NoErr(err) + + // Create listener + listener := NewListener(pool) + err = listener.Connect(ctx) + is.NoErr(err) + + // Create notifier + n := NewNotifier(log, listener) + wg.Add(1) + go func() { + n.Run(ctx) + wg.Done() + }() + sub := n.Listen("odk-events") + + // Insert an audit record + auditInsertSql := ` + INSERT INTO audits_test ("actorId", action, details) + VALUES (1, 'entity.update.version', '{"entityDefId": 1001}'); + ` + _, err = conn.Exec(ctx, auditInsertSql) + is.NoErr(err) + + // Validate the notification content + wg.Add(1) + out := make(chan string) + go func() { + <-sub.EstablishedC() + msg := <-sub.NotificationC() // Get the notification + + log.Info("Notification received", "raw", msg) + + out <- string(msg) // Send it to the output channel + close(out) + wg.Done() + }() + + // Process the notification + var notification map[string]interface{} + for msg := range out { + err := json.Unmarshal([]byte(msg), ¬ification) + is.NoErr(err) // Ensure the JSON payload is valid + log.Info("Parsed notification", "notification", notification) + } + + // Validate the JSON content + is.Equal(notification["dml_action"], "INSERT") // Ensure action is correct + is.Equal(notification["action"], "entity.update.version") // Ensure action is correct + is.True(notification["details"] != nil) // Ensure details key exists + is.True(notification["data"] != nil) // Ensure data key exists + + // Check nested JSON value for entityDefId in details + details, ok := notification["details"].(map[string]interface{}) + is.True(ok) // Ensure details is a valid map + is.Equal(details["entityDefId"], float64(1001)) // Ensure entityDefId has the correct value + + // Check nested JSON value for status in data + data, ok := notification["data"].(map[string]interface{}) + is.True(ok) // Ensure data is a valid map + is.Equal(data["status"], "0") // Ensure `status` has the correct value + + // Cleanup + conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test;`) + conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs;`) + cancel() + sub.Unlisten(ctx) // uses background ctx anyway + listener.Close(ctx) + wg.Wait() +} + +// Test a new submission event type +func TestSubmissionTrigger(t *testing.T) { + dbUri := os.Getenv("ODK_WEBHOOK_DB_URI") + if len(dbUri) == 0 { + // Default + dbUri = "postgresql://odk:odk@db:5432/odkhook?sslmode=disable" + } is := is.New(t) log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) @@ -34,16 +179,55 @@ func TestTrigger(t *testing.T) { is.NoErr(err) defer conn.Release() + // Create submission_defs table + _, err = conn.Exec(ctx, `DROP TABLE IF EXISTS submission_defs;`) + is.NoErr(err) + submissionTableCreateSql := ` + CREATE TABLE submission_defs ( + id int4, + "submissionId" int4, + xml text, + "formDefId" int4, + "submitterId" int4, + "createdAt" timestamptz + ); + ` + _, err = conn.Exec(ctx, submissionTableCreateSql) + is.NoErr(err) + // Create audits_test table - auditCreateSql := ` + _, err = conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test;`) + is.NoErr(err) + auditTableCreateSql := ` CREATE TABLE audits_test ( "actorId" int, action varchar, details jsonb ); ` - // Ignore if table already exists - conn.Exec(ctx, auditCreateSql) + _, err = conn.Exec(ctx, auditTableCreateSql) + is.NoErr(err) + + // Insert an submission record + submissionInsertSql := ` + INSERT INTO submission_defs ( + id, + "submissionId", + xml, + "formDefId", + "submitterId", + "createdAt" + ) VALUES ( + 1, + 2, + '', + 7, + 5, + '2025-01-10 16:23:40.073' + ); + ` + _, err = conn.Exec(ctx, submissionInsertSql) + is.NoErr(err) // Create audit trigger err = CreateTrigger(ctx, pool, "audits_test") @@ -66,7 +250,7 @@ func TestTrigger(t *testing.T) { // Insert an audit record auditInsertSql := ` INSERT INTO audits_test ("actorId", action, details) - VALUES (1, 'entity.update.version', '{"var1": "test"}'); + VALUES (5, 'submission.create', '{"submissionDefId": 1}'); ` _, err = conn.Exec(ctx, auditInsertSql) is.NoErr(err) @@ -94,18 +278,112 @@ func TestTrigger(t *testing.T) { } // Validate the JSON content - is.Equal(notification["dml_action"], "INSERT") // Ensure action is correct - is.True(notification["details"] != nil) // Ensure details key exists + is.Equal(notification["dml_action"], "INSERT") // Ensure action is correct + is.Equal(notification["action"], "submission.create") // Ensure action is correct + is.True(notification["details"] != nil) // Ensure details key exists + is.True(notification["data"] != nil) // Ensure data key exists - // Check nested JSON value + // Check nested JSON value for submissionDefId in details details, ok := notification["details"].(map[string]interface{}) - is.True(ok) // Ensure details is a valid map - is.Equal(details["var1"], "test") // Ensure var1 has the correct value + is.True(ok) // Ensure details is a valid map + is.Equal(details["submissionDefId"], float64(1)) // Ensure submissionDefId has the correct value + + // Check nested JSON value for status in data + data, ok := notification["data"].(map[string]interface{}) + is.True(ok) // Ensure data is a valid map + is.Equal(data["xml"], ``) // Ensure `xml` has the correct value // Cleanup + conn.Exec(ctx, `DROP TABLE IF EXISTS submission_defs;`) conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test;`) cancel() sub.Unlisten(ctx) // uses background ctx anyway listener.Close(ctx) wg.Wait() } + +// Test an unsupported event type and ensure nothing is triggered +func TestNoTrigger(t *testing.T) { + dbUri := os.Getenv("ODK_WEBHOOK_DB_URI") + if len(dbUri) == 0 { + // Default + dbUri = "postgresql://odk:odk@db:5432/odkhook?sslmode=disable" + } + + is := is.New(t) + log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + pool, err := InitPool(ctx, log, dbUri) + is.NoErr(err) + + // Get connection and defer close + conn, err := pool.Acquire(ctx) + is.NoErr(err) + defer conn.Release() + + // Create audits_test table + _, err = conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test;`) + is.NoErr(err) + auditTableCreateSql := ` + CREATE TABLE audits_test ( + "actorId" int, + action varchar, + details jsonb + ); + ` + _, err = conn.Exec(ctx, auditTableCreateSql) + is.NoErr(err) + + // Create audit trigger + err = CreateTrigger(ctx, pool, "audits_test") + is.NoErr(err) + + // Create listener + listener := NewListener(pool) + err = listener.Connect(ctx) + is.NoErr(err) + + // Create notifier + n := NewNotifier(log, listener) + go func() { + n.Run(ctx) + }() + sub := n.Listen("odk-events") + + // Insert an audit record + auditInsertSql := ` + INSERT INTO audits_test ("actorId", action, details) + VALUES (1, 'invalid.event', '{"submissionDefId": 5}'); + ` + _, err = conn.Exec(ctx, auditInsertSql) + is.NoErr(err) + + // Ensure that no event is fired for incorrect event type + out := make(chan string) + go func() { + <-sub.EstablishedC() + msg := <-sub.NotificationC() // Get the notification + + log.Info("Notification received", "raw", msg) + + out <- string(msg) // Send it to the output channel + close(out) + }() + + // Validate that no event was triggered for invalid event type + select { + case msg := <-out: + // If a message was received, we failed the test since no event should be fired + t.Fatalf("Unexpected message received: %s", msg) + case <-time.After(1 * time.Second): + // No message should have been received within the timeout + log.Info("No event triggered for invalid event type") + } + + // Cleanup + conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test;`) + cancel() + sub.Unlisten(ctx) // uses background ctx anyway + listener.Close(ctx) +} diff --git a/main.go b/main.go index fa8c7c3..aa92b46 100644 --- a/main.go +++ b/main.go @@ -1,4 +1,4 @@ -// Wrapper for the tool functionality +// Wrapper for the main tool functionality package main @@ -8,12 +8,15 @@ import ( "fmt" "log/slog" "os" + "os/signal" "path/filepath" "strings" + "syscall" "github.com/jackc/pgx/v5/pgxpool" "github.com/hotosm/odk-webhook/db" + "github.com/hotosm/odk-webhook/parser" "github.com/hotosm/odk-webhook/webhook" ) @@ -34,17 +37,11 @@ func getDefaultLogger(lvl slog.Level) *slog.Logger { })) } -// triggerableActions options: -// - entity.update.version (entity edit) -// - submission.create (submission creation) -// -// See ODK docs for all options func SetupWebhook( log *slog.Logger, ctx context.Context, dbPool *pgxpool.Pool, - webhookUrl string, - triggerableActions map[string]bool, + entityUrl, submissionUrl string, ) error { // setup the listener listener := db.NewListener(dbPool) @@ -54,7 +51,7 @@ func SetupWebhook( } // init the trigger function - db.CreateTrigger(ctx, dbPool, "odk-events") + db.CreateTrigger(ctx, dbPool, "audits") // setup the notifier notifier := db.NewNotifier(log, listener) @@ -69,26 +66,24 @@ func SetupWebhook( <-sub.EstablishedC() for { select { - case <-ctx.Done(): sub.Unlisten(ctx) log.Info("done listening for notifications") return case data := <-sub.NotificationC(): - dataString := string(data) - log.Debug("got notification: %s \n", "data", dataString) + eventData := string(data) + log.Debug("got notification", "data", eventData) - parsedData, err := webhook.ParseEventJson(log, ctx, []byte(data)) + parsedData, err := parser.ParseEventJson(log, ctx, []byte(eventData)) if err != nil { log.Error("Failed to parse notification", "error", err) continue // Skip processing this notification } - if triggerableActions[parsedData.Action] { - webhook.SendRequest(log, ctx, webhookUrl, *parsedData) - } else { - log.Debug("Event type is not set to trigger webhook", "type", parsedData.Action) + if parsedData != nil { + // Only send the request for correctly parsed (supported) events + webhook.SendRequest(log, ctx, entityUrl, *parsedData) } } } @@ -100,30 +95,22 @@ func SetupWebhook( // sub.Unlisten(ctx) // }() - select {} -} + stopCtx, cancel := context.WithCancel(ctx) + defer cancel() -func parseTriggerFlag(trigger string) (map[string]bool, error) { - trigger = strings.ToLower(strings.TrimSpace(trigger)) - triggerableActions := make(map[string]bool) - - switch trigger { - // case "all": - // triggerableActions["entity.update.version"] = true - // triggerableActions["submission.create"] = true - // // TODO add more options here - case "entities": - triggerableActions["entity.update.version"] = true - case "submissions": - triggerableActions["submission.create"] = true - case "submissions,entities", "entities,submissions": - triggerableActions["entity.update.version"] = true - triggerableActions["submission.create"] = true - default: - return nil, fmt.Errorf("invalid trigger value: %s", trigger) - } + // Listen for termination signals (e.g., SIGINT/SIGTERM) + go func() { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + <-c + log.Info("Received shutdown signal") + cancel() + }() + + <-stopCtx.Done() + log.Info("Application shutting down") - return triggerableActions, nil + return nil } func printStartupMsg() { @@ -141,33 +128,51 @@ func printStartupMsg() { func main() { ctx := context.Background() - log := getDefaultLogger(slog.LevelInfo) + + // Read environment variables + defaultDbUri := os.Getenv("ODK_WEBHOOK_DB_URI") + defaultEntityUrl := os.Getenv("ODK_WEBHOOK_ENTITY_URL") + defaultSubmissionUrl := os.Getenv("ODK_WEBHOOK_SUBMISSION_URL") + defaultLogLevel := os.Getenv("ODK_WEBHOOK_LOG_LEVEL") var dbUri string - flag.StringVar(&dbUri, "db", "", "DB host (postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable)") + flag.StringVar(&dbUri, "db", defaultDbUri, "DB host (postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable)") - var webhookUri string - flag.StringVar(&webhookUri, "webhook", "", "Webhook URL to call") + var entityUrl string + flag.StringVar(&entityUrl, "entityUrl", defaultEntityUrl, "Webhook URL for entity events") - var trigger string - flag.StringVar(&trigger, "trigger", "submissions,entities", "Trigger actions (submissions, entities, or 'submissions,entities' for both)") + var submissionUrl string + flag.StringVar(&submissionUrl, "submissionUrl", defaultSubmissionUrl, "Webhook URL for submission events") + + var debug bool + flag.BoolVar(&debug, "debug", false, "Enable debug logging") flag.Parse() - if dbUri == "" || webhookUri == "" { - fmt.Fprintf(os.Stderr, "missing required flags\n") + // Set logging level + var logLevel slog.Level + if debug { + logLevel = slog.LevelDebug + } else if strings.ToLower(defaultLogLevel) == "debug" { + logLevel = slog.LevelDebug + } else { + logLevel = slog.LevelInfo + } + log := getDefaultLogger(logLevel) + + if dbUri == "" { + fmt.Fprintf(os.Stderr, "DB URI is required\n") flag.PrintDefaults() os.Exit(1) - return } - - triggerableActions, err := parseTriggerFlag(trigger) - if err != nil { - fmt.Fprintf(os.Stderr, "error parsing trigger flag: %v\n", err) + + if entityUrl == "" && submissionUrl == "" { + fmt.Fprintf(os.Stderr, "At least one of entityUrl or submissionUrl is required\n") + flag.PrintDefaults() os.Exit(1) } - // get a connection pool + // Get a connection pool dbPool, err := db.InitPool(ctx, log, dbUri) if err != nil { fmt.Fprintf(os.Stderr, "could not connect to database: %v", err) @@ -175,7 +180,7 @@ func main() { } printStartupMsg() - err = SetupWebhook(log, ctx, dbPool, webhookUri, triggerableActions) + err = SetupWebhook(log, ctx, dbPool, entityUrl, submissionUrl) if err != nil { fmt.Fprintf(os.Stderr, "error setting up webhook: %v", err) os.Exit(1) diff --git a/main_test.go b/main_test.go new file mode 100644 index 0000000..021babf --- /dev/null +++ b/main_test.go @@ -0,0 +1,131 @@ +package main + +// // TODO FIXME +// import ( +// "context" +// "encoding/json" +// "log/slog" +// "os" +// "sync" +// "testing" +// "net/http" +// "net/http/httptest" +// "time" + +// "github.com/matryer/is" + +// "github.com/hotosm/odk-webhook/parser" +// "github.com/hotosm/odk-webhook/db" +// ) + +// func TestSetupWebhook(t *testing.T) { +// dbUri := os.Getenv("ODK_WEBHOOK_DB_URI") +// if len(dbUri) == 0 { +// // Default +// dbUri = "postgresql://odk:odk@db:5432/odkhook?sslmode=disable" +// } + +// is := is.New(t) +// log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) +// ctx := context.Background() +// ctx, cancel := context.WithCancel(ctx) +// wg := sync.WaitGroup{} +// dbPool, err := db.InitPool(ctx, log, dbUri) +// is.NoErr(err) + +// // Get connection and defer close +// conn, err := dbPool.Acquire(ctx) +// is.NoErr(err) +// defer conn.Release() + +// // Create entity_defs table +// _, err = conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs;`) +// is.NoErr(err) +// entityTableCreateSql := ` +// CREATE TABLE entity_defs ( +// id int4, +// "entityId" int4, +// "createdAt" timestamptz, +// "current" bool, +// "data" jsonb, +// "creatorId" int4, +// "label" text +// ); +// ` +// _, err = conn.Exec(ctx, entityTableCreateSql) +// is.NoErr(err) + +// // Create audits_test table +// _, err = conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test;`) +// is.NoErr(err) +// auditTableCreateSql := ` +// CREATE TABLE audits_test ( +// "actorId" int, +// action varchar, +// details jsonb +// ); +// ` +// _, err = conn.Exec(ctx, auditTableCreateSql) +// is.NoErr(err) + +// // Insert an entity record +// entityInsertSql := ` +// INSERT INTO public.entity_defs ( +// id, "entityId","createdAt","current","data","creatorId","label" +// ) VALUES ( +// 1001, +// 900, +// '2025-01-10 16:23:40.073', +// true, +// '{"status": "0", "task_id": "26", "version": "1"}', +// 5, +// 'Task 26 Feature 904487737' +// ); +// ` +// _, err = conn.Exec(ctx, entityInsertSql) +// is.NoErr(err) + +// // Mock webhook server +// webhookReceived := make(chan bool, 1) +// mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +// defer r.Body.Close() + +// var payload parser.ProcessedEvent +// err := json.NewDecoder(r.Body).Decode(&payload) +// // This is where the actual payload is inspected +// is.NoErr(err) +// is.Equal("xxx", payload.ID) // Check if the ID matches +// is.Equal(`{"status": "0", "task_id": "26", "version": "1"}`, payload.Data) // Check the data + +// webhookReceived <- true +// w.WriteHeader(http.StatusOK) +// })) +// defer mockServer.Close() + +// // Run Webhook trigger in background +// go func() { +// err := SetupWebhook(log, ctx, dbPool, mockServer.URL, mockServer.URL) +// is.NoErr(err) +// }() + +// // Insert an audit record (trigger event) +// auditInsertSql := ` +// INSERT INTO audits_test ("actorId", action, details) +// VALUES (1, 'entity.update.version', '{"entityDefId": 1001, "entityId": 1000, "entity": {"uuid": "xxx", "dataset": "test"}}'); +// ` +// _, err = conn.Exec(ctx, auditInsertSql) +// is.NoErr(err) + +// // Wait for the webhook to be received +// select { +// case <-webhookReceived: +// // Success +// case <-time.After(3 * time.Second): +// t.Fatalf("Test timed out waiting for webhook") +// } + +// // Cleanup +// conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test;`) +// cancel() +// wg.Wait() +// } diff --git a/parser/audit.go b/parser/audit.go new file mode 100644 index 0000000..5e0e240 --- /dev/null +++ b/parser/audit.go @@ -0,0 +1,115 @@ +package parser + +import ( + "context" + "encoding/json" + "errors" + "log/slog" +) + +// Define the nested structs for the details field +type OdkEntityRef struct { + Uuid string `json:"uuid"` // Use string for UUID + Dataset string `json:"dataset"` +} + +type OdkEntityDetails struct { + Entity OdkEntityRef `json:"entity"` + EntityId int `json:"entityId"` + EntityDefId int `json:"entityDefId"` +} + +type OdkNewSubmissionDetails struct { + InstanceId string `json:"instanceId"` // Use string for UUID + SubmissionId int `json:"submissionId"` + SubmissionDefId int `json:"submissionDefId"` +} + +// OdkAuditLog represents the main structure for the audit log (returned by pg_notify) +type OdkAuditLog struct { + Notes *string `json:"notes"` // Pointer to handle null values + Action string `json:"action"` + ActeeID string `json:"acteeId"` // Use string for UUID + ActorID int `json:"actorId"` // Integer for the actor ID + Claimed *bool `json:"claimed"` // Pointer for nullable boolean + Details interface{} `json:"details"` // Use an interface to handle different detail types + Data interface{} `json:"data"` // Use an interface to handle different data types +} + +// ProcessedEvent represents the final parsed event structure (to send to the webhook API) +type ProcessedEvent struct { + ID string `json:"id"` // Entity UUID or Submission InstanceID + Data interface{} `json:"data"` // The actual entity data or wrapped submission XML +} + +// ParseJsonString converts the pg_notify string to OdkAuditLog +func ParseJsonString(log *slog.Logger, data []byte) (*OdkAuditLog, error) { + if len(data) == 0 { + return nil, errors.New("empty input data") + } + + var parsedData OdkAuditLog + if err := json.Unmarshal(data, &parsedData); err != nil { + log.Error("Failed to parse JSON data", "error", err, "data", string(data)) + return nil, err + } + log.Debug("Parsed notification data", "data", parsedData) + return &parsedData, nil +} + +// ParseEventJson parses the JSON data and extracts the relevant ID and data fields +func ParseEventJson(log *slog.Logger, ctx context.Context, data []byte) (*ProcessedEvent, error) { + // Convert the raw pg_notify string to an OdkAuditLog + rawLog, err := ParseJsonString(log, data) + if err != nil { + return nil, err + } + + // Prepare the result structure + var processedEvent ProcessedEvent + + // Parse the details field based on the action + switch rawLog.Action { + case "entity.update.version": + var entityDetails OdkEntityDetails + if err := parseDetails(rawLog.Details, &entityDetails); err != nil { + log.Error("Failed to parse entity.update.version details", "error", err) + return nil, err + } + processedEvent.ID = entityDetails.Entity.Uuid + processedEvent.Data = rawLog.Data + + case "submission.create": + var submissionDetails OdkNewSubmissionDetails + if err := parseDetails(rawLog.Details, &submissionDetails); err != nil { + log.Error("Failed to parse submission.create details", "error", err) + return nil, err + } + processedEvent.ID = submissionDetails.InstanceId + + // Parse the raw XML data + rawData, ok := rawLog.Data.(map[string]interface{}) + if !ok { + log.Error("Invalid data type for submission.create", "data", rawLog.Data) + return nil, errors.New("invalid data type for submission.create") + } + processedEvent.Data = rawData + + default: + // No nothing if the event type is not supported + log.Warn("Unsupported action type", "action", rawLog.Action) + return nil, nil + } + + log.Debug("Parsed event successfully", "processedEvent", processedEvent) + return &processedEvent, nil +} + +// parseDetails helps to unmarshal the details field into the appropriate structure +func parseDetails(details interface{}, target interface{}) error { + detailsBytes, err := json.Marshal(details) + if err != nil { + return err + } + return json.Unmarshal(detailsBytes, target) +} diff --git a/parser/audit_test.go b/parser/audit_test.go new file mode 100644 index 0000000..3c64cd3 --- /dev/null +++ b/parser/audit_test.go @@ -0,0 +1,89 @@ +package parser + +import ( + "context" + "io" + "log/slog" + "testing" + + "github.com/matryer/is" +) + +func TestParseJsonString(t *testing.T) { + is := is.New(t) + log := slog.New(slog.NewJSONHandler(io.Discard, nil)) + + t.Run("Valid JSON", func(t *testing.T) { + input := []byte(`{"id":"123","action":"entity.update.version","actorId":1,"details":{"entity":{"uuid":"abc","dataset":"test"}},"data":{}}`) + result, err := ParseJsonString(log, input) + is.NoErr(err) + is.Equal("123", result.ID) + is.Equal("entity.update.version", result.Action) + }) + + t.Run("Empty Input", func(t *testing.T) { + input := []byte("") + result, err := ParseJsonString(log, input) + is.Equal(result, nil) + is.True(err != nil) + is.Equal("empty input data", err.Error()) + }) + + t.Run("Invalid JSON", func(t *testing.T) { + input := []byte(`invalid`) + result, err := ParseJsonString(log, input) + is.Equal(result, nil) + is.True(err != nil) + }) +} + +func TestParseEventJson(t *testing.T) { + is := is.New(t) + log := slog.New(slog.NewJSONHandler(io.Discard, nil)) + ctx := context.Background() + + t.Run("Entity Update Version", func(t *testing.T) { + input := []byte(`{ + "id":"123", + "action":"entity.update.version", + "actorId":1, + "details":{"entity":{"uuid":"abc","dataset":"test"}}, + "data":{} + }`) + result, err := ParseEventJson(log, ctx, input) + is.NoErr(err) + is.Equal("abc", result.ID) + is.Equal(map[string]interface{}{}, result.Data) + }) + + t.Run("Submission Create", func(t *testing.T) { + input := []byte(`{ + "id":"456", + "action":"submission.create", + "actorId":2, + "details":{"instanceId":"sub-123","submissionId":789,"submissionDefId":101112}, + "data":{"xml":""} + }`) + result, err := ParseEventJson(log, ctx, input) + is.NoErr(err) + is.Equal("sub-123", result.ID) + + wrappedData, ok := result.Data.(map[string]interface{}) + is.True(ok) + is.Equal("", wrappedData["xml"]) + }) + + t.Run("Unsupported Action", func(t *testing.T) { + input := []byte(`{ + "id":"789", + "action":"unknown.action", + "actorId":3, + "details":{}, + "data":{} + }`) + result, err := ParseEventJson(log, ctx, input) + is.Equal(result, nil) + is.True(err != nil) + is.Equal("unsupported action type", err.Error()) + }) +} diff --git a/webhook/auth.go b/webhook/auth.go new file mode 100644 index 0000000..d770c2c --- /dev/null +++ b/webhook/auth.go @@ -0,0 +1 @@ +package webhook diff --git a/webhook/request.go b/webhook/request.go index 15ea56f..bbe0604 100644 --- a/webhook/request.go +++ b/webhook/request.go @@ -7,57 +7,35 @@ import ( "log/slog" "net/http" "time" -) - -type OdkAuditDetails struct { - UserAgent string `json:"userAgent"` - Failures int `json:"failures"` - LoggedAt string `json:"loggedAt"` - Processed string `json:"processed"` - LastFailure *string `json:"lastFailure"` // Pointer for optional/nullable values -} - -type OdkAuditLog struct { - ID int `json:"id"` - Notes *string `json:"notes"` // Pointer to handle null values - Action string `json:"action"` - ActeeID string `json:"acteeId"` // Use string for UUID - ActorID int `json:"actorId"` // Integer for the actor ID - Claimed *bool `json:"claimed"` // Pointer for nullable boolean - Details OdkAuditDetails `json:"details"` // Nested struct -} -func ParseEventJson(log *slog.Logger, ctx context.Context, data []byte) (*OdkAuditLog, error) { - var parsedData OdkAuditLog - if err := json.Unmarshal([]byte(data), &parsedData); err != nil { - log.Error("Failed to parse JSON data", "error", err, "data", data) - return nil, err - } - log.Debug("Parsed notification data", "data", parsedData) - return &parsedData, nil // Return a pointer to parsedData -} + "github.com/hotosm/odk-webhook/parser" +) // SendRequest parses the request content JSON from the PostgreSQL notification // and sends the JSON payload to an external API endpoint. -func SendRequest(log *slog.Logger, ctx context.Context, apiEndpoint string, parsedData OdkAuditLog) { - // Marshal the parsed data back to JSON for sending - payload, err := json.Marshal(parsedData) +func SendRequest( + log *slog.Logger, + ctx context.Context, + apiEndpoint string, + eventJson parser.ProcessedEvent, +) { + // Marshal the payload to JSON + marshaledPayload, err := json.Marshal(eventJson) if err != nil { - log.Error("Failed to marshal parsed data to JSON", "error", err) + log.Error("Failed to marshal payload to JSON", "error", err) return } - req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiEndpoint, bytes.NewBuffer(payload)) + // Create the HTTP request + req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiEndpoint, bytes.NewBuffer(marshaledPayload)) if err != nil { log.Error("Failed to create HTTP request", "error", err) return } - req.Header.Set("Content-Type", "application/json") - client := &http.Client{ - Timeout: 10 * time.Second, - } + // Send the request + client := &http.Client{Timeout: 10 * time.Second} resp, err := client.Do(req) if err != nil { log.Error("Failed to send HTTP request", "error", err) @@ -65,9 +43,10 @@ func SendRequest(log *slog.Logger, ctx context.Context, apiEndpoint string, pars } defer resp.Body.Close() - if resp.StatusCode >= 200 && resp.StatusCode < 300 { - log.Info("Webhook called successfully", "status", resp.Status) + // Check the response status + if resp.StatusCode >= http.StatusOK && resp.StatusCode < http.StatusMultipleChoices { + log.Info("Webhook called successfully", "status", resp.StatusCode) } else { - log.Error("Failed to call webhook", "status", resp.Status) + log.Error("Failed to call webhook", "status", resp.StatusCode) } } diff --git a/webhook/request_test.go b/webhook/request_test.go new file mode 100644 index 0000000..083c666 --- /dev/null +++ b/webhook/request_test.go @@ -0,0 +1,82 @@ +package webhook + +import ( + "context" + "encoding/json" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" + + "github.com/matryer/is" + + "github.com/hotosm/odk-webhook/parser" +) + +func TestSendRequest(t *testing.T) { + is := is.New(t) + log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) + + // Set up a mock server + var receivedPayload parser.OdkAuditLog + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify content type + is.Equal("application/json", r.Header.Get("Content-Type")) + + // Read and parse request body + body, err := io.ReadAll(r.Body) + is.NoErr(err) + defer r.Body.Close() + + err = json.Unmarshal(body, &receivedPayload) + is.NoErr(err) + + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + // Define test cases + testCases := []struct { + name string + event parser.ProcessedEvent + expectedID string + expectedData interface{} + }{ + { + name: "Submission Create Event", + event: parser.ProcessedEvent{ + ID: "23dc865a-4757-431e-b182-67e7d5581c81", + Data: "XML Data", + }, + expectedID: "23dc865a-4757-431e-b182-67e7d5581c81", + expectedData: "XML Data", + }, + { + name: "Entity Update Event", + event: parser.ProcessedEvent{ + ID: "45fgh789-e32c-56d2-a765-427654321abc", + Data: "{\"field\":\"value\"}", + }, + expectedID: "45fgh789-e32c-56d2-a765-427654321abc", + expectedData: "{\"field\":\"value\"}", + }, + } + + // Execute test cases + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Call the SendRequest function + SendRequest(log, ctx, server.URL, tc.event) + + // Validate the received payload + is.Equal(tc.expectedID, receivedPayload.ID) + is.Equal(tc.expectedData, receivedPayload.Data) + }) + } +}