[WIP] Add watchlist support for filters.#457
Conversation
There was a problem hiding this comment.
Pull request overview
Adds “watchlist” support across the API and filter workers so filters can be bound to watchlist_* catalogs, enforcing per-user access control and routing matched alerts to watchlist-named Kafka topics.
Changes:
- Introduces watchlist ACLs on
Userand enforces watchlist visibility/access across catalog + query endpoints. - Extends filters with optional
watchlistbinding, injects crossmatch gating in filter pipelines, and routes output per watchlist/topic. - Updates filter worker plumbing/tests to handle per-topic fan-out.
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_ztf.rs | Adjusts assertions for new “alerts grouped by topic” filter output shape. |
| tests/test_lsst.rs | Adjusts assertions for new “alerts grouped by topic” filter output shape. |
| tests/test_filter.rs | Updates build_loaded_filter calls to include a default Kafka topic parameter. |
| src/utils/testing.rs | Updates test filter insertion to include watchlist: None. |
| src/filter/ztf.rs | Updates ZTF worker to load filters with a default topic and return alerts grouped by destination topic. |
| src/filter/lsst.rs | Updates LSST worker to load filters with a default topic and return alerts grouped by destination topic. |
| src/filter/mod.rs | Re-exports group_alerts_by_topic helper. |
| src/filter/base.rs | Adds Filter.watchlist, LoadedFilter.output_topic, watchlist $match injection at load time, per-topic grouping helper, and per-topic Kafka send loop. |
| src/bin/api.rs | Registers the new PATCH /users/{id}/watchlist_access route. |
| src/bin/add_filter.rs | Adds CLI support for binding a filter to a watchlist (basic prefix + existence validation). |
| src/api/routes/users.rs | Adds watchlist_access to User, implements catalog access helper, and introduces admin endpoint to set watchlist ACLs. |
| src/api/routes/filters.rs | Validates watchlist bindings on filter creation (prefix, existence/access, crossmatch config) and persists watchlist on filters. |
| src/api/routes/catalogs.rs | Enforces catalog visibility/access for listing/indexes/sample; hides unauthorized watchlists. |
| src/api/routes/queries/pipeline.rs | Enforces catalog access checks with the authenticated user. |
| src/api/routes/queries/find.rs | Enforces catalog access checks with the authenticated user. |
| src/api/routes/queries/count.rs | Enforces catalog access checks with the authenticated user. |
| src/api/routes/queries/cone_search.rs | Switches to catalog_accessible but currently uses unauthenticated access mode (None). |
| src/api/db.rs | Ensures admin user initialization preserves/sets watchlist_access. |
| src/api/catalogs.rs | Adds watchlist prefix constant and centralizes “visible vs accessible” catalog checks (including ACL gating). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
Throughput results (
|
…pdate documentation
…nnotations.watchlist array, threaded through the ZTF/LSST filter workers.
|
Throughput results (
Baseline run: |
…e accessible to authorized users.
…isn't in config.crossmatch.<survey>, instead of silently inserting a filter that would never match.
|
Throughput results (
Baseline run: |
|
Throughput results (
Baseline run: |
1 similar comment
|
Throughput results (
Baseline run: |
A watchlist is a Mongo collection prefixed with
watchlist_. It acts as:Access is controlled via per-user ACLs.
How crossmatch works
When a new alert is ingested, it is crossmatched against every watchlist
configured under
crossmatch.<survey>. On a positional match (within theconfigured radius), the alert's
objectIdis$addToSet-ed onto thematched watchlist document, under
matching_<survey>_objects:So the watchlist itself accumulates the list of survey objects that fell
within its entries. This is the array a bound filter later
$lookupsagainst (see Behavior). Alerts that existed before a watchlist was added
are not matched by live ingest, backfill them with the
reprocess_crossmatchbinary (Setup step 4).
Access Control
watchlist_*catalogs → accessible only if:User.watchlist_accessUnauthorized users receive 404. (Watchlists are queryable via the
find/count/pipelineendpoints, but not viacone_search.)Setup
Create a Mongo collection:
watchlist_<name>Add it to
config.yamlundercrossmatch.<survey>, with aprojectionselecting the fields to surface on matched alertsRestart alert workers
Backfill existing alerts: run the
reprocess_crossmatchbinary with thewatchlist name(s) passed via
--catalogs(comma-separated; each must bedeclared under
crossmatch.<survey>). It loops over the watchlist entriesand
$addToSets theobject_idsof everyalerts_auxrecord within radiusonto the watchlist document (
matching_<survey>_objects) — idempotent, sosafe to re-run alongside live ingest:
Grant access:
PATCH /users/{user_id}/watchlist_accessUsing Watchlists in Filters
Filters can bind to a watchlist:
{ "watchlist": "watchlist_<name>" }Validation (at filter creation) ensures:
watchlist_prefixBehavior
When a filter binds a watchlist, the loader injects:
A
$lookupjoining the alertobjectIdagainst the watchlistdocument's
matching_<survey>_objectsarrayA
$matchkeeping only alerts with at least one matchA
$setsurfacing the matched watchlist docs (projected to theconfigured fields) under
annotations.watchlist:→ only watchlist-matched alerts pass, each carrying its watchlist data.
Kafka Routing
All filter output goes to the standard results topics
(
ZTF_alerts_results/LSST_alerts_results) regardless of watchlist.There is no dedicated per-watchlist topic; isolation is handled at the
data level (the
$match) and via ACL at filter creation.Summary
Watchlists provide:
$lookup/$match)annotations.watchlist)All with a single convention:
watchlist_*.