Skip to content

Commit

Permalink
feat: three webhook events: entity update, submission review, new sub…
Browse files Browse the repository at this point in the history
…mission
  • Loading branch information
spwoodcock committed Jan 24, 2025
1 parent f6427ea commit f6cb117
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 36 deletions.
18 changes: 11 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ Then run with:
```bash
./centralwebhook \
-db 'postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable' \
-entityUrl 'https://your.domain.com/some/webhook' \
-submissionUrl 'https://your.domain.com/some/webhook'
-updateEntityUrl 'https://your.domain.com/some/webhook' \
-newSubmissionUrl 'https://your.domain.com/some/webhook' \
-reviewSubmissionUrl 'https://your.domain.com/some/webhook'
```

> [!TIP]
Expand All @@ -32,16 +33,18 @@ Then run with:
```bash
docker run -d ghcr.io/hotosm/central-webhook:latest \
-db 'postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable' \
-entityUrl 'https://your.domain.com/some/webhook' \
-submissionUrl 'https://your.domain.com/some/webhook'
-updateEntityUrl 'https://your.domain.com/some/webhook' \
-newSubmissionUrl 'https://your.domain.com/some/webhook' \
-reviewSubmissionUrl 'https://your.domain.com/some/webhook'
```

Environment variables are also supported:

