from datetime import datetime, timedelta, UTC
from logging import Logger
from typing import Any, AsyncGenerator
from zoneinfo import ZoneInfo

from estuary_cdk.capture.common import LogCursor, PageCursor
from estuary_cdk.http import HTTPSession
from estuary_cdk.incremental_json_processor import IncrementalJsonProcessor

from .models import (
    DimensionHeader,
    MetricHeader,
    Report,
    ReportDocument,
    Row,
    RunReportResponse,
    MetadataResponse,
)
from .utils import (
    dt_to_date_str,
    dt_to_str,
    str_to_dt,
)

API = "https://analyticsdata.googleapis.com/v1beta/properties"
# Maximum number of rows to request per runReport call.
MAX_REPORT_RESULTS_LIMIT = 250_000


def _transform_into_record(
    row: Row,
    dimension_headers: list[DimensionHeader],
    metric_headers: list[MetricHeader],
    property_id: str,
    report_date: datetime,
) -> dict[str, str | float | int]:
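    """Flatten a report Row into a single record keyed by dimension and metric names,
    tagged with the property_id and report_date that identify the report."""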
    record: dict[str, str | float | int] = {}

    for index, dimension_value in enumerate(row.dimensionValues):
        dimension_name = dimension_headers[index].name
        record[dimension_name] = dimension_value.value

    for index, metric_value in enumerate(row.metricValues):
        metric_name = metric_headers[index].name
        record[metric_name] = metric_value.value

    # Add report identifying fields.
    record["property_id"] = property_id
    record["report_date"] = dt_to_date_str(report_date)

    return record


def _build_report_body(
    dt: datetime,
    report: Report,
    offset: int = 0,
    limit: int = MAX_REPORT_RESULTS_LIMIT,
) -> dict[str, Any]:
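    """Build a runReport request body for a single-day date range, using the report's
    dimensions, metrics, and optional dimension/metric filters."""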
    body = {
        "dateRanges": [
            {
                "startDate": dt_to_date_str(dt),
                "endDate": dt_to_date_str(dt),
            }
        ],
        "dimensions": [{"name": d} for d in report.dimensions],
        "metrics": [{"name": m} for m in report.metrics],
        "limit": limit,
        "offset": offset,
    }

    if report.dimensionFilter:
        body["dimensionFilter"] = report.dimensionFilter
    if report.metricFilter:
        body["metricFilter"] = report.metricFilter

    return body


def _are_same_day(dt1: datetime, dt2: datetime) -> bool:
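    """Return True if the two datetimes fall on the same calendar date."""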
    return dt1.date() == dt2.date()


async def fetch_property_timezone(
    http: HTTPSession,
    property_id: str,
    log: Logger,
) -> str:
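    """Determine the property's reporting time zone by running a minimal single-day
    report and reading the timeZone field from the response metadata."""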
    url = f"{API}/{property_id}:runReport"
    basic_report = Report.model_validate(
        {"name": "test_report", "dimensions": ["date"], "metrics": ["active1DayUsers"]}
    )
    body = _build_report_body(datetime.now(tz=UTC), basic_report)

    response = RunReportResponse.model_validate_json(
        await http.request(log, url, method="POST", json=body)
    )

    return response.metadata.timeZone


async def _fetch_headers(
    http: HTTPSession,
    url: str,
    date: datetime,
    report: Report,
    log: Logger,
) -> tuple[list[DimensionHeader], list[MetricHeader]]:
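    """Fetch the report's dimension and metric headers with a minimal (limit=1) request,
    so later rows can be mapped to their column names."""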
    body = _build_report_body(date, report, offset=0, limit=1)

    response = RunReportResponse.model_validate_json(
        await http.request(log, url, method="POST", json=body)
    )

    return (response.dimensionHeaders, response.metricHeaders)


async def _paginate_through_report_results(
    http: HTTPSession,
    property_id: str,
    report_doc_model: type[ReportDocument],
    date: datetime,
    report: Report,
    log: Logger,
) -> AsyncGenerator[ReportDocument, None]:
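    """Stream all rows of a single-day report, requesting pages of up to
    MAX_REPORT_RESULTS_LIMIT rows and yielding each row as a validated document."""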
    url = f"{API}/{property_id}:runReport"
    offset = 0

    dimension_headers, metric_headers = await _fetch_headers(http, url, date, report, log)

    while True:
        body = _build_report_body(date, report, offset)

        processor = IncrementalJsonProcessor(
            await http.request_stream(log, url, method="POST", json=body),
            "rows.item",
            Row,
            RunReportResponse,
        )

        async for row in processor:
            record = _transform_into_record(row, dimension_headers, metric_headers, property_id, date)
            yield report_doc_model.model_validate(record)

        remainder = processor.get_remainder()

        # Stop paginating if the response doesn't include a total row count.
        if remainder.rowCount is None:
            return

        offset += MAX_REPORT_RESULTS_LIMIT
        if offset >= remainder.rowCount:
            return


async def fetch_report(
    http: HTTPSession,
    property_id: str,
    timezone: ZoneInfo,
    report_doc_model: type[ReportDocument],
    report: Report,
    start_date: datetime,
    lookback_window_size: int,
    log: Logger,
    log_cursor: LogCursor,
) -> AsyncGenerator[ReportDocument | LogCursor, None]:
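    """Incrementally fetch report results: re-fetch days within the lookback window,
    catch up day by day from the cursor to the present while emitting updated cursors,
    then fetch the current day's results and emit the final cursor."""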
    assert isinstance(log_cursor, datetime)
    lookback_start = max(log_cursor - timedelta(days=lookback_window_size), start_date)
    start = log_cursor
    end = datetime.now(tz=timezone)

    # Recapture reports from the past lookback_window_size days, since the underlying
    # data for reports in this window can still change.
    # https://support.google.com/analytics/answer/11198161
    while not _are_same_day(lookback_start, start):
        async for record in _paginate_through_report_results(http, property_id, report_doc_model, lookback_start, report, log):
            yield record

        lookback_start = min(lookback_start + timedelta(days=1), start)

    # Catch up to the present day.
    while not _are_same_day(start, end):
        async for record in _paginate_through_report_results(http, property_id, report_doc_model, start, report, log):
            yield record

        start = min(start + timedelta(days=1), end)
        if start < end:
            yield start

    # Fetch the current day's results.
    async for record in _paginate_through_report_results(http, property_id, report_doc_model, start, report, log):
        yield record

    yield end


async def backfill_report(
    http: HTTPSession,
    property_id: str,
    report_doc_model: type[ReportDocument],
    report: Report,
    log: Logger,
    page: PageCursor,
    cutoff: LogCursor,
) -> AsyncGenerator[ReportDocument | PageCursor, None]:
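    """Backfill a single day's report results starting at the page cursor, then emit the
    following day as the next page cursor. Days at or beyond the cutoff are left to the
    incremental task."""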
    assert isinstance(page, str)
    assert isinstance(cutoff, datetime)
    start = str_to_dt(page)

    # The incremental task will capture records for the cutoff day.
    if start >= cutoff or _are_same_day(start, cutoff):
        return

    async for record in _paginate_through_report_results(http, property_id, report_doc_model, start, report, log):
        yield record

    next_start = min(start + timedelta(days=1), cutoff)
    yield dt_to_str(next_start)


async def fetch_metadata(
    http: HTTPSession,
    property_id: str,
    log: Logger,
) -> tuple[list[str], list[str]]:
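    """Fetch the API names of all dimensions and metrics available for the property."""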
    url = f"{API}/{property_id}/metadata"

    response = MetadataResponse.model_validate_json(
        await http.request(log, url)
    )

    dimensions = [dimension.apiName for dimension in response.dimensions]
    metrics = [metric.apiName for metric in response.metrics]

    return (dimensions, metrics)