From f020cc8549ddee454f1f3e6b6db48a5d3482e3d4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Nicol=C3=B2=20Martini?=
Date: Mon, 11 Mar 2024 14:43:38 +0100
Subject: [PATCH] DP-2918: continue on data error (#62)

* DP-2918: drop or serialise collections in schema inference as well
* DP-2918: add a test for dropping arrays in normalization
* DP-2918: Return Either in normalizer, and fail in the task when normalization fails
* DP-2918: Put failing records in DLQ
* DP-2918: Change exception name
* DP-2918: Introduce ErrorPolicyConfig class
* DP-2918: Tests for ErrorPolicyConfig
* DP-2918: Add continueOnInvalidInput configuration
* DP-2918: Call errant record reporter only for InvalidInputException
* DP-2918: Change name of a test
* DP-2918: Scalafmt
* DP-2918: Fix maxRetries reset in case of continue on error
* DP-2918: Add comment
* DP-2918: InvalidInputErrorHandler test
* DP-2918: Fix InvalidInputErrorHandler
* DP-2918: Integrate InvalidInputErrorHandler into EmsSinkTask
* DP-2918: Add a comment
* DP-2918: Add e2e test
* DP-2918: Remove a comment
* DP-2918: Scalafmt
* DP-2918: remove a println
---
 .../connect/ems/config/EmsSinkConfig.scala    |  10 +-
 .../ems/config/EmsSinkConfigConstants.scala   |  13 ++-
 .../connect/ems/config/EmsSinkConfigDef.scala |  15 ++-
 .../ems/config/ErrorPolicyConfig.scala        |  75 +++++++++++++++
 .../connect/ems/config/RetryConfig.scala      |   6 +-
 .../connect/ems/errors/EmsSinkException.scala |  13 +--
 .../connect/ems/errors/ErrorPolicy.scala      |  22 +----
 .../ems/errors/InvalidInputErrorHandler.scala |  36 ++++++++
 .../kafka/connect/ems/sink/EmsSinkTask.scala  |  60 ++++++++----
 .../InferSchemaAndNormaliseValue.scala        |  71 ++++++++------
 .../connect/transform/RecordTransformer.scala |   2 +-
 .../connect/transform/flatten/Flattener.scala |   6 +-
 .../flatten/NormalisingFlattener.scala        |   7 +-
 .../ems/config/EmsSinkConfigTest.scala        |  20 ++--
 .../ems/config/ErrorPolicyConfigTests.scala   |  92 +++++++++++++++++++
 .../connect/ems/config/ErrorPolicyTests.scala |  38 +-------
 ...etryTests.scala => RetryConfigTests.scala} |  16 ++--
 .../SchemaLessJsonValueConverterTest.scala    |  14 +--
 .../errors/InvalidInputErrorHandlerTest.scala |  82 +++++++++++++++++
 .../ems/sink/EmsSinkTaskObfuscationTest.scala |  11 +--
 .../formats/ParquetFormatWriterTests.scala    |   3 +-
 .../InferSchemaAndNormaliseValueTest.scala    |  87 +++++++++++------
 .../transform/RecordTransformerTest.scala     |  38 ++++----
 .../kafka/connect/ems/ErrorPolicyTests.scala  |  49 ++++++++++
 24 files changed, 562 insertions(+), 224 deletions(-)
 create mode 100644 connector/src/main/scala/com/celonis/kafka/connect/ems/config/ErrorPolicyConfig.scala
 create mode 100644 connector/src/main/scala/com/celonis/kafka/connect/ems/errors/InvalidInputErrorHandler.scala
 create mode 100644 connector/src/test/scala/com/celonis/kafka/connect/ems/config/ErrorPolicyConfigTests.scala
 rename connector/src/test/scala/com/celonis/kafka/connect/ems/config/{RetryTests.scala => RetryConfigTests.scala} (72%)
 create mode 100644 connector/src/test/scala/com/celonis/kafka/connect/ems/errors/InvalidInputErrorHandlerTest.scala
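Usage sketch (reviewer note, not part of the commit; `git am` ignores text between the diffstat and the first diff): roughly how the record-level handling added by this patch behaves. The handler, exception, and parameter names are the ones introduced below; the SinkRecord is a placeholder, and the full property key assumes the connector's usual `connect.ems` prefix in front of `error.policy.continue.on.invalid.input`.

  import com.celonis.kafka.connect.ems.errors.InvalidInputErrorHandler
  import com.celonis.kafka.connect.ems.errors.InvalidInputException
  import org.apache.kafka.connect.sink.SinkRecord

  import scala.util.Try

  object ContinueOnInvalidInputSketch extends App {
    // Placeholder record, for illustration only.
    val record = new SinkRecord("topic", 0, null, "key", null, "value", 0)

    // Equivalent of setting connect.ems.error.policy.continue.on.invalid.input=true
    // with no dead letter queue configured (hence no ErrantRecordReporter).
    val handler = new InvalidInputErrorHandler(continueOnInvalidInput = true, errantRecordReporter = None)

    // Invalid-input failures are logged and skipped instead of failing the task;
    // with a reporter present, the record would be forwarded to the DLQ instead.
    handler.handle(record, InvalidInputException("Unexpected value type encountered: null"))

    // Any other failure still propagates, to be handled by the batch-level error policy.
    println(Try(handler.handle(record, new RuntimeException("boom")))) // Failure(java.lang.RuntimeException: boom)
  }

The handler deliberately rethrows anything that is not an InvalidInputException, so genuine infrastructure failures still reach the existing retry/throw/continue policy.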
diff --git a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfig.scala b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfig.scala
index 0ae92318..40e65aef 100644
--- a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfig.scala
+++ b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfig.scala
@@ -18,7 +18,6 @@ package com.celonis.kafka.connect.ems.config
 import cats.implicits._
 import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants._
-import com.celonis.kafka.connect.ems.errors.ErrorPolicy
 import com.celonis.kafka.connect.ems.storage.FileSystemOperations
 import com.celonis.kafka.connect.transform.FlattenerConfig
 import com.celonis.kafka.connect.transform.PreConversionConfig
@@ -36,9 +35,8 @@ final case class EmsSinkConfig(
   target:            String,
   connectionId:      Option[String],
   authorization:     AuthorizationHeader,
-  errorPolicy:       ErrorPolicy,
+  errorPolicyConfig: ErrorPolicyConfig,
   commitPolicy:      CommitPolicyConfig,
-  retries:           RetryConfig,
   workingDir:        Path,
   parquet:           ParquetConfig,
   primaryKeys:       List[String],
@@ -106,8 +104,7 @@ object EmsSinkConfig {
       url           <- extractURL(props)
       table         <- extractTargetTable(props)
       authorization <- AuthorizationHeader.extract(props)
-      error         <- ErrorPolicy.extract(props)
-      retry         <- RetryConfig.extractRetry(props)
+      errorPolicyConfig <- ErrorPolicyConfig.extract(props)
       useInMemoryFs   = PropertiesHelper.getBoolean(props, USE_IN_MEMORY_FS_KEY).getOrElse(USE_IN_MEMORY_FS_DEFAULT)
       allowNullsAsPks = PropertiesHelper.getBoolean(props, NULL_PK_KEY).getOrElse(NULL_PK_KEY_DEFAULT)
       tempDir <- if (useInMemoryFs) Right(FileSystemOperations.InMemoryPseudoDir) else extractWorkingDirectory(props)
@@ -135,9 +132,8 @@ object EmsSinkConfig {
       table,
       connectionId,
       authorization,
-      error,
+      errorPolicyConfig,
       commitPolicy,
-      retry,
       tempDir,
       parquetConfig,
       primaryKeys,
diff --git a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala
index 3f3c12bd..ffc18241 100644
--- a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala
+++ b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala
@@ -64,10 +64,10 @@ object EmsSinkConfigConstants {
       | Default is $PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT.
       | """.stripMargin

-  val NBR_OF_RETRIES_KEY = s"$CONNECTOR_PREFIX.max.retries"
-  val NBR_OF_RETRIES_DOC =
+  val ERROR_POLICY_RETRIES_KEY = s"$CONNECTOR_PREFIX.max.retries"
+  val ERROR_POLICY_RETRIES_DOC =
     "The maximum number of times to re-attempt to write the records before the task is marked as failed."
-  val NBR_OF_RETIRES_DEFAULT: Int = 10
+  val ERROR_POLICY_RETRIES_DEFAULT: Int = 10

   val ERROR_POLICY_KEY = s"$CONNECTOR_PREFIX.error.policy"
   val ERROR_POLICY_DOC: String =
@@ -76,7 +76,7 @@ object EmsSinkConfigConstants {
      | There are three available options:
      |    CONTINUE - the error is swallowed
      |    THROW - the error is allowed to propagate.
-     |    RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by $NBR_OF_RETRIES_KEY.
+     |    RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by $ERROR_POLICY_RETRIES_KEY.
      |All errors will be logged automatically, even if the code swallows them.
     """.stripMargin
   val ERROR_POLICY_DEFAULT = "THROW"
@@ -85,6 +85,11 @@ object EmsSinkConfigConstants {
   val ERROR_RETRY_INTERVAL         = s"$CONNECTOR_PREFIX.retry.interval"
   val ERROR_RETRY_INTERVAL_DOC     = "The time in milliseconds between retries."
   val ERROR_RETRY_INTERVAL_DEFAULT: Long = 60000L
+  val ERROR_CONTINUE_ON_INVALID_INPUT_KEY = s"$CONNECTOR_PREFIX.error.policy.continue.on.invalid.input"
+  val ERROR_CONTINUE_ON_INVALID_INPUT_DOC: String =
+    "If set to 'true', the connector will continue when invalid input errors occur. Invalid records will be sent to the DLQ, if present."
" + val ERROR_CONTINUE_ON_INVALID_INPUT_DEFAULT = false + val FALLBACK_VARCHAR_LENGTH_KEY = s"$CONNECTOR_PREFIX.data.fallback.varchar.length" val FALLBACK_VARCHAR_LENGTH_DOC = "Optional parameter representing the STRING (VARCHAR) length when the schema is created in EMS. Must be greater than 0 and smaller or equal than 65000" diff --git a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigDef.scala b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigDef.scala index 449bb1f6..6651252b 100644 --- a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigDef.scala +++ b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigDef.scala @@ -134,15 +134,15 @@ object EmsSinkConfigDef { ERROR_POLICY_DOC, ) .define( - NBR_OF_RETRIES_KEY, + ERROR_POLICY_RETRIES_KEY, Type.INT, - NBR_OF_RETIRES_DEFAULT, + ERROR_POLICY_RETRIES_DEFAULT, Importance.MEDIUM, - NBR_OF_RETRIES_DOC, + ERROR_POLICY_RETRIES_DOC, "Error", 2, ConfigDef.Width.LONG, - NBR_OF_RETRIES_KEY, + ERROR_POLICY_RETRIES_KEY, ) .define( ERROR_RETRY_INTERVAL, @@ -155,6 +155,13 @@ object EmsSinkConfigDef { ConfigDef.Width.LONG, ERROR_RETRY_INTERVAL, ) + .define( + ERROR_CONTINUE_ON_INVALID_INPUT_KEY, + Type.BOOLEAN, + ERROR_CONTINUE_ON_INVALID_INPUT_DEFAULT, + Importance.MEDIUM, + ERROR_CONTINUE_ON_INVALID_INPUT_DOC, + ) .define( PARQUET_ROW_GROUP_SIZE_BYTES_KEY, Type.INT, diff --git a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/ErrorPolicyConfig.scala b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/ErrorPolicyConfig.scala new file mode 100644 index 00000000..985bda84 --- /dev/null +++ b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/ErrorPolicyConfig.scala @@ -0,0 +1,75 @@ +/* + * Copyright 2024 Celonis SE + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.celonis.kafka.connect.ems.config
+
+import cats.syntax.either._
+import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_CONTINUE_ON_INVALID_INPUT_DEFAULT
+import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_CONTINUE_ON_INVALID_INPUT_KEY
+import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_POLICY_DOC
+import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_POLICY_KEY
+import com.celonis.kafka.connect.ems.config.ErrorPolicyConfig.ErrorPolicyType
+import com.celonis.kafka.connect.ems.config.ErrorPolicyConfig.ErrorPolicyType.CONTINUE
+import com.celonis.kafka.connect.ems.config.ErrorPolicyConfig.ErrorPolicyType.RETRY
+import com.celonis.kafka.connect.ems.config.ErrorPolicyConfig.ErrorPolicyType.THROW
+import com.celonis.kafka.connect.ems.config.PropertiesHelper.error
+import com.celonis.kafka.connect.ems.config.PropertiesHelper.getBoolean
+import com.celonis.kafka.connect.ems.config.PropertiesHelper.nonEmptyStringOr
+import com.celonis.kafka.connect.ems.errors.ErrorPolicy
+import com.celonis.kafka.connect.ems.errors.InvalidInputErrorHandler
+import org.apache.kafka.connect.sink.ErrantRecordReporter
+
+final case class ErrorPolicyConfig(
+  policyType:             ErrorPolicyType,
+  retryConfig:            RetryConfig,
+  continueOnInvalidInput: Boolean,
+) {
+  lazy val errorPolicy: ErrorPolicy =
+    policyType match {
+      case ErrorPolicyType.THROW    => ErrorPolicy.Throw
+      case ErrorPolicyType.CONTINUE => ErrorPolicy.Continue
+      case ErrorPolicyType.RETRY    => ErrorPolicy.Retry
+    }
+
+  def invalidInputErrorHandler(reporter: Option[ErrantRecordReporter]): InvalidInputErrorHandler =
+    new InvalidInputErrorHandler(continueOnInvalidInput, reporter)
+}
+
+object ErrorPolicyConfig {
+  sealed trait ErrorPolicyType
+  object ErrorPolicyType {
+    case object THROW    extends ErrorPolicyType
+    case object CONTINUE extends ErrorPolicyType
+    case object RETRY    extends ErrorPolicyType
+  }
+
+  def extract(props: Map[String, _]): Either[String, ErrorPolicyConfig] =
+    for {
+      policyType  <- extractType(props)
+      retryConfig <- RetryConfig.extractRetry(props)
+      continueOnInvalidInput =
+        getBoolean(props, ERROR_CONTINUE_ON_INVALID_INPUT_KEY).getOrElse(ERROR_CONTINUE_ON_INVALID_INPUT_DEFAULT)
+    } yield ErrorPolicyConfig(policyType, retryConfig, continueOnInvalidInput = continueOnInvalidInput)
+
+  private def extractType(props: Map[String, _]): Either[String, ErrorPolicyType] =
+    nonEmptyStringOr(props, ERROR_POLICY_KEY, ERROR_POLICY_DOC).map(_.toUpperCase)
+      .flatMap {
+        case "THROW"    => THROW.asRight
+        case "RETRY"    => RETRY.asRight
+        case "CONTINUE" => CONTINUE.asRight
+        case _          => error(ERROR_POLICY_KEY, ERROR_POLICY_DOC)
+      }
+}
diff --git a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/RetryConfig.scala b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/RetryConfig.scala
index 03efe6ab..30320945 100644
--- a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/RetryConfig.scala
+++ b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/RetryConfig.scala
@@ -32,11 +32,11 @@ object RetryConfig {
                    else value.asRight[String]
                  case None => ERROR_RETRY_INTERVAL_DEFAULT.asRight[String]
                }
-      retries <- PropertiesHelper.getInt(props, NBR_OF_RETRIES_KEY) match {
+      retries <- PropertiesHelper.getInt(props, ERROR_POLICY_RETRIES_KEY) match {
                    case Some(value) =>
-                     if (value <= 0) error(NBR_OF_RETRIES_KEY, "Number of retries needs to be greater than 0.")
+                     if (value <= 0) error(ERROR_POLICY_RETRIES_KEY, "Number of retries needs to be greater than 0.")
                     else value.asRight[String]
-                   case None => NBR_OF_RETIRES_DEFAULT.asRight[String]
+                   case None => ERROR_POLICY_RETRIES_DEFAULT.asRight[String]
                  }
     } yield RetryConfig(retries, interval)
diff --git a/connector/src/main/scala/com/celonis/kafka/connect/ems/errors/EmsSinkException.scala b/connector/src/main/scala/com/celonis/kafka/connect/ems/errors/EmsSinkException.scala
index 2bedf27c..c3546fa9 100644
--- a/connector/src/main/scala/com/celonis/kafka/connect/ems/errors/EmsSinkException.scala
+++ b/connector/src/main/scala/com/celonis/kafka/connect/ems/errors/EmsSinkException.scala
@@ -17,23 +17,16 @@ package com.celonis.kafka.connect.ems.errors

 import org.apache.kafka.connect.errors.ConnectException
-import org.http4s.DecodeFailure
 import org.http4s.Status

 sealed trait EmsSinkException {
   def getMessage: String
 }

-case class UploadFailedException(status: Status, msg: String, throwable: Throwable)
+final case class UploadFailedException(status: Status, msg: String, throwable: Throwable)
     extends ConnectException(msg, throwable)
     with EmsSinkException

-case class UnexpectedUploadException(msg: String, throwable: Throwable) extends ConnectException(msg, throwable)
+final case class InvalidInputException(msg: String) extends ConnectException(msg) with EmsSinkException

-case class UploadInvalidResponseException(failure: DecodeFailure)
-    extends ConnectException(failure.getMessage(), failure)
-    with EmsSinkException
-
-case class InvalidInputException(msg: String) extends ConnectException(msg) with EmsSinkException
-
-case class FailedObfuscationException(msg: String) extends ConnectException(msg) with EmsSinkException
+final case class FailedObfuscationException(msg: String) extends ConnectException(msg) with EmsSinkException
diff --git a/connector/src/main/scala/com/celonis/kafka/connect/ems/errors/ErrorPolicy.scala b/connector/src/main/scala/com/celonis/kafka/connect/ems/errors/ErrorPolicy.scala
index b1a9acd2..d5e928ac 100644
--- a/connector/src/main/scala/com/celonis/kafka/connect/ems/errors/ErrorPolicy.scala
+++ b/connector/src/main/scala/com/celonis/kafka/connect/ems/errors/ErrorPolicy.scala
@@ -16,24 +16,15 @@

 package com.celonis.kafka.connect.ems.errors

-import cats.syntax.either._
-import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_POLICY_DOC
-import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_POLICY_KEY
-import com.celonis.kafka.connect.ems.config.PropertiesHelper.error
-import com.celonis.kafka.connect.ems.config.PropertiesHelper.nonEmptyStringOr
 import com.typesafe.scalalogging.StrictLogging
-
-import enumeratum._
-import enumeratum.EnumEntry.Uppercase
 import org.apache.kafka.connect.errors.ConnectException
 import org.apache.kafka.connect.errors.RetriableException

-sealed trait ErrorPolicy extends EnumEntry with Uppercase {
+sealed trait ErrorPolicy {
   def handle(error: Throwable, retries: Int): Unit
 }

-object ErrorPolicy extends Enum[ErrorPolicy] {
-  val values: IndexedSeq[ErrorPolicy] = findValues
+object ErrorPolicy {

   case object Continue extends ErrorPolicy with StrictLogging {
     override def handle(error: Throwable, retries: Int): Unit =
@@ -50,6 +41,7 @@ object ErrorPolicy {
   case object Retry extends ErrorPolicy with StrictLogging {
     override def handle(error: Throwable, retries: Int): Unit =
       if (retries == 0) {
+        logger.warn(s"Error policy is set to RETRY and no more attempts left.", error)
         throw new ConnectException(error)
       } else {
         logger.warn(s"Error policy is set to RETRY. Remaining attempts [$retries]", error)
       }
   }

-  def extract(props: Map[String, _]): Either[String, ErrorPolicy] =
-    nonEmptyStringOr(props, ERROR_POLICY_KEY, ERROR_POLICY_DOC)
-      .flatMap { constant =>
-        ErrorPolicy.withNameInsensitiveOption(constant) match {
-          case Some(value) => value.asRight[String]
-          case None        => error(ERROR_POLICY_KEY, ERROR_POLICY_DOC)
-        }
-      }
 }
diff --git a/connector/src/main/scala/com/celonis/kafka/connect/ems/errors/InvalidInputErrorHandler.scala b/connector/src/main/scala/com/celonis/kafka/connect/ems/errors/InvalidInputErrorHandler.scala
new file mode 100644
index 00000000..2465498e
--- /dev/null
+++ b/connector/src/main/scala/com/celonis/kafka/connect/ems/errors/InvalidInputErrorHandler.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2024 Celonis SE
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.celonis.kafka.connect.ems.errors
+
+import com.typesafe.scalalogging.StrictLogging
+import org.apache.kafka.connect.sink.ErrantRecordReporter
+import org.apache.kafka.connect.sink.SinkRecord
+
+/** Error policies work at the batch level, while this handler works at the record level. It handles only
+  * InvalidInputExceptions, i.e. errors caused by defects in individual records.
+ */ +final class InvalidInputErrorHandler( + continueOnInvalidInput: Boolean, + errantRecordReporter: Option[ErrantRecordReporter], +) extends StrictLogging { + def handle(record: SinkRecord, error: Throwable): Unit = error match { + case _: InvalidInputException if continueOnInvalidInput => + logger.warn("Error policy is set to CONTINUE on InvalidInput", error) + errantRecordReporter.foreach(errantRecordReporter => errantRecordReporter.report(record, error)) + case _ => throw error + } +} diff --git a/connector/src/main/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTask.scala b/connector/src/main/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTask.scala index 79fa8d71..94af742f 100644 --- a/connector/src/main/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTask.scala +++ b/connector/src/main/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTask.scala @@ -21,9 +21,10 @@ import cats.effect.IO import cats.effect.kernel.Ref import cats.effect.unsafe.implicits.global import cats.implicits._ +import com.celonis.kafka.connect.ems.config.ErrorPolicyConfig.ErrorPolicyType.RETRY import com.celonis.kafka.connect.ems.config.EmsSinkConfig import com.celonis.kafka.connect.ems.errors.ErrorPolicy -import com.celonis.kafka.connect.ems.errors.ErrorPolicy.Retry +import com.celonis.kafka.connect.ems.errors.InvalidInputErrorHandler import com.celonis.kafka.connect.ems.model._ import com.celonis.kafka.connect.ems.sink.EmsSinkTask.PutTimeoutException import com.celonis.kafka.connect.ems.sink.EmsSinkTask.StopTimeout @@ -38,6 +39,7 @@ import okhttp3.OkHttpClient import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.{ TopicPartition => KafkaTopicPartition } import org.apache.kafka.connect.errors.ConnectException +import org.apache.kafka.connect.sink.ErrantRecordReporter import org.apache.kafka.connect.sink.SinkRecord import org.apache.kafka.connect.sink.SinkTask @@ -45,6 +47,7 @@ import java.util import scala.concurrent.TimeoutException import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ +import scala.util.Try object EmsSinkTask { private val StopTimeout: FiniteDuration = 5.seconds @@ -66,9 +69,15 @@ class EmsSinkTask extends SinkTask with StrictLogging { private var writerManager: WriterManager[IO] = _ private var sinkName: String = _ - private var maxRetries: Int = 0 - private var retriesLeft: Int = maxRetries - private var errorPolicy: ErrorPolicy = ErrorPolicy.Retry + private var maxRetries: Int = 0 + private var retriesLeft: Int = maxRetries + + // Error Policy acts at the batch level + private var errorPolicy: ErrorPolicy = ErrorPolicy.Retry + + // InvalidInput acts at the record level + private var invalidInputErrorHandler: InvalidInputErrorHandler = _ + private var transformer: RecordTransformer = _ private val emsSinkConfigurator: EmsSinkConfigurator = new DefaultEmsSinkConfigurator private var okHttpClient: OkHttpClient = _ @@ -121,11 +130,12 @@ class EmsSinkTask extends SinkTask with StrictLogging { ) } - maxRetries = config.retries.retries - retriesLeft = maxRetries - errorPolicy = config.errorPolicy - transformer = RecordTransformer.fromConfig(config) - emsSinkConfig = config + maxRetries = config.errorPolicyConfig.retryConfig.retries + retriesLeft = maxRetries + errorPolicy = config.errorPolicyConfig.errorPolicy + invalidInputErrorHandler = config.errorPolicyConfig.invalidInputErrorHandler(errantRecordReporter()) + transformer = RecordTransformer.fromConfig(config) + emsSinkConfig = config } override def put(records: 
@@ -135,14 +145,7 @@ class EmsSinkTask extends SinkTask with StrictLogging {
         // filter our "deletes" for now
         .filter(_.value() != null)
         .toList
-        .traverse { record =>
-          for {
-            avroRecord <- transformer.transform(record)
-            tp          = TopicPartition(new Topic(record.topic()), new Partition(record.kafkaPartition()))
-            metadata    = RecordMetadata(tp, new Offset(record.kafkaOffset()))
-            _          <- writerManager.write(Record(avroRecord, metadata))
-          } yield ()
-        }
+        .traverse(processSingleRecordOrReport)
       _ <- if (records.isEmpty) writerManager.maybeUploadData else IO(())
     } yield ()
@@ -155,11 +158,28 @@ class EmsSinkTask extends SinkTask with StrictLogging {
       case Left(error) =>
         retriesLeft -= 1
         errorPolicy.handle(error, retriesLeft)
+        retriesLeft = maxRetries
       case Right(_) =>
         retriesLeft = maxRetries
     }
   }

+  private def errantRecordReporter(): Option[ErrantRecordReporter] =
+    Option(context).flatMap(context => Option(context.errantRecordReporter()))
+
+  private def processSingleRecord(record: SinkRecord): IO[Unit] = for {
+    avroRecord <- transformer.transform(record)
+    tp          = TopicPartition(new Topic(record.topic()), new Partition(record.kafkaPartition()))
+    metadata    = RecordMetadata(tp, new Offset(record.kafkaOffset()))
+    _          <- writerManager.write(Record(avroRecord, metadata))
+  } yield ()
+
+  private def processSingleRecordOrReport(record: SinkRecord): IO[Unit] =
+    processSingleRecord(record).attempt.flatMap {
+      case Left(throwable) => IO.fromTry(Try(invalidInputErrorHandler.handle(record, throwable)))
+      case _               => IO.unit
+    }
+
   override def preCommit(
     currentOffsets: util.Map[KafkaTopicPartition, OffsetAndMetadata]): util.Map[KafkaTopicPartition, OffsetAndMetadata,
@@ -224,7 +244,7 @@ class EmsSinkTask extends SinkTask with StrictLogging {
     } yield ()).attempt.unsafeRunSync() match {
       case Left(value) =>
         logger.warn(
-          s"[$sinkName]There was an error closing the partitions: ${topicPartitions.map(_.show).mkString(",")}]]",
+          s"[$sinkName] There was an error closing the partitions: ${topicPartitions.map(_.show).mkString(",")}",
           value,
         )
       case Right(_) =>
@@ -248,8 +268,8 @@ class EmsSinkTask extends SinkTask with StrictLogging {
   private def maybeSetErrorInterval(config: EmsSinkConfig): Unit =
     // if error policy is retry set retry interval
-    config.errorPolicy match {
-      case Retry => Option(context).foreach(_.timeout(config.retries.interval))
+    config.errorPolicyConfig.policyType match {
+      case RETRY => Option(context).foreach(_.timeout(config.errorPolicyConfig.retryConfig.interval))
       case _ =>
     }
 }
diff --git a/connector/src/main/scala/com/celonis/kafka/connect/transform/InferSchemaAndNormaliseValue.scala b/connector/src/main/scala/com/celonis/kafka/connect/transform/InferSchemaAndNormaliseValue.scala
index 6fc1b434..98029734 100644
--- a/connector/src/main/scala/com/celonis/kafka/connect/transform/InferSchemaAndNormaliseValue.scala
+++ b/connector/src/main/scala/com/celonis/kafka/connect/transform/InferSchemaAndNormaliseValue.scala
@@ -16,16 +16,19 @@

 package com.celonis.kafka.connect.transform

-import cats.implicits.catsSyntaxOptionId
 import org.apache.kafka.connect.data.Schema
 import org.apache.kafka.connect.data.SchemaBuilder
 import org.apache.kafka.connect.data.Struct

 import scala.jdk.CollectionConverters._
 import cats.instances.list._
-import cats.instances.option._
+import cats.instances.either._
 import cats.syntax.traverse._
+import com.celonis.kafka.connect.ems.errors.InvalidInputException
+import com.celonis.kafka.connect.transform.InferSchemaAndNormaliseValue.ValueAndSchema
+import com.celonis.kafka.connect.transform.flatten.ConnectJsonConverter
+
+import java.nio.charset.StandardCharsets
 import scala.collection.immutable.ListMap

 /** This component does multiple things:
@@ -34,7 +37,7 @@ import scala.collection.immutable.ListMap
  *
  * We should split inference from normalisation, even if that will complicate the implementation
  */
-object InferSchemaAndNormaliseValue {
+final class InferSchemaAndNormaliseValue(discardCollections: Boolean) {

   /** Tries to infer a non-flat Kafka connect schema for a value.
    *
@@ -46,53 +49,60 @@ object InferSchemaAndNormaliseValue {
    */
   // TODO: Why optionals at this stage?
-  def apply(value: Any): Option[ValueAndSchema] = value match {
+  def apply(value: Any): Either[InvalidInputException, ValueAndSchema] = value match {
     case _: String =>
-      Some(ValueAndSchema(value, Schema.OPTIONAL_STRING_SCHEMA))
+      Right(ValueAndSchema(value, Schema.OPTIONAL_STRING_SCHEMA))
     case _: Long =>
-      Some(ValueAndSchema(value, Schema.OPTIONAL_INT64_SCHEMA))
+      Right(ValueAndSchema(value, Schema.OPTIONAL_INT64_SCHEMA))
     case _: Int =>
-      Some(ValueAndSchema(value, Schema.OPTIONAL_INT32_SCHEMA))
+      Right(ValueAndSchema(value, Schema.OPTIONAL_INT32_SCHEMA))
     case _: Boolean =>
-      Some(ValueAndSchema(value, Schema.OPTIONAL_BOOLEAN_SCHEMA))
+      Right(ValueAndSchema(value, Schema.OPTIONAL_BOOLEAN_SCHEMA))
     case _: Float =>
-      Some(ValueAndSchema(value, Schema.OPTIONAL_FLOAT64_SCHEMA))
+      Right(ValueAndSchema(value, Schema.OPTIONAL_FLOAT64_SCHEMA))
     case _: Double =>
-      Some(ValueAndSchema(value, Schema.OPTIONAL_FLOAT64_SCHEMA))
+      Right(ValueAndSchema(value, Schema.OPTIONAL_FLOAT64_SCHEMA))
     case value: Struct =>
-      Some(ValueAndSchema(value, value.schema()))
+      Right(ValueAndSchema(value, value.schema()))
     case _: Array[Byte] =>
-      Some(ValueAndSchema(value, Schema.OPTIONAL_BYTES_SCHEMA))
+      Right(ValueAndSchema(value, Schema.OPTIONAL_BYTES_SCHEMA))
     case list: java.util.List[_] =>
       listSchema(list)
     case innerMap: java.util.Map[_, _] =>
       mapSchema(innerMap)
+
     case _ =>
-      None
+      val className = if (value == null) "null" else value.getClass.getCanonicalName
+      Left(InvalidInputException(s"Unexpected value type encountered: $className"))
   }

-  private def mapSchema(values: java.util.Map[_, _]): Option[ValueAndSchema] =
+  private def mapSchema(values: java.util.Map[_, _]): Either[InvalidInputException, ValueAndSchema] =
     if (values.isEmpty) // TODO test this
-      Some(ValueAndSchema(values, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.BYTES_SCHEMA).build()))
+      Right(ValueAndSchema(values, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.BYTES_SCHEMA).build()))
     else {
-      val inferredValues = values.asScala.toMap.filterNot(_._2 == null).toList.traverse {
-        case (key, value) if key.toString.nonEmpty => InferSchemaAndNormaliseValue(value).map(key.toString -> _)
-        case _                                     => None
+      val inferredValues = values.asScala.toMap.filterNot(isValueToBeDiscarded).toList.traverse {
+        case (key, value) if key.toString.nonEmpty => apply(value).map(key.toString -> _)
+        case _                                     => Left(InvalidInputException("JSON empty key encountered"))
       }
       inferredValues.map(values => toStruct(ListMap.from(values)))
     }

-  private def listSchema(values: java.util.List[_]): Option[ValueAndSchema] =
-    values.asScala.toList.traverse(InferSchemaAndNormaliseValue.apply).flatMap { results =>
-      if (results.isEmpty) {
-        // If the collection is empty, we default to an array of bytes
-        ValueAndSchema(values, SchemaBuilder.array(Schema.BYTES_SCHEMA).build()).some
-      } else if (results.map(_.schema).toSet.size > 1)
-        // If the collection is not empty and contains element of different types, we fail the inference
-        None
-      else
-        ValueAndSchema(results.map(_.normalisedValue).asJava, SchemaBuilder.array(results.head.schema).build()).some
-    }
+  /** We discard values that are null, as well as lists when discardCollections is set to true
+    */
+  private def isValueToBeDiscarded(keyValue: (Any, Any)): Boolean = keyValue match {
+    case (_, null)                                       => true // discard fields with null values
+    case (_, _: java.util.List[_]) if discardCollections => true // discard arrays when discardCollections is true
+    case _                                               => false
+  }
+
+  private def listSchema(values: java.util.List[_]): Either[InvalidInputException, ValueAndSchema] = {
+    val normalisedValue = new String(
+      ConnectJsonConverter.converter.fromConnectData("ignored", null, values),
+      StandardCharsets.UTF_8,
+    )
+    Right(ValueAndSchema(normalisedValue, Schema.OPTIONAL_STRING_SCHEMA))
+  }

   private def toStruct(values: ListMap[String, ValueAndSchema]): ValueAndSchema = {
     val schema = values.foldLeft(SchemaBuilder.struct()) {
@@ -104,5 +114,8 @@ object InferSchemaAndNormaliseValue {

     ValueAndSchema(struct, schema)
   }
+}
+
+object InferSchemaAndNormaliseValue {
   case class ValueAndSchema(normalisedValue: Any, schema: Schema)
 }
diff --git a/connector/src/main/scala/com/celonis/kafka/connect/transform/RecordTransformer.scala b/connector/src/main/scala/com/celonis/kafka/connect/transform/RecordTransformer.scala
index 618dbb27..d7b46af9 100644
--- a/connector/src/main/scala/com/celonis/kafka/connect/transform/RecordTransformer.scala
+++ b/connector/src/main/scala/com/celonis/kafka/connect/transform/RecordTransformer.scala
@@ -57,9 +57,9 @@ final class RecordTransformer(
   def transform(sinkRecord: SinkRecord): IO[GenericRecord] = {
     val (convertedValue, convertedSchema) =
       preConversion.convert(sinkRecord.value(), Option(sinkRecord.valueSchema()))
-    val flattenedValue = flattener.flatten(convertedValue, convertedSchema)

     for {
+      flattenedValue   <- IO(flattener.flatten(convertedValue, convertedSchema))
       transformedValue <- IO(
         inserter.insertFields(
           flattenedValue,
diff --git a/connector/src/main/scala/com/celonis/kafka/connect/transform/flatten/Flattener.scala b/connector/src/main/scala/com/celonis/kafka/connect/transform/flatten/Flattener.scala
index b3878a46..21ba1d24 100644
--- a/connector/src/main/scala/com/celonis/kafka/connect/transform/flatten/Flattener.scala
+++ b/connector/src/main/scala/com/celonis/kafka/connect/transform/flatten/Flattener.scala
@@ -17,6 +17,7 @@ package com.celonis.kafka.connect.transform.flatten

 import com.celonis.kafka.connect.transform.FlattenerConfig
+import com.celonis.kafka.connect.transform.InferSchemaAndNormaliseValue
 import org.apache.kafka.connect.data.Schema

 trait Flattener {
@@ -30,7 +31,10 @@ object Flattener {
     config match {
       case Some(config) => config.jsonBlobChunks match {
           case Some(jsonBlobChunks) => new ChunkedJsonBlobFlattener(jsonBlobChunks)
-          case None => new NormalisingFlattener(new StructFlattener(new SchemaFlattener(config.discardCollections)))
+          case None => new NormalisingFlattener(
+              new StructFlattener(new SchemaFlattener(config.discardCollections)),
+              new InferSchemaAndNormaliseValue(config.discardCollections),
+            )
         }
       case None => noOpFlattener
     }
diff --git a/connector/src/main/scala/com/celonis/kafka/connect/transform/flatten/NormalisingFlattener.scala b/connector/src/main/scala/com/celonis/kafka/connect/transform/flatten/NormalisingFlattener.scala
index 836c8c9c..c8fc497a 100644
--- a/connector/src/main/scala/com/celonis/kafka/connect/transform/flatten/NormalisingFlattener.scala
+++ b/connector/src/main/scala/com/celonis/kafka/connect/transform/flatten/NormalisingFlattener.scala
@@ -21,12 +21,15 @@ import org.apache.kafka.connect.data.Schema

 /** Add inference and normalisation on top of an existing flattener
   */
-private final class NormalisingFlattener(private[flatten] val innerFlattener: Flattener) extends Flattener {
+private final class NormalisingFlattener(
+  private[flatten] val innerFlattener: Flattener,
+  private[flatten] val normaliser:     InferSchemaAndNormaliseValue,
+) extends Flattener {

   override def flatten(originalValue: Any, originalSchema: Option[Schema]): Any = {
     val valueAndSchema = originalSchema match {
       case Some(originalSchema) => ValueAndSchema(originalValue, originalSchema)
       case None =>
-        InferSchemaAndNormaliseValue(originalValue).getOrElse(ValueAndSchema(originalValue, Schema.BYTES_SCHEMA))
+        normaliser(originalValue).toTry.get
     }

     innerFlattener.flatten(valueAndSchema.normalisedValue, Some(valueAndSchema.schema))
diff --git a/connector/src/test/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigTest.scala b/connector/src/test/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigTest.scala
index c829c744..5ccb57bf 100644
--- a/connector/src/test/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigTest.scala
+++ b/connector/src/test/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigTest.scala
@@ -17,18 +17,13 @@
 package com.celonis.kafka.connect.ems.config

 import cats.implicits.catsSyntaxOptionId
-import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants._
 import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.{
   CLOSE_EVERY_CONNECTION_DEFAULT_VALUE => CLOSE_CONN_DEFAULT,
-}
-import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.{
   CONNECTION_POOL_KEEPALIVE_MILLIS_DEFAULT_VALUE => KEEPALIVE_DEFAULT,
-}
-import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.{
   CONNECTION_POOL_MAX_IDLE_CONNECTIONS_DEFAULT_VALUE => MAX_IDLE_DEFAULT,
+  _,
 }
-import com.celonis.kafka.connect.ems.errors.ErrorPolicy
-import com.celonis.kafka.connect.ems.errors.ErrorPolicy.Retry
+import com.celonis.kafka.connect.ems.config.ErrorPolicyConfig.ErrorPolicyType
 import com.celonis.kafka.connect.ems.model.DataObfuscation.FixObfuscation
 import com.celonis.kafka.connect.ems.storage.FileSystemOperations
 import com.celonis.kafka.connect.ems.storage.ParquetFileCleanupDelete
@@ -56,9 +51,8 @@ class EmsSinkConfigTest extends AnyFunSuite with Matchers {
     target        = "tableA",
     connectionId  = Some("id222"),
     authorization = AuthorizationHeader("AppKey 123"),
-    errorPolicy   = Retry,
+    errorPolicyConfig = ErrorPolicyConfig(ErrorPolicyType.RETRY, RetryConfig(10, 1000), continueOnInvalidInput = false),
     commitPolicy  = CommitPolicyConfig(1000000L, 10.seconds.toMillis, 1000),
-    retries       = RetryConfig(10, 1000),
     workingDir    = new File(UUID.randomUUID().toString).toPath,
     parquet       = ParquetConfig.default,
     primaryKeys   = List("a", "b"),
@@ -132,7 +126,7 @@ class EmsSinkConfigTest extends AnyFunSuite with Matchers {
     withMissingConfig(ERROR_POLICY_KEY) {
       case Left(_) => fail(s"Should not fail ")
       case Right(value) =>
-        value.errorPolicy shouldBe ErrorPolicy.Throw
+        value.errorPolicyConfig.policyType shouldBe ErrorPolicyType.THROW
         ()
     }
   }
@@ -250,12 +244,12 @@ class EmsSinkConfigTest extends AnyFunSuite with Matchers {
       ENDPOINT_KEY      -> config.url.toString,
       TARGET_TABLE_KEY  -> config.target,
      AUTHORIZATION_KEY -> config.authorization.header,
-      ERROR_POLICY_KEY -> config.errorPolicy.entryName,
+      ERROR_POLICY_KEY -> config.errorPolicyConfig.policyType.toString,
       COMMIT_SIZE_KEY     -> config.commitPolicy.fileSize,
       COMMIT_INTERVAL_KEY -> config.commitPolicy.interval,
       COMMIT_RECORDS_KEY  -> config.commitPolicy.records,
-      ERROR_RETRY_INTERVAL -> config.retries.interval,
-      NBR_OF_RETRIES_KEY   -> config.retries.retries,
+      ERROR_RETRY_INTERVAL     -> config.errorPolicyConfig.retryConfig.interval,
+      ERROR_POLICY_RETRIES_KEY -> config.errorPolicyConfig.retryConfig.retries,
       TMP_DIRECTORY_KEY -> config.workingDir.toString,
       PRIMARY_KEYS_KEY  -> config.primaryKeys.mkString(","),
       CONNECTION_ID_KEY -> config.connectionId.get,
diff --git a/connector/src/test/scala/com/celonis/kafka/connect/ems/config/ErrorPolicyConfigTests.scala b/connector/src/test/scala/com/celonis/kafka/connect/ems/config/ErrorPolicyConfigTests.scala
new file mode 100644
index 00000000..91e38c6a
--- /dev/null
+++ b/connector/src/test/scala/com/celonis/kafka/connect/ems/config/ErrorPolicyConfigTests.scala
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2024 Celonis SE
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.celonis.kafka.connect.ems.config
+
+import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_CONTINUE_ON_INVALID_INPUT_KEY
+import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_POLICY_DOC
+import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_POLICY_KEY
+import com.celonis.kafka.connect.ems.config.ErrorPolicyConfig.ErrorPolicyType
+import com.celonis.kafka.connect.ems.errors.ErrorPolicy.Continue
+import com.celonis.kafka.connect.ems.errors.ErrorPolicy.Retry
+import com.celonis.kafka.connect.ems.errors.ErrorPolicy.Throw
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.matchers.should.Matchers
+
+class ErrorPolicyConfigTests extends AnyFunSuite with Matchers {
+  test(s"extract returns an error if $ERROR_POLICY_KEY is missing") {
+    val expectedMessage = s"Invalid [$ERROR_POLICY_KEY]. $ERROR_POLICY_DOC"
+    ErrorPolicyConfig.extract(Map.empty) shouldBe Left(expectedMessage)
+    ErrorPolicyConfig.extract(Map("a" -> "b", "b" -> 1)) shouldBe Left(expectedMessage)
+    ErrorPolicyConfig.extract(Map("a" -> "b", ERROR_POLICY_KEY + ".ext" -> 1)) shouldBe Left(expectedMessage)
+  }
+
+  test(s"return an error if $ERROR_POLICY_KEY is empty") {
+    val expectedMessage = s"Invalid [$ERROR_POLICY_KEY]. $ERROR_POLICY_DOC"
+    ErrorPolicyConfig.extract(Map(ERROR_POLICY_KEY -> "")) shouldBe Left(expectedMessage)
+  }
+
+  test(s"return an error if $ERROR_POLICY_KEY is not a string") {
+    val expectedMessage = s"Invalid [$ERROR_POLICY_KEY]. $ERROR_POLICY_DOC"
+    ErrorPolicyConfig.extract(Map(ERROR_POLICY_KEY -> 2)) shouldBe Left(expectedMessage)
+  }
+
+  test(s"return an error if $ERROR_POLICY_KEY is invalid") {
+    val expectedMessage = s"Invalid [$ERROR_POLICY_KEY]. $ERROR_POLICY_DOC"
$ERROR_POLICY_DOC" + ErrorPolicyConfig.extract(Map(ERROR_POLICY_KEY -> "retry.")) shouldBe Left(expectedMessage) + } + + test(s"return the target policy type provided with $ERROR_POLICY_KEY") { + ErrorPolicyConfig.extract(Map(ERROR_POLICY_KEY -> "retry")).map(_.policyType) shouldBe Right(ErrorPolicyType.RETRY) + ErrorPolicyConfig.extract(Map(ERROR_POLICY_KEY -> "throw")).map(_.policyType) shouldBe Right(ErrorPolicyType.THROW) + ErrorPolicyConfig.extract(Map(ERROR_POLICY_KEY -> "continue")).map(_.policyType) shouldBe Right( + ErrorPolicyType.CONTINUE, + ) + + ErrorPolicyConfig.extract(Map(ERROR_POLICY_KEY -> "rEtry")).map(_.policyType) shouldBe Right(ErrorPolicyType.RETRY) + ErrorPolicyConfig.extract(Map(ERROR_POLICY_KEY -> "THROW")).map(_.policyType) shouldBe Right(ErrorPolicyType.THROW) + ErrorPolicyConfig.extract(Map(ERROR_POLICY_KEY -> "conTinue")).map(_.policyType) shouldBe Right( + ErrorPolicyType.CONTINUE, + ) + } + + test(s"it parses $ERROR_CONTINUE_ON_INVALID_INPUT_KEY") { + val testCases = List( + Map(ERROR_POLICY_KEY -> "retry") + -> ErrorPolicyConfig(ErrorPolicyType.RETRY, RetryConfig(10, 60000), continueOnInvalidInput = false), + Map(ERROR_POLICY_KEY -> "retry", ERROR_CONTINUE_ON_INVALID_INPUT_KEY -> "false") + -> ErrorPolicyConfig(ErrorPolicyType.RETRY, RetryConfig(10, 60000), continueOnInvalidInput = false), + Map(ERROR_POLICY_KEY -> "retry", ERROR_CONTINUE_ON_INVALID_INPUT_KEY -> "true") + -> ErrorPolicyConfig(ErrorPolicyType.RETRY, RetryConfig(10, 60000), continueOnInvalidInput = true), + ) + + testCases.foreach { case (props, expectedConfig) => + assertResult(Right(expectedConfig))(ErrorPolicyConfig.extract(props)) + } + } + + test("build Error Policy based on config") { + val testCases = List( + ErrorPolicyConfig(ErrorPolicyType.CONTINUE, RetryConfig(10, 60000), false) -> Continue, + ErrorPolicyConfig(ErrorPolicyType.THROW, RetryConfig(10, 60000), false) -> Throw, + ErrorPolicyConfig(ErrorPolicyType.RETRY, RetryConfig(10, 60000), false) -> Retry, + ) + + testCases.foreach { case (config, expectedPolicy) => + config.errorPolicy shouldBe expectedPolicy + } + } +} diff --git a/connector/src/test/scala/com/celonis/kafka/connect/ems/config/ErrorPolicyTests.scala b/connector/src/test/scala/com/celonis/kafka/connect/ems/config/ErrorPolicyTests.scala index b8a36fd2..4132acdf 100644 --- a/connector/src/test/scala/com/celonis/kafka/connect/ems/config/ErrorPolicyTests.scala +++ b/connector/src/test/scala/com/celonis/kafka/connect/ems/config/ErrorPolicyTests.scala @@ -16,49 +16,15 @@ package com.celonis.kafka.connect.ems.config -import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_POLICY_DOC -import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_POLICY_KEY -import com.celonis.kafka.connect.ems.errors.ErrorPolicy import com.celonis.kafka.connect.ems.errors.ErrorPolicy.Continue import com.celonis.kafka.connect.ems.errors.ErrorPolicy.Retry import com.celonis.kafka.connect.ems.errors.ErrorPolicy.Throw -import org.scalatest.funsuite.AnyFunSuite -import org.scalatest.matchers.should.Matchers import org.apache.kafka.connect.errors.ConnectException import org.apache.kafka.connect.errors.RetriableException +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers class ErrorPolicyTests extends AnyFunSuite with Matchers { - test(s"return an error if $ERROR_POLICY_KEY is missing") { - val expectedMessage = s"Invalid [$ERROR_POLICY_KEY]. 
$ERROR_POLICY_DOC" - ErrorPolicy.extract(Map.empty) shouldBe Left(expectedMessage) - ErrorPolicy.extract(Map("a" -> "b", "b" -> 1)) shouldBe Left(expectedMessage) - ErrorPolicy.extract(Map("a" -> "b", ERROR_POLICY_KEY + ".ext" -> 1)) shouldBe Left(expectedMessage) - } - - test(s"return an error if $ERROR_POLICY_KEY is empty") { - val expectedMessage = s"Invalid [$ERROR_POLICY_KEY]. $ERROR_POLICY_DOC" - ErrorPolicy.extract(Map(ERROR_POLICY_KEY -> "")) shouldBe Left(expectedMessage) - } - - test(s"return an error if $ERROR_POLICY_KEY is not a string") { - val expectedMessage = s"Invalid [$ERROR_POLICY_KEY]. $ERROR_POLICY_DOC" - ErrorPolicy.extract(Map(ERROR_POLICY_KEY -> 2)) shouldBe Left(expectedMessage) - } - - test(s"return an error if $ERROR_POLICY_KEY is invalid") { - val expectedMessage = s"Invalid [$ERROR_POLICY_KEY]. $ERROR_POLICY_DOC" - ErrorPolicy.extract(Map(ERROR_POLICY_KEY -> "retry.")) shouldBe Left(expectedMessage) - } - - test(s"return the target table provided with $ERROR_POLICY_KEY") { - ErrorPolicy.extract(Map(ERROR_POLICY_KEY -> "retry")) shouldBe Right(Retry) - ErrorPolicy.extract(Map(ERROR_POLICY_KEY -> "throw")) shouldBe Right(Throw) - ErrorPolicy.extract(Map(ERROR_POLICY_KEY -> "continue")) shouldBe Right(Continue) - - ErrorPolicy.extract(Map(ERROR_POLICY_KEY -> "rEtry")) shouldBe Right(Retry) - ErrorPolicy.extract(Map(ERROR_POLICY_KEY -> "THROW")) shouldBe Right(Throw) - ErrorPolicy.extract(Map(ERROR_POLICY_KEY -> "conTinue")) shouldBe Right(Continue) - } test(s"handle an error according to the configured error policy") { val throwable = new RuntimeException() diff --git a/connector/src/test/scala/com/celonis/kafka/connect/ems/config/RetryTests.scala b/connector/src/test/scala/com/celonis/kafka/connect/ems/config/RetryConfigTests.scala similarity index 72% rename from connector/src/test/scala/com/celonis/kafka/connect/ems/config/RetryTests.scala rename to connector/src/test/scala/com/celonis/kafka/connect/ems/config/RetryConfigTests.scala index f163ed27..29e30c5e 100644 --- a/connector/src/test/scala/com/celonis/kafka/connect/ems/config/RetryTests.scala +++ b/connector/src/test/scala/com/celonis/kafka/connect/ems/config/RetryConfigTests.scala @@ -18,20 +18,20 @@ package com.celonis.kafka.connect.ems.config import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_RETRY_INTERVAL import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_RETRY_INTERVAL_DEFAULT -import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.NBR_OF_RETIRES_DEFAULT -import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.NBR_OF_RETRIES_KEY +import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_POLICY_RETRIES_DEFAULT +import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_POLICY_RETRIES_KEY import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -class RetryTests extends AnyFunSuite with Matchers { +class RetryConfigTests extends AnyFunSuite with Matchers { test(s"return defaults if retry keys are missing") { - RetryConfig.extractRetry(Map.empty) shouldBe Right(RetryConfig(NBR_OF_RETIRES_DEFAULT, + RetryConfig.extractRetry(Map.empty) shouldBe Right(RetryConfig(ERROR_POLICY_RETRIES_DEFAULT, ERROR_RETRY_INTERVAL_DEFAULT, )) } test(s"return the retry config") { - RetryConfig.extractRetry(Map(ERROR_RETRY_INTERVAL -> 1000, NBR_OF_RETRIES_KEY -> 10)) shouldBe Right( + RetryConfig.extractRetry(Map(ERROR_RETRY_INTERVAL -> 1000, ERROR_POLICY_RETRIES_KEY -> 10)) shouldBe Right( 
       RetryConfig(
         10,
         1000,
@@ -41,11 +41,11 @@ class RetryTests extends AnyFunSuite with Matchers {

   test(s"return an error if retry interval is smaller than 1s") {
     val message = s"Invalid [$ERROR_RETRY_INTERVAL]. Retry interval cannot be smaller than 1000 (1s)."
-    RetryConfig.extractRetry(Map(ERROR_RETRY_INTERVAL -> 100, NBR_OF_RETRIES_KEY -> 10)) shouldBe Left(message)
+    RetryConfig.extractRetry(Map(ERROR_RETRY_INTERVAL -> 100, ERROR_POLICY_RETRIES_KEY -> 10)) shouldBe Left(message)
   }

   test(s"return an error if retries is smaller than 1") {
-    val message = s"Invalid [$NBR_OF_RETRIES_KEY]. Number of retries needs to be greater than 0."
-    RetryConfig.extractRetry(Map(ERROR_RETRY_INTERVAL -> 1000, NBR_OF_RETRIES_KEY -> 0)) shouldBe Left(message)
+    val message = s"Invalid [$ERROR_POLICY_RETRIES_KEY]. Number of retries needs to be greater than 0."
+    RetryConfig.extractRetry(Map(ERROR_RETRY_INTERVAL -> 1000, ERROR_POLICY_RETRIES_KEY -> 0)) shouldBe Left(message)
   }
 }
diff --git a/connector/src/test/scala/com/celonis/kafka/connect/ems/conversion/SchemaLessJsonValueConverterTest.scala b/connector/src/test/scala/com/celonis/kafka/connect/ems/conversion/SchemaLessJsonValueConverterTest.scala
index 9d380a26..40532ddf 100644
--- a/connector/src/test/scala/com/celonis/kafka/connect/ems/conversion/SchemaLessJsonValueConverterTest.scala
+++ b/connector/src/test/scala/com/celonis/kafka/connect/ems/conversion/SchemaLessJsonValueConverterTest.scala
@@ -119,18 +119,18 @@ class SchemaLessJsonValueConverterTest extends AnyFunSuite with Matchers {
     excludeStruct.get("value") shouldBe false

     val carsSchema = record.getSchema.getField("cars").schema()
-    carsSchema.getType shouldBe Schema.Type.ARRAY
-    carsSchema.getElementType shouldBe SchemaBuilder.builder().stringType().asNullable
-    record.get("cars").toString shouldBe "[Ford, BMW, Fiat]"
+    carsSchema shouldBe SchemaBuilder.builder().stringType().asNullable
+    record.get("cars") shouldBe """["Ford","BMW","Fiat"]"""

     val numsSchema = record.getSchema.getField("nums").schema()
-    numsSchema.getType shouldBe Schema.Type.ARRAY
-    numsSchema.getElementType shouldBe SchemaBuilder.builder().longType().asNullable
-    record.get("nums").toString shouldBe "[1, 3, 4]"
+    numsSchema shouldBe SchemaBuilder.builder().stringType().asNullable
+    record.get("nums") shouldBe "[1,3,4]"
   }

+  private val inference = new InferSchemaAndNormaliseValue(false)
+
   private def convert(value: Any): Either[Throwable, GenericRecord] =
-    InferSchemaAndNormaliseValue(value).toRight(new RuntimeException("whatever")).flatMap(valueAndSchema =>
+    inference(value).flatMap(valueAndSchema =>
       DataConverter(valueAndSchema.normalisedValue),
     )
diff --git a/connector/src/test/scala/com/celonis/kafka/connect/ems/errors/InvalidInputErrorHandlerTest.scala b/connector/src/test/scala/com/celonis/kafka/connect/ems/errors/InvalidInputErrorHandlerTest.scala
new file mode 100644
index 00000000..ff44ae3f
--- /dev/null
+++ b/connector/src/test/scala/com/celonis/kafka/connect/ems/errors/InvalidInputErrorHandlerTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2024 Celonis SE
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.celonis.kafka.connect.ems.errors
+
+import org.apache.kafka.connect.sink.ErrantRecordReporter
+import org.apache.kafka.connect.sink.SinkRecord
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.matchers.should.Matchers
+
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.Future
+import scala.util.Try
+
+class InvalidInputErrorHandlerTest extends AnyFunSuite with Matchers {
+  test("it does not throw for InvalidInputError when flag set to true") {
+    val handler = new InvalidInputErrorHandler(true, None)
+    noException should be thrownBy handler.handle(aSinkRecord, InvalidInputException("oooouuups"))
+  }
+
+  test("it throws for InvalidInputError when flag set to false") {
+    val handler   = new InvalidInputErrorHandler(false, None)
+    val exception = InvalidInputException("oooouuups")
+    the[InvalidInputException] thrownBy handler.handle(aSinkRecord, exception) shouldBe exception
+  }
+
+  test("it throws for other exceptions, regardless of flag value") {
+    val handler1  = new InvalidInputErrorHandler(false, None)
+    val handler2  = new InvalidInputErrorHandler(true, None)
+    val exception = new RuntimeException("oooouuups")
+
+    the[RuntimeException] thrownBy handler1.handle(aSinkRecord, exception) shouldBe exception
+    the[RuntimeException] thrownBy handler2.handle(aSinkRecord, exception) shouldBe exception
+  }
+
+  test("it should report only InvalidInputError when flag set to true") {
+    val reporter  = new FakeErrorReporter
+    val exception = InvalidInputException("oooouuups")
+
+    val handler = new InvalidInputErrorHandler(true, Some(reporter))
+    handler.handle(aSinkRecord, exception)
+    Try(handler.handle(aSinkRecord, new RuntimeException("ignored")))
+    reporter.getErrors shouldBe List(aSinkRecord -> exception)
+  }
+
+  test("it should never report if flag set to false") {
+    val reporter  = new FakeErrorReporter
+    val exception = InvalidInputException("oooouuups")
+
+    val handler = new InvalidInputErrorHandler(false, Some(reporter))
+    Try(handler.handle(aSinkRecord, exception))
+    Try(handler.handle(aSinkRecord, new RuntimeException("ignored")))
+    reporter.getErrors shouldBe Nil
+  }
+
+  private lazy val aSinkRecord = new SinkRecord("topic", 0, null, "", null, "", 0)
+
+  final class FakeErrorReporter() extends ErrantRecordReporter {
+    private var errors: List[(SinkRecord, Throwable)] = Nil
+
+    def getErrors: List[(SinkRecord, Throwable)] = errors
+
+    override def report(record: SinkRecord, error: Throwable): Future[Void] = {
+      errors = (record, error) :: errors
+      CompletableFuture.completedFuture(null)
+    }
+  }
+}
diff --git a/connector/src/test/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTaskObfuscationTest.scala b/connector/src/test/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTaskObfuscationTest.scala
index 91e64098..60e05d58 100644
--- a/connector/src/test/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTaskObfuscationTest.scala
+++ b/connector/src/test/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTaskObfuscationTest.scala
@@ -23,8 +23,8 @@ import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.{
   CONNECTION_POOL_MAX_IDLE_CONNECTIONS_DEFAULT_VALUE => MAX_IDLE_DEFAULT,
   _,
 }
+import com.celonis.kafka.connect.ems.config.ErrorPolicyConfig.ErrorPolicyType
 import com.celonis.kafka.connect.ems.config._
-import com.celonis.kafka.connect.ems.errors.ErrorPolicy.Retry
 import com.celonis.kafka.connect.ems.model.DataObfuscation.FixObfuscation
 import com.celonis.kafka.connect.ems.storage.ParquetFileCleanupRename
 import com.celonis.kafka.connect.ems.storage.WorkingDirectory
@@ -56,9 +56,8 @@ class EmsSinkTaskObfuscationTest extends AnyFunSuite with Matchers with WorkingD
     "tableA",
     Some("id11111"),
     AuthorizationHeader("AppKey 123"),
-    Retry,
+    ErrorPolicyConfig(ErrorPolicyType.RETRY, RetryConfig(1, 1000), continueOnInvalidInput = false),
     policy,
-    RetryConfig(1, 1000),
     dir,
     ParquetConfig.default,
     List("a", "b"),
@@ -80,12 +79,12 @@ class EmsSinkTaskObfuscationTest extends AnyFunSuite with Matchers with WorkingD
       ENDPOINT_KEY      -> sinkConfig.url.toString,
       TARGET_TABLE_KEY  -> sinkConfig.target,
       AUTHORIZATION_KEY -> sinkConfig.authorization.header,
-      ERROR_POLICY_KEY  -> sinkConfig.errorPolicy.entryName,
+      ERROR_POLICY_KEY  -> sinkConfig.errorPolicyConfig.policyType.toString,
       COMMIT_SIZE_KEY     -> policy.fileSize.toString,
       COMMIT_INTERVAL_KEY -> policy.interval.toString,
       COMMIT_RECORDS_KEY  -> policy.records.toString,
-      ERROR_RETRY_INTERVAL -> sinkConfig.retries.interval.toString,
-      NBR_OF_RETRIES_KEY   -> sinkConfig.retries.retries.toString,
+      ERROR_RETRY_INTERVAL     -> sinkConfig.errorPolicyConfig.retryConfig.interval.toString,
+      ERROR_POLICY_RETRIES_KEY -> sinkConfig.errorPolicyConfig.retryConfig.retries.toString,
       TMP_DIRECTORY_KEY -> dir.toString,
       DEBUG_KEEP_TMP_FILES_KEY -> sinkConfig.parquet.cleanup.isInstanceOf[ParquetFileCleanupRename].toString,
       PARQUET_ROW_GROUP_SIZE_BYTES_KEY -> sinkConfig.parquet.rowGroupSize.toString,
diff --git a/connector/src/test/scala/com/celonis/kafka/connect/ems/storage/formats/ParquetFormatWriterTests.scala b/connector/src/test/scala/com/celonis/kafka/connect/ems/storage/formats/ParquetFormatWriterTests.scala
index 0ad99cf5..edad2674 100644
--- a/connector/src/test/scala/com/celonis/kafka/connect/ems/storage/formats/ParquetFormatWriterTests.scala
+++ b/connector/src/test/scala/com/celonis/kafka/connect/ems/storage/formats/ParquetFormatWriterTests.scala
@@ -100,7 +100,8 @@ class ParquetFormatWriterTests extends AnyFunSuite with Matchers with WorkingDir
     )
     val schemaAndValue = converter.toConnectData("topic", entry.asJson.noSpaces.getBytes)
     // Schemaless json go through normalisation
-    val normalisedValue = InferSchemaAndNormaliseValue(schemaAndValue.value()).get.normalisedValue
+    val inference       = new InferSchemaAndNormaliseValue(false)
+    val normalisedValue = inference(schemaAndValue.value()).toTry.get.normalisedValue
     val struct          = DataConverter.apply(normalisedValue).getOrElse(fail("Should convert the map"))
     val formatWriter =
       ParquetFormatWriter.from(output, struct.getSchema, ParquetConfig.default, new NoOpExploder().explode)
diff --git a/connector/src/test/scala/com/celonis/kafka/connect/transform/InferSchemaAndNormaliseValueTest.scala b/connector/src/test/scala/com/celonis/kafka/connect/transform/InferSchemaAndNormaliseValueTest.scala
index f89ae1c8..f1a6e890 100644
--- a/connector/src/test/scala/com/celonis/kafka/connect/transform/InferSchemaAndNormaliseValueTest.scala
+++ b/connector/src/test/scala/com/celonis/kafka/connect/transform/InferSchemaAndNormaliseValueTest.scala
@@ -16,6 +16,7 @@

 package com.celonis.kafka.connect.transform

+import com.celonis.kafka.connect.ems.errors.InvalidInputException
 import com.celonis.kafka.connect.transform.InferSchemaAndNormaliseValue.ValueAndSchema
 import com.celonis.kafka.connect.transform.flatten.ConnectJsonConverter
 import org.apache.kafka.connect.data.Schema
@@ -24,17 +25,18 @@ import org.apache.kafka.connect.data.Struct
 import org.scalatest.matchers.should.Matchers
 import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper
+import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 
 class InferSchemaAndNormaliseValueTest extends org.scalatest.funsuite.AnyFunSuite with Matchers {
-  test("returns None when encountering an unexpected value") {
+  test("returns Left when encountering an unexpected value") {
     List[Any](
       null,
       Range(1, 10),
       Iterator.continually(true),
       (),
     ).foreach {
-      value => assertResult(None)(InferSchemaAndNormaliseValue(value))
+      value => assert(infer(value).isLeft)
     }
   }
 
   test("Infers the schema of simple primitives") {
@@ -45,7 +47,7 @@ class InferSchemaAndNormaliseValueTest extends org.scalatest.funsuit
       true -> Schema.OPTIONAL_BOOLEAN_SCHEMA,
     ).foreach {
       case (value, expectedSchema) =>
-        assertResult(Some(ValueAndSchema(value, expectedSchema)))(InferSchemaAndNormaliseValue(value))
+        assertResult(Right(ValueAndSchema(value, expectedSchema)))(infer(value))
     }
   }
 
@@ -54,7 +56,7 @@ class InferSchemaAndNormaliseValueTest extends org.scalatest.funsuit
     val expectedSchema = SchemaBuilder.struct().field("hi", Schema.OPTIONAL_STRING_SCHEMA).build()
     val expectedValue  = new Struct(expectedSchema).put("hi", "there")
 
-    assertResult(Some(ValueAndSchema(expectedValue, expectedSchema)))(InferSchemaAndNormaliseValue(value))
+    assertResult(Right(ValueAndSchema(expectedValue, expectedSchema)))(infer(value))
   }
 
   test("Infers simple maps of primitives") {
@@ -62,14 +64,15 @@ class InferSchemaAndNormaliseValueTest extends org.scalatest.funsuit
     val expectedSchema = SchemaBuilder.struct().field("1", Schema.OPTIONAL_BOOLEAN_SCHEMA).build()
     val expectedValue  = new Struct(expectedSchema).put("1", true)
 
-    assertResult(Some(ValueAndSchema(expectedValue, expectedSchema)))(InferSchemaAndNormaliseValue(value))
+    assertResult(Right(ValueAndSchema(expectedValue, expectedSchema)))(infer(value))
   }
 
   test("Infers simple collections") {
     val value          = List("a", "b", "c").asJava
-    val expectedSchema = SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build()
+    val expectedSchema = Schema.OPTIONAL_STRING_SCHEMA
+    val expectedValue  = """["a","b","c"]"""
 
-    assertResult(Some(ValueAndSchema(value, expectedSchema)))(InferSchemaAndNormaliseValue(value))
+    assertResult(Right(ValueAndSchema(expectedValue, expectedSchema)))(infer(value))
   }
 
   test("Normalisation transforms maps nested in maps") {
@@ -78,40 +81,39 @@ class InferSchemaAndNormaliseValueTest extends org.scalatest.funsuit
     val expectedSchema = SchemaBuilder.struct().field("nested", nestedSchema).build()
     val expectedValue  = new Struct(expectedSchema).put("nested", new Struct(nestedSchema).put("a", "123"))
 
-    assertResult(Some(ValueAndSchema(expectedValue, expectedSchema)))(InferSchemaAndNormaliseValue(value))
+    assertResult(Right(ValueAndSchema(expectedValue, expectedSchema)))(infer(value))
   }
 
   test("Normalisation transforms maps nested in arrays") {
-    val value          = List(Map("a" -> "123").asJava).asJava
-    val nestedSchema   = SchemaBuilder.struct().field("a", Schema.OPTIONAL_STRING_SCHEMA).build()
-    val expectedSchema = SchemaBuilder.array(nestedSchema).build()
-    val expectedValue  = List(new Struct(nestedSchema).put("a", "123")).asJava
+    val value         = List(Map("a" -> "123").asJava).asJava
+    val expectedValue = """[{"a":"123"}]"""
 
-    assertResult(Some(ValueAndSchema(expectedValue, expectedSchema)))(InferSchemaAndNormaliseValue(value))
+    assertResult(Right(ValueAndSchema(expectedValue, Schema.OPTIONAL_STRING_SCHEMA)))(infer(value))
   }
 
-  test("Succeeds with empty collections") {
-    List(
-      Map.empty[Boolean, Boolean].asJava -> SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.BYTES_SCHEMA).build(),
-      List.empty[Int].asJava             -> SchemaBuilder.array(Schema.BYTES_SCHEMA).build(),
-    ).foreach {
-      case (value, expectedSchema) =>
-        assertResult(Some(ValueAndSchema(value, expectedSchema)))(InferSchemaAndNormaliseValue(value))
-    }
+  test("Succeeds with empty arrays") {
+    val value = List.empty[Int].asJava
+    assertResult(Right(ValueAndSchema("[]", Schema.OPTIONAL_STRING_SCHEMA)))(infer(value))
+  }
+
+  test("Succeeds with empty map") {
+    val value          = Map.empty[Boolean, Boolean].asJava
+    val expectedSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.BYTES_SCHEMA).build()
+    assertResult(Right(ValueAndSchema(value, expectedSchema)))(infer(value))
   }
 
-  test("Fails with non-empty heterogeneous collections") {
+  test("Succeeds with non-empty heterogeneous collections") {
     List(
       List[Any](1, "blah", true).asJava,
-      List(List[Any](1, "blah")).asJava,
+      List(List[Any](1, "blah").asJava).asJava,
     ).foreach { value =>
-      assertResult(None)(InferSchemaAndNormaliseValue(value))
+      assert(infer(value).isRight)
     }
   }
 
   test("Fails when a map contains an empty key") {
     val value = Map("" -> "x", "y" -> "x").asJava
-    assertResult(None)(InferSchemaAndNormaliseValue(value))
+    assert(infer(value).isLeft)
   }
 
   test("infers nested object's schema") {
@@ -121,7 +123,7 @@ class InferSchemaAndNormaliseValueTest extends org.scalatest.funsuit
       |""".stripMargin
     val om    = new ObjectMapper()
     val value = om.readValue(rawJson, classOf[java.util.Map[String, AnyRef]])
-    val schema = InferSchemaAndNormaliseValue(value).map(_.schema).getOrElse(fail("some schema expected!"))
+    val schema = infer(value).toTry.get.schema
 
     assertResult(
       SchemaBuilder.struct().field(
@@ -129,7 +131,7 @@ class InferSchemaAndNormaliseValueTest extends org.scalatest.funsuit
         SchemaBuilder.struct()
           .field("I", Schema.OPTIONAL_STRING_SCHEMA)
           .field("object", Schema.OPTIONAL_BOOLEAN_SCHEMA)
-          .field("with", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build())
+          .field("with", Schema.OPTIONAL_STRING_SCHEMA)
           .build(),
       ).build(),
     )(schema)
@@ -142,7 +144,7 @@ class InferSchemaAndNormaliseValueTest extends org.scalatest.funsuit
       |""".stripMargin
 
     val value          = new ObjectMapper().readValue(rawJson, classOf[java.util.Map[String, AnyRef]])
-    val valueAndSchema = InferSchemaAndNormaliseValue(value).getOrElse(fail("some schema expected!"))
+    val valueAndSchema = infer(value).toTry.get
 
     val expectedSchema = SchemaBuilder.struct().field(
       "hello",
@@ -158,6 +160,22 @@ class InferSchemaAndNormaliseValueTest extends org.scalatest.funsuit
     assertResult(expectedValue)(valueAndSchema.normalisedValue)
   }
 
+  test("drops arrays when discardCollections is set to true") {
+    @nowarn
+    val value = Map("hi" -> "there",
+                    "items" -> List(1, 2, 3).asJava,
+                    "itemsNested" -> Map("hello" -> "man", "items" -> List("a", "b").asJava).asJava,
+    ).asJava
+    val expectedNestedSchema = SchemaBuilder.struct().field("hello", Schema.OPTIONAL_STRING_SCHEMA).build()
+    val expectedSchema = SchemaBuilder.struct().field("hi", Schema.OPTIONAL_STRING_SCHEMA).field("itemsNested",
+      expectedNestedSchema,
+    ).build()
+    val expectedNestedValue = new Struct(expectedNestedSchema).put("hello", "man")
+    val expectedValue       = new Struct(expectedSchema).put("hi", "there").put("itemsNested", expectedNestedValue)
+
+    assertResult(Right(ValueAndSchema(expectedValue, expectedSchema)))(infer(value, true))
+  }
+
   test("normalises a schemaless JSON") {
     val json =
       """
@@ -181,9 +199,9 @@ class InferSchemaAndNormaliseValueTest extends org.scalatest.funsuit
        |}
        |""".stripMargin
 
-    val schemaAndValue = ConnectJsonConverter.converter.toConnectData("topic", json.getBytes)
-    val Some(ValueAndSchema(normalisedValue, _)) = InferSchemaAndNormaliseValue(schemaAndValue.value())
-    val struct = normalisedValue.asInstanceOf[Struct]
+    val schemaAndValue                            = ConnectJsonConverter.converter.toConnectData("topic", json.getBytes)
+    val Right(ValueAndSchema(normalisedValue, _)) = infer(schemaAndValue.value())
+    val struct                                    = normalisedValue.asInstanceOf[Struct]
 
     assertResult(Set(
       "idType",
@@ -209,7 +227,10 @@ class InferSchemaAndNormaliseValueTest extends org.scalatest.funsuit
     exclude.get("id") shouldBe 0
     exclude.get("value") shouldBe false
 
-    struct.get("cars").asInstanceOf[java.util.List[AnyRef]].asScala.toArray shouldBe Array("Ford", "BMW", "Fiat")
-    struct.get("nums").asInstanceOf[java.util.List[AnyRef]].asScala.toArray shouldBe Array(1, 3, 4)
+    struct.get("cars") shouldBe """["Ford","BMW","Fiat"]"""
+    struct.get("nums") shouldBe """[1,3,4]"""
   }
+
+  private def infer(value: Any, discardCollections: Boolean = false): Either[InvalidInputException, ValueAndSchema] =
+    new InferSchemaAndNormaliseValue(discardCollections).apply(value)
 }
diff --git a/connector/src/test/scala/com/celonis/kafka/connect/transform/RecordTransformerTest.scala b/connector/src/test/scala/com/celonis/kafka/connect/transform/RecordTransformerTest.scala
index 5e27e4fb..df514626 100644
--- a/connector/src/test/scala/com/celonis/kafka/connect/transform/RecordTransformerTest.scala
+++ b/connector/src/test/scala/com/celonis/kafka/connect/transform/RecordTransformerTest.scala
@@ -158,28 +158,26 @@ class RecordTransformerTest extends AnyFunSuite with Matchers {
     genericRecord.get("another_optional_decimal") shouldBe null
   }
 
-  test("With Chunking disabled, heterogeneous arrays prevent flattening") {
-    pendingUntilFixed {
-      val value = Map(
-        "heterogeneous_array" -> List[Any]("a", 1, true).asJava,
-      ).asJava
-      val record = sinkRecord(value)
-      flattenTransform(record)
-      ()
-    }
+  test("With Chunking disabled, heterogeneous arrays do not prevent flattening") {
+    val value = Map(
+      "heterogeneous_array" -> List[Any]("a", 1, true).asJava,
+    ).asJava
+    val record = sinkRecord(value)
+    flattenTransform(record)
+    ()
   }
 
-  test("With Chunking disabled, heterogeneous arrays prevents flattening, even with discardCollection enabled") {
-    pendingUntilFixed {
-      val value = Map(
-        "foo" -> "bar",
-        "heterogeneous_array" -> List[Any]("a", 1, true).asJava,
-      ).asJava
-
-      val record = sinkRecord(value)
-      flattenTransform(record)
-      ()
-    }
+  test(
+    "With Chunking disabled, heterogeneous arrays do not prevent flattening, even with discardCollection enabled",
+  ) {
+    val value = Map(
+      "foo" -> "bar",
+      "heterogeneous_array" -> List[Any]("a", 1, true).asJava,
+    ).asJava
+
+    val record = sinkRecord(value)
+    flattenTransform(record)
+    ()
   }
 
   private def chunkTransform(record: SinkRecord, maxChunks: Int, chunkSize: Int): GenericRecord = {
diff --git a/src/e2e/scala/com/celonis/kafka/connect/ems/ErrorPolicyTests.scala b/src/e2e/scala/com/celonis/kafka/connect/ems/ErrorPolicyTests.scala
index a13b2e54..3982c4ec 100644
--- a/src/e2e/scala/com/celonis/kafka/connect/ems/ErrorPolicyTests.scala
+++ b/src/e2e/scala/com/celonis/kafka/connect/ems/ErrorPolicyTests.scala
@@ -4,6 +4,8 @@ package com.celonis.kafka.connect.ems
 import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants._
+import com.celonis.kafka.connect.ems.parquet.ParquetLocalInputFile
+import com.celonis.kafka.connect.ems.parquet.extractParquetFromRequest
 import com.celonis.kafka.connect.ems.testcontainers.connect.EmsConnectorConfiguration
 import com.celonis.kafka.connect.ems.testcontainers.connect.EmsConnectorConfiguration.TOPICS_KEY
 import com.celonis.kafka.connect.ems.testcontainers.scalatest.KafkaConnectContainerPerSuite
@@ -11,6 +13,8 @@ import com.celonis.kafka.connect.ems.testcontainers.scalatest.fixtures.connect.w
 import com.celonis.kafka.connect.ems.testcontainers.scalatest.fixtures.connect.withConnector
 import com.celonis.kafka.connect.ems.testcontainers.scalatest.fixtures.connect.withParquetUploadLatency
 import com.celonis.kafka.connect.ems.testcontainers.scalatest.fixtures.mockserver.withMockResponse
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.parquet.hadoop.ParquetFileReader
 import org.mockserver.verify.VerificationTimes
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.should.Matchers
@@ -159,4 +163,49 @@ class ErrorPolicyTests extends AnyFunSuite with KafkaConnectContainerPerSuite wi
       }
     }
   }
+
+  test("continue on invalid input") {
+
+    val sourceTopic = randomTopicName()
+    val emsTable    = randomEmsTable()
+
+    val emsConnector = new EmsConnectorConfiguration("ems")
+      .withConfig(TOPICS_KEY, sourceTopic)
+      .withConfig(ENDPOINT_KEY, proxyServerUrl)
+      .withConfig(AUTHORIZATION_KEY, "AppKey key")
+      .withConfig(TARGET_TABLE_KEY, emsTable)
+      .withConfig(COMMIT_RECORDS_KEY, 2)
+      .withConfig(COMMIT_SIZE_KEY, 1000000L)
+      .withConfig(COMMIT_INTERVAL_KEY, 3600000)
+      .withConfig(TMP_DIRECTORY_KEY, "/tmp/")
+      .withConfig(ERROR_POLICY_KEY, "THROW")
+      .withConfig(ERROR_CONTINUE_ON_INVALID_INPUT_KEY, true)
+      .withConfig("value.converter.schemas.enable", "false")
+      .withConfig("value.converter", "org.apache.kafka.connect.json.JsonConverter")
+      .withConfig("key.converter", "org.apache.kafka.connect.storage.StringConverter")
+
+    withMockResponse(emsRequestForTable(emsTable), mockEmsResponse) {
+      withConnector(emsConnector) {
+
+        // The first error should not prevent other records from being ingested, even if they are part of the same put batch
+        withStringStringProducer { producer =>
+          producer.send(new ProducerRecord(sourceTopic, """{"":"missingKey"}"""))
+          producer.send(new ProducerRecord(sourceTopic, """{"x":"validKey"}"""))
+          producer.send(new ProducerRecord(sourceTopic, """{"x":"validKey"}"""))
+        }
+
+        eventually(timeout(20 seconds), interval(1 seconds)) {
+          mockServerClient.verify(emsRequestForTable(emsTable), VerificationTimes.once())
+          val status = kafkaConnectClient.getConnectorStatus(emsConnector.name)
+          status.tasks.head.state should be("RUNNING")
+        }
+
+        val httpRequests = mockServerClient.retrieveRecordedRequests(emsRequestForTable(emsTable))
+        val parquetFile  = extractParquetFromRequest(httpRequests.head)
+        val fileReader   = ParquetFileReader.open(new ParquetLocalInputFile(parquetFile))
+
+        fileReader.getRecordCount shouldBe 2
+      }
+    }
+  }
+}
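
For readers skimming the patch: the InvalidInputErrorHandlerTest above pins down the new handler's contract. Invalid-input failures are swallowed, and forwarded to Kafka Connect's errant-record reporter (the DLQ) when one is configured, only if the continue-on-invalid-input flag is set; every other throwable is rethrown regardless of the flag. A minimal sketch consistent with those tests follows. It is illustrative only, reconstructed from the test expectations; the actual InvalidInputErrorHandler.scala introduced by this patch may differ in details such as logging.

package com.celonis.kafka.connect.ems.errors

import org.apache.kafka.connect.sink.ErrantRecordReporter
import org.apache.kafka.connect.sink.SinkRecord

// Illustrative sketch: behaviour inferred from InvalidInputErrorHandlerTest,
// not the implementation shipped in this patch.
final class InvalidInputErrorHandler(
  continueOnInvalidInput: Boolean,
  errantRecordReporter:   Option[ErrantRecordReporter],
) {
  // Swallow invalid-input errors when the flag is set, reporting the failed
  // record to the DLQ via the errant-record reporter when one is available;
  // rethrow everything else so the configured error policy still applies.
  def handle(record: SinkRecord, error: Throwable): Unit = error match {
    case _: InvalidInputException if continueOnInvalidInput =>
      errantRecordReporter.foreach(_.report(record, error))
    case _ => throw error
  }
}

Presumably EmsSinkTask routes each per-record failure through handle before deciding whether to rethrow, which is what the e2e test above relies on: the task stays RUNNING and the two valid records still reach the mocked EMS endpoint even though the first record of the batch is rejected.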