Skip to content

fix(destination-motherduck): use normalized stream name for final table in DuckDB processor #50428

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ def write_stream_data_from_buffer(
sync_mode: DestinationSyncMode,
) -> None:
temp_table_name = self._create_table_for_loading(stream_name, batch_id=None)
final_table_name = self.normalizer.normalize(stream_name)
try:
pa_table = pa.Table.from_pydict(buffer[stream_name])
except Exception:
Expand All @@ -375,7 +376,7 @@ def write_stream_data_from_buffer(
self._write_temp_table_to_target_table(
stream_name=stream_name,
temp_table_name=temp_table_name_dedup,
final_table_name=stream_name,
final_table_name=final_table_name,
sync_mode=sync_mode,
)
finally:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
SyncMode,
Type,
)
from airbyte_cdk.models.airbyte_protocol import AirbyteMessage
from airbyte_cdk.sql._util.name_normalizers import LowerCaseNormalizer
from airbyte_cdk.sql.secrets import SecretString


Expand Down Expand Up @@ -90,23 +92,21 @@ def disable_destination_modification(monkeypatch, request):
monkeypatch.setattr(DestinationMotherDuck, "_get_destination_path", lambda _, x: x)


@pytest.fixture(scope="module")
def test_table_name() -> str:
@pytest.fixture
def stream_name() -> str:
letters = string.ascii_lowercase
rand_string = "".join(random.choice(letters) for _ in range(10))
return f"airbyte_integration_{rand_string}"
return f"airbyte-integration.{rand_string}"


@pytest.fixture(scope="module")
def other_test_table_name(test_table_name) -> str:
return test_table_name + "_other"
@pytest.fixture
def destination_table_name(stream_name) -> str:
    """Return the table name expected in the destination for ``stream_name``.

    The value is the raw stream name passed through
    ``LowerCaseNormalizer.normalize``.
    """
    normalized_name = LowerCaseNormalizer.normalize(stream_name)
    return normalized_name


@pytest.fixture
def test_large_table_name() -> str:
letters = string.ascii_lowercase
rand_string = "".join(random.choice(letters) for _ in range(10))
return f"airbyte_integration_{rand_string}"
@pytest.fixture
def other_test_table_name(destination_table_name) -> str:
    """Return the name of a second table, derived from ``destination_table_name``.

    This fixture uses the default (function) scope on purpose: it requests
    ``destination_table_name``, which is function-scoped, and pytest raises
    ``ScopeMismatch`` when a wider-scoped fixture (e.g. ``scope="module"``)
    requests a narrower-scoped one.
    """
    return destination_table_name + "_other"


@pytest.fixture
Expand Down Expand Up @@ -135,15 +135,15 @@ def other_table_schema() -> str:

