Skip to content

Commit

Permalink
source-zendesk-support-native: refactor satisfaction_ratings stream…
Browse files Browse the repository at this point in the history
… to use date windows

Zendesk's `/satisfaction_ratings` endpoint returns results in descending
order, and backfilling the stream can take a long time. The
`satisfaction_ratings` stream has been refactored to use date windows so
the connector can perform backfills in checkpointable chunks rather
than having to process all results in a single pass.
  • Loading branch information
Alex-Bair committed Jan 30, 2025
1 parent 910862d commit 7a01923
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import base64
from datetime import datetime, UTC
from datetime import datetime, timedelta, UTC
import json
from logging import Logger
from typing import Any, AsyncGenerator
Expand All @@ -17,13 +17,17 @@
UsersResponse,
ClientSideIncrementalCursorPaginatedResponse,
IncrementalCursorPaginatedResponse,
SatisfactionRatingsResponse,
AuditLog,
AuditLogsResponse,
INCREMENTAL_CURSOR_EXPORT_TYPES,
)

# Number of results requested per page; sent as the `page[size]` query parameter
# on cursor-paginated requests (e.g. /satisfaction_ratings).
CURSOR_PAGINATION_PAGE_SIZE = 100
# NOTE(review): presumably the minimum number of documents to process between
# checkpoints; its usage is outside the visible code, so confirm before relying on this.
MIN_CHECKPOINT_COUNT = 1500
# Widest time span covered by a single satisfaction_ratings request window, for
# both incremental fetches and backfill steps.
MAX_SATISFACTION_RATINGS_WINDOW_SIZE = timedelta(days=30)
# Zendesk errors out if a start or end time parameter is more recent than 60 seconds in the past.
TIME_PARAMETER_DELAY = timedelta(seconds=60)

# Second-precision timestamp pattern with a literal trailing "Z".
# NOTE(review): presumably used to format/parse UTC timestamps for the API — confirm at call sites.
DATETIME_STRING_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

