From f6cb11719500c69d0637f6962ac51d94f8128950 Mon Sep 17 00:00:00 2001 From: spwoodcock Date: Fri, 24 Jan 2025 21:22:01 +0000 Subject: [PATCH] feat: three webhook events: entity update, submission review, new submission --- README.md | 18 +++--- db/trigger.go | 16 ++++- db/trigger_test.go | 135 +++++++++++++++++++++++++++++++++++++++- main.go | 34 +++++----- main_test.go | 10 +-- parser/audit.go | 36 ++++++++++- parser/audit_test.go | 13 +++- webhook/request_test.go | 27 ++++++-- 8 files changed, 253 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index 947c541..8ef4924 100644 --- a/README.md +++ b/README.md @@ -20,8 +20,9 @@ Then run with: ```bash ./centralwebhook \ -db 'postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable' \ - -entityUrl 'https://your.domain.com/some/webhook' \ - -submissionUrl 'https://your.domain.com/some/webhook' + -updateEntityUrl 'https://your.domain.com/some/webhook' \ + -newSubmissionUrl 'https://your.domain.com/some/webhook' \ + -reviewSubmissionUrl 'https://your.domain.com/some/webhook' ``` > [!TIP] @@ -32,16 +33,18 @@ Then run with: ```bash docker run -d ghcr.io/hotosm/central-webhook:latest \ -db 'postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable' \ - -entityUrl 'https://your.domain.com/some/webhook' \ - -submissionUrl 'https://your.domain.com/some/webhook' + -updateEntityUrl 'https://your.domain.com/some/webhook' \ + -newSubmissionUrl 'https://your.domain.com/some/webhook' \ + -reviewSubmissionUrl 'https://your.domain.com/some/webhook' ``` Environment variables are also supported: ```dotenv CENTRAL_WEBHOOK_DB_URI=postgresql://user:pass@localhost:5432/db_name?sslmode=disable -CENTRAL_WEBHOOK_ENTITY_URL=https://your.domain.com/some/webhook -CENTRAL_WEBHOOK_SUBMISSION_URL=https://your.domain.com/some/webhook +CENTRAL_WEBHOOK_UPDATE_ENTITY_URL=https://your.domain.com/some/webhook +CENTRAL_WEBHOOK_REVIEW_SUBMISSION_URL=https://your.domain.com/some/webhook +CENTRAL_WEBHOOK_NEW_SUBMISSION_URL=https://your.domain.com/some/webhook CENTRAL_WEBHOOK_API_KEY=ksdhfiushfiosehf98e3hrih39r8hy439rh389r3hy983y CENTRAL_WEBHOOK_LOG_LEVEL=DEBUG ``` @@ -79,6 +82,7 @@ err = SetupWebhook( dbPool, "https://your.domain.com/some/entity/webhook", "https://your.domain.com/some/submission/webhook", + "https://your.domain.com/some/review/webhook", ) if err != nil { fmt.Fprintf(os.Stderr, "error setting up webhook: %v", err) @@ -107,6 +111,6 @@ Example: ```bash ./centralwebhook \ -db 'postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable' \ - -entityUrl 'https://your.domain.com/some/webhook' \ + -updateEntityUrl 'https://your.domain.com/some/webhook' \ -apiKey 'ksdhfiushfiosehf98e3hrih39r8hy439rh389r3hy983y' ``` diff --git a/db/trigger.go b/db/trigger.go index cdf6b22..07998ec 100644 --- a/db/trigger.go +++ b/db/trigger.go @@ -45,7 +45,7 @@ func CreateTrigger(ctx context.Context, dbPool *pgxpool.Pool, tableName string) FROM entity_defs WHERE entity_defs.id = (NEW.details->>'entityDefId')::int; - -- Merge the additional data into the original JSON + -- Merge the entity details into the JSON data key js := jsonb_set(js, '{data}', result_data, true); -- Notify the odk-events queue @@ -57,12 +57,24 @@ func CreateTrigger(ctx context.Context, dbPool *pgxpool.Pool, tableName string) FROM submission_defs WHERE submission_defs.id = (NEW.details->>'submissionDefId')::int; - -- Merge the additional data into the original JSON + -- Merge the submission XML into the JSON data key js := jsonb_set(js, '{data}', result_data, true); -- Notify the odk-events queue PERFORM pg_notify('odk-events', js::text); + WHEN 'submission.update' THEN + SELECT jsonb_build_object('instanceId', submission_defs."instanceId") + INTO result_data + FROM submission_defs + WHERE submission_defs.id = (NEW.details->>'submissionDefId')::int; + + -- Merge the instanceId into the existing 'details' key in JSON + js := jsonb_set(js, '{details}', (js->'details') || result_data, true); + + -- Notify the odk-events queue + PERFORM pg_notify('odk-events', js::text); + ELSE -- Skip pg_notify for unsupported actions & insert as normal RETURN NEW; diff --git a/db/trigger_test.go b/db/trigger_test.go index 64f5990..d502ca2 100644 --- a/db/trigger_test.go +++ b/db/trigger_test.go @@ -159,7 +159,7 @@ func TestEntityTrigger(t *testing.T) { } // Test a new submission event type -func TestSubmissionTrigger(t *testing.T) { +func TestNewSubmissionTrigger(t *testing.T) { dbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI") if len(dbUri) == 0 { // Default @@ -288,7 +288,6 @@ func TestSubmissionTrigger(t *testing.T) { is.True(ok) // Ensure details is a valid map is.Equal(details["submissionDefId"], float64(1)) // Ensure submissionDefId has the correct value - // Check nested JSON value for status in data data, ok := notification["data"].(map[string]interface{}) is.True(ok) // Ensure data is a valid map is.Equal(data["xml"], ``) // Ensure `xml` has the correct value @@ -302,6 +301,138 @@ func TestSubmissionTrigger(t *testing.T) { wg.Wait() } +// Test a new submission event type +func TestReviewSubmissionTrigger(t *testing.T) { + dbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI") + if len(dbUri) == 0 { + // Default + dbUri = "postgresql://odk:odk@db:5432/odk?sslmode=disable" + } + + is := is.New(t) + log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + wg := sync.WaitGroup{} + pool, err := InitPool(ctx, log, dbUri) + is.NoErr(err) + + // Get connection and defer close + conn, err := pool.Acquire(ctx) + is.NoErr(err) + defer conn.Release() + + // Create submission_defs table + _, err = conn.Exec(ctx, `DROP TABLE IF EXISTS submission_defs;`) + is.NoErr(err) + submissionTableCreateSql := ` + CREATE TABLE submission_defs ( + id int4, + "submissionId" int4, + "instanceId" uuid + ); + ` + _, err = conn.Exec(ctx, submissionTableCreateSql) + is.NoErr(err) + + // Create audits_test table + _, err = conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test;`) + is.NoErr(err) + auditTableCreateSql := ` + CREATE TABLE audits_test ( + "actorId" int, + action varchar, + details jsonb + ); + ` + _, err = conn.Exec(ctx, auditTableCreateSql) + is.NoErr(err) + + // Insert an submission record + submissionInsertSql := ` + INSERT INTO submission_defs ( + id, + "submissionId", + "instanceId" + ) VALUES ( + 1, + 2, + '33448049-0df1-4426-9392-d3a294d638ad' + ); + ` + _, err = conn.Exec(ctx, submissionInsertSql) + is.NoErr(err) + + // Create audit trigger + err = CreateTrigger(ctx, pool, "audits_test") + is.NoErr(err) + + // Create listener + listener := NewListener(pool) + err = listener.Connect(ctx) + is.NoErr(err) + + // Create notifier + n := NewNotifier(log, listener) + wg.Add(1) + go func() { + n.Run(ctx) + wg.Done() + }() + sub := n.Listen("odk-events") + + // Insert an audit record + auditInsertSql := ` + INSERT INTO audits_test ("actorId", action, details) + VALUES (5, 'submission.update', '{"submissionDefId": 1, "reviewState": "approved"}'); + ` + _, err = conn.Exec(ctx, auditInsertSql) + is.NoErr(err) + + // Validate the notification content + wg.Add(1) + out := make(chan string) + go func() { + <-sub.EstablishedC() + msg := <-sub.NotificationC() // Get the notification + + log.Info("notification received", "raw", msg) + + out <- string(msg) // Send it to the output channel + close(out) + wg.Done() + }() + + // Process the notification + var notification map[string]interface{} + for msg := range out { + err := json.Unmarshal([]byte(msg), ¬ification) + is.NoErr(err) // Ensure the JSON payload is valid + log.Info("parsed notification", "notification", notification) + } + + // Validate the JSON content + is.Equal(notification["dml_action"], "INSERT") // Ensure action is correct + is.Equal(notification["action"], "submission.update") // Ensure action is correct + is.True(notification["details"] != nil) // Ensure details key exists + is.True(notification["data"] == nil) // Data key should be null! + + // Check nested JSON value for submissionDefId and reviewState in details + details, ok := notification["details"].(map[string]interface{}) + is.True(ok) // Ensure details is a valid map + is.Equal(details["submissionDefId"], float64(1)) // Ensure submissionDefId has the correct value + is.Equal(details["instanceId"], "33448049-0df1-4426-9392-d3a294d638ad") // Ensure instanceId has the correct value + is.Equal(details["reviewState"], "approved") // Ensure reviewState has the correct value + + // Cleanup + conn.Exec(ctx, `DROP TABLE IF EXISTS submission_defs;`) + conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test;`) + cancel() + sub.Unlisten(ctx) // uses background ctx anyway + listener.Close(ctx) + wg.Wait() +} + // Test an unsupported event type and ensure nothing is triggered func TestNoTrigger(t *testing.T) { dbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI") diff --git a/main.go b/main.go index f83abae..a8beafc 100644 --- a/main.go +++ b/main.go @@ -41,7 +41,7 @@ func SetupWebhook( log *slog.Logger, ctx context.Context, dbPool *pgxpool.Pool, - entityUrl, submissionUrl string, + updateEntityUrl, newSubmissionUrl, reviewSubmissionUrl string, ) error { // setup the listener listener := db.NewListener(dbPool) @@ -83,10 +83,12 @@ func SetupWebhook( // Only send the request for correctly parsed (supported) events if parsedData != nil { - if parsedData.Type == "entity.update.version" && entityUrl != "" { - webhook.SendRequest(log, ctx, entityUrl, *parsedData) - } else if parsedData.Type == "submission.create" && submissionUrl != "" { - webhook.SendRequest(log, ctx, submissionUrl, *parsedData) + if parsedData.Type == "entity.update.version" && updateEntityUrl != "" { + webhook.SendRequest(log, ctx, updateEntityUrl, *parsedData) + } else if parsedData.Type == "submission.create" && newSubmissionUrl != "" { + webhook.SendRequest(log, ctx, newSubmissionUrl, *parsedData) + } else if parsedData.Type == "submission.update" && reviewSubmissionUrl != "" { + webhook.SendRequest(log, ctx, reviewSubmissionUrl, *parsedData) } else { log.Warn("unknown event type or no webhook URL provided", "eventType", parsedData.Type) } @@ -137,18 +139,22 @@ func main() { // Read environment variables defaultDbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI") - defaultEntityUrl := os.Getenv("CENTRAL_WEBHOOK_ENTITY_URL") - defaultSubmissionUrl := os.Getenv("CENTRAL_WEBHOOK_SUBMISSION_URL") + defaultUpdateEntityUrl := os.Getenv("CENTRAL_WEBHOOK_UPDATE_ENTITY_URL") + defaultNewSubmissionUrl := os.Getenv("CENTRAL_WEBHOOK_NEW_SUBMISSION_URL") + defaultReviewSubmissionUrl := os.Getenv("CENTRAL_WEBHOOK_REVIEW_SUBMISSION_URL") defaultLogLevel := os.Getenv("CENTRAL_WEBHOOK_LOG_LEVEL") var dbUri string flag.StringVar(&dbUri, "db", defaultDbUri, "DB host (postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable)") - var entityUrl string - flag.StringVar(&entityUrl, "entityUrl", defaultEntityUrl, "Webhook URL for entity events") + var updateEntityUrl string + flag.StringVar(&updateEntityUrl, "updateEntityUrl", defaultUpdateEntityUrl, "Webhook URL for update entity events") - var submissionUrl string - flag.StringVar(&submissionUrl, "submissionUrl", defaultSubmissionUrl, "Webhook URL for submission events") + var newSubmissionUrl string + flag.StringVar(&newSubmissionUrl, "newSubmissionUrl", defaultNewSubmissionUrl, "Webhook URL for new submission events") + + var reviewSubmissionUrl string + flag.StringVar(&reviewSubmissionUrl, "reviewSubmissionUrl", defaultReviewSubmissionUrl, "Webhook URL for review submission events") var debug bool flag.BoolVar(&debug, "debug", false, "Enable debug logging") @@ -172,8 +178,8 @@ func main() { os.Exit(1) } - if entityUrl == "" && submissionUrl == "" { - fmt.Fprintf(os.Stderr, "At least one of entityUrl or submissionUrl is required\n") + if updateEntityUrl == "" && newSubmissionUrl == "" && reviewSubmissionUrl == "" { + fmt.Fprintf(os.Stderr, "At least one of updateEntityUrl, newSubmissionUrl, reviewSubmissionUrl is required\n") flag.PrintDefaults() os.Exit(1) } @@ -186,7 +192,7 @@ func main() { } printStartupMsg() - err = SetupWebhook(log, ctx, dbPool, entityUrl, submissionUrl) + err = SetupWebhook(log, ctx, dbPool, updateEntityUrl, newSubmissionUrl, reviewSubmissionUrl) if err != nil { fmt.Fprintf(os.Stderr, "error setting up webhook: %v", err) os.Exit(1) diff --git a/main_test.go b/main_test.go index 5feb943..650cba6 100644 --- a/main_test.go +++ b/main_test.go @@ -24,11 +24,12 @@ func TestSetupWebhook(t *testing.T) { } is := is.New(t) + wg := sync.WaitGroup{} log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelDebug, })) - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() dbPool, err := db.InitPool(ctx, log, dbUri) is.NoErr(err) @@ -109,12 +110,11 @@ func TestSetupWebhook(t *testing.T) { defer mockServer.Close() // Start webhook listener - wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() log.Info("starting webhook listener") - err := SetupWebhook(log, ctx, dbPool, mockServer.URL, mockServer.URL) + err := SetupWebhook(log, ctx, dbPool, mockServer.URL, mockServer.URL, mockServer.URL) if err != nil && ctx.Err() == nil { log.Error("webhook listener error", "error", err) } @@ -153,4 +153,6 @@ func TestSetupWebhook(t *testing.T) { wg.Wait() conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs;`) conn.Exec(ctx, `DROP TABLE IF EXISTS audits;`) + conn.Release() + dbPool.Close() } diff --git a/parser/audit.go b/parser/audit.go index a974870..93ba99d 100644 --- a/parser/audit.go +++ b/parser/audit.go @@ -4,9 +4,11 @@ import ( "context" "encoding/json" "errors" + "fmt" "log/slog" ) +// ** Entities ** // // Define the nested structs for the details field type OdkEntityRef struct { Uuid string `json:"uuid"` // Use string for UUID @@ -19,12 +21,33 @@ type OdkEntityDetails struct { EntityDefId int `json:"entityDefId"` } +// ** New Submissions ** // + type OdkNewSubmissionDetails struct { InstanceId string `json:"instanceId"` // Use string for UUID SubmissionId int `json:"submissionId"` SubmissionDefId int `json:"submissionDefId"` } +// ** Review Submissions ** // + +// Define the reviewState enum options +type ReviewState string + +const ( + ReviewStateApproved ReviewState = "approved" + ReviewStateHasIssues ReviewState = "hasIssues" + ReviewStateRejected ReviewState = "rejected" +) + +type OdkReviewSubmissionDetails struct { + InstanceId string `json:"instanceId"` // Use string for UUID + ReviewState ReviewState `json:"reviewState"` + SubmissionDefId int `json:"submissionDefId"` +} + +// ** High level wrapper structs ** // + // OdkAuditLog represents the main structure for the audit log (returned by pg_notify) type OdkAuditLog struct { Notes *string `json:"notes"` // Pointer to handle null values @@ -98,10 +121,21 @@ func ParseEventJson(log *slog.Logger, ctx context.Context, data []byte) (*Proces } processedEvent.Data = rawData + case "submission.update": + var submissionDetails OdkReviewSubmissionDetails + if err := parseDetails(rawLog.Details, &submissionDetails); err != nil { + log.Error("failed to parse submission.update details", "error", err) + return nil, err + } + processedEvent.Type = "submission.update" + processedEvent.ID = submissionDetails.InstanceId + // submission.update has no 'data' key, but instead only a reviewState + processedEvent.Data = submissionDetails.ReviewState + default: // No nothing if the event type is not supported log.Warn("unsupported action type", "action", rawLog.Action) - return nil, nil + return nil, fmt.Errorf("unsupported action type") } log.Debug("parsed event successfully", "processedEvent", processedEvent) diff --git a/parser/audit_test.go b/parser/audit_test.go index 3c64cd3..98afe82 100644 --- a/parser/audit_test.go +++ b/parser/audit_test.go @@ -17,8 +17,8 @@ func TestParseJsonString(t *testing.T) { input := []byte(`{"id":"123","action":"entity.update.version","actorId":1,"details":{"entity":{"uuid":"abc","dataset":"test"}},"data":{}}`) result, err := ParseJsonString(log, input) is.NoErr(err) - is.Equal("123", result.ID) is.Equal("entity.update.version", result.Action) + is.Equal(1, result.ActorID) }) t.Run("Empty Input", func(t *testing.T) { @@ -73,6 +73,17 @@ func TestParseEventJson(t *testing.T) { is.Equal("", wrappedData["xml"]) }) + t.Run("Submission Review", func(t *testing.T) { + input := []byte(`{ + "id":"456", + "action":"submission.update", + "details":{"reviewState":"approved","submissionId":789,"submissionDefId":101112} + }`) + result, err := ParseEventJson(log, ctx, input) + is.NoErr(err) + is.Equal(ReviewStateApproved, result.Data) + }) + t.Run("Unsupported Action", func(t *testing.T) { input := []byte(`{ "id":"789", diff --git a/webhook/request_test.go b/webhook/request_test.go index 48721ec..9427549 100644 --- a/webhook/request_test.go +++ b/webhook/request_test.go @@ -21,7 +21,7 @@ func TestSendRequest(t *testing.T) { log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) // Set up a mock server - var receivedPayload parser.OdkAuditLog + var receivedPayload parser.ProcessedEvent server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Verify content type is.Equal("application/json", r.Header.Get("Content-Type")) @@ -42,27 +42,43 @@ func TestSendRequest(t *testing.T) { testCases := []struct { name string event parser.ProcessedEvent - expectedID string + expectedId string + expectedType string expectedData interface{} }{ { name: "Submission Create Event", event: parser.ProcessedEvent{ ID: "23dc865a-4757-431e-b182-67e7d5581c81", + Type: "submission.create", Data: "XML Data", }, - expectedID: "23dc865a-4757-431e-b182-67e7d5581c81", + expectedId: "23dc865a-4757-431e-b182-67e7d5581c81", + expectedType: "submission.create", expectedData: "XML Data", }, { name: "Entity Update Event", event: parser.ProcessedEvent{ ID: "45fgh789-e32c-56d2-a765-427654321abc", + Type: "entity.update.version", Data: "{\"field\":\"value\"}", }, - expectedID: "45fgh789-e32c-56d2-a765-427654321abc", + expectedId: "45fgh789-e32c-56d2-a765-427654321abc", + expectedType: "entity.update.version", expectedData: "{\"field\":\"value\"}", }, + { + name: "Submission Review Event", + event: parser.ProcessedEvent{ + ID: "45fgh789-e32c-56d2-a765-427654321abc", + Type: "submission.update", + Data: "approved", + }, + expectedId: "45fgh789-e32c-56d2-a765-427654321abc", + expectedType: "submission.update", + expectedData: "approved", + }, } // Execute test cases @@ -75,7 +91,8 @@ func TestSendRequest(t *testing.T) { SendRequest(log, ctx, server.URL, tc.event) // Validate the received payload - is.Equal(tc.expectedID, receivedPayload.ID) + is.Equal(tc.expectedId, receivedPayload.ID) + is.Equal(tc.expectedType, receivedPayload.Type) is.Equal(tc.expectedData, receivedPayload.Data) }) }