Skip to content

Commit

Permalink
feat: all working except context handling in main test
Browse files Browse the repository at this point in the history
  • Loading branch information
spwoodcock committed Jan 24, 2025
1 parent d2f5fd0 commit ce2f089
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 154 deletions.
14 changes: 7 additions & 7 deletions db/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestEntityTrigger(t *testing.T) {
<-sub.EstablishedC()
msg := <-sub.NotificationC() // Get the notification

log.Info("Notification received", "raw", msg)
log.Info("notification received", "raw", msg)

out <- string(msg) // Send it to the output channel
close(out)
Expand All @@ -130,7 +130,7 @@ func TestEntityTrigger(t *testing.T) {
for msg := range out {
err := json.Unmarshal([]byte(msg), &notification)
is.NoErr(err) // Ensure the JSON payload is valid
log.Info("Parsed notification", "notification", notification)
log.Info("parsed notification", "notification", notification)
}

// Validate the JSON content
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestSubmissionTrigger(t *testing.T) {
<-sub.EstablishedC()
msg := <-sub.NotificationC() // Get the notification

log.Info("Notification received", "raw", msg)
log.Info("notification received", "raw", msg)

out <- string(msg) // Send it to the output channel
close(out)
Expand All @@ -274,7 +274,7 @@ func TestSubmissionTrigger(t *testing.T) {
for msg := range out {
err := json.Unmarshal([]byte(msg), &notification)
is.NoErr(err) // Ensure the JSON payload is valid
log.Info("Parsed notification", "notification", notification)
log.Info("parsed notification", "notification", notification)
}

// Validate the JSON content
Expand Down Expand Up @@ -365,7 +365,7 @@ func TestNoTrigger(t *testing.T) {
<-sub.EstablishedC()
msg := <-sub.NotificationC() // Get the notification

log.Info("Notification received", "raw", msg)
log.Info("notification received", "raw", msg)

out <- string(msg) // Send it to the output channel
close(out)
Expand All @@ -375,10 +375,10 @@ func TestNoTrigger(t *testing.T) {
select {
case msg := <-out:
// If a message was received, we failed the test since no event should be fired
t.Fatalf("Unexpected message received: %s", msg)
t.Fatalf("pnexpected message received: %s", msg)
case <-time.After(1 * time.Second):
// No message should have been received within the timeout
log.Info("No event triggered for invalid event type")
log.Info("no event triggered for invalid event type")
}

// Cleanup
Expand Down
16 changes: 11 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,19 @@ func SetupWebhook(

parsedData, err := parser.ParseEventJson(log, ctx, []byte(eventData))
if err != nil {
log.Error("Failed to parse notification", "error", err)
log.Error("failed to parse notification", "error", err)
continue // Skip processing this notification
}

// Only send the request for correctly parsed (supported) events
if parsedData != nil {
// Only send the request for correctly parsed (supported) events
webhook.SendRequest(log, ctx, entityUrl, *parsedData)
if parsedData.Type == "entity.update.version" && entityUrl != "" {
webhook.SendRequest(log, ctx, entityUrl, *parsedData)
} else if parsedData.Type == "submission.create" && submissionUrl != "" {
webhook.SendRequest(log, ctx, submissionUrl, *parsedData)
} else {
log.Warn("unknown event type or no webhook URL provided", "eventType", parsedData.Type)
}
}
}
}
Expand All @@ -103,12 +109,12 @@ func SetupWebhook(
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c
log.Info("Received shutdown signal")
log.Info("received shutdown signal")
cancel()
}()

<-stopCtx.Done()
log.Info("Application shutting down")
log.Info("application shutting down")

return nil
}
Expand Down
283 changes: 154 additions & 129 deletions main_test.go
Original file line number Diff line number Diff line change
@@ -1,131 +1,156 @@
package main

// // TODO FIXME
// import (
// "context"
// "encoding/json"
// "log/slog"
// "os"
// "sync"
// "testing"
// "net/http"
// "net/http/httptest"
// "time"

// "github.com/matryer/is"

// "github.com/hotosm/central-webhook/parser"
// "github.com/hotosm/central-webhook/db"
// )

// func TestSetupWebhook(t *testing.T) {
// dbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI")
// if len(dbUri) == 0 {
// // Default
// dbUri = "postgresql://odk:odk@db:5432/odk?sslmode=disable"
// }

// is := is.New(t)
// log := slog.New(slog.NewJSONHandler(os.Stdout, nil))
// ctx := context.Background()
// ctx, cancel := context.WithCancel(ctx)
// wg := sync.WaitGroup{}
// dbPool, err := db.InitPool(ctx, log, dbUri)
// is.NoErr(err)

// // Get connection and defer close
// conn, err := dbPool.Acquire(ctx)
// is.NoErr(err)
// defer conn.Release()

// // Create entity_defs table
// _, err = conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs;`)
// is.NoErr(err)
// entityTableCreateSql := `
// CREATE TABLE entity_defs (
// id int4,
// "entityId" int4,
// "createdAt" timestamptz,
// "current" bool,
// "data" jsonb,
// "creatorId" int4,
// "label" text
// );
// `
// _, err = conn.Exec(ctx, entityTableCreateSql)
// is.NoErr(err)

// // Create audits_test table
// _, err = conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test;`)
// is.NoErr(err)
// auditTableCreateSql := `
// CREATE TABLE audits_test (
// "actorId" int,
// action varchar,
// details jsonb
// );
// `
// _, err = conn.Exec(ctx, auditTableCreateSql)
// is.NoErr(err)

// // Insert an entity record
// entityInsertSql := `
// INSERT INTO public.entity_defs (
// id, "entityId","createdAt","current","data","creatorId","label"
// ) VALUES (
// 1001,
// 900,
// '2025-01-10 16:23:40.073',
// true,
// '{"status": "0", "task_id": "26", "version": "1"}',
// 5,
// 'Task 26 Feature 904487737'
// );
// `
// _, err = conn.Exec(ctx, entityInsertSql)
// is.NoErr(err)

// // Mock webhook server
// webhookReceived := make(chan bool, 1)
// mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// defer r.Body.Close()

// var payload parser.ProcessedEvent
// err := json.NewDecoder(r.Body).Decode(&payload)
// // This is where the actual payload is inspected
// is.NoErr(err)
// is.Equal("xxx", payload.ID) // Check if the ID matches
// is.Equal(`{"status": "0", "task_id": "26", "version": "1"}`, payload.Data) // Check the data

// webhookReceived <- true
// w.WriteHeader(http.StatusOK)
// }))
// defer mockServer.Close()

// // Run Webhook trigger in background
// go func() {
// err := SetupWebhook(log, ctx, dbPool, mockServer.URL, mockServer.URL)
// is.NoErr(err)
// }()

// // Insert an audit record (trigger event)
// auditInsertSql := `
// INSERT INTO audits_test ("actorId", action, details)
// VALUES (1, 'entity.update.version', '{"entityDefId": 1001, "entityId": 1000, "entity": {"uuid": "xxx", "dataset": "test"}}');
// `
// _, err = conn.Exec(ctx, auditInsertSql)
// is.NoErr(err)

// // Wait for the webhook to be received
// select {
// case <-webhookReceived:
// // Success
// case <-time.After(3 * time.Second):
// t.Fatalf("Test timed out waiting for webhook")
// }

// // Cleanup
// conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test;`)
// cancel()
// wg.Wait()
// }
import (
"context"
"encoding/json"
"log/slog"
"net/http"
"net/http/httptest"
"os"
"sync"
"testing"
"time"

"github.com/matryer/is"

"github.com/hotosm/central-webhook/db"
"github.com/hotosm/central-webhook/parser"
)

func TestSetupWebhook(t *testing.T) {
dbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI")
if len(dbUri) == 0 {
dbUri = "postgresql://odk:odk@db:5432/odk?sslmode=disable"
}

is := is.New(t)
log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug,
}))
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)

dbPool, err := db.InitPool(ctx, log, dbUri)
is.NoErr(err)
defer dbPool.Close()

conn, err := dbPool.Acquire(ctx)
is.NoErr(err)
defer conn.Release()

// Create test tables
conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs;`)
conn.Exec(ctx, `DROP TABLE IF EXISTS audits;`)
createTables := []string{
`CREATE TABLE IF NOT EXISTS entity_defs (
id SERIAL PRIMARY KEY,
"entityId" INT,
"createdAt" TIMESTAMPTZ,
"current" BOOL,
"data" JSONB,
"creatorId" INT,
"label" TEXT
);`,
`CREATE TABLE IF NOT EXISTS audits (
"actorId" INT,
action VARCHAR,
details JSONB
);`,
}
for _, sql := range createTables {
_, err := conn.Exec(ctx, sql)
is.NoErr(err)
}

// Insert an entity record
log.Info("inserting entity details record")
_, err = conn.Exec(ctx, `
INSERT INTO public.entity_defs (
id, "entityId","createdAt","current","data","creatorId","label"
) VALUES (
1001,
900,
'2025-01-10 16:23:40.073',
true,
'{"status": "0", "task_id": "26", "version": "1"}',
5,
'Task 26 Feature 904487737'
);
`)
is.NoErr(err)
log.Info("entity record inserted")

// Mock webhook server
webhookReceived := make(chan bool, 1)
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
var payload parser.ProcessedEvent
err := json.NewDecoder(r.Body).Decode(&payload)
is.NoErr(err)

log.Info("payload received", "payload", payload)
is.Equal(payload.ID, "xxx") // Validate Entity ID

// Convert the payload.Data to map[string]string for comparison
actualData, ok := payload.Data.(map[string]interface{})
is.True(ok) // Ensure the type assertion succeeded

expectedData := map[string]interface{}{
"status": "0",
"task_id": "26",
"version": "1",
}
is.Equal(actualData, expectedData) // Validate Entity data

webhookReceived <- true
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
}))
defer mockServer.Close()

// Start webhook listener
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
log.Info("starting webhook listener")
err := SetupWebhook(log, ctx, dbPool, mockServer.URL, mockServer.URL)
if err != nil && ctx.Err() == nil {
log.Error("webhook listener error", "error", err)
}
}()

// Wait for the listener to initialize
log.Info("waiting for listener to initialize")
time.Sleep(300 * time.Millisecond) // Wait for the listener to be fully set up

// Insert an audit log to trigger the webhook
log.Info("inserting audit log")
_, err = conn.Exec(ctx, `
INSERT INTO audits ("actorId", action, details)
VALUES (
1,
'entity.update.version',
'{"entityDefId": 1001, "entityId": 1000, "entity": {"uuid": "xxx", "dataset": "test"}}'
);
`)
is.NoErr(err)

// Wait for webhook response or timeout
select {
case <-webhookReceived:
log.Info("webhook received successfully")
case <-time.After(3 * time.Second):
t.Fatalf("test timed out waiting for webhook")
}

// Allow some time for final webhook processing
time.Sleep(100 * time.Millisecond)

// Cleanup
log.Info("cleaning up...")
cancel()
wg.Wait()
conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs;`)
conn.Exec(ctx, `DROP TABLE IF EXISTS audits;`)
}
Loading

0 comments on commit ce2f089

Please sign in to comment.