Expand Down Expand Up @@ -224,6 +228,90 @@ async def backfill_incremental_cursor_paginated_resources(
yield response.meta.after_cursor


async def _fetch_satisfaction_ratings_between(
    http: HTTPSession,
    subdomain: str,
    start: int,
    end: int,
    log: Logger,
) -> AsyncGenerator[ZendeskResource, None]:
    """Yield every satisfaction rating returned for one time window.

    Requests ``/satisfaction_ratings`` with ``start_time`` / ``end_time``
    query parameters and follows cursor pagination until the API reports
    that no further pages exist.

    Args:
        http: Session used to issue the requests.
        subdomain: Zendesk subdomain, passed to ``url_base`` to build the URL.
        start: Value sent as ``start_time``. Callers pass ``_dt_to_s``
            results, so this is presumably a Unix timestamp in seconds.
        end: Value sent as ``end_time`` (same unit as ``start``).
        log: Logger forwarded to ``http.request``.

    Yields:
        Each resource from ``response.resources``, page by page.

    Raises:
        RuntimeError: If a page reports ``has_more`` but carries no
            ``after_cursor``. Without a cursor the next request would be
            identical to the current one and the loop would never end.
    """
    url = f"{url_base(subdomain)}/satisfaction_ratings"

    params: dict[str, str | int] = {
        "start_time": start,
        "end_time": end,
        "page[size]": CURSOR_PAGINATION_PAGE_SIZE,
    }

    while True:
        response = SatisfactionRatingsResponse.model_validate_json(
            await http.request(log, url, params=params)
        )

        for satisfaction_rating in response.resources:
            yield satisfaction_rating

        if not response.meta.has_more:
            break

        after_cursor = response.meta.after_cursor
        if not after_cursor:
            # Fail loudly rather than re-requesting (and re-yielding) the same
            # page forever, or silently truncating the window by breaking.
            raise RuntimeError(
                "Zendesk reported more satisfaction_ratings pages but did not "
                "return an after_cursor; cannot continue pagination."
            )

        params["page[after]"] = after_cursor


async def fetch_satisfaction_ratings(
    http: HTTPSession,
    subdomain: str,
    log: Logger,
    log_cursor: LogCursor,
) -> AsyncGenerator[ZendeskResource | LogCursor, None]:
    """Fetch satisfaction ratings for the next window after ``log_cursor``.

    The window starts at ``log_cursor`` and ends at whichever comes first:
    ``MAX_SATISFACTION_RATINGS_WINDOW_SIZE`` after the cursor, or
    ``TIME_PARAMETER_DELAY`` before the current time (Zendesk rejects time
    parameters more recent than that).

    Args:
        http: Session used to issue the requests.
        subdomain: Zendesk subdomain.
        log: Logger forwarded to the HTTP layer.
        log_cursor: Start of the window; must be a ``datetime``.

    Yields:
        Every rating in the window, followed by the window's end as the
        next cursor. Yields nothing at all when the window is empty.
    """
    assert isinstance(log_cursor, datetime)

    end = min(
        datetime.now(tz=UTC) - TIME_PARAMETER_DELAY,
        log_cursor + MAX_SATISFACTION_RATINGS_WINDOW_SIZE,
    )

    start_ts = _dt_to_s(log_cursor)
    end_ts = _dt_to_s(end)

    # Nothing to fetch yet: the cursor is already at (or within the same
    # timestamp unit as) the most recent instant Zendesk allows us to query.
    # Returning without yielding avoids sending start_time >= end_time and
    # avoids emitting a cursor that does not advance; the same cursor is
    # simply retried on the next invocation, so no data is skipped.
    if end_ts <= start_ts:
        return

    generator = _fetch_satisfaction_ratings_between(
        http=http,
        subdomain=subdomain,
        start=start_ts,
        end=end_ts,
        log=log,
    )

    async for satisfaction_rating in generator:
        yield satisfaction_rating

    yield end


async def backfill_satisfaction_ratings(
    http: HTTPSession,
    subdomain: str,
    log: Logger,
    page: PageCursor,
    cutoff: LogCursor,
) -> AsyncGenerator[ZendeskResource | PageCursor, None]:
    """Backfill one date window of satisfaction ratings.

    ``page`` is the integer timestamp where this window begins and ``cutoff``
    is the datetime at which the backfill stops. Each call covers at most
    ``MAX_SATISFACTION_RATINGS_WINDOW_SIZE`` and never extends past the
    cutoff.

    Yields:
        Every rating in the window, then the window's end timestamp as the
        next page cursor. Yields nothing once ``page`` has reached the cutoff.
    """
    assert isinstance(page, int)
    assert isinstance(cutoff, datetime)

    cutoff_seconds = _dt_to_s(cutoff)

    # The backfill is complete once the page cursor has caught up with the cutoff.
    if page >= cutoff_seconds:
        return

    window_seconds = int(MAX_SATISFACTION_RATINGS_WINDOW_SIZE.total_seconds())
    window_end = min(cutoff_seconds, page + window_seconds)

    async for rating in _fetch_satisfaction_ratings_between(
        http, subdomain, page, window_end, log
    ):
        yield rating

    # Hand back where the next window should begin.
    yield window_end


async def _make_incremental_cursor_export_request(
http: HTTPSession,
url: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ class TicketMetricEventsResponse(IncrementalCursorPaginatedResponse):
# Incremental resources that can be filtered by a start_time query param.
# Tuples contain the name, path, cursor field, and response model for each resource.
INCREMENTAL_CURSOR_PAGINATED_RESOURCES: list[tuple[str, str, str, type[IncrementalCursorPaginatedResponse]]] = [
("satisfaction_ratings", "satisfaction_ratings", "updated_at", SatisfactionRatingsResponse),
("ticket_skips", "skips", "updated_at", TicketSkipsResponse),
("ticket_metric_events", "incremental/ticket_metric_events", "time", TicketMetricEventsResponse)
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,22 @@
backfill_audit_logs,
backfill_incremental_cursor_export_resources,
backfill_incremental_cursor_paginated_resources,
backfill_satisfaction_ratings,
backfill_ticket_child_resources,
backfill_ticket_metrics,
fetch_audit_logs,
fetch_client_side_incremental_cursor_paginated_resources,
fetch_incremental_cursor_export_resources,
fetch_incremental_cursor_paginated_resources,
fetch_satisfaction_ratings,
fetch_ticket_child_resources,
fetch_ticket_metrics,
snapshot_cursor_paginated_resources,
url_base,
_dt_to_s,
TIME_PARAMETER_DELAY,
)

# Zendesk errors out if a start or end time parameter is more recent than 60 seconds in the past.
TIME_PARAMETER_DELAY = timedelta(seconds=60)

EPOCH = datetime(1970, 1, 1, tzinfo=UTC)

async def validate_credentials(
Expand Down Expand Up @@ -262,6 +262,52 @@ def open(
return resources


def satisfaction_ratings(
    log: Logger, http: HTTPMixin, config: EndpointConfig
) -> common.Resource:
    """Build the ``satisfaction_ratings`` resource.

    The resource is keyed on ``/id`` and uses date-windowed functions for both
    incremental fetching and backfilling. Its initial state starts the
    incremental cursor at "now minus ``TIME_PARAMETER_DELAY``" and backfills
    from ``config.start_date`` up to that same instant.

    ``log`` is accepted for signature parity but is not used here.
    """

    def open(
        binding: CaptureBinding[ResourceConfig],
        binding_index: int,
        state: ResourceState,
        task: Task,
        all_bindings,
    ):
        # Pre-bind the session and subdomain; the remaining arguments are
        # supplied by whoever invokes these callbacks.
        incremental_fn = functools.partial(
            fetch_satisfaction_ratings,
            http,
            config.subdomain,
        )
        backfill_fn = functools.partial(
            backfill_satisfaction_ratings,
            http,
            config.subdomain,
        )

        common.open_binding(
            binding,
            binding_index,
            state,
            task,
            fetch_changes=incremental_fn,
            fetch_page=backfill_fn,
        )

    # The same instant is both where incremental fetching begins and where the
    # backfill ends.
    cutoff = datetime.now(tz=UTC) - TIME_PARAMETER_DELAY

    starting_state = ResourceState(
        inc=ResourceState.Incremental(cursor=cutoff),
        backfill=ResourceState.Backfill(
            cutoff=cutoff,
            next_page=_dt_to_s(config.start_date),
        ),
    )
    default_config = ResourceConfig(
        name="satisfaction_ratings",
        interval=timedelta(minutes=5),
    )

    return common.Resource(
        name="satisfaction_ratings",
        key=["/id"],
        model=ZendeskResource,
        open=open,
        initial_state=starting_state,
        initial_config=default_config,
        schema_inference=True,
    )


def incremental_cursor_paginated_resources(
log: Logger, http: HTTPMixin, config: EndpointConfig
) -> list[common.Resource]:
Expand Down Expand Up @@ -456,6 +502,7 @@ async def all_resources(
ticket_metrics(log, http, config),
*full_refresh_cursor_paginated_resources(log, http, config),
*client_side_filtered_cursor_paginated_resources(log, http, config),
satisfaction_ratings(log, http, config),
*incremental_cursor_paginated_resources(log, http, config),
*incremental_cursor_export_resources(log, http, config),
*ticket_child_resources(log, http, config),
Expand Down

0 comments on commit 7a01923

Please sign in to comment.