Skip to content

Commit df3cdec

Browse files
committed
fix: IT of Influxdb
1 parent 79247d1 commit df3cdec

File tree

3 files changed

+14
-8
lines changed

3 files changed

+14
-8
lines changed

core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/task/memory/row/RowStreamToBatchStreamWrapper.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import cn.edu.tsinghua.iginx.engine.shared.data.read.*;
2727
import java.util.*;
2828
import org.apache.arrow.memory.BufferAllocator;
29+
import org.apache.arrow.util.Preconditions;
2930

3031
public class RowStreamToBatchStreamWrapper implements BatchStream {
3132

@@ -42,6 +43,7 @@ public RowStreamToBatchStreamWrapper(
4243

4344
public RowStreamToBatchStreamWrapper(
4445
BufferAllocator allocator, RowStream rowStream, TaskMetrics metrics, int batchRowCount) {
46+
Preconditions.checkArgument(batchRowCount > 0);
4547
this.allocator = Objects.requireNonNull(allocator);
4648
this.rowStream = Objects.requireNonNull(rowStream);
4749
this.metrics = Objects.requireNonNull(metrics);
@@ -96,7 +98,7 @@ public Batch getNext() throws PhysicalException {
9698
}
9799

98100
boolean hasKey = schema.hasKey();
99-
try (StopWatch watch = new StopWatch(metrics::accumulateCpuTime)) {
101+
try (StopWatch ignored = new StopWatch(metrics::accumulateCpuTime)) {
100102
try (BatchBuilder builder = new BatchBuilder(allocator, schema, rows.size())) {
101103
for (Row row : rows) {
102104
if (hasKey) {

core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/data/read/FetchMetricsRowStream.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.ArrayDeque;
2626
import java.util.Objects;
2727
import java.util.Queue;
28+
import org.apache.arrow.util.Preconditions;
2829

2930
public class FetchMetricsRowStream implements RowStream {
3031

@@ -34,6 +35,7 @@ public class FetchMetricsRowStream implements RowStream {
3435
private final Queue<Row> cache = new ArrayDeque<>();
3536

3637
public FetchMetricsRowStream(RowStream delegate, TaskMetrics metrics, int batchRowCount) {
38+
Preconditions.checkArgument(batchRowCount > 0);
3739
this.delegate = Objects.requireNonNull(delegate);
3840
this.metrics = Objects.requireNonNull(metrics);
3941
this.batchRowCount = batchRowCount;
@@ -51,19 +53,20 @@ public Header getHeader() throws PhysicalException {
5153

5254
@Override
5355
public boolean hasNext() throws PhysicalException {
54-
return !cache.isEmpty() || delegate.hasNext();
56+
// TODO: InfluxDBQueryRowStream 存在 BUG,不能重复调用 hasNext 方法,否则会导致数据跳过
57+
if (cache.isEmpty()) {
58+
fetchBatch();
59+
}
60+
return !cache.isEmpty();
5561
}
5662

5763
@Override
5864
public Row next() throws PhysicalException {
59-
if (cache.isEmpty() && delegate.hasNext()) {
60-
fetchBatch();
61-
}
6265
return cache.remove();
6366
}
6467

6568
private void fetchBatch() throws PhysicalException {
66-
try (StopWatch watch = new StopWatch(metrics::accumulateCpuTime)) {
69+
try (StopWatch ignored = new StopWatch(metrics::accumulateCpuTime)) {
6770
for (int i = 0; i < batchRowCount && delegate.hasNext(); i++) {
6871
cache.add(delegate.next());
6972
}

optimizer/src/main/java/cn/edu/tsinghua/iginx/physical/optimizer/naive/initializer/AddSchemaPrefixInfoGenerator.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,15 @@
2929
import cn.edu.tsinghua.iginx.engine.shared.operator.*;
3030
import cn.edu.tsinghua.iginx.utils.Pair;
3131
import java.util.*;
32+
import org.apache.arrow.util.Preconditions;
3233

3334
public class AddSchemaPrefixInfoGenerator implements UnaryExecutorFactory<ProjectExecutor> {
3435

3536
private final AddSchemaPrefix operator;
3637

3738
public AddSchemaPrefixInfoGenerator(AddSchemaPrefix operator) {
3839
this.operator = Objects.requireNonNull(operator);
40+
Preconditions.checkNotNull(operator.getSchemaPrefix());
3941
}
4042

4143
@Override
@@ -45,8 +47,7 @@ public ProjectExecutor initialize(ExecutorContext context, BatchSchema inputSche
4547
return new ProjectExecutor(context, inputSchema.raw(), expressions);
4648
}
4749

48-
public List<ScalarExpression<?>> getExpression(ExecutorContext context, BatchSchema inputSchema)
49-
throws ComputeException {
50+
public List<ScalarExpression<?>> getExpression(ExecutorContext context, BatchSchema inputSchema) {
5051
List<Pair<String, Integer>> columnsAndIndices = getColumnsAndIndices(inputSchema, operator);
5152
List<ScalarExpression<?>> ret = new ArrayList<>();
5253
for (Pair<String, Integer> pair : columnsAndIndices) {

0 commit comments

Comments
 (0)