Commit 355ac15

Destination Redshift: Implement refreshes (#40567)
1 parent e07314c commit 355ac15

16 files changed (+393, -37)

airbyte-cdk/java/airbyte-cdk/README.md (+1)

```diff
@@ -174,6 +174,7 @@ corresponds to that version.
 
 | Version | Date | Pull Request | Subject |
 |:-----------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| 0.41.2 | 2024-07-12 | [\#40567](https://github.com/airbytehq/airbyte/pull/40567) | Fix BaseSqlGenerator test case (generation_id support); update minimum platform version for refreshes support. |
 | 0.41.1 | 2024-07-11 | [\#41212](https://github.com/airbytehq/airbyte/pull/41212) | Improve debezium logging. |
 | 0.41.0 | 2024-07-11 | [\#38240](https://github.com/airbytehq/airbyte/pull/38240) | Sources : Changes in CDC interfaces to support WASS algorithm |
 | 0.40.11 | 2024-07-08 | [\#41041](https://github.com/airbytehq/airbyte/pull/41041) | Destinations: Fix truncate refreshes incorrectly discarding data if successful attempt had 0 records |
```
CDK version.properties (+1, -1)

```diff
@@ -1 +1 @@
-version=0.41.1
+version=0.41.2
```

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt (+1, -1)

```diff
@@ -137,7 +137,7 @@ constructor(
     fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig {
         if (stream.generationId == null || stream.minimumGenerationId == null) {
             throw ConfigErrorException(
-                "You must upgrade your platform version to use this connector version. Either downgrade your connector or upgrade platform to 0.63.0"
+                "You must upgrade your platform version to use this connector version. Either downgrade your connector or upgrade platform to 0.63.7"
             )
         }
         if (
```
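
This guard is what the changelog entry refers to: platforms older than 0.63.7 do not populate the generation fields on configured streams, so their absence is treated as a platform-version mismatch rather than a malformed catalog. A minimal, self-contained sketch of the same behavior (`error()` stands in for `ConfigErrorException`, an assumption made for brevity):

```kotlin
// Sketch only: mirrors the null-check in CatalogParser.toStreamConfig above.
fun requireGenerationFields(generationId: Long?, minimumGenerationId: Long?) {
    if (generationId == null || minimumGenerationId == null) {
        error(
            "You must upgrade your platform version to use this connector version. " +
                "Either downgrade your connector or upgrade platform to 0.63.7"
        )
    }
}
```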

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt (+3)

```diff
@@ -1814,6 +1814,9 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
                     )
                 )
             )
+            .withSyncId(42)
+            .withGenerationId(43)
+            .withMinimumGenerationId(0)
             .withSyncMode(SyncMode.INCREMENTAL)
             .withDestinationSyncMode(DestinationSyncMode.APPEND)
     )
```
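
For context, a hedged sketch of what the three new fields mean; the values mirror the fixture above, and the reading of `minimumGenerationId` is the general refresh contract rather than anything specific to this test:

```kotlin
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream

// minimumGenerationId = 0 asks the destination to retain all previously synced
// generations (an ordinary append sync); setting it equal to generationId would
// instead request a truncate refresh that keeps only the current generation.
val stream: ConfiguredAirbyteStream =
    ConfiguredAirbyteStream()
        .withSyncId(42)
        .withGenerationId(43)
        .withMinimumGenerationId(0)
```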

airbyte-integrations/connectors/destination-redshift/build.gradle (+1, -1)

```diff
@@ -4,7 +4,7 @@ plugins {
 }
 
 airbyteJavaConnector {
-    cdkVersionRequired = '0.38.3'
+    cdkVersionRequired = '0.41.2'
     features = ['db-destinations', 's3-destinations', 'typing-deduping']
     useLocalCdk = false
 }
```

airbyte-integrations/connectors/destination-redshift/metadata.yaml (+2, -1)

```diff
@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
-  dockerImageTag: 3.2.0
+  dockerImageTag: 3.3.0
   dockerRepository: airbyte/destination-redshift
   documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
   githubIssueLabel: destination-redshift
@@ -37,6 +37,7 @@ data:
   releaseStage: generally_available
   supportLevel: certified
   supportsDbt: true
+  supportsRefreshes: true
   tags:
     - language:java
   connectorTestSuitesOptions:
```

airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftDestination.kt (+8, -1)

```diff
@@ -184,6 +184,12 @@ class RedshiftDestination : BaseConnector(), Destination {
                     hasUnprocessedRecords = true,
                     maxProcessedTimestamp = Optional.empty(),
                 ),
+                initialTempRawTableStatus =
+                    InitialRawTableStatus(
+                        rawTableExists = false,
+                        hasUnprocessedRecords = true,
+                        maxProcessedTimestamp = Optional.empty(),
+                    ),
                 isSchemaMismatch = true,
                 isFinalTableEmpty = true,
                 destinationState =
@@ -284,7 +290,8 @@ class RedshiftDestination : BaseConnector(), Destination {
         )
     }
 
-    private fun getDatabase(dataSource: DataSource): JdbcDatabase {
+    @VisibleForTesting
+    fun getDatabase(dataSource: DataSource): JdbcDatabase {
         return DefaultJdbcDatabase(dataSource)
     }
 
```
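
Widening `getDatabase` from `private` to `@VisibleForTesting` lets unit tests build the `JdbcDatabase` from a test-controlled `DataSource`. A minimal sketch of such a test; the mocking library (mockk) and the test class itself are assumptions, not part of this commit:

```kotlin
import io.mockk.mockk
import javax.sql.DataSource
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Test

class RedshiftDestinationGetDatabaseTest {
    @Test
    fun `getDatabase wraps the DataSource in a JdbcDatabase`() {
        // A relaxed mock suffices here: per the diff, getDatabase only wraps the
        // DataSource in a DefaultJdbcDatabase and performs no I/O.
        val dataSource = mockk<DataSource>(relaxed = true)
        val database = RedshiftDestination().getDatabase(dataSource)
        assertNotNull(database)
    }
}
```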

airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/operation/RedshiftStagingStorageOperation.kt (+73, -18)

```diff
@@ -19,7 +19,6 @@ import io.airbyte.integrations.destination.redshift.manifest.Entry
 import io.airbyte.integrations.destination.redshift.manifest.Manifest
 import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler
 import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator
-import io.airbyte.protocol.models.v0.DestinationSyncMode
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.time.Instant
 import java.time.ZoneOffset
@@ -41,17 +40,70 @@ class RedshiftStagingStorageOperation(
     private val writeDatetime: ZonedDateTime = Instant.now().atZone(ZoneOffset.UTC)
     private val objectMapper = ObjectMapper()
 
-    override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
+    override fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean) {
         // create raw table
-        destinationHandler.execute(Sql.of(createRawTableQuery(streamId)))
-        if (destinationSyncMode == DestinationSyncMode.OVERWRITE) {
-            destinationHandler.execute(Sql.of(truncateRawTableQuery(streamId)))
+        destinationHandler.execute(Sql.of(createRawTableQuery(streamId, suffix)))
+        if (replace) {
+            destinationHandler.execute(Sql.of(truncateRawTableQuery(streamId, suffix)))
         }
         // create bucket for staging files
         s3StorageOperations.createBucketIfNotExists()
     }
 
-    override fun writeToStage(streamConfig: StreamConfig, data: SerializableBuffer) {
+    override fun overwriteStage(streamId: StreamId, suffix: String) {
+        destinationHandler.execute(
+            Sql.transactionally(
+                """DROP TABLE IF EXISTS "${streamId.rawNamespace}"."${streamId.rawName}" """,
+                """ALTER TABLE "${streamId.rawNamespace}"."${streamId.rawName}$suffix" RENAME TO "${streamId.rawName}" """
+            )
+        )
+    }
+
+    override fun transferFromTempStage(streamId: StreamId, suffix: String) {
+        destinationHandler.execute(
+            // ALTER TABLE ... APPEND is an efficient way to move records from one table to another.
+            // Instead of naively duplicating the data, it actually moves the underlying data
+            // blocks.
+            // (https://docs.aws.amazon.com/redshift/latest/dg/r_ALTER_TABLE_APPEND.html)
+            // But it can't run inside transactions, so run these statements separately.
+            Sql.separately(
+                // Note for future developers:
+                // ALTER TABLE ... APPEND has some interesting restrictions where both tables need
+                // the exact same structure (clustering, columns, etc.), so if we want to change
+                // those in the future, this might be tricky/annoying?
+                // If we have issues at that point, we can always switch to a simple
+                // `INSERT INTO ... SELECT * FROM ...` query.
+                """
+                ALTER TABLE "${streamId.rawNamespace}"."${streamId.rawName}"
+                APPEND FROM "${streamId.rawNamespace}"."${streamId.rawName}$suffix"
+                """.trimIndent(),
+                """DROP TABLE IF EXISTS "${streamId.rawNamespace}"."${streamId.rawName}$suffix" """,
+            ),
+            // Skip the case-sensitivity thing - ALTER TABLE ... APPEND can't be run in a
+            // transaction, so we can't run the SET statement.
+            // We're only working with schema/table names, so it's fine to just quote the
+            // identifiers instead of relying on this option.
+            forceCaseSensitiveIdentifier = false
+        )
+    }
+
+    override fun getStageGeneration(streamId: StreamId, suffix: String): Long? {
+        val generation =
+            destinationHandler.query(
+                """SELECT ${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID} FROM "${streamId.rawNamespace}"."${streamId.rawName}$suffix" LIMIT 1"""
+            )
+        if (generation.isEmpty()) {
+            return null
+        }
+
+        return generation.first()[JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID].asLong()
+    }
+
+    override fun writeToStage(
+        streamConfig: StreamConfig,
+        suffix: String,
+        data: SerializableBuffer
+    ) {
         val streamId = streamConfig.id
         val objectPath: String = getStagingPath(streamId)
         log.info {
@@ -61,13 +113,19 @@ class RedshiftStagingStorageOperation(
         s3StorageOperations.uploadRecordsToBucket(data, streamId.rawNamespace, objectPath)
 
         log.info {
-            "Starting copy to target table from stage: ${streamId.rawName} in destination from stage: $objectPath/$filename."
+            "Starting copy to target table from stage: ${streamId.rawName}$suffix in destination from stage: $objectPath/$filename."
         }
         val manifestContents = createManifest(listOf(filename), objectPath)
         val manifestPath = putManifest(manifestContents, objectPath)
-        executeCopy(manifestPath, destinationHandler, streamId.rawNamespace, streamId.rawName)
+        executeCopy(
+            manifestPath,
+            destinationHandler,
+            streamId.rawNamespace,
+            streamId.rawName,
+            suffix
+        )
         log.info {
-            "Copy to target table ${streamId.rawNamespace}.${streamId.rawName} in destination complete."
+            "Copy to target table ${streamId.rawNamespace}.${streamId.rawName}$suffix in destination complete."
         }
     }
 
@@ -172,6 +230,7 @@ class RedshiftStagingStorageOperation(
         destinationHandler: RedshiftDestinationHandler,
         schemaName: String,
         tableName: String,
+        suffix: String,
     ) {
         val accessKeyId =
             s3Config.s3CredentialConfig!!.s3CredentialsProvider.credentials.awsAccessKeyId
@@ -180,7 +239,7 @@ class RedshiftStagingStorageOperation(
 
         val copyQuery =
             """
-            COPY $schemaName.$tableName FROM '${getFullS3Path(s3Config.bucketName!!, manifestPath)}'
+            COPY $schemaName.$tableName$suffix FROM '${getFullS3Path(s3Config.bucketName!!, manifestPath)}'
            CREDENTIALS 'aws_access_key_id=$accessKeyId;aws_secret_access_key=$secretAccessKey'
            CSV GZIP
            REGION '${s3Config.bucketRegion}' TIMEFORMAT 'auto'
@@ -195,9 +254,9 @@ class RedshiftStagingStorageOperation(
     companion object {
         private val nameTransformer = RedshiftSQLNameTransformer()
 
-        private fun createRawTableQuery(streamId: StreamId): String {
+        private fun createRawTableQuery(streamId: StreamId, suffix: String): String {
             return """
-                CREATE TABLE IF NOT EXISTS "${streamId.rawNamespace}"."${streamId.rawName}" (
+                CREATE TABLE IF NOT EXISTS "${streamId.rawNamespace}"."${streamId.rawName}$suffix" (
                    ${JavaBaseConstants.COLUMN_NAME_AB_RAW_ID} VARCHAR(36),
                    ${JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT} TIMESTAMPTZ DEFAULT GETDATE(),
                    ${JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT} TIMESTAMPTZ,
@@ -208,12 +267,8 @@ class RedshiftStagingStorageOperation(
             """.trimIndent()
         }
 
-        private fun truncateRawTableQuery(streamId: StreamId): String {
-            return String.format(
-                """TRUNCATE TABLE "%s"."%s";""",
-                streamId.rawNamespace,
-                streamId.rawName
-            )
+        private fun truncateRawTableQuery(streamId: StreamId, suffix: String): String {
+            return """TRUNCATE TABLE "${streamId.rawNamespace}"."${streamId.rawName}$suffix" """
         }
 
         private fun getFullS3Path(s3BucketName: String, s3StagingFile: String): String {
```
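
To make the new staging flow concrete, the sketch below prints the SQL shapes these methods emit for a hypothetical stream. The identifiers (`airbyte_internal`, `users`) and the suffix value are illustrative assumptions; the real values come from `StreamId` and the CDK caller:

```kotlin
// Self-contained sketch mirroring the string templates above, minus the connector classes.
fun main() {
    val ns = "airbyte_internal" // assumed raw namespace
    val name = "users"          // assumed raw table name
    val suffix = "_airbyte_tmp" // assumed temp-table suffix supplied by the CDK

    // overwriteStage: swap the temp raw table into place within one transaction.
    val overwrite = listOf(
        """DROP TABLE IF EXISTS "$ns"."$name" """,
        """ALTER TABLE "$ns"."$name$suffix" RENAME TO "$name" """,
    )

    // transferFromTempStage: ALTER TABLE ... APPEND moves data blocks instead of
    // copying rows, but cannot run inside a transaction, so each statement runs alone.
    val transfer = listOf(
        """ALTER TABLE "$ns"."$name" APPEND FROM "$ns"."$name$suffix" """,
        """DROP TABLE IF EXISTS "$ns"."$name$suffix" """,
    )

    (overwrite + transfer).forEach(::println)
}
```

The design intent, as the comments in the diff suggest: a refresh writes into the suffixed temp table, so a failed attempt never clobbers the previous generation; the rename path replaces the live raw table wholesale, while the APPEND path merges the temp rows into it.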

airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.kt (+26, -6)

```diff
@@ -84,7 +84,17 @@ class RedshiftDestinationHandler(
         execute(sql, logStatements = true)
     }
 
-    fun execute(sql: Sql, logStatements: Boolean) {
+    /**
+     * @param forceCaseSensitiveIdentifier Whether to enable `forceCaseSensitiveIdentifier` on all
+     * transactions. This option is most useful for accessing fields within a `SUPER` value; for
+     * accessing schemas/tables/columns, quoting the identifier is sufficient to force
+     * case-sensitivity, so this option is not necessary.
+     */
+    fun execute(
+        sql: Sql,
+        logStatements: Boolean = true,
+        forceCaseSensitiveIdentifier: Boolean = true
+    ) {
         val transactions = sql.transactions
         val queryId = UUID.randomUUID()
         for (transaction in transactions) {
@@ -103,12 +113,20 @@ class RedshiftDestinationHandler(
             // characters, even after
             // specifying quotes.
             // see https://github.com/airbytehq/airbyte/issues/33900
-            modifiedStatements.add("SET enable_case_sensitive_identifier to TRUE;\n")
+            if (forceCaseSensitiveIdentifier) {
+                modifiedStatements.add("SET enable_case_sensitive_identifier to TRUE;\n")
+            }
             modifiedStatements.addAll(transaction)
-            jdbcDatabase.executeWithinTransaction(
-                modifiedStatements,
-                logStatements = logStatements
-            )
+            if (modifiedStatements.size != 1) {
+                jdbcDatabase.executeWithinTransaction(
+                    modifiedStatements,
+                    logStatements = logStatements
+                )
+            } else {
+                // Redshift doesn't allow some statements to run in a transaction at all,
+                // so handle the single-statement case specially.
+                jdbcDatabase.execute(modifiedStatements.first())
+            }
         } catch (e: SQLException) {
             log.error(e) { "Sql $queryId-$transactionId failed" }
             // This is a big hammer for something that should be much more targetted, only when
@@ -155,6 +173,8 @@ class RedshiftDestinationHandler(
         )
     }
 
+    fun query(sql: String): List<JsonNode> = jdbcDatabase.queryJsons(sql)
+
     private fun toJdbcTypeName(airbyteProtocolType: AirbyteProtocolType): String {
         return when (airbyteProtocolType) {
             AirbyteProtocolType.STRING -> "varchar"
```