```dotenv
CENTRAL_WEBHOOK_DB_URI=postgresql://user:pass@localhost:5432/db_name?sslmode=disable
CENTRAL_WEBHOOK_ENTITY_URL=https://your.domain.com/some/webhook
CENTRAL_WEBHOOK_SUBMISSION_URL=https://your.domain.com/some/webhook
CENTRAL_WEBHOOK_UPDATE_ENTITY_URL=https://your.domain.com/some/webhook
CENTRAL_WEBHOOK_REVIEW_SUBMISSION_URL=https://your.domain.com/some/webhook
CENTRAL_WEBHOOK_NEW_SUBMISSION_URL=https://your.domain.com/some/webhook
CENTRAL_WEBHOOK_API_KEY=ksdhfiushfiosehf98e3hrih39r8hy439rh389r3hy983y
CENTRAL_WEBHOOK_LOG_LEVEL=DEBUG
```
Expand Down Expand Up @@ -79,6 +82,7 @@ err = SetupWebhook(
dbPool,
"https://your.domain.com/some/entity/webhook",
"https://your.domain.com/some/submission/webhook",
"https://your.domain.com/some/review/webhook",
)
if err != nil {
fmt.Fprintf(os.Stderr, "error setting up webhook: %v", err)
Expand Down Expand Up @@ -107,6 +111,6 @@ Example:
```bash
./centralwebhook \
-db 'postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable' \
-entityUrl 'https://your.domain.com/some/webhook' \
-updateEntityUrl 'https://your.domain.com/some/webhook' \
-apiKey 'ksdhfiushfiosehf98e3hrih39r8hy439rh389r3hy983y'
```
16 changes: 14 additions & 2 deletions db/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func CreateTrigger(ctx context.Context, dbPool *pgxpool.Pool, tableName string)
FROM entity_defs
WHERE entity_defs.id = (NEW.details->>'entityDefId')::int;
-- Merge the additional data into the original JSON
-- Merge the entity details into the JSON data key
js := jsonb_set(js, '{data}', result_data, true);
-- Notify the odk-events queue
Expand All @@ -57,12 +57,24 @@ func CreateTrigger(ctx context.Context, dbPool *pgxpool.Pool, tableName string)
FROM submission_defs
WHERE submission_defs.id = (NEW.details->>'submissionDefId')::int;
-- Merge the additional data into the original JSON
-- Merge the submission XML into the JSON data key
js := jsonb_set(js, '{data}', result_data, true);
-- Notify the odk-events queue
PERFORM pg_notify('odk-events', js::text);
WHEN 'submission.update' THEN
SELECT jsonb_build_object('instanceId', submission_defs."instanceId")
INTO result_data
FROM submission_defs
WHERE submission_defs.id = (NEW.details->>'submissionDefId')::int;
-- Merge the instanceId into the existing 'details' key in JSON
js := jsonb_set(js, '{details}', (js->'details') || result_data, true);
-- Notify the odk-events queue
PERFORM pg_notify('odk-events', js::text);
ELSE
-- Skip pg_notify for unsupported actions & insert as normal
RETURN NEW;
Expand Down
135 changes: 133 additions & 2 deletions db/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestEntityTrigger(t *testing.T) {
}

// Test a new submission event type
func TestSubmissionTrigger(t *testing.T) {
func TestNewSubmissionTrigger(t *testing.T) {
dbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI")
if len(dbUri) == 0 {
// Default
Expand Down Expand Up @@ -288,7 +288,6 @@ func TestSubmissionTrigger(t *testing.T) {
is.True(ok) // Ensure details is a valid map
is.Equal(details["submissionDefId"], float64(1)) // Ensure submissionDefId has the correct value

// Check nested JSON value for status in data
data, ok := notification["data"].(map[string]interface{})
is.True(ok) // Ensure data is a valid map
is.Equal(data["xml"], `<data id="xxx">`) // Ensure `xml` has the correct value
Expand All @@ -302,6 +301,138 @@ func TestSubmissionTrigger(t *testing.T) {
wg.Wait()
}

// Test a new submission event type
func TestReviewSubmissionTrigger(t *testing.T) {
dbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI")
if len(dbUri) == 0 {
// Default
dbUri = "postgresql://odk:odk@db:5432/odk?sslmode=disable"
}

is := is.New(t)
log := slog.New(slog.NewJSONHandler(os.Stdout, nil))
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
wg := sync.WaitGroup{}
pool, err := InitPool(ctx, log, dbUri)
is.NoErr(err)

// Get connection and defer close
conn, err := pool.Acquire(ctx)
is.NoErr(err)
defer conn.Release()

// Create submission_defs table
_, err = conn.Exec(ctx, `DROP TABLE IF EXISTS submission_defs;`)
is.NoErr(err)
submissionTableCreateSql := `
CREATE TABLE submission_defs (
id int4,
"submissionId" int4,
"instanceId" uuid
);
`
_, err = conn.Exec(ctx, submissionTableCreateSql)
is.NoErr(err)

// Create audits_test table
_, err = conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test;`)
is.NoErr(err)
auditTableCreateSql := `
CREATE TABLE audits_test (
"actorId" int,
action varchar,
details jsonb
);
`
_, err = conn.Exec(ctx, auditTableCreateSql)
is.NoErr(err)

// Insert an submission record
submissionInsertSql := `
INSERT INTO submission_defs (
id,
"submissionId",
"instanceId"
) VALUES (
1,
2,
'33448049-0df1-4426-9392-d3a294d638ad'
);
`
_, err = conn.Exec(ctx, submissionInsertSql)
is.NoErr(err)

// Create audit trigger
err = CreateTrigger(ctx, pool, "audits_test")
is.NoErr(err)

// Create listener
listener := NewListener(pool)
err = listener.Connect(ctx)
is.NoErr(err)

// Create notifier
n := NewNotifier(log, listener)
wg.Add(1)
go func() {
n.Run(ctx)
wg.Done()
}()
sub := n.Listen("odk-events")

// Insert an audit record
auditInsertSql := `
INSERT INTO audits_test ("actorId", action, details)
VALUES (5, 'submission.update', '{"submissionDefId": 1, "reviewState": "approved"}');
`
_, err = conn.Exec(ctx, auditInsertSql)
is.NoErr(err)

// Validate the notification content
wg.Add(1)
out := make(chan string)
go func() {
<-sub.EstablishedC()
msg := <-sub.NotificationC() // Get the notification

log.Info("notification received", "raw", msg)

out <- string(msg) // Send it to the output channel
close(out)
wg.Done()
}()

// Process the notification
var notification map[string]interface{}
for msg := range out {
err := json.Unmarshal([]byte(msg), &notification)
is.NoErr(err) // Ensure the JSON payload is valid
log.Info("parsed notification", "notification", notification)
}

// Validate the JSON content
is.Equal(notification["dml_action"], "INSERT") // Ensure action is correct
is.Equal(notification["action"], "submission.update") // Ensure action is correct
is.True(notification["details"] != nil) // Ensure details key exists
is.True(notification["data"] == nil) // Data key should be null!

// Check nested JSON value for submissionDefId and reviewState in details
details, ok := notification["details"].(map[string]interface{})
is.True(ok) // Ensure details is a valid map
is.Equal(details["submissionDefId"], float64(1)) // Ensure submissionDefId has the correct value
is.Equal(details["instanceId"], "33448049-0df1-4426-9392-d3a294d638ad") // Ensure instanceId has the correct value
is.Equal(details["reviewState"], "approved") // Ensure reviewState has the correct value

// Cleanup
conn.Exec(ctx, `DROP TABLE IF EXISTS submission_defs;`)
conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test;`)
cancel()
sub.Unlisten(ctx) // uses background ctx anyway
listener.Close(ctx)
wg.Wait()
}

// Test an unsupported event type and ensure nothing is triggered
func TestNoTrigger(t *testing.T) {
dbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI")
Expand Down
34 changes: 20 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func SetupWebhook(
log *slog.Logger,
ctx context.Context,
dbPool *pgxpool.Pool,
entityUrl, submissionUrl string,
updateEntityUrl, newSubmissionUrl, reviewSubmissionUrl string,
) error {
// setup the listener
listener := db.NewListener(dbPool)
Expand Down Expand Up @@ -83,10 +83,12 @@ func SetupWebhook(

// Only send the request for correctly parsed (supported) events
if parsedData != nil {
if parsedData.Type == "entity.update.version" && entityUrl != "" {
webhook.SendRequest(log, ctx, entityUrl, *parsedData)
} else if parsedData.Type == "submission.create" && submissionUrl != "" {
webhook.SendRequest(log, ctx, submissionUrl, *parsedData)
if parsedData.Type == "entity.update.version" && updateEntityUrl != "" {
webhook.SendRequest(log, ctx, updateEntityUrl, *parsedData)
} else if parsedData.Type == "submission.create" && newSubmissionUrl != "" {
webhook.SendRequest(log, ctx, newSubmissionUrl, *parsedData)
} else if parsedData.Type == "submission.update" && reviewSubmissionUrl != "" {
webhook.SendRequest(log, ctx, reviewSubmissionUrl, *parsedData)
} else {
log.Warn("unknown event type or no webhook URL provided", "eventType", parsedData.Type)
}
Expand Down Expand Up @@ -137,18 +139,22 @@ func main() {

// Read environment variables
defaultDbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI")
defaultEntityUrl := os.Getenv("CENTRAL_WEBHOOK_ENTITY_URL")
defaultSubmissionUrl := os.Getenv("CENTRAL_WEBHOOK_SUBMISSION_URL")
defaultUpdateEntityUrl := os.Getenv("CENTRAL_WEBHOOK_UPDATE_ENTITY_URL")
defaultNewSubmissionUrl := os.Getenv("CENTRAL_WEBHOOK_NEW_SUBMISSION_URL")
defaultReviewSubmissionUrl := os.Getenv("CENTRAL_WEBHOOK_REVIEW_SUBMISSION_URL")
defaultLogLevel := os.Getenv("CENTRAL_WEBHOOK_LOG_LEVEL")

var dbUri string
flag.StringVar(&dbUri, "db", defaultDbUri, "DB host (postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable)")

var entityUrl string
flag.StringVar(&entityUrl, "entityUrl", defaultEntityUrl, "Webhook URL for entity events")
var updateEntityUrl string
flag.StringVar(&updateEntityUrl, "updateEntityUrl", defaultUpdateEntityUrl, "Webhook URL for update entity events")

var submissionUrl string
flag.StringVar(&submissionUrl, "submissionUrl", defaultSubmissionUrl, "Webhook URL for submission events")
var newSubmissionUrl string
flag.StringVar(&newSubmissionUrl, "newSubmissionUrl", defaultNewSubmissionUrl, "Webhook URL for new submission events")

var reviewSubmissionUrl string
flag.StringVar(&reviewSubmissionUrl, "reviewSubmissionUrl", defaultReviewSubmissionUrl, "Webhook URL for review submission events")

var debug bool
flag.BoolVar(&debug, "debug", false, "Enable debug logging")
Expand All @@ -172,8 +178,8 @@ func main() {
os.Exit(1)
}

if entityUrl == "" && submissionUrl == "" {
fmt.Fprintf(os.Stderr, "At least one of entityUrl or submissionUrl is required\n")
if updateEntityUrl == "" && newSubmissionUrl == "" && reviewSubmissionUrl == "" {
fmt.Fprintf(os.Stderr, "At least one of updateEntityUrl, newSubmissionUrl, reviewSubmissionUrl is required\n")
flag.PrintDefaults()
os.Exit(1)
}
Expand All @@ -186,7 +192,7 @@ func main() {
}

printStartupMsg()
err = SetupWebhook(log, ctx, dbPool, entityUrl, submissionUrl)
err = SetupWebhook(log, ctx, dbPool, updateEntityUrl, newSubmissionUrl, reviewSubmissionUrl)
if err != nil {
fmt.Fprintf(os.Stderr, "error setting up webhook: %v", err)
os.Exit(1)
Expand Down
10 changes: 6 additions & 4 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ func TestSetupWebhook(t *testing.T) {
}

is := is.New(t)
wg := sync.WaitGroup{}
log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug,
}))
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dbPool, err := db.InitPool(ctx, log, dbUri)
is.NoErr(err)
Expand Down Expand Up @@ -109,12 +110,11 @@ func TestSetupWebhook(t *testing.T) {
defer mockServer.Close()

// Start webhook listener
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
log.Info("starting webhook listener")
err := SetupWebhook(log, ctx, dbPool, mockServer.URL, mockServer.URL)
err := SetupWebhook(log, ctx, dbPool, mockServer.URL, mockServer.URL, mockServer.URL)
if err != nil && ctx.Err() == nil {
log.Error("webhook listener error", "error", err)
}
Expand Down Expand Up @@ -153,4 +153,6 @@ func TestSetupWebhook(t *testing.T) {
wg.Wait()
conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs;`)
conn.Exec(ctx, `DROP TABLE IF EXISTS audits;`)
conn.Release()
dbPool.Close()
}
Loading

0 comments on commit f6cb117

Please sign in to comment.