Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.metric.overview;

import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
Expand Down Expand Up @@ -233,15 +234,21 @@ public void increaseInsertNodeEventCount(final String pipeName, final long creat

public void decreaseInsertNodeEventCount(
final String pipeName, final long creationTime, final long transferTime) {
final String pipeID = pipeName + "_" + creationTime;
PipeDataNodeRemainingEventAndTimeOperator operator =
remainingEventAndTimeOperatorMap.computeIfAbsent(
pipeName + "_" + creationTime,
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime));
pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime));

operator.decreaseInsertNodeEventCount();

if (transferTime > 0) {
operator.getInsertNodeTransferTimer().update(transferTime, TimeUnit.NANOSECONDS);
if (transferTime / 1_000_000
>= PipeConfig.getInstance().getPipeConnectorInsertNodeDegradeTransferThresholdMs()) {
PipeDataNodeAgent.task().reportHighTransferTime(pipeName, creationTime);
} else {
PipeDataNodeAgent.task().reportStableTransferTime(pipeName, creationTime);
}
}
}

Expand Down Expand Up @@ -283,15 +290,21 @@ public void increaseTsFileEventCount(final String pipeName, final long creationT

public void decreaseTsFileEventCount(
final String pipeName, final long creationTime, final long transferTime) {
final String pipeID = pipeName + "_" + creationTime;
final PipeDataNodeRemainingEventAndTimeOperator operator =
remainingEventAndTimeOperatorMap.computeIfAbsent(
pipeName + "_" + creationTime,
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime));
pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime));

operator.decreaseTsFileEventCount();

