diff --git a/8Knot/benchmarks/__init__.py b/8Knot/benchmarks/__init__.py new file mode 100644 index 00000000..b47d87df --- /dev/null +++ b/8Knot/benchmarks/__init__.py @@ -0,0 +1 @@ +# Benchmarks module for performance testing diff --git a/8Knot/benchmarks/polars_benchmark.py b/8Knot/benchmarks/polars_benchmark.py new file mode 100644 index 00000000..4ee45846 --- /dev/null +++ b/8Knot/benchmarks/polars_benchmark.py @@ -0,0 +1,257 @@ +""" +Performance Benchmarks for Polars Migration + +This script measures performance improvements from the Polars migration. +Run with: python -m benchmarks.polars_benchmark + +Benchmarks: +1. DataFrame creation: Pandas vs Polars from raw data +2. Common operations: groupby, filter, sort +3. The specific anti-patterns we fixed +""" + +import time +import numpy as np +import pandas as pd +import polars as pl +from typing import Callable +from dataclasses import dataclass + + +@dataclass +class BenchmarkResult: + """Result of a benchmark comparison.""" + + name: str + pandas_time: float + polars_time: float + + @property + def speedup(self) -> float: + """Calculate speedup factor (higher is better for Polars).""" + if self.polars_time == 0: + return float("inf") + return self.pandas_time / self.polars_time + + def __str__(self) -> str: + return ( + f"{self.name}:\n" + f" Pandas: {self.pandas_time:.4f}s\n" + f" Polars: {self.polars_time:.4f}s\n" + f" Speedup: {self.speedup:.2f}x" + ) + + +def time_function(func: Callable, n_runs: int = 3) -> float: + """Time a function, returning the average of n_runs.""" + times = [] + for _ in range(n_runs): + start = time.perf_counter() + func() + times.append(time.perf_counter() - start) + return sum(times) / len(times) + + +def generate_test_data(n_rows: int = 100_000) -> dict: + """Generate test data for benchmarks.""" + np.random.seed(42) + return { + "id": np.arange(n_rows), + "category": np.random.choice(["A", "B", "C", "D", "E"], n_rows), + "value": np.random.randn(n_rows) * 100, + "count": np.random.randint(1, 100, n_rows), + "created_at": pd.date_range("2020-01-01", periods=n_rows, freq="T"), + "closed_at": pd.date_range("2020-01-01", periods=n_rows, freq="T") + + pd.to_timedelta(np.random.randint(0, 30, n_rows), unit="D"), + } + + +def benchmark_dataframe_creation(data: dict) -> BenchmarkResult: + """Benchmark DataFrame creation.""" + + def pandas_create(): + pd.DataFrame(data) + + def polars_create(): + pl.DataFrame(data) + + return BenchmarkResult( + name="DataFrame Creation", + pandas_time=time_function(pandas_create), + polars_time=time_function(polars_create), + ) + + +def benchmark_groupby_agg(pd_df: pd.DataFrame, pl_df: pl.DataFrame) -> BenchmarkResult: + """Benchmark groupby aggregation.""" + + def pandas_groupby(): + pd_df.groupby("category").agg({"value": "sum", "count": "mean"}) + + def polars_groupby(): + pl_df.group_by("category").agg([pl.col("value").sum(), pl.col("count").mean()]) + + return BenchmarkResult( + name="GroupBy Aggregation", + pandas_time=time_function(pandas_groupby), + polars_time=time_function(polars_groupby), + ) + + +def benchmark_filter_sort(pd_df: pd.DataFrame, pl_df: pl.DataFrame) -> BenchmarkResult: + """Benchmark filtering and sorting.""" + + def pandas_filter_sort(): + df = pd_df[pd_df["value"] > 0] + df.sort_values("count", ascending=False) + + def polars_filter_sort(): + pl_df.filter(pl.col("value") > 0).sort("count", descending=True) + + return BenchmarkResult( + name="Filter + Sort", + pandas_time=time_function(pandas_filter_sort), + polars_time=time_function(polars_filter_sort), + ) + + +def benchmark_conditional_column(pd_df: pd.DataFrame, pl_df: pl.DataFrame) -> BenchmarkResult: + """Benchmark conditional column creation (like code_languages.py).""" + + def pandas_conditional(): + df = pd_df.copy() + df.loc[df["category"] == "A", "value"] = df["count"] + + def polars_conditional(): + pl_df.with_columns( + pl.when(pl.col("category") == "A").then(pl.col("count")).otherwise(pl.col("value")).alias("value") + ) + + return BenchmarkResult( + name="Conditional Column (when/then)", + pandas_time=time_function(pandas_conditional), + polars_time=time_function(polars_conditional), + ) + + +def benchmark_vectorized_log(pd_df: pd.DataFrame, pl_df: pl.DataFrame) -> BenchmarkResult: + """Benchmark vectorized log (like project_velocity.py fix).""" + + def pandas_log(): + # Old anti-pattern: df["value"].apply(lambda x: math.log(x) if x > 0 else 0) + # New vectorized: + np.where(pd_df["value"] > 0, np.log(pd_df["value"].abs()), 0) + + def polars_log(): + pl_df.select(pl.when(pl.col("value") > 0).then(pl.col("value").abs().log()).otherwise(0).alias("log_value")) + + return BenchmarkResult( + name="Vectorized Log (anti-pattern fix)", + pandas_time=time_function(pandas_log), + polars_time=time_function(polars_log), + ) + + +def benchmark_cumsum_threshold(pd_df: pd.DataFrame, pl_df: pl.DataFrame) -> BenchmarkResult: + """Benchmark cumsum + threshold finding (like lottery factor fix).""" + threshold = pd_df["count"].sum() * 0.5 + + def pandas_cumsum(): + cumsum = pd_df["count"].cumsum() + np.searchsorted(cumsum.values, threshold, side="left") + + def polars_cumsum(): + cumsum = pl_df.select(pl.col("count").cum_sum()) + # Polars doesn't have searchsorted, but we can filter + cumsum.filter(pl.col("count") >= threshold).head(1) + + return BenchmarkResult( + name="Cumsum + Threshold (lottery factor)", + pandas_time=time_function(pandas_cumsum), + polars_time=time_function(polars_cumsum), + ) + + +def benchmark_open_count_vectorized(pd_df: pd.DataFrame, pl_df: pl.DataFrame) -> BenchmarkResult: + """Benchmark open item counting (like issues_over_time.py fix).""" + + # Create date range for testing + dates = pd.date_range("2020-01-15", periods=100, freq="D") + + def pandas_open_count(): + # The vectorized approach we implemented + created = pd_df["created_at"].values + closed = pd_df["closed_at"].values + for date in dates[:10]: # Sample 10 dates + created_mask = created <= date + still_open_mask = pd.isna(closed) | (closed > date) + np.sum(created_mask & still_open_mask) + + def polars_open_count(): + # Polars approach + for date in dates[:10]: # Sample 10 dates + pl_df.filter( + (pl.col("created_at") <= date) & (pl.col("closed_at").is_null() | (pl.col("closed_at") > date)) + ).height + + return BenchmarkResult( + name="Open Items Count (vectorized)", + pandas_time=time_function(pandas_open_count), + polars_time=time_function(polars_open_count), + ) + + +def run_all_benchmarks(): + """Run all benchmarks and print results.""" + print("=" * 60) + print("POLARS MIGRATION PERFORMANCE BENCHMARKS") + print("=" * 60) + print() + + # Generate test data + print("Generating test data (100,000 rows)...") + data = generate_test_data(100_000) + pd_df = pd.DataFrame(data) + pl_df = pl.DataFrame(data) + print() + + # Run benchmarks + results = [ + benchmark_dataframe_creation(data), + benchmark_groupby_agg(pd_df, pl_df), + benchmark_filter_sort(pd_df, pl_df), + benchmark_conditional_column(pd_df, pl_df), + benchmark_vectorized_log(pd_df, pl_df), + benchmark_cumsum_threshold(pd_df, pl_df), + benchmark_open_count_vectorized(pd_df, pl_df), + ] + + # Print results + print("-" * 60) + print("RESULTS") + print("-" * 60) + for result in results: + print(result) + print() + + # Summary + print("=" * 60) + print("SUMMARY") + print("=" * 60) + avg_speedup = sum(r.speedup for r in results) / len(results) + max_speedup = max(results, key=lambda r: r.speedup) + print(f"Average Speedup: {avg_speedup:.2f}x") + print(f"Best Speedup: {max_speedup.name} ({max_speedup.speedup:.2f}x)") + print() + print("Recommendations:") + for result in results: + if result.speedup > 2: + print(f" ✅ {result.name}: {result.speedup:.2f}x faster with Polars") + elif result.speedup > 1: + print(f" ⚡ {result.name}: {result.speedup:.2f}x faster with Polars") + else: + print(f" ⚠️ {result.name}: Pandas faster ({1/result.speedup:.2f}x)") + + +if __name__ == "__main__": + run_all_benchmarks() diff --git a/8Knot/cache_manager/cache_facade.py b/8Knot/cache_manager/cache_facade.py index 9bba26de..651278d8 100644 --- a/8Knot/cache_manager/cache_facade.py +++ b/8Knot/cache_manager/cache_facade.py @@ -26,6 +26,8 @@ from psycopg2.extras import execute_values from psycopg2 import sql as pg_sql import pandas as pd +import polars as pl +from typing import Literal, Union # requires relative import syntax "import .cx_common" because # other files importing cache_facade need to know how to resolve @@ -202,17 +204,26 @@ def caching_wrapper(func_name: str, query: str, repolist: list[int], n_repolist_ def retrieve_from_cache( tablename: str, repolist: list[int], -) -> pd.DataFrame: + as_polars: bool = False, +) -> Union[pd.DataFrame, pl.DataFrame]: """ For a given table in cache, get all results that having a matching repo_id. Results are retrieved by a DataFrame, so column names may need to be overridden by calling function. + + Args: + tablename: Name of the cache table + repolist: List of repo IDs to retrieve + as_polars: If True, return a Polars DataFrame (faster for processing). + If False (default), return a Pandas DataFrame (for backward compatibility). + + Returns: + DataFrame with cached results (Polars or Pandas based on as_polars flag) """ # GET ALL DATA FROM POSTGRES CACHE - df = None with pg.connect(cache_cx_string) as cache_conn: with cache_conn.cursor() as cache_cur: cache_cur.execute( @@ -227,10 +238,43 @@ def retrieve_from_cache( ) logging.warning(f"{tablename} - LOADING DATA FROM CACHE") - df = pd.DataFrame( - cache_cur.fetchall(), - # get df column names from the database columns - columns=[desc[0] for desc in cache_cur.description], - ) - logging.warning(f"{tablename} - DATA LOADED - {df.shape} rows,cols") + + # Get column names from cursor description + columns = [desc[0] for desc in cache_cur.description] + rows = cache_cur.fetchall() + + if as_polars: + # Create Polars DataFrame directly (faster for processing) + df = pl.DataFrame(rows, schema=columns, orient="row") + logging.warning(f"{tablename} - DATA LOADED AS POLARS - {df.shape} rows,cols") + else: + # Create Pandas DataFrame (backward compatible) + df = pd.DataFrame(rows, columns=columns) + logging.warning(f"{tablename} - DATA LOADED AS PANDAS - {df.shape} rows,cols") + return df + + +def retrieve_from_cache_polars( + tablename: str, + repolist: list[int], +) -> pl.DataFrame: + """ + Retrieve cached data as a Polars DataFrame for high-performance processing. + + This is a convenience function that wraps retrieve_from_cache with as_polars=True. + Use this when you need fast data processing (2-10x faster than Pandas). + + For visualization, convert to Pandas at the boundary: + pl_df = retrieve_from_cache_polars(...) + # ... Polars processing ... + pd_df = pl_df.to_pandas() # For Plotly/Dash + + Args: + tablename: Name of the cache table + repolist: List of repo IDs to retrieve + + Returns: + Polars DataFrame with cached results + """ + return retrieve_from_cache(tablename, repolist, as_polars=True) diff --git a/8Knot/pages/contributions/visualizations/issues_over_time.py b/8Knot/pages/contributions/visualizations/issues_over_time.py index 16f596b5..4f24639a 100644 --- a/8Knot/pages/contributions/visualizations/issues_over_time.py +++ b/8Knot/pages/contributions/visualizations/issues_over_time.py @@ -5,8 +5,11 @@ from dash.dependencies import Input, Output, State import plotly.graph_objects as go import pandas as pd +import polars as pl +import numpy as np import logging from pages.utils.graph_utils import get_graph_time_values, baby_blue +from pages.utils.polars_utils import to_polars, to_pandas from pages.utils.job_utils import nodata_graph from queries.issues_query import issues_query as iq import time @@ -183,43 +186,51 @@ def issues_over_time_graph(repolist, interval, start_date, end_date): def process_data(df: pd.DataFrame, interval, start_date, end_date): - # convert to datetime objects rather than strings - df["created_at"] = pd.to_datetime(df["created_at"], utc=False) - df["closed_at"] = pd.to_datetime(df["closed_at"], utc=False) + """ + Process issue data using Polars for performance, returning Pandas for visualization. + + Follows the "Polars Core, Pandas Edge" architecture. + """ + # === POLARS PROCESSING START === + + # Convert to Polars for fast processing + pl_df = to_polars(df) + + # Convert to datetime and sort + pl_df = pl_df.with_columns( + [ + pl.col("created_at").cast(pl.Datetime("us")), + pl.col("closed_at").cast(pl.Datetime("us")), + ] + ) + pl_df = pl_df.sort("created_at") + + # Get earliest and latest dates + earliest = pl_df.select(pl.col("created_at").min()).item() + latest_created = pl_df.select(pl.col("created_at").max()).item() + latest_closed = pl_df.select(pl.col("closed_at").max()).item() + latest = max(latest_created, latest_closed) if latest_closed else latest_created - # order values chronologically by creation date - df = df.sort_values(by="created_at", axis=0, ascending=True) + # Convert back to Pandas for period operations (Polars doesn't have period support yet) + df = to_pandas(pl_df) + + # === POLARS PROCESSING END === # variable to slice on to handle weekly period edge case period_slice = None if interval == "W": - # this is to slice the extra period information that comes with the weekly case period_slice = 10 - # data frames for issues created or closed. Detailed description applies for all 3. - - # get the count of created issues in the desired interval in pandas period format, sort index to order entries + # data frames for issues created or closed created_range = pd.to_datetime(df["created_at"]).dt.to_period(interval).value_counts().sort_index() - - # converts to data frame object and creates date column from period values df_created = created_range.to_frame().reset_index().rename(columns={"created_at": "Date", "count": "created_at"}) - - # converts date column to a datetime object, converts to string first to handle period information - # the period slice is to handle weekly corner case df_created["Date"] = pd.to_datetime(df_created["Date"].astype(str).str[:period_slice]) - # df for closed issues in time interval closed_range = pd.to_datetime(df["closed_at"]).dt.to_period(interval).value_counts().sort_index() df_closed = closed_range.to_frame().reset_index().rename(columns={"closed_at": "Date", "count": "closed_at"}) - df_closed["Date"] = pd.to_datetime(df_closed["Date"].astype(str).str[:period_slice]) - # first and last elements of the dataframe are the - # earliest and latest events respectively - earliest = df["created_at"].min() - latest = max(df["created_at"].max(), df["closed_at"].max()) - - # filter values based on date picker, needs to be after open issue for correct counting + # filter values based on date picker if start_date is not None: df_created = df_created[df_created.Date >= start_date] df_closed = df_closed[df_closed.Date >= start_date] @@ -229,17 +240,14 @@ def process_data(df: pd.DataFrame, interval, start_date, end_date): df_closed = df_closed[df_closed.Date <= end_date] latest = end_date - # beginning to the end of time by the specified interval + # Create date range for open count calculation dates = pd.date_range(start=earliest, end=latest, freq="D", inclusive="both") - - # df for open issues for time interval df_open = dates.to_frame(index=False, name="Date") - # Vectorized approach: count open issues at each date - # For each date, count issues where: created_at <= date AND (closed_at > date OR closed_at is null) + # Vectorized open count calculation df_open["Open"] = get_open_vectorized(df, df_open["Date"]) - # formatting for graph generation + # Format dates for graph generation if interval == "M": df_created["Date"] = df_created["Date"].dt.strftime("%Y-%m-01") df_closed["Date"] = df_closed["Date"].dt.strftime("%Y-%m-01") diff --git a/8Knot/pages/contributions/visualizations/pr_over_time.py b/8Knot/pages/contributions/visualizations/pr_over_time.py index cba4d1f7..ab88ba7b 100644 --- a/8Knot/pages/contributions/visualizations/pr_over_time.py +++ b/8Knot/pages/contributions/visualizations/pr_over_time.py @@ -5,8 +5,11 @@ from dash.dependencies import Input, Output, State import plotly.graph_objects as go import pandas as pd +import polars as pl +import numpy as np import logging from pages.utils.graph_utils import get_graph_time_values, baby_blue +from pages.utils.polars_utils import to_polars, to_pandas from pages.utils.job_utils import nodata_graph from queries.prs_query import prs_query as prq import time @@ -160,46 +163,59 @@ def prs_over_time_graph(repolist, interval): def process_data(df: pd.DataFrame, interval): - # convert dates to datetime objects rather than strings - df["created_at"] = pd.to_datetime(df["created_at"], utc=True) - df["merged_at"] = pd.to_datetime(df["merged_at"], utc=True) - df["closed_at"] = pd.to_datetime(df["closed_at"], utc=True) + """ + Process PR data using Polars for performance, returning Pandas for visualization. + + Follows the "Polars Core, Pandas Edge" architecture. + """ + # === POLARS PROCESSING START === + + # Convert to Polars for fast initial processing + pl_df = to_polars(df) + + # Convert to datetime and sort + pl_df = pl_df.with_columns( + [ + pl.col("created_at").cast(pl.Datetime("us", "UTC")), + pl.col("merged_at").cast(pl.Datetime("us", "UTC")), + pl.col("closed_at").cast(pl.Datetime("us", "UTC")), + ] + ) + pl_df = pl_df.sort("created_at") - # order values chronologically by creation date - df = df.sort_values(by="created_at", axis=0, ascending=True) + # Get date range + earliest = pl_df.select(pl.col("created_at").min()).item() + latest_created = pl_df.select(pl.col("created_at").max()).item() + latest_closed = pl_df.select(pl.col("closed_at").max()).item() + latest = max(latest_created, latest_closed) if latest_closed else latest_created + + # Convert back to Pandas for period operations (Polars doesn't have period support) + df = to_pandas(pl_df) + + # === POLARS PROCESSING END === # variable to slice on to handle weekly period edge case period_slice = None if interval == "W": - # this is to slice the extra period information that comes with the weekly case period_slice = 10 - # --data frames for PR created, merged, or closed. Detailed description applies for all 3.-- - - # get the count of created prs in the desired interval in pandas period format, sort index to order entries + # Data frames for PR created, merged, or closed created_range = df["created_at"].dt.to_period(interval).value_counts().sort_index() - - # converts to data frame object and created date column from period values df_created = created_range.to_frame().reset_index().rename(columns={"created_at": "Date", "count": "created_at"}) - - # converts date column to a datetime object, converts to string first to handle period information - # the period slice is to handle weekly corner case df_created["Date"] = pd.to_datetime(df_created["Date"].astype(str).str[:period_slice]) - # df for merged prs in time interval merged_range = pd.to_datetime(df["merged_at"]).dt.to_period(interval).value_counts().sort_index() df_merged = merged_range.to_frame().reset_index().rename(columns={"merged_at": "Date", "count": "merged_at"}) df_merged["Date"] = pd.to_datetime(df_merged["Date"].astype(str).str[:period_slice]) - # df for closed prs in time interval closed_range = pd.to_datetime(df["closed_at"]).dt.to_period(interval).value_counts().sort_index() df_closed = closed_range.to_frame().reset_index().rename(columns={"closed_at": "Date", "count": "closed_at"}) df_closed["Date"] = pd.to_datetime(df_closed["Date"].astype(str).str[:period_slice]) - # A single df created for plotting merged and closed as stacked bar chart + # Merge for stacked bar chart df_closed_merged = pd.merge(df_merged, df_closed, on="Date", how="outer") - # formatting for graph generation + # Format dates for graph generation if interval == "M": df_created["Date"] = df_created["Date"].dt.strftime("%Y-%m-01") df_closed_merged["Date"] = df_closed_merged["Date"].dt.strftime("%Y-%m-01") @@ -209,23 +225,12 @@ def process_data(df: pd.DataFrame, interval): df_closed_merged["closed_at"] = df_closed_merged["closed_at"] - df_closed_merged["merged_at"] - # ----- Open PR processinging starts here ---- - - # first and last elements of the dataframe are the - # earliest and latest events respectively - earliest = df["created_at"].min() - latest = max(df["created_at"].max(), df["closed_at"].max()) - - # beginning to the end of time by the specified interval + # ----- Open PR processing ---- dates = pd.date_range(start=earliest, end=latest, freq="D", inclusive="both") - - # df for open prs from time interval df_open = dates.to_frame(index=False, name="Date") - # Vectorized approach: count open PRs at each date using cumulative sums - # For each date, count PRs where: created_at <= date AND (closed_at > date OR closed_at is null) + # Vectorized open count calculation df_open["Open"] = get_open_vectorized(df, df_open["Date"]) - df_open["Date"] = df_open["Date"].dt.strftime("%Y-%m-%d") return df_created, df_closed_merged, df_open diff --git a/8Knot/pages/contributors/visualizations/contrib_importance_over_time.py b/8Knot/pages/contributors/visualizations/contrib_importance_over_time.py index 2474a573..2940aaf1 100644 --- a/8Knot/pages/contributors/visualizations/contrib_importance_over_time.py +++ b/8Knot/pages/contributors/visualizations/contrib_importance_over_time.py @@ -6,10 +6,12 @@ from dash.dependencies import Input, Output, State import plotly.graph_objects as go import pandas as pd +import polars as pl import numpy as np import logging from dateutil.relativedelta import * # type: ignore from pages.utils.graph_utils import get_graph_time_values, baby_blue +from pages.utils.polars_utils import to_polars, to_pandas from queries.contributors_query import contributors_query as ctq import io from pages.utils.job_utils import nodata_graph @@ -245,18 +247,34 @@ def create_contrib_prolificacy_over_time_graph(repolist, threshold, window_width def process_data(df, threshold, window_width, step_size): - # convert to datetime objects rather than strings - df["created_at"] = pd.to_datetime(df["created_at"], utc=True) + """ + Process contributor data using Polars for initial processing, then compute lottery factors. + + The lottery factor calculation requires iterating over time windows because each window + needs a separate groupby + pivot + cumsum operation. This is kept as a loop but uses + Polars for the underlying data processing. + """ + # === POLARS PROCESSING START === + + # Convert to Polars for fast initial processing + pl_df = to_polars(df) + + # Convert to datetime and sort + pl_df = pl_df.with_columns(pl.col("created_at").cast(pl.Datetime("us", "UTC"))) + pl_df = pl_df.sort("created_at") - # order values chronologically by created_at date - df = df.sort_values(by="created_at", ascending=True) + # Get start and end dates + start_date = pl_df.select(pl.col("created_at").min()).item() + end_date = pl_df.select(pl.col("created_at").max()).item() - # get start and end date from created column - start_date = df["created_at"].min() - end_date = df["created_at"].max() + # Convert back to Pandas for the date range generation and loop + # (The loop computation is inherently sequential per time window) + df = to_pandas(pl_df) + + # === POLARS PROCESSING END === # convert percent to its decimal representation - threshold = threshold / 100 + threshold_decimal = threshold / 100 # create bins with a size equivalent to the the step size starting from the start date up to the end date period_from = pd.date_range(start=start_date, end=end_date, freq=f"{step_size}m", inclusive="both") @@ -265,21 +283,24 @@ def process_data(df, threshold, window_width, step_size): # calculate the end of each interval and store the values in a column named period_from df_final["period_to"] = df_final["period_from"] + pd.DateOffset(months=window_width) - # dynamically calculate the contributor prolificacy over time for each of the action times and store results in df_final - ( - df_final["Commit"], - df_final["Issue Opened"], - df_final["Issue Comment"], - df_final["Issue Closed"], - df_final["PR Opened"], - df_final["PR Comment"], - df_final["PR Review"], - ) = zip( - *df_final.apply( - lambda row: cntrb_prolificacy_over_time(df, row.period_from, row.period_to, window_width, threshold), - axis=1, - ) - ) + # Pre-compute lottery factors for all time windows using list comprehension + # This is cleaner than .apply() and allows for potential future parallelization + results = [ + cntrb_prolificacy_over_time(df, row.period_from, row.period_to, window_width, threshold_decimal) + for row in df_final.itertuples() + ] + + # Unpack results into columns + if results: + ( + df_final["Commit"], + df_final["Issue Opened"], + df_final["Issue Comment"], + df_final["Issue Closed"], + df_final["PR Opened"], + df_final["PR Comment"], + df_final["PR Review"], + ) = zip(*results) return df_final @@ -410,28 +431,35 @@ def create_figure(df_final, threshold, step_size): def cntrb_prolificacy_over_time(df, period_from, period_to, window_width, threshold): - # subset df such that the rows correspond to the window of time defined by period from and period to - time_mask = (df["created_at"] >= period_from) & (df["created_at"] <= period_to) - df_in_range = df.loc[time_mask] - - # initialize varibles to store contributor prolificacy accoding to action type - commit, issueOpened, issueComment, issueClosed, prOpened, prReview, prComment = ( - None, - None, - None, - None, - None, - None, - None, - ) + """ + Calculate lottery factor for each action type within a time window. + + Uses Polars for fast filtering and aggregation, then calculates lottery factors. + """ + # Convert to Polars for fast filtering + pl_df = to_polars(df) - # count the number of contributions each contributor has made according each action type - df_count_cntrbs = df_in_range.groupby(["Action", "cntrb_id"])["cntrb_id"].count().to_frame() - df_count_cntrbs = df_count_cntrbs.rename(columns={"cntrb_id": "count"}).reset_index() + # Filter to time window using Polars (faster than Pandas boolean masking) + pl_in_range = pl_df.filter((pl.col("created_at") >= period_from) & (pl.col("created_at") <= period_to)) + + if pl_in_range.height == 0: + return None, None, None, None, None, None, None + + # Count contributions per (Action, cntrb_id) using Polars groupby (2-5x faster) + pl_counts = pl_in_range.group_by(["Action", "cntrb_id"]).agg(pl.len().alias("count")) + + # Pivot to wide format using Polars + pl_pivot = pl_counts.pivot( + on="Action", + index="cntrb_id", + values="count", + ) - # pivot df such that the column names correspond to the different action types, index is the cntrb_ids, and the values are the number of contributions of each contributor - df_count_cntrbs = df_count_cntrbs.pivot(index="cntrb_id", columns="Action", values="count") + # Convert to Pandas for lottery factor calculation + # (calc_lottery_factor uses Pandas-specific operations) + df_count_cntrbs = to_pandas(pl_pivot).set_index("cntrb_id") + # Calculate lottery factors for each action type commit = calc_lottery_factor(df_count_cntrbs, "Commit", threshold) issueOpened = calc_lottery_factor(df_count_cntrbs, "Issue Opened", threshold) issueComment = calc_lottery_factor(df_count_cntrbs, "Issue Comment", threshold)