def process_data(df: pd.DataFrame, interval):
    """
    Process contributor activity cycle data using Polars for performance.

    Counts commit activity (author + committer timestamps) per hour of day
    (interval == "H") or per weekday name (any other interval), returning a
    pandas Series indexed by the bucket — the same shape the old
    groupby(key)[key].count() produced for create_figure.

    Follows the "Polars Core, Pandas Edge" architecture.
    """
    # === POLARS PROCESSING START ===

    # Convert to Polars for fast processing
    pl_df = to_polars(df)

    # Upstream timestamps carry a "+HH:MM"/"-HH:MM" offset; we want the
    # *local* wall-clock values, so strip the trailing offset before parsing.
    # A regex strip is used because Polars' str.slice does not support a
    # negative length the way pandas' str[:-6] does.
    strip_offset = r"[+-]\d{2}:\d{2}$"
    pl_df = pl_df.with_columns(
        [
            pl.col("author_timestamp").cast(pl.Utf8).str.replace(strip_offset, "").str.to_datetime().alias("author_timestamp"),
            pl.col("committer_timestamp").cast(pl.Utf8).str.replace(strip_offset, "").str.to_datetime().alias("committer_timestamp"),
        ]
    )

    # Null-out the author timestamp when author == committer so the same
    # event is not counted twice.
    pl_df = pl_df.with_columns(
        pl.when(pl.col("author_timestamp") == pl.col("committer_timestamp"))
        .then(None)
        .otherwise(pl.col("author_timestamp"))
        .alias("author_timestamp")
    )

    if interval == "H":
        # Extract hour-of-day for both roles.
        author_vals = pl_df.select(pl.col("author_timestamp").dt.hour().alias("Hour")).drop_nulls()
        committer_vals = pl_df.select(pl.col("committer_timestamp").dt.hour().alias("Hour")).drop_nulls()
        key = "Hour"
    else:
        # Polars dt.weekday() is 1 (Monday) .. 7 (Sunday).
        weekday_map = {
            1: "Monday",
            2: "Tuesday",
            3: "Wednesday",
            4: "Thursday",
            5: "Friday",
            6: "Saturday",
            7: "Sunday",
        }
        author_vals = (
            pl_df.select(pl.col("author_timestamp").dt.weekday().alias("day_num"))
            .drop_nulls()
            .with_columns(pl.col("day_num").replace_strict(weekday_map, default="Unknown").alias("Weekday"))
            .select("Weekday")
        )
        committer_vals = (
            pl_df.select(pl.col("committer_timestamp").dt.weekday().alias("day_num"))
            .drop_nulls()
            .with_columns(pl.col("day_num").replace_strict(weekday_map, default="Unknown").alias("Weekday"))
            .select("Weekday")
        )
        key = "Weekday"

    combined = pl.concat([author_vals, committer_vals])
    # The count must NOT be aliased to the group key itself:
    # group_by(key).agg(pl.len().alias(key)) raises a duplicate-column error.
    pl_result = combined.group_by(key).agg(pl.len().alias("count")).sort(key)

    # === POLARS PROCESSING END ===

    # Convert back to a pandas Series (index = bucket, values = counts).
    result_df = to_pandas(pl_result)
    series = result_df.set_index(key)["count"]
    # The old pandas result carried the key as the series name; preserve it
    # so create_figure sees the same shape.
    series.name = key
    return series
def process_data(df, view, contribs):
    """
    Process contributor drive-by/repeat data using Polars for performance.

    Splits contributions by whether the contributor reached rank == contribs:
    view "drive" keeps contributors who did NOT reach it, any other view
    keeps those who did.

    Follows the "Polars Core, Pandas Edge" architecture.
    """
    # === POLARS PROCESSING START ===

    # Convert to Polars for fast processing
    pl_df = to_polars(df)

    # Normalize created_at to a UTC datetime
    pl_df = pl_df.with_columns(pl.col("created_at").cast(pl.Datetime("us", "UTC")))

    # Contributors whose rank equals the requested contribution count.
    # Pass a plain list to is_in: a Python set is not accepted by all Polars
    # versions, and is_in hashes the membership values internally anyway.
    contributor_ids = pl_df.filter(pl.col("rank") == contribs).select("cntrb_id").unique().to_series().to_list()

    # Filter based on the requested view
    if view == "drive":
        pl_result = pl_df.filter(~pl.col("cntrb_id").is_in(contributor_ids))
    else:
        pl_result = pl_df.filter(pl.col("cntrb_id").is_in(contributor_ids))

    # === POLARS PROCESSING END ===

    # Convert to Pandas for visualization
    return to_pandas(pl_result)
