Skip to content

Commit

Permalink
Merge pull request #2316 from estuary/bair/source-iterate
Browse files Browse the repository at this point in the history
source iterate: new connector
  • Loading branch information
dyaffe authored Jan 31, 2025
2 parents 10ea902 + 808eb8b commit 85638f0
Show file tree
Hide file tree
Showing 17 changed files with 2,149 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/python.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ on:
- "source-intercom-native/**"
- "source-shopify-native/**"
- "source-zendesk-support-native/**"
- "source-iterate/**"

pull_request:
branches: [main]
Expand Down Expand Up @@ -70,6 +71,7 @@ on:
- "source-intercom-native/**"
- "source-shopify-native/**"
- "source-zendesk-support-native/**"
- "source-iterate/**"

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
Expand Down Expand Up @@ -201,6 +203,10 @@ jobs:
type: capture
version: v1
usage_rate: "1.0"
- name: source-iterate
type: capture
version: v1
usage_rate: "1.0"

steps:
- uses: actions/checkout@v4
Expand Down
1 change: 1 addition & 0 deletions source-iterate/VERSION
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
v1
10 changes: 10 additions & 0 deletions source-iterate/acmeCo/flow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
collections:
acmeCo/survey_responses:
schema: survey_responses.schema.yaml
key:
- /_meta/row_id
acmeCo/surveys:
schema: surveys.schema.yaml
key:
- /_meta/row_id
31 changes: 31 additions & 0 deletions source-iterate/acmeCo/survey_responses.schema.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
---
$defs:
Meta:
properties:
op:
default: u
description: "Operation type (c: Create, u: Update, d: Delete)"
enum:
- c
- u
- d
title: Op
type: string
row_id:
default: -1
description: "Row ID of the Document, counting up from zero, or -1 if not known"
title: Row Id
type: integer
title: Meta
type: object
additionalProperties: true
properties:
_meta:
$ref: "#/$defs/Meta"
default:
op: u
row_id: -1
description: Document metadata
title: FullRefreshResource
type: object
x-infer-schema: true
31 changes: 31 additions & 0 deletions source-iterate/acmeCo/surveys.schema.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
---
$defs:
Meta:
properties:
op:
default: u
description: "Operation type (c: Create, u: Update, d: Delete)"
enum:
- c
- u
- d
title: Op
type: string
row_id:
default: -1
description: "Row ID of the Document, counting up from zero, or -1 if not known"
title: Row Id
type: integer
title: Meta
type: object
additionalProperties: true
properties:
_meta:
$ref: "#/$defs/Meta"
default:
op: u
row_id: -1
description: Document metadata
title: FullRefreshResource
type: object
x-infer-schema: true
1,513 changes: 1,513 additions & 0 deletions source-iterate/poetry.lock

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions source-iterate/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[tool.poetry]
version = "0.1.0"
name = "source_iterate"
description = ""
authors = [ "Alex Bair <[email protected]>"]

[tool.poetry.dependencies]
estuary-cdk = {path="../estuary-cdk", develop = true}
python = "^3.11"
pydantic = "^2"

[tool.poetry.group.dev.dependencies]
debugpy = "^1.8.0"
mypy = "^1.8.0"
pytest = "^7.4.3"
pytest-insta = "^0.3.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
64 changes: 64 additions & 0 deletions source-iterate/source_iterate/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from logging import Logger
from typing import Callable, Awaitable

from estuary_cdk.flow import (
ConnectorSpec,
)
from estuary_cdk.capture import (
BaseCaptureConnector,
Request,
Task,
common,
request,
response,
)
from estuary_cdk.http import HTTPMixin

from .resources import all_resources, validate_credentials
from .models import (
ConnectorState,
EndpointConfig,
ResourceConfig,
)


class Connector(
BaseCaptureConnector[EndpointConfig, ResourceConfig, ConnectorState],
HTTPMixin,
):
def request_class(self):
return Request[EndpointConfig, ResourceConfig, ConnectorState]

async def spec(self, log: Logger, _: request.Spec) -> ConnectorSpec:
return ConnectorSpec(
configSchema=EndpointConfig.model_json_schema(),
oauth2=None,
documentationUrl="https://go.estuary.dev/source-iterate",
resourceConfigSchema=ResourceConfig.model_json_schema(),
resourcePathPointers=ResourceConfig.PATH_POINTERS,
)