@pytest.fixture
def configured_catalogue(
test_table_name: str,
stream_name: str,
destination_table_name: str,
other_test_table_name: str,
test_large_table_name: str,
table_schema: str,
other_table_schema: str,
) -> ConfiguredAirbyteCatalog:
append_stream = ConfiguredAirbyteStream(
stream=AirbyteStream(
name=test_table_name,
name=destination_table_name,
json_schema=table_schema,
supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
),
Expand All @@ -163,7 +163,7 @@ def configured_catalogue(
)
append_stream_large = ConfiguredAirbyteStream(
stream=AirbyteStream(
name=test_large_table_name,
name=stream_name,
json_schema=table_schema,
supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
),
Expand All @@ -176,13 +176,12 @@ def configured_catalogue(

@pytest.fixture
def configured_catalogue_append_dedup(
test_table_name: str,
test_large_table_name: str,
stream_name: str,
table_schema: str,
) -> ConfiguredAirbyteCatalog:
append_stream = ConfiguredAirbyteStream(
stream=AirbyteStream(
name=test_table_name,
name=stream_name,
json_schema=table_schema,
supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
),
Expand All @@ -192,7 +191,7 @@ def configured_catalogue_append_dedup(
)
append_stream_large = ConfiguredAirbyteStream(
stream=AirbyteStream(
name=test_large_table_name,
name=stream_name,
json_schema=table_schema,
supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
),
Expand All @@ -204,41 +203,48 @@ def configured_catalogue_append_dedup(


@pytest.fixture
def airbyte_message1(test_table_name: str):
def airbyte_message1(
stream_name: str,
) -> AirbyteMessage:
fake = Faker()
Faker.seed(0)
return AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(
stream=test_table_name,
stream=stream_name,
data={"key1": fake.unique.first_name(), "key2": str(fake.ssn())},
emitted_at=int(datetime.now().timestamp()) * 1000,
),
)


@pytest.fixture
def airbyte_message2(test_table_name: str):
def airbyte_message2(
stream_name: str,
) -> AirbyteMessage:
fake = Faker()
Faker.seed(1)
return AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(
stream=test_table_name,
stream=stream_name,
data={"key1": fake.unique.first_name(), "key2": str(fake.ssn())},
emitted_at=int(datetime.now().timestamp()) * 1000,
),
)


@pytest.fixture
def airbyte_message2_update(airbyte_message2: AirbyteMessage, test_table_name: str):
def airbyte_message2_update(
airbyte_message2: AirbyteMessage,
stream_name: str,
) -> AirbyteMessage:
fake = Faker()
Faker.seed(1)
return AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(
stream=test_table_name,
stream=stream_name,
data={
"key1": airbyte_message2.record.data["key1"],
"key2": str(fake.ssn()),
Expand Down Expand Up @@ -330,11 +336,14 @@ def test_write(
airbyte_message3: AirbyteMessage,
airbyte_message4: AirbyteMessage,
airbyte_message5: AirbyteMessage,
test_table_name: str,
destination_table_name: str,
test_schema_name: str,
test_large_table_name: str,
stream_name: str,
sql_processor,
):
assert (
LowerCaseNormalizer.normalize(stream_name) == destination_table_name
), f"Inputs appear invalid. {stream_name=}, {destination_table_name=}"
destination = DestinationMotherDuck()
generator = destination.write(
config,
Expand All @@ -347,7 +356,7 @@ def test_write(

sql_result = sql_processor._execute_sql(
"SELECT key1, key2, _airbyte_raw_id, _airbyte_extracted_at, _airbyte_meta "
f"FROM {test_schema_name}.{test_table_name} ORDER BY key1"
f"FROM {test_schema_name}.{destination_table_name} ORDER BY key1"
)

assert len(sql_result) == 2
Expand All @@ -365,7 +374,7 @@ def test_write_dupe(
airbyte_message2: AirbyteMessage,
airbyte_message2_update: AirbyteMessage,
airbyte_message3: AirbyteMessage,
test_table_name: str,
destination_table_name: str,
test_schema_name: str,
sql_processor,
):
Expand All @@ -381,7 +390,7 @@ def test_write_dupe(

sql_result = sql_processor._execute_sql(
"SELECT key1, key2, _airbyte_raw_id, _airbyte_extracted_at, _airbyte_meta "
f"FROM {test_schema_name}.{test_table_name} ORDER BY key1"
f"FROM {test_schema_name}.{destination_table_name} ORDER BY key1"
)

assert len(sql_result) == 2
Expand Down Expand Up @@ -481,7 +490,8 @@ def test_large_number_of_writes(
config: Dict[str, str],
request,
configured_catalogue: ConfiguredAirbyteCatalog,
test_large_table_name: str,
stream_name: str,
destination_table_name: str,
test_schema_name: str,
airbyte_message_generator: Callable[[int, int, str], Iterable[AirbyteMessage]],
explanation: str,
Expand All @@ -491,11 +501,11 @@ def test_large_number_of_writes(
generator = destination.write(
config,
configured_catalogue,
airbyte_message_generator(TOTAL_RECORDS, BATCH_WRITE_SIZE, test_large_table_name),
airbyte_message_generator(TOTAL_RECORDS, BATCH_WRITE_SIZE, stream_name),
)

result = list(generator)
assert len(result) == TOTAL_RECORDS // (BATCH_WRITE_SIZE + 1)

sql_result = sql_processor._execute_sql("SELECT count(1) " f"FROM {test_schema_name}.{test_large_table_name}")
sql_result = sql_processor._execute_sql("SELECT count(1) " f"FROM {test_schema_name}.{destination_table_name}")
assert sql_result[0][0] == TOTAL_RECORDS - TOTAL_RECORDS // (BATCH_WRITE_SIZE + 1)
Loading