def process_data(df: pd.DataFrame, action_type, top_k, start_date, end_date):
    """
    Process contributor importance pie data using Polars for performance.

    Counts contributions of `action_type` per contributor within the
    [start_date, end_date] window, keeps the top_k contributors, and folds
    everyone else into a single "Other" row.

    Follows the "Polars Core, Pandas Edge" architecture.
    """
    from datetime import datetime, timezone

    # === POLARS PROCESSING START ===

    # Convert to Polars for fast processing
    pl_df = to_polars(df)

    # Convert to datetime and order chronologically
    pl_df = pl_df.with_columns(pl.col("created_at").cast(pl.Datetime("us", "UTC"))).sort("created_at")

    def _as_utc(value):
        # The date picker hands us ISO date strings; comparing a string
        # against a tz-aware Datetime column raises in Polars (pandas
        # tolerated it), so parse to timezone-aware datetimes first.
        if value is None or isinstance(value, datetime):
            return value
        parsed = datetime.fromisoformat(str(value))
        return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)

    start = _as_utc(start_date)
    end = _as_utc(end_date)
    if start is not None:
        pl_df = pl_df.filter(pl.col("created_at") >= start)
    if end is not None:
        pl_df = pl_df.filter(pl.col("created_at") <= end)

    # Filter by action type
    pl_df = pl_df.filter(pl.col("Action").str.contains(action_type))

    # Count contributions per contributor, largest first.  Cast the count to
    # Int64 and the id to Utf8 so the "Other" row below passes concat's
    # strict vertical schema check (pl.len() yields UInt32, which would not
    # match the Int64 literal; assumes cntrb_id casts to string cleanly —
    # TODO confirm against the query's cntrb_id dtype).
    pl_grouped = (
        pl_df.group_by("cntrb_id")
        .agg(pl.len().cast(pl.Int64).alias(action_type))
        .sort(action_type, descending=True)
        .with_columns(pl.col("cntrb_id").cast(pl.Utf8))
    )

    # Totals; sum() over an empty frame can come back null, hence "or 0".
    t_sum = pl_grouped.select(pl.col(action_type).sum()).item() or 0

    # Get top k and its share of the total
    pl_top_k = pl_grouped.head(top_k)
    df_sum = pl_top_k.select(pl.col(action_type).sum()).item() or 0

    # Fold the remaining contributions into one "Other" slice
    other_row = pl.DataFrame({"cntrb_id": ["Other"], action_type: [t_sum - df_sum]})
    pl_result = pl.concat([pl_top_k, other_row])

    # === POLARS PROCESSING END ===

    # Convert to Pandas for visualization
    return to_pandas(pl_result)
