diff --git a/posts/services/common.py b/posts/services/common.py index 21eabb4695..c5767d513d 100644 --- a/posts/services/common.py +++ b/posts/services/common.py @@ -343,7 +343,7 @@ def compute_sorting_divergence(post: Post) -> dict[int, float]: difference = prediction_difference_for_sorting( forecast.get_prediction_values(), cp.get_prediction_values(), - question, + question.type, ) if (forecast.author_id not in user_divergences) or ( abs(user_divergences[forecast.author_id]) < abs(difference) diff --git a/posts/services/subscriptions.py b/posts/services/subscriptions.py index deaeaabab8..cf919c6d03 100644 --- a/posts/services/subscriptions.py +++ b/posts/services/subscriptions.py @@ -187,7 +187,7 @@ def notify_post_cp_change(post: Post): difference = prediction_difference_for_sorting( old_forecast_values, current_forecast_values, - question=question, + question_type=question.type, ) if max_sorting_diff is None or difference > max_sorting_diff: max_sorting_diff = difference diff --git a/questions/admin.py b/questions/admin.py index e1eda749ef..1ad5aff6dd 100644 --- a/questions/admin.py +++ b/questions/admin.py @@ -6,6 +6,7 @@ from django.utils.html import format_html from django_better_admin_arrayfield.admin.mixins import DynamicArrayMixin +from questions.types import AggregationMethod from questions.models import ( AggregateForecast, Conditional, @@ -42,6 +43,7 @@ class QuestionAdmin(CustomTranslationAdmin, DynamicArrayMixin): "export_selected_questions_data_anonymized", "rebuild_aggregation_history", "trigger_scoring", + "trigger_scoring_with_all_aggregations", ] list_filter = [ "type", @@ -142,6 +144,24 @@ def trigger_scoring(self, request, queryset: QuerySet[Question]): trigger_scoring.short_description = "Trigger Scoring (does nothing if not resolved)" + def trigger_scoring_with_all_aggregations( + self, request, queryset: QuerySet[Question] + ): + from scoring.utils import score_question + + for question in queryset: + if question.resolution in ["", None, 
"ambiguous", "annulled"]: + continue + score_question( + question=question, + resolution=question.resolution, + aggregation_methods=list(AggregationMethod._member_map_.values()), + ) + + trigger_scoring_with_all_aggregations.short_description = ( + "Trigger Scoring (Includes ALL Aggregations) (does nothing if not resolved)" + ) + @admin.register(Conditional) class ConditionalAdmin(admin.ModelAdmin): diff --git a/questions/serializers/common.py b/questions/serializers/common.py index d6c67ddde3..a4b09c7f89 100644 --- a/questions/serializers/common.py +++ b/questions/serializers/common.py @@ -993,7 +993,7 @@ def serialize_question_movement( divergence = prediction_difference_for_sorting( f1.forecast_values, f2.forecast_values, - question, + question.type, ) if divergence >= threshold: diff --git a/questions/services.py b/questions/services.py index 5920544839..2fdfe45cde 100644 --- a/questions/services.py +++ b/questions/services.py @@ -171,7 +171,7 @@ def compute_question_movement(question: Question) -> float | None: return prediction_difference_for_sorting( cp_now.get_prediction_values(), cp_previous.get_prediction_values(), - question, + question.type, ) @@ -1035,7 +1035,7 @@ def calculate_user_forecast_movement_for_questions( divergence = prediction_difference_for_sorting( f1.forecast_values, f2.forecast_values, - question, + question.type, ) if divergence >= 0.25: diff --git a/questions/types.py b/questions/types.py index d2eb95797c..9d3338f7e3 100644 --- a/questions/types.py +++ b/questions/types.py @@ -6,3 +6,13 @@ class AggregationMethod(models.TextChoices): UNWEIGHTED = "unweighted" SINGLE_AGGREGATION = "single_aggregation" METACULUS_PREDICTION = "metaculus_prediction" + MEDALISTS = "medalists" + EXPERIENCED_USERS_25_RESOLVED = "experienced_users_25_resolved" + IGNORANCE = "ignorance" + RECENCY_WEIGHTED_LOG_ODDS = "recency_weighted_log_odds" + RECENCY_WEIGHTED_MEAN_NO_OUTLIERS = "recency_weighted_mean_no_outliers" + RECENCY_WEIGHTED_MEDALISTS = 
"recency_weighted_medalists" + RECENCY_WEIGHTED_EXPERIENCED_USERS_25_RESOLVED = ( + "recency_weighted_experienced_users_25_resolved" + ) + RECENCY_WEIGHTED_LOG_ODDS_NO_OUTLIERS = "recency_weighted_log_odds_no_outliers" diff --git a/scoring/reputation.py b/scoring/reputation.py deleted file mode 100644 index da936d5947..0000000000 --- a/scoring/reputation.py +++ /dev/null @@ -1,103 +0,0 @@ -from dataclasses import dataclass -from datetime import datetime -from collections import defaultdict -from typing import Sequence - -from django.utils import timezone - -from questions.models import Question -from scoring.models import Score -from users.models import User - - -@dataclass -class Reputation: - user: User - value: float - time: datetime - - -def reputation_value(scores: Sequence[Score]) -> float: - return max( - sum([score.score for score in scores]) - / (30 + sum([score.coverage for score in scores])), - 1e-6, - ) - - -def get_reputation_at_time(user: User, time: datetime | None = None) -> Reputation: - """ - Returns the reputation of a user at a given time. - """ - if time is None: - time = timezone.now() - peer_scores = Score.objects.filter( - user=user, - score_type=Score.ScoreTypes.PEER, - question__in=Question.objects.filter_public(), - edited_at__lte=time, - ).distinct() - value = reputation_value(peer_scores) - return Reputation(user, value, time) - - -def get_reputations_at_time( - users: list[User], time: datetime | None = None -) -> list[Reputation]: - """ - Returns the reputations of a list of users at a given time. 
- """ - if time is None: - time = timezone.now() - peer_scores = Score.objects.filter( - user__in=users, - score_type=Score.ScoreTypes.PEER, - question__in=Question.objects.filter_public(), - edited_at__lte=time, - ).distinct() - user_scores: dict[User, list[Score]] = defaultdict(list) - for score in peer_scores: - user_scores[score.user].append(score) - reputations = [] - for user in users: - value = reputation_value(peer_scores) - reputations.append(Reputation(user, value, time)) - return reputations - - -def get_reputations_during_interval( - users: list[User], start: datetime, end: datetime | None = None -) -> dict[User, list[Reputation]]: - """returns a dict reputations. Each one is a record of what a particular - user's reputation was at a particular time. - The reputation can change during the interval.""" - if end is None: - end = timezone.now() - all_peer_scores = ( - Score.objects.filter( - user__in=users, - score_type=Score.ScoreTypes.PEER, - question__in=Question.objects.filter_public(), - edited_at__lte=end, - ) - .prefetch_related("user", "question") - .distinct() - ) - old_peer_scores = list( - all_peer_scores.filter(edited_at__lte=start).order_by("edited_at") - ) - new_peer_scores = list( - all_peer_scores.filter(edited_at__gt=start).order_by("edited_at") - ) - scores_by_user: dict[User, dict[Question, Score]] = defaultdict(dict) - for score in old_peer_scores: - scores_by_user[score.user][score.question] = score - reputations: dict[User, list[Reputation]] = defaultdict(list) - for user in users: - value = reputation_value(scores_by_user[user].values()) - reputations[user].append(Reputation(user, value, start)) - for score in new_peer_scores: - scores_by_user[score.user][score.question] = score - value = reputation_value(scores_by_user[user].values()) - reputations[user].append(Reputation(score.user, value, score.edited_at)) - return reputations diff --git a/scoring/score_math.py b/scoring/score_math.py index 6fd7aec07a..34fc5de0e4 100644 --- 
a/scoring/score_math.py +++ b/scoring/score_math.py @@ -1,8 +1,6 @@ from dataclasses import dataclass -from datetime import datetime import numpy as np -from django.db.models import Prefetch from scipy.stats.mstats import gmean from questions.models import ( @@ -13,38 +11,29 @@ ) from questions.types import AggregationMethod from scoring.models import Score -from users.models import User from utils.the_math.aggregations import get_aggregation_history @dataclass class AggregationEntry: - pmf: np.ndarray + pmf: np.ndarray | list[float] num_forecasters: int timestamp: float def get_geometric_means( forecasts: list[Forecast | AggregateForecast], - include_bots: bool = False, ) -> list[AggregationEntry]: - included_forecasts = forecasts - if not include_bots: - included_forecasts = [ - f - for f in forecasts - if (isinstance(f, AggregateForecast) or f.author.is_bot is False) - ] geometric_means = [] - timesteps: set[datetime] = set() - for forecast in included_forecasts: + timesteps: set[float] = set() + for forecast in forecasts: timesteps.add(forecast.start_time.timestamp()) if forecast.end_time: timesteps.add(forecast.end_time.timestamp()) for timestep in sorted(timesteps): prediction_values = [ f.get_pmf() - for f in included_forecasts + for f in forecasts if f.start_time.timestamp() <= timestep and (f.end_time is None or f.end_time.timestamp() > timestep) ] @@ -64,7 +53,7 @@ def get_medians( forecasts: list[Forecast | AggregateForecast], ) -> list[AggregationEntry]: medians = [] - timesteps: set[datetime] = set() + timesteps: set[float] = set() for forecast in forecasts: timesteps.add(forecast.start_time.timestamp()) if forecast.end_time: @@ -102,7 +91,7 @@ def evaluate_forecasts_baseline_accuracy( open_bounds_count: int, ) -> list[ForecastScore]: total_duration = forecast_horizon_end - forecast_horizon_start - forecast_scores: list[tuple[float, float]] = [] + forecast_scores: list[ForecastScore] = [] for forecast in forecasts: forecast_start = 
max(forecast.start_time.timestamp(), forecast_horizon_start) forecast_end = ( @@ -140,7 +129,7 @@ def evaluate_forecasts_baseline_spot_forecast( question_type: str, open_bounds_count: int, ) -> list[ForecastScore]: - forecast_scores: list[float] = [] + forecast_scores: list[ForecastScore] = [] for forecast in forecasts: start = forecast.start_time.timestamp() end = ( @@ -173,16 +162,13 @@ def evaluate_forecasts_peer_accuracy( forecast_horizon_end: float, question_type: str, geometric_means: list[AggregationEntry] | None = None, - include_bots_in_aggregates: bool = False, ) -> list[ForecastScore]: base_forecasts = base_forecasts or forecasts - geometric_mean_forecasts = geometric_means or get_geometric_means( - base_forecasts, include_bots_in_aggregates - ) + geometric_mean_forecasts = geometric_means or get_geometric_means(base_forecasts) for gm in geometric_mean_forecasts: gm.timestamp = max(gm.timestamp, forecast_horizon_start) total_duration = forecast_horizon_end - forecast_horizon_start - forecast_scores: list[float] = [] + forecast_scores: list[ForecastScore] = [] for forecast in forecasts: forecast_start = max(forecast.start_time.timestamp(), forecast_horizon_start) forecast_end = ( @@ -209,8 +195,8 @@ def evaluate_forecasts_peer_accuracy( else: interval_scores.append(None) - forecast_score = 0 - forecast_coverage = 0 + forecast_score: float = 0 + forecast_coverage: float = 0 times = [ gm.timestamp for gm in geometric_mean_forecasts @@ -234,12 +220,9 @@ def evaluate_forecasts_peer_spot_forecast( spot_forecast_timestamp: float, question_type: str, geometric_means: list[AggregationEntry] | None = None, - include_bots_in_aggregates: bool = False, ) -> list[ForecastScore]: base_forecasts = base_forecasts or forecasts - geometric_mean_forecasts = geometric_means or get_geometric_means( - base_forecasts, include_bots_in_aggregates - ) + geometric_mean_forecasts = geometric_means or get_geometric_means(base_forecasts) g = None for gm in 
geometric_mean_forecasts[::-1]: if gm.timestamp < spot_forecast_timestamp: @@ -248,7 +231,7 @@ def evaluate_forecasts_peer_spot_forecast( if g is None: return [ForecastScore(0)] * len(forecasts) - forecast_scores: list[float] = [] + forecast_scores: list[ForecastScore] = [] for forecast in forecasts: start = forecast.start_time.timestamp() end = ( @@ -285,7 +268,7 @@ def evaluate_forecasts_legacy_relative( for bf in base_forecasts ] total_duration = actual_close_time - forecast_horizon_start - forecast_scores: list[float] = [] + forecast_scores: list[ForecastScore] = [] for forecast in forecasts: forecast_start = max(forecast.start_time.timestamp(), forecast_horizon_start) forecast_end = ( @@ -306,8 +289,8 @@ def evaluate_forecasts_legacy_relative( else: interval_scores.append(None) - forecast_score = 0 - forecast_coverage = 0 + forecast_score: float = 0 + forecast_coverage: float = 0 times = [ bf.timestamp for bf in baseline_forecasts @@ -329,50 +312,67 @@ def evaluate_question( resolution_bucket: int | None, score_types: list[Score.ScoreTypes], spot_forecast_timestamp: float | None = None, + aggregation_methods: list[AggregationMethod] | None = None, + score_users: bool | list[int] = True, ) -> list[Score]: + aggregation_methods = aggregation_methods or [] + aggregations_to_calculate = aggregation_methods.copy() + if Score.ScoreTypes.RELATIVE_LEGACY in score_types and ( + AggregationMethod.RECENCY_WEIGHTED not in aggregations_to_calculate + ): + aggregations_to_calculate.append(AggregationMethod.RECENCY_WEIGHTED) if resolution_bucket is None: return [] forecast_horizon_start = question.open_time.timestamp() actual_close_time = question.actual_close_time.timestamp() forecast_horizon_end = question.scheduled_close_time.timestamp() - user_forecasts = ( - question.user_forecasts.all() - .prefetch_related(Prefetch("author", queryset=User.objects.only("is_bot"))) - .order_by("start_time") - ) - community_forecasts = get_aggregation_history( + # We need all user 
forecasts to calculated GeoMean even + # if we're only scoring some or none of the users + user_forecasts = question.user_forecasts.all() + if score_users is True: + scoring_user_forecasts = user_forecasts + elif not score_users: + scoring_user_forecasts = Forecast.objects.none() + else: # we have a list of user ids to score + scoring_user_forecasts = user_forecasts.filter(author_id__in=score_users) + if not question.include_bots_in_aggregates: + user_forecasts = user_forecasts.exclude(author__is_bot=True) + aggregations = get_aggregation_history( question, minimize=False, - aggregation_methods=[AggregationMethod.RECENCY_WEIGHTED], + aggregation_methods=aggregations_to_calculate, include_bots=question.include_bots_in_aggregates, - )[AggregationMethod.RECENCY_WEIGHTED] + include_stats=False, + ) + recency_weighted_aggregation = aggregations.get(AggregationMethod.RECENCY_WEIGHTED) geometric_means: list[AggregationEntry] = [] - ScoreTypes = Score.ScoreTypes - if ScoreTypes.PEER in score_types: - geometric_means = get_geometric_means( - user_forecasts, include_bots=question.include_bots_in_aggregates - ) + if Score.ScoreTypes.PEER in score_types: + geometric_means = get_geometric_means(user_forecasts) scores: list[Score] = [] for score_type in score_types: - match score_type: - case ScoreTypes.BASELINE: - open_bounds_count = bool(question.open_upper_bound) + bool( - question.open_lower_bound - ) - user_scores = evaluate_forecasts_baseline_accuracy( - user_forecasts, - resolution_bucket, - forecast_horizon_start, - actual_close_time, - forecast_horizon_end, - question.type, - open_bounds_count, - ) - community_scores = evaluate_forecasts_baseline_accuracy( - community_forecasts, + aggregation_scores: dict[AggregationMethod, list[Score | ForecastScore]] = ( + dict() + ) + if score_type == Score.ScoreTypes.BASELINE: + open_bounds_count = bool(question.open_upper_bound) + bool( + question.open_lower_bound + ) + user_scores = evaluate_forecasts_baseline_accuracy( + 
scoring_user_forecasts, + resolution_bucket, + forecast_horizon_start, + actual_close_time, + forecast_horizon_end, + question.type, + open_bounds_count, + ) + for method in aggregation_methods: + aggregation_forecasts = aggregations[method] + aggregation_scores[method] = evaluate_forecasts_baseline_accuracy( + aggregation_forecasts, resolution_bucket, forecast_horizon_start, actual_close_time, @@ -380,37 +380,41 @@ def evaluate_question( question.type, open_bounds_count, ) - case ScoreTypes.SPOT_BASELINE: - open_bounds_count = bool(question.open_upper_bound) + bool( - question.open_lower_bound - ) - user_scores = evaluate_forecasts_baseline_spot_forecast( - user_forecasts, - resolution_bucket, - spot_forecast_timestamp, - question.type, - open_bounds_count, - ) - community_scores = evaluate_forecasts_baseline_spot_forecast( - community_forecasts, + elif score_type == Score.ScoreTypes.SPOT_BASELINE: + open_bounds_count = bool(question.open_upper_bound) + bool( + question.open_lower_bound + ) + user_scores = evaluate_forecasts_baseline_spot_forecast( + scoring_user_forecasts, + resolution_bucket, + spot_forecast_timestamp, + question.type, + open_bounds_count, + ) + for method in aggregation_methods: + aggregation_forecasts = aggregations[method] + aggregation_scores[method] = evaluate_forecasts_baseline_spot_forecast( + aggregation_forecasts, resolution_bucket, spot_forecast_timestamp, question.type, open_bounds_count, ) - case ScoreTypes.PEER: - user_scores = evaluate_forecasts_peer_accuracy( - user_forecasts, - user_forecasts, - resolution_bucket, - forecast_horizon_start, - actual_close_time, - forecast_horizon_end, - question.type, - geometric_means=geometric_means, - ) - community_scores = evaluate_forecasts_peer_accuracy( - community_forecasts, + elif score_type == Score.ScoreTypes.PEER: + user_scores = evaluate_forecasts_peer_accuracy( + scoring_user_forecasts, + user_forecasts, + resolution_bucket, + forecast_horizon_start, + actual_close_time, + 
forecast_horizon_end, + question.type, + geometric_means=geometric_means, + ) + for method in aggregation_methods: + aggregation_forecasts = aggregations[method] + aggregation_scores[method] = evaluate_forecasts_peer_accuracy( + aggregation_forecasts, user_forecasts, resolution_bucket, forecast_horizon_start, @@ -419,46 +423,50 @@ def evaluate_question( question.type, geometric_means=geometric_means, ) - case ScoreTypes.SPOT_PEER: - user_scores = evaluate_forecasts_peer_spot_forecast( - user_forecasts, - user_forecasts, - resolution_bucket, - spot_forecast_timestamp, - question.type, - geometric_means=geometric_means, - ) - community_scores = evaluate_forecasts_peer_spot_forecast( - community_forecasts, + elif score_type == Score.ScoreTypes.SPOT_PEER: + user_scores = evaluate_forecasts_peer_spot_forecast( + scoring_user_forecasts, + user_forecasts, + resolution_bucket, + spot_forecast_timestamp, + question.type, + geometric_means=geometric_means, + ) + for method in aggregation_methods: + aggregation_forecasts = aggregations[method] + aggregation_scores[method] = evaluate_forecasts_peer_spot_forecast( + aggregation_forecasts, user_forecasts, resolution_bucket, spot_forecast_timestamp, question.type, geometric_means=geometric_means, ) - case ScoreTypes.RELATIVE_LEGACY: - user_scores = evaluate_forecasts_legacy_relative( - user_forecasts, - community_forecasts, - resolution_bucket, - forecast_horizon_start, - actual_close_time, - ) - community_scores = evaluate_forecasts_legacy_relative( - community_forecasts, - community_forecasts, + elif score_type == Score.ScoreTypes.RELATIVE_LEGACY: + user_scores = evaluate_forecasts_legacy_relative( + scoring_user_forecasts, + recency_weighted_aggregation, + resolution_bucket, + forecast_horizon_start, + actual_close_time, + ) + for method in aggregation_methods: + aggregation_forecasts = aggregations[method] + aggregation_scores[method] = evaluate_forecasts_legacy_relative( + aggregation_forecasts, + 
recency_weighted_aggregation, resolution_bucket, forecast_horizon_start, actual_close_time, ) - case other: - raise NotImplementedError(f"Score type {other} not implemented") + else: + raise NotImplementedError(f"Score type {score_type} not implemented") - user_ids = {forecast.author_id for forecast in user_forecasts} + user_ids = {forecast.author_id for forecast in scoring_user_forecasts} for user_id in user_ids: - user_score = 0 - user_coverage = 0 - for forecast, score in zip(user_forecasts, user_scores): + user_score: float = 0 + user_coverage: float = 0 + for forecast, score in zip(scoring_user_forecasts, user_scores): if forecast.author_id == user_id: user_score += score.score user_coverage += score.coverage @@ -471,18 +479,20 @@ def evaluate_question( score_type=score_type, ) ) - community_score = 0 - community_coverage = 0 - for score in community_scores: - community_score += score.score - community_coverage += score.coverage - scores.append( - Score( - user=None, - aggregation_method=AggregationMethod.RECENCY_WEIGHTED, - score=community_score, - coverage=community_coverage, - score_type=score_type, + for method in aggregation_methods: + aggregation_score: float = 0 + aggregation_coverage: float = 0 + community_scores = aggregation_scores[method] + for score in community_scores: + aggregation_score += score.score + aggregation_coverage += score.coverage + scores.append( + Score( + user=None, + aggregation_method=method, + score=aggregation_score, + coverage=aggregation_coverage, + score_type=score_type, + ) ) - ) return scores diff --git a/scoring/utils.py b/scoring/utils.py index ddc7cd6530..0d906b81c8 100644 --- a/scoring/utils.py +++ b/scoring/utils.py @@ -55,7 +55,12 @@ def score_question( resolution: str, spot_scoring_time: float | None = None, score_types: list[str] | None = None, + aggregation_methods: list[AggregationMethod] | None = None, + protect_uncalculated_scores: bool = False, + score_users: bool | list[int] = True, ): + if 
aggregation_methods is None: + aggregation_methods = [AggregationMethod.RECENCY_WEIGHTED] resolution_bucket = string_location_to_bucket_index(resolution, question) if not spot_scoring_time: if question.spot_scoring_time: @@ -74,23 +79,31 @@ def score_question( for score in previous_scores } new_scores = evaluate_question( - question, - resolution_bucket, - score_types, - spot_scoring_time, + question=question, + resolution_bucket=resolution_bucket, + score_types=score_types, + spot_forecast_timestamp=spot_scoring_time, + aggregation_methods=aggregation_methods, + score_users=score_users, ) + seen = set() for new_score in new_scores: previous_score_id = previous_scores_map.get( (new_score.user_id, new_score.aggregation_method, new_score.score_type) ) + if previous_score_id: + seen.add(previous_score_id) new_score.id = previous_score_id new_score.question = question new_score.edited_at = question.resolution_set_time with transaction.atomic(): - previous_scores.delete() + scores_to_delete = previous_scores + if protect_uncalculated_scores: + scores_to_delete = scores_to_delete.filter(id__in=seen) + scores_to_delete.delete() Score.objects.bulk_create(new_scores, batch_size=500) diff --git a/tests/unit/test_utils/test_aggregations.py b/tests/unit/test_utils/test_aggregations.py index 36c350d6ed..e5dcd7d180 100644 --- a/tests/unit/test_utils/test_aggregations.py +++ b/tests/unit/test_utils/test_aggregations.py @@ -1,7 +1,15 @@ import pytest import numpy as np +from datetime import datetime, timezone as dt_timezome -from utils.the_math.aggregations import summarize_array +from utils.the_math.aggregations import ( + summarize_array, + ForecastSet, + UnweightedAggregation, + RecencyWeightedAggregation, +) +from questions.types import AggregationMethod +from questions.models import Question, AggregateForecast @pytest.mark.parametrize( @@ -20,3 +28,360 @@ def test_summarize_array(array, max_size, expceted_array): # Check that the summarized list has the correct length assert 
np.allclose(summarized, expceted_array) + + +class TestAggregations: + + @pytest.mark.parametrize( + "init_params, forecast_set, include_stats, histogram, expected", + [ + ( + {"question_type": Question.QuestionType.BINARY}, + ForecastSet( + forecasts_values=[[0.5, 0.5]], + timestep=datetime(2023, 1, 1, tzinfo=dt_timezome.utc), + user_ids=[1], + timesteps=[datetime(2023, 1, 1, tzinfo=dt_timezome.utc)], + ), + False, + False, + AggregateForecast( + start_time=datetime(2023, 1, 1, tzinfo=dt_timezome.utc), + method=AggregationMethod.UNWEIGHTED, + forecast_values=[0.5, 0.5], + ), + ), + ( + {"question_type": Question.QuestionType.BINARY}, + ForecastSet( + forecasts_values=[[0.5, 0.5]], + timestep=datetime(2023, 1, 1, tzinfo=dt_timezome.utc), + user_ids=[1], + timesteps=[datetime(2023, 1, 1, tzinfo=dt_timezome.utc)], + ), + True, + True, + AggregateForecast( + start_time=datetime(2023, 1, 1, tzinfo=dt_timezome.utc), + method=AggregationMethod.UNWEIGHTED, + forecast_values=[0.5, 0.5], + forecaster_count=1, + interval_lower_bounds=[0.5, 0.5], + centers=[0.5, 0.5], + interval_upper_bounds=[0.5, 0.5], + means=None, + histogram=[0] * 50 + [1] + [0] * 49, + ), + ), + ( + {"question_type": Question.QuestionType.BINARY}, + ForecastSet( + forecasts_values=[ + [0.2, 0.8], + [0.4, 0.6], + ], + timestep=datetime(2024, 1, 1, tzinfo=dt_timezome.utc), + user_ids=[1, 2], + timesteps=[ + datetime(2022, 1, 1, tzinfo=dt_timezome.utc), + datetime(2023, 1, 1, tzinfo=dt_timezome.utc), + ], + ), + False, + False, + AggregateForecast( + start_time=datetime(2024, 1, 1, tzinfo=dt_timezome.utc), + method=AggregationMethod.UNWEIGHTED, + forecast_values=[0.3, 0.7], + ), + ), + ( + {"question_type": Question.QuestionType.BINARY}, + ForecastSet( + forecasts_values=[ + [0.2, 0.8], + [0.4, 0.6], + ], + timestep=datetime(2024, 1, 1, tzinfo=dt_timezome.utc), + user_ids=[1, 2], + timesteps=[ + datetime(2022, 1, 1, tzinfo=dt_timezome.utc), + datetime(2023, 1, 1, tzinfo=dt_timezome.utc), + ], + ), + True, + 
True, + AggregateForecast( + start_time=datetime(2024, 1, 1, tzinfo=dt_timezome.utc), + method=AggregationMethod.UNWEIGHTED, + forecast_values=[0.3, 0.7], + forecaster_count=2, + interval_lower_bounds=[0.2, 0.6], + centers=[0.3, 0.7], + interval_upper_bounds=[0.4, 0.8], + means=None, + histogram=[0] * 60 + [1] + [0] * 19 + [1] + [0] * 19, + ), + ), + ( + {"question_type": Question.QuestionType.BINARY}, + ForecastSet( + forecasts_values=[ + [0.2, 0.8], + [0.3, 0.7], + [0.4, 0.6], + ], + timestep=datetime(2024, 1, 1, tzinfo=dt_timezome.utc), + user_ids=[1, 2, 3], + timesteps=[ + datetime(2021, 1, 1, tzinfo=dt_timezome.utc), + datetime(2022, 1, 1, tzinfo=dt_timezome.utc), + datetime(2023, 1, 1, tzinfo=dt_timezome.utc), + ], + ), + True, + False, + AggregateForecast( + start_time=datetime(2024, 1, 1, tzinfo=dt_timezome.utc), + method=AggregationMethod.UNWEIGHTED, + forecast_values=[0.3, 0.7], + forecaster_count=3, + interval_lower_bounds=[0.2, 0.6], + centers=[0.3, 0.7], + interval_upper_bounds=[0.4, 0.8], + means=None, + histogram=None, + ), + ), + ], + ) + def test_UnweightedAggregation( + self, + init_params: dict, + forecast_set: ForecastSet, + include_stats: bool, + histogram: bool, + expected: AggregateForecast, + ): + aggregation = UnweightedAggregation(**init_params) + new_aggregation = aggregation.calculate_aggregation_entry( + forecast_set, include_stats, histogram + ) + + assert new_aggregation.start_time == expected.start_time + assert ( + new_aggregation.forecast_values == expected.forecast_values + ) or np.allclose(new_aggregation.forecast_values, expected.forecast_values) + assert new_aggregation.forecaster_count == expected.forecaster_count + assert ( + new_aggregation.interval_lower_bounds == expected.interval_lower_bounds + ) or np.allclose( + new_aggregation.interval_lower_bounds, expected.interval_lower_bounds + ) + assert (new_aggregation.centers == expected.centers) or np.allclose( + new_aggregation.centers, expected.centers + ) + assert ( + 
new_aggregation.interval_upper_bounds == expected.interval_upper_bounds + ) or np.allclose( + new_aggregation.interval_upper_bounds, expected.interval_upper_bounds + ) + assert (new_aggregation.means == expected.means) or np.allclose( + new_aggregation.means, expected.means + ) + assert (new_aggregation.histogram == expected.histogram) or np.allclose( + new_aggregation.histogram, expected.histogram + ) + + @pytest.mark.parametrize( + "init_params, forecast_set, include_stats, histogram, expected", + [ + ( + {"question_type": Question.QuestionType.BINARY}, + ForecastSet( + forecasts_values=[[0.5, 0.5]], + timestep=datetime(2023, 1, 1, tzinfo=dt_timezome.utc), + user_ids=[1], + timesteps=[datetime(2023, 1, 1, tzinfo=dt_timezome.utc)], + ), + False, + False, + AggregateForecast( + start_time=datetime(2023, 1, 1, tzinfo=dt_timezome.utc), + method=AggregationMethod.UNWEIGHTED, + forecast_values=[0.5, 0.5], + ), + ), + ( + {"question_type": Question.QuestionType.BINARY}, + ForecastSet( + forecasts_values=[[0.5, 0.5]], + timestep=datetime(2023, 1, 1, tzinfo=dt_timezome.utc), + user_ids=[1], + timesteps=[datetime(2023, 1, 1, tzinfo=dt_timezome.utc)], + ), + True, + True, + AggregateForecast( + start_time=datetime(2023, 1, 1, tzinfo=dt_timezome.utc), + method=AggregationMethod.UNWEIGHTED, + forecast_values=[0.5, 0.5], + forecaster_count=1, + interval_lower_bounds=[0.5, 0.5], + centers=[0.5, 0.5], + interval_upper_bounds=[0.5, 0.5], + means=None, + histogram=[0] * 50 + [1] + [0] * 49, + ), + ), + ( + {"question_type": Question.QuestionType.BINARY}, + ForecastSet( + forecasts_values=[ + [0.2, 0.8], + [0.4, 0.6], + ], + timestep=datetime(2024, 1, 1, tzinfo=dt_timezome.utc), + user_ids=[1, 2], + timesteps=[ + datetime(2022, 1, 1, tzinfo=dt_timezome.utc), + datetime(2023, 1, 1, tzinfo=dt_timezome.utc), + ], + ), + False, + False, + AggregateForecast( + start_time=datetime(2024, 1, 1, tzinfo=dt_timezome.utc), + method=AggregationMethod.UNWEIGHTED, + forecast_values=[0.3, 0.7], + 
), + ), + ( + {"question_type": Question.QuestionType.BINARY}, + ForecastSet( + forecasts_values=[ + [0.2, 0.8], + [0.4, 0.6], + ], + timestep=datetime(2024, 1, 1, tzinfo=dt_timezome.utc), + user_ids=[1, 2], + timesteps=[ + datetime(2022, 1, 1, tzinfo=dt_timezome.utc), + datetime(2023, 1, 1, tzinfo=dt_timezome.utc), + ], + ), + True, + True, + AggregateForecast( + start_time=datetime(2024, 1, 1, tzinfo=dt_timezome.utc), + method=AggregationMethod.UNWEIGHTED, + forecast_values=[0.3, 0.7], + forecaster_count=2, + interval_lower_bounds=[0.2, 0.6], + centers=[0.3, 0.7], + interval_upper_bounds=[0.4, 0.8], + means=None, + histogram=[0] * 60 + [1] + [0] * 19 + [1] + [0] * 19, + ), + ), + ( + {"question_type": Question.QuestionType.BINARY}, + ForecastSet( + forecasts_values=[ + [0.2, 0.8], + [0.3, 0.7], + [0.4, 0.6], + ], + timestep=datetime(2024, 1, 1, tzinfo=dt_timezome.utc), + user_ids=[1, 2, 3], + timesteps=[ + datetime(2021, 1, 1, tzinfo=dt_timezome.utc), + datetime(2022, 1, 1, tzinfo=dt_timezome.utc), + datetime(2023, 1, 1, tzinfo=dt_timezome.utc), + ], + ), + True, + False, + AggregateForecast( + start_time=datetime(2024, 1, 1, tzinfo=dt_timezome.utc), + method=AggregationMethod.UNWEIGHTED, + forecast_values=[0.3, 0.7], + forecaster_count=3, + interval_lower_bounds=[0.3, 0.6], + centers=[0.3, 0.7], + interval_upper_bounds=[0.4, 0.7], + means=None, + histogram=None, + ), + ), + ( + {"question_type": Question.QuestionType.BINARY}, + ForecastSet( + forecasts_values=[ + [0.2, 0.8], + [0.3, 0.7], + [0.5, 0.5], + [0.4, 0.6], + ], + timestep=datetime(2024, 1, 1, tzinfo=dt_timezome.utc), + user_ids=[1, 2, 3, 4], + timesteps=[ + datetime(2021, 1, 1, tzinfo=dt_timezome.utc), + datetime(2022, 1, 1, tzinfo=dt_timezome.utc), + datetime(2023, 1, 1, tzinfo=dt_timezome.utc), + datetime(2024, 1, 1, tzinfo=dt_timezome.utc), + ], + ), + True, + False, + AggregateForecast( + start_time=datetime(2024, 1, 1, tzinfo=dt_timezome.utc), + method=AggregationMethod.UNWEIGHTED, + 
forecast_values=[0.4, 0.6], + forecaster_count=4, + interval_lower_bounds=[0.3, 0.5], + centers=[0.4, 0.6], + interval_upper_bounds=[0.5, 0.7], + means=None, + histogram=None, + ), + ), + ], + ) + def test_RecencyWeightedAggregation( + self, + init_params: dict, + forecast_set: ForecastSet, + include_stats: bool, + histogram: bool, + expected: AggregateForecast, + ): + aggregation = RecencyWeightedAggregation(**init_params) + new_aggregation = aggregation.calculate_aggregation_entry( + forecast_set, include_stats, histogram + ) + + assert new_aggregation.start_time == expected.start_time + assert ( + new_aggregation.forecast_values == expected.forecast_values + ) or np.allclose(new_aggregation.forecast_values, expected.forecast_values) + assert new_aggregation.forecaster_count == expected.forecaster_count + assert ( + new_aggregation.interval_lower_bounds == expected.interval_lower_bounds + ) or np.allclose( + new_aggregation.interval_lower_bounds, expected.interval_lower_bounds + ) + assert (new_aggregation.centers == expected.centers) or np.allclose( + new_aggregation.centers, expected.centers + ) + assert ( + new_aggregation.interval_upper_bounds == expected.interval_upper_bounds + ) or np.allclose( + new_aggregation.interval_upper_bounds, expected.interval_upper_bounds + ) + assert (new_aggregation.means == expected.means) or np.allclose( + new_aggregation.means, expected.means + ) + assert (new_aggregation.histogram == expected.histogram) or np.allclose( + new_aggregation.histogram, expected.histogram + ) diff --git a/tests/unit/test_utils/test_measures.py b/tests/unit/test_utils/test_measures.py index 2a6f6d336d..e3aff4da9b 100644 --- a/tests/unit/test_utils/test_measures.py +++ b/tests/unit/test_utils/test_measures.py @@ -207,7 +207,7 @@ def test_percent_point_function(cdf, percentiles, expected_result): ], ) def test_prediction_difference_for_sorting(p1, p2, question, expected_result): - result = prediction_difference_for_sorting(p1, p2, question) + result 
= prediction_difference_for_sorting(p1, p2, question.type) assert np.isclose(result, expected_result) diff --git a/utils/the_math/aggregations.py b/utils/the_math/aggregations.py index 0df22f11d0..a432d34c23 100644 --- a/utils/the_math/aggregations.py +++ b/utils/the_math/aggregations.py @@ -11,11 +11,14 @@ from bisect import bisect_left, bisect_right from dataclasses import dataclass +from collections import defaultdict from datetime import datetime, timedelta, timezone +from typing import Sequence import numpy as np -from django.db.models import Q +from django.db.models import F, Q, QuerySet +from projects.permissions import ObjectPermission from questions.models import ( QUESTION_CONTINUOUS_TYPES, Question, @@ -23,13 +26,13 @@ AggregateForecast, ) from questions.types import AggregationMethod -from scoring.reputation import ( - get_reputations_at_time, - get_reputations_during_interval, - Reputation, -) +from scoring.models import Score, LeaderboardEntry from users.models import User -from utils.the_math.measures import weighted_percentile_2d, percent_point_function +from utils.the_math.measures import ( + weighted_percentile_2d, + percent_point_function, + prediction_difference_for_sorting, +) from utils.typing import ( ForecastValues, ForecastsValues, @@ -38,8 +41,16 @@ ) +@dataclass +class ForecastSet: + forecasts_values: ForecastsValues + timestep: datetime + user_ids: list[int] = list + timesteps: list[datetime] = list + + def get_histogram( - values: ForecastValues, + values: ForecastsValues, weights: Weights | None, question_type: Question.QuestionType = Question.QuestionType.BINARY, ) -> np.ndarray: @@ -48,15 +59,12 @@ def get_histogram( return np.zeros(100) if weights is None: weights = np.ones(len(values)) - transposed_values = values.T - if question_type == Question.QuestionType.BINARY: histogram = np.zeros(100) for p, w in zip(transposed_values[1], weights): histogram[int(p * 100)] += w return histogram - histogram = np.zeros((len(values[0]), 100)) 
for forecast_values, w in zip(values, weights): for i, p in enumerate(forecast_values): @@ -86,9 +94,12 @@ def compute_discrete_forecast_values( def compute_weighted_semi_standard_deviations( forecasts_values: ForecastsValues, - weights: Weights, + weights: Weights | None, ) -> tuple[ForecastValues, ForecastValues]: """returns the upper and lower standard_deviations""" + forecasts_values = np.array(forecasts_values) + if weights is None: + weights = np.ones(forecasts_values.shape[0]) average = np.average(forecasts_values, axis=0, weights=weights) lower_semivariances = np.zeros(forecasts_values.shape[1]) upper_semivariances = np.zeros(forecasts_values.shape[1]) @@ -109,109 +120,635 @@ def compute_weighted_semi_standard_deviations( return np.sqrt(lower_semivariances), np.sqrt(upper_semivariances) -@dataclass -class ForecastSet: - forecasts_values: ForecastsValues - timestep: datetime - users: list[User] = list - timesteps: list[datetime] = list +class Weighting: + def get_weights(self, forecast_set: ForecastSet) -> np.ndarray | None: + """Combine all weights multiplicatively""" + mro = type(self).__mro__ + weights = None + for klass in mro: + new_weights = None + if "calculate_weights" in klass.__dict__: + new_weights = klass.calculate_weights(self, forecast_set) + if weights is None: + weights = new_weights + elif new_weights is not None: + weights = weights * new_weights + if weights is None or np.all(weights == 0): + return None + if len(forecast_set.forecasts_values) != weights.shape[0]: + weights = weights[0] + return weights -def calculate_aggregation_entry( - forecast_set: ForecastSet, - question_type: str, - weights: Weights, - method: AggregationMethod, - include_stats: bool = False, - histogram: bool = False, -) -> AggregateForecast: - weights = np.array(weights) if weights is not None else None - if ( - question_type in QUESTION_CONTINUOUS_TYPES - or method == AggregationMethod.SINGLE_AGGREGATION - ): - aggregation = AggregateForecast( - 
forecast_values=np.average( - forecast_set.forecasts_values, axis=0, weights=weights - ).tolist() - ) - elif question_type == Question.QuestionType.BINARY: - aggregation = AggregateForecast( - forecast_values=compute_discrete_forecast_values( - forecast_set.forecasts_values, weights, 50.0 - )[0] - ) - else: # multiple_choice - medians = np.array( - compute_discrete_forecast_values( - forecast_set.forecasts_values, weights, 50.0 - )[0] - ) - floored_medians = medians - 0.001 - normalized_floored_medians = floored_medians / sum(floored_medians) - normalized_medians = ( - normalized_floored_medians * (1 - len(medians) * 0.001) + 0.001 +class Unweighted(Weighting): + """No special weighting""" + + def calculate_weights(self, forecast_set: ForecastSet) -> np.ndarray | None: + return None + + +class RecencyWeighted(Weighting): + """Applies a recency weighting to the forecasts equal to: + e^(sqrt(n) - sqrt(N)) where n is the nth forecast of the N active forecasts, + ordered by start_time ascending.""" + + def calculate_weights(self, forecast_set: ForecastSet) -> np.ndarray | None: + number_of_forecasts = len(forecast_set.forecasts_values) + if number_of_forecasts <= 2: + return None + return np.exp( + np.sqrt(np.arange(number_of_forecasts) + 1) - np.sqrt(number_of_forecasts) ) - aggregation = AggregateForecast(forecast_values=normalized_medians.tolist()) - if include_stats: - forecasts_values = np.array(forecast_set.forecasts_values) - aggregation.start_time = forecast_set.timestep - aggregation.forecaster_count = len(forecast_set.forecasts_values) - if question_type in [ - Question.QuestionType.BINARY, - Question.QuestionType.MULTIPLE_CHOICE, - ]: - if method == AggregationMethod.SINGLE_AGGREGATION: - centers = aggregation.forecast_values - lowers_sd, uppers_sd = compute_weighted_semi_standard_deviations( - forecasts_values, weights - ) - lowers = (np.array(centers) - lowers_sd).tolist() - uppers = (np.array(centers) + uppers_sd).tolist() - else: - lowers, centers, 
uppers = compute_discrete_forecast_values( - forecast_set.forecasts_values, weights, [25.0, 50.0, 75.0] - ) - if question_type == Question.QuestionType.MULTIPLE_CHOICE: - centers_array = np.array(centers) - normalized_centers = np.array(aggregation.forecast_values) - normalized_lowers = ( - np.array(lowers) * normalized_centers / centers_array - ) - normalized_uppers = ( - np.array(uppers) * normalized_centers / centers_array - ) - centers = normalized_centers.tolist() - lowers = normalized_lowers.tolist() - uppers = normalized_uppers.tolist() + +class MedianValues: + """Takes the median of the forecasts values for Binary and MC, mean for continuous""" + + def __init__(self, *args, question_type: Question.QuestionType, **kwargs): + super().__init__(*args, question_type=question_type, **kwargs) + self.question_type = question_type + + def calculate_forecast_values( + self, forecast_set: ForecastSet, weights: np.ndarray | None = None + ) -> np.ndarray: + # Default Aggregation method uses weighted medians for binary and MC questions + # and weighted average for continuous + if self.question_type == Question.QuestionType.BINARY: + return np.array( + compute_discrete_forecast_values( + forecast_set.forecasts_values, weights, 50.0 + )[0] + ) + elif self.question_type == Question.QuestionType.MULTIPLE_CHOICE: + medians = np.array( + compute_discrete_forecast_values( + forecast_set.forecasts_values, weights, 50.0 + )[0] + ) + floored_medians = medians - 0.001 + normalized_floored_medians = floored_medians / sum(floored_medians) + return normalized_floored_medians * (1 - len(medians) * 0.001) + 0.001 + else: # continuous + return np.average(forecast_set.forecasts_values, axis=0, weights=weights) + + def get_range_values( + self, + forecast_set: ForecastSet, + aggregation_forecast_values: ForecastValues, + weights: np.ndarray | None = None, + ): + if self.question_type == Question.QuestionType.BINARY: + lowers, centers, uppers = compute_discrete_forecast_values( + 
forecast_set.forecasts_values, weights, [25.0, 50.0, 75.0] + ) + elif self.question_type == Question.QuestionType.MULTIPLE_CHOICE: + lowers, centers, uppers = compute_discrete_forecast_values( + forecast_set.forecasts_values, weights, [25.0, 50.0, 75.0] + ) + centers_array = np.array(centers) + normalized_centers = np.array(aggregation_forecast_values) + normalized_lowers = np.array(lowers) * normalized_centers / centers_array + normalized_uppers = np.array(uppers) * normalized_centers / centers_array + centers = normalized_centers.tolist() + lowers = normalized_lowers.tolist() + uppers = normalized_uppers.tolist() + else: # continuous + lowers, centers, uppers = percent_point_function( + aggregation_forecast_values, [25.0, 50.0, 75.0] + ) + lowers = [lowers] + centers = [centers] + uppers = [uppers] + return lowers, centers, uppers + + +class MeanValues: + """Takes the mean of the forecast values""" + + def __init__(self, *args, question_type: Question.QuestionType, **kwargs): + super().__init__(*args, question_type=question_type, **kwargs) + self.question_type = question_type + + def calculate_forecast_values( + self, forecast_set: ForecastSet, weights: np.ndarray | None = None + ) -> np.ndarray: + return np.average(forecast_set.forecasts_values, axis=0, weights=weights) + + def get_range_values( + self, + forecast_set: ForecastSet, + aggregation_forecast_values: ForecastValues, + weights: np.ndarray | None = None, + ): + if self.question_type in QUESTION_CONTINUOUS_TYPES: + lowers, centers, uppers = percent_point_function( + aggregation_forecast_values, [25.0, 50.0, 75.0] + ) + lowers = [lowers] + centers = [centers] + uppers = [uppers] else: + centers = aggregation_forecast_values + lowers_sd, uppers_sd = compute_weighted_semi_standard_deviations( + forecast_set.forecasts_values, weights + ) + lowers = (np.array(centers) - lowers_sd).tolist() + uppers = (np.array(centers) + uppers_sd).tolist() + return lowers, centers, uppers + + +class LogOddsMeanValues: + 
"""Takes the log odds mean of the forecast values""" + + def __init__(self, *args, question_type: Question.QuestionType, **kwargs): + super().__init__(*args, question_type=question_type, **kwargs) + self.question_type = question_type + + def calculate_forecast_values( + self, forecast_set: ForecastSet, weights: np.ndarray | None = None + ) -> np.ndarray: + log_odds = np.log( + np.array(forecast_set.forecasts_values) + / (1 - np.array(forecast_set.forecasts_values)) + ) + average_log_odds = np.average(log_odds, axis=0, weights=weights) + average_odds = np.exp(average_log_odds) + average = average_odds / (1 + average_odds) + # First and last cdf value of continuous questions with closed bounds get + # screwed up by the logodds transformation, just replace them with + # set values 0 and 1 + if np.isnan(average[0]): + average[0] = 0 + if np.isnan(average[-1]): + average[-1] = 1 + return average + + def get_range_values( + self, + forecast_set: ForecastSet, + aggregation_forecast_values: ForecastValues, + weights: np.ndarray | None = None, + ): + if self.question_type in QUESTION_CONTINUOUS_TYPES: lowers, centers, uppers = percent_point_function( - aggregation.forecast_values, [25.0, 50.0, 75.0] + aggregation_forecast_values, [25.0, 50.0, 75.0] ) lowers = [lowers] centers = [centers] uppers = [uppers] - aggregation.interval_lower_bounds = lowers - aggregation.centers = centers - aggregation.interval_upper_bounds = uppers - if question_type in [ + else: + centers = aggregation_forecast_values + lowers_sd, uppers_sd = compute_weighted_semi_standard_deviations( + forecast_set.forecasts_values, weights + ) + lowers = (np.array(centers) - lowers_sd).tolist() + uppers = (np.array(centers) + uppers_sd).tolist() + return lowers, centers, uppers + + +@dataclass +class Reputation: + user_id: int + value: float + time: datetime + + +class ReputationWeighted(Weighting): + reputations: dict[int, list[Reputation]] + question: Question + + def __init__( + self, + *args, + question: Question | 
None = None, + user_ids: list[int] | None = None, + **kwargs, + ): + super().__init__(*args, question=question, user_ids=user_ids, **kwargs) + if question is None or user_ids is None: + raise ValueError("question and user_ids must be provided") + self.question = question + self.reputations = self.get_reputation_history(user_ids) + + def get_reputation_history( + self, user_ids: list[int] + ) -> dict[int, list[Reputation]]: + """implement in subclass + returns a dict of reputations. Each one is a record of what a particular + user's reputation was at a particular time. + The reputation can change during the interval.""" + start = self.question.open_time + return {user_id: [Reputation(user_id, 1, start)] for user_id in user_ids} + + def get_reputations(self, forecast_set: ForecastSet) -> list[Reputation]: + reps = [] + for user_id in forecast_set.user_ids: + found = False + for reputation in self.reputations[user_id][::-1]: + if reputation.time <= forecast_set.timestep: + reps.append(reputation) + found = True + break + if not found: + # no reputation -> no weight + reps.append(Reputation(user_id, 0, forecast_set.timestep)) + return reps + + def calculate_weights(self, forecast_set: ForecastSet) -> np.ndarray | None: + reps = self.get_reputations(forecast_set) + return np.array([reputation.value for reputation in reps]) + + +class NoOutliers: + + question_type: Question.QuestionType + + def __init__(self, *args, question_type: Question.QuestionType, **kwargs): + super().__init__(*args, question_type=question_type, **kwargs) + self.question_type = question_type + + def calculate_weights(self, forecast_set: ForecastSet) -> np.ndarray | None: + forecasts_values = forecast_set.forecasts_values + if len(forecasts_values) <= 2: + return None + average_forecast = np.average(forecasts_values, axis=0) + distances = [] + for forecast in forecasts_values: + distances.append( + prediction_difference_for_sorting( + np.array(forecast), + average_forecast, + self.question_type, + ) 
+ ) + mask = distances <= np.percentile(distances, 80, axis=0) + return mask + + +class Aggregation(Unweighted, MedianValues): + """ + Base class for specific aggregation methods to inherit from. + Unweighted + Median + """ + + question_type: Question.QuestionType + method: AggregationMethod + + def __init__(self, *args, question_type: Question.QuestionType, **kwargs): + self.question_type = question_type + + def calculate_aggregation_entry( + self, + forecast_set: ForecastSet, + include_stats: bool = False, + histogram: bool = False, + ) -> AggregateForecast: + weights = self.get_weights(forecast_set) + aggregation = AggregateForecast() + aggregation.forecast_values = self.calculate_forecast_values( + forecast_set, weights + ).tolist() + + aggregation.start_time = forecast_set.timestep + if include_stats: + aggregation.forecaster_count = len(forecast_set.forecasts_values) + lowers, centers, uppers = self.get_range_values( + forecast_set, aggregation.forecast_values, weights + ) + aggregation.interval_lower_bounds = lowers + aggregation.centers = centers + aggregation.interval_upper_bounds = uppers + + if histogram and self.question_type in [ Question.QuestionType.BINARY, Question.QuestionType.MULTIPLE_CHOICE, ]: - aggregation.means = np.average( - forecast_set.forecasts_values, weights=weights, axis=0 + aggregation.histogram = get_histogram( + forecast_set.forecasts_values, + weights, + question_type=self.question_type, ).tolist() - if histogram and question_type in [ - Question.QuestionType.BINARY, - Question.QuestionType.MULTIPLE_CHOICE, - ]: - aggregation.histogram = get_histogram( - forecast_set.forecasts_values, - weights, - question_type=question_type, - ).tolist() - return aggregation + + return aggregation + + +class UnweightedAggregation(Aggregation): + """ + unweighted + median + """ + + method = AggregationMethod.UNWEIGHTED + + +class RecencyWeightedAggregation(RecencyWeighted, Aggregation): + """ + recency weighted + median + """ + + method = 
AggregationMethod.RECENCY_WEIGHTED + + +class SingleAggregation(MeanValues, ReputationWeighted, Aggregation): + """ + Custom Reputation weighting + mean + """ + + method = AggregationMethod.SINGLE_AGGREGATION + reputations: dict[int, list[Reputation]] + question: Question + + @staticmethod + def reputation_value(scores: Sequence[Score]) -> float: + return max( + sum([score.score for score in scores]) + / (30 + sum([score.coverage for score in scores])), + 1e-6, + ) + + def get_reputation_history( + self, user_ids: list[int] + ) -> dict[int, list[Reputation]]: + + start = self.question.open_time + end = self.question.scheduled_close_time + if end is None: + end = timezone.now() + peer_scores = Score.objects.filter( + user_id__in=user_ids, + score_type=Score.ScoreTypes.PEER, + question__in=Question.objects.filter_public(), + edited_at__lte=end, + ).distinct() + + # setup + scores_by_user: dict[int, dict[int, Score]] = defaultdict(dict) + reputations: dict[int, list[Reputation]] = defaultdict(list) + + # Establish reputations at the start of the interval. 
+ old_peer_scores = list( + peer_scores.filter(edited_at__lte=start).order_by("edited_at") + ) + for score in old_peer_scores: + scores_by_user[score.user_id][score.question_id] = score + for user_id in user_ids: + value = self.reputation_value(scores_by_user[user_id].values()) + reputations[user_id].append(Reputation(user_id, value, start)) + + # Then, for each new score, add a new reputation record + new_peer_scores = list( + peer_scores.filter(edited_at__gt=start).order_by("edited_at") + ) + for score in new_peer_scores: + # update the scores by user, then calculate the updated reputation + scores_by_user[score.user_id][score.question_id] = score + value = self.reputation_value(scores_by_user[score.user_id].values()) + reputations[score.user_id].append( + Reputation(score.user_id, value, score.edited_at) + ) + return reputations + + def get_weights(self, forecast_set: ForecastSet) -> np.ndarray | None: + # This is a custom weighting scheme via learned paramaeters + # so not implemented with "calculate_weights" + reps = self.get_reputations(forecast_set) + # TODO: make these learned parameters + a = 0.5 + b = 6.0 + decays = [ + np.exp( + -(forecast_set.timestep - start_time) + / (self.question.scheduled_close_time - self.question.open_time) + ) + for start_time in forecast_set.timesteps + ] + weights = np.array( + [ + (decay**a * reputation.value ** (1 - a)) ** b + for decay, reputation in zip(decays, reps) + ] + ) + if all(weights == 0): + return None + return weights if weights.size else None + + +class MedalistsAggregation(ReputationWeighted, Aggregation): + """ + unweighted + median + only medalists + """ + + reputations: dict[int, list[Reputation]] + question: Question + method = AggregationMethod.MEDALISTS + + def get_reputation_history( + self, user_ids: list[int] + ) -> dict[int, list[Reputation]]: + """returns a dict reputations. Each one is a record of what a particular + user's reputation was at a particular time. 
+ The reputation can change during the interval.""" + start = self.question.open_time + end = self.question.scheduled_close_time + if end is None: + end = timezone.now() + medals = ( + LeaderboardEntry.objects.filter( + user_id__in=user_ids, + medal__isnull=False, + leaderboard__project__default_permission=ObjectPermission.FORECASTER, + ) + .annotate(set_time=F("leaderboard__finalize_time")) + .filter(set_time__lte=end) + .order_by("set_time") + ) + + # setup + reputations: dict[int, list[Reputation]] = defaultdict(list) + + # Establish initial reputations at the start of the interval. + old_medals = list(medals.filter(set_time__lte=start).order_by("set_time")) + for medal in old_medals: + user_id = medal.user_id + reputations[user_id] = [Reputation(user_id, 1, start)] + for user_id in user_ids: + if user_id not in reputations: + reputations[user_id] = [Reputation(user_id, 0, start)] + # Then, for each new medal, add a new reputation record + new_medals = list(medals.filter(set_time__gt=start).order_by("set_time")) + for medal in new_medals: + user_id = medal.user_id + if reputations[user_id][-1].value == 0: + reputations[user_id].append( + Reputation(user_id, 1, medal.edited_at or medal.created_at) + ) + return reputations + + +class Experienced25ResolvedAggregation(ReputationWeighted, Aggregation): + """ + unweighted + median + only forecasters with at least 25 Resolved + """ + + reputations: dict[int, list[Reputation]] + question: Question + method = AggregationMethod.EXPERIENCED_USERS_25_RESOLVED + + def get_reputation_history( + self, user_ids: list[int] + ) -> dict[int, list[Reputation]]: + """returns a dict of reputations. Each one is a record of what a particular + user's reputation was at a particular time. 
+ The reputation can change during the interval.""" + start = self.question.open_time + end = self.question.scheduled_close_time + if end is None: + end = timezone.now() + peer_scores = ( + Score.objects.filter( + user_id__in=user_ids, + score_type=Score.ScoreTypes.PEER, + question__in=Question.objects.filter_public(), + ) + .annotate(set_time=F("question__actual_resolve_time")) + .order_by("set_time") + .filter(set_time__lte=end) + .distinct() + ) + + # setup + resolved_per_user: dict[int, int] = defaultdict(int) + reputations: dict[int, list[Reputation]] = defaultdict(list) + + # Establish reputations at the start of the interval. + old_peer_scores = list( + peer_scores.filter(set_time__lte=start).order_by("set_time") + ) + for score in old_peer_scores: + resolved_per_user[score.user_id] += 1 + for user_id in user_ids: + reputations[user_id].append( + Reputation(user_id, 1 if resolved_per_user[user_id] >= 25 else 0, start) + ) + + # Then, for each new score, add a new reputation record + new_peer_scores = list( + peer_scores.filter(set_time__gt=start).order_by("set_time") + ) + for score in new_peer_scores: + # update the scores by user, then calculate the updated reputation + resolved_per_user[score.user_id] += 1 + reputations[score.user_id].append( + Reputation( + score.user_id, + 1 if resolved_per_user[score.user_id] >= 25 else 0, + score.set_time, + ) + ) + return reputations + + +class IgnoranceAggregation(Aggregation): + """ + always returns ignorance values + """ + + method = AggregationMethod.IGNORANCE + + def __init__(self, *args, question_type: Question.QuestionType, **kwargs): + super().__init__(*args, question_type=question_type, **kwargs) + self.question_type = question_type + + def calculate_forecast_values( + self, forecast_set: ForecastSet, weights: np.ndarray | None = None + ) -> np.ndarray: + values_count = len(forecast_set.forecasts_values[0]) + if self.question_type in QUESTION_CONTINUOUS_TYPES: + # prediction is a CDF + return 
np.linspace(0.05, 0.95, values_count) + else: + return np.ones_like(forecast_set.forecasts_values[0]) / values_count + + +class RecencyWeightedLogOddsAggregation( + LogOddsMeanValues, RecencyWeighted, Aggregation +): + """ + recency weighted + log odds mean + """ + + method = AggregationMethod.RECENCY_WEIGHTED_LOG_ODDS + + +class RecencyWeightedMeanNoOutliersAggregation( + NoOutliers, MeanValues, RecencyWeightedAggregation +): + """ + recency weighted + mean + remove ~20% outliers (beyond 80th percentile distance) + """ + + method = AggregationMethod.RECENCY_WEIGHTED_MEAN_NO_OUTLIERS + + +class RecencyWeightedMedalistsAggregation(RecencyWeighted, MedalistsAggregation): + """ + recency weighted + median + only medalists + """ + + method = AggregationMethod.RECENCY_WEIGHTED_MEDALISTS + + +class RecencyWeightedExperienced25ResolvedAggregation( + RecencyWeighted, Experienced25ResolvedAggregation +): + """ + recency weighted + median + only forecasters with at least 25 Resolved + """ + + method = AggregationMethod.RECENCY_WEIGHTED_EXPERIENCED_USERS_25_RESOLVED + + +class RecencyWeightedLogOddsNoOutliersAggregation( + LogOddsMeanValues, RecencyWeightedMeanNoOutliersAggregation +): + """ + recency weighted + log odds mean + remove ~20% outliers (beyond 80th percentile distance) + """ + + method = AggregationMethod.RECENCY_WEIGHTED_LOG_ODDS_NO_OUTLIERS + + +aggregation_method_map: dict[AggregationMethod, type[Aggregation]] = { + AggregationMethod.UNWEIGHTED: UnweightedAggregation, + AggregationMethod.RECENCY_WEIGHTED: RecencyWeightedAggregation, + AggregationMethod.SINGLE_AGGREGATION: SingleAggregation, + AggregationMethod.MEDALISTS: MedalistsAggregation, + AggregationMethod.EXPERIENCED_USERS_25_RESOLVED: Experienced25ResolvedAggregation, + AggregationMethod.IGNORANCE: IgnoranceAggregation, + AggregationMethod.RECENCY_WEIGHTED_LOG_ODDS: RecencyWeightedLogOddsAggregation, + AggregationMethod.RECENCY_WEIGHTED_MEAN_NO_OUTLIERS: RecencyWeightedMeanNoOutliersAggregation, + AggregationMethod.RECENCY_WEIGHTED_MEDALISTS: 
RecencyWeightedMedalistsAggregation, + AggregationMethod.RECENCY_WEIGHTED_EXPERIENCED_USERS_25_RESOLVED: ( + RecencyWeightedExperienced25ResolvedAggregation + ), + AggregationMethod.RECENCY_WEIGHTED_LOG_ODDS_NO_OUTLIERS: RecencyWeightedLogOddsNoOutliersAggregation, +} def get_aggregations_at_time( @@ -241,30 +778,19 @@ def get_aggregations_at_time( forecast_set = ForecastSet( forecasts_values=[forecast.get_prediction_values() for forecast in forecasts], timestep=time, - users=[forecast.author for forecast in forecasts], + user_ids=[forecast.author_id for forecast in forecasts], timesteps=[forecast.start_time for forecast in forecasts], ) aggregations: dict[AggregationMethod, AggregateForecast] = dict() for method in aggregation_methods: - match method: - case AggregationMethod.RECENCY_WEIGHTED: - weights = generate_recency_weights(len(forecast_set.forecasts_values)) - case AggregationMethod.UNWEIGHTED: - weights = None - case AggregationMethod.SINGLE_AGGREGATION: - repuatations = get_reputations_at_time(forecast_set.users, time) - weights = calculate_single_aggregation_weights( - forecast_set, - repuatations, - question.open_time, - question.scheduled_close_time, - ) - new_entry: AggregateForecast = calculate_aggregation_entry( + AggregationGenerator: Aggregation = aggregation_method_map[method]( + question=question, + user_ids=set(forecast_set.user_ids), + question_type=question.type, + ) + new_entry = AggregationGenerator.calculate_aggregation_entry( forecast_set, - question.type, - weights, - method=method, include_stats=include_stats, histogram=histogram, ) @@ -275,20 +801,18 @@ def get_aggregations_at_time( def summarize_array( - array: list, + array: list[float], size: int, -) -> list[datetime]: +) -> list[float]: """helper method to pick evenly distributed values from an ordered list array must be sorted in ascending order """ - if size <= 0: return [] elif len(array) <= size: return array elif size == 2 or len(array) == 2: return [array[0], array[-1]] - 
target_values = np.linspace(array[0], array[-1], size) summary = set() for target in target_values: @@ -310,16 +834,13 @@ def minimize_history( max_size: int = 400, ) -> list[datetime]: """This takes a sorted list of datetimes and returns a summarized version of it - The front end graphs have zoomed views on 1 day, 1 week, 2 months, and all time so this makes sure that the history contains sufficiently high resolution data for each interval. - max_size dictates the maximum numer of returned datetimes. """ if len(history) <= max_size: return history - h = [h.timestamp() for h in history] # determine how many datetimes we want to have in each interval day = timedelta(days=1).total_seconds() @@ -356,10 +877,8 @@ def minimize_history( month_size = even_spread week_size = even_spread day_size = even_spread - # start with smallest interval, populating it with timestamps up to it's size. If # interval isn't saturated, distribute the remaining allotment to smaller intervals. - # Day interval day_history = [] day_interval = [] @@ -403,10 +922,9 @@ def minimize_history( all_interval = h[:last_index] all_history = summarize_array(all_interval, all_size) remainder = all_size - len(all_history) - # put it all together minimized_history = all_history + month_history + week_history + day_history - return [datetime.fromtimestamp(h, tz=timezone.utc) for h in minimized_history] + return [datetime.fromtimestamp(h, tz=dt_timezone.utc) for h in minimized_history] def get_user_forecast_history( @@ -414,23 +932,21 @@ def get_user_forecast_history( minimize: bool = False, cutoff: datetime | None = None, ) -> list[ForecastSet]: - timesteps = set() + timestep_set: set[datetime] = set() for forecast in forecasts: - timesteps.add(forecast.start_time) + timestep_set.add(forecast.start_time) if forecast.end_time: if cutoff and forecast.end_time > cutoff: continue - timesteps.add(forecast.end_time) - - timesteps = sorted(timesteps) + timestep_set.add(forecast.end_time) + timesteps = 
sorted(timestep_set) if minimize: timesteps = minimize_history(timesteps) - forecast_sets: dict[datetime, ForecastSet] = { timestep: ForecastSet( forecasts_values=[], timestep=timestep, - users=[], + user_ids=[], timesteps=[], ) for timestep in timesteps @@ -446,43 +962,16 @@ def get_user_forecast_history( forecast_values = forecast.get_prediction_values() for timestep in timesteps[start_index:end_index]: forecast_sets[timestep].forecasts_values.append(forecast_values) - forecast_sets[timestep].users.append(forecast.author) + forecast_sets[timestep].user_ids.append(forecast.author_id) forecast_sets[timestep].timesteps.append(forecast.start_time) return sorted(list(forecast_sets.values()), key=lambda x: x.timestep) -def generate_recency_weights(number_of_forecasts: int) -> np.ndarray: - if number_of_forecasts <= 2: - return None - return np.exp( - np.sqrt(np.arange(number_of_forecasts) + 1) - np.sqrt(number_of_forecasts) - ) - - -def calculate_single_aggregation_weights( - forecast_set: ForecastSet, - reputations: list[Reputation], - open_time: datetime, - close_time: datetime, -) -> Weights: - # TODO: make these learned parameters - a = 0.5 - b = 6.0 - decays = [ - np.exp(-(forecast_set.timestep - start_time) / (close_time - open_time)) - for start_time in forecast_set.timesteps - ] - weights = [ - (decay**a * reputation.value ** (1 - a)) ** b - for decay, reputation in zip(decays, reputations) - ] - return weights - - def get_aggregation_history( question: Question, aggregation_methods: list[AggregationMethod], + forecasts: QuerySet[Forecast] | None = None, user_ids: list[int] | None = None, minimize: bool = True, include_stats: bool = True, @@ -491,87 +980,65 @@ def get_aggregation_history( ) -> dict[AggregationMethod, list[AggregateForecast]]: full_summary: dict[AggregationMethod, list[AggregateForecast]] = dict() - # get input forecasts - forecasts = ( - Forecast.objects.filter(question_id=question.id) - .order_by("start_time") - .select_related("author") - ) - 
if question.actual_close_time: - forecasts = forecasts.filter(start_time__lte=question.actual_close_time) + if not forecasts: + # get input forecasts + forecasts = ( + Forecast.objects.filter(question_id=question.id) + .order_by("start_time") + .select_related("author") + ) + if question.actual_close_time: + forecasts = forecasts.filter(start_time__lte=question.actual_close_time) - if user_ids: - forecasts = forecasts.filter(author_id__in=user_ids) - if not include_bots: - forecasts = forecasts.exclude(author__is_bot=True) + if user_ids: + forecasts = forecasts.filter(author_id__in=user_ids) + if not include_bots: + forecasts = forecasts.exclude(author__is_bot=True) forecast_history = get_user_forecast_history( forecasts, minimize, cutoff=question.actual_close_time ) + forecaster_ids = set(forecast.author_id for forecast in forecasts) for method in aggregation_methods: - aggregation_history: list[AggregateForecast] = [] - match method: - case AggregationMethod.RECENCY_WEIGHTED: - - def get_weights(forecast_set: ForecastSet) -> Weights | None: - return generate_recency_weights(len(forecast_set.forecasts_values)) - - case AggregationMethod.UNWEIGHTED: + if method == AggregationMethod.METACULUS_PREDICTION: + # saved in the database - not reproducable or updateable + full_summary[method] = list( + AggregateForecast.objects.filter( + question_id=question.id, method=method + ).order_by("start_time") + ) + continue - def get_weights(forecast_set: ForecastSet) -> Weights | None: - return None + aggregation_history: list[AggregateForecast] = [] + AggregationGenerator: Aggregation = aggregation_method_map[method]( + question=question, + user_ids=forecaster_ids, + question_type=question.type, + ) - case AggregationMethod.SINGLE_AGGREGATION: - users = list(set(forecast.author for forecast in forecasts)) - reputations = get_reputations_during_interval( - users, question.open_time, question.scheduled_close_time + for i, forecast_set in enumerate(forecast_history): + if histogram 
is not None: + include_histogram = histogram and ( + question.type + in [ + Question.QuestionType.BINARY, + Question.QuestionType.MULTIPLE_CHOICE, + ] ) - - def get_weights(forecast_set: ForecastSet) -> Weights | None: - reps = [] - for user in forecast_set.users: - for rep in reputations[user][::-1]: - if rep.time <= forecast_set.timestep: - reps.append(rep) - break - return calculate_single_aggregation_weights( - forecast_set, - reps, - question.open_time, - question.scheduled_close_time, - ) - - case AggregationMethod.METACULUS_PREDICTION: - full_summary[method] = list( - AggregateForecast.objects.filter( - question=question, method=method - ).order_by("start_time") + else: + include_histogram = ( + question.type == Question.QuestionType.BINARY + and i == (len(forecast_history) - 1) ) - continue - - for i, forecast_set in enumerate(forecast_history): - weights = get_weights(forecast_set) - include_histogram = ( - question.type - in [Question.QuestionType.BINARY, Question.QuestionType.MULTIPLE_CHOICE] - and histogram - if histogram is not None - else question.type == Question.QuestionType.BINARY - and i == (len(forecast_history) - 1) - ) if forecast_set.forecasts_values: - new_entry: AggregateForecast = calculate_aggregation_entry( + new_entry = AggregationGenerator.calculate_aggregation_entry( forecast_set, - question.type, - weights, - method=method, include_stats=include_stats, histogram=include_histogram, ) new_entry.question = question - new_entry.question_type = question.type new_entry.method = method if aggregation_history and aggregation_history[-1].end_time is None: aggregation_history[-1].end_time = new_entry.start_time @@ -579,7 +1046,6 @@ def get_weights(forecast_set: ForecastSet) -> Weights | None: else: if aggregation_history: aggregation_history[-1].end_time = forecast_set.timestep - full_summary[method] = aggregation_history return full_summary diff --git a/utils/the_math/measures.py b/utils/the_math/measures.py index fcdea83fec..53aaf7f8fe 100644 
--- a/utils/the_math/measures.py +++ b/utils/the_math/measures.py @@ -90,13 +90,13 @@ def percent_point_function( def prediction_difference_for_sorting( - p1: ForecastValues, p2: ForecastValues, question: "Question" + p1: ForecastValues, p2: ForecastValues, question_type: "Question.QuestionType" ) -> float: """for binary and multiple choice, takes pmfs for continuous takes cdfs""" p1, p2 = np.array(p1), np.array(p2) # Uses Jeffrey's Divergence - if question.type in ["binary", "multiple_choice"]: + if question_type in ["binary", "multiple_choice"]: return sum([(p - q) * np.log2(p / q) for p, q in zip(p1, p2)]) cdf1 = np.array([1 - np.array(p1), p1]) cdf2 = np.array([1 - np.array(p2), p2])