Skip to content

Commit d7fa9af

Browse files
authored
[Feature][Connector] Add druid sink connector (apache#6346)
1 parent 0c3044e commit d7fa9af

File tree

18 files changed

+1078
-0
lines changed

18 files changed

+1078
-0
lines changed

config/plugin_config

+1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ connector-openmldb
6767
connector-pulsar
6868
connector-rabbitmq
6969
connector-redis
70+
connector-druid
7071
connector-s3-redshift
7172
connector-sentry
7273
connector-slack

docs/en/connector-v2/sink/Druid.md

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# Druid
2+
3+
> Druid sink connector
4+
5+
## Description
6+
7+
Write data to Druid
8+
9+
## Key features
10+
11+
- [ ] [exactly-once](../../concept/connector-v2-features.md)
12+
13+
## Data Type Mapping
14+
15+
| SeaTunnel Data Type | Druid Data Type |
16+
|---------------------|-----------------|
17+
| TINYINT | LONG |
18+
| SMALLINT | LONG |
19+
| INT | LONG |
20+
| BIGINT | LONG |
21+
| FLOAT | FLOAT |
22+
| DOUBLE | DOUBLE |
23+
| DECIMAL | DOUBLE |
24+
| STRING | STRING |
25+
| BOOLEAN | STRING |
26+
| TIMESTAMP | STRING |
27+
28+
## Options
29+
30+
| name | type | required | default value |
31+
|----------------|--------|----------|---------------|
32+
| coordinatorUrl | string | yes | - |
33+
| datasource | string | yes | - |
34+
| batchSize | int | no | 10000 |
35+
| common-options | | no | - |
36+
37+
### coordinatorUrl [string]
38+
39+
The coordinatorUrl host and port of Druid, example: "myHost:8888"
40+
41+
### datasource [string]
42+
43+
The datasource name you want to write, example: "seatunnel"
44+
45+
### batchSize [int]
46+
47+
The number of rows flushed to Druid per batch. Default value is `1024`.
48+
49+
### common options
50+
51+
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
52+
53+
## Example
54+
55+
```hocon
56+
Druid {
57+
coordinatorUrl = "testHost:8888"
58+
datasource = "seatunnel"
59+
}
60+
```
61+
62+
## Changelog
63+
64+
### next version
65+
66+
- Add Druid sink connector
67+

plugin-mapping.properties

+1
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ seatunnel.source.AmazonSqs = connector-amazonsqs
119119
seatunnel.sink.AmazonSqs = connector-amazonsqs
120120
seatunnel.source.Paimon = connector-paimon
121121
seatunnel.sink.Paimon = connector-paimon
122+
seatunnel.sink.Druid = connector-druid
122123
seatunnel.source.Easysearch = connector-easysearch
123124
seatunnel.sink.Easysearch = connector-easysearch
124125
seatunnel.source.Postgres-CDC = connector-cdc-postgres
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
Licensed to the Apache Software Foundation (ASF) under one or more
4+
contributor license agreements. See the NOTICE file distributed with
5+
this work for additional information regarding copyright ownership.
6+
The ASF licenses this file to You under the Apache License, Version 2.0
7+
(the "License"); you may not use this file except in compliance with
8+
the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
-->
18+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
19+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
<modelVersion>4.0.0</modelVersion>
21+
<parent>
22+
<groupId>org.apache.seatunnel</groupId>
23+
<artifactId>seatunnel-connectors-v2</artifactId>
24+
<version>${revision}</version>
25+
</parent>
26+
27+
<artifactId>connector-druid</artifactId>
28+
<name>SeaTunnel : Connectors V2 : Druid</name>
29+
30+
<properties>
31+
<druid.version>24.0.1</druid.version>
32+
<httpclient.version>4.5.13</httpclient.version>
33+
</properties>
34+
35+
<dependencies>
36+
<dependency>
37+
<groupId>org.apache.seatunnel</groupId>
38+
<artifactId>connector-common</artifactId>
39+
<version>${project.version}</version>
40+
</dependency>
41+
<dependency>
42+
<groupId>org.apache.druid</groupId>
43+
<artifactId>druid-processing</artifactId>
44+
<version>${druid.version}</version>
45+
</dependency>
46+
<dependency>
47+
<groupId>org.apache.druid</groupId>
48+
<artifactId>druid-indexing-service</artifactId>
49+
<version>${druid.version}</version>
50+
</dependency>
51+
<dependency>
52+
<groupId>org.apache.httpcomponents</groupId>
53+
<artifactId>httpclient</artifactId>
54+
<version>${httpclient.version}</version>
55+
</dependency>
56+
</dependencies>
57+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.seatunnel.connectors.druid.config;
20+
21+
import org.apache.seatunnel.api.configuration.Option;
22+
import org.apache.seatunnel.api.configuration.Options;
23+
24+
public class DruidConfig {
25+
public static final Integer BATCH_SIZE_DEFAULT = 10000;
26+
27+
public static Option<String> COORDINATOR_URL =
28+
Options.key("coordinatorUrl")
29+
.stringType()
30+
.noDefaultValue()
31+
.withDescription("The coordinatorUrl host and port of Druid.");
32+
33+
public static Option<String> DATASOURCE =
34+
Options.key("datasource")
35+
.stringType()
36+
.noDefaultValue()
37+
.withDescription("The datasource name need to write.");
38+
39+
public static Option<Integer> BATCH_SIZE =
40+
Options.key("batchSize")
41+
.intType()
42+
.defaultValue(BATCH_SIZE_DEFAULT)
43+
.withDescription("The batch size of the druid write.");
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.seatunnel.connectors.druid.exception;
20+
21+
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
22+
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
23+
24+
public class DruidConnectorException extends SeaTunnelRuntimeException {
25+
26+
public DruidConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
27+
super(seaTunnelErrorCode, errorMessage);
28+
}
29+
30+
public DruidConnectorException(
31+
SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
32+
super(seaTunnelErrorCode, errorMessage, cause);
33+
}
34+
35+
public DruidConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
36+
super(seaTunnelErrorCode, cause);
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.seatunnel.connectors.druid.sink;
20+
21+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
22+
import org.apache.seatunnel.api.sink.SinkWriter;
23+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
24+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
25+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
26+
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
27+
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
28+
29+
import java.io.IOException;
30+
31+
import static org.apache.seatunnel.connectors.druid.config.DruidConfig.BATCH_SIZE;
32+
import static org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL;
33+
import static org.apache.seatunnel.connectors.druid.config.DruidConfig.DATASOURCE;
34+
35+
public class DruidSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
36+
37+
private ReadonlyConfig config;
38+
private CatalogTable catalogTable;
39+
private SeaTunnelRowType seaTunnelRowType;
40+
41+
@Override
42+
public String getPluginName() {
43+
return "Druid";
44+
}
45+
46+
public DruidSink(ReadonlyConfig config, CatalogTable table) {
47+
this.config = config;
48+
this.catalogTable = table;
49+
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
50+
}
51+
52+
@Override
53+
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
54+
throws IOException {
55+
return new DruidWriter(
56+
seaTunnelRowType,
57+
config.get(COORDINATOR_URL),
58+
config.get(DATASOURCE),
59+
config.get(BATCH_SIZE));
60+
}
61+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.seatunnel.connectors.druid.sink;
20+
21+
import org.apache.seatunnel.api.configuration.util.OptionRule;
22+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
23+
import org.apache.seatunnel.api.table.connector.TableSink;
24+
import org.apache.seatunnel.api.table.factory.Factory;
25+
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
26+
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
27+
28+
import com.google.auto.service.AutoService;
29+
30+
import static org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL;
31+
import static org.apache.seatunnel.connectors.druid.config.DruidConfig.DATASOURCE;
32+
33+
@AutoService(Factory.class)
34+
public class DruidSinkFactory implements TableSinkFactory {
35+
@Override
36+
public String factoryIdentifier() {
37+
return "Druid";
38+
}
39+
40+
@Override
41+
public OptionRule optionRule() {
42+
return OptionRule.builder().required(COORDINATOR_URL, DATASOURCE).build();
43+
}
44+
45+
@Override
46+
public TableSink createSink(TableSinkFactoryContext context) {
47+
CatalogTable catalogTable = context.getCatalogTable();
48+
return () -> new DruidSink(context.getOptions(), catalogTable);
49+
}
50+
}

0 commit comments

Comments
 (0)