def process_data(df: pd.DataFrame, interval, action):
    """
    Process contributors-by-action data using Polars for performance.

    Keeps only rows whose Action matches `action`, then reduces to one row
    per (contributor, time bucket) so each distinct contributor counts once
    per interval, with created_at snapped to the bucket start.

    Follows the "Polars Core, Pandas Edge" architecture.
    """
    # === POLARS PROCESSING START ===

    # Convert to Polars for fast processing
    pl_df = to_polars(df)

    # Convert to datetime and order chronologically
    pl_df = pl_df.with_columns(pl.col("created_at").cast(pl.Datetime("us", "UTC"))).sort("created_at")

    # Filter for selected action using Polars string contains
    pl_df = pl_df.filter(pl.col("Action").str.contains(action))

    # Map the app's interval codes to Polars truncation strings.
    # ("3mo"/"6mo" approximate the old pandas "Q"/"2Q" periods via
    # epoch-aligned 3/6-month buckets.)
    interval_map = {"M1": "1mo", "M3": "3mo", "M6": "6mo", "M12": "1y"}
    polars_interval = interval_map.get(interval, "1mo")

    # One row per contributor per bucket.  maintain_order=True keeps the
    # chronologically FIRST row per pair — plain unique() keeps an arbitrary
    # row and scrambles the sort order the old drop_duplicates guaranteed.
    pl_df = pl_df.with_columns(pl.col("created_at").dt.truncate(polars_interval).alias("_period"))
    pl_df = pl_df.unique(subset=["cntrb_id", "_period"], keep="first", maintain_order=True)

    # Snap created_at to the bucket start and drop the helper column
    pl_df = pl_df.with_columns(pl.col("_period").alias("created_at")).drop("_period")

    # === POLARS PROCESSING END ===

    # Convert to Pandas for visualization
    return to_pandas(pl_df)
