Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions src/api/routes/babamul/surveys/alerts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,29 @@ pub struct EnrichedLsstAlert {
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)]
struct AlertsQuery {
object_id: Option<String>,
#[schema(minimum = 0.0, maximum = 360.0)]
ra: Option<f64>,
#[schema(minimum = -90.0, maximum = 90.0)]
dec: Option<f64>,
#[schema(minimum = 0.0, maximum = 600.0)]
radius_arcsec: Option<f64>,
start_jd: Option<f64>,
end_jd: Option<f64>,
min_magpsf: Option<f64>,
max_magpsf: Option<f64>,
#[serde(alias = "min_reliability")]
#[schema(minimum = 0.0, maximum = 1.0)]
min_drb: Option<f64>,
#[serde(alias = "max_reliability")]
#[schema(minimum = 0.0, maximum = 1.0)]
max_drb: Option<f64>,
is_rock: Option<bool>,
is_star: Option<bool>,
is_near_brightstar: Option<bool>,
is_stationary: Option<bool>,
#[schema(minimum = 1, maximum = 100_000)]
limit: Option<u32>,
#[schema(minimum = 0)]
skip: Option<u64>,
}

Expand Down Expand Up @@ -105,8 +112,8 @@ pub async fn get_alerts(
};
let survey = path.into_inner();

let limit = query.limit.unwrap_or(100000);
if limit == 0 || limit > 100000 {
let limit = query.limit.unwrap_or(100_000);
if limit == 0 || limit > 100_000 {
return response::bad_request("Invalid limit, must be between 1 and 100000");
}
let skip = query.skip.unwrap_or(0);
Expand Down Expand Up @@ -312,19 +319,28 @@ pub async fn get_alerts(
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)]
struct AlertsConeSearchQuery {
coordinates: HashMap<String, [f64; 2]>,
#[schema(minimum = 0.0, maximum = 600.0)]
radius_arcsec: f64,
start_jd: Option<f64>,
end_jd: Option<f64>,
min_magpsf: Option<f64>,
max_magpsf: Option<f64>,
#[serde(alias = "min_reliability")]
#[schema(minimum = 0.0, maximum = 1.0)]
min_drb: Option<f64>,
#[serde(alias = "max_reliability")]
#[schema(minimum = 0.0, maximum = 1.0)]
max_drb: Option<f64>,
is_rock: Option<bool>,
is_star: Option<bool>,
is_near_brightstar: Option<bool>,
is_stationary: Option<bool>,
/// Maximum number of alerts to return per coordinate pair, default and maximum is 1,000
#[schema(minimum = 1, maximum = 1000)]
limit: u32,
/// Number of alerts to skip for pagination, default is 0
#[schema(minimum = 0)]
skip: Option<u64>,
}

#[derive(Debug, serde::Serialize, serde::Deserialize, ToSchema)]
Expand Down Expand Up @@ -377,6 +393,17 @@ pub async fn cone_search_alerts(
}
let radius_radians = (radius_arcsec / 3600.0).to_radians();

let mut find_options = mongodb::options::FindOptions::default();
if query.limit == 0 || query.limit > 1000 {
return response::bad_request(
"Invalid limit, must be a positive integer less than or equal to 1000",
);
}
find_options.limit = Some(query.limit as i64);
if let Some(skip) = query.skip {
find_options.skip = Some(skip);
}