async def discover(
self, log: Logger, discover: request.Discover[EndpointConfig]
) -> response.Discovered[ResourceConfig]:
resources = await all_resources(log, self, discover.config)
return common.discovered(resources)

async def validate(
self,
log: Logger,
validate: request.Validate[EndpointConfig, ResourceConfig],
) -> response.Validated:
await validate_credentials(log, self, validate.config)
resources = await all_resources(log, self, validate.config)
resolved = common.resolve_bindings(validate.bindings, resources)
return common.validated(resolved)

async def open(
self,
log: Logger,
open: request.Open[EndpointConfig, ResourceConfig, ConnectorState],
) -> tuple[response.Opened, Callable[[Task], Awaitable[None]]]:
resources = await all_resources(log, self, open.capture.config)
resolved = common.resolve_bindings(open.capture.bindings, resources)
return common.open(open, resolved)
4 changes: 4 additions & 0 deletions source-iterate/source_iterate/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import asyncio
import source_iterate

asyncio.run(source_iterate.Connector().serve())
70 changes: 70 additions & 0 deletions source-iterate/source_iterate/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from logging import Logger
from typing import AsyncGenerator
from urllib.parse import urlparse, parse_qs

from estuary_cdk.http import HTTPSession

from .models import (
FullRefreshResource,
SurveysResponse,
SurveyResponsesResponse,
)

API = "https://iteratehq.com/api/v1"

# Iterate docs: https://iterate.docs.apiary.io/#introduction/overview
# The docs mention that every request needs a "v" query param that's the date
# of the implementation. It doesn't seem like this is required for API requests
# to succeed, but I assume it's used somehow to avoid breaking changes.
VERSION = 20250130


def _extract_next_page_cursor(url: str) -> str:
query_params = parse_qs(urlparse(url).query)

cursor = query_params.get("page[cursor]", None)
if cursor is None:
msg = f"Did not find a page[cursor] parameter in URL: {url}"
raise RuntimeError(msg)

return cursor[0]


async def snapshot_surveys(
http: HTTPSession,
log: Logger,
) -> AsyncGenerator[FullRefreshResource, None]:
url = f"{API}/surveys"
params = {"v": VERSION}

response = SurveysResponse.model_validate_json(
await http.request(log, url, params=params)
)

for survey in response.results:
yield survey


async def snapshot_survey_responses(
http: HTTPSession,
log: Logger,
) -> AsyncGenerator[FullRefreshResource, None]:
async for survey in snapshot_surveys(http, log):
survey_id = getattr(survey, "id")
url = f"{API}/surveys/{survey_id}/responses"
params: dict[str, str | int] = {"v": VERSION}

while True:
response = SurveyResponsesResponse.model_validate_json(
await http.request(log, url, params=params)
)

for record in response.results.list:
yield record

if not response.links:
break

cursor = _extract_next_page_cursor(response.links.next)

params["page[cursor]"] = cursor
53 changes: 53 additions & 0 deletions source-iterate/source_iterate/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from typing import AsyncGenerator, Callable

from estuary_cdk.capture.common import (
BaseDocument,
ResourceConfig,
ResourceState,
)
from estuary_cdk.capture.common import (
ConnectorState as GenericConnectorState,
LogCursor,
Logger,
)
from estuary_cdk.http import HTTPSession, AccessToken

from pydantic import BaseModel, Field


class EndpointConfig(BaseModel):
credentials: AccessToken = Field(
discriminator="credentials_title",
title="Authentication",
)


ConnectorState = GenericConnectorState[ResourceState]


class FullRefreshResource(BaseDocument, extra="allow"):
pass


class SurveysResponse(BaseModel, extra="forbid"):
results: list[FullRefreshResource]


class SurveyResponsesResponse(BaseModel, extra="allow"):
class Results(BaseModel, extra="forbid"):
count: int
list: list[FullRefreshResource]

results: Results

class Links(BaseModel, extra="forbid"):
next: str

# links is not present on the last page of results.
links: Links | None = None


FullRefreshFn = Callable[
[HTTPSession, Logger],
AsyncGenerator[FullRefreshResource, None],
]
Loading

0 comments on commit 85638f0

Please sign in to comment.