Skip to content

Commit

Permalink
DP-2918: Add e2e test
Browse files Browse the repository at this point in the history
  • Loading branch information
nicmart committed Mar 8, 2024
1 parent 76063c6 commit 1f1b91f
Showing 1 changed file with 51 additions and 0 deletions.
51 changes: 51 additions & 0 deletions src/e2e/scala/com/celonis/kafka/connect/ems/ErrorPolicyTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
package com.celonis.kafka.connect.ems

import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants._
import com.celonis.kafka.connect.ems.parquet.ParquetLocalInputFile
import com.celonis.kafka.connect.ems.parquet.extractParquetFromRequest
import com.celonis.kafka.connect.ems.testcontainers.connect.EmsConnectorConfiguration
import com.celonis.kafka.connect.ems.testcontainers.connect.EmsConnectorConfiguration.TOPICS_KEY
import com.celonis.kafka.connect.ems.testcontainers.scalatest.KafkaConnectContainerPerSuite
import com.celonis.kafka.connect.ems.testcontainers.scalatest.fixtures.connect.withConnectionCut
import com.celonis.kafka.connect.ems.testcontainers.scalatest.fixtures.connect.withConnector
import com.celonis.kafka.connect.ems.testcontainers.scalatest.fixtures.connect.withParquetUploadLatency
import com.celonis.kafka.connect.ems.testcontainers.scalatest.fixtures.mockserver.withMockResponse
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.parquet.hadoop.ParquetFileReader
import org.mockserver.verify.VerificationTimes
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
Expand Down Expand Up @@ -159,4 +163,51 @@ class ErrorPolicyTests extends AnyFunSuite with KafkaConnectContainerPerSuite wi
}
}
}

test("continue on invalid input") {

val sourceTopic = randomTopicName()
val emsTable = randomEmsTable()

val emsConnector = new EmsConnectorConfiguration("ems")
.withConfig(TOPICS_KEY, sourceTopic)
.withConfig(ENDPOINT_KEY, proxyServerUrl)
.withConfig(AUTHORIZATION_KEY, "AppKey key")
.withConfig(TARGET_TABLE_KEY, emsTable)
.withConfig(COMMIT_RECORDS_KEY, 2)
.withConfig(COMMIT_SIZE_KEY, 1000000L)
.withConfig(COMMIT_INTERVAL_KEY, 3600000)
.withConfig(TMP_DIRECTORY_KEY, "/tmp/")
.withConfig(ERROR_POLICY_KEY, "THROW")
.withConfig(ERROR_CONTINUE_ON_INVALID_INPUT_KEY, true)
.withConfig("value.converter.schemas.enable", "false")
.withConfig("value.converter", "org.apache.kafka.connect.json.JsonConverter")
.withConfig("key.converter", "org.apache.kafka.connect.storage.StringConverter")

println(proxyServerUrl)

withMockResponse(emsRequestForTable(emsTable), mockEmsResponse) {
withConnector(emsConnector) {

// The first error should not prevent other records to be ingested, even if they are part of the same put batch
withStringStringProducer { producer =>
producer.send(new ProducerRecord(sourceTopic, """{"":"missingKey"}"""))
producer.send(new ProducerRecord(sourceTopic, """{"x":"validKey"}"""))
producer.send(new ProducerRecord(sourceTopic, """{"x":"validKey"}"""))
}

eventually(timeout(20 seconds), interval(1 seconds)) {
mockServerClient.verify(emsRequestForTable(emsTable), VerificationTimes.once())
val status = kafkaConnectClient.getConnectorStatus(emsConnector.name)
status.tasks.head.state should be("RUNNING")
}

val httpRequests = mockServerClient.retrieveRecordedRequests(emsRequestForTable(emsTable))
val parquetFile = extractParquetFromRequest(httpRequests.head)
val fileReader = ParquetFileReader.open(new ParquetLocalInputFile(parquetFile))

fileReader.getRecordCount shouldBe 2
}
}
}
}

0 comments on commit 1f1b91f

Please sign in to comment.