
Commit 8afed61

Fixed #16 Added option to configure mapping of fields in SinkRecord to CQL columns
1 parent: f8b91f9

File tree: 4 files changed (+190 -31 lines)


build.sbt

Lines changed: 10 additions & 8 deletions

@@ -30,14 +30,16 @@ libraryDependencies ++= Seq(
   "com.datastax.cassandra" % "cassandra-driver-core" % cassandra, //was: 2.1.9
   "org.scalatest" %% "scalatest" % "2.2.6" % "test,it",
   "org.mockito" % "mockito-core" % "2.0.34-beta" % "test,it",
-  "ch.qos.logback" % "logback-classic" % "1.0.7" % "test,it",
-  CrossVersion.partialVersion(scalaVersion.value) match {
-    case Some((2, minor)) if minor < 11 =>
-      "org.slf4j" % "slf4j-api" % "1.7.13"
-    case _ =>
-      "com.typesafe.scala-logging" %% "scala-logging" % "3.1.0"
-  }
-)
+  "ch.qos.logback" % "logback-classic" % "1.0.7" % "test,it"
+) ++ (CrossVersion.partialVersion(scalaVersion.value) match {
+  case Some((2, minor)) if minor < 11 => Seq(
+    "org.slf4j" % "slf4j-api" % "1.7.13"
+  )
+  case _ => Seq(
+    "com.typesafe.scala-logging" %% "scala-logging" % "3.1.0",
+    "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4"
+  )
+})
 
 publishMavenStyle := true
 
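The added scala-parser-combinators dependency backs the scala.util.parsing.json usage introduced elsewhere in this commit: from Scala 2.11 onward that package ships in this separate module rather than in the standard library (hence the cross-version split above). A minimal sketch of the parser it provides, with illustrative values that are not part of the commit:

    import scala.util.parsing.json.JSON

    // JSON.parseFull returns Option[Any]; a JSON object parses to a Map[String, Any].
    val parsed = JSON.parseFull("""{"key": "new_key", "value": "new_value"}""")
    // parsed == Some(Map("key" -> "new_key", "value" -> "new_value"))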

src/it/resources/setup.cql

Lines changed: 7 additions & 0 deletions

@@ -26,6 +26,13 @@ CREATE TABLE IF NOT EXISTS test.kv (
   value int,
   PRIMARY KEY (key));
 
+CREATE TABLE IF NOT EXISTS test.fieldmap (
+  new_key text,
+  new_value int,
+  new_nested text,
+  new_dnested text,
+  PRIMARY KEY (new_key));
+
 CREATE TABLE test.playlists (
   id bigint,
   song_order int,
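The new_* columns are the targets of the field.mapping option exercised in the integration test below. Serialized, the mapping that routes record fields to this table looks roughly like the following; the name fieldMappingJson is illustrative and derived from the test's JSONObject, not text from the commit:

    // Illustrative serialized field.mapping value targeting test.fieldmap.
    val fieldMappingJson =
      """{"key": "new_key",
        | "value": "new_value",
        | "nvalue": {"blah1": "new_nested",
        |            "blah2": {"blah2": "new_dnested"}}}""".stripMargin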

src/it/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTaskSpec.scala

Lines changed: 77 additions & 5 deletions

@@ -18,17 +18,19 @@
  */
 package com.tuplejump.kafka.connect.cassandra
 
-import scala.collection.JavaConverters._
 import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}
 import org.apache.kafka.connect.sink.{SinkRecord, SinkTaskContext}
 
-class CassandraSinkTaskSpec extends AbstractFlatSpec {
+import scala.collection.JavaConverters._
+import scala.util.parsing.json.JSONObject
 
-  val topicName = "test_kv_topic"
-  val tableName = "test.kv"
-  val config = sinkConfig((topicName, tableName))
+class CassandraSinkTaskSpec extends AbstractFlatSpec {
 
   it should "start sink task" in {
+    val topicName = "test_kv_topic"
+    val tableName = "test.kv"
+    val config = sinkConfig((topicName, tableName))
+
     val sinkTask = new CassandraSinkTask()
     val mockContext = mock[SinkTaskContext]
 
@@ -41,6 +43,7 @@ class CassandraSinkTaskSpec extends AbstractFlatSpec {
   it should "save records in cassandra" in {
     val topicName = "test_kv_topic"
     val tableName = "test.kv"
+    val config = sinkConfig((topicName, tableName))
 
     val sinkTask = new CassandraSinkTask()
     val mockContext = mock[SinkTaskContext]
@@ -67,4 +70,73 @@ class CassandraSinkTaskSpec extends AbstractFlatSpec {
     rowCount should be(2)
   }
 
+
+  it should "save records in cassandra with custom field mapping" in {
+    val topicName = "test_fieldmap_topic"
+    val tableName = "test.fieldmap"
+    val config = sinkConfig((topicName, tableName))
+
+    val sinkTask = new CassandraSinkTask()
+    val mockContext = mock[SinkTaskContext]
+
+    val fieldMapping: JSONObject = JSONObject(Map(
+      "key" -> "new_key",
+      "value" -> "new_value",
+      "nvalue" -> JSONObject(Map(
+        "blah1" -> "new_nested",
+        "blah2" -> JSONObject(Map(
+          "blah2" -> "new_dnested"
+        ))
+      ))
+    ))
+
+    sinkTask.initialize(mockContext)
+    sinkTask.start((config + ("field.mapping" -> fieldMapping.toString())).asJava)
+
+    val doubleNestedSchema = SchemaBuilder.struct.name("dnestedSchema").version(1)
+      .field("blah1", Schema.STRING_SCHEMA)
+      .field("blah2", Schema.STRING_SCHEMA).build
+    val nestedSchema = SchemaBuilder.struct.name("nestedSchema").version(1)
+      .field("blah1", Schema.STRING_SCHEMA)
+      .field("blah2", doubleNestedSchema).build
+    val valueSchema = SchemaBuilder.struct.name("record").version(1)
+      .field("key", Schema.STRING_SCHEMA)
+      .field("value", Schema.INT32_SCHEMA)
+      .field("nvalue", nestedSchema).build
+
+    val dnestedValue1 = new Struct(doubleNestedSchema)
+      .put("blah1", "dnes_blah1_1")
+      .put("blah2", "dnes_blah2_1")
+    val nestedValue1 = new Struct(nestedSchema)
+      .put("blah1", "nes_blah1_1")
+      .put("blah2", dnestedValue1)
+    val value1 = new Struct(valueSchema)
+      .put("key", "pqr")
+      .put("value", 15)
+      .put("nvalue", nestedValue1)
+
+    val dnestedValue2 = new Struct(doubleNestedSchema)
+      .put("blah1", "dnes_blah1_2")
+      .put("blah2", "dnes_blah2_2")
+    val nestedValue2 = new Struct(nestedSchema)
+      .put("blah1", "nes_blah1_2")
+      .put("blah2", dnestedValue2)
+    val value2 = new Struct(valueSchema)
+      .put("key", "abc")
+      .put("value", 17)
+      .put("nvalue", nestedValue2)
+
+    val record1 = new SinkRecord(topicName, 1, SchemaBuilder.struct.build, "key", valueSchema, value1, 0)
+    val record2 = new SinkRecord(topicName, 1, SchemaBuilder.struct.build, "key", valueSchema, value2, 0)
+
+    sinkTask.put(List(record1, record2).asJavaCollection)
+
+    sinkTask.stop()
+
+    val session = CassandraCluster.local.connect
+    val result = session.execute(s"select count(1) from $tableName").one()
+    val rowCount = result.getLong(0)
+    rowCount should be(2)
+  }
 }
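Note that the mapping above has no entry for nvalue.blah2.blah1, so that field is silently skipped by the conversion logic in CassandraSinkTask below. A sketch of the column/value map the task should derive for record1 under that assumption (expectedForRecord1 is a hypothetical name; quoting follows the schema handling in the sink task, where STRING values are single-quoted):

    val expectedForRecord1 = Map(
      "new_key"     -> "'pqr'",          // from key (STRING, so quoted)
      "new_value"   -> "15",             // from value (INT32, unquoted)
      "new_nested"  -> "'nes_blah1_1'",  // from nvalue.blah1
      "new_dnested" -> "'dnes_blah2_1'"  // from nvalue.blah2.blah2
    )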

src/main/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTask.scala

Lines changed: 96 additions & 18 deletions

@@ -16,28 +16,32 @@
 
 package com.tuplejump.kafka.connect.cassandra
 
-import java.util.{Collection => JCollection, Map => JMap, Date => JDate}
+import java.util.{Collection => JCollection, Date => JDate, Map => JMap}
 
-import org.apache.kafka.connect.connector.Task
-
-import scala.collection.JavaConverters._
+import com.tuplejump.kafka.connect.cassandra.Configuration.Query
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.connect.sink.{SinkRecord, SinkTask}
-import org.apache.kafka.connect.errors.{ConnectException, DataException}
-import org.apache.kafka.connect.data.{Schema, Struct, Timestamp}
+import org.apache.kafka.connect.connector.Task
 import org.apache.kafka.connect.data.Schema.Type._
+import org.apache.kafka.connect.data.{Field, Schema, Struct, Timestamp}
+import org.apache.kafka.connect.errors.{ConnectException, DataException}
+import org.apache.kafka.connect.sink.{SinkRecord, SinkTask}
+
+import scala.collection.JavaConverters._
 
 class CassandraSinkTask extends SinkTask with TaskLifecycle {
+
   import CassandraSinkTask._
 
+  private val FieldMappingParam = "field.mapping"
+
   def taskClass: Class[_ <: Task] = classOf[CassandraSinkTask]
 
   override def put(records: JCollection[SinkRecord]): Unit =
     records.asScala.foreach { record =>
       configuration.find(record.topic) match {
         case Some(topic) =>
-          val query = convert(record, topic)
+          val query: Query = convert(record, topic, configuration.config.get(FieldMappingParam))
           session.execute(query)
         case other =>
           throw new ConnectException("Failed to get cassandra session.")
@@ -51,33 +55,107 @@ class CassandraSinkTask extends SinkTask with TaskLifecycle {
 
 /** INTERNAL API. */
 private[kafka] object CassandraSinkTask {
+
   import Configuration._
 
+  import scala.util.parsing.json._
+
   /* TODO: Use keySchema, partition and kafkaOffset
      TODO: Add which types are currently supported in README */
-  def convert(record: SinkRecord, sink: SinkConfig): Query = {
+  def convert(record: SinkRecord, sink: SinkConfig, fieldMappingProperty: Option[String] = None): Query = {
+    val colNamesVsValues: Map[String, String] = {
+      fieldMappingProperty match {
+        case Some(fieldMappingString) => convertToCqlData(record, fieldMappingString)
+        case None => convertToCqlData(record)
+      }
+    }
+    colNamesVsValues.view.map(e => Vector(e._1, e._2)).transpose match {
+      case columnNames :: columnValues :: Nil =>
+        s"INSERT INTO ${sink.namespace}(${columnNames.mkString(",")}) VALUES(${columnValues.mkString(",")})"
+    }
+  }
+
+  def convertToCqlData(record: SinkRecord): (Map[String, String]) = {
     val valueSchema = record.valueSchema
-    val columnNames = valueSchema.fields.asScala.map(_.name).toSet
-    val columnValues = valueSchema.`type`() match {
+    valueSchema.`type`() match {
       case STRUCT =>
         val struct: Struct = record.value.asInstanceOf[Struct]
-        columnNames.map(schema(valueSchema, struct, _)).mkString(",")
+        valueSchema.fields.asScala.map { field =>
+          (field.name, schema(valueSchema, struct, field))
+        }.toMap
       case other =>
         throw new DataException(
          s"Unable to create insert statement with unsupported value schema type $other.")
     }
-    s"INSERT INTO ${sink.namespace}(${columnNames.mkString(",")}) VALUES($columnValues)"
   }
 
   /* TODO support all types. */
-  def schema(valueSchema: Schema, result: Struct, col: String): AnyRef =
-    valueSchema.field(col).schema match {
+  def schema(valueSchema: Schema, result: Struct, field: Field): String = {
+    field.schema match {
       case x if x.`type`() == Schema.STRING_SCHEMA.`type`() =>
-        s"'${result.get(col).toString}'"
+        s"'${result.get(field).toString}'"
       case x if x.name() == Timestamp.LOGICAL_NAME =>
-        val time = Timestamp.fromLogical(x, result.get(col).asInstanceOf[JDate])
+        val time = Timestamp.fromLogical(x, result.get(field).asInstanceOf[JDate])
         s"$time"
       case y =>
-        result.get(col)
+        String.valueOf(result.get(field))
+    }
+  }
+
+
+  // scalastyle:off
+  def convertToCqlData(record: SinkRecord, fieldMappingString: String): Map[String, String] = {
+    lazy val exception = new DataException(s"Invalid fieldMapping received - $fieldMappingString")
+    val result = scala.collection.mutable.Map.empty[String, String]
+    JSON.parseFull(fieldMappingString) match {
+      case Some(data) =>
+        data match {
+          case map: Map[_, _] =>
+            val valueSchema = record.valueSchema
+            valueSchema.`type`() match {
+              case STRUCT =>
+                val struct: Struct = record.value.asInstanceOf[Struct]
+                populateResultsMap(result, struct, map.asInstanceOf[Map[String, Any]])
+              case other =>
+                throw new DataException(
+                  s"Unable to create insert statement with unsupported value schema type $other.")
+            }
+          case other =>
+            throw exception
+        }
+      case None =>
+        throw exception
     }
+    result.toMap
+  }
+
+  private def populateResultsMap(results: scala.collection.mutable.Map[String, String], struct: Struct, fieldMapping: Map[String, Any]): Unit = {
+    lazy val exception = new DataException(s"Mismatch between fieldMapping and Schema")
+    struct.schema.fields.asScala.foreach { field =>
+      val fieldMappingValue = fieldMapping.get(field.name)
+      field.schema.`type`() match {
+        case STRUCT =>
+          fieldMappingValue match {
+            case Some(value) =>
+              value match {
+                case newMap: Map[_, _] => populateResultsMap(results, struct.get(field).asInstanceOf[Struct], newMap.asInstanceOf[Map[String, Any]])
+                case _ => throw exception
+              }
+            case None =>
+          }
+        case _ =>
+          fieldMappingValue match {
+            case Some(value) =>
+              value match {
+                case strValue: String => results.put(strValue, schema(field.schema, struct, field))
+                case _ => throw exception
+              }
+            case None =>
+          }
+      }
+    }
+  }
+
+  // scalastyle:on
+
 }
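In convert, the view/map/transpose step splits the column map into parallel name and value sequences before interpolating them into the INSERT. A simplified equivalent using unzip, as a sketch rather than the committed code (column order follows the map's iteration order, which CQL accepts as long as names and values stay aligned):

    val colNamesVsValues = Map("new_key" -> "'pqr'", "new_value" -> "15")
    // Split the map into a column-name list and a value list that stay in step.
    val (columnNames, columnValues) = colNamesVsValues.toList.unzip
    val query =
      s"INSERT INTO test.fieldmap(${columnNames.mkString(",")}) VALUES(${columnValues.mkString(",")})"
    // query == "INSERT INTO test.fieldmap(new_key,new_value) VALUES('pqr',15)"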