= pl_df.with_columns(pl.col("_period").alias("created_at")).drop("_period") + + # === POLARS PROCESSING END === + + # Convert to Pandas for visualization + return to_pandas(pl_df) def create_figure(df: pd.DataFrame, interval, action): diff --git a/8Knot/pages/contributors/visualizations/contributors_types_over_time.py b/8Knot/pages/contributors/visualizations/contributors_types_over_time.py index 7184bbf5..e8cec37a 100644 --- a/8Knot/pages/contributors/visualizations/contributors_types_over_time.py +++ b/8Knot/pages/contributors/visualizations/contributors_types_over_time.py @@ -4,10 +4,12 @@ from dash import callback from dash.dependencies import Input, Output, State import pandas as pd +import polars as pl import logging import numpy as np import plotly.express as px from pages.utils.graph_utils import get_graph_time_values, baby_blue +from pages.utils.polars_utils import to_polars, to_pandas from pages.utils.job_utils import nodata_graph from queries.contributors_query import contributors_query as ctq import time @@ -189,69 +191,68 @@ def create_contrib_over_time_graph(repolist, contribs, interval, bot_switch): def process_data(df, interval, contribs): - # convert to datetime objects with consistent column name - df["created_at"] = pd.to_datetime(df["created_at"], utc=True) - # df.rename(columns={"created_at": "created"}, inplace=True) - - # remove null contrib ids - df = df.dropna() - - # create column for identifying Drive by and Repeat Contributors - contributors = df["cntrb_id"][df["rank"] == contribs].to_list() - - # dfs for drive by and repeat contributors - df_drive_temp = df.loc[~df["cntrb_id"].isin(contributors)] - df_repeat_temp = df.loc[df["cntrb_id"].isin(contributors)] - - # order values chronologically by creation date - df = df.sort_values(by="created_at", axis=0, ascending=True) - - # variable to slice on to handle weekly period edge case - period_slice = None - if interval == "W": - # this is to slice the extra period information that comes with 
the weekly case - period_slice = 10 - - # create empty df for empty case - df_drive = pd.DataFrame(columns=["Date", "Drive"]) - df_drive["Drive"] = df_drive.Drive.astype("int64") - - # fill df only if there is data - if not df_drive_temp.empty: - # df for drive by contributros in time interval - df_drive = ( - # disable and re-enable formatter - # fmt: off - df_drive_temp.groupby(by=df_drive_temp.created_at.dt.to_period(interval))["cntrb_id"] - # fmt: on - .nunique() - .reset_index() - .rename(columns={"cntrb_id": "Drive", "created_at": "Date"}) + """ + Process contributor types over time data using Polars for performance. + + Follows the "Polars Core, Pandas Edge" architecture. + """ + # === POLARS PROCESSING START === + + # Convert to Polars for fast processing + pl_df = to_polars(df) + + # Convert to datetime and drop nulls + pl_df = pl_df.with_columns(pl.col("created_at").cast(pl.Datetime("us", "UTC"))) + pl_df = pl_df.drop_nulls() + + # Get contributors with specified rank + contributors = pl_df.filter(pl.col("rank") == contribs).select("cntrb_id").unique().to_series().to_list() + contributors_set = set(contributors) + + # Split into drive-by and repeat contributors + pl_drive = pl_df.filter(~pl.col("cntrb_id").is_in(contributors_set)) + pl_repeat = pl_df.filter(pl.col("cntrb_id").is_in(contributors_set)) + + # Map interval to Polars truncation format + interval_map = {"D": "1d", "W": "1w", "M": "1mo", "Y": "1y"} + polars_interval = interval_map.get(interval, "1mo") + + # Count unique drive-by contributors per period + if pl_drive.height > 0: + pl_drive_result = ( + pl_drive.with_columns(pl.col("created_at").dt.truncate(polars_interval).alias("Date")) + .group_by("Date") + .agg(pl.col("cntrb_id").n_unique().alias("Drive")) ) - df_drive["Date"] = pd.to_datetime(df_drive["Date"].astype(str).str[:period_slice]) - - # create empty df for empty case - df_repeat = pd.DataFrame(columns=["Date", "Repeat"]) - df_repeat["Repeat"] = df_repeat.Repeat.astype("int64") - - # 
fill df only if there is data - if not df_repeat_temp.empty: - # df for repeat contributors in time interval - df_repeat = ( - # disable and re-enable formatter - # fmt: off - df_repeat_temp.groupby(by=df_repeat_temp.created_at.dt.to_period(interval))["cntrb_id"] - # fmt: on - .nunique() - .reset_index() - .rename(columns={"cntrb_id": "Repeat", "created_at": "Date"}) + else: + pl_drive_result = pl.DataFrame({"Date": [], "Drive": []}) + + # Count unique repeat contributors per period + if pl_repeat.height > 0: + pl_repeat_result = ( + pl_repeat.with_columns(pl.col("created_at").dt.truncate(polars_interval).alias("Date")) + .group_by("Date") + .agg(pl.col("cntrb_id").n_unique().alias("Repeat")) ) - df_repeat["Date"] = pd.to_datetime(df_repeat["Date"].astype(str).str[:period_slice]) + else: + pl_repeat_result = pl.DataFrame({"Date": [], "Repeat": []}) - # A single df created for plotting merged and closed as stacked bar chart - df_drive_repeat = pd.merge(df_drive, df_repeat, on="Date", how="outer") + # Join drive and repeat data + if pl_drive_result.height > 0 and pl_repeat_result.height > 0: + pl_result = pl_drive_result.join(pl_repeat_result, on="Date", how="full").sort("Date") + elif pl_drive_result.height > 0: + pl_result = pl_drive_result.with_columns(pl.lit(None).cast(pl.UInt32).alias("Repeat")).sort("Date") + elif pl_repeat_result.height > 0: + pl_result = pl_repeat_result.with_columns(pl.lit(None).cast(pl.UInt32).alias("Drive")).sort("Date") + else: + pl_result = pl.DataFrame({"Date": [], "Drive": [], "Repeat": []}) - # formating for graph generation + # === POLARS PROCESSING END === + + # Convert to Pandas for visualization + df_drive_repeat = to_pandas(pl_result) + + # Format dates for graph generation if interval == "M": df_drive_repeat["Date"] = df_drive_repeat["Date"].dt.strftime("%Y-%m-01") elif interval == "Y": diff --git a/8Knot/pages/contributors/visualizations/first_time_contributions.py 
def process_data(df):
    """
    Keep only each contributor's first contribution, via Polars.

    Casts created_at to a UTC datetime and filters to rank == 1 rows,
    returning a pandas DataFrame for the plotting layer.

    Follows the "Polars Core, Pandas Edge" architecture.
    """
    # === POLARS PROCESSING START ===

    # Single chained pipeline: convert, normalize the timestamp, and keep
    # only first contributions (rank == 1).
    first_contribs = (
        to_polars(df)
        .with_columns(pl.col("created_at").cast(pl.Datetime("us", "UTC")))
        .filter(pl.col("rank") == 1)
    )

    # === POLARS PROCESSING END ===

    # Hand back to pandas at the visualization edge
    return to_pandas(first_contribs)
