diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/committer/DorisCommitter.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/committer/DorisCommitter.java index 28a5704f2..023c71d30 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/committer/DorisCommitter.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/committer/DorisCommitter.java @@ -90,6 +90,9 @@ public List commit(List committables) throws } private void replacePartOrTab(List committables) throws IOException { + if (committables.isEmpty()) { + return; + } LOG.info("Try to commit temporary partition or table, num of committing events: {}", committables.size()); try { dorisConnectionHolder = DorisSchemaManagerGenerator.getDorisConnection(new DorisConnectionHolder(), dorisOptions); diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/http/HttpPutBuilder.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/http/HttpPutBuilder.java index e772cd1c7..1517774f9 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/http/HttpPutBuilder.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/http/HttpPutBuilder.java @@ -18,6 +18,7 @@ import com.bytedance.bitsail.connector.doris.config.DorisOptions; import com.bytedance.bitsail.connector.doris.constant.DorisConstants; +import com.bytedance.bitsail.connector.doris.partition.DorisPartition; import org.apache.commons.codec.binary.Base64; import org.apache.flink.util.Preconditions; @@ -30,6 +31,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.stream.Collectors; /** * Builder for HttpPut. @@ -85,6 +87,14 @@ public HttpPutBuilder setEntity(HttpEntity httpEntity) { return this; } + public HttpPutBuilder setTemporaryPartition(boolean isTemp, DorisOptions dorisOptions) { + if (isTemp && dorisOptions.isTableHasPartitions()) { + String tempPartitions = dorisOptions.getPartitions().stream().map(DorisPartition::getTempName).collect(Collectors.joining(",")); + header.put("temporary_partitions", tempPartitions); + } + return this; + } + public HttpPutBuilder setEmptyEntity() { try { this.httpEntity = new StringEntity(""); diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/option/DorisWriterOptions.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/option/DorisWriterOptions.java index 867274c57..55a224e05 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/option/DorisWriterOptions.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/option/DorisWriterOptions.java @@ -73,6 +73,10 @@ public interface DorisWriterOptions extends WriterOptions.BaseWriterOptions { key(WRITER_PREFIX + "sink_flush_interval_ms") .defaultValue(5000); + ConfigOption SINK_CHECK_INTERVAL = + key(WRITER_PREFIX + "sink_check_interval") + .defaultValue(10000); + ConfigOption SINK_MAX_RETRIES = key(WRITER_PREFIX + "sink_max_retries") .defaultValue(3); diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/partition/DorisPartition.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/partition/DorisPartition.java index 7f7e4fd3b..a3e0f6558 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/partition/DorisPartition.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/partition/DorisPartition.java @@ -30,7 +30,7 @@ @Data public class DorisPartition implements Serializable { - private static final String TEMP_PARTITION_PREFIX = "_bitsail_doris_temp_partition_"; + private static final String TEMP_PARTITION_PREFIX = "bitsail_doris_temp_partition_"; @JsonProperty(value = "name", required = true) private String name; diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/partition/DorisPartitionHelper.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/partition/DorisPartitionHelper.java index b4e1d071d..3ee30e092 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/partition/DorisPartitionHelper.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/partition/DorisPartitionHelper.java @@ -167,9 +167,13 @@ public void replacePartition(List partitionList) throws SQLExcep String tempPartitions = partitionList.stream().map(DorisPartition::getTempName).collect(Collectors.joining(",")); String formalPartitions = partitionList.stream().map(DorisPartition::getName).collect(Collectors.joining(",")); String sqlReplacePartition = String.format(SQL_TEMPLATE_REPLACE_PARTITIONS, table, formalPartitions, tempPartitions); - log.info("replace partition sql: {}", sqlReplacePartition); - statement.executeUpdate(sqlReplacePartition); - log.info("Succeed replace partition: replace partition ({}) with temporary partition ({})", formalPartitions, tempPartitions); + String sqlQueryTemporaryPartition = String.format(SQL_TEMPLATE_SHOW_TEMP_PARTITIONS, database, table, tempPartitions); + log.info("Query temporary partition sql: {}", sqlQueryTemporaryPartition); + if (statement.executeUpdate(sqlQueryTemporaryPartition) != 0) { + log.info("replace partition sql: {}", sqlReplacePartition); + statement.executeUpdate(sqlReplacePartition); + log.info("Succeed replace partition: replace partition ({}) with temporary partition ({})", formalPartitions, tempPartitions); + } } public void replaceTable(String tempTable) throws SQLException { diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/DorisSink.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/DorisSink.java index ac41d3031..69f33a7f3 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/DorisSink.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/DorisSink.java @@ -151,6 +151,7 @@ private void initDorisExecutionOptions(BitSailConfiguration writerConfiguration) LOG.info("Start to init DorisExecutionOptions!"); final DorisExecutionOptions.DorisExecutionOptionsBuilder builder = DorisExecutionOptions.builder(); builder.flushIntervalMs(writerConfiguration.get(DorisWriterOptions.SINK_FLUSH_INTERVAL_MS)) + .checkInterval(writerConfiguration.get(DorisWriterOptions.SINK_CHECK_INTERVAL)) .maxRetries(writerConfiguration.get(DorisWriterOptions.SINK_MAX_RETRIES)) .bufferCount(writerConfiguration.get(DorisWriterOptions.SINK_BUFFER_COUNT)) .bufferSize(writerConfiguration.get(DorisWriterOptions.SINK_BUFFER_SIZE)) diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/proxy/DorisReplaceProxy.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/proxy/DorisReplaceProxy.java index 980137e51..32f451136 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/proxy/DorisReplaceProxy.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/proxy/DorisReplaceProxy.java @@ -20,34 +20,68 @@ import com.bytedance.bitsail.connector.doris.committer.DorisCommittable; import com.bytedance.bitsail.connector.doris.config.DorisExecutionOptions; import com.bytedance.bitsail.connector.doris.config.DorisOptions; +import com.bytedance.bitsail.connector.doris.error.DorisErrorCode; +import com.bytedance.bitsail.connector.doris.http.model.RespContent; import com.bytedance.bitsail.connector.doris.sink.DorisWriterState; import com.bytedance.bitsail.connector.doris.sink.label.LabelGenerator; import com.bytedance.bitsail.connector.doris.sink.record.RecordStream; import com.bytedance.bitsail.connector.doris.sink.streamload.DorisStreamLoad; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.bytedance.bitsail.connector.doris.sink.streamload.LoadStatus.PUBLISH_TIMEOUT; +import static com.bytedance.bitsail.connector.doris.sink.streamload.LoadStatus.SUCCESS; public class DorisReplaceProxy extends AbstractDorisWriteModeProxy { private static final Logger LOG = LoggerFactory.getLogger(DorisReplaceProxy.class); - protected List dorisBatchBuffers; - protected long dorisBatchBuffersSize; - private RecordStream recordStream; + private static final List DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); + private LabelGenerator labelGenerator; + private DorisWriterState dorisWriterState; + private AtomicInteger cacheRecordSize; + private AtomicInteger cacheRecordCount; + private volatile boolean loading = false; + private final BlockingQueue cache = new LinkedBlockingQueue<>(); + private volatile Exception loadException = null; + private int flushRecordCacheSize; + private int flushRecordCacheCount; + private byte[] lineDelimiter; + private int intervalTime; + private ScheduledExecutorService scheduler; + private final int initialDelay = 1000; public DorisReplaceProxy(DorisExecutionOptions dorisExecutionOptions, DorisOptions dorisOptions) { this.dorisExecutionOptions = dorisExecutionOptions; - this.dorisBatchBuffers = new ArrayList(dorisExecutionOptions.getBufferCount()); this.dorisOptions = dorisOptions; - this.recordStream = new RecordStream(dorisExecutionOptions.getBufferSize(), dorisExecutionOptions.getBufferCount()); - this.dorisStreamLoad = new DorisStreamLoad(dorisExecutionOptions, dorisOptions, - new LabelGenerator(dorisExecutionOptions.getLabelPrefix(), dorisExecutionOptions.isEnable2PC()), recordStream); - this.dorisBatchBuffersSize = 0; + this.labelGenerator = new LabelGenerator(dorisExecutionOptions.getLabelPrefix(), dorisExecutionOptions.isEnable2PC()); + this.dorisStreamLoad = new DorisStreamLoad(dorisExecutionOptions, dorisOptions, labelGenerator, + new RecordStream(dorisExecutionOptions.getBufferSize(), dorisExecutionOptions.getBufferCount())); + this.dorisWriterState = new DorisWriterState(dorisExecutionOptions.getLabelPrefix()); + this.lineDelimiter = dorisOptions.getLineDelimiter().getBytes(); + this.intervalTime = dorisExecutionOptions.getCheckInterval(); + this.cacheRecordSize = new AtomicInteger(); + this.cacheRecordCount = new AtomicInteger(); + this.scheduler = Executors.newScheduledThreadPool(1, + new BasicThreadFactory.Builder().namingPattern("Doris-replace-writer").daemon(true).build()); + scheduler.scheduleWithFixedDelay(this::checkDone, initialDelay, intervalTime, TimeUnit.MILLISECONDS); } @VisibleForTesting @@ -56,59 +90,106 @@ public DorisReplaceProxy() { @Override public void write(String record) throws IOException { - addBatchBuffers(record); + checkLoadException(); + byte[] bytes = record.getBytes(StandardCharsets.UTF_8); + ArrayList tmpCache = null; + if (cacheRecordCount.get() >= dorisExecutionOptions.getRecordCount() || cacheRecordSize.get() >= dorisExecutionOptions.getRecordSize()) { + tmpCache = new ArrayList<>(cache); + flushRecordCacheSize = cacheRecordSize.get(); + flushRecordCacheCount = cacheRecordCount.get(); + cache.clear(); + cacheRecordCount.set(0); + cacheRecordSize.set(0); + } + cacheRecordSize.getAndAdd(bytes.length); + cacheRecordCount.getAndIncrement(); + cache.add(bytes); + + if (Objects.nonNull(tmpCache)) { + flush(tmpCache); + } } - private void addBatchBuffers(String record) throws IOException { - this.dorisBatchBuffers.add(record); - this.dorisBatchBuffersSize += record.getBytes().length; - if (dorisBatchBuffers.size() >= dorisExecutionOptions.getRecordSize() - || this.dorisBatchBuffersSize >= dorisExecutionOptions.getRecordCount()) { - flush(false); + private void flush(ArrayList flushCache) { + if (!loading) { + LOG.info("start load by cache full, recordCount {}, recordSize {}", flushRecordCacheCount, flushRecordCacheSize); + try { + startLoad(flushCache); + } catch (Exception e) { + LOG.error("start stream load failed.", e); + loadException = e; + } } } - @SuppressWarnings("checkstyle:MagicNumber") @Override public void flush(boolean endOfInput) throws IOException { - if (dorisBatchBuffers.isEmpty()) { - return; - } - String result; - if (DorisOptions.LOAD_CONTENT_TYPE.JSON.equals(dorisOptions.getLoadDataFormat())) { - result = dorisBatchBuffers.toString(); - } else { - result = String.join(dorisOptions.getLineDelimiter(), dorisBatchBuffers); - } - for (int i = 0; i <= dorisExecutionOptions.getMaxRetries(); i++) { - try { - dorisStreamLoad.load(result, dorisOptions, true); - dorisBatchBuffers.clear(); - this.dorisBatchBuffersSize = 0; - break; - } catch (BitSailException e) { - LOG.error("doris sink error, retry times = {}", i, e); - if (i >= dorisExecutionOptions.getMaxRetries()) { - throw new IOException(e.getMessage()); - } - try { - LOG.warn("StreamLoad error", e); - Thread.sleep(1000L * i); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new IOException("unable to flush; interrupted while doing another attempt", e); + } + + private synchronized void startLoad(List flushCache) throws IOException { + this.dorisStreamLoad.startLoad(labelGenerator.generateLabel(), true); + if (!flushCache.isEmpty()) { + // add line delimiter + ByteBuffer buf = ByteBuffer.allocate(flushRecordCacheSize + (flushCache.size() - 1) * lineDelimiter.length); + for (int i = 0; i < flushCache.size(); i++) { + if (i > 0) { + buf.put(lineDelimiter); } + buf.put(flushCache.get(i)); } + dorisStreamLoad.writeRecord(buf.array()); } + this.loading = true; } @Override public List prepareCommit() throws IOException { + if (loading) { + LOG.info("stop load by prepareCommit."); + stopLoad(); + return ImmutableList.of(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisOptions.getDatabaseName(), 0)); + } return Collections.emptyList(); } + private synchronized void stopLoad() throws IOException { + this.loading = false; + this.flushRecordCacheSize = 0; + RespContent respContent = dorisStreamLoad.stopLoad(); + if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { + String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL()); + LOG.warn(errMsg); + throw new BitSailException(DorisErrorCode.LOAD_FAILED, errMsg); + } + } + @Override public List snapshotState(long checkpointId) { - return null; + return Collections.singletonList(dorisWriterState); + } + + private synchronized void checkDone() { + LOG.info("start timer checker, interval {} ms", intervalTime); + try { + if (!loading) { + LOG.info("not loading, skip timer checker"); + return; + } + if (dorisStreamLoad.getPendingLoadFuture() != null + && !dorisStreamLoad.getPendingLoadFuture().isDone()) { + LOG.info("stop load by timer checker"); + stopLoad(); + } + } catch (Exception e) { + LOG.error("stream load failed, thread exited:", e); + loadException = e; + } + } + + private void checkLoadException() { + if (loadException != null) { + LOG.error("loading error.", loadException); + throw new RuntimeException("error while loading data.", loadException); + } } } diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/proxy/DorisUpsertProxy.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/proxy/DorisUpsertProxy.java index cd686484c..2825296cb 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/proxy/DorisUpsertProxy.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/proxy/DorisUpsertProxy.java @@ -69,7 +69,7 @@ public void init() { if (executionOptions.isEnable2PC()) { dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId); } - dorisStreamLoad.startLoad(labelGenerator.generateLabel(lastCheckpointId + 1)); + dorisStreamLoad.startLoad(labelGenerator.generateLabel(lastCheckpointId + 1), false); } catch (Exception e) { throw new RuntimeException(e); } @@ -106,7 +106,7 @@ public List snapshotState(long checkpointId) { //Dynamically refresh be Node dorisStreamLoad.setHostPort(RestService.getAvailableHost()); try { - dorisStreamLoad.startLoad(labelGenerator.generateLabel(checkpointId + 1)); + dorisStreamLoad.startLoad(labelGenerator.generateLabel(checkpointId + 1), false); } catch (IOException e) { LOG.warn("Failed to start load. checkpointId={}", checkpointId, e); throw new RuntimeException(e); diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/streamload/DorisStreamLoad.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/streamload/DorisStreamLoad.java index 40efe8571..34e97f4b3 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/streamload/DorisStreamLoad.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/sink/streamload/DorisStreamLoad.java @@ -17,7 +17,6 @@ package com.bytedance.bitsail.connector.doris.sink.streamload; import com.bytedance.bitsail.common.BitSailException; -import com.bytedance.bitsail.common.model.ColumnInfo; import com.bytedance.bitsail.connector.doris.config.DorisExecutionOptions; import com.bytedance.bitsail.connector.doris.config.DorisOptions; import com.bytedance.bitsail.connector.doris.error.DorisErrorCode; @@ -25,24 +24,17 @@ import com.bytedance.bitsail.connector.doris.http.HttpUtil; import com.bytedance.bitsail.connector.doris.http.ResponseUtil; import com.bytedance.bitsail.connector.doris.http.model.RespContent; -import com.bytedance.bitsail.connector.doris.partition.DorisPartition; import com.bytedance.bitsail.connector.doris.rest.RestService; import com.bytedance.bitsail.connector.doris.sink.label.LabelGenerator; import com.bytedance.bitsail.connector.doris.sink.record.RecordStream; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.flink.util.Preconditions; -import org.apache.http.HttpHeaders; import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.InputStreamEntity; -import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; @@ -50,20 +42,14 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; -import java.util.stream.Collectors; import static com.bytedance.bitsail.connector.doris.http.ResponseUtil.LABEL_EXIST_PATTERN; import static com.bytedance.bitsail.connector.doris.sink.streamload.LoadStatus.LABEL_ALREADY_EXIST; @@ -76,7 +62,6 @@ public class DorisStreamLoad { private static final String STREAM_LOAD_URL_FORMAT = "http://%s/api/%s/%s/_stream_load"; private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc"; - private static final int SUCCESS_STATUS_CODE = 200; private LabelGenerator labelGenerator; private final String jobExistFinished = "FINISHED"; private String userName; @@ -90,7 +75,6 @@ public class DorisStreamLoad { private String loadUrlStr; protected DorisExecutionOptions executionOptions; protected DorisOptions dorisOptions; - protected String authEncoding; protected transient CloseableHttpClient httpClient; private Future pendingLoadFuture; private ExecutorService executorService; @@ -102,7 +86,6 @@ public DorisStreamLoad(DorisExecutionOptions executionOptions, DorisOptions dori this.password = dorisOptions.getPassword(); this.dorisOptions = dorisOptions; this.labelGenerator = labelGenerator; - this.authEncoding = basicAuthHeader(dorisOptions.getUsername(), dorisOptions.getPassword()); this.lineDelimiter = dorisOptions.getLineDelimiter().getBytes(StandardCharsets.UTF_8); this.httpClient = new HttpUtil().getHttpClient(); this.recordStream = recordStream; @@ -211,18 +194,19 @@ public void writeRecord(byte[] record) throws IOException { * @param label * @throws IOException */ - public void startLoad(String label) throws IOException { + public void startLoad(String label, boolean isTemp) throws IOException { loadBatchFirstRecord = true; HttpPutBuilder putBuilder = new HttpPutBuilder(); recordStream.startInput(); LOG.info("stream load started for {}", label); InputStreamEntity entity = new InputStreamEntity(recordStream); - putBuilder.setUrl(loadUrlStr) + putBuilder.setUrl(getStreamLoadUrl(isTemp)) .baseAuth(userName, password) .addCommonHeader() .setFormat(dorisOptions) .setLabel(label) .addHiddenColumns(executionOptions.getEnableDelete()) + .setTemporaryPartition(isTemp, dorisOptions) .setEntity(entity) .addProperties(executionOptions.getStreamLoadProp()); if (enable2PC) { @@ -261,83 +245,6 @@ public RespContent stopLoad() throws IOException { } } - public void load(String value, DorisOptions options, boolean isTemp) throws BitSailException { - LoadResponse loadResponse = loadBatch(value, options, isTemp); - LOG.info("StreamLoad Response:{}", loadResponse); - if (loadResponse.status != SUCCESS_STATUS_CODE) { - throw new BitSailException(DorisErrorCode.LOAD_FAILED, "stream load error: " + loadResponse.respContent); - } else { - try { - RespContent respContent = OBJECT_MAPPER.readValue(loadResponse.respContent, RespContent.class); - if (!SUCCESS.equals(respContent.getStatus())) { - String errMsg = String.format("stream load error: %s, see more in %s, load value string: %s", - respContent.getMessage(), respContent.getErrorURL(), value); - throw new BitSailException(DorisErrorCode.LOAD_FAILED, errMsg); - } - } catch (IOException e) { - throw new BitSailException(DorisErrorCode.LOAD_FAILED, e.getMessage()); - } - } - } - - private LoadResponse loadBatch(String value, DorisOptions options, boolean isTemp) { - String label = executionOptions.getStreamLoadProp().getProperty("label"); - if (StringUtils.isBlank(label)) { - SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd_HHmmss"); - String formatDate = sdf.format(new Date()); - label = String.format("bitsail_doris_connector_%s_%s", formatDate, - UUID.randomUUID().toString().replaceAll("-", "")); - } - - try { - HttpPut put = new HttpPut(getStreamLoadUrl(isTemp)); - if (options.getLoadDataFormat().equals(DorisOptions.LOAD_CONTENT_TYPE.JSON)) { - put.setHeader("format", "json"); - put.setHeader("strip_outer_array", "true"); - } else if (options.getLoadDataFormat().equals(DorisOptions.LOAD_CONTENT_TYPE.CSV)) { - put.setHeader("format", "csv"); - put.setHeader("column_separator", options.getFieldDelimiter()); - } - - if (isTemp && dorisOptions.isTableHasPartitions()) { - String tempPartitions = dorisOptions.getPartitions().stream().map(DorisPartition::getTempName).collect(Collectors.joining(",")); - put.setHeader("temporary_partitions", tempPartitions); - } - if (executionOptions.isEnable2PC()) { - put.setHeader("two_phase_commit", "true"); - } - - put.setHeader(HttpHeaders.EXPECT, "100-continue"); - //set column meta info - List columnNames = new ArrayList<>(); - for (ColumnInfo columnInfo : options.getColumnInfos()) { - columnNames.add(columnInfo.getName()); - } - put.setHeader("columns", String.join(",", columnNames)); - put.setHeader(HttpHeaders.AUTHORIZATION, this.authEncoding); - put.setHeader("label", label); - for (Map.Entry entry : executionOptions.getStreamLoadProp().entrySet()) { - put.setHeader(String.valueOf(entry.getKey()), String.valueOf(entry.getValue())); - } - StringEntity entity = new StringEntity(value, "UTF-8"); - put.setEntity(entity); - - try (CloseableHttpResponse response = httpClient.execute(put)) { - final int statusCode = response.getStatusLine().getStatusCode(); - final String reasonPhrase = response.getStatusLine().getReasonPhrase(); - String loadResult = ""; - if (response.getEntity() != null) { - loadResult = EntityUtils.toString(response.getEntity()); - } - return new LoadResponse(statusCode, reasonPhrase, loadResult); - } - } catch (Exception e) { - String err = "failed to stream load data with label: " + label; - LOG.warn(err, e); - return new LoadResponse(-1, e.getMessage(), err); - } - } - private String getStreamLoadUrl(boolean isTemp) { String tableName = dorisOptions.getTableName(); if (isTemp && !dorisOptions.isTableHasPartitions()) { @@ -346,44 +253,20 @@ private String getStreamLoadUrl(boolean isTemp) { return String.format(STREAM_LOAD_URL_FORMAT, hostPort, dorisOptions.getDatabaseName(), tableName); } - protected String basicAuthHeader(String username, String password) { - final String tobeEncode = username + ":" + password; - byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8)); - return "Basic " + new String(encoded); - } - public String getHostPort() { return hostPort; } public void setHostPort(String hostPort) { this.hostPort = hostPort; - this.loadUrlStr = String.format(STREAM_LOAD_URL_FORMAT, hostPort, dorisOptions.getDatabaseName(), dorisOptions.getTableName()); } - @VisibleForTesting - public DorisStreamLoad() { + public Future getPendingLoadFuture() { + return pendingLoadFuture; } - public static class LoadResponse { - public int status; - public String respMsg; - public String respContent; - - public LoadResponse(int status, String respMsg, String respContent) { - this.status = status; - this.respMsg = respMsg; - this.respContent = respContent; - } - - @Override - public String toString() { - try { - return OBJECT_MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - return ""; - } - } + @VisibleForTesting + public DorisStreamLoad() { } } diff --git a/bitsail-connectors/connector-doris/src/test/java/com/bytedance/bitsail/connector/doris/sink/DorisBatchReplaceSinkCase.java b/bitsail-connectors/connector-doris/src/test/java/com/bytedance/bitsail/connector/doris/sink/DorisBatchReplaceSinkCase.java new file mode 100644 index 000000000..b038b63b7 --- /dev/null +++ b/bitsail-connectors/connector-doris/src/test/java/com/bytedance/bitsail/connector/doris/sink/DorisBatchReplaceSinkCase.java @@ -0,0 +1,58 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.sink; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.option.CommonOptions; +import com.bytedance.bitsail.connector.doris.option.DorisWriterOptions; +import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; +import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; + +import org.junit.Ignore; +import org.junit.Test; + +@Ignore +public class DorisBatchReplaceSinkCase { + + @Test + public void test() throws Exception { + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("fake_to_doris.json"); + addDorisInfo(jobConf); + EmbeddedFlinkCluster.submitJob(jobConf); + } + + /** + * Add your doris setting to job configuration. + * Below codes are just example. + *

+ * Note: the trigger condition for BATCH_REPLACE is satisfies the sink_record_size or sink_record_count + */ + @SuppressWarnings("checkstyle:MagicNumber") + public void addDorisInfo(BitSailConfiguration jobConf) { + jobConf.set(DorisWriterOptions.FE_HOSTS, "127.0.0.1:8030"); + jobConf.set(DorisWriterOptions.MYSQL_HOSTS, "127.0.0.1:9030"); + jobConf.set(DorisWriterOptions.USER, "root"); + jobConf.set(DorisWriterOptions.PASSWORD, ""); + jobConf.set(DorisWriterOptions.DB_NAME, "test"); + jobConf.set(DorisWriterOptions.TABLE_NAME, "test_bitsail"); + jobConf.set(DorisWriterOptions.SINK_WRITE_MODE, "BATCH_REPLACE"); + jobConf.set(CommonOptions.CheckPointOptions.CHECKPOINT_ENABLE, true); + jobConf.set(CommonOptions.CheckPointOptions.CHECKPOINT_INTERVAL, 100000L); + jobConf.set(DorisWriterOptions.SINK_ENABLE_2PC, false); + jobConf.set(DorisWriterOptions.SINK_LABEL_PREFIX, "bitsail-doris"); + } +}