Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bitsail][connector]Doris batch replace model use recordStream buffer #305

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ public List<DorisCommittable> commit(List<DorisCommittable> committables) throws
}

private void replacePartOrTab(List<DorisCommittable> committables) throws IOException {
if (committables.isEmpty()) {
return;
}
LOG.info("Try to commit temporary partition or table, num of committing events: {}", committables.size());
try {
dorisConnectionHolder = DorisSchemaManagerGenerator.getDorisConnection(new DorisConnectionHolder(), dorisOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.bytedance.bitsail.connector.doris.config.DorisOptions;
import com.bytedance.bitsail.connector.doris.constant.DorisConstants;
import com.bytedance.bitsail.connector.doris.partition.DorisPartition;

import org.apache.commons.codec.binary.Base64;
import org.apache.flink.util.Preconditions;
Expand All @@ -30,6 +31,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

/**
* Builder for HttpPut.
Expand Down Expand Up @@ -85,6 +87,14 @@ public HttpPutBuilder setEntity(HttpEntity httpEntity) {
return this;
}

public HttpPutBuilder setTemporaryPartition(boolean isTemp, DorisOptions dorisOptions) {
if (isTemp && dorisOptions.isTableHasPartitions()) {
String tempPartitions = dorisOptions.getPartitions().stream().map(DorisPartition::getTempName).collect(Collectors.joining(","));
header.put("temporary_partitions", tempPartitions);
}
return this;
}

public HttpPutBuilder setEmptyEntity() {
try {
this.httpEntity = new StringEntity("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ public interface DorisWriterOptions extends WriterOptions.BaseWriterOptions {
key(WRITER_PREFIX + "sink_flush_interval_ms")
.defaultValue(5000);

ConfigOption<Integer> SINK_CHECK_INTERVAL =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be long type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SINK_CHECK_INTERVAL represents the time interval, not too large

key(WRITER_PREFIX + "sink_check_interval")
.defaultValue(10000);

ConfigOption<Integer> SINK_MAX_RETRIES =
key(WRITER_PREFIX + "sink_max_retries")
.defaultValue(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
@Data
public class DorisPartition implements Serializable {

private static final String TEMP_PARTITION_PREFIX = "_bitsail_doris_temp_partition_";
private static final String TEMP_PARTITION_PREFIX = "bitsail_doris_temp_partition_";

@JsonProperty(value = "name", required = true)
private String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,13 @@ public void replacePartition(List<DorisPartition> partitionList) throws SQLExcep
String tempPartitions = partitionList.stream().map(DorisPartition::getTempName).collect(Collectors.joining(","));
String formalPartitions = partitionList.stream().map(DorisPartition::getName).collect(Collectors.joining(","));
String sqlReplacePartition = String.format(SQL_TEMPLATE_REPLACE_PARTITIONS, table, formalPartitions, tempPartitions);
log.info("replace partition sql: {}", sqlReplacePartition);
statement.executeUpdate(sqlReplacePartition);
log.info("Succeed replace partition: replace partition ({}) with temporary partition ({})", formalPartitions, tempPartitions);
String sqlQueryTemporaryPartition = String.format(SQL_TEMPLATE_SHOW_TEMP_PARTITIONS, database, table, tempPartitions);
log.info("Query temporary partition sql: {}", sqlQueryTemporaryPartition);
if (statement.executeUpdate(sqlQueryTemporaryPartition) != 0) {
log.info("replace partition sql: {}", sqlReplacePartition);
statement.executeUpdate(sqlReplacePartition);
log.info("Succeed replace partition: replace partition ({}) with temporary partition ({})", formalPartitions, tempPartitions);
}
}

public void replaceTable(String tempTable) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ private void initDorisExecutionOptions(BitSailConfiguration writerConfiguration)
LOG.info("Start to init DorisExecutionOptions!");
final DorisExecutionOptions.DorisExecutionOptionsBuilder builder = DorisExecutionOptions.builder();
builder.flushIntervalMs(writerConfiguration.get(DorisWriterOptions.SINK_FLUSH_INTERVAL_MS))
.checkInterval(writerConfiguration.get(DorisWriterOptions.SINK_CHECK_INTERVAL))
.maxRetries(writerConfiguration.get(DorisWriterOptions.SINK_MAX_RETRIES))
.bufferCount(writerConfiguration.get(DorisWriterOptions.SINK_BUFFER_COUNT))
.bufferSize(writerConfiguration.get(DorisWriterOptions.SINK_BUFFER_SIZE))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,68 @@
import com.bytedance.bitsail.connector.doris.committer.DorisCommittable;
import com.bytedance.bitsail.connector.doris.config.DorisExecutionOptions;
import com.bytedance.bitsail.connector.doris.config.DorisOptions;
import com.bytedance.bitsail.connector.doris.error.DorisErrorCode;
import com.bytedance.bitsail.connector.doris.http.model.RespContent;
import com.bytedance.bitsail.connector.doris.sink.DorisWriterState;
import com.bytedance.bitsail.connector.doris.sink.label.LabelGenerator;
import com.bytedance.bitsail.connector.doris.sink.record.RecordStream;
import com.bytedance.bitsail.connector.doris.sink.streamload.DorisStreamLoad;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static com.bytedance.bitsail.connector.doris.sink.streamload.LoadStatus.PUBLISH_TIMEOUT;
import static com.bytedance.bitsail.connector.doris.sink.streamload.LoadStatus.SUCCESS;

public class DorisReplaceProxy extends AbstractDorisWriteModeProxy {
private static final Logger LOG = LoggerFactory.getLogger(DorisReplaceProxy.class);
protected List dorisBatchBuffers;
protected long dorisBatchBuffersSize;
private RecordStream recordStream;
private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
private LabelGenerator labelGenerator;
private DorisWriterState dorisWriterState;
private AtomicInteger cacheRecordSize;
private AtomicInteger cacheRecordCount;
private volatile boolean loading = false;
private final BlockingQueue<byte[]> cache = new LinkedBlockingQueue<>();
private volatile Exception loadException = null;
private int flushRecordCacheSize;
private int flushRecordCacheCount;
private byte[] lineDelimiter;
private int intervalTime;
private ScheduledExecutorService scheduler;
private final int initialDelay = 1000;

public DorisReplaceProxy(DorisExecutionOptions dorisExecutionOptions, DorisOptions dorisOptions) {
this.dorisExecutionOptions = dorisExecutionOptions;
this.dorisBatchBuffers = new ArrayList(dorisExecutionOptions.getBufferCount());
this.dorisOptions = dorisOptions;
this.recordStream = new RecordStream(dorisExecutionOptions.getBufferSize(), dorisExecutionOptions.getBufferCount());
this.dorisStreamLoad = new DorisStreamLoad(dorisExecutionOptions, dorisOptions,
new LabelGenerator(dorisExecutionOptions.getLabelPrefix(), dorisExecutionOptions.isEnable2PC()), recordStream);
this.dorisBatchBuffersSize = 0;
this.labelGenerator = new LabelGenerator(dorisExecutionOptions.getLabelPrefix(), dorisExecutionOptions.isEnable2PC());
this.dorisStreamLoad = new DorisStreamLoad(dorisExecutionOptions, dorisOptions, labelGenerator,
new RecordStream(dorisExecutionOptions.getBufferSize(), dorisExecutionOptions.getBufferCount()));
this.dorisWriterState = new DorisWriterState(dorisExecutionOptions.getLabelPrefix());
this.lineDelimiter = dorisOptions.getLineDelimiter().getBytes();
this.intervalTime = dorisExecutionOptions.getCheckInterval();
this.cacheRecordSize = new AtomicInteger();
this.cacheRecordCount = new AtomicInteger();
this.scheduler = Executors.newScheduledThreadPool(1,
new BasicThreadFactory.Builder().namingPattern("Doris-replace-writer").daemon(true).build());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this thread is daemon?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that when the user thread exits, the JVM not necessary to manage the checkDone program after setting it as a daemon thread

scheduler.scheduleWithFixedDelay(this::checkDone, initialDelay, intervalTime, TimeUnit.MILLISECONDS);
}

@VisibleForTesting
Expand All @@ -56,59 +90,106 @@ public DorisReplaceProxy() {

@Override
public void write(String record) throws IOException {
addBatchBuffers(record);
checkLoadException();
byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
ArrayList<byte[]> tmpCache = null;
if (cacheRecordCount.get() >= dorisExecutionOptions.getRecordCount() || cacheRecordSize.get() >= dorisExecutionOptions.getRecordSize()) {
tmpCache = new ArrayList<>(cache);
flushRecordCacheSize = cacheRecordSize.get();
flushRecordCacheCount = cacheRecordCount.get();
cache.clear();
cacheRecordCount.set(0);
cacheRecordSize.set(0);
}
cacheRecordSize.getAndAdd(bytes.length);
cacheRecordCount.getAndIncrement();
cache.add(bytes);

if (Objects.nonNull(tmpCache)) {
flush(tmpCache);
}
}

private void addBatchBuffers(String record) throws IOException {
this.dorisBatchBuffers.add(record);
this.dorisBatchBuffersSize += record.getBytes().length;
if (dorisBatchBuffers.size() >= dorisExecutionOptions.getRecordSize()
|| this.dorisBatchBuffersSize >= dorisExecutionOptions.getRecordCount()) {
flush(false);
private void flush(ArrayList<byte[]> flushCache) {
if (!loading) {
LOG.info("start load by cache full, recordCount {}, recordSize {}", flushRecordCacheCount, flushRecordCacheSize);
try {
startLoad(flushCache);
} catch (Exception e) {
LOG.error("start stream load failed.", e);
loadException = e;
}
}
}

@SuppressWarnings("checkstyle:MagicNumber")
@Override
public void flush(boolean endOfInput) throws IOException {
if (dorisBatchBuffers.isEmpty()) {
return;
}
String result;
if (DorisOptions.LOAD_CONTENT_TYPE.JSON.equals(dorisOptions.getLoadDataFormat())) {
result = dorisBatchBuffers.toString();
} else {
result = String.join(dorisOptions.getLineDelimiter(), dorisBatchBuffers);
}
for (int i = 0; i <= dorisExecutionOptions.getMaxRetries(); i++) {
try {
dorisStreamLoad.load(result, dorisOptions, true);
dorisBatchBuffers.clear();
this.dorisBatchBuffersSize = 0;
break;
} catch (BitSailException e) {
LOG.error("doris sink error, retry times = {}", i, e);
if (i >= dorisExecutionOptions.getMaxRetries()) {
throw new IOException(e.getMessage());
}
try {
LOG.warn("StreamLoad error", e);
Thread.sleep(1000L * i);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException("unable to flush; interrupted while doing another attempt", e);
}

private synchronized void startLoad(List<byte[]> flushCache) throws IOException {
this.dorisStreamLoad.startLoad(labelGenerator.generateLabel(), true);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Label maybe duplicate when multi task start load at same time, so label prefix should contains subtaskid info?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Label is generated according to timestamp. Due to the synchronized keyword in this method, only a single thread can be entered at a time

if (!flushCache.isEmpty()) {
// add line delimiter
ByteBuffer buf = ByteBuffer.allocate(flushRecordCacheSize + (flushCache.size() - 1) * lineDelimiter.length);
for (int i = 0; i < flushCache.size(); i++) {
if (i > 0) {
buf.put(lineDelimiter);
}
buf.put(flushCache.get(i));
}
dorisStreamLoad.writeRecord(buf.array());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why there need invoke twice writeRecord?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be what I forgot to delete when I deleted the redundant code at the end, I will improve here

}
this.loading = true;
}

@Override
public List<DorisCommittable> prepareCommit() throws IOException {
if (loading) {
LOG.info("stop load by prepareCommit.");
stopLoad();
return ImmutableList.of(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisOptions.getDatabaseName(), 0));
}
return Collections.emptyList();
}

private synchronized void stopLoad() throws IOException {
this.loading = false;
this.flushRecordCacheSize = 0;
RespContent respContent = dorisStreamLoad.stopLoad();
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
LOG.warn(errMsg);
throw new BitSailException(DorisErrorCode.LOAD_FAILED, errMsg);
}
}

@Override
public List<DorisWriterState> snapshotState(long checkpointId) {
return null;
return Collections.singletonList(dorisWriterState);
}

private synchronized void checkDone() {
LOG.info("start timer checker, interval {} ms", intervalTime);
try {
if (!loading) {
LOG.info("not loading, skip timer checker");
return;
}
if (dorisStreamLoad.getPendingLoadFuture() != null
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have question about this line,does the pending load future change between the interval? i think the right pipeline should be

  1. Future pendingLoadFuture = dorisStreamLoad.getPendingLoadFuture();
  2. check pendingLoadFuture same with dorisStreamLoad.getPendingLoadFuture()
  3. check pendingLoadFuture is done?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a problem. As long as dorisStreamLoad.getPendingLoadFuture() is not done, it needs to be stopped.

&& !dorisStreamLoad.getPendingLoadFuture().isDone()) {
LOG.info("stop load by timer checker");
stopLoad();
}
} catch (Exception e) {
LOG.error("stream load failed, thread exited:", e);
loadException = e;
}
}

private void checkLoadException() {
if (loadException != null) {
LOG.error("loading error.", loadException);
throw new RuntimeException("error while loading data.", loadException);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void init() {
if (executionOptions.isEnable2PC()) {
dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId);
}
dorisStreamLoad.startLoad(labelGenerator.generateLabel(lastCheckpointId + 1));
dorisStreamLoad.startLoad(labelGenerator.generateLabel(lastCheckpointId + 1), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -106,7 +106,7 @@ public List<DorisWriterState> snapshotState(long checkpointId) {
//Dynamically refresh be Node
dorisStreamLoad.setHostPort(RestService.getAvailableHost());
try {
dorisStreamLoad.startLoad(labelGenerator.generateLabel(checkpointId + 1));
dorisStreamLoad.startLoad(labelGenerator.generateLabel(checkpointId + 1), false);
} catch (IOException e) {
LOG.warn("Failed to start load. checkpointId={}", checkpointId, e);
throw new RuntimeException(e);
Expand Down
Loading