From 4951e7470643f6da36b15a79b8b01890668995a9 Mon Sep 17 00:00:00 2001
From: Luis
Date: Wed, 6 Oct 2021 10:23:19 -0400
Subject: [PATCH 01/31] Refactor 'decimal' type to 'float'

---
 optimus/engines/base/basedataframe.py      |  2 +-
 optimus/engines/base/columns.py            |  2 +-
 optimus/engines/base/constants.py          | 20 +++++++++----------
 optimus/engines/base/functions.py          |  6 +++---
 optimus/engines/spark/constants.py         |  2 +-
 optimus/helpers/columns_expression.py      |  2 +-
 optimus/helpers/constants.py               |  6 +++---
 optimus/infer.py                           |  2 +-
 optimus/infer_spark.py                     |  2 +-
 optimus/profiler/templates/one_column.html |  2 +-
 .../profiler/templates/out/one_column.html |  2 +-
 11 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/optimus/engines/base/basedataframe.py b/optimus/engines/base/basedataframe.py
index 22bc42ed4..694f6e59d 100644
--- a/optimus/engines/base/basedataframe.py
+++ b/optimus/engines/base/basedataframe.py
@@ -1166,7 +1166,7 @@ def report(self, df, cols="*", buckets=MAX_BUCKETS, infer=False, relative_error=
                             "hist_hours": hist_hour, "hist_minutes": hist_minute}
             elif col["column_data_type"] == "int" or col["column_data_type"] == "string" or col[
-                "column_data_type"] == "decimal":
+                "column_data_type"] == "float":
                 hist = plot_hist({col_name: hist_dict}, output="base64")
                 hist_pic = {"hist_numeric_string": hist}
             if "frequency" in col:
diff --git a/optimus/engines/base/columns.py b/optimus/engines/base/columns.py
index a681028d0..f7fe63fa1 100644
--- a/optimus/engines/base/columns.py
+++ b/optimus/engines/base/columns.py
@@ -3478,7 +3478,7 @@ def infer_type(self, cols="*", sample=INFER_PROFILER_ROWS, tidy=True) -> dict:
             dtype_i = 0

             if len(dtypes) > 1:
-                if dtypes[0] == ProfilerDataTypes.INT.value and dtypes[1] == ProfilerDataTypes.DECIMAL.value:
+                if dtypes[0] == ProfilerDataTypes.INT.value and dtypes[1] == ProfilerDataTypes.FLOAT.value:
                     dtype_i = 1

                 if dtypes[0] == ProfilerDataTypes.ZIP_CODE.value and dtypes[1] == ProfilerDataTypes.INT.value:
diff --git a/optimus/engines/base/constants.py b/optimus/engines/base/constants.py
index 1b1343cff..596f258b7 100644
--- a/optimus/engines/base/constants.py
+++ b/optimus/engines/base/constants.py
@@ -6,7 +6,7 @@ class BaseConstants:

     # inferred/input to internal
     OPTIMUS_TO_INTERNAL = {ProfilerDataTypes.INT.value: "int",
-                           ProfilerDataTypes.DECIMAL.value: "float",
+                           ProfilerDataTypes.FLOAT.value: "float",
                            ProfilerDataTypes.STRING.value: "str",
                            ProfilerDataTypes.BOOL.value: "bool",
                            ProfilerDataTypes.BOOLEAN.value: "bool",
@@ -52,13 +52,13 @@ class BaseConstants:
                            "uint64": ProfilerDataTypes.INT.value,
                            "binary": ProfilerDataTypes.INT.value,
                            "large_binary": ProfilerDataTypes.INT.value,
-                           "numeric": ProfilerDataTypes.DECIMAL.value,
-                           "float": ProfilerDataTypes.DECIMAL.value,
-                           "float16": ProfilerDataTypes.DECIMAL.value,
-                           "float32": ProfilerDataTypes.DECIMAL.value,
-                           "float64": ProfilerDataTypes.DECIMAL.value,
-                           "float_": ProfilerDataTypes.DECIMAL.value,
-                           "double": ProfilerDataTypes.DECIMAL.value,
+                           "numeric": ProfilerDataTypes.FLOAT.value,
+                           "float": ProfilerDataTypes.FLOAT.value,
+                           "float16": ProfilerDataTypes.FLOAT.value,
+                           "float32": ProfilerDataTypes.FLOAT.value,
+                           "float64": ProfilerDataTypes.FLOAT.value,
+                           "float_": ProfilerDataTypes.FLOAT.value,
+                           "double": ProfilerDataTypes.FLOAT.value,
                            "bool_": ProfilerDataTypes.BOOL.value,
                            "date": ProfilerDataTypes.DATETIME.value,
                            "date32": ProfilerDataTypes.DATETIME.value,
@@ -105,7 +105,7 @@ def INT_INTERNAL_TYPES(self):

     @property
     def NUMERIC_INTERNAL_TYPES(self):
-        types = [ProfilerDataTypes.INT.value, ProfilerDataTypes.DECIMAL.value]
+        types = [ProfilerDataTypes.INT.value, ProfilerDataTypes.FLOAT.value]
         return types + [item[0] for item in self.INTERNAL_TO_OPTIMUS.items() if item[1] in types]

     @property
@@ -124,7 +124,7 @@ def INT_TYPES(self):

     @property
     def NUMERIC_TYPES(self):
-        types = [ProfilerDataTypes.INT.value, ProfilerDataTypes.DECIMAL.value]
+        types = [ProfilerDataTypes.INT.value, ProfilerDataTypes.FLOAT.value]
         return self.ANY_TYPES + types +\
             [item[0] for item in self.INTERNAL_TO_OPTIMUS.items() if item[1] in types]

diff --git a/optimus/engines/base/functions.py b/optimus/engines/base/functions.py
index 2ea36955c..0fd0d723c 100644
--- a/optimus/engines/base/functions.py
+++ b/optimus/engines/base/functions.py
@@ -13,7 +13,7 @@
 from optimus.helpers.constants import ProfilerDataTypes
 from optimus.helpers.core import one_tuple_to_val, val_to_list
 from optimus.infer import is_datetime_str, is_list, is_list_of_list, is_null, is_bool, \
-    is_credit_card_number, is_zip_code, is_decimal, is_datetime, is_valid_datetime_format, \
+    is_credit_card_number, is_zip_code, is_float, is_datetime, is_valid_datetime_format, \
     is_object_value, is_ip, is_url, is_missing, is_gender, is_list_of_int, is_list_of_str, \
     is_str, is_phone_number, is_int_like
@@ -876,8 +876,8 @@ def infer_data_types(self, value, cols_data_types):
             dtype = ProfilerDataTypes.ZIP_CODE.value
         elif is_int_like(value):
             dtype = ProfilerDataTypes.INT.value
-        elif is_decimal(value):
-            dtype = ProfilerDataTypes.DECIMAL.value
+        elif is_float(value):
+            dtype = ProfilerDataTypes.FLOAT.value
         elif is_datetime(value):
             dtype = ProfilerDataTypes.DATETIME.value
         elif is_missing(value):
diff --git a/optimus/engines/spark/constants.py b/optimus/engines/spark/constants.py
index 4318bb962..b468d903d 100644
--- a/optimus/engines/spark/constants.py
+++ b/optimus/engines/spark/constants.py
@@ -75,7 +75,7 @@ class Constants(BaseConstants):
     STRING_TYPES = ["str"]
     ARRAY_TYPES = ["array"]

-    DTYPES_TO_INFERRED = {"int": ["smallint", "tinyint", "bigint", "int"], "decimal": ["float", "double"],
+    DTYPES_TO_INFERRED = {"int": ["smallint", "tinyint", "bigint", "int"], "float": ["float", "double"],
                           "string": ["string"], "date": ["date", "timestamp"], "boolean": ["boolean"],
                           "binary": ["binary"], "array": ["array"], "object": ["object"], "null": ["null"],
                           "missing": ["missing"]}
diff --git a/optimus/helpers/columns_expression.py b/optimus/helpers/columns_expression.py
index f4bc9dfab..fed09a2ae 100644
--- a/optimus/helpers/columns_expression.py
+++ b/optimus/helpers/columns_expression.py
@@ -158,7 +158,7 @@ def hist_date():
     if data_type is not None:
         col_data_type = data_type[col_name]["data_type"]

-        if col_data_type == "int" or col_data_type == "decimal":
+        if col_data_type == "int" or col_data_type == "float":
             exprs = hist_numeric(min_max, buckets)
         elif col_data_type == "string":
             exprs = hist_string(buckets)
diff --git a/optimus/helpers/constants.py b/optimus/helpers/constants.py
index c5d7e676b..251096621 100644
--- a/optimus/helpers/constants.py
+++ b/optimus/helpers/constants.py
@@ -143,7 +143,7 @@ class ProfilerDataTypesQuality(Enum):

 class ProfilerDataTypes(Enum):
     INT = "int"
-    DECIMAL = "decimal"
+    FLOAT = "float"
     STRING = "str"
     BOOL = "bool"
     BOOLEAN = "boolean"
@@ -186,7 +186,7 @@ def list():
         return list(map(lambda c: c.value, Schemas))

-PROFILER_NUMERIC_DTYPES = [ProfilerDataTypes.INT.value, ProfilerDataTypes.DECIMAL.value]
+PROFILER_NUMERIC_DTYPES = [ProfilerDataTypes.INT.value, ProfilerDataTypes.FLOAT.value]
 PROFILER_STRING_DTYPES = [ProfilerDataTypes.STRING.value, ProfilerDataTypes.BOOLEAN.value,
                           ProfilerDataTypes.DATETIME.value, ProfilerDataTypes.ARRAY.value,
                           ProfilerDataTypes.OBJECT.value, ProfilerDataTypes.GENDER.value,
@@ -405,7 +405,7 @@ def print_check_point_config(filesystem):
 PYTHON_TYPES = {"string": str, "int": int, "float": float, "boolean": bool}
 PROFILER_COLUMN_TYPES = {"categorical", "numeric", "date", "null", "array", "binary"}
 PYTHON_TO_PROFILER = {"string": "categorical", "boolean": "categorical", "int": "numeric", "float": "numeric",
-                      "decimal": "numeric", "date": "date", "array": "array", "binary": "binary", "null": "null"}
+                      "float": "numeric", "date": "date", "array": "array", "binary": "binary", "null": "null"}

 PROFILER_CATEGORICAL_DTYPES = [ProfilerDataTypes.BOOL.value, ProfilerDataTypes.BOOLEAN.value,
                                ProfilerDataTypes.ZIP_CODE.value,
diff --git a/optimus/infer.py b/optimus/infer.py
index f6cf96fbd..d8fa417a1 100644
--- a/optimus/infer.py
+++ b/optimus/infer.py
@@ -636,7 +636,7 @@ def is_dask_future(value):
     return isinstance(value, distributed.client.Future)

-def is_decimal(value):
+def is_float(value):
     return fastnumbers.isfloat(value, allow_nan=True)

diff --git a/optimus/infer_spark.py b/optimus/infer_spark.py
index c3b71d234..f235ae6bd 100644
--- a/optimus/infer_spark.py
+++ b/optimus/infer_spark.py
@@ -18,7 +18,7 @@
     "bigint": LongType(), "date": DateType(), "byte": ByteType(), "short": ShortType(),
     "datetime": TimestampType(), "binary": BinaryType(), "null": NullType()
 }
-SPARK_DTYPES_TO_INFERRED = {"int": ["smallint", "tinyint", "bigint", "int"], "decimal": ["float", "double"],
+SPARK_DTYPES_TO_INFERRED = {"int": ["smallint", "tinyint", "bigint", "int"], "float": ["float", "double"],
                             "string": "string", "date": {"date", "timestamp"}, "boolean": "boolean",
                             "binary": "binary", "array": "array", "object": "object", "null": "null",
                             "missing": "missing"}
 PYSPARK_NUMERIC_TYPES = ["byte", "short", "big", "int", "double", "float"]
diff --git a/optimus/profiler/templates/one_column.html b/optimus/profiler/templates/one_column.html
index 2faf18302..925db7075 100644
--- a/optimus/profiler/templates/one_column.html
+++ b/optimus/profiler/templates/one_column.html
@@ -2,7 +2,7 @@
-{% if data.column_type=="categorical" or data.column_type=="decimal" or data.column_type=="numeric" or data.column_type=="date" or
+{% if data.column_type=="categorical" or data.column_type=="float" or data.column_type=="numeric" or data.column_type=="date" or
 data.column_type=="bool" or data.column_type=="array" or data.column_type=="null" or data.column_type=="timestamp"%}
diff --git a/optimus/profiler/templates/out/one_column.html b/optimus/profiler/templates/out/one_column.html
index b8a72c3ce..e2483ccff 100644
--- a/optimus/profiler/templates/out/one_column.html
+++ b/optimus/profiler/templates/out/one_column.html
@@ -2,7 +2,7 @@
-{% if data.column_type=="categorical" or data.column_type=="decimal" or data.column_type=="numeric" or data.column_type=="date" or
+{% if data.column_type=="categorical" or data.column_type=="float" or data.column_type=="numeric" or data.column_type=="date" or
 data.column_type=="bool" or data.column_type=="array" or data.column_type=="null" or data.column_type=="timestamp"%}
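With this first patch applied, the profiler reports "float" wherever it previously reported "decimal". A minimal sketch of the visible effect (assumes a locally installed pandas engine; the exact shape of the returned value can vary between Optimus versions and engines):

```python
from optimus import Optimus

op = Optimus("pandas")  # local, single-machine engine

df = op.create.dataframe({
    "price": [10.5, 3.99, 7.25],  # fractional values
    "units": [1, 2, 3],           # whole numbers
})

# infer_type should now report "float" where older releases reported "decimal"
print(df.cols.infer_type("price"))  # expected: "float"
print(df.cols.infer_type("units"))  # expected: "int"
```
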
From ce9d5e1700d2ebc656bda7b44638fe66f65575c9 Mon Sep 17 00:00:00 2001 From: Luis Date: Fri, 8 Oct 2021 10:13:53 -0400 Subject: [PATCH 02/31] Remove decimal --- optimus/engines/base/columns.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/optimus/engines/base/columns.py b/optimus/engines/base/columns.py index f7fe63fa1..eb36deb2b 100644 --- a/optimus/engines/base/columns.py +++ b/optimus/engines/base/columns.py @@ -3377,9 +3377,6 @@ def quality(self, cols="*", flush=False, compute=True) -> dict: cols_types = self.root.cols.infer_type(cols, tidy=False)["infer_type"] result = {} - profiler_to_mask_func = { - "decimal": "float" - } quality_props = ["match", "missing", "mismatch"] @@ -3401,8 +3398,6 @@ def quality(self, cols="*", flush=False, compute=True) -> dict: dtype = df.constants.INTERNAL_TO_OPTIMUS.get(dtype, dtype) - dtype = profiler_to_mask_func.get(dtype, dtype) - matches_mismatches = getattr(df[col_name].mask, dtype)( col_name).cols.frequency() From 4b6a50889a3d444851dda1f6686b3da0caf2ddd8 Mon Sep 17 00:00:00 2001 From: Luis Date: Mon, 11 Oct 2021 11:02:06 -0400 Subject: [PATCH 03/31] Fix float type inferring bugs --- optimus/engines/base/functions.py | 4 ++-- optimus/engines/base/pandas/functions.py | 3 ++- optimus/infer.py | 7 +++++++ 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/optimus/engines/base/functions.py b/optimus/engines/base/functions.py index 0fd0d723c..6b433de8a 100644 --- a/optimus/engines/base/functions.py +++ b/optimus/engines/base/functions.py @@ -13,7 +13,7 @@ from optimus.helpers.constants import ProfilerDataTypes from optimus.helpers.core import one_tuple_to_val, val_to_list from optimus.infer import is_datetime_str, is_list, is_list_of_list, is_null, is_bool, \ - is_credit_card_number, is_zip_code, is_float, is_datetime, is_valid_datetime_format, \ + is_credit_card_number, is_zip_code, is_float_like, is_datetime, is_valid_datetime_format, \ is_object_value, is_ip, is_url, is_missing, is_gender, is_list_of_int, is_list_of_str, \ is_str, is_phone_number, is_int_like @@ -876,7 +876,7 @@ def infer_data_types(self, value, cols_data_types): dtype = ProfilerDataTypes.ZIP_CODE.value elif is_int_like(value): dtype = ProfilerDataTypes.INT.value - elif is_float(value): + elif is_float_like(value): dtype = ProfilerDataTypes.FLOAT.value elif is_datetime(value): dtype = ProfilerDataTypes.DATETIME.value diff --git a/optimus/engines/base/pandas/functions.py b/optimus/engines/base/pandas/functions.py index 8fce5a520..bd680df2a 100644 --- a/optimus/engines/base/pandas/functions.py +++ b/optimus/engines/base/pandas/functions.py @@ -30,7 +30,8 @@ def is_integer(self, series): @staticmethod def is_float(series): - return np.vectorize(isfloat)(series).flatten() + # use isreal to allow strings like "0" + return np.vectorize(isreal)(series).flatten() def is_numeric(self, series): if str(series.dtype) in self.constants.DATETIME_INTERNAL_TYPES: diff --git a/optimus/infer.py b/optimus/infer.py index d8fa417a1..99aad7a24 100644 --- a/optimus/infer.py +++ b/optimus/infer.py @@ -657,6 +657,13 @@ def is_int_like(value): """ return fastnumbers.isintlike(value) +def is_float_like(value): + """ + Check if a var is a float + :param value: + :return: + """ + return fastnumbers.isfloat(value) def is_url(value): regex = re.compile( From 599be722cd8dce3d4e114f0c50ac3767186111a4 Mon Sep 17 00:00:00 2001 From: Luis Date: Mon, 11 Oct 2021 13:13:37 -0400 Subject: [PATCH 04/31] Fix quality on user set data types bug --- optimus/engines/base/basedataframe.py | 11 +++++------ 
optimus/engines/base/columns.py | 13 +++++++++---- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/optimus/engines/base/basedataframe.py b/optimus/engines/base/basedataframe.py index 694f6e59d..284318ee8 100644 --- a/optimus/engines/base/basedataframe.py +++ b/optimus/engines/base/basedataframe.py @@ -850,13 +850,12 @@ def calculate_profile(self, cols="*", bins: int = MAX_BUCKETS, flush: bool = Fal cols_data_types = {} cols_to_infer = [*cols_to_profile] - if not flush: - for col_name in cols_to_profile: - _props = Meta.get(df.meta, f"columns_data_types.{col_name}") + for col_name in cols_to_profile: + col_data_type = Meta.get(df.meta, f"columns_data_types.{col_name}") - if _props is not None: - cols_data_types[col_name] = _props - cols_to_infer.remove(col_name) + if col_data_type is not None: + cols_data_types[col_name] = col_data_type + cols_to_infer.remove(col_name) if cols_to_infer: cols_data_types = {**cols_data_types, **df.cols.infer_type(cols_to_infer, tidy=False)["infer_type"]} diff --git a/optimus/engines/base/columns.py b/optimus/engines/base/columns.py index eb36deb2b..4f61b1c4f 100644 --- a/optimus/engines/base/columns.py +++ b/optimus/engines/base/columns.py @@ -578,7 +578,7 @@ def parse_inferred_types(self, col_data_type): columns[k] = result_default return columns - def inferred_data_type(self, cols="*", use_internal=False, tidy=True): + def inferred_data_type(self, cols="*", use_internal=False, calculate=False, tidy=True): """ Get the inferred data types from the meta data. @@ -595,10 +595,16 @@ def inferred_data_type(self, cols="*", use_internal=False, tidy=True): for col_name in cols: data_type = Meta.get(df.meta, f"columns_data_types.{col_name}.data_type") + if data_type is None: data_type = Meta.get(df.meta, f"profile.columns.{col_name}.stats.inferred_data_type.data_type") + if data_type is None: data_type = result.get(col_name, None) + + if calculate and data_type is None: + data_type = df.cols.infer_type(col_name) + result.update({col_name: data_type}) result = {"inferred_data_type": result} @@ -3374,7 +3380,7 @@ def quality(self, cols="*", flush=False, compute=True) -> dict: if is_dict(cols): cols_types = cols else: - cols_types = self.root.cols.infer_type(cols, tidy=False)["infer_type"] + cols_types = self.root.cols.inferred_data_type(cols, calculate=True, tidy=False)["inferred_data_type"] result = {} @@ -3393,8 +3399,7 @@ def quality(self, cols="*", flush=False, compute=True) -> dict: "mismatch": cached_props.get("mismatch")} continue - # Match the profiler dtype with the function. 
The only function that need to be remapped are decimal and int - dtype = props["data_type"] + dtype = props if is_str(props) else props["data_type"] dtype = df.constants.INTERNAL_TO_OPTIMUS.get(dtype, dtype) From bbd030dfda829d109cad327027452fd6ec6ee02c Mon Sep 17 00:00:00 2001 From: Luis Date: Mon, 11 Oct 2021 15:50:46 -0400 Subject: [PATCH 05/31] Fix dask.distributed bug --- optimus/engines/base/dask/dataframe.py | 4 ++-- optimus/engines/base/remote.py | 16 ++++++++-------- optimus/engines/dask/dask.py | 2 +- optimus/engines/dask/engine.py | 4 ++-- optimus/engines/dask_cudf/dask_cudf.py | 2 +- optimus/engines/dask_cudf/engine.py | 4 ++-- 6 files changed, 16 insertions(+), 16 deletions(-) diff --git a/optimus/engines/base/dask/dataframe.py b/optimus/engines/base/dask/dataframe.py index c6516b09e..0bc87aa87 100644 --- a/optimus/engines/base/dask/dataframe.py +++ b/optimus/engines/base/dask/dataframe.py @@ -6,7 +6,7 @@ import humanize import numpy as np import pandas as pd -from dask.distributed import Variable +from distributed import Variable from dask.utils import parse_bytes from optimus.engines.base.basedataframe import BaseDataFrame @@ -232,7 +232,7 @@ def repartition(self, n=None, *args, **kwargs): if n == "auto": # Follow a heuristic for partitioning a mentioned # https://docs.dask.org/en/latest/best-practices.html#avoid-very-large-partitions - client = dask.distributed.get_client() + client = distributed.get_client() worker_memory = parse_bytes(client.cluster.worker_spec[0]["options"]["memory_limit"]) nthreads = client.cluster.worker_spec[0]["options"]["nthreads"] diff --git a/optimus/engines/base/remote.py b/optimus/engines/base/remote.py index a102fbee8..a759fbec8 100644 --- a/optimus/engines/base/remote.py +++ b/optimus/engines/base/remote.py @@ -112,7 +112,7 @@ def _create_actor(self, engine): self.engine = engine def _init(_engine): - from dask.distributed import get_worker + from distributed import get_worker worker = get_worker() worker.actor = RemoteOptimus(_engine) return f"Created remote Optimus instance using \"{_engine}\"" @@ -121,7 +121,7 @@ def _init(_engine): def submit(self, func, *args, priority=0, pure=False, **kwargs): def _remote(_func, *args, **kwargs): - from dask.distributed import get_worker + from distributed import get_worker actor = get_worker().actor return actor.submit(_func, priority=priority, pure=pure, *args, **kwargs) @@ -140,7 +140,7 @@ def run(self, func, *args, **kwargs): def list_vars(self, client_timeout=MAX_TIMEOUT): def _list_vars(): - from dask.distributed import get_worker + from distributed import get_worker op = get_worker().actor.op return op.list_vars() @@ -148,7 +148,7 @@ def _list_vars(): def clear_vars(self, keep=[], client_timeout=MAX_TIMEOUT): def _clear_vars(keep=[]): - from dask.distributed import get_worker + from distributed import get_worker op = get_worker().actor.op return op.clear_vars(keep) @@ -156,7 +156,7 @@ def _clear_vars(keep=[]): def update_vars(self, values, client_timeout=MAX_TIMEOUT): def _update_vars(values): - from dask.distributed import get_worker + from distributed import get_worker op = get_worker().actor.op return op.update_vars(values) @@ -164,7 +164,7 @@ def _update_vars(values): def del_var(self, name, client_timeout=MAX_TIMEOUT): def _del_var(name): - from dask.distributed import get_worker + from distributed import get_worker op = get_worker().actor.op return op.del_var(name) @@ -172,7 +172,7 @@ def _del_var(name): def set_var(self, name, value, client_timeout=MAX_TIMEOUT): def _set_var(name): - from 
dask.distributed import get_worker + from distributed import get_worker op = get_worker().actor.op return op.set_var(name, value) @@ -180,7 +180,7 @@ def _set_var(name): def get_var(self, name, client_timeout=MAX_TIMEOUT): def _get_var(name): - from dask.distributed import get_worker + from distributed import get_worker op = get_worker().actor.op return op.get_var(name) diff --git a/optimus/engines/dask/dask.py b/optimus/engines/dask/dask.py index f1d9aca72..6de20272c 100644 --- a/optimus/engines/dask/dask.py +++ b/optimus/engines/dask/dask.py @@ -1,4 +1,4 @@ -from dask.distributed import Client +from distributed import Client from optimus.engines.base.dask.constants import STARTING_DASK from optimus.helpers.constants import JUST_CHECKING diff --git a/optimus/engines/dask/engine.py b/optimus/engines/dask/engine.py index ca5d4aeac..f93a1553a 100644 --- a/optimus/engines/dask/engine.py +++ b/optimus/engines/dask/engine.py @@ -1,5 +1,5 @@ import dask -from dask.distributed import Client, get_client +from distributed import Client, get_client from optimus.engines.dask.create import Create from optimus.engines.base.engine import BaseEngine @@ -164,4 +164,4 @@ def submit(self, func, *args, priority=0, pure=False, **kwargs): from optimus.engines.base.remote import RemoteDummyAttribute if isinstance(func, (RemoteDummyAttribute,)): return func(client_submit=True, *args, **kwargs) - return dask.distributed.get_client().submit(func, priority=priority, pure=pure, *args, **kwargs) + return distributed.get_client().submit(func, priority=priority, pure=pure, *args, **kwargs) diff --git a/optimus/engines/dask_cudf/dask_cudf.py b/optimus/engines/dask_cudf/dask_cudf.py index 2ed4bf658..ae4aede88 100644 --- a/optimus/engines/dask_cudf/dask_cudf.py +++ b/optimus/engines/dask_cudf/dask_cudf.py @@ -1,4 +1,4 @@ -from dask.distributed import Client +from distributed import Client # from dask_cuda import LocalCUDACluster from optimus.engines.base.dask.constants import STARTING_DASK diff --git a/optimus/engines/dask_cudf/engine.py b/optimus/engines/dask_cudf/engine.py index 2bb7b7c38..4be483c8e 100644 --- a/optimus/engines/dask_cudf/engine.py +++ b/optimus/engines/dask_cudf/engine.py @@ -1,5 +1,5 @@ import dask -from dask.distributed import Client, get_client +from distributed import Client, get_client from optimus.engines.base.engine import BaseEngine from optimus.engines.base.remote import MAX_TIMEOUT, RemoteOptimusInterface, RemoteDummyVariable, RemoteDummyDataFrame @@ -173,7 +173,7 @@ def submit(self, func, *args, priority=0, pure=False, **kwargs): from optimus.engines.base.remote import RemoteDummyAttribute if isinstance(func, (RemoteDummyAttribute,)): return func(client_submit=True, *args, **kwargs) - return dask.distributed.get_client().submit(func, priority=priority, pure=pure, *args, **kwargs) + return distributed.get_client().submit(func, priority=priority, pure=pure, *args, **kwargs) @property def engine_label(self): From 9c605d3ba2b79129536a478a7bc7aafce2a3ec16 Mon Sep 17 00:00:00 2001 From: Luis Date: Wed, 13 Oct 2021 13:20:18 -0400 Subject: [PATCH 06/31] Fix partial profiling bug --- optimus/engines/base/columns.py | 3 ++- optimus/engines/base/profile.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/optimus/engines/base/columns.py b/optimus/engines/base/columns.py index 4f61b1c4f..3c255765f 100644 --- a/optimus/engines/base/columns.py +++ b/optimus/engines/base/columns.py @@ -196,7 +196,8 @@ def select(self, cols="*", regex=None, data_type=None, invert=False, accepts_mis cols = 
parse_columns(df, cols if regex is None else regex, is_regex=regex is not None, filter_by_column_types=data_type, invert=invert, accepts_missing_cols=accepts_missing_cols) - meta = Meta.select_columns(df.meta, cols) + meta = df.meta + meta = Meta.select_columns(meta, cols) dfd = df.data if cols is not None: dfd = dfd[cols] diff --git a/optimus/engines/base/profile.py b/optimus/engines/base/profile.py index 78f0e3b03..1e8914672 100644 --- a/optimus/engines/base/profile.py +++ b/optimus/engines/base/profile.py @@ -77,7 +77,7 @@ def __call__(self, cols="*", bins: int = MAX_BUCKETS, output: str = None, flush: calculate = True if calculate: - df = df[cols].calculate_profile(cols, bins, flush, size) + df = df.calculate_profile(cols, bins, flush, size) profile = Meta.get(df.meta, "profile") self.root.meta = df.meta profile["columns"] = {key: profile["columns"][key] for key in cols} From 2d6970c0dfc56ca6622c36c6081ae818684f1062 Mon Sep 17 00:00:00 2001 From: Luis Date: Wed, 13 Oct 2021 17:06:25 -0400 Subject: [PATCH 07/31] Fix sort bug on Pandas --- optimus/engines/base/functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/optimus/engines/base/functions.py b/optimus/engines/base/functions.py index 6b433de8a..8104dbf1a 100644 --- a/optimus/engines/base/functions.py +++ b/optimus/engines/base/functions.py @@ -135,7 +135,7 @@ def sort_df(dfd, cols, ascending): :param ascending: :return: """ - return dfd.sort_values(cols, ascending=ascending) + return dfd.sort_values(cols, ascending=ascending).reset_index(drop=True) @staticmethod def reverse_df(dfd): From fe1a4b4fc0f5d7453a1b86465aff5d05fd4464c1 Mon Sep 17 00:00:00 2001 From: Luis Date: Thu, 14 Oct 2021 11:18:24 -0400 Subject: [PATCH 08/31] Include exclusive joins --- optimus/engines/base/basedataframe.py | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/optimus/engines/base/basedataframe.py b/optimus/engines/base/basedataframe.py index 284318ee8..40809a25c 100644 --- a/optimus/engines/base/basedataframe.py +++ b/optimus/engines/base/basedataframe.py @@ -1014,7 +1014,7 @@ def join(self, df_right: 'DataFrameType', how="left", on=None, left_on=None, rig """ Join 2 dataframes SQL style :param df_right: - :param how{‘left’, ‘right’, ‘outer’, ‘inner’}, default ‘left’ + :param how{‘left’, ‘right’, ‘outer’, ‘inner’, ‘exclusive’, ‘exclusive left’, ‘exclusive right’}, default ‘left’ :param on: :param left_on: :param right_on: @@ -1051,8 +1051,29 @@ def join(self, df_right: 'DataFrameType', how="left", on=None, left_on=None, rig left_names = df_left.cols.names() right_names = df_right.cols.names() - df = self.root.new(df_left.data.merge(df_right.data, how=how, left_on=left_on, right_on=right_on, - suffixes=(suffix_left, suffix_right))) + + if how in ['exclusive', 'exclusive left', 'exclusive right']: + _how = 'outer' + indicator = True + else: + _how = how + indicator = False + + dfd = df_left.data.merge(df_right.data, how=_how, left_on=left_on, + right_on=right_on, suffixes=(suffix_left, suffix_right), + indicator=indicator) + + if how == 'exclusive': + dfd = dfd[(dfd["_merge"] == "left_only") | (dfd["_merge"] == "right_only")] + elif how == 'exclusive left': + dfd = dfd[dfd["_merge"] == "left_only"] + elif how == 'exclusive right': + dfd = dfd[dfd["_merge"] == "right_only"] + + if indicator: + dfd = dfd.drop(["_merge"], axis=1) + + df = self.root.new(dfd) # Reorder last_column_name = left_names[-1] From 6c283fe2709487a7b306f4064defa085916bd656 Mon Sep 17 00:00:00 2001 From: Luis Date: Thu, 
14 Oct 2021 12:20:36 -0400 Subject: [PATCH 09/31] Fix local load bug on Pandas --- optimus/engines/pandas/io/load.py | 45 ++++++++++++++++++++++++------- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/optimus/engines/pandas/io/load.py b/optimus/engines/pandas/io/load.py index 53c828314..1e399b0ce 100644 --- a/optimus/engines/pandas/io/load.py +++ b/optimus/engines/pandas/io/load.py @@ -29,47 +29,72 @@ def df(*args, **kwargs): @staticmethod def _csv(filepath_or_buffer, *args, **kwargs): kwargs.pop("n_partitions", None) + if is_url(filepath_or_buffer): s = requests.get(filepath_or_buffer).text df = pd.read_csv(StringIO(s), *args, **kwargs) else: df = pd.read_csv(filepath_or_buffer, *args, **kwargs) + if isinstance(df, pd.io.parsers.TextFileReader): df = df.get_chunk() + return df @staticmethod def _json(filepath_or_buffer, *args, **kwargs): kwargs.pop("n_partitions", None) - s = requests.get(filepath_or_buffer).text - return pd.read_json(StringIO(s), *args, **kwargs) + + if is_url(filepath_or_buffer): + s = requests.get(filepath_or_buffer).text + df = pd.read_json(StringIO(s), *args, **kwargs) + else: + df = pd.read_json(filepath_or_buffer, *args, **kwargs) + + return df @staticmethod def _avro(filepath_or_buffer, nrows=None, *args, **kwargs): kwargs.pop("n_partitions", None) - s = requests.get(filepath_or_buffer).text - df = pdx.read_avro(StringIO(s), *args, **kwargs) + + if is_url(filepath_or_buffer): + s = requests.get(filepath_or_buffer).text + df = pdx.read_avro(StringIO(s), *args, **kwargs) + else: + df = pdx.read_avro(filepath_or_buffer, *args, **kwargs) + if nrows: logger.warn(f"'load.avro' on {EnginePretty.PANDAS.value} loads the whole dataset and then truncates it") df = df[:nrows] + return df @staticmethod def _parquet(filepath_or_buffer, nrows=None, engine="pyarrow", *args, **kwargs): - kwargs.pop("n_partitions", None) - s = requests.get(filepath_or_buffer).text - df = pd.StringIO(s)(filepath_or_buffer, engine=engine, *args, **kwargs) + kwargs.pop("n_partitions", None) + + if is_url(filepath_or_buffer): + s = requests.get(filepath_or_buffer).text + df = pd.read_parquet(StringIO(s), engine=engine, *args, **kwargs) + else: + df = pd.read_parquet(filepath_or_buffer, engine=engine, *args, **kwargs) + if nrows: logger.warn(f"'load.parquet' on {EnginePretty.PANDAS.value} loads the whole dataset and then truncates it") df = df[:nrows] - + return df @staticmethod def _xml(filepath_or_buffer, nrows=None, *args, **kwargs): kwargs.pop("n_partitions", None) - s = requests.get(filepath_or_buffer).text - df = pd.read_xml(StringIO(s), *args, **kwargs) + + if is_url(filepath_or_buffer): + s = requests.get(filepath_or_buffer).text + df = pd.read_xml(StringIO(s), *args, **kwargs) + else: + df = pd.read_xml(filepath_or_buffer, *args, **kwargs) + if nrows: logger.warn(f"'load.xml' on {EnginePretty.PANDAS.value} loads the whole dataset and then truncates it") df = df[:nrows] From ce9f491fd5e4d01a153db42ed4158c3715862332 Mon Sep 17 00:00:00 2001 From: Luis Date: Fri, 15 Oct 2021 10:00:20 -0400 Subject: [PATCH 10/31] Adjust mode and percentile functions output formats --- optimus/engines/base/functions.py | 43 +++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/optimus/engines/base/functions.py b/optimus/engines/base/functions.py index 8104dbf1a..570cee754 100644 --- a/optimus/engines/base/functions.py +++ b/optimus/engines/base/functions.py @@ -11,7 +11,7 @@ from optimus.helpers.logger import logger from optimus.helpers.constants import 
ProfilerDataTypes -from optimus.helpers.core import one_tuple_to_val, val_to_list +from optimus.helpers.core import one_list_to_val, one_tuple_to_val, val_to_list from optimus.infer import is_datetime_str, is_list, is_list_of_list, is_null, is_bool, \ is_credit_card_number, is_zip_code, is_float_like, is_datetime, is_valid_datetime_format, \ is_object_value, is_ip, is_url, is_missing, is_gender, is_list_of_int, is_list_of_str, \ @@ -55,6 +55,27 @@ def _functions(self): """ return self._partition_engine + @staticmethod + def _format_to_dict(_s): + if hasattr(_s, "to_dict"): + return _s.to_dict() + else: + return _s + + @staticmethod + def _format_to_list(_s): + if hasattr(_s, "tolist"): + return _s.tolist() + else: + return _s + + @staticmethod + def _format_to_list_one(_s): + if hasattr(_s, "tolist"): + return one_list_to_val(_s.tolist()) + else: + return _s + @property def n_partitions(self): if self.root is None: @@ -290,9 +311,12 @@ def max(self, series, numeric=False, string=False): def mean(self, series): return self.to_float(series).mean() - @staticmethod - def mode(series): - return series.mode() + def mode(self, series): + + if str(series.dtype) not in self.constants.NUMERIC_INTERNAL_TYPES: + return self.delayed(self._format_to_list_one)(self.to_string(series.dropna()).mode()) + else: + return self.delayed(self._format_to_list_one)(series.mode()) @staticmethod def crosstab(series, other): @@ -372,7 +396,7 @@ def range(self, series): series = self.to_float(series) return {"min": series.min(), "max": series.max()} - def percentile(self, series, values, error, estimate=False): + def percentile(self, series, values=0.5, error=False, estimate=False): _series = self.to_float(series).dropna() @@ -382,14 +406,7 @@ def percentile(self, series, values, error, estimate=False): if not len(_series): return np.nan else: - @self.delayed - def format_percentile(_s): - if hasattr(_s, "to_dict"): - return _s.to_dict() - else: - return _s - - return format_percentile(_series.quantile(values)) + return self.delayed(self._format_to_dict)(_series.quantile(values)) # def radians(series): # return series.to_float().radians() From b0ea69674f0868f4d7e0ca80c9bf80ee8f9a0f81 Mon Sep 17 00:00:00 2001 From: Luis Date: Fri, 15 Oct 2021 10:00:39 -0400 Subject: [PATCH 11/31] Adjust parser function descriptions --- optimus/expressions.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/optimus/expressions.py b/optimus/expressions.py index 74dd05b9f..49d5f5d55 100644 --- a/optimus/expressions.py +++ b/optimus/expressions.py @@ -267,8 +267,6 @@ "example": "ROUND(99.44, 1)", "text": "ROUND", }, - # "RADIANS": "RADIANS function description", # Trigonometric Functions - # "DEGREES": "DEGREES function description", "SIN": { "description": "Returns the sin of an angle provided in radians.", "parameters": [ @@ -479,7 +477,6 @@ "example": "TRIM('optimus prime')", "text": "TRIM", }, - "REMOVE": "ATANH function description", "LEN": { "description": "Returns the length of a string.", "parameters": [ @@ -511,7 +508,7 @@ "example": "LEN('optimus prime')", "text": "LEN", }, - "RFIND": "ATANH function description", + "RFIND": "RFIND", "LEFT": { "description": "Returns a substring from the beginning of a specified string.", "parameters": [ @@ -536,9 +533,9 @@ "example": "LEN('optimus prime')", "text": "LEN", }, - "STARTS_WITH": "ATANH function description", - "ENDS_WITH": "ATANH function description", - "EXACT": "ATANH function description", + "STARTS_WITH": "STARTS_WITH", + "ENDS_WITH": "ENDS_WITH", + 
"EXACT": "EXACT", "YEAR": { # Date Functions "description": "Returns the year specified by a given date.", "parameters": [ From 59a70b169b06e6d5724051e063d3a7bed0330e73 Mon Sep 17 00:00:00 2001 From: Luis Date: Fri, 15 Oct 2021 17:13:46 -0400 Subject: [PATCH 12/31] Allow dataframe values on fill_na --- optimus/engines/base/columns.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/optimus/engines/base/columns.py b/optimus/engines/base/columns.py index 3c255765f..bfcbf0ae5 100644 --- a/optimus/engines/base/columns.py +++ b/optimus/engines/base/columns.py @@ -2918,22 +2918,33 @@ def impute(self, cols="*", data_type="auto", strategy="auto", fill_value=None, o return df - def fill_na(self, cols="*", value=None, output_cols=None) -> 'DataFrameType': + def fill_na(self, cols="*", value=None, output_cols=None, + eval_value: bool = False) -> 'DataFrameType': """ Replace null data with a specified value. :param cols: '*', list of columns names or a single column name. :param value: value to replace the nan/None values :param output_cols: Column name or list of column names where the transformed data will be saved. + :param eval_value: Parse 'value' param in case a string is passed. :return: Returns the column filled with given value. """ df = self.root - columns = prepare_columns(df, cols, output_cols) + cols = parse_columns(df, cols) + values, eval_values = prepare_columns_arguments(cols, value, eval_value) + output_cols = get_output_cols(cols, output_cols) kw_columns = {} - for input_col, output_col in columns: + for input_col, output_col, value, eval_value in zip(cols, output_cols, values, eval_values): + + if eval_value and is_str(value): + value = eval(value) + + if isinstance(value, self.root.__class__): + value = value.get_series() + kw_columns[output_col] = df.data[input_col].fillna(value) kw_columns[output_col] = kw_columns[output_col].mask( kw_columns[output_col] == "", value) From 485f9ea9e22612537cedf34b531b4ef873e5f96d Mon Sep 17 00:00:00 2001 From: Luis Date: Fri, 15 Oct 2021 17:36:52 -0400 Subject: [PATCH 13/31] Fix match mask parameter bug --- optimus/engines/base/mask.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/optimus/engines/base/mask.py b/optimus/engines/base/mask.py index 92a5913a8..7caf1ea38 100644 --- a/optimus/engines/base/mask.py +++ b/optimus/engines/base/mask.py @@ -161,11 +161,13 @@ def match_data_type(self, cols="*", data_type=None) -> 'MaskDataFrameType': if is_list(data_type): mask_match = None for _col, _data_type in zip(cols, data_type): + _data_type = df.constants.INTERNAL_TO_OPTIMUS.get(_data_type, _data_type) if mask_match is None: mask_match = getattr(df[_col].mask, _data_type)(_col) else: mask_match[_col] = getattr(df[_col].mask, _data_type)(_col) else: + data_type = df.constants.INTERNAL_TO_OPTIMUS.get(data_type, data_type) mask_match = getattr(df[cols].mask, data_type)(cols) return mask_match From 8f396e8b5d3a0aa291edd773694381a0697fa83c Mon Sep 17 00:00:00 2001 From: Luis Date: Mon, 18 Oct 2021 14:30:04 -0400 Subject: [PATCH 14/31] Fix tranformations bug on partial profiling --- optimus/engines/base/basedataframe.py | 4 ++-- optimus/engines/base/meta.py | 9 +++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/optimus/engines/base/basedataframe.py b/optimus/engines/base/basedataframe.py index 40809a25c..f240f9ed7 100644 --- a/optimus/engines/base/basedataframe.py +++ b/optimus/engines/base/basedataframe.py @@ -983,7 +983,7 @@ def merge(_columns, _hist, _freq, _mismatch, _data_types, 
_count_uniques): all_columns_names = df.cols.names() - meta = Meta.set(meta, "transformations", value={}) + # meta = Meta.set(meta, "transformations", value={}) # Order columns actual_columns = profiler_data["columns"] @@ -997,7 +997,7 @@ def merge(_columns, _hist, _freq, _mismatch, _data_types, _count_uniques): meta = df.meta # Reset Actions - meta = Meta.reset_actions(meta) + meta = Meta.reset_actions(meta, parse_columns(df, cols)) df.meta = meta profiler_time["end"] = {"elapsed_time": time.process_time() - _t} # print(profiler_time) diff --git a/optimus/engines/base/meta.py b/optimus/engines/base/meta.py index 6445e746d..d8c76896f 100644 --- a/optimus/engines/base/meta.py +++ b/optimus/engines/base/meta.py @@ -61,14 +61,19 @@ def get(meta, spec=None) -> dict: return copy.deepcopy(data) @staticmethod - def reset_actions(meta): + def reset_actions(meta, cols="*"): """ Reset the data frame metadata :param meta: Meta data to be modified :return: """ - return Meta.set(meta, ACTIONS_PATH, []) + if cols == "*": + return Meta.set(meta, ACTIONS_PATH, []) + else: + actions = Meta.get(meta, ACTIONS_PATH) or [] + actions = [action for action in actions if action["columns"] not in cols] + return Meta.set(meta, ACTIONS_PATH, actions) @staticmethod From e539130bd4663a9672ba1f6a7eb8e130f03fcb22 Mon Sep 17 00:00:00 2001 From: Luis Date: Mon, 18 Oct 2021 14:30:22 -0400 Subject: [PATCH 15/31] Fix aggregation and group by operations --- optimus/engines/base/basedataframe.py | 15 +++++++++--- optimus/engines/base/columns.py | 34 ++++++++++++++++++++------- 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/optimus/engines/base/basedataframe.py b/optimus/engines/base/basedataframe.py index f240f9ed7..2fface011 100644 --- a/optimus/engines/base/basedataframe.py +++ b/optimus/engines/base/basedataframe.py @@ -1095,14 +1095,22 @@ def string_clustering(self, cols="*", algorithm="fingerrint", *args, **kwargs): # return clusters def agg(self, aggregations: dict, groupby=None, output="dict", tidy=True): + """ + :param aggregations: Dictionary or list of tuples with the form [("col", "agg")] + :param groupby: None, list of columns names or a single column name to group the aggregations. + :param output{‘dict’, ‘dataframe’}, default ‘dict’: Output type. 
+ """ df = self dfd = df.data + if is_dict(aggregations): + aggregations = aggregations.items() + if groupby: groupby = parse_columns(df, groupby) - for column, aggregations_set in aggregations.items(): + for column, aggregations_set in aggregations: aggregations[column] = val_to_list(aggregations_set) dfd = dfd.groupby(groupby).agg(aggregations) @@ -1113,19 +1121,20 @@ def agg(self, aggregations: dict, groupby=None, output="dict", tidy=True): result = dfd.to_dict() elif output == "dataframe": + dfd.columns = [str(c) for c in dfd.columns] result = self.new(dfd.reset_index()) else: result = {} - for column, aggregations_set in aggregations.items(): + for column, aggregations_set in aggregations: aggregations_set = val_to_list(aggregations_set) for aggregation in aggregations_set: result[column + "_" + aggregation] = getattr( df.cols, aggregation)(column, tidy=True) if output == "dataframe": - result = self.new(result) + result = self.op.create.dataframe({k: [v] for k, v in result.items()}) return convert_numpy(format_dict(result, tidy=tidy)) diff --git a/optimus/engines/base/columns.py b/optimus/engines/base/columns.py index bfcbf0ae5..7d8b0115c 100644 --- a/optimus/engines/base/columns.py +++ b/optimus/engines/base/columns.py @@ -1098,15 +1098,21 @@ def pattern_counts(self, cols="*", n=10, mode=0, flush=False) -> dict: return result - def groupby(self, by, agg) -> 'DataFrameType': + def groupby(self, by: Union[str, list] = None, agg: Union[list, dict] = None) -> 'DataFrameType': """ This helper function aims to help managing columns name in the aggregation output. Also how to handle ordering columns because dask can order columns. - :param by: Column name. - :param agg: List of tuples with the form [("agg", "col")] + :param by: None, list of columns names or a single column name to group the aggregations. 
+ :param agg: List of tuples with the form [("agg", "col")] or + [("agg", "col", "new_col_name")] or dictionary with the form + {"new_col_name": {"col": "agg"}} :return: """ + + if agg is None: + raise TypeError(f"Can't aggregate with 'agg' value {agg}") + df = self.root compact = {} @@ -1129,10 +1135,20 @@ def groupby(self, by, agg) -> 'DataFrameType': dfd = df.data - dfd = dfd.groupby(by=by).agg(compact).reset_index() - agg_names = agg_names or [a[0] + "_" + a[1] for a in agg] - dfd.columns = (val_to_list(by) + agg_names) - dfd.columns = [str(c) for c in dfd.columns] + if by and len(by): + dfd = dfd.groupby(by=by) + + dfd = dfd.agg(compact).reset_index() + + if by and len(by): + agg_names = agg_names or [a[1] if len(a) < 3 else a[2] for a in agg] + dfd.columns = (val_to_list(by) + agg_names) + else: + if agg_names is not None: + logger.warn("New columns names are not supported when 'by' is not passed.") + dfd.columns = (["aggregation"] + list(dfd.columns[1:])) + dfd.columns = [str(c) for c in dfd.columns] + return self.root.new(dfd) def move(self, column, position, ref_col=None) -> 'DataFrameType': @@ -2949,7 +2965,9 @@ def fill_na(self, cols="*", value=None, output_cols=None, kw_columns[output_col] = kw_columns[output_col].mask( kw_columns[output_col] == "", value) - return df.cols.assign(kw_columns) + df = df.cols.assign(kw_columns) + df.meta = Meta.action(df.meta, Actions.FILL_NA.value, list(kw_columns.keys())) + return df def count(self) -> int: """ From 0d8a63a640e695db70d6a10219861c7b226efe9a Mon Sep 17 00:00:00 2001 From: Luis Date: Mon, 18 Oct 2021 18:22:14 -0400 Subject: [PATCH 16/31] Fix fill_na eval value bug --- optimus/engines/base/columns.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/optimus/engines/base/columns.py b/optimus/engines/base/columns.py index 7d8b0115c..d97ec812f 100644 --- a/optimus/engines/base/columns.py +++ b/optimus/engines/base/columns.py @@ -2955,7 +2955,7 @@ def fill_na(self, cols="*", value=None, output_cols=None, for input_col, output_col, value, eval_value in zip(cols, output_cols, values, eval_values): - if eval_value and is_str(value): + if eval_value and is_str(value) and value: value = eval(value) if isinstance(value, self.root.__class__): From 1768f4f4c70ceb7ef38c0d159847c26355a3edec Mon Sep 17 00:00:00 2001 From: Luis Date: Mon, 18 Oct 2021 18:22:24 -0400 Subject: [PATCH 17/31] Fix load file name bug --- optimus/engines/base/io/load.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/optimus/engines/base/io/load.py b/optimus/engines/base/io/load.py index 9a067ebdd..7ee6a571d 100644 --- a/optimus/engines/base/io/load.py +++ b/optimus/engines/base/io/load.py @@ -80,7 +80,7 @@ def csv(self, filepath_or_buffer, sep=",", header=True, infer_schema=True, encod unquoted_path = glob.glob(unquote_path(filepath_or_buffer)) if unquoted_path and len(unquoted_path): - meta = {"file_name": unquoted_path, "name": ntpath.basename(unquoted_path[0])} + meta = {"file_name": unquoted_path[0], "name": ntpath.basename(unquoted_path[0])} else: meta = {"file_name": filepath_or_buffer, "name": ntpath.basename(filepath_or_buffer)} From e18711cdc7b49bb38d16b9bf8361ad072d48353f Mon Sep 17 00:00:00 2001 From: Luis Date: Wed, 20 Oct 2021 15:57:35 -0400 Subject: [PATCH 18/31] Adjust sort --- optimus/engines/base/rows.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/optimus/engines/base/rows.py b/optimus/engines/base/rows.py index 7ffb76aa2..93470a163 100644 --- a/optimus/engines/base/rows.py +++ 
b/optimus/engines/base/rows.py @@ -189,10 +189,11 @@ def _set_order(o): dfd = df.functions.sort_df(df.data, sort_cols, order) meta = Meta.action(self.root.meta, Actions.SORT_ROW.value, cols) + if cast: + dfd = dfd.drop(sort_cols, axis=1) + df = self.root.new(dfd, meta=meta) - if cast: - df = df.cols.drop(sort_cols) return df From 75f6fd7f895347ccfe2f81907ac9e3e20463bc52 Mon Sep 17 00:00:00 2001 From: Luis Date: Fri, 22 Oct 2021 14:22:34 -0400 Subject: [PATCH 19/31] Fix match_positions function bugs --- optimus/engines/base/commons/functions.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/optimus/engines/base/commons/functions.py b/optimus/engines/base/commons/functions.py index 5b3039a65..a467479d2 100644 --- a/optimus/engines/base/commons/functions.py +++ b/optimus/engines/base/commons/functions.py @@ -75,10 +75,10 @@ def find(df, columns, sub, ignore_case=False): def get_match_positions(_value, _separator): result = None if is_str(_value): - # Using re.IGNORECASE in finditer not seems to work - if ignore_case is True: - _separator = _separator + [s.lower() for s in _separator] - regex = re.compile('|'.join(_separator)) + regex_str = '|'.join([re.escape(s) for s in _separator]) + if ignore_case: + regex_str = f"(?i)({regex_str})" + regex = re.compile(regex_str) length = [[match.start(), match.end()] for match in regex.finditer(_value)] From 4478595430e966e4c0e46055612a8d1100717bf7 Mon Sep 17 00:00:00 2001 From: Luis Date: Mon, 25 Oct 2021 15:26:48 -0400 Subject: [PATCH 20/31] Fix empty dataset profiling bug --- optimus/engines/base/basedataframe.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/optimus/engines/base/basedataframe.py b/optimus/engines/base/basedataframe.py index 2fface011..910e5deeb 100644 --- a/optimus/engines/base/basedataframe.py +++ b/optimus/engines/base/basedataframe.py @@ -971,13 +971,21 @@ def merge(_columns, _hist, _freq, _mismatch, _data_types, _count_uniques): data_set_info.update({'size': df.size(format="human")}) assign(profiler_data, "summary", data_set_info, dict) + data_types_list = list(set(df.cols.data_type("*", tidy=False)["data_type"].values())) + assign(profiler_data, "summary.data_types_list", data_types_list, dict) assign(profiler_data, "summary.total_count_data_types", len(set([i for i in data_types.values()])), dict) assign(profiler_data, "summary.missing_count", total_count_na, dict) - assign(profiler_data, "summary.p_missing", round( - total_count_na / df.rows.count() * 100, 2)) + + rows_count = df.rows.count() + + if rows_count: + assign(profiler_data, "summary.p_missing", round( + total_count_na / rows_count * 100, 2)) + else: + assign(profiler_data, "summary.p_missing", None) # _t = time.process_time() From b828e6c70ddae9f010e18268601ed87a9d102f5e Mon Sep 17 00:00:00 2001 From: Luis Date: Thu, 28 Oct 2021 13:51:00 -0400 Subject: [PATCH 21/31] Default index_to_string and string_to_index behavior to replace --- optimus/engines/base/commons/functions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/optimus/engines/base/commons/functions.py b/optimus/engines/base/commons/functions.py index a467479d2..1a6d45716 100644 --- a/optimus/engines/base/commons/functions.py +++ b/optimus/engines/base/commons/functions.py @@ -36,7 +36,7 @@ def _string_to_index(value): return df.cols.apply(cols, _string_to_index, output_cols=output_cols, meta_action=Actions.STRING_TO_INDEX.value, - mode="vectorized", default=STRING_TO_INDEX) + mode="vectorized") def index_to_string(df, cols, 
output_cols=None, le=None, **kwargs): @@ -56,7 +56,7 @@ def _index_to_string(value): return df.cols.apply(cols, _index_to_string, output_cols=output_cols, meta_action=Actions.INDEX_TO_STRING.value, - mode="vectorized", default=INDEX_TO_STRING) + mode="vectorized") def find(df, columns, sub, ignore_case=False): From f9595151766343fce3ed4905a57f0bf13e1de3a9 Mon Sep 17 00:00:00 2001 From: Luis Date: Thu, 28 Oct 2021 13:54:55 -0400 Subject: [PATCH 22/31] Implement copy dataframe function --- optimus/engines/base/basedataframe.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/optimus/engines/base/basedataframe.py b/optimus/engines/base/basedataframe.py index 910e5deeb..92777e5a4 100644 --- a/optimus/engines/base/basedataframe.py +++ b/optimus/engines/base/basedataframe.py @@ -63,6 +63,7 @@ def _compatible_data(data): def __del__(self): del self.data + del self.le @property def root(self) -> 'DataFrameType': @@ -105,6 +106,13 @@ def new(self, dfd, meta=None) -> 'DataFrameType': df.le = self.le return df + def copy(self) -> 'DataFrameType': + """ + Return a copy of a dataframe + """ + df = self.root + return self.root.new(df.data.copy(), meta=df.meta.copy()) + @staticmethod def __operator__(df, data_type=None, multiple_columns=False) -> 'DataFrameType': if isinstance(df, (BaseDataFrame,)): From 1fa40d3b49d74c2df2b4968725700212ae506a5c Mon Sep 17 00:00:00 2001 From: Luis Date: Thu, 28 Oct 2021 17:56:23 -0400 Subject: [PATCH 23/31] Fix copy Label Encoder instance --- optimus/engines/base/basedataframe.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/optimus/engines/base/basedataframe.py b/optimus/engines/base/basedataframe.py index 92777e5a4..0dbcb6d4a 100644 --- a/optimus/engines/base/basedataframe.py +++ b/optimus/engines/base/basedataframe.py @@ -103,7 +103,8 @@ def new(self, dfd, meta=None) -> 'DataFrameType': df = self.__class__(dfd, op=self.op) if meta is not None: df.meta = meta - df.le = self.le + import copy + df.le = copy.deepcopy(self.le) return df def copy(self) -> 'DataFrameType': From 1be2932e2df5ea8b8f1a7f2b6a028e23c10bb6d8 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Sat, 30 Oct 2021 14:49:57 -0500 Subject: [PATCH 24/31] Fix docs url --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 2c3b2051b..2f41978ad 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,8 @@ Some amazing things Optimus can do for you: * Out of box functions to explore and fix data quality. * Use the same code to process your data in your laptop or in a remote cluster of GPUs. +[See Documentation](https://docs.hi-optimus.com/en/latest/) + ## Try Optimus To launch a live notebook server to test optimus using binder or Colab, click on one of the following badges: @@ -176,7 +178,7 @@ If you have issues, see our [Troubleshooting Guide](https://github.com/hi-primus Contributions go far beyond pull requests and commits. We are very happy to receive any kind of contributions including: -* [Documentation](https://github.com/hi-primus/optimus/tree/develop-21.9/docs/source) updates, enhancements, designs, or bugfixes. +* [Documentation](https://docs.hi-optimus.com/en/latest/) updates, enhancements, designs, or bugfixes. * Spelling or grammar fixes. * README.md corrections or redesigns. 
* Adding unit, or functional [tests](https://github.com/hi-primus/optimus/tree/develop-21.9/tests) From bb71a9d24dbf57796d66915a98a4cfb919cddfdb Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Sat, 30 Oct 2021 14:50:19 -0500 Subject: [PATCH 25/31] Fix formatting --- troubleshooting.md | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/troubleshooting.md b/troubleshooting.md index d332bf739..4c49a0f1c 100644 --- a/troubleshooting.md +++ b/troubleshooting.md @@ -28,16 +28,16 @@ conda install mysqlclient pip install pyodbc ``` -2. Then run the following commands on the terminal : +2. Then run the following commands on the terminal: ``` brew install unixodbc ``` -(in my case, it appeared an error that got solved by running: +(in my case, it appeared an error that got solved by running:) ``` sudo chown -R $(whoami) $(brew --prefix)/* brew install unixodbc ``` -) + ``` brew tap microsoft/mssql-preview https://github.com/Microsoft/homebrew-mssql-preview @@ -45,11 +45,10 @@ brew update brew install mssql-tools ``` -(again, in my case, it appeared another error that got solved by running: +(again, in my case, it appeared another error that got solved by running:) ``` brew untap microsoft mssql-preview brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release brew update brew install mssql-tools ``` -) \ No newline at end of file From 7819fb5e66b2a77b9313e1851075e2c6728115ff Mon Sep 17 00:00:00 2001 From: pyup-bot Date: Mon, 1 Nov 2021 10:10:16 -0400 Subject: [PATCH 26/31] Update dask from 2021.9.0 to 2021.10.0 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 15c47edc3..26b501cb0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ pandas==1.3.2 -dask[complete]==2021.9.0 +dask[complete]==2021.10.0 distributed==2021.9.0 numba>=0.53.1 jsonschema>=3.2.0 From e646e50091386d75ddfdceb48df70d97065e9972 Mon Sep 17 00:00:00 2001 From: pyup-bot Date: Mon, 1 Nov 2021 10:10:16 -0400 Subject: [PATCH 27/31] Update dask from 2021.4.0 to 2021.10.0 --- requirements/cudf-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/cudf-requirements.txt b/requirements/cudf-requirements.txt index ea5d2a8ad..ad2b9c822 100644 --- a/requirements/cudf-requirements.txt +++ b/requirements/cudf-requirements.txt @@ -1,3 +1,3 @@ gputil -dask[complete]==2021.4.0 +dask[complete]==2021.10.0 distributed==2021.4.0 \ No newline at end of file From c4093bdb7953cf5c38703e52479516a700599137 Mon Sep 17 00:00:00 2001 From: pyup-bot Date: Mon, 1 Nov 2021 10:10:17 -0400 Subject: [PATCH 28/31] Update dask from 2021.9.0 to 2021.10.0 --- requirements/dask-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/dask-requirements.txt b/requirements/dask-requirements.txt index d6997200b..07fce942f 100644 --- a/requirements/dask-requirements.txt +++ b/requirements/dask-requirements.txt @@ -1,4 +1,4 @@ -dask[complete]==2021.9.0 +dask[complete]==2021.10.0 distributed==2021.9.0 dask-ml>=1.9.0 pyarrow==1.0.1 From 02e4c4f6199c49d1ff98db2bbe396a7f09252ca2 Mon Sep 17 00:00:00 2001 From: pyup-bot Date: Mon, 1 Nov 2021 10:10:17 -0400 Subject: [PATCH 29/31] Update dask from 2021.9.0 to 2021.10.0 --- requirements/google-colab-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/google-colab-requirements.txt b/requirements/google-colab-requirements.txt index bf644f4da..1322dd961 100644 --- 
a/requirements/google-colab-requirements.txt +++ b/requirements/google-colab-requirements.txt @@ -1,4 +1,4 @@ -dask==2021.9.0 +dask==2021.10.0 distributed==2021.9.0 numba>=0.53.1 jsonschema>=3.2.0 From 0996a1c280ee83445ea4a9955c51602d0234f565 Mon Sep 17 00:00:00 2001 From: pyup-bot Date: Mon, 1 Nov 2021 10:10:18 -0400 Subject: [PATCH 30/31] Update tensorflow from 2.5.1 to 2.6.0 --- requirements/ai-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/ai-requirements.txt b/requirements/ai-requirements.txt index 24673285e..9282a8efb 100644 --- a/requirements/ai-requirements.txt +++ b/requirements/ai-requirements.txt @@ -1,3 +1,3 @@ -tensorflow==2.5.1 +tensorflow==2.6.0 keras>=2.6.0 nltk>=3.4.5 \ No newline at end of file From 41b417397de0e7015d0a92d9311ffe342836276f Mon Sep 17 00:00:00 2001 From: pyup-bot Date: Mon, 1 Nov 2021 10:10:18 -0400 Subject: [PATCH 31/31] Update tensorflow from 2.5.1 to 2.6.0 --- requirements/databricks-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/databricks-requirements.txt b/requirements/databricks-requirements.txt index b7b4d685a..84dd6727b 100644 --- a/requirements/databricks-requirements.txt +++ b/requirements/databricks-requirements.txt @@ -14,7 +14,7 @@ deprecated==1.2.5 setuptools==41.6.0 pyarrow==0.15.0 Jinja2==2.11.3 -tensorflow==2.5.1 +tensorflow==2.6.0 keras==2.6.0 pygments>=2.1.3 flask==1.0.2
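Taken together, the API-facing patches in this series (08, 12 and 15 in particular) extend joins, fill_na and aggregations. A short usage sketch (assumes a locally installed pandas engine; data and column names are illustrative only):

```python
from optimus import Optimus

op = Optimus("pandas")

left = op.create.dataframe({"id": [1, 2, 3], "name": ["a", None, "c"]})
right = op.create.dataframe({"id": [2, 3, 4], "value": [20, 30, 40]})

# PATCH 08: exclusive joins keep only the rows without a match on the other side
only_left = left.join(right, how="exclusive left", on="id")  # row with id == 1
unmatched = left.join(right, how="exclusive", on="id")       # rows with id 1 and 4

# PATCH 15: agg accepts a dict (or a list of tuples) plus an optional groupby
stats = left.agg({"id": ["min", "max"]}, output="dict")

# PATCH 12: fill_na can take another dataframe's column as the fill value
filled = left.cols.fill_na("name", value=right["value"])
```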