From ce2f0894bdcb3ed0b4f5bd887f52284d8e28d686 Mon Sep 17 00:00:00 2001 From: spwoodcock Date: Fri, 24 Jan 2025 17:09:57 +0000 Subject: [PATCH] feat: all working except context handling in main test --- db/trigger_test.go | 14 +-- main.go | 16 ++- main_test.go | 283 ++++++++++++++++++++++++--------------------- parser/audit.go | 19 +-- webhook/request.go | 10 +- 5 files changed, 188 insertions(+), 154 deletions(-) diff --git a/db/trigger_test.go b/db/trigger_test.go index f71de1f..64f5990 100644 --- a/db/trigger_test.go +++ b/db/trigger_test.go @@ -118,7 +118,7 @@ func TestEntityTrigger(t *testing.T) { <-sub.EstablishedC() msg := <-sub.NotificationC() // Get the notification - log.Info("Notification received", "raw", msg) + log.Info("notification received", "raw", msg) out <- string(msg) // Send it to the output channel close(out) @@ -130,7 +130,7 @@ func TestEntityTrigger(t *testing.T) { for msg := range out { err := json.Unmarshal([]byte(msg), ¬ification) is.NoErr(err) // Ensure the JSON payload is valid - log.Info("Parsed notification", "notification", notification) + log.Info("parsed notification", "notification", notification) } // Validate the JSON content @@ -262,7 +262,7 @@ func TestSubmissionTrigger(t *testing.T) { <-sub.EstablishedC() msg := <-sub.NotificationC() // Get the notification - log.Info("Notification received", "raw", msg) + log.Info("notification received", "raw", msg) out <- string(msg) // Send it to the output channel close(out) @@ -274,7 +274,7 @@ func TestSubmissionTrigger(t *testing.T) { for msg := range out { err := json.Unmarshal([]byte(msg), ¬ification) is.NoErr(err) // Ensure the JSON payload is valid - log.Info("Parsed notification", "notification", notification) + log.Info("parsed notification", "notification", notification) } // Validate the JSON content @@ -365,7 +365,7 @@ func TestNoTrigger(t *testing.T) { <-sub.EstablishedC() msg := <-sub.NotificationC() // Get the notification - log.Info("Notification received", "raw", msg) + log.Info("notification received", "raw", msg) out <- string(msg) // Send it to the output channel close(out) @@ -375,10 +375,10 @@ func TestNoTrigger(t *testing.T) { select { case msg := <-out: // If a message was received, we failed the test since no event should be fired - t.Fatalf("Unexpected message received: %s", msg) + t.Fatalf("pnexpected message received: %s", msg) case <-time.After(1 * time.Second): // No message should have been received within the timeout - log.Info("No event triggered for invalid event type") + log.Info("no event triggered for invalid event type") } // Cleanup diff --git a/main.go b/main.go index 70486b2..f83abae 100644 --- a/main.go +++ b/main.go @@ -77,13 +77,19 @@ func SetupWebhook( parsedData, err := parser.ParseEventJson(log, ctx, []byte(eventData)) if err != nil { - log.Error("Failed to parse notification", "error", err) + log.Error("failed to parse notification", "error", err) continue // Skip processing this notification } + // Only send the request for correctly parsed (supported) events if parsedData != nil { - // Only send the request for correctly parsed (supported) events - webhook.SendRequest(log, ctx, entityUrl, *parsedData) + if parsedData.Type == "entity.update.version" && entityUrl != "" { + webhook.SendRequest(log, ctx, entityUrl, *parsedData) + } else if parsedData.Type == "submission.create" && submissionUrl != "" { + webhook.SendRequest(log, ctx, submissionUrl, *parsedData) + } else { + log.Warn("unknown event type or no webhook URL provided", "eventType", parsedData.Type) + } } } } @@ -103,12 +109,12 @@ func SetupWebhook( c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM) <-c - log.Info("Received shutdown signal") + log.Info("received shutdown signal") cancel() }() <-stopCtx.Done() - log.Info("Application shutting down") + log.Info("application shutting down") return nil } diff --git a/main_test.go b/main_test.go index fb46a9e..5feb943 100644 --- a/main_test.go +++ b/main_test.go @@ -1,131 +1,156 @@ package main -// // TODO FIXME -// import ( -// "context" -// "encoding/json" -// "log/slog" -// "os" -// "sync" -// "testing" -// "net/http" -// "net/http/httptest" -// "time" - -// "github.com/matryer/is" - -// "github.com/hotosm/central-webhook/parser" -// "github.com/hotosm/central-webhook/db" -// ) - -// func TestSetupWebhook(t *testing.T) { -// dbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI") -// if len(dbUri) == 0 { -// // Default -// dbUri = "postgresql://odk:odk@db:5432/odk?sslmode=disable" -// } - -// is := is.New(t) -// log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) -// ctx := context.Background() -// ctx, cancel := context.WithCancel(ctx) -// wg := sync.WaitGroup{} -// dbPool, err := db.InitPool(ctx, log, dbUri) -// is.NoErr(err) - -// // Get connection and defer close -// conn, err := dbPool.Acquire(ctx) -// is.NoErr(err) -// defer conn.Release() - -// // Create entity_defs table -// _, err = conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs;`) -// is.NoErr(err) -// entityTableCreateSql := ` -// CREATE TABLE entity_defs ( -// id int4, -// "entityId" int4, -// "createdAt" timestamptz, -// "current" bool, -// "data" jsonb, -// "creatorId" int4, -// "label" text -// ); -// ` -// _, err = conn.Exec(ctx, entityTableCreateSql) -// is.NoErr(err) - -// // Create audits_test table -// _, err = conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test;`) -// is.NoErr(err) -// auditTableCreateSql := ` -// CREATE TABLE audits_test ( -// "actorId" int, -// action varchar, -// details jsonb -// ); -// ` -// _, err = conn.Exec(ctx, auditTableCreateSql) -// is.NoErr(err) - -// // Insert an entity record -// entityInsertSql := ` -// INSERT INTO public.entity_defs ( -// id, "entityId","createdAt","current","data","creatorId","label" -// ) VALUES ( -// 1001, -// 900, -// '2025-01-10 16:23:40.073', -// true, -// '{"status": "0", "task_id": "26", "version": "1"}', -// 5, -// 'Task 26 Feature 904487737' -// ); -// ` -// _, err = conn.Exec(ctx, entityInsertSql) -// is.NoErr(err) - -// // Mock webhook server -// webhookReceived := make(chan bool, 1) -// mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { -// defer r.Body.Close() - -// var payload parser.ProcessedEvent -// err := json.NewDecoder(r.Body).Decode(&payload) -// // This is where the actual payload is inspected -// is.NoErr(err) -// is.Equal("xxx", payload.ID) // Check if the ID matches -// is.Equal(`{"status": "0", "task_id": "26", "version": "1"}`, payload.Data) // Check the data - -// webhookReceived <- true -// w.WriteHeader(http.StatusOK) -// })) -// defer mockServer.Close() - -// // Run Webhook trigger in background -// go func() { -// err := SetupWebhook(log, ctx, dbPool, mockServer.URL, mockServer.URL) -// is.NoErr(err) -// }() - -// // Insert an audit record (trigger event) -// auditInsertSql := ` -// INSERT INTO audits_test ("actorId", action, details) -// VALUES (1, 'entity.update.version', '{"entityDefId": 1001, "entityId": 1000, "entity": {"uuid": "xxx", "dataset": "test"}}'); -// ` -// _, err = conn.Exec(ctx, auditInsertSql) -// is.NoErr(err) - -// // Wait for the webhook to be received -// select { -// case <-webhookReceived: -// // Success -// case <-time.After(3 * time.Second): -// t.Fatalf("Test timed out waiting for webhook") -// } - -// // Cleanup -// conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test;`) -// cancel() -// wg.Wait() -// } +import ( + "context" + "encoding/json" + "log/slog" + "net/http" + "net/http/httptest" + "os" + "sync" + "testing" + "time" + + "github.com/matryer/is" + + "github.com/hotosm/central-webhook/db" + "github.com/hotosm/central-webhook/parser" +) + +func TestSetupWebhook(t *testing.T) { + dbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI") + if len(dbUri) == 0 { + dbUri = "postgresql://odk:odk@db:5432/odk?sslmode=disable" + } + + is := is.New(t) + log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelDebug, + })) + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + + dbPool, err := db.InitPool(ctx, log, dbUri) + is.NoErr(err) + defer dbPool.Close() + + conn, err := dbPool.Acquire(ctx) + is.NoErr(err) + defer conn.Release() + + // Create test tables + conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs;`) + conn.Exec(ctx, `DROP TABLE IF EXISTS audits;`) + createTables := []string{ + `CREATE TABLE IF NOT EXISTS entity_defs ( + id SERIAL PRIMARY KEY, + "entityId" INT, + "createdAt" TIMESTAMPTZ, + "current" BOOL, + "data" JSONB, + "creatorId" INT, + "label" TEXT + );`, + `CREATE TABLE IF NOT EXISTS audits ( + "actorId" INT, + action VARCHAR, + details JSONB + );`, + } + for _, sql := range createTables { + _, err := conn.Exec(ctx, sql) + is.NoErr(err) + } + + // Insert an entity record + log.Info("inserting entity details record") + _, err = conn.Exec(ctx, ` + INSERT INTO public.entity_defs ( + id, "entityId","createdAt","current","data","creatorId","label" + ) VALUES ( + 1001, + 900, + '2025-01-10 16:23:40.073', + true, + '{"status": "0", "task_id": "26", "version": "1"}', + 5, + 'Task 26 Feature 904487737' + ); + `) + is.NoErr(err) + log.Info("entity record inserted") + + // Mock webhook server + webhookReceived := make(chan bool, 1) + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + var payload parser.ProcessedEvent + err := json.NewDecoder(r.Body).Decode(&payload) + is.NoErr(err) + + log.Info("payload received", "payload", payload) + is.Equal(payload.ID, "xxx") // Validate Entity ID + + // Convert the payload.Data to map[string]string for comparison + actualData, ok := payload.Data.(map[string]interface{}) + is.True(ok) // Ensure the type assertion succeeded + + expectedData := map[string]interface{}{ + "status": "0", + "task_id": "26", + "version": "1", + } + is.Equal(actualData, expectedData) // Validate Entity data + + webhookReceived <- true + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + })) + defer mockServer.Close() + + // Start webhook listener + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + log.Info("starting webhook listener") + err := SetupWebhook(log, ctx, dbPool, mockServer.URL, mockServer.URL) + if err != nil && ctx.Err() == nil { + log.Error("webhook listener error", "error", err) + } + }() + + // Wait for the listener to initialize + log.Info("waiting for listener to initialize") + time.Sleep(300 * time.Millisecond) // Wait for the listener to be fully set up + + // Insert an audit log to trigger the webhook + log.Info("inserting audit log") + _, err = conn.Exec(ctx, ` + INSERT INTO audits ("actorId", action, details) + VALUES ( + 1, + 'entity.update.version', + '{"entityDefId": 1001, "entityId": 1000, "entity": {"uuid": "xxx", "dataset": "test"}}' + ); + `) + is.NoErr(err) + + // Wait for webhook response or timeout + select { + case <-webhookReceived: + log.Info("webhook received successfully") + case <-time.After(3 * time.Second): + t.Fatalf("test timed out waiting for webhook") + } + + // Allow some time for final webhook processing + time.Sleep(100 * time.Millisecond) + + // Cleanup + log.Info("cleaning up...") + cancel() + wg.Wait() + conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs;`) + conn.Exec(ctx, `DROP TABLE IF EXISTS audits;`) +} diff --git a/parser/audit.go b/parser/audit.go index 5e0e240..a974870 100644 --- a/parser/audit.go +++ b/parser/audit.go @@ -27,7 +27,7 @@ type OdkNewSubmissionDetails struct { // OdkAuditLog represents the main structure for the audit log (returned by pg_notify) type OdkAuditLog struct { - Notes *string `json:"notes"` // Pointer to handle null values + Notes *string `json:"notes"` // Pointer to handle null values Action string `json:"action"` ActeeID string `json:"acteeId"` // Use string for UUID ActorID int `json:"actorId"` // Integer for the actor ID @@ -38,6 +38,7 @@ type OdkAuditLog struct { // ProcessedEvent represents the final parsed event structure (to send to the webhook API) type ProcessedEvent struct { + Type string `json:"type"` // The event type, entity update or new submission ID string `json:"id"` // Entity UUID or Submission InstanceID Data interface{} `json:"data"` // The actual entity data or wrapped submission XML } @@ -50,10 +51,10 @@ func ParseJsonString(log *slog.Logger, data []byte) (*OdkAuditLog, error) { var parsedData OdkAuditLog if err := json.Unmarshal(data, &parsedData); err != nil { - log.Error("Failed to parse JSON data", "error", err, "data", string(data)) + log.Error("failed to parse JSON data", "error", err, "data", string(data)) return nil, err } - log.Debug("Parsed notification data", "data", parsedData) + log.Debug("parsed notification data", "data", parsedData) return &parsedData, nil } @@ -73,35 +74,37 @@ func ParseEventJson(log *slog.Logger, ctx context.Context, data []byte) (*Proces case "entity.update.version": var entityDetails OdkEntityDetails if err := parseDetails(rawLog.Details, &entityDetails); err != nil { - log.Error("Failed to parse entity.update.version details", "error", err) + log.Error("failed to parse entity.update.version details", "error", err) return nil, err } + processedEvent.Type = "entity.update.version" processedEvent.ID = entityDetails.Entity.Uuid processedEvent.Data = rawLog.Data case "submission.create": var submissionDetails OdkNewSubmissionDetails if err := parseDetails(rawLog.Details, &submissionDetails); err != nil { - log.Error("Failed to parse submission.create details", "error", err) + log.Error("failed to parse submission.create details", "error", err) return nil, err } + processedEvent.Type = "submission.create" processedEvent.ID = submissionDetails.InstanceId // Parse the raw XML data rawData, ok := rawLog.Data.(map[string]interface{}) if !ok { - log.Error("Invalid data type for submission.create", "data", rawLog.Data) + log.Error("invalid data type for submission.create", "data", rawLog.Data) return nil, errors.New("invalid data type for submission.create") } processedEvent.Data = rawData default: // No nothing if the event type is not supported - log.Warn("Unsupported action type", "action", rawLog.Action) + log.Warn("unsupported action type", "action", rawLog.Action) return nil, nil } - log.Debug("Parsed event successfully", "processedEvent", processedEvent) + log.Debug("parsed event successfully", "processedEvent", processedEvent) return &processedEvent, nil } diff --git a/webhook/request.go b/webhook/request.go index a90e933..2942c79 100644 --- a/webhook/request.go +++ b/webhook/request.go @@ -22,14 +22,14 @@ func SendRequest( // Marshal the payload to JSON marshaledPayload, err := json.Marshal(eventJson) if err != nil { - log.Error("Failed to marshal payload to JSON", "error", err) + log.Error("failed to marshal payload to JSON", "error", err) return } // Create the HTTP request req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiEndpoint, bytes.NewBuffer(marshaledPayload)) if err != nil { - log.Error("Failed to create HTTP request", "error", err) + log.Error("failed to create HTTP request", "error", err) return } req.Header.Set("Content-Type", "application/json") @@ -38,15 +38,15 @@ func SendRequest( client := &http.Client{Timeout: 10 * time.Second} resp, err := client.Do(req) if err != nil { - log.Error("Failed to send HTTP request", "error", err) + log.Error("failed to send HTTP request", "error", err) return } defer resp.Body.Close() // Check the response status if resp.StatusCode >= http.StatusOK && resp.StatusCode < http.StatusMultipleChoices { - log.Info("Webhook called successfully", "status", resp.StatusCode) + log.Info("webhook called successfully", "status", resp.StatusCode, "endpoint", apiEndpoint) } else { - log.Error("Failed to call webhook", "status", resp.StatusCode) + log.Error("failed to call webhook", "status", resp.StatusCode, "endpoint", apiEndpoint) } }