Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/alert/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1294,10 +1294,6 @@ pub trait AlertWorker {
.update_one(find_doc, update_doc)
.await?;
if update_result.matched_count == 0 {
warn!(
"Concurrent modification detected for object_id {}. Using DB-only update.",
object_id
);
return Err(AlertError::ConcurrentAuxUpdate(object_id.to_string()));
}
Ok(())
Expand Down
13 changes: 7 additions & 6 deletions src/alert/decam.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use flare::Time;
use mongodb::bson::{doc, Document};
use serde::{Deserialize, Deserializer, Serialize};
use serde_with::{serde_as, skip_serializing_none};
use tracing::{instrument, warn};
use tracing::{debug, error, instrument, warn};

pub const STREAM_NAME: &str = "DECAM";
pub const DECAM_DEC_RANGE: (f64, f64) = (-90.0, 33.5);
Expand Down Expand Up @@ -279,10 +279,7 @@ impl DecamAlertWorker {
.await
}

#[instrument(
skip(self, prv_candidates, fp_hists, survey_matches, existing_alert_aux),
err
)]
#[instrument(skip(self, prv_candidates, fp_hists, survey_matches, existing_alert_aux))]
async fn update_aux_inner(
&mut self,
object_id: &str,
Expand Down Expand Up @@ -342,9 +339,13 @@ impl DecamAlertWorker {
.await
{
Ok(_) => Ok(()),
Err(_) => {
Err(e) => {
// if we get a concurrent modification error or an error preparing the lightcurves update,
// we fallback to a full in-DB update, safe against concurrency and "self-healing", but less efficient
match &e {
AlertError::ConcurrentAuxUpdate(_) => debug!(error = %e),
_ => error!(error = %e),
}
self.update_aux_fallback(object_id, prv_candidates, fp_hists, survey_matches, now)
.await
}
Expand Down
13 changes: 7 additions & 6 deletions src/alert/lsst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use mongodb::bson::{doc, Document};
use serde::{Deserialize, Deserializer, Serialize};
use serde_with::{serde_as, skip_serializing_none};
use std::collections::HashMap;
use tracing::{instrument, warn};
use tracing::{debug, error, instrument, warn};
use utoipa::ToSchema;

pub const STREAM_NAME: &str = "LSST";
Expand Down Expand Up @@ -1022,10 +1022,7 @@ impl LsstAlertWorker {
.await
}

#[instrument(
skip(self, prv_candidates, fp_hists, survey_matches, existing_alert_aux),
err
)]
#[instrument(skip(self, prv_candidates, fp_hists, survey_matches, existing_alert_aux))]
async fn update_aux_inner(
&mut self,
object_id: &str,
Expand Down Expand Up @@ -1089,9 +1086,13 @@ impl LsstAlertWorker {
.await
{
Ok(_) => Ok(()),
Err(_) => {
Err(e) => {
// if we get a concurrent modification error or an error preparing the lightcurves update,
// we fallback to a full in-DB update, safe against concurrency and "self-healing", but less efficient
match &e {
AlertError::ConcurrentAuxUpdate(_) => debug!(error = %e),
_ => error!(error = %e),
}
self.update_aux_fallback(object_id, prv_candidates, fp_hists, survey_matches, now)
.await
}
Expand Down
27 changes: 14 additions & 13 deletions src/alert/ztf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use mongodb::bson::{doc, Document};
use serde::{Deserialize, Deserializer, Serialize};
use serde_with::{serde_as, skip_serializing_none};
use std::collections::HashMap;
use tracing::{instrument, warn};
use tracing::{debug, error, instrument, warn};
use utoipa::ToSchema;

pub const STREAM_NAME: &str = "ZTF";
Expand Down Expand Up @@ -791,17 +791,14 @@ impl ZtfAlertWorker {
.await
}

#[instrument(
skip(
self,
prv_candidates,
prv_nondetections,
fp_hists,
survey_matches,
existing_alert_aux
),
err
)]
#[instrument(skip(
self,
prv_candidates,
prv_nondetections,
fp_hists,
survey_matches,
existing_alert_aux
))]
async fn update_aux_inner(
&mut self,
object_id: &str,
Expand Down Expand Up @@ -875,9 +872,13 @@ impl ZtfAlertWorker {
.await
{
Ok(_) => Ok(()),
Err(_) => {
Err(e) => {
// if we get a concurrent modification error or an error preparing the lightcurves update,
// we fallback to a full in-DB update, safe against concurrency and "self-healing", but less efficient
match &e {
AlertError::ConcurrentAuxUpdate(_) => debug!(error = %e),
_ => error!(error = %e),
}
self.update_aux_fallback(
object_id,
prv_candidates,
Expand Down
Loading