Merged

66 commits:
9c6ab40  testing branch (nm3224, Oct 20, 2025)
87192cd  adjusting python code logics (nm3224, Oct 21, 2025)
ccc094b  changing where log file gets saved to avoid over-writing (nm3224, Oct 21, 2025)
f174f59  added "AUDIT" to grades (nm3224, Oct 21, 2025)
68c8d8b  fixing warning message for prefixes (nm3224, Oct 21, 2025)
3476294  changed prefixes (nm3224, Oct 21, 2025)
091d5ea  change gt 100 to ge 200 (nm3224, Oct 21, 2025)
0a1a8f9  append, not overwrite (nm3224, Oct 21, 2025)
dbfacef  changing where logs get saved (nm3224, Oct 22, 2025)
b509829  remove final print (nm3224, Oct 22, 2025)
fb97e8e  creating training folder (nm3224, Oct 22, 2025)
6e8aaf4  editing for inference too (nm3224, Oct 22, 2025)
fa7a48b  change return type for gateway courses (nm3224, Oct 22, 2025)
f6365b2  changing inference for targets (nm3224, Oct 22, 2025)
a0ab211  fixing returns and job type (nm3224, Oct 22, 2025)
ef26cb5  fixing style (nm3224, Oct 22, 2025)
2637553  fixing linter (nm3224, Oct 22, 2025)
da9b05a  removed list (nm3224, Oct 22, 2025)
ff85bf7  trying to save logs for each step (nm3224, Oct 22, 2025)
88b7ecc  ruff formatting for linting (nm3224, Oct 22, 2025)
4d437c8  removing log file save attempt from student selection & target (nm3224, Oct 22, 2025)
6900615  fixing log file saving (nm3224, Oct 22, 2025)
4b32cb4  changing back job type default to training- no inference (nm3224, Oct 22, 2025)
80d54ef  pdp_training_log not training_log (nm3224, Oct 22, 2025)
ab29b6c  changing where things save (nm3224, Oct 22, 2025)
5b6a7b7  add function (nm3224, Oct 22, 2025)
aef6172  ruff formatting for linter (nm3224, Oct 22, 2025)
b06bf56  fix func error (nm3224, Oct 22, 2025)
6976a71  changing git tag back to version from branch (nm3224, Oct 23, 2025)
f18902c  adding additional prints (nm3224, Oct 23, 2025)
0dead7b  changing order of prints (nm3224, Oct 23, 2025)
4466b8e  styling (nm3224, Oct 23, 2025)
3fb4922  changing order of prints (nm3224, Oct 23, 2025)
6d61035  trying to add breakdowns (nm3224, Oct 23, 2025)
0685180  fix style (nm3224, Oct 23, 2025)
5a7a5b5  adding cohort term, removing elif (nm3224, Oct 23, 2025)
043ee86  fix style (nm3224, Oct 23, 2025)
b7b908a  removing inference from options (nm3224, Oct 23, 2025)
10a7760  fix ruff format (nm3224, Oct 23, 2025)
9050548  adding back elif (nm3224, Oct 23, 2025)
1a35d9d  changing gateway course func (nm3224, Oct 23, 2025)
b761b6a  fix ruff formatting (nm3224, Oct 24, 2025)
7c6ac9e  adding for lower levels too (nm3224, Oct 24, 2025)
2bf8bf4  ruff formatting (nm3224, Oct 24, 2025)
e48ef72  add lower mask (nm3224, Oct 24, 2025)
3797a14  fixing message (nm3224, Oct 24, 2025)
7ad5bb7  changing order (nm3224, Oct 24, 2025)
3fb2550  ruff format (nm3224, Oct 24, 2025)
f83f5c5  change order (nm3224, Oct 24, 2025)
25ff85c  notes if no pre cohort courses were found (nm3224, Oct 24, 2025)
54bb786  changing auto populate options (nm3224, Oct 24, 2025)
02d984e  ruff format (nm3224, Oct 24, 2025)
ebffaef  removing tolist() (nm3224, Oct 24, 2025)
0034df2  trying to clean up the code (nm3224, Oct 24, 2025)
108b90c  fixing print (nm3224, Oct 24, 2025)
ce962c0  adjusting (nm3224, Oct 24, 2025)
e313fce  fixing cip code issue (nm3224, Oct 24, 2025)
98a68c3  fix ruff format (nm3224, Oct 24, 2025)
c895cd1  fixing type checks (nm3224, Oct 24, 2025)
1b7ef50  removing commented out section (nm3224, Oct 27, 2025)
dbbfbf4  adding subset breakdown for inference as well (nm3224, Oct 27, 2025)
67e9eb6  ruff formatting (nm3224, Oct 27, 2025)
b0b9f26  saving out inference as well (nm3224, Oct 27, 2025)
f343cd2  remove "audit" (nm3224, Oct 27, 2025)
1c7eb30  change ruff format (nm3224, Oct 27, 2025)
0121217  add missing grades to feature table (nm3224, Oct 27, 2025)
208 changes: 127 additions & 81 deletions src/edvise/data_audit/eda.py
@@ -5,7 +5,6 @@
import numpy as np
import pandas as pd
import scipy.stats as ss
from typing import List
from edvise import utils as edvise_utils

LOGGER = logging.getLogger(__name__)
@@ -340,109 +339,156 @@ def log_high_null_columns(df: pd.DataFrame, threshold: float = 0.2) -> None:
)


def compute_gateway_course_ids_and_cips(df_course: pd.DataFrame) -> List[str]:
def compute_gateway_course_ids_and_cips(
df_course: pd.DataFrame,
) -> tuple[list[str], list[str], bool, list[str], list[str]]:
"""
Build a list of course IDs and CIP codes for Math/English gateway courses.
Filter: math_or_english_gateway in {"M", "E"}
ID format: "<course_prefix><course_number>" (both coerced to strings, trimmed)
CIP codes taken from 'course_cip' column

Logs:
- If CIP column is missing or has no values or gateway field unpopulated
- Log prefixes for English (E) and Math (M) courses, with a note that they
may need to be swapped if they don’t look right
Returns: (ids, cips, has_upper_level_gateway, lower_ids, lower_cips)
- ids: all gateway course IDs (M/E)
- cips: CIP 2-digit codes from LOWER-LEVEL rows only (same as lower_cips)
- has_upper_level_gateway: True if any gateway course has level >=200
- lower_ids: gateway IDs with level <200
- lower_cips: CIP 2-digit codes for lower_ids
"""
if not {"math_or_english_gateway", "course_prefix", "course_number"}.issubset(
df_course.columns
):
LOGGER.warning(" ⚠️ Cannot compute key_course_ids: required columns missing.")
return []

mask = df_course["math_or_english_gateway"].astype("string").isin({"M", "E"})
if not mask.any():
LOGGER.info(" No Math/English gateway courses found.")
return []
# ---- helpers ----
def _s(x: pd.Series) -> pd.Series:
"""Normalize to string, strip, and remove literal 'nan' (categorical-safe)."""
s = x.astype("string") # cast before fillna to avoid Categorical fill errors
s = s.fillna("")
s = s.str.strip().replace("^nan$", "", regex=True)
return s

def _cip_series(x: pd.Series) -> list[str]:
"""
Accept canonical CIP codes like '24', '24.02', '24.0201' and return unique 2-digit series (e.g., '24').
Ignores placeholders and malformed values.
"""
s = x.astype("string").str.strip().replace("^nan$", pd.NA, regex=True)
# Strictly match valid CIP shapes and capture the 2-digit series as group 1
series = (
s.str.extract(r"^\s*(\d{2})(?:\.(\d{2})(?:\d{2})?)?\s*$", expand=True)[0]
.dropna()
.astype("string")
.loc[lambda z: z.ne("")]
.drop_duplicates()
.tolist()
)
return list(series)

ids = df_course.loc[mask, "course_prefix"].fillna("") + df_course.loc[
mask, "course_number"
].fillna("")
def _last_level(num: pd.Series) -> pd.Series:
"""Parse last numeric token, then last up-to-3 digits as integer level."""
tok = _s(num).str.extract(r"(\d+)(?!.*\d)", expand=True)[0]
return pd.to_numeric(tok.str[-3:], errors="coerce")

if "course_cip" not in df_course.columns:
LOGGER.warning(" ⚠️ Column 'course_cip' is missing; no CIP codes extracted.")
cips = pd.Series([], dtype=str)
else:
cips = (
df_course.loc[mask, "course_cip"]
.astype(str)
.str.strip()
.replace(
{
"nan": "",
"NaN": "",
"NAN": "",
"missing": "",
"MISSING": "",
"Missing": "",
}
)
.str.extract(
r"^(\d{2})"
) # Extract first two digits only; cip codes usually 23.0101
.dropna()[0]
def _starts_with_any(arr: list[str], prefixes: list[str]) -> bool:
arr = list(arr) # handles numpy arrays / pandas .unique()
return len(arr) > 0 and all(
any(str(p).upper().startswith(ch) for ch in prefixes) for p in arr
)
if cips.eq("").all():
LOGGER.warning(
" ⚠️ Column 'course_cip' is present but unpopulated for gateway courses."
)

# edit this to auto populate the config
cips = cips[cips.ne("")].drop_duplicates()
ids = ids[ids.str.strip().ne("") & ids.str.lower().ne("nan")].drop_duplicates()
# ---- column checks ----
required = {"math_or_english_gateway", "course_prefix", "course_number"}
if not required.issubset(df_course.columns):
LOGGER.warning(" ⚠️ Cannot compute key_course_ids: required columns missing.")
return ([], [], False, [], [])

LOGGER.info(f" Identified {len(ids)} unique gateway course IDs: {ids.tolist()}")
LOGGER.info(f" Identified {len(cips)} unique CIP codes: {cips.tolist()}")
# ---- full-length masks ----
gate = _s(df_course["math_or_english_gateway"])
is_gateway = gate.isin({"M", "E"}) # full-length
if not is_gateway.any():
LOGGER.info(" No Math/English gateway courses found.")
return ([], [], False, [], [])

level = _last_level(df_course["course_number"]) # full-length
upper_mask = is_gateway & level.ge(200).fillna(False)
lower_mask = is_gateway & level.lt(200).fillna(False)
has_upper_level_gateway = bool(upper_mask.any())

# ---- IDs ----
ids_series = (
_s(df_course.loc[is_gateway, "course_prefix"])
+ _s(df_course.loc[is_gateway, "course_number"])
).str.strip()
ids = ids_series[ids_series.ne("")].drop_duplicates().tolist()
LOGGER.info(" Identified %d unique gateway course IDs: %s", len(ids), ids)

lower_ids_series = (
_s(df_course.loc[lower_mask, "course_prefix"])
+ _s(df_course.loc[lower_mask, "course_number"])
).str.strip()
lower_ids = lower_ids_series[lower_ids_series.ne("")].drop_duplicates().tolist()
LOGGER.info(" Identified %d lower-level (<200) gateway IDs.", len(lower_ids))

# ---- CIP extraction from LOWER rows only ----
if "course_cip" in df_course.columns:
lower_cips = _cip_series(df_course.loc[lower_mask, "course_cip"])
cips = lower_cips.copy()
if not lower_cips:
LOGGER.warning(
" ⚠️ 'course_cip' present but yielded no lower-level CIP codes."
)
else:
LOGGER.info(" CIPs restricted to lower-level (<200) rows: %s", cips)
else:
cips, lower_cips = [], []
LOGGER.info(" No 'course_cip' column; skipping CIP extraction.")

# ---- log upper-level anomalies (if any) ----
if has_upper_level_gateway:
upper_ids_series = (
_s(df_course.loc[upper_mask, "course_prefix"])
+ _s(df_course.loc[upper_mask, "course_number"])
).str.strip()
upper_ids = upper_ids_series[upper_ids_series.ne("")].drop_duplicates().tolist()
LOGGER.warning(
" ⚠️ Warning: courses with level >=200 flagged as gateway (%d found). Course IDs: %s. "
"This is unusual; contact the school for more information.",
len(upper_ids),
upper_ids,
)
LOGGER.info(
" ✅ Lower-level IDs found: %d; lower-level CIP codes found: %d",
len(lower_ids),
len(lower_cips),
)
else:
LOGGER.info(" No gateway courses with level >=200 were detected.")

# Sanity-check for prefixes and swap if clearly reversed; has come up for some schools
# ---- prefix sanity check (compact) ----
pref_e = (
df_course.loc[df_course["math_or_english_gateway"].eq("E"), "course_prefix"]
_s(df_course.loc[gate.eq("E"), "course_prefix"])
.replace("", pd.NA)
.dropna()
.astype(str)
.str.strip()
.unique()
)
pref_m = (
df_course.loc[df_course["math_or_english_gateway"].eq("M"), "course_prefix"]
_s(df_course.loc[gate.eq("M"), "course_prefix"])
.replace("", pd.NA)
.dropna()
.astype(str)
.str.strip()
.unique()
)

LOGGER.info(" English (E) prefixes (raw): %s", pref_e.tolist())
LOGGER.info(" Math (M) prefixes (raw): %s", pref_m.tolist())

looks = lambda arr, ch: len(arr) > 0 and all(
str(p).upper().startswith(ch) for p in arr
e_ok, m_ok = (
_starts_with_any(pref_e, ["E", "W"]),
_starts_with_any(pref_m, ["M", "S"]),
)
e_ok, m_ok = looks(pref_e, "E"), looks(pref_m, "M")

if not e_ok and not m_ok:
if e_ok and m_ok:
LOGGER.info(" Prefix starts look correct (E/W for English, M/S for Math).")
elif not e_ok and not m_ok:
LOGGER.warning(
" ⚠️ Prefixes MAY be swapped (do NOT start with E for English, start with M for Math). Consider swapping E <-> M. E=%s, M=%s",
pref_e.tolist(),
pref_m.tolist(),
)
elif e_ok and m_ok:
LOGGER.info(
" Prefixes look correct and not swapped (start with E for English, start with M for Math)."
" ⚠️ Prefixes MAY be swapped. Consider swapping E <-> M. E=%s, M=%s",
list(pref_e),
list(pref_m),
)
else:
LOGGER.warning(" One group inconsistent. English OK=%s, Math OK=%s", e_ok, m_ok)

LOGGER.info(" Final English (E) prefixes: %s", pref_e.tolist())
LOGGER.info(" Final Math (M) prefixes: %s", pref_m.tolist())
LOGGER.warning(
" ⚠️ Prefixes MAY be incorrect; one group inconsistent. English OK=%s, Math OK=%s",
e_ok,
m_ok,
)

return [ids.tolist(), cips.tolist()]
return ids, cips, has_upper_level_gateway, lower_ids, lower_cips


def log_record_drops(
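For reviewers who want to sanity-check the new return contract, here is a minimal sketch against a toy course file. The DataFrame and expected values are illustrative only; the function name, module path, and tuple layout come from this diff:

```python
import pandas as pd

from edvise.data_audit.eda import compute_gateway_course_ids_and_cips

# Toy course file: one lower-level English gateway, one lower-level Math
# gateway, and one upper-level (>=200) Math gateway that should trigger
# the anomaly warning. The HIST row is not a gateway course.
df_course = pd.DataFrame(
    {
        "math_or_english_gateway": ["E", "M", "M", pd.NA],
        "course_prefix": ["ENGL", "MATH", "MATH", "HIST"],
        "course_number": ["101", "110", "210", "100"],
        "course_cip": ["23.0101", "27.01", "27.0101", "54.0101"],
    }
)

ids, cips, has_upper, lower_ids, lower_cips = compute_gateway_course_ids_and_cips(
    df_course
)

# ids        -> ["ENGL101", "MATH110", "MATH210"]  (all M/E gateway rows)
# cips       -> ["23", "27"]   (2-digit CIP series, lower-level rows only)
# has_upper  -> True           (MATH210 parses to level >= 200)
# lower_ids  -> ["ENGL101", "MATH110"]
# lower_cips -> ["23", "27"]
```

Note that the `_cip_series` regex accepts the canonical shapes "24", "24.02", and "24.0201" and reduces each to its 2-digit series, so "27.01" and "27.0101" would both collapse to "27".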
4 changes: 2 additions & 2 deletions src/edvise/feature_generation/course.py
@@ -7,8 +7,8 @@

LOGGER = logging.getLogger(__name__)

NON_NUMERIC_GRADES = {"A", "F", "I", "M", "O", "P", "W"}
NON_PASS_FAIL_GRADES = {"A", "I", "M", "O", "W"}
NON_NUMERIC_GRADES = {"AUDIT", "A", "F", "I", "M", "O", "P", "W"}
NON_PASS_FAIL_GRADES = {"AUDIT", "A", "I", "M", "O", "W"}
NON_COMPLETE_GRADES = {"I", "W"}


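The constants gain an explicit "AUDIT" code, so audited courses are excluded from both numeric-grade and pass/fail features. The masking below is only an illustration of how such sentinel sets are typically applied; the PR changes just the constants, and the downstream feature logic in course.py is not shown in this diff:

```python
import pandas as pd

# Mirrors the updated constants in src/edvise/feature_generation/course.py.
NON_NUMERIC_GRADES = {"AUDIT", "A", "F", "I", "M", "O", "P", "W"}
NON_PASS_FAIL_GRADES = {"AUDIT", "A", "I", "M", "O", "W"}
NON_COMPLETE_GRADES = {"I", "W"}

grades = pd.Series(["3.7", "AUDIT", "P", "W", "2.0"])

# Numeric GPA values are whatever survives after masking the sentinel codes.
numeric = pd.to_numeric(grades.mask(grades.isin(NON_NUMERIC_GRADES)), errors="coerce")
# Pass/fail outcomes drop codes that carry no pass/fail signal ("AUDIT", "W", ...).
pass_fail = grades[~grades.isin(NON_PASS_FAIL_GRADES)]

print(numeric.tolist())    # [3.7, nan, nan, nan, 2.0]
print(pass_fail.tolist())  # ['3.7', 'P', '2.0']
```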
26 changes: 22 additions & 4 deletions src/edvise/scripts/inference_h2o.py
@@ -46,6 +46,7 @@
from edvise.utils import emails
from edvise.utils.databricks import get_spark_session
from edvise.modeling.inference import top_n_features, features_box_whiskers_table
from edvise.shared.logger import resolve_run_path, local_fs_path

# Shared predictions pipeline (your extracted module)
from edvise.scripts.predictions_h2o import (
@@ -131,6 +132,12 @@ def write_delta(
logging.info("%s data written to: %s", table_name_suffix, table_path)

def run(self) -> None:
# Enforce inference mode
if getattr(self.args, "job_type", "inference") != "inference":
raise ValueError(
"ModelInferenceTask must be run with --job_type inference."
)

if self.cfg.modeling is None or self.cfg.modeling.training is None:
raise ValueError("Missing section of the config: modeling.training")
if self.cfg.preprocessing is None:
@@ -144,16 +151,24 @@ def run(self) -> None:

if self.cfg.model is None or self.cfg.model.run_id is None:
raise ValueError("cfg.model.run_id must be set for inference runs.")
current_run_path = f"{self.args.silver_volume_path}/{self.cfg.model.run_id}"
# Use canonical per-run folder: <silver>/<run_id>/inference/
current_run_path = resolve_run_path(
self.args, self.cfg, self.args.silver_volume_path
)
current_run_path_local = local_fs_path(current_run_path)

# 1) Load UC model metadata (run_id + experiment_id)
self.load_mlflow_model_metadata()
assert self.model_run_id and self.model_experiment_id

# 2) Read the processed dataset
df_processed = dataio.read.read_parquet(
f"{current_run_path}/preprocessed.parquet"
)
preproc_path = os.path.join(current_run_path, "preprocessed.parquet")
preproc_path_local = local_fs_path(preproc_path)
if not os.path.exists(preproc_path_local):
raise FileNotFoundError(
f"Missing preprocessed.parquet at: {preproc_path} (local: {preproc_path_local})"
)
df_processed = dataio.read.read_parquet(preproc_path_local)

# 3) Notify via email
self._send_kickoff_email()
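The new path handling resolves the canonical per-run folder once, maps it to a local filesystem path, and fails fast if the artifact is missing. Below is a sketch of that pattern with made-up volume and run-id names; `local_fs_path` here is a stand-in for the helper imported from edvise.shared.logger, whose real implementation is not part of this diff:

```python
import os

def local_fs_path(path: str) -> str:
    # Stand-in (assumption) for edvise.shared.logger.local_fs_path: UC volume
    # paths ("/Volumes/...") are mounted on the driver's filesystem, so in the
    # simplest case this is an identity mapping.
    return path

silver_volume_path = "/Volumes/my_catalog/my_schema/silver"  # hypothetical
run_id = "0f3c9b"                                            # hypothetical
current_run_path = f"{silver_volume_path}/{run_id}/inference"

preproc_path = os.path.join(current_run_path, "preprocessed.parquet")
preproc_path_local = local_fs_path(preproc_path)
if not os.path.exists(preproc_path_local):
    # Fail fast with both forms of the path, matching the diff's error style.
    raise FileNotFoundError(
        f"Missing preprocessed.parquet at: {preproc_path} (local: {preproc_path_local})"
    )
```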
@@ -311,6 +326,9 @@ def parse_arguments() -> argparse.Namespace:
parser.add_argument("--DK_CC_EMAIL", type=str, required=True)
parser.add_argument("--features_table_path", type=str, required=False)
parser.add_argument("--ds_run_as", type=str, required=False)
parser.add_argument(
"--job_type", type=str, choices=["inference"], default="inference"
)
return parser.parse_args()


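Between the argparse `choices` and the guard at the top of `run()`, the script now rejects any non-inference invocation at two layers. A standalone sketch replaying just those two checks from the diff:

```python
import argparse

def parse_arguments(argv=None) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    # choices=["inference"] rejects any other value at parse time.
    parser.add_argument(
        "--job_type", type=str, choices=["inference"], default="inference"
    )
    return parser.parse_args(argv)

args = parse_arguments([])  # no flag given: defaults to "inference"

# The run()-level check also catches a Namespace built without the flag,
# since getattr falls back to "inference" only when the attribute is absent.
if getattr(args, "job_type", "inference") != "inference":
    raise ValueError("ModelInferenceTask must be run with --job_type inference.")

print(args.job_type)  # inference
# parse_arguments(["--job_type", "training"]) would exit with an argparse error.
```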