[Asteroid] Fink-FAT 1.0 extension #327

Open · wants to merge 18 commits into master
2 changes: 1 addition & 1 deletion fink_science/__init__.py
@@ -12,4 +12,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = "5.9.0"
__version__ = "5.10.0"
172 changes: 149 additions & 23 deletions fink_science/asteroids/processor.py
@@ -12,20 +12,70 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import pandas_udf

from fink_science import __file__
import os

import pandas as pd
import numpy as np

from fink_science.tester import spark_unit_tests
from pyspark.sql.types import (
IntegerType,
ArrayType,
FloatType,
StructType,
StructField,
StringType,
)

@pandas_udf(IntegerType(), PandasUDFType.SCALAR)
def roid_catcher(jd, magpsf, ndethist, sgscore1, ssdistnr, distpsnr1):
""" Determine if an alert is a potential Solar System object (SSO) using two criteria:
from fink_science.tester import spark_unit_tests
from fink_fat.streaming_associations.fink_fat_associations import fink_fat_association


roid_schema = StructType(
[
StructField(
"roid",
IntegerType(),
True,
),
StructField(
"ffdistnr",
ArrayType(FloatType()),
True,
),
StructField(
"estimator_id",
ArrayType(StringType()),
True,
),
]
)
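# Note: since Spark 3.0 a scalar pandas UDF may return a StructType directly
# (which is why PandasUDFType.SCALAR is dropped above); the wrapped function
# must then return a pandas.DataFrame whose columns match the struct fields.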


@pandas_udf(roid_schema)
def roid_catcher(
ra,
dec,
jd,
magpsf,
candid,
cjd,
cmagpsf,
fid,
ndethist,
sgscore1,
ssdistnr,
distpsnr1,
error_radius,
mag_criterion_same_fid,
mag_criterion_diff_fid,
orbit_tw,
orbit_error,
confirmed_sso,
):
"""Determine if an alert is a potential Solar System object (SSO) using two criteria:

1. The alert has been flagged as an SSO by ZTF (MPC) within 5"
2. The alert satisfies Fink criteria for an SSO
@@ -35,18 +85,31 @@ def roid_catcher(jd, magpsf, ndethist, sgscore1, ssdistnr, distpsnr1):
4. If there are 2 detections, they must have been taken within 30 min of each other.

The alerts are labeled using:

[3] if the alert has been flagged by ZTF as SSO candidate
[2] if the alert has been flagged by Fink as SSO candidate
[1] if it is the first time ZTF sees this object
[0] if it is likely not a SSO
* [5] if the alert has been associated with a candidate trajectory using an orbit estimator
* [4] if the alert has been associated with a candidate trajectory using a polyfit estimator
* [3] if the alert has been flagged by ZTF as SSO candidate
* [2] if the alert has been flagged by Fink as SSO candidate
* [1] if it is the first time ZTF sees this object
* [0] if it is likely not a SSO

Parameters
----------
ra: Spark DataFrame Column
right ascension
dec: Spark DataFrame Column
declination
jd: Spark DataFrame Column
Observation Julian date at start of exposure [days]
magpsf: Spark DataFrame Column
Magnitude from PSF-fit photometry [mag]
candid: Spark DataFrame Column
alert identifier
cjd: Spark DataFrame Column
Julian date history of the alerts
cmagpsf: Spark DataFrame Column
magnitude history of the alerts
fid: Spark DataFrame Column
filter identifier (for ZTF, 1 = g band and 2 = r band)
ndethist: Spark DataFrame Column
Number of spatially-coincident detections falling within 1.5 arcsec
going back to beginning of survey; only detections that fell on the
@@ -63,19 +126,46 @@ def roid_catcher(jd, magpsf, ndethist, sgscore1, ssdistnr, distpsnr1):
distpsnr1: Spark DataFrame Column
Distance of closest source from PS1 catalog;
if exists within 30 arcsec [arcsec]
error_radius: Spark DataFrame Column
error radius used to associate the alerts with a candidate trajectory
mag_criterion_same_fid: Spark DataFrame Column
keep associations where the magnitude difference between two
measurements in the same filter is below this threshold
mag_criterion_diff_fid: Spark DataFrame Column
keep associations where the magnitude difference between two
measurements in different filters is below this threshold
orbit_tw: int
time window used to filter the orbits
orbit_error: float
error radius used to associate the alerts with the orbits
confirmed_sso: Spark DataFrame Column
if true, associate alerts flagged as 3 (confirmed SSOs);
otherwise use alerts flagged as 1 or 2

Returns
----------
out: integer
roid: integer
5 if the alert has been associated with a candidate trajectory using an orbit estimator
4 if the alert has been associated with a candidate trajectory using a polyfit estimator
3 if the alert has been flagged by ZTF as SSO
2 if the alert has been flagged by Fink as SSO
1 if it is the first time ZTF sees this object
0 if it is likely not a SSO
ffdistnr: float list
distance from the trajectory prediction
- in arcminutes if flag == 4
- in arcseconds if flag == 5
estimator_id: string list
fink_fat trajectory identifier associated with the alert (only if roid is 4 or 5)
- an integer if associated with a trajectory candidate
- a string if associated with an orbit

Examples
----------
>>> from fink_utils.spark.utils import concat_col
>>> from pyspark.sql import functions as F
>>> from fink_science.tester import add_roid_datatest
>>> add_roid_datatest(spark, True)

>>> df = spark.read.load(ztf_alert_sample)

@@ -92,24 +182,33 @@ def roid_catcher(jd, magpsf, ndethist, sgscore1, ssdistnr, distpsnr1):

# Perform the fit + classification (default model)
>>> args = [
... 'candidate.ra', 'candidate.dec',
... 'candidate.jd', 'candidate.magpsf',
... 'candidate.candid',
... 'cjd', 'cmagpsf',
... 'candidate.fid',
... 'candidate.ndethist', 'candidate.sgscore1',
... 'candidate.ssdistnr', 'candidate.distpsnr1']
... 'candidate.ssdistnr', 'candidate.distpsnr1',
... F.lit(2), F.lit(2), F.lit(2), F.lit(30), F.lit(15.0), F.lit(True)
... ]
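# The literals map positionally onto the fink_fat parameters: error_radius,
# mag_criterion_same_fid, mag_criterion_diff_fid, orbit_tw, orbit_error and
# confirmed_sso (one literal per parameter of roid_catcher).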
>>> df = df.withColumn('roid', roid_catcher(*args))

# Drop temp columns
>>> df = df.drop(*what_prefix)

>>> df.filter(df['roid'] == 2).count()
3

>>> df.filter(df['roid'] == 3).count()
>>> df.filter(df['roid.roid'] == 2).count()
175
>>> df.filter(df['roid.roid'] == 3).count()
6694
>>> df.filter(df['roid.roid'] == 4).count()
2
>>> df.filter(df['roid.roid'] == 5).count()
3
"""
flags = np.zeros_like(ndethist.values, dtype=int)

# remove NaN
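# (NaN != NaN, so comparing the magnitude array to itself counts the non-NaN entries)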
nalerthist = magpsf.apply(lambda x: np.sum(np.array(x) == np.array(x)))
nalerthist = cmagpsf.apply(lambda x: np.sum(np.array(x) == np.array(x)))

# first detection
f0 = ndethist == 1
@@ -128,7 +227,7 @@ def roid_catcher(jd, magpsf, ndethist, sgscore1, ssdistnr, distpsnr1):

# Remove long trend (within the observation)
f3 = nalerthist == 2
f4 = jd[f3].apply(lambda x: np.diff(x)[-1]) > (30. / (24. * 60.))
f4 = cjd[f3].apply(lambda x: np.diff(x)[-1]) > (30.0 / (24.0 * 60.0))
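# np.diff(x)[-1] is the time between the two detections; 30.0 / (24.0 * 60.0) is 30 minutes in days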
flags[f3 & f4] = 0

# Remove very long trend (outside the current observation)
@@ -148,18 +247,45 @@ def roid_catcher(jd, magpsf, ndethist, sgscore1, ssdistnr, distpsnr1):
f_ndethist = ndethist <= 5
f_nalerthist = nalerthist <= 5

mask_roid = f_distance1 & f_distance2 & f_relative_distance & f_ndethist & f_nalerthist
mask_roid = (
f_distance1 & f_distance2 & f_relative_distance & f_ndethist & f_nalerthist
)
flags[mask_roid] = 3
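# a counterpart within the ssdistnr / distpsnr1 cuts above means a ZTF (MPC) match -> flag 3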

return pd.Series(flags)
# fink_fat associations
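# may upgrade flags to 4 (trajectory candidate match) or 5 (orbit match),
# returning the matched estimator ids and prediction distances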
flags, estimator_id, ffdistnr = fink_fat_association(
ra,
dec,
magpsf,
fid,
jd,
candid,
flags,
confirmed_sso,
error_radius,
mag_criterion_same_fid,
mag_criterion_diff_fid,
orbit_tw,
orbit_error,
)

return pd.DataFrame(
{
"roid": flags,
"ffdistnr": ffdistnr,
"estimator_id": estimator_id,
}
)


if __name__ == "__main__":
""" Execute the test suite """
"""Execute the test suite"""

globs = globals()
path = os.path.dirname(__file__)
ztf_alert_sample = 'file://{}/data/alerts/datatest'.format(path)
ztf_alert_sample = "file://{}/data/alerts/roid_datatest/alerts_sample_roid".format(
path
)
globs["ztf_alert_sample"] = ztf_alert_sample

# Run the test suite
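For context, struct-returning scalar pandas UDFs are the mechanism this diff adopts in place of PandasUDFType.SCALAR. A minimal self-contained sketch, independent of fink_science and assuming only pyspark >= 3.0 (all names hypothetical):

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

toy_schema = StructType([
    StructField("flag", LongType(), True),
    StructField("dist", DoubleType(), True),
])

@pandas_udf(toy_schema)
def toy_flagger(x: pd.Series) -> pd.DataFrame:
    # the returned DataFrame columns must match the struct fields by name
    return pd.DataFrame({"flag": (x > 0).astype("int64"), "dist": x.abs()})

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1.5,), (-2.0,)], ["x"])
df.withColumn("out", toy_flagger("x")).select("out.flag", "out.dist").show()

Selecting out.flag / out.dist here mirrors the df['roid.roid'] accesses in the doctests above.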
Binary file added fink_science/data/alerts/roid_datatest/kalman.pkl
Ten further binary files (contents not shown).
26 changes: 14 additions & 12 deletions fink_science/tester.py
@@ -16,8 +16,9 @@
import doctest
import numpy as np


def regular_unit_tests(global_args: dict = None, verbose: bool = False):
""" Base commands for the regular unit test suite
"""Base commands for the regular unit test suite

Include this routine in the main of a module, and execute:
python3 mymodule.py
@@ -51,8 +52,9 @@ def regular_unit_tests(global_args: dict = None, verbose: bool = False):

sys.exit(doctest.testmod(globs=global_args, verbose=verbose)[0])


def spark_unit_tests(global_args: dict = None, verbose: bool = False):
""" Base commands for the Spark unit test suite
"""Base commands for the Spark unit test suite

Include this routine in the main of a module, and execute:
python3 mymodule.py
@@ -80,14 +82,14 @@ def spark_unit_tests(global_args: dict = None, verbose: bool = False):
spark = SparkSession.builder.getOrCreate()

conf = SparkConf()
confdic = {
"spark.python.daemon.module": "coverage_daemon"
}
confdic = {"spark.python.daemon.module": "coverage_daemon"}

if spark.version.startswith('2'):
if spark.version.startswith("2"):
confdic.update(
{
"spark.jars.packages": 'org.apache.spark:spark-avro_2.11:{}'.format(spark.version)
"spark.jars.packages": "org.apache.spark:spark-avro_2.11:{}".format(
spark.version
)
}
)
elif spark.version.startswith('3'):
@@ -97,15 +99,15 @@ def spark_unit_tests(global_args: dict = None, verbose: bool = False):
"spark.jars.packages": 'org.apache.spark:spark-avro_2.12:{},{}'.format(spark.version, py4j_mod)
}
)
conf.setMaster("local[2]")
conf.setMaster("local[1]")
conf.setAppName("fink_science_test")
for k, v in confdic.items():
conf.set(key=k, value=v)
spark = SparkSession\
.builder\
.appName("fink_science_test")\
.config(conf=conf)\
spark = (
SparkSession.builder.appName("fink_science_test")
.config(conf=conf)
.getOrCreate()
)

global_args["spark"] = spark

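As the docstrings above note, these helpers are meant to be called from a module's main guard; a typical invocation, sketched after the pattern used in processor.py:

if __name__ == "__main__":
    # expose module globals (sample paths, and the Spark session injected
    # by the tester) to the doctests
    globs = globals()
    spark_unit_tests(globs)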