Custom date logic to extend effective range of date type #39

Merged: 16 commits, Oct 22, 2024
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- SQL UDF `days_since_epoch` to parse a string representing a date to the number of days since `1970-01-01` [#39](https://github.com/ADBond/splinkclickhouse/pull/39)
- Custom Clickhouse `ColumnExpression` with additional transform `parse_date_to_int` to parse a string to days since epoch [#39](https://github.com/ADBond/splinkclickhouse/pull/39)
- Custom date comparison and comparison levels working with integer type representing days since epoch [#39](https://github.com/ADBond/splinkclickhouse/pull/39)

## [0.3.1] - 2024-10-14

### Added
10 changes: 8 additions & 2 deletions README.md
@@ -211,14 +211,20 @@ This package is 'unofficial', in that it is not directly supported by the Splink
### Datetime parsing

Clickhouse offers several different date formats.
The basic `Date` format cannot handle dates before the Unix epoch (1970-01-01), which makes it unsuitable for many use-cases involving dates of birth.

The parsing function `parseDateTime` (and its variants), which supports custom formats, returns a `DateTime`, which has the same limited range.
In `splinkclickhouse` we use the function `parseDateTime64BestEffortOrNull` so that we can use the extended-range `DateTime64` data type, which supports dates back to 1900-01-01. This function does not, however, allow custom date formats, and currently no `DateTime64` equivalent of `parseDateTime` exists.

If you require different behaviour (for instance if you have an unusual date format and know that you do not need dates outside of the `DateTime` range) you will either need to derive a new column in your source data, or construct the relevant SQL expression manually.

#### Extended Dates

There is not currently a way in Clickhouse to deal directly with date values before 1900. However, `splinkclickhouse` offers some tools to help with this.
It creates a SQL UDF, `days_since_epoch` (which can be opted out of), that converts a date string in `YYYY-MM-DD` format into an integer representing the number of days since `1970-01-01`. The calculation is based on the proleptic Gregorian calendar, so it can handle dates well outside the range of `DateTime64`.

This can be used with the column expression extension `splinkclickhouse.column_expression.ColumnExpression` via the transform `.parse_date_to_int()`, or using custom versions of the Splink library functions `cll.AbsoluteDateDifferenceLevel`, `cl.AbsoluteDateDifferenceAtThresholds`, and `cl.DateOfBirthComparison`.
These functions can be used with string columns (which will be wrapped in the above parsing function), or with integer columns if the conversion via `days_since_epoch` has already been done in the data-preparation stage.
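
To make the integer representation concrete, the following is a minimal pure-Python sketch of the same idea. It is not the SQL that the UDF runs (that lives in `splinkclickhouse.custom_sql` and is not reproduced here); it only relies on the fact that Python's `datetime.date` also uses the proleptic Gregorian calendar. The function name mirrors the UDF purely for illustration.

```python
from datetime import date

# Ordinal of the Unix epoch in Python's proleptic Gregorian calendar
UNIX_EPOCH_ORDINAL = date(1970, 1, 1).toordinal()


def days_since_epoch(date_string: str) -> int:
    """Illustrative equivalent of the UDF: 'YYYY-MM-DD' -> days since 1970-01-01."""
    return date.fromisoformat(date_string).toordinal() - UNIX_EPOCH_ORDINAL


print(days_since_epoch("1970-01-01"))  # 0
print(days_since_epoch("1969-12-31"))  # -1
print(days_since_epoch("1900-01-01"))  # -25567
print(days_since_epoch("1850-03-15"))  # negative, well before the DateTime64 range
```

Dates before the epoch simply become negative integers, which is why differences between them remain meaningful for comparisons.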

### `NULL` values in `chdb`

1 change: 1 addition & 0 deletions pyproject.toml
@@ -28,6 +28,7 @@ dev-dependencies = [
    # probably not ideal having this doubled here,
    # but saves having to pass --all_extras for 'uv run'
    "chdb >= 2.0.1",
    "pyarrow>=17.0.0",
]
package = true

17 changes: 17 additions & 0 deletions splinkclickhouse/chdb/database_api.py
@@ -4,6 +4,7 @@
import pandas as pd
from splink.internals.database_api import DatabaseAPI

from ..custom_sql import days_since_epoch_sql
from ..dialect import ClickhouseDialect
from .dataframe import ChDBDataFrame

@@ -17,6 +18,7 @@ def __init__(
        self,
        con: chdb_dbapi.Connection,
        schema: str = "splink",
        register_custom_udfs: bool = True,
    ):
        super().__init__()

@@ -25,6 +27,8 @@ def __init__(

        self._create_splink_schema()
        self._create_random_function()
        if register_custom_udfs:
            self._register_custom_udfs()

    def _table_registration(self, input, table_name):
        if isinstance(input, dict):
@@ -117,3 +121,16 @@ def _create_random_function(self) -> None:
            cursor.execute(sql)
        finally:
            self._reset_cursor(cursor)

    def _register_custom_udfs(self) -> None:
        sql = f"""
        CREATE FUNCTION IF NOT EXISTS
        days_since_epoch AS
        (date_string) -> {days_since_epoch_sql}
        """

        cursor = self._get_cursor()
        try:
            cursor.execute(sql)
        finally:
            self._reset_cursor(cursor)
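
The opt-out behaviour added to both database API classes can be sketched in isolation as below. This is an illustration only: `RecordingClient` and `SketchAPI` are hypothetical stand-ins rather than classes from this package, and `PLACEHOLDER_BODY_SQL` stands in for `days_since_epoch_sql`, whose contents are not shown in this diff.

```python
from typing import List

# Placeholder only: the real expression is defined in splinkclickhouse.custom_sql
PLACEHOLDER_BODY_SQL = "<sql expression in date_string>"


def build_udf_sql(body_sql: str) -> str:
    """Build the idempotent CREATE FUNCTION statement for the UDF."""
    return (
        "CREATE FUNCTION IF NOT EXISTS days_since_epoch AS "
        f"(date_string) -> {body_sql}"
    )


class RecordingClient:
    """Hypothetical client that records SQL instead of executing it."""

    def __init__(self) -> None:
        self.commands: List[str] = []

    def command(self, sql: str) -> None:
        self.commands.append(sql)


class SketchAPI:
    """Hypothetical API wrapper showing the register_custom_udfs flag."""

    def __init__(self, client: RecordingClient, register_custom_udfs: bool = True):
        self.client = client
        if register_custom_udfs:
            self._register_custom_udfs()

    def _register_custom_udfs(self) -> None:
        self.client.command(build_udf_sql(PLACEHOLDER_BODY_SQL))


default_client = RecordingClient()
SketchAPI(default_client)
opted_out_client = RecordingClient()
SketchAPI(opted_out_client, register_custom_udfs=False)

print(len(default_client.commands))    # 1
print(len(opted_out_client.commands))  # 0
```

Because the statement uses `IF NOT EXISTS`, constructing the API more than once against the same database should not fail on the second registration.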
13 changes: 13 additions & 0 deletions splinkclickhouse/clickhouse/database_api.py
@@ -4,6 +4,7 @@
from clickhouse_connect.driver.client import Client
from splink.internals.database_api import DatabaseAPI

from ..custom_sql import days_since_epoch_sql
from ..dialect import ClickhouseDialect
from .dataframe import ClickhouseDataFrame

@@ -16,12 +17,15 @@ class ClickhouseAPI(DatabaseAPI[None]):
    def __init__(
        self,
        client: Client,
        register_custom_udfs: bool = True,
    ):
        super().__init__()

        self.client = client
        self.set_union_default_mode()
        self._create_random_function()
        if register_custom_udfs:
            self._register_custom_udfs()

    def _table_registration(self, input, table_name) -> None:
        if isinstance(input, pd.DataFrame):
@@ -91,6 +95,15 @@ def set_union_default_mode(self) -> None:
    def _create_random_function(self) -> None:
        self.client.command("CREATE FUNCTION IF NOT EXISTS random AS () -> rand()")

    def _register_custom_udfs(self) -> None:
        self.client.command(
            f"""
            CREATE FUNCTION IF NOT EXISTS
            days_since_epoch AS
            (date_string) -> {days_since_epoch_sql}
            """
        )

    def _create_table_from_pandas_frame(self, df: pd.DataFrame, table_name: str) -> str:
        sql = f"CREATE OR REPLACE TABLE {table_name} ("

40 changes: 40 additions & 0 deletions splinkclickhouse/column_expression.py
@@ -0,0 +1,40 @@
from copy import copy

from splink.internals.column_expression import (
    ColumnExpression as SplinkColumnExpression,
)
from splink.internals.dialects import SplinkDialect

from .dialect import ClickhouseDialect


class ColumnExpression(SplinkColumnExpression):
    def __init__(self, sql_expression: str):
        super().__init__(sql_expression=sql_expression, sql_dialect=ClickhouseDialect())

    def _parse_date_to_int_dialected(
        self, name: str, sql_dialect: SplinkDialect
    ) -> str:
        # need to have sql_dialect passed, even if it does nothing
        # parent requires this to be in function signature

        return f"days_since_epoch({name})"

    def parse_date_to_int(self) -> "ColumnExpression":
        """
        Parses date string to an integer, representing days since
        the Unix epoch (1970-01-01).
        """
        clone = self._clone()
        clone.operations.append(clone._parse_date_to_int_dialected)
        return clone

    @staticmethod
    def from_base_expression(
        column_expression: SplinkColumnExpression,
    ) -> "ColumnExpression":
        new_expression = ColumnExpression(
            sql_expression=column_expression.raw_sql_expression
        )
        new_expression.operations = copy(column_expression.operations)
        return new_expression
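
The clone-then-append pattern used by `parse_date_to_int` can be illustrated with a stripped-down stand-in. `SketchColumnExpression` below is hypothetical and far simpler than Splink's real `ColumnExpression` (it ignores dialects and the `_l`/`_r` suffixes); it only shows that each transform returns a new object and leaves the original untouched.

```python
from copy import copy
from typing import Callable, List


class SketchColumnExpression:
    """Simplified stand-in for a column expression (not Splink's real class)."""

    def __init__(self, sql_expression: str):
        self.raw_sql_expression = sql_expression
        # Each operation takes the SQL built so far and wraps it
        self.operations: List[Callable[[str], str]] = []

    def _clone(self) -> "SketchColumnExpression":
        clone = SketchColumnExpression(self.raw_sql_expression)
        clone.operations = copy(self.operations)
        return clone

    def parse_date_to_int(self) -> "SketchColumnExpression":
        # Work on a copy so the original expression is not mutated
        clone = self._clone()
        clone.operations.append(lambda name: f"days_since_epoch({name})")
        return clone

    @property
    def name(self) -> str:
        # Apply the queued operations in order to the raw expression
        sql = self.raw_sql_expression
        for operation in self.operations:
            sql = operation(sql)
        return sql


dob = SketchColumnExpression("dob")
dob_as_int = dob.parse_date_to_int()

print(dob.name)         # dob
print(dob_as_int.name)  # days_since_epoch(dob)
```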
69 changes: 68 additions & 1 deletion splinkclickhouse/comparison_level_library.py
@@ -4,7 +4,14 @@

from splink import ColumnExpression
from splink.internals.comparison_level_creator import ComparisonLevelCreator

from splink.internals.comparison_level_library import (
    AbsoluteTimeDifferenceLevel as SplinkAbsoluteTimeDifferenceLevel,
)
from splink.internals.comparison_level_library import (
    DateMetricType,
)

from .column_expression import ColumnExpression as CHColumnExpression
from .dialect import ClickhouseDialect, SplinkDialect


@@ -97,3 +104,63 @@ def create_sql(self, sql_dialect: SplinkDialect) -> str:

    def create_label_for_charts(self) -> str:
        return f"Distance less than {self.km_threshold}km"


class AbsoluteDateDifferenceLevel(SplinkAbsoluteTimeDifferenceLevel):
    def __init__(
        self,
        col_name: str | ColumnExpression,
        *,
        input_is_string: bool,
        threshold: float,
        metric: DateMetricType,
    ):
        """
        Computes the absolute time difference between two dates (total duration).
        For more details see Splink docs.

        In the database, dates are represented as an integer counting the number
        of days since 1970-01-01 (Unix epoch).
        The input data can be either a string in YYYY-MM-DD format, or an
        integer of the number of days since the epoch.

        Args:
            col_name (str | ColumnExpression): The name of the column to compare.
            input_is_string (bool): If True, the input dates are treated as strings
                and parsed to integers, and must be in ISO 8601 format.
            threshold (float): The maximum allowed difference between the two dates,
                in units specified by `metric`.
            metric (str): The unit of time to use when comparing the dates.
                Can be 'second', 'minute', 'hour', 'day', 'month', or 'year'.
        """
        super().__init__(
            col_name,
            input_is_string=input_is_string,
            threshold=threshold,
            metric=metric,
        )
        # need this to help mypy:
        self.col_expression: ColumnExpression

    @property
    def datetime_parsed_column_expression(self) -> CHColumnExpression:
        # convert existing ColumnExpression to our version,
        # and then apply parsing operation
        return CHColumnExpression.from_base_expression(
            self.col_expression
        ).parse_date_to_int()

    def create_sql(self, sql_dialect: SplinkDialect) -> str:
        self.col_expression.sql_dialect = sql_dialect
        if self.input_is_string:
            self.col_expression = self.datetime_parsed_column_expression

        col = self.col_expression

        # work in seconds as that's what parent uses, and we want to keep that machinery
        seconds_in_day = 86_400
        sql = (
            f"abs({col.name_l} - {col.name_r}) * {seconds_in_day} "
            f"<= {self.time_threshold_seconds}"
        )
        return sql
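
The condition generated by `create_sql` can be checked by hand with a small Python sketch. The helper names here are hypothetical, the day counts come from Python's `datetime.date` rather than the `days_since_epoch` UDF, and the threshold is passed directly in seconds instead of relying on the parent class's unit conversion.

```python
from datetime import date

SECONDS_IN_DAY = 86_400
UNIX_EPOCH_ORDINAL = date(1970, 1, 1).toordinal()


def to_days(date_string: str) -> int:
    """Days since 1970-01-01 for a 'YYYY-MM-DD' string (illustrative helper)."""
    return date.fromisoformat(date_string).toordinal() - UNIX_EPOCH_ORDINAL


def within_threshold(days_l: int, days_r: int, threshold_seconds: float) -> bool:
    """Mirror of the SQL: abs(l - r) * 86400 <= threshold in seconds."""
    return abs(days_l - days_r) * SECONDS_IN_DAY <= threshold_seconds


left = to_days("1850-03-15")
right = to_days("1850-03-20")

# The two dates are 5 days apart, i.e. 432000 seconds
print(within_threshold(left, right, 7 * SECONDS_IN_DAY))  # True
print(within_threshold(left, right, 3 * SECONDS_IN_DAY))  # False
```

Scaling the day difference up to seconds, rather than scaling the threshold down to days, keeps the comparison in the same units the parent class already uses for its threshold.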