diff --git a/8Knot/pages/repo_overview/visualizations/code_languages.py b/8Knot/pages/repo_overview/visualizations/code_languages.py index b44aa61a..02ec3b76 100644 --- a/8Knot/pages/repo_overview/visualizations/code_languages.py +++ b/8Knot/pages/repo_overview/visualizations/code_languages.py @@ -4,10 +4,12 @@ from dash.dependencies import Input, Output, State import plotly.graph_objects as go import pandas as pd +import polars as pl import logging from dateutil.relativedelta import * # type: ignore import plotly.express as px from pages.utils.graph_utils import baby_blue +from pages.utils.polars_utils import to_polars, to_pandas from queries.repo_languages_query import repo_languages_query as rlq from pages.utils.job_utils import nodata_graph import time @@ -166,24 +168,59 @@ def code_languages_graph(repolist, view): return fig -def process_data(df: pd.DataFrame): +def process_data(df: pd.DataFrame) -> pd.DataFrame: + """ + Process language data using Polars for performance, returning Pandas for visualization. + + Follows the "Polars Core, Pandas Edge" architecture. + """ + # === POLARS PROCESSING START === + + # Convert to Polars for fast processing + pl_df = to_polars(df) # SVG files give one line of code per file - df.loc[df["programming_language"] == "SVG", "code_lines"] = df["files"] + pl_df = pl_df.with_columns( + pl.when(pl.col("programming_language") == "SVG") + .then(pl.col("files")) + .otherwise(pl.col("code_lines")) + .alias("code_lines") + ) - # require a language to have atleast .1 % of total lines to be shown, if not grouped into other - min_lines = df["code_lines"].sum() / 1000 - df.loc[df.code_lines <= min_lines, "programming_language"] = "Other" - df = df[["programming_language", "code_lines", "files"]].groupby("programming_language").sum().reset_index() + # Calculate minimum lines threshold (0.1% of total) + total_lines = pl_df.select(pl.col("code_lines").sum()).item() + min_lines = total_lines / 1000 + + # Group languages with few lines into "Other" + pl_df = pl_df.with_columns( + pl.when(pl.col("code_lines") <= min_lines) + .then(pl.lit("Other")) + .otherwise(pl.col("programming_language")) + .alias("programming_language") + ) - # order by descending file number and reset format - df = df.sort_values(by="files", axis=0, ascending=False).reset_index(drop=True) + # Aggregate by language + pl_df = ( + pl_df.group_by("programming_language") + .agg([pl.col("code_lines").sum(), pl.col("files").sum()]) + .sort("files", descending=True) + ) + + # Calculate percentages + total_code = pl_df.select(pl.col("code_lines").sum()).item() + total_files = pl_df.select(pl.col("files").sum()).item() + + pl_df = pl_df.with_columns( + [ + ((pl.col("code_lines") / total_code) * 100).alias("Code %"), + ((pl.col("files") / total_files) * 100).alias("Files %"), + ] + ) - # calculate percentages - df["Code %"] = (df["code_lines"] / df["code_lines"].sum()) * 100 - df["Files %"] = (df["files"] / df["files"].sum()) * 100 + # === POLARS PROCESSING END === - return df + # Convert to Pandas at the visualization boundary + return to_pandas(pl_df) def create_figure(df: pd.DataFrame, view): diff --git a/8Knot/pages/repo_overview/visualizations/ossf_scorecard.py b/8Knot/pages/repo_overview/visualizations/ossf_scorecard.py index 4b530177..4168a1f4 100644 --- a/8Knot/pages/repo_overview/visualizations/ossf_scorecard.py +++ b/8Knot/pages/repo_overview/visualizations/ossf_scorecard.py @@ -3,8 +3,10 @@ import dash_bootstrap_components as dbc from dash.dependencies import Input, Output, State import pandas as pd +import polars as pl import logging from dateutil.relativedelta import * # type: ignore +from pages.utils.polars_utils import to_polars, to_pandas from queries.ossf_score_query import ossf_score_query as osq import io import cache_manager.cache_facade as cf @@ -121,27 +123,49 @@ def ossf_scorecard(repo: str): logging.warning(f"{VIZ_ID} - NO DATA AVAILABLE") return dbc.Table.from_dataframe(df, striped=True, bordered=True, hover=True), dbc.Label("No data") - # repo id not needed for table - df = df.drop(columns=["repo_id"]) + # Process data using Polars, return Pandas for visualization + df_result, updated_date = process_data(df) - # get all values from the data_collection_date column - updated_times = pd.to_datetime(df["data_collection_date"]) + table = dbc.Table.from_dataframe(df_result, striped=True, bordered=True, hover=True) + + logging.warning(f"{VIZ_ID} - END - {time.perf_counter() - start}") + return table, dbc.Label(updated_date) - # we dont need to display this column for every entry - df = df.drop(columns=["data_collection_date"]) - df.loc[df.name == "OSSF_SCORECARD_AGGREGATE_SCORE", "name"] = "Aggregate Score" - df = df.sort_values("name", ascending=True) - df = df.rename(columns={"name": "Check Type", "score": "Score"}) +def process_data(df: pd.DataFrame) -> tuple[pd.DataFrame, str]: + """ + Process OSSF scorecard data using Polars for performance, returning Pandas for visualization. - table = dbc.Table.from_dataframe(df, striped=True, bordered=True, hover=True) + Follows the "Polars Core, Pandas Edge" architecture. + """ + # === POLARS PROCESSING START === - unique_updated_times = updated_times.drop_duplicates().to_numpy().flatten() + # Convert to Polars for fast processing + pl_df = to_polars(df) - if len(unique_updated_times) > 1: + # Get last update date + updated_times = pl_df.select(pl.col("data_collection_date").cast(pl.Datetime)).unique() + if updated_times.height > 1: logging.warning(f"{VIZ_ID} - MORE THAN ONE DATA COLLECTION DATE") + updated_date = updated_times.row(-1)[0].strftime("%d/%m/%Y") if updated_times.height > 0 else "Unknown" - updated_date = pd.to_datetime(str(unique_updated_times[-1])).strftime("%d/%m/%Y") + # Drop unnecessary columns + pl_df = pl_df.drop(["repo_id", "data_collection_date"]) - logging.warning(f"{VIZ_ID} - END - {time.perf_counter() - start}") - return table, dbc.Label(updated_date) + # Rename aggregate score and sort + pl_df = pl_df.with_columns( + pl.when(pl.col("name") == "OSSF_SCORECARD_AGGREGATE_SCORE") + .then(pl.lit("Aggregate Score")) + .otherwise(pl.col("name")) + .alias("name") + ) + + pl_df = pl_df.sort("name") + + # Rename columns for display + pl_df = pl_df.rename({"name": "Check Type", "score": "Score"}) + + # === POLARS PROCESSING END === + + # Convert to Pandas at the visualization boundary + return to_pandas(pl_df), updated_date diff --git a/8Knot/pages/repo_overview/visualizations/repo_general_info.py b/8Knot/pages/repo_overview/visualizations/repo_general_info.py index e90f04f9..b0a1e3f7 100644 --- a/8Knot/pages/repo_overview/visualizations/repo_general_info.py +++ b/8Knot/pages/repo_overview/visualizations/repo_general_info.py @@ -4,10 +4,12 @@ from dash.dependencies import Input, Output, State import plotly.graph_objects as go import pandas as pd +import polars as pl import logging from dateutil.relativedelta import * # type: ignore import plotly.express as px from pages.utils.graph_utils import get_graph_time_values, color_seq +from pages.utils.polars_utils import to_polars, to_pandas from queries.repo_info_query import repo_info_query as riq # from queries.repo_files_query import repo_files_query as rfq #TODO: run back on when the query hang is fixed @@ -103,70 +105,75 @@ def repo_general_info(repo): def process_data(df_repo_files, df_repo_info, df_releases): + """ + Process repository data using Polars for performance, returning Pandas for visualization. - updated_times_repo_info = pd.to_datetime(df_repo_info["data_collection_date"]) + This follows the "Polars Core, Pandas Edge" architecture: + - Core processing in Polars (2-10x faster) + - Return Pandas DataFrame for Plotly/Dash compatibility + """ + # === POLARS PROCESSING START === - unique_updated_times = updated_times_repo_info.drop_duplicates().to_numpy().flatten() + # Convert to Polars for fast processing + pl_repo_info = to_polars(df_repo_info) + pl_releases = to_polars(df_releases) if not df_releases.empty else pl.DataFrame() + pl_files = to_polars(df_repo_files) if not df_repo_files.empty else pl.DataFrame() - if len(unique_updated_times) > 1: + # Get last update date + updated_times = pl_repo_info.select(pl.col("data_collection_date").cast(pl.Datetime)).unique() + if updated_times.height > 1: logging.warning(f"{VIZ_ID} - MORE THAN ONE LAST UPDATE DATE") - - updated_date = pd.to_datetime(str(unique_updated_times[-1])).strftime("%d/%m/%Y") - - # convert to datetime objects rather than strings - df_releases["release_published_at"] = pd.to_datetime(df_releases["release_published_at"], utc=True) - - # release information preprocessing - # get date of previous row/previous release - df_releases["previous_release"] = df_releases["release_published_at"].shift() - # calculate difference - df_releases["time_bt_release"] = df_releases["release_published_at"] - df_releases["previous_release"] - # reformat to days (vectorized - faster than .apply()) - df_releases["time_bt_release"] = df_releases["time_bt_release"].dt.days - - # release info initial assignments - num_releases = df_releases.shape[0] - last_release_date = df_releases["release_published_at"].max() - avg_release_time = df_releases["time_bt_release"].abs().mean().round(1) - - # reformat based on if there are any releases - if num_releases == 0: + updated_date = updated_times.row(-1)[0].strftime("%d/%m/%Y") if updated_times.height > 0 else "Unknown" + + # Release information processing with Polars + if pl_releases.height > 0: + pl_releases = pl_releases.with_columns(pl.col("release_published_at").cast(pl.Datetime("us", "UTC"))) + pl_releases = pl_releases.with_columns(pl.col("release_published_at").shift(1).alias("previous_release")) + pl_releases = pl_releases.with_columns( + (pl.col("release_published_at") - pl.col("previous_release")).dt.total_days().alias("time_bt_release") + ) + + num_releases = pl_releases.height + last_release_date = pl_releases.select(pl.col("release_published_at").max()).item() + avg_release_time = pl_releases.select(pl.col("time_bt_release").abs().mean()).item() + + if avg_release_time is not None: + avg_release_time = f"{round(avg_release_time, 1)} Days" + else: + avg_release_time = "No Releases Found" + last_release_date = last_release_date.strftime("%Y-%m-%d") if last_release_date else "No Releases Found" + else: + num_releases = 0 avg_release_time = "No Releases Found" last_release_date = "No Releases Found" - else: - avg_release_time = str(avg_release_time) + " Days" - last_release_date = last_release_date.strftime("%Y-%m-%d") - - # direct varible assignment from query results - license = df_repo_info.loc[0, "license"] - stars_count = df_repo_info.loc[0, "stars_count"] - fork_count = df_repo_info.loc[0, "fork_count"] - watchers_count = df_repo_info.loc[0, "watchers_count"] - issues_enabled = df_repo_info.loc[0, "issues_enabled"].capitalize() - - # checks for code of conduct file - coc = df_repo_info.loc[0, "code_of_conduct_file"] - if coc is None: - coc = "File not found" - else: - coc = "File found" - # check files for CONTRIBUTING.md - contrib_guide = (df_repo_files["file_name"].eq("CONTRIBUTING.md")).any() - if contrib_guide: - contrib_guide = "File found" + # Extract repo info values using Polars + repo_info_row = pl_repo_info.row(0, named=True) + license_val = repo_info_row["license"] + stars_count = repo_info_row["stars_count"] + fork_count = repo_info_row["fork_count"] + watchers_count = repo_info_row["watchers_count"] + issues_enabled = str(repo_info_row["issues_enabled"]).capitalize() + + # Check for code of conduct file + coc = repo_info_row["code_of_conduct_file"] + coc = "File found" if coc is not None else "File not found" + + # Check files for CONTRIBUTING.md and SECURITY.md using Polars + if pl_files.height > 0: + contrib_guide = pl_files.filter(pl.col("file_name") == "CONTRIBUTING.md").height > 0 + security_policy = pl_files.filter(pl.col("file_name") == "SECURITY.md").height > 0 else: - contrib_guide = "File not found" + contrib_guide = False + security_policy = False - # keep an eye out if github changes this to be located like coc - security_policy = (df_repo_files["file_name"].eq("SECURITY.md")).any() - if security_policy: - security_policy = "File found" - else: - security_policy = "File not found" + contrib_guide = "File found" if contrib_guide else "File not found" + security_policy = "File found" if security_policy else "File not found" + + # === POLARS PROCESSING END === - # create df to hold table information - df = pd.DataFrame( + # Create final DataFrame in Polars, then convert to Pandas for visualization + pl_result = pl.DataFrame( { "Section": [ "License", @@ -182,22 +189,23 @@ def process_data(df_repo_files, df_repo_info, df_releases): "Issues Enabled", ], "Info": [ - license, + str(license_val) if license_val else "Unknown", coc, contrib_guide, security_policy, - num_releases, + str(num_releases), last_release_date, avg_release_time, - stars_count, - fork_count, - watchers_count, + str(stars_count), + str(fork_count), + str(watchers_count), issues_enabled, ], } ) - return df, dbc.Label(updated_date) + # Convert to Pandas at the visualization boundary + return to_pandas(pl_result), dbc.Label(updated_date) def multi_query_helper(repos: list[int]): diff --git a/8Knot/pages/utils/polars_utils.py b/8Knot/pages/utils/polars_utils.py new file mode 100644 index 00000000..c70aee78 --- /dev/null +++ b/8Knot/pages/utils/polars_utils.py @@ -0,0 +1,168 @@ +""" +Polars utilities for 8Knot. + +This module provides the adapter layer for the "Polars Core, Pandas Edge" architecture: +- Core data processing uses Polars for 2-10x performance improvements +- Visualization boundary uses Pandas for Plotly/Dash compatibility + +Architecture: + Database → Query Layer (Polars) → Processing (Polars) → Visualization (Pandas → Plotly) + +Usage: + from pages.utils.polars_utils import to_polars, to_pandas, process_with_polars + + # Simple conversion + pl_df = to_polars(pandas_df) + result = to_pandas(polars_df) + + # Process with automatic conversion + def my_processor(pl_df): + return pl_df.filter(pl.col("x") > 0).group_by("category").agg(pl.col("value").sum()) + + result = process_with_polars(pandas_df, my_processor) # Returns Pandas DataFrame +""" + +from typing import Callable, Union + +import pandas as pd +import polars as pl + +# Type alias for DataFrame compatibility +DataFrameLike = Union[pd.DataFrame, pl.DataFrame] + + +def to_polars(df: pd.DataFrame) -> pl.DataFrame: + """ + Convert Pandas DataFrame to Polars for high-performance processing. + + Uses Arrow interchange for near zero-copy conversion when possible. + + Args: + df: Input Pandas DataFrame + + Returns: + Polars DataFrame ready for processing + """ + return pl.from_pandas(df) + + +def to_pandas(df: pl.DataFrame) -> pd.DataFrame: + """ + Convert Polars DataFrame to Pandas for visualization layer. + + This should be called at the visualization boundary, right before + passing data to Plotly/Dash components. + + Args: + df: Input Polars DataFrame + + Returns: + Pandas DataFrame ready for Plotly/Dash + """ + return df.to_pandas() + + +def process_with_polars( + df: pd.DataFrame, + processor: Callable[[pl.DataFrame], pl.DataFrame], +) -> pd.DataFrame: + """ + Process a Pandas DataFrame with Polars and return Pandas. + + This is a convenience wrapper that handles the Pandas → Polars → Pandas + conversion automatically. Use this when you want to leverage Polars + performance while maintaining Pandas compatibility at boundaries. + + Args: + df: Input Pandas DataFrame + processor: Function that takes a Polars DataFrame and returns a Polars DataFrame + + Returns: + Pandas DataFrame (result of processing) + + Example: + def aggregate_by_category(pl_df: pl.DataFrame) -> pl.DataFrame: + return ( + pl_df.lazy() + .filter(pl.col("status") == "active") + .group_by("category") + .agg(pl.col("value").sum()) + .collect() + ) + + result = process_with_polars(pandas_df, aggregate_by_category) + # result is a Pandas DataFrame ready for Plotly + """ + pl_df = to_polars(df) + result = processor(pl_df) + return to_pandas(result) + + +def lazy_process( + df: pd.DataFrame, + processor: Callable[[pl.LazyFrame], pl.LazyFrame], +) -> pd.DataFrame: + """ + Process a Pandas DataFrame with Polars lazy evaluation. + + Lazy evaluation allows Polars to optimize the entire query plan + before execution, potentially resulting in significant speedups. + + Args: + df: Input Pandas DataFrame + processor: Function that takes a Polars LazyFrame and returns a LazyFrame + + Returns: + Pandas DataFrame (result of processing) + + Example: + def complex_aggregation(lf: pl.LazyFrame) -> pl.LazyFrame: + return ( + lf.filter(pl.col("value") > 0) + .with_columns(pl.col("date").dt.month().alias("month")) + .group_by("month") + .agg([ + pl.col("value").sum().alias("total"), + pl.col("value").mean().alias("avg"), + ]) + ) + + result = lazy_process(pandas_df, complex_aggregation) + """ + pl_df = to_polars(df) + lazy_result = processor(pl_df.lazy()) + return to_pandas(lazy_result.collect()) + + +# Common Polars expressions for reuse +class Expressions: + """ + Common Polars expressions used across visualizations. + + These are pre-built expression patterns that can be reused + to ensure consistency and avoid duplication. + """ + + @staticmethod + def count_open_at_date( + created_col: str = "created_at", + closed_col: str = "closed_at", + ) -> pl.Expr: + """ + Expression to check if an item is open at a given date. + + An item is open if: created_at <= date AND (closed_at > date OR closed_at is null) + """ + # This is a template - actual date comparison needs to be done in context + return (pl.col(created_col).is_not_null()) & ( + pl.col(closed_col).is_null() | (pl.col(closed_col) > pl.col(created_col)) + ) + + @staticmethod + def safe_log(col: str) -> pl.Expr: + """ + Safe logarithm that handles zero values. + + Returns 0 for zero values, log(x) otherwise. + """ + return pl.when(pl.col(col) != 0).then(pl.col(col).log()).otherwise(0) diff --git a/pyproject.toml b/pyproject.toml index b37954d2..67475d82 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "numpy~=2.0", "pandas~=2.3.0", "plotly~=6.3", + "polars~=1.30", "psycopg2-binary==2.9.9", "pyarrow~=21.0", "python-dateutil~=2.9",