-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpreprocessing.py
More file actions
96 lines (82 loc) · 3.48 KB
/
preprocessing.py
File metadata and controls
96 lines (82 loc) · 3.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
"""Data preprocessing utilities."""
from __future__ import annotations
import logging
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
logger = logging.getLogger(__name__)
def build_preprocessor(
df: pd.DataFrame,
target_column: str,
impute_strategy: str = "median",
handle_categoricals: bool = True,
scale_numeric: bool = True,
max_categorical_features: int = 100,
) -> tuple[ColumnTransformer, list[str] | None]:
"""Build preprocessing pipeline with imputation, encoding, and scaling."""
X = df.drop(columns=[target_column]) if target_column in df.columns else df.copy()
categorical_cols = (
X.select_dtypes(include=["object", "category", "string"]).columns.tolist()
if handle_categoricals
else []
)
numeric_cols = X.select_dtypes(include=[np.number]).columns.tolist()
transformers = []
if numeric_cols:
num_steps = [("imputer", SimpleImputer(strategy=impute_strategy))]
if scale_numeric:
num_steps.append(("scaler", StandardScaler()))
transformers.append(("num", Pipeline(steps=num_steps), numeric_cols))
if handle_categoricals and categorical_cols:
# Filter out columns that are all NaN or have no valid values
valid_cat_cols = [
col for col in categorical_cols
if not X[col].isna().all() and X[col].nunique(dropna=True) > 0
]
if valid_cat_cols:
cat_pipeline = Pipeline(
steps=[
("imputer", SimpleImputer(strategy="most_frequent")),
(
"encoder",
OneHotEncoder(
handle_unknown="ignore",
sparse_output=False,
max_categories=max_categorical_features,
),
),
]
)
transformers.append(("cat", cat_pipeline, valid_cat_cols))
logger.info(
f"Encoding {len(valid_cat_cols)} categorical columns "
f"(max {max_categorical_features} categories each)"
)
elif categorical_cols:
logger.warning(
f"Found {len(categorical_cols)} categorical columns, but all are empty or have no valid values. Skipping."
)
if not transformers:
raise ValueError(
f"No usable features. handle_categoricals={handle_categoricals}, "
f"dtypes={X.dtypes.astype(str).to_dict()}"
)
preprocessor = ColumnTransformer(transformers=transformers, remainder="drop")
preprocessor.fit(X)
feature_names_out: list[str] | None = None
try:
feature_names_out = numeric_cols.copy()
if handle_categoricals and "cat" in preprocessor.named_transformers_:
ohe = preprocessor.named_transformers_["cat"].named_steps["encoder"]
# Use the valid_cat_cols that were actually processed
cat_cols_used = [t[2] for t in transformers if t[0] == "cat"]
if cat_cols_used:
cats = ohe.get_feature_names_out(cat_cols_used[0]).tolist()
feature_names_out += cats
except Exception as e:
logger.warning(f"Feature name extraction failed: {e}")
feature_names_out = None
return preprocessor, feature_names_out