if (transferTime > 0) {
operator.getTsFileTransferTimer().update(transferTime, TimeUnit.NANOSECONDS);
if (transferTime / 1_000_000
>= PipeConfig.getInstance().getPipeConnectorTsFileDegradeTransferThresholdMs()) {
PipeDataNodeAgent.task().reportHighTransferTime(pipeName, creationTime);
} else {
PipeDataNodeAgent.task().reportStableTransferTime(pipeName, creationTime);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,13 @@ private boolean canNotUseTabletAnymore(final PipeRealtimeEvent event) {
final long floatingMemoryUsageInByte =
PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
final long pipeCount = PipeDataNodeAgent.task().getPipeCount();
final long totalFloatingMemorySizeInBytes =
PipeDataNodeResourceManager.memory().getTotalFloatingMemorySizeInBytes();
// Use dynamic memory threshold instead of fixed value
final long effectiveTotalFloatingMemorySizeInBytes =
(long)
(PipeDataNodeResourceManager.memory().getTotalFloatingMemorySizeInBytes()
* PipeDataNodeAgent.task().getMemoryAdjustFactor(pipeName));
final boolean mayInsertNodeMemoryReachDangerousThreshold =
floatingMemoryUsageInByte * pipeCount >= totalFloatingMemorySizeInBytes;
floatingMemoryUsageInByte * pipeCount >= effectiveTotalFloatingMemorySizeInBytes;
if (mayInsertNodeMemoryReachDangerousThreshold && event.mayExtractorUseTablets(this)) {
final PipeDataNodeRemainingEventAndTimeOperator operator =
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.get(pipeID);
Expand All @@ -218,7 +221,7 @@ private boolean canNotUseTabletAnymore(final PipeRealtimeEvent event) {
dataRegionId,
event.getTsFileEpoch().getFilePath(),
floatingMemoryUsageInByte,
totalFloatingMemorySizeInBytes / pipeCount,
effectiveTotalFloatingMemorySizeInBytes / pipeCount,
Optional.ofNullable(operator)
.map(PipeDataNodeRemainingEventAndTimeOperator::getInsertNodeEventCount)
.orElse(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -90,6 +92,9 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeDataRegionSource.class);

private static final ConcurrentMap<String, Set<PipeRealtimeDataRegionSource>> PIPE_SOURCES =
new ConcurrentHashMap<>();

protected String pipeName;
protected long creationTime;
protected String dataRegionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ public class CommonConfig {
private long pipeConnectorRetryIntervalMs = 1000L;
private boolean pipeConnectorRPCThriftCompressionEnabled = false;

private long pipeConnectorInsertNodeDegradeTransferThresholdMs = 5 * 60 * 1000;
private long pipeConnectorTsFileDegradeTransferThresholdMs = 10 * 60 * 1000;

private int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold = 5;
private int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold = 20;
private int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold = 30;
Expand Down Expand Up @@ -1122,6 +1125,40 @@ public boolean isPipeConnectorRPCThriftCompressionEnabled() {
return pipeConnectorRPCThriftCompressionEnabled;
}

public void setPipeConnectorInsertNodeDegradeTransferThresholdMs(
long pipeConnectorInsertNodeDegradeTransferThresholdMs) {
if (this.pipeConnectorInsertNodeDegradeTransferThresholdMs
== pipeConnectorInsertNodeDegradeTransferThresholdMs) {
return;
}
this.pipeConnectorInsertNodeDegradeTransferThresholdMs =
pipeConnectorInsertNodeDegradeTransferThresholdMs;
logger.info(
"pipeConnectorInsertNodeDegradeTransferThresholdMs is set to {}.",
pipeConnectorInsertNodeDegradeTransferThresholdMs);
}

public long getPipeConnectorInsertNodeDegradeTransferThresholdMs() {
return pipeConnectorInsertNodeDegradeTransferThresholdMs;
}

public void setPipeConnectorTsFileDegradeTransferThresholdMs(
long pipeConnectorTsFileDegradeTransferThresholdMs) {
if (this.pipeConnectorTsFileDegradeTransferThresholdMs
== pipeConnectorTsFileDegradeTransferThresholdMs) {
return;
}
this.pipeConnectorTsFileDegradeTransferThresholdMs =
pipeConnectorTsFileDegradeTransferThresholdMs;
logger.info(
"pipeConnectorTsFileDegradeTransferThresholdMs is set to {}.",
pipeConnectorTsFileDegradeTransferThresholdMs);
}

public long getPipeConnectorTsFileDegradeTransferThresholdMs() {
return pipeConnectorTsFileDegradeTransferThresholdMs;
}

public void setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold) {
if (this.pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,27 @@ public void decreaseFloatingMemoryUsageInByte(
}
}

public void reportHighTransferTime(final String pipeName, final long creationTime) {
final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (Objects.nonNull(pipeMeta) && pipeMeta.getStaticMeta().getCreationTime() == creationTime) {
((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()).reportHighTransferTime();
}
}

public void reportStableTransferTime(final String pipeName, final long creationTime) {
final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (Objects.nonNull(pipeMeta) && pipeMeta.getStaticMeta().getCreationTime() == creationTime) {
((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()).reportStableTransferTime();
}
}

public double getMemoryAdjustFactor(final String pipeName) {
final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
return pipeMeta == null
? 1.0
: ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()).getMemoryAdjustFactor();
}

public int getPipeCount() {
return pipeMetaKeeper.getPipeMetaCount();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class PipeTemporaryMetaInAgent implements PipeTemporaryMeta {

Expand All @@ -35,6 +36,9 @@ public class PipeTemporaryMetaInAgent implements PipeTemporaryMeta {
private final String pipeNameWithCreationTime;
private final Map<Integer, CommitterKey> regionId2CommitterKeyMap = new ConcurrentHashMap<>();

private final AtomicReference<MemoryFactorState> state =
new AtomicReference<>(new MemoryFactorState(1.0, 0L, 0));

PipeTemporaryMetaInAgent(final String pipeName, final long creationTime) {
this.pipeNameWithCreationTime = pipeName + "_" + creationTime;
}
Expand Down Expand Up @@ -72,6 +76,77 @@ public CommitterKey getCommitterKey(
return newKey;
}

public void reportHighTransferTime() {
final MemoryFactorState current = state.get();
final long currentTime = System.currentTimeMillis();
if (currentTime - current.lastTime < 5000 || current.factor <= 0.5) {
return;
}

state.updateAndGet(
old -> {
long updateTime = System.currentTimeMillis();
if (updateTime - old.lastTime < 5000 || old.factor <= 0.5) {
return old;
}
return new MemoryFactorState(Math.max(0.5, old.factor - 0.1), updateTime, 0);
});
}

public void reportStableTransferTime() {
final MemoryFactorState current = state.get();
final long currentTime = System.currentTimeMillis();
if (current.factor >= 1.0) {
return;
}

if (currentTime - current.lastTime < 5000 && current.stableCount + 1 < 5) {
state.updateAndGet(
old -> {
if (old.factor >= 1.0) {
return old;
}
return new MemoryFactorState(old.factor, old.lastTime, old.stableCount + 1);
});
return;
}

state.updateAndGet(
old -> {
long updateTime = System.currentTimeMillis();
if (old.factor >= 1.0) {
return old;
}

int newCount = old.stableCount + 1;
if (newCount < 5) {
return new MemoryFactorState(old.factor, old.lastTime, newCount);
}

if (updateTime - old.lastTime < 5000) {
return new MemoryFactorState(old.factor, old.lastTime, newCount);
}

return new MemoryFactorState(Math.min(1.0, old.factor + 0.1), updateTime, 0);
});
}

public double getMemoryAdjustFactor() {
return state.get().factor;
}

private static class MemoryFactorState {
final double factor;
final long lastTime;
final int stableCount;

MemoryFactorState(double factor, long lastTime, int stableCount) {
this.factor = factor;
this.lastTime = lastTime;
this.stableCount = stableCount;
}
}

/////////////////////////////// Object ///////////////////////////////

// We assume that the "pipeNameWithCreationTime" does not contain extra information
Expand All @@ -85,23 +160,37 @@ public boolean equals(final Object o) {
return false;
}
final PipeTemporaryMetaInAgent that = (PipeTemporaryMetaInAgent) o;
final MemoryFactorState thisState = this.state.get();
final MemoryFactorState thatState = that.state.get();
return Objects.equals(
this.floatingMemoryUsageInByte.get(), that.floatingMemoryUsageInByte.get())
&& Objects.equals(this.regionId2CommitterKeyMap, that.regionId2CommitterKeyMap);
&& Objects.equals(this.regionId2CommitterKeyMap, that.regionId2CommitterKeyMap)
&& Objects.equals(thisState.factor, thatState.factor)
&& Objects.equals(thisState.stableCount, thatState.stableCount)
&& Objects.equals(thisState.lastTime, thatState.lastTime);
}

@Override
public int hashCode() {
return Objects.hash(floatingMemoryUsageInByte, regionId2CommitterKeyMap);
final MemoryFactorState currentState = state.get();
return Objects.hash(
floatingMemoryUsageInByte.get(),
regionId2CommitterKeyMap,
currentState.factor,
currentState.stableCount,
currentState.lastTime);
}

@Override
public String toString() {
final MemoryFactorState currentState = state.get();
return "PipeTemporaryMeta{"
+ "floatingMemoryUsage="
+ floatingMemoryUsageInByte
+ ", regionId2CommitterKeyMap="
+ regionId2CommitterKeyMap
+ ", memoryAdjustFactor="
+ currentState.factor
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,14 @@ public boolean isPipeConnectorRPCThriftCompressionEnabled() {
return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
}

public long getPipeConnectorInsertNodeDegradeTransferThresholdMs() {
return COMMON_CONFIG.getPipeConnectorInsertNodeDegradeTransferThresholdMs();
}

public long getPipeConnectorTsFileDegradeTransferThresholdMs() {
return COMMON_CONFIG.getPipeConnectorTsFileDegradeTransferThresholdMs();
}

public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() {
return COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,24 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
properties.getProperty(
"pipe_connector_rpc_thrift_compression_enabled",
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));
config.setPipeConnectorInsertNodeDegradeTransferThresholdMs(
Long.parseLong(
Optional.ofNullable(
properties.getProperty("pipe_sink_insert_node_degrade_transfer_threshold_ms"))
.orElse(
properties.getProperty(
"pipe_connector_insert_node_degrade_transfer_threshold_ms",
String.valueOf(
config.getPipeConnectorInsertNodeDegradeTransferThresholdMs())))));
config.setPipeConnectorTsFileDegradeTransferThresholdMs(
Long.parseLong(
Optional.ofNullable(
properties.getProperty("pipe_sink_tsfile_degrade_transfer_threshold_ms"))
.orElse(
properties.getProperty(
"pipe_connector_tsfile_degrade_transfer_threshold_ms",
String.valueOf(
config.getPipeConnectorTsFileDegradeTransferThresholdMs())))));
config.setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
Long.parseLong(
Optional.ofNullable(
Expand Down
Loading