Skip to content

Commit

Permalink
Support multi-table sink feature for influxdb (#6278)
Browse files Browse the repository at this point in the history
  • Loading branch information
531651225 authored Feb 27, 2024
1 parent 9c3c2f1 commit 56f13e9
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
public class SinkConfig extends InfluxDBConfig {
public SinkConfig(Config config) {
super(config);
loadConfig(config);
}

public static final Option<String> KEY_TIME =
Expand Down Expand Up @@ -103,35 +104,32 @@ public SinkConfig(Config config) {
private int maxRetryBackoffMs;
private TimePrecision precision = DEFAULT_TIME_PRECISION;

public static SinkConfig loadConfig(Config config) {
SinkConfig sinkConfig = new SinkConfig(config);
public void loadConfig(Config config) {

if (config.hasPath(KEY_TIME.key())) {
sinkConfig.setKeyTime(config.getString(KEY_TIME.key()));
setKeyTime(config.getString(KEY_TIME.key()));
}
if (config.hasPath(KEY_TAGS.key())) {
sinkConfig.setKeyTags(config.getStringList(KEY_TAGS.key()));
setKeyTags(config.getStringList(KEY_TAGS.key()));
}
if (config.hasPath(MAX_RETRIES.key())) {
sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES.key()));
setMaxRetries(config.getInt(MAX_RETRIES.key()));
}
if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) {
sinkConfig.setRetryBackoffMultiplierMs(
config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
}
if (config.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key()));
setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key()));
}
if (config.hasPath(WRITE_TIMEOUT.key())) {
sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT.key()));
setWriteTimeout(config.getInt(WRITE_TIMEOUT.key()));
}
if (config.hasPath(RETENTION_POLICY.key())) {
sinkConfig.setRp(config.getString(RETENTION_POLICY.key()));
setRp(config.getString(RETENTION_POLICY.key()));
}
if (config.hasPath(EPOCH.key())) {
sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key())));
setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key())));
}
sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT.key()));
return sinkConfig;
setMeasurement(config.getString(KEY_MEASUREMENT.key()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,61 +17,36 @@

package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;

import com.google.auto.service.AutoService;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;

import java.io.IOException;

import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT;
public class InfluxDBSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {

@AutoService(SeaTunnelSink.class)
public class InfluxDBSink extends AbstractSimpleSink<SeaTunnelRow, Void> {

private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
private SinkConfig sinkConfig;

@Override
public String getPluginName() {
return "InfluxDB";
}

@Override
public void prepare(Config config) throws PrepareFailException {
CheckResult result =
CheckConfigUtil.checkAllExists(config, URL.key(), KEY_MEASUREMENT.key());
if (!result.isSuccess()) {
throw new InfluxdbConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK, result.getMsg()));
}
this.pluginConfig = config;
}

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
public InfluxDBSink(SinkConfig sinkConfig, CatalogTable catalogTable) {
this.sinkConfig = sinkConfig;
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
}

@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
throws IOException {
return new InfluxDBSinkWriter(pluginConfig, seaTunnelRowType);
return new InfluxDBSinkWriter(sinkConfig, seaTunnelRowType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,20 @@

package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;

import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.CONNECT_TIMEOUT_MS;
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.DATABASES;
Expand All @@ -36,6 +45,7 @@
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS;

@AutoService(Factory.class)
@Slf4j
public class InfluxDBSinkFactory implements TableSinkFactory {

@Override
Expand All @@ -46,15 +56,29 @@ public String factoryIdentifier() {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(URL, DATABASES, KEY_MEASUREMENT)
.required(URL, DATABASES)
.bundled(USERNAME, PASSWORD)
.optional(
CONNECT_TIMEOUT_MS,
KEY_MEASUREMENT,
KEY_TAGS,
KEY_TIME,
BATCH_SIZE,
MAX_RETRIES,
RETRY_BACKOFF_MULTIPLIER_MS)
.build();
}

@Override
public TableSink createSink(TableSinkFactoryContext context) {
ReadonlyConfig config = context.getOptions();
CatalogTable catalogTable = context.getCatalogTable();
if (!config.getOptional(KEY_MEASUREMENT).isPresent()) {
Map<String, String> map = config.toMap();
map.put(KEY_MEASUREMENT.key(), catalogTable.getTableId().toTablePath().getFullName());
config = ReadonlyConfig.fromMap(new HashMap<>(map));
}
SinkConfig sinkConfig = new SinkConfig(config.toConfig());
return () -> new InfluxDBSink(sinkConfig, catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
Expand All @@ -44,17 +44,19 @@
import java.util.Optional;

@Slf4j
public class InfluxDBSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
public class InfluxDBSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter {

private final Serializer serializer;
private InfluxDB influxdb;
private final SinkConfig sinkConfig;
private final List<Point> batchList;
private volatile Exception flushException;

public InfluxDBSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType)
public InfluxDBSinkWriter(SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType)
throws ConnectException {
this.sinkConfig = SinkConfig.loadConfig(pluginConfig);
this.sinkConfig = sinkConfig;
log.info("sinkConfig is {}", JsonUtils.toJsonString(sinkConfig));
this.serializer =
new DefaultSerializer(
seaTunnelRowType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;

import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
Expand All @@ -51,10 +53,12 @@
import java.net.ConnectException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Slf4j
Expand Down Expand Up @@ -242,6 +246,64 @@ public void testInfluxdbWithTz(TestContainer container)
}
}

@TestTemplate
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "Currently SPARK/FLINK do not support multiple table read")
public void testInfluxdbMultipleWrite(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/fake_to_infuxdb_with_multipletable.conf");

Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertAll(
() -> {
Assertions.assertIterableEquals(
Stream.<List<Object>>of(
Arrays.asList(
1627529632356l,
"label_1",
"sink_1",
4.3,
200,
2.5,
2,
5,
true))
.collect(Collectors.toList()),
readData("infulxdb_sink_1"));
},
() -> {
Assertions.assertIterableEquals(
Stream.<List<Object>>of(
Arrays.asList(
1627529632357l,
"label_2",
"sink_2",
4.3,
200,
2.5,
2,
5,
true))
.collect(Collectors.toList()),
readData("infulxdb_sink_2"));
});
}

public List<List<Object>> readData(String tableName) {
String sinkSql =
String.format(
"select time, label, c_string, c_double, c_bigint, c_float,c_int, c_smallint, c_boolean from %s order by time",
tableName);
QueryResult sinkQueryResult = influxDB.query(new Query(sinkSql, INFLUXDB_DATABASE));

List<List<Object>> sinkValues =
sinkQueryResult.getResults().get(0).getSeries().get(0).getValues();
return sinkValues;
}

private void initializeInfluxDBClient() throws ConnectException {
InfluxDBConfig influxDBConfig = new InfluxDBConfig(influxDBConnectUrl);
influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
# You can set engine configuration here
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
tables_configs = [
{
schema = {
table = "infulxdb_sink_1"
fields {
label = STRING
c_string = STRING
c_double = DOUBLE
c_bigint = BIGINT
c_float = FLOAT
c_int = INT
c_smallint = SMALLINT
c_boolean = BOOLEAN
time = BIGINT
}
}
rows = [
{
kind = INSERT
fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356]
}
]
},
{
schema = {
table = "infulxdb_sink_2"
fields {
label = STRING
c_string = STRING
c_double = DOUBLE
c_bigint = BIGINT
c_float = FLOAT
c_int = INT
c_smallint = SMALLINT
c_boolean = BOOLEAN
time = BIGINT
}
}
rows = [
{
kind = INSERT
fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 1627529632357]
}
]
}
]
}
}

sink {
InfluxDB {
url = "http://influxdb-host:8086"
database = "test"
key_time = "time"
batch_size = 1
}
}

0 comments on commit 56f13e9

Please sign in to comment.