""" + # === POLARS PROCESSING START === + + # Convert to Polars for fast processing + pl_df = to_polars(df) + + # Convert to datetime and sort + pl_df = pl_df.with_columns(pl.col("created_at").cast(pl.Datetime("us", "UTC"))) + pl_df = pl_df.sort("created_at") - # keep only first contributions - df = df[df["rank"] == 1] + # Keep only first contributions (rank == 1) and unique contributors + pl_df = pl_df.filter(pl.col("rank") == 1).unique(subset=["cntrb_id"], keep="first") - # get all of the unique entries by contributor ID - df = df.drop_duplicates(subset=["cntrb_id"]) - df = df.reset_index(drop=True) + # Truncate to period for grouping + interval_map = {"D": "1d", "W": "1w", "M": "1mo", "Y": "1y"} + polars_interval = interval_map.get(interval, "1mo") - # variable to slice on to handle weekly period edge case - period_slice = None - if interval == "W": - # this is to slice the extra period information that comes with the weekly case - period_slice = 10 + pl_df = pl_df.with_columns(pl.col("created_at").dt.truncate(polars_interval).alias("Date")) - # get the count of new contributors in the desired interval in pandas period format, sort index to order entries - created_range = pd.to_datetime(df["created_at"]).dt.to_period(interval).value_counts().sort_index() + # Group by period and count + pl_result = pl_df.group_by("Date").agg(pl.len().alias("contribs")).sort("Date") - # converts to data frame object and creates date column from period values - df_contribs = created_range.to_frame().reset_index().rename(columns={"created_at": "Date", "count": "contribs"}) + # Convert to Pandas for visualization + df_contribs = to_pandas(pl_result) - # converts date column to a datetime object, converts to string first to handle period information - df_contribs["Date"] = pd.to_datetime(df_contribs["Date"].astype(str)) + # === POLARS PROCESSING END === - # correction for year binning - - # rounded up to next year so this is a simple patch + # Correction for year binning if interval 
== "Y": df_contribs["Date"] = df_contribs["Date"].dt.year elif interval == "M": diff --git a/8Knot/pages/utils/polars_utils.py b/8Knot/pages/utils/polars_utils.py index c70aee78..1da14c25 100644 --- a/8Knot/pages/utils/polars_utils.py +++ b/8Knot/pages/utils/polars_utils.py @@ -144,7 +144,8 @@ class Expressions: """ @staticmethod - def count_open_at_date( + def is_open_at_date( + date, created_col: str = "created_at", closed_col: str = "closed_at", ) -> pl.Expr: @@ -153,16 +154,163 @@ def count_open_at_date( An item is open if: created_at <= date AND (closed_at > date OR closed_at is null) """ - # This is a template - actual date comparison needs to be done in context - return (pl.col(created_col).is_not_null()) & ( - pl.col(closed_col).is_null() | (pl.col(closed_col) > pl.col(created_col)) - ) + return (pl.col(created_col) <= date) & (pl.col(closed_col).is_null() | (pl.col(closed_col) > date)) @staticmethod - def safe_log(col: str) -> pl.Expr: + def safe_log(col: str, alias: str = None) -> pl.Expr: """ Safe logarithm that handles zero values. Returns 0 for zero values, log(x) otherwise. """ - return pl.when(pl.col(col) != 0).then(pl.col(col).log()).otherwise(0) + expr = pl.when(pl.col(col) != 0).then(pl.col(col).log()).otherwise(0) + return expr.alias(alias) if alias else expr + + @staticmethod + def truncate_to_period(col: str, interval: str) -> pl.Expr: + """ + Truncate datetime column to a period (day, week, month, year). 
+ + Args: + col: Column name + interval: "D", "W", "M", or "Y" + + Returns: + Polars expression + """ + interval_map = {"D": "1d", "W": "1w", "M": "1mo", "Y": "1y"} + polars_interval = interval_map.get(interval, "1mo") + return pl.col(col).dt.truncate(polars_interval) + + @staticmethod + def to_utc_datetime(col: str) -> pl.Expr: + """Convert a column to UTC datetime.""" + return pl.col(col).cast(pl.Datetime("us", "UTC")) + + @staticmethod + def count_in_range( + date, + created_col: str = "created_at", + closed_col: str = "closed_at", + ) -> int: + """ + Count items open at a specific date. + + This is a helper for use with filter operations. + """ + return (pl.col(created_col) <= date) & (pl.col(closed_col).is_null() | (pl.col(closed_col) > date)) + + +# Lazy evaluation helpers for complex aggregations +class LazyPatterns: + """ + Common lazy evaluation patterns for Polars. + + Lazy evaluation allows Polars to optimize the entire query plan + before execution. Use these patterns for complex multi-step operations. + """ + + @staticmethod + def group_count_by_period( + df: pl.DataFrame, + date_col: str, + interval: str, + count_col: str = None, + unique: bool = False, + ) -> pl.DataFrame: + """ + Group by time period and count (optionally unique values). 
+ + Args: + df: Polars DataFrame + date_col: Column to use for grouping + interval: "D", "W", "M", or "Y" + count_col: Column to count (if None, counts rows) + unique: If True, count unique values + + Returns: + Aggregated DataFrame + + Example: + # Count unique commits per month + result = LazyPatterns.group_count_by_period( + df, "created_at", "M", count_col="commit_hash", unique=True + ) + """ + interval_map = {"D": "1d", "W": "1w", "M": "1mo", "Y": "1y"} + polars_interval = interval_map.get(interval, "1mo") + + lf = df.lazy().with_columns(pl.col(date_col).dt.truncate(polars_interval).alias("_period")) + + if count_col: + if unique: + agg_expr = pl.col(count_col).n_unique().alias("count") + else: + agg_expr = pl.col(count_col).count().alias("count") + else: + agg_expr = pl.len().alias("count") + + return lf.group_by("_period").agg(agg_expr).sort("_period").collect() + + @staticmethod + def filter_and_aggregate( + df: pl.DataFrame, + filter_expr: pl.Expr, + group_by: Union[str, list], + agg_exprs: list, + ) -> pl.DataFrame: + """ + Filter, group, and aggregate in one optimized operation. + + Args: + df: Polars DataFrame + filter_expr: Polars filter expression + group_by: Column(s) to group by + agg_exprs: List of aggregation expressions + + Returns: + Aggregated DataFrame + + Example: + result = LazyPatterns.filter_and_aggregate( + df, + filter_expr=pl.col("status") == "active", + group_by="category", + agg_exprs=[pl.col("value").sum(), pl.col("value").mean()], + ) + """ + return df.lazy().filter(filter_expr).group_by(group_by).agg(agg_exprs).collect() + + @staticmethod + def cumsum_threshold_search( + df: pl.DataFrame, + value_col: str, + threshold: float, + ) -> int: + """ + Find the number of rows needed to reach a cumulative sum threshold. + + This is a vectorized replacement for iterrows() loops that calculate + cumulative sums until a threshold is reached. 
+ + Args: + df: Polars DataFrame (sorted by the column of interest) + value_col: Column to cumsum + threshold: Target threshold + + Returns: + Number of rows needed to reach threshold + + Example: + # Find how many top contributors account for 80% of contributions + df_sorted = df.sort("contributions", descending=True) + n_rows = LazyPatterns.cumsum_threshold_search( + df_sorted, "contributions", total_contributions * 0.8 + ) + """ + cumsum = df.select(pl.col(value_col).cum_sum())[value_col] + # Find first index where cumsum >= threshold + indices = cumsum.to_numpy() >= threshold + if indices.any(): + return int(indices.argmax()) + 1 + return len(df)