Scheduled biweekly dependency update for week 44 #1212

Open
wants to merge 31 commits into base: develop-21.9
Commits
4951e74
Refactor 'decimal' type to 'float'
luis11011 Oct 6, 2021
ce9d5e1
Remove decimal
luis11011 Oct 8, 2021
4b6a508
Fix float type inferring bugs
luis11011 Oct 11, 2021
599be72
Fix quality on user set data types bug
luis11011 Oct 11, 2021
bbd030d
Fix dask.distributed bug
luis11011 Oct 11, 2021
9c605d3
Fix partial profiling bug
luis11011 Oct 13, 2021
2d6970c
Fix sort bug on Pandas
luis11011 Oct 13, 2021
fe1a4b4
Include exclusive joins
luis11011 Oct 14, 2021
6c283fe
Fix local load bug on Pandas
luis11011 Oct 14, 2021
ce9f491
Adjust mode and percentile functions output formats
luis11011 Oct 15, 2021
b0ea696
Adjust parser function descriptions
luis11011 Oct 15, 2021
59a70b1
Allow dataframe values on fill_na
luis11011 Oct 15, 2021
485f9ea
Fix match mask parameter bug
luis11011 Oct 15, 2021
8f396e8
Fix transformations bug on partial profiling
luis11011 Oct 18, 2021
e539130
Fix aggregation and group by operations
luis11011 Oct 18, 2021
0d8a63a
Fix fill_na eval value bug
luis11011 Oct 18, 2021
1768f4f
Fix load file name bug
luis11011 Oct 18, 2021
e18711c
Adjust sort
luis11011 Oct 20, 2021
75f6fd7
Fix match_positions function bugs
luis11011 Oct 22, 2021
4478595
Fix empty dataset profiling bug
luis11011 Oct 25, 2021
b828e6c
Default index_to_string and string_to_index behavior to replace
luis11011 Oct 28, 2021
f959515
Implement copy dataframe function
luis11011 Oct 28, 2021
1fa40d3
Fix copy Label Encoder instance
luis11011 Oct 28, 2021
1be2932
Fix docs url
argenisleon Oct 30, 2021
bb71a9d
Fix formatting
argenisleon Oct 30, 2021
7819fb5
Update dask from 2021.9.0 to 2021.10.0
pyup-bot Nov 1, 2021
e646e50
Update dask from 2021.4.0 to 2021.10.0
pyup-bot Nov 1, 2021
c4093bd
Update dask from 2021.9.0 to 2021.10.0
pyup-bot Nov 1, 2021
02e4c4f
Update dask from 2021.9.0 to 2021.10.0
pyup-bot Nov 1, 2021
0996a1c
Update tensorflow from 2.5.1 to 2.6.0
pyup-bot Nov 1, 2021
41b4173
Update tensorflow from 2.5.1 to 2.6.0
pyup-bot Nov 1, 2021
4 changes: 3 additions & 1 deletion README.md
@@ -25,6 +25,8 @@ Some amazing things Optimus can do for you:
* Out of box functions to explore and fix data quality.
* Use the same code to process your data in your laptop or in a remote cluster of GPUs.

[See Documentation](https://docs.hi-optimus.com/en/latest/)

## Try Optimus
To launch a live notebook server to test optimus using binder or Colab, click on one of the following badges:

@@ -176,7 +178,7 @@ If you have issues, see our [Troubleshooting Guide](https://github.com/hi-primus
Contributions go far beyond pull requests and commits. We are very happy to receive any kind of contributions
including:

* [Documentation](https://github.com/hi-primus/optimus/tree/develop-21.9/docs/source) updates, enhancements, designs, or bugfixes.
* [Documentation](https://docs.hi-optimus.com/en/latest/) updates, enhancements, designs, or bugfixes.
* Spelling or grammar fixes.
* README.md corrections or redesigns.
* Adding unit, or functional [tests](https://github.com/hi-primus/optimus/tree/develop-21.9/tests)
82 changes: 64 additions & 18 deletions optimus/engines/base/basedataframe.py
@@ -63,6 +63,7 @@ def _compatible_data(data):

def __del__(self):
del self.data
del self.le

@property
def root(self) -> 'DataFrameType':
@@ -102,9 +103,17 @@ def new(self, dfd, meta=None) -> 'DataFrameType':
df = self.__class__(dfd, op=self.op)
if meta is not None:
df.meta = meta
df.le = self.le
import copy
df.le = copy.deepcopy(self.le)
return df

def copy(self) -> 'DataFrameType':
"""
Return a copy of a dataframe
"""
df = self.root
return self.root.new(df.data.copy(), meta=df.meta.copy())

@staticmethod
def __operator__(df, data_type=None, multiple_columns=False) -> 'DataFrameType':
if isinstance(df, (BaseDataFrame,)):
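A hypothetical sketch of the new copy behavior introduced above, assuming an Optimus session created with the Pandas engine (engine name and data are invented for illustration):

```python
from optimus import Optimus

op = Optimus("pandas")  # assumed engine name, for illustration only
df = op.create.dataframe({"name": ["Alice", "Bob"], "age": [30, None]})

# copy() duplicates data and meta, and new() now deep-copies the label encoder,
# so changes made to the copy do not leak back into the original dataframe.
df_copy = df.copy()
df_copy = df_copy.cols.fill_na("age", 0)
```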
@@ -850,13 +859,12 @@ def calculate_profile(self, cols="*", bins: int = MAX_BUCKETS, flush: bool = Fal
cols_data_types = {}
cols_to_infer = [*cols_to_profile]

if not flush:
for col_name in cols_to_profile:
_props = Meta.get(df.meta, f"columns_data_types.{col_name}")
for col_name in cols_to_profile:
col_data_type = Meta.get(df.meta, f"columns_data_types.{col_name}")

if _props is not None:
cols_data_types[col_name] = _props
cols_to_infer.remove(col_name)
if col_data_type is not None:
cols_data_types[col_name] = col_data_type
cols_to_infer.remove(col_name)

if cols_to_infer:
cols_data_types = {**cols_data_types, **df.cols.infer_type(cols_to_infer, tidy=False)["infer_type"]}
@@ -972,19 +980,27 @@ def merge(_columns, _hist, _freq, _mismatch, _data_types, _count_uniques):
data_set_info.update({'size': df.size(format="human")})

assign(profiler_data, "summary", data_set_info, dict)

data_types_list = list(set(df.cols.data_type("*", tidy=False)["data_type"].values()))

assign(profiler_data, "summary.data_types_list", data_types_list, dict)
assign(profiler_data, "summary.total_count_data_types",
len(set([i for i in data_types.values()])), dict)
assign(profiler_data, "summary.missing_count", total_count_na, dict)
assign(profiler_data, "summary.p_missing", round(
total_count_na / df.rows.count() * 100, 2))

rows_count = df.rows.count()

if rows_count:
assign(profiler_data, "summary.p_missing", round(
total_count_na / rows_count * 100, 2))
else:
assign(profiler_data, "summary.p_missing", None)

# _t = time.process_time()

all_columns_names = df.cols.names()

meta = Meta.set(meta, "transformations", value={})
# meta = Meta.set(meta, "transformations", value={})

# Order columns
actual_columns = profiler_data["columns"]
@@ -998,7 +1014,7 @@ def merge(_columns, _hist, _freq, _mismatch, _data_types, _count_uniques):
meta = df.meta

# Reset Actions
meta = Meta.reset_actions(meta)
meta = Meta.reset_actions(meta, parse_columns(df, cols))
df.meta = meta
profiler_time["end"] = {"elapsed_time": time.process_time() - _t}
# print(profiler_time)
@@ -1015,7 +1031,7 @@ def join(self, df_right: 'DataFrameType', how="left", on=None, left_on=None, rig
"""
Join 2 dataframes SQL style
:param df_right:
:param how{‘left’, ‘right’, ‘outer’, ‘inner’}, default ‘left’
:param how{‘left’, ‘right’, ‘outer’, ‘inner’, ‘exclusive’, ‘exclusive left’, ‘exclusive right’}, default ‘left’
:param on:
:param left_on:
:param right_on:
@@ -1052,8 +1068,29 @@ def join(self, df_right: 'DataFrameType', how="left", on=None, left_on=None, rig
left_names = df_left.cols.names()
right_names = df_right.cols.names()

df = self.root.new(df_left.data.merge(df_right.data, how=how, left_on=left_on, right_on=right_on,
suffixes=(suffix_left, suffix_right)))

if how in ['exclusive', 'exclusive left', 'exclusive right']:
_how = 'outer'
indicator = True
else:
_how = how
indicator = False

dfd = df_left.data.merge(df_right.data, how=_how, left_on=left_on,
right_on=right_on, suffixes=(suffix_left, suffix_right),
indicator=indicator)

if how == 'exclusive':
dfd = dfd[(dfd["_merge"] == "left_only") | (dfd["_merge"] == "right_only")]
elif how == 'exclusive left':
dfd = dfd[dfd["_merge"] == "left_only"]
elif how == 'exclusive right':
dfd = dfd[dfd["_merge"] == "right_only"]

if indicator:
dfd = dfd.drop(["_merge"], axis=1)

df = self.root.new(dfd)

# Reorder
last_column_name = left_names[-1]
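For illustration, a minimal pandas sketch of the indicator-based approach used above for the new exclusive join modes (frame and column names are invented):

```python
import pandas as pd

left = pd.DataFrame({"id": [1, 2, 3], "a": ["x", "y", "z"]})
right = pd.DataFrame({"id": [2, 3, 4], "b": ["p", "q", "r"]})

# Outer merge with an indicator column, then keep only rows that appear on
# exactly one side; this is what how="exclusive" does internally.
merged = left.merge(right, how="outer", on="id", indicator=True)
exclusive = merged[merged["_merge"].isin(["left_only", "right_only"])].drop(columns="_merge")
```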
@@ -1075,14 +1112,22 @@ def string_clustering(self, cols="*", algorithm="fingerrint", *args, **kwargs):
# return clusters

def agg(self, aggregations: dict, groupby=None, output="dict", tidy=True):
"""
:param aggregations: Dictionary or list of tuples with the form [("col", "agg")]
:param groupby: None, list of columns names or a single column name to group the aggregations.
:param output{‘dict’, ‘dataframe’}, default ‘dict’: Output type.
"""

df = self
dfd = df.data

if is_dict(aggregations):
aggregations = aggregations.items()

if groupby:
groupby = parse_columns(df, groupby)

for column, aggregations_set in aggregations.items():
for column, aggregations_set in aggregations:
aggregations[column] = val_to_list(aggregations_set)

dfd = dfd.groupby(groupby).agg(aggregations)
@@ -1093,19 +1138,20 @@ def agg(self, aggregations: dict, groupby=None, output="dict", tidy=True):
result = dfd.to_dict()

elif output == "dataframe":
dfd.columns = [str(c) for c in dfd.columns]
result = self.new(dfd.reset_index())

else:
result = {}

for column, aggregations_set in aggregations.items():
for column, aggregations_set in aggregations:
aggregations_set = val_to_list(aggregations_set)
for aggregation in aggregations_set:
result[column + "_" + aggregation] = getattr(
df.cols, aggregation)(column, tidy=True)

if output == "dataframe":
result = self.new(result)
result = self.op.create.dataframe({k: [v] for k, v in result.items()})

return convert_numpy(format_dict(result, tidy=tidy))

@@ -1166,7 +1212,7 @@ def report(self, df, cols="*", buckets=MAX_BUCKETS, infer=False, relative_error=
"hist_hours": hist_hour, "hist_minutes": hist_minute}

elif col["column_data_type"] == "int" or col["column_data_type"] == "string" or col[
"column_data_type"] == "decimal":
"column_data_type"] == "float":
hist = plot_hist({col_name: hist_dict}, output="base64")
hist_pic = {"hist_numeric_string": hist}
if "frequency" in col:
74 changes: 52 additions & 22 deletions optimus/engines/base/columns.py
@@ -196,7 +196,8 @@ def select(self, cols="*", regex=None, data_type=None, invert=False, accepts_mis
cols = parse_columns(df, cols if regex is None else regex, is_regex=regex is not None,
filter_by_column_types=data_type, invert=invert,
accepts_missing_cols=accepts_missing_cols)
meta = Meta.select_columns(df.meta, cols)
meta = df.meta
meta = Meta.select_columns(meta, cols)
dfd = df.data
if cols is not None:
dfd = dfd[cols]
@@ -578,7 +579,7 @@ def parse_inferred_types(self, col_data_type):
columns[k] = result_default
return columns

def inferred_data_type(self, cols="*", use_internal=False, tidy=True):
def inferred_data_type(self, cols="*", use_internal=False, calculate=False, tidy=True):
"""
Get the inferred data types from the meta data.

@@ -595,10 +596,16 @@ def inferred_data_type(self, cols="*", use_internal=False, tidy=True):

for col_name in cols:
data_type = Meta.get(df.meta, f"columns_data_types.{col_name}.data_type")

if data_type is None:
data_type = Meta.get(df.meta, f"profile.columns.{col_name}.stats.inferred_data_type.data_type")

if data_type is None:
data_type = result.get(col_name, None)

if calculate and data_type is None:
data_type = df.cols.infer_type(col_name)

result.update({col_name: data_type})

result = {"inferred_data_type": result}
@@ -1091,15 +1098,21 @@ def pattern_counts(self, cols="*", n=10, mode=0, flush=False) -> dict:

return result

def groupby(self, by, agg) -> 'DataFrameType':
def groupby(self, by: Union[str, list] = None, agg: Union[list, dict] = None) -> 'DataFrameType':
"""
This helper function helps manage column names in the aggregation output.
It also handles column ordering, because Dask can reorder columns.

:param by: Column name.
:param agg: List of tuples with the form [("agg", "col")]
:param by: None, list of columns names or a single column name to group the aggregations.
:param agg: List of tuples with the form [("agg", "col")] or
[("agg", "col", "new_col_name")] or dictionary with the form
{"new_col_name": {"col": "agg"}}
:return:
"""

if agg is None:
raise TypeError(f"Can't aggregate with 'agg' value {agg}")

df = self.root
compact = {}

@@ -1122,10 +1135,20 @@ def groupby(self, by, agg) -> 'DataFrameType':

dfd = df.data

dfd = dfd.groupby(by=by).agg(compact).reset_index()
agg_names = agg_names or [a[0] + "_" + a[1] for a in agg]
dfd.columns = (val_to_list(by) + agg_names)
dfd.columns = [str(c) for c in dfd.columns]
if by and len(by):
dfd = dfd.groupby(by=by)

dfd = dfd.agg(compact).reset_index()

if by and len(by):
agg_names = agg_names or [a[1] if len(a) < 3 else a[2] for a in agg]
dfd.columns = (val_to_list(by) + agg_names)
else:
if agg_names is not None:
logger.warn("New columns names are not supported when 'by' is not passed.")
dfd.columns = (["aggregation"] + list(dfd.columns[1:]))
dfd.columns = [str(c) for c in dfd.columns]

return self.root.new(dfd)

def move(self, column, position, ref_col=None) -> 'DataFrameType':
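A hypothetical sketch of the tuple forms accepted by the reworked `cols.groupby` (column names are invented; the tuple shapes follow the docstring above):

```python
# Hypothetical usage; tuples are ("agg", "col") or ("agg", "col", "new_name").
by_category = df.cols.groupby(by="category",
                              agg=[("mean", "price", "avg_price"), ("count", "id")])

# With no 'by', results are labeled by a generic "aggregation" column
# (one row per aggregation function) and custom names are ignored.
overall = df.cols.groupby(agg=[("mean", "price")])
```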
@@ -2911,27 +2934,40 @@ def impute(self, cols="*", data_type="auto", strategy="auto", fill_value=None, o

return df

def fill_na(self, cols="*", value=None, output_cols=None) -> 'DataFrameType':
def fill_na(self, cols="*", value=None, output_cols=None,
eval_value: bool = False) -> 'DataFrameType':
"""
Replace null data with a specified value.

:param cols: '*', list of columns names or a single column name.
:param value: value to replace the nan/None values
:param output_cols: Column name or list of column names where the transformed data will be saved.
:param eval_value: Parse 'value' param in case a string is passed.
:return: Returns the column filled with given value.
"""
df = self.root

columns = prepare_columns(df, cols, output_cols)
cols = parse_columns(df, cols)
values, eval_values = prepare_columns_arguments(cols, value, eval_value)
output_cols = get_output_cols(cols, output_cols)

kw_columns = {}

for input_col, output_col in columns:
for input_col, output_col, value, eval_value in zip(cols, output_cols, values, eval_values):

if eval_value and is_str(value) and value:
value = eval(value)

if isinstance(value, self.root.__class__):
value = value.get_series()

kw_columns[output_col] = df.data[input_col].fillna(value)
kw_columns[output_col] = kw_columns[output_col].mask(
kw_columns[output_col] == "", value)

return df.cols.assign(kw_columns)
df = df.cols.assign(kw_columns)
df.meta = Meta.action(df.meta, Actions.FILL_NA.value, list(kw_columns.keys()))
return df

def count(self) -> int:
"""
@@ -3374,12 +3410,9 @@ def quality(self, cols="*", flush=False, compute=True) -> dict:
if is_dict(cols):
cols_types = cols
else:
cols_types = self.root.cols.infer_type(cols, tidy=False)["infer_type"]
cols_types = self.root.cols.inferred_data_type(cols, calculate=True, tidy=False)["inferred_data_type"]

result = {}
profiler_to_mask_func = {
"decimal": "float"
}

quality_props = ["match", "missing", "mismatch"]

Expand All @@ -3396,13 +3429,10 @@ def quality(self, cols="*", flush=False, compute=True) -> dict:
"mismatch": cached_props.get("mismatch")}
continue

# Match the profiler dtype with the function. The only function that need to be remapped are decimal and int
dtype = props["data_type"]
dtype = props if is_str(props) else props["data_type"]

dtype = df.constants.INTERNAL_TO_OPTIMUS.get(dtype, dtype)

dtype = profiler_to_mask_func.get(dtype, dtype)

matches_mismatches = getattr(df[col_name].mask, dtype)(
col_name).cols.frequency()

Expand Down Expand Up @@ -3478,7 +3508,7 @@ def infer_type(self, cols="*", sample=INFER_PROFILER_ROWS, tidy=True) -> dict:
dtype_i = 0

if len(dtypes) > 1:
if dtypes[0] == ProfilerDataTypes.INT.value and dtypes[1] == ProfilerDataTypes.DECIMAL.value:
if dtypes[0] == ProfilerDataTypes.INT.value and dtypes[1] == ProfilerDataTypes.FLOAT.value:
dtype_i = 1

if dtypes[0] == ProfilerDataTypes.ZIP_CODE.value and dtypes[1] == ProfilerDataTypes.INT.value: