From 639ed019f9993302d91bbc2bb3dc75053edf6fa4 Mon Sep 17 00:00:00 2001 From: Bibo <33744252+531651225@users.noreply.github.com> Date: Tue, 27 Feb 2024 11:37:59 +0800 Subject: [PATCH] Support multi-table sink feature for influxdb (#6278) --- .../seatunnel/influxdb/config/SinkConfig.java | 24 +++--- .../seatunnel/influxdb/sink/InfluxDBSink.java | 45 +++------- .../influxdb/sink/InfluxDBSinkFactory.java | 26 +++++- .../influxdb/sink/InfluxDBSinkWriter.java | 12 +-- .../e2e/connector/influxdb/InfluxdbIT.java | 62 ++++++++++++++ .../fake_to_infuxdb_with_multipletable.conf | 85 +++++++++++++++++++ 6 files changed, 200 insertions(+), 54 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/fake_to_infuxdb_with_multipletable.conf diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java index 806309bffeb..071e3c235fc 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java @@ -34,6 +34,7 @@ public class SinkConfig extends InfluxDBConfig { public SinkConfig(Config config) { super(config); + loadConfig(config); } public static final Option KEY_TIME = @@ -103,35 +104,32 @@ public SinkConfig(Config config) { private int maxRetryBackoffMs; private TimePrecision precision = DEFAULT_TIME_PRECISION; - public static SinkConfig loadConfig(Config config) { - SinkConfig sinkConfig = new SinkConfig(config); + public void loadConfig(Config config) { if (config.hasPath(KEY_TIME.key())) { - sinkConfig.setKeyTime(config.getString(KEY_TIME.key())); + setKeyTime(config.getString(KEY_TIME.key())); } if (config.hasPath(KEY_TAGS.key())) { - sinkConfig.setKeyTags(config.getStringList(KEY_TAGS.key())); + setKeyTags(config.getStringList(KEY_TAGS.key())); } if (config.hasPath(MAX_RETRIES.key())) { - sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES.key())); + setMaxRetries(config.getInt(MAX_RETRIES.key())); } if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) { - sinkConfig.setRetryBackoffMultiplierMs( - config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key())); + setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key())); } if (config.hasPath(MAX_RETRY_BACKOFF_MS.key())) { - sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key())); + setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key())); } if (config.hasPath(WRITE_TIMEOUT.key())) { - sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT.key())); + setWriteTimeout(config.getInt(WRITE_TIMEOUT.key())); } if (config.hasPath(RETENTION_POLICY.key())) { - sinkConfig.setRp(config.getString(RETENTION_POLICY.key())); + setRp(config.getString(RETENTION_POLICY.key())); } if (config.hasPath(EPOCH.key())) { - sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key()))); + setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key()))); } - sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT.key())); - return sinkConfig; + setMeasurement(config.getString(KEY_MEASUREMENT.key())); } } diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java index 9cc03272d1e..da7ba20f91d 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java @@ -17,61 +17,36 @@ package org.apache.seatunnel.connectors.seatunnel.influxdb.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; -import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException; - -import com.google.auto.service.AutoService; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig; import java.io.IOException; -import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL; -import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT; +public class InfluxDBSink extends AbstractSimpleSink + implements SupportMultiTableSink { -@AutoService(SeaTunnelSink.class) -public class InfluxDBSink extends AbstractSimpleSink { - - private Config pluginConfig; private SeaTunnelRowType seaTunnelRowType; + private SinkConfig sinkConfig; @Override public String getPluginName() { return "InfluxDB"; } - @Override - public void prepare(Config config) throws PrepareFailException { - CheckResult result = - CheckConfigUtil.checkAllExists(config, URL.key(), KEY_MEASUREMENT.key()); - if (!result.isSuccess()) { - throw new InfluxdbConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SINK, result.getMsg())); - } - this.pluginConfig = config; - } - - @Override - public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { - this.seaTunnelRowType = seaTunnelRowType; + public InfluxDBSink(SinkConfig sinkConfig, CatalogTable catalogTable) { + this.sinkConfig = sinkConfig; + this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType(); } @Override public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { - return new InfluxDBSinkWriter(pluginConfig, seaTunnelRowType); + return new InfluxDBSinkWriter(sinkConfig, seaTunnelRowType); } } diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java index 3d44158e78b..81a294e95bc 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java @@ -17,11 +17,20 @@ package org.apache.seatunnel.connectors.seatunnel.influxdb.sink; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig; import com.google.auto.service.AutoService; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.Map; import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.CONNECT_TIMEOUT_MS; import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.DATABASES; @@ -36,6 +45,7 @@ import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS; @AutoService(Factory.class) +@Slf4j public class InfluxDBSinkFactory implements TableSinkFactory { @Override @@ -46,10 +56,11 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(URL, DATABASES, KEY_MEASUREMENT) + .required(URL, DATABASES) .bundled(USERNAME, PASSWORD) .optional( CONNECT_TIMEOUT_MS, + KEY_MEASUREMENT, KEY_TAGS, KEY_TIME, BATCH_SIZE, @@ -57,4 +68,17 @@ public OptionRule optionRule() { RETRY_BACKOFF_MULTIPLIER_MS) .build(); } + + @Override + public TableSink createSink(TableSinkFactoryContext context) { + ReadonlyConfig config = context.getOptions(); + CatalogTable catalogTable = context.getCatalogTable(); + if (!config.getOptional(KEY_MEASUREMENT).isPresent()) { + Map map = config.toMap(); + map.put(KEY_MEASUREMENT.key(), catalogTable.getTableId().toTablePath().getFullName()); + config = ReadonlyConfig.fromMap(new HashMap<>(map)); + } + SinkConfig sinkConfig = new SinkConfig(config.toConfig()); + return () -> new InfluxDBSink(sinkConfig, catalogTable); + } } diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java index f2d401db515..b0d23c7e799 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java @@ -17,11 +17,11 @@ package org.apache.seatunnel.connectors.seatunnel.influxdb.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient; import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig; @@ -44,7 +44,8 @@ import java.util.Optional; @Slf4j -public class InfluxDBSinkWriter extends AbstractSinkWriter { +public class InfluxDBSinkWriter extends AbstractSinkWriter + implements SupportMultiTableSinkWriter { private final Serializer serializer; private InfluxDB influxdb; @@ -52,9 +53,10 @@ public class InfluxDBSinkWriter extends AbstractSinkWriter { private final List batchList; private volatile Exception flushException; - public InfluxDBSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) + public InfluxDBSinkWriter(SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType) throws ConnectException { - this.sinkConfig = SinkConfig.loadConfig(pluginConfig); + this.sinkConfig = sinkConfig; + log.info("sinkConfig is {}", JsonUtils.toJsonString(sinkConfig)); this.serializer = new DefaultSerializer( seaTunnelRowType, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java index ddc7afadacb..c139afc5e87 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -25,7 +25,9 @@ import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.influxdb.InfluxDB; import org.influxdb.dto.BatchPoints; @@ -51,10 +53,12 @@ import java.net.ConnectException; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import java.util.stream.Stream; @Slf4j @@ -242,6 +246,64 @@ public void testInfluxdbWithTz(TestContainer container) } } + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "Currently SPARK/FLINK do not support multiple table read") + public void testInfluxdbMultipleWrite(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/fake_to_infuxdb_with_multipletable.conf"); + + Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertAll( + () -> { + Assertions.assertIterableEquals( + Stream.>of( + Arrays.asList( + 1627529632356l, + "label_1", + "sink_1", + 4.3, + 200, + 2.5, + 2, + 5, + true)) + .collect(Collectors.toList()), + readData("infulxdb_sink_1")); + }, + () -> { + Assertions.assertIterableEquals( + Stream.>of( + Arrays.asList( + 1627529632357l, + "label_2", + "sink_2", + 4.3, + 200, + 2.5, + 2, + 5, + true)) + .collect(Collectors.toList()), + readData("infulxdb_sink_2")); + }); + } + + public List> readData(String tableName) { + String sinkSql = + String.format( + "select time, label, c_string, c_double, c_bigint, c_float,c_int, c_smallint, c_boolean from %s order by time", + tableName); + QueryResult sinkQueryResult = influxDB.query(new Query(sinkSql, INFLUXDB_DATABASE)); + + List> sinkValues = + sinkQueryResult.getResults().get(0).getSeries().get(0).getValues(); + return sinkValues; + } + private void initializeInfluxDBClient() throws ConnectException { InfluxDBConfig influxDBConfig = new InfluxDBConfig(influxDBConnectUrl); influxDB = InfluxDBClient.getInfluxDB(influxDBConfig); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/fake_to_infuxdb_with_multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/fake_to_infuxdb_with_multipletable.conf new file mode 100644 index 00000000000..eda13ff7040 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/fake_to_infuxdb_with_multipletable.conf @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "infulxdb_sink_1" + fields { + label = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + rows = [ + { + kind = INSERT + fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356] + } + ] + }, + { + schema = { + table = "infulxdb_sink_2" + fields { + label = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + rows = [ + { + kind = INSERT + fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 1627529632357] + } + ] + } + ] + } +} + +sink { + InfluxDB { + url = "http://influxdb-host:8086" + database = "test" + key_time = "time" + batch_size = 1 + } +} \ No newline at end of file