Skip to content
53 changes: 50 additions & 3 deletions src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ def _map_binned_logs_to_dandiset(

all_reduced_s3_logs_per_blob_id_aggregated_by_day = dict()
all_reduced_s3_logs_per_blob_id_aggregated_by_region = dict()
all_reduced_s3_logs_per_blob_id_aggregated_by_ip = dict()

blob_id_to_asset_path = dict()
total_bytes_across_versions_by_blob_id = dict()
dandiset_versions = list(dandiset.get_versions())
Expand Down Expand Up @@ -180,10 +182,10 @@ def _map_binned_logs_to_dandiset(
for ip_address in reduced_s3_log_binned_by_blob_id["ip_address"]
]

reordered_reduced_s3_log = reduced_s3_log_binned_by_blob_id.reindex(
columns=("timestamp", "bytes_sent", "region")
reordered_reduced_s3_log = reduced_s3_log_binned_by_blob_id.sort_values(
by="timestamp",
key=natsort.natsort_keygen(),
)
reordered_reduced_s3_log.sort_values(by="timestamp", key=natsort.natsort_keygen(), inplace=True)
reordered_reduced_s3_log.index = range(len(reordered_reduced_s3_log))

dandiset_version_log_folder_path.mkdir(parents=True, exist_ok=True)
Expand All @@ -205,6 +207,11 @@ def _map_binned_logs_to_dandiset(
all_reduced_s3_logs_aggregated_by_region_for_version.append(aggregated_activity_by_region)
all_reduced_s3_logs_per_blob_id_aggregated_by_region[blob_id] = aggregated_activity_by_region

aggregated_activity_by_ip = _aggregate_activity_by_ip_per_asset(
reduced_s3_logs_per_asset=reordered_reduced_s3_log
)
all_reduced_s3_logs_per_blob_id_aggregated_by_ip[blob_id] = aggregated_activity_by_ip

total_bytes = sum(reduced_s3_log_binned_by_blob_id["bytes_sent"])
total_bytes_per_asset_path[asset.path] = total_bytes

Expand Down Expand Up @@ -256,6 +263,12 @@ def _map_binned_logs_to_dandiset(
total_bytes_per_asset_path=total_bytes_across_versions_by_asset, file_path=dandiset_summary_by_asset_file_path
)

dandiset_summary_by_ip_file_path = dandiset_log_folder_path / "dandiset_summary_by_ip.tsv"
_write_aggregated_activity_by_ip(
reduced_s3_logs_per_day=all_reduced_s3_logs_per_blob_id_aggregated_by_ip.values(),
file_path=dandiset_summary_by_ip_file_path,
)

return None


Expand Down Expand Up @@ -295,6 +308,31 @@ def _aggregate_activity_by_asset(total_bytes_per_asset_path: dict[str, int]) ->
return aggregated_activity_by_asset


def _aggregate_activity_by_ip_per_asset(reduced_s3_logs_per_asset: pandas.DataFrame) -> pandas.DataFrame:
    """
    Count the number of unique IP addresses that accessed a single asset on each day.

    Parameters
    ----------
    reduced_s3_logs_per_asset : pandas.DataFrame
        Reduced log entries for one asset; must contain at least the columns
        "date" and "ip_address" (extra columns are dropped by the initial reindex).

    Returns
    -------
    pandas.DataFrame
        One row per date with columns ("date", "num_unique_access"), naturally
        sorted by date.
    """
    reduced_s3_logs_clipped = reduced_s3_logs_per_asset.reindex(columns=("date", "ip_address"))

    # Aggregate straight to the unique count; the previous `.agg([list, "nunique"])`
    # also materialized a per-day list of every IP address, which was discarded
    # by the final reindex — pure wasted time and memory.
    aggregated_activity_by_ip = reduced_s3_logs_clipped.groupby(by="date", as_index=False)["ip_address"].nunique()
    aggregated_activity_by_ip.rename(columns={"ip_address": "num_unique_access"}, inplace=True)

    # Natural sort handles date strings robustly even if they are not zero-padded.
    aggregated_activity_by_ip.sort_values(by="date", key=natsort.natsort_keygen(), inplace=True)

    return aggregated_activity_by_ip.reindex(columns=("date", "num_unique_access"))


def _aggregate_activity_by_ip(reduced_s3_logs_per_day: Iterable[pandas.DataFrame]) -> pandas.DataFrame:
    """
    Combine per-asset daily unique-IP counts into a single per-day total.

    NOTE(review): this sums the per-asset `num_unique_access` values, so an IP
    address that accessed multiple assets on the same day is counted once per
    asset — confirm that is the intended semantics of "unique access".

    Parameters
    ----------
    reduced_s3_logs_per_day : Iterable[pandas.DataFrame]
        Per-asset frames as produced by `_aggregate_activity_by_ip_per_asset`,
        each with columns ("date", "num_unique_access").

    Returns
    -------
    pandas.DataFrame
        One row per date with columns ("date", "num_unique_access"), naturally
        sorted by date.
    """
    all_reduced_s3_logs = pandas.concat(objs=reduced_s3_logs_per_day, ignore_index=True)
    all_reduced_s3_logs_clipped = all_reduced_s3_logs.reindex(columns=("date", "num_unique_access"))

    # Aggregate straight to the sum; the previous `.agg([list, "sum"])` also
    # built a per-day list of the individual counts that was discarded by the
    # final reindex — pure wasted time and memory.
    aggregated_activity_by_ip = all_reduced_s3_logs_clipped.groupby(by="date", as_index=False)[
        "num_unique_access"
    ].sum()

    # Natural sort handles date strings robustly even if they are not zero-padded.
    aggregated_activity_by_ip.sort_values(by="date", key=natsort.natsort_keygen(), inplace=True)

    return aggregated_activity_by_ip.reindex(columns=("date", "num_unique_access"))


def _write_aggregated_activity_by_day(
reduced_s3_logs_per_day: Iterable[pandas.DataFrame], file_path: pathlib.Path
) -> None:
Expand All @@ -304,6 +342,15 @@ def _write_aggregated_activity_by_day(
return None


def _write_aggregated_activity_by_ip(
    reduced_s3_logs_per_day: Iterable[pandas.DataFrame], file_path: pathlib.Path
) -> None:
    """
    Aggregate per-asset daily unique-IP counts and write the result as a TSV table.

    The output file is overwritten on each call and contains a header row; the
    DataFrame index is not written.
    """
    daily_ip_activity = _aggregate_activity_by_ip(reduced_s3_logs_per_day=reduced_s3_logs_per_day)
    daily_ip_activity.to_csv(path_or_buf=file_path, mode="w", sep="\t", header=True, index=False)

    return None


def _write_aggregated_activity_by_region(
reduced_s3_logs_per_day: Iterable[pandas.DataFrame], file_path: pathlib.Path
) -> None:
Expand Down
Loading