let mut base_filter_doc = if survey == Survey::Ztf {
doc! {"candidate.programid": 1} // Babamul only returns public ZTF alerts
} else {
Expand Down
13 changes: 8 additions & 5 deletions src/api/routes/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,9 +565,9 @@ async fn build_test_filter_pipeline(
.into_iter()
.filter(|s| !s.is_empty())
.collect();
if candid_ids.len() > 100000 {
if candid_ids.len() > 100_000 {
return Err(FilterError::InvalidFilterPipeline(
"maximum of 100000 candids allowed for filter test".to_string(),
"maximum of 100,000 candids allowed for filter test".to_string(),
));
}
if !candid_ids.is_empty() {
Expand Down Expand Up @@ -629,10 +629,11 @@ pub struct FilterTestRequest {
pub end_jd: Option<f64>,
#[schema(max_items = 1000)]
pub object_ids: Option<Vec<String>>,
#[schema(max_items = 100000)]
#[schema(max_items = 100_000)]
pub candids: Option<Vec<String>>,
pub sort_by: Option<String>,
pub sort_order: Option<SortOrder>,
#[schema(minimum = 1, maximum = 100_000)]
pub limit: Option<u32>,
}

Expand Down Expand Up @@ -721,8 +722,10 @@ pub async fn post_filter_test(

// Add limit stage if specified, at the very end of the pipeline
if let Some(limit) = body.limit {
if limit == 0 {
return response::bad_request("limit must be greater than 0");
if limit <= 0 || limit > 100_000 {
return response::bad_request(
"Limit must be a positive integer less than or equal to 100,000",
);
}
let limit_stage = doc! { "$limit": limit as i64 };
test_pipeline.push(limit_stage);
Expand Down
18 changes: 15 additions & 3 deletions src/api/routes/queries/cone_search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ struct ConeSearchQuery {
radius: f64,
unit: Unit,
object_coordinates: HashMap<String, [f64; 2]>, // Map of catalog name to coordinates [RA, Dec]
limit: Option<i64>,
limit: u32,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can do something like this for the docs:

Suggested change
limit: u32,
#[utoipa(schema(minimum = 1, maximum = 100_000))]
limit: u32,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can also do something like #[validate(range(min = 1, max = 100_000))] and call body.validate() in the endpoint function to catch validation errors. Might be nicer to declare on the structs rather than putting the logic in the functions.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used utoipa for that before. I like setting a min max in the schema for documentation. But for validation, I had terrible results in another project. The user gets these standardized but opaque deserialization error, and they don't understand what went wrong in most cases. I definitely want to use #[utoipa(schema(...))] everywhere in the API (might deserve its own ticket but let me give a go at it now), but the validation isn't a good idea in my opinion, as elegant as it sounds like from the developer's perspective.

skip: Option<u64>,
sort: Option<serde_json::Value>,
max_time_ms: Option<u64>,
Expand All @@ -86,9 +86,13 @@ impl ConeSearchQuery {
}
}
}
if let Some(limit) = self.limit {
options.limit = Some(limit);
// assert that limit is a positive integer < 100_000
if self.limit == 0 || self.limit > 100_000 {
return Err(
"Limit must be a positive integer less than or equal to 100,000".to_string(),

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not set a default limit as 100,000?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because when pagination uses defaults, it means the user may get up to default and think we got all the data, if they don't know their is pagination (and my experience showed me they never ever THINK about it).

It's too prone to error. This forces them to be aware of it, because they have to use it.

);
}
options.limit = Some(self.limit as i64);
if let Some(skip) = self.skip {
options.skip = Some(skip);
}
Expand All @@ -102,6 +106,8 @@ impl ConeSearchQuery {
}
if let Some(max_time_ms) = self.max_time_ms {
options.max_time = Some(std::time::Duration::from_millis(max_time_ms));
} else {
options.max_time = Some(std::time::Duration::from_secs(30)); // Default max time
}
Ok(options)
}
Expand Down Expand Up @@ -146,6 +152,12 @@ pub async fn post_cone_search_query(
Unit::Radians => {}
}
let object_coordinates = &body.object_coordinates;
if object_coordinates.is_empty() || object_coordinates.len() > 10_000 {
return response::bad_request(
"Invalid number of coordinate pairs, must be between 1 and 10,000",
);
}
Comment on lines +155 to +159

Copilot AI Feb 27, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even with the new caps (10k coordinate pairs and limit up to 100k), this endpoint can still attempt to materialize a very large response in memory (Vec<Document> per coordinate, collected before responding). Consider adding a tighter guardrail on the combined result size (e.g., object_coordinates.len() * limit), lowering the max limit for cone search, and/or changing the response strategy (streaming/pagination per object) to reduce OOM risk.

Copilot uses AI. Check for mistakes.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I'm been thinking about this quite a bit... the conesearches in bulk are tricky! I think I'll open another PR for that one specifically, I still need to figure out what's an implementation that is safe for the server AND doesn't confuse the client


let mut docs: HashMap<String, Vec<mongodb::bson::Document>> = HashMap::new();
let filter = match parse_optional_filter(&body.filter) {
Ok(f) => f,
Expand Down
12 changes: 9 additions & 3 deletions src/api/routes/queries/find.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ struct FindQuery {
catalog_name: String,
filter: serde_json::Value,
projection: Option<serde_json::Value>,
limit: Option<i64>,
limit: u32,
skip: Option<u64>,
sort: Option<serde_json::Value>,
max_time_ms: Option<u64>,
Expand All @@ -33,9 +33,13 @@ impl FindQuery {
}
}
}
if let Some(limit) = self.limit {
options.limit = Some(limit);
// assert that limit is a positive integer < 100_000
if self.limit == 0 || self.limit > 100_000 {
return Err(
"Limit must be a positive integer less than or equal to 100,000".to_string(),
);
}
options.limit = Some(self.limit as i64);
if let Some(skip) = self.skip {
options.skip = Some(skip);
}
Expand All @@ -49,6 +53,8 @@ impl FindQuery {
}
if let Some(max_time_ms) = self.max_time_ms {
options.max_time = Some(std::time::Duration::from_millis(max_time_ms));
} else {
options.max_time = Some(std::time::Duration::from_secs(30)); // Default max time
}
Comment thread
Theodlz marked this conversation as resolved.
Ok(options)
}
Expand Down
25 changes: 23 additions & 2 deletions src/api/routes/queries/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@ use utoipa::ToSchema;
struct PipelineQuery {
catalog_name: String,
pipeline: serde_json::Value,
limit: u32,
skip: Option<u64>,
max_time_ms: Option<u64>,
Comment thread
Theodlz marked this conversation as resolved.
}
Comment thread
Theodlz marked this conversation as resolved.
impl PipelineQuery {
/// Convert to MongoDB Find options
/// Convert to MongoDB Aggregation options
fn to_pipeline_options(&self) -> mongodb::options::AggregateOptions {
let mut options = mongodb::options::AggregateOptions::default();
if let Some(max_time_ms) = self.max_time_ms {
options.max_time = Some(std::time::Duration::from_millis(max_time_ms));
} else {
options.max_time = Some(std::time::Duration::from_secs(30)); // Default max time
}
options
}
Expand Down Expand Up @@ -52,8 +56,25 @@ pub async fn post_pipeline_query(
// Find documents with the provided filter
let pipeline = match parse_pipeline(&body.pipeline) {
Ok(pipeline) => pipeline,
Err(e) => return response::bad_request(&format!("Invalid filter: {}", e)),
Err(e) => return response::bad_request(&format!("Invalid pipeline: {}", e)),
};
let mut pipeline = pipeline;

// add a skip stage to the pipeline if skip is set (must come before $limit)
if let Some(skip) = body.skip {
let skip_stage = doc! { "$skip": skip as i64 };
pipeline.push(skip_stage);
}
// add a limit stage to the pipeline after validating that the limit is a positive integer < 100_000
if body.limit == 0 || body.limit > 100_000 {
return response::bad_request(
"Limit must be a positive integer less than or equal to 100,000",
);
}

let limit_stage = doc! { "$limit": body.limit };
pipeline.push(limit_stage);
Comment on lines +63 to +76

Copilot AI Feb 27, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Appending $skip/$limit stages unconditionally can lead to incorrect pagination or confusing semantics when the user-supplied pipeline already contains $skip and/or $limit stages (e.g., client $limit at the end will be overridden by the new $limit, and a client $limit before the injected $skip changes behavior). Consider either rejecting pipelines containing these stages, or inserting/replacing them deterministically (e.g., remove existing $skip/$limit and append the enforced ones).

Copilot uses AI. Check for mistakes.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worst case scenario, we apply a stricter limit and that's fine. Otherwise we have a looser limit and the one enforced by the user is stricter. I agree we could be more clever about this but I see no "problem" with the current implementation.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is true though, is we may want to start banning some stages that can easily cascade is super expensive queries: $lookups aren't great for instance.


let pipeline_options = body.to_pipeline_options();
let mut cursor = match collection
.aggregate(pipeline)
Expand Down
Loading
Loading