Skip to content

Commit 1694fb4

Browse files
committed
feat(arrow): use arrow impl cross-join
1 parent 60fee23 commit 1694fb4

File tree

6 files changed

+374
-121
lines changed

6 files changed

+374
-121
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* IGinX - the polystore system with high performance
3+
* Copyright (C) Tsinghua University
4+
5+
*
6+
* This program is free software; you can redistribute it and/or
7+
* modify it under the terms of the GNU Lesser General Public
8+
* License as published by the Free Software Foundation; either
9+
* version 3 of the License, or (at your option) any later version.
10+
*
11+
* This program is distributed in the hope that it will be useful,
12+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14+
* Lesser General Public License for more details.
15+
*
16+
* You should have received a copy of the GNU Lesser General Public License
17+
* along with this program; if not, write to the Free Software Foundation,
18+
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19+
*/
20+
package cn.edu.tsinghua.iginx.engine.physical.memory.execute.compute.join;
21+
22+
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.compute.scalar.expression.ScalarExpression;
23+
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.compute.scalar.expression.ScalarExpressionUtils;
24+
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.compute.util.*;
25+
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.compute.util.exception.ComputeException;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
import java.util.Objects;
29+
import javax.annotation.WillClose;
30+
import javax.annotation.WillCloseWhenClosed;
31+
import org.apache.arrow.memory.BufferAllocator;
32+
import org.apache.arrow.util.Preconditions;
33+
import org.apache.arrow.vector.*;
34+
import org.apache.arrow.vector.types.pojo.Schema;
35+
36+
public class JoinArrayList implements AutoCloseable {
37+
38+
private final BufferAllocator allocator;
39+
private final List<ScalarExpression<?>> outputExpressions;
40+
private final Schema probeSideSchema;
41+
private final VectorSchemaRoot buildSideSingleBatch;
42+
private final ResultConsumer resultConsumer;
43+
44+
JoinArrayList(
45+
BufferAllocator allocator,
46+
List<ScalarExpression<?>> outputExpressions,
47+
Schema probeSideSchema,
48+
@WillCloseWhenClosed VectorSchemaRoot buildSideSingleBatch,
49+
ResultConsumer resultConsumer) {
50+
this.allocator = allocator;
51+
this.outputExpressions = outputExpressions;
52+
this.probeSideSchema = probeSideSchema;
53+
this.buildSideSingleBatch = buildSideSingleBatch;
54+
this.resultConsumer = resultConsumer;
55+
}
56+
57+
@Override
58+
public void close() {
59+
buildSideSingleBatch.close();
60+
}
61+
62+
public void probe(VectorSchemaRoot probeSideBatch) throws ComputeException {
63+
Preconditions.checkArgument(probeSideBatch.getSchema().equals(probeSideSchema));
64+
65+
try (SelectionBuilder buildSideCandidateIndicesBuilder =
66+
new SelectionBuilder(allocator, "tempBuildSideIndices", probeSideBatch.getRowCount());
67+
SelectionBuilder probeSideCandidateIndicesBuilder =
68+
new SelectionBuilder(allocator, "tempProbeSideIndices", probeSideBatch.getRowCount())) {
69+
70+
for (int probeSideIndex = 0;
71+
probeSideIndex < probeSideBatch.getRowCount();
72+
probeSideIndex++) {
73+
for (int buildSideIndex = 0;
74+
buildSideIndex < buildSideSingleBatch.getRowCount();
75+
buildSideIndex++) {
76+
buildSideCandidateIndicesBuilder.append(buildSideIndex);
77+
probeSideCandidateIndicesBuilder.append(probeSideIndex);
78+
}
79+
}
80+
81+
try (ArrayDictionaryProvider outputDictionaryProvider =
82+
ArrayDictionaryProvider.of(allocator, buildSideSingleBatch, probeSideBatch);
83+
IntVector buildSideIndices = buildSideCandidateIndicesBuilder.build();
84+
IntVector probeSideIndices = probeSideCandidateIndicesBuilder.build()) {
85+
output(outputDictionaryProvider, buildSideIndices, probeSideIndices);
86+
}
87+
}
88+
}
89+
90+
private void output(
91+
ArrayDictionaryProvider dictionaryProvider,
92+
BaseIntVector buildSideIndices,
93+
BaseIntVector probeSideIndices)
94+
throws ComputeException {
95+
Preconditions.checkArgument(
96+
buildSideIndices.getValueCount() == probeSideIndices.getValueCount());
97+
98+
int buildSideColumnCount = buildSideSingleBatch.getFieldVectors().size();
99+
int probeSideColumnCount = probeSideSchema.getFields().size();
100+
101+
List<FieldVector> vectors = new ArrayList<>();
102+
for (int i = 0; i < buildSideColumnCount; i++) {
103+
vectors.add(ValueVectors.slice(allocator, buildSideIndices, dictionaryProvider.lookup(i)));
104+
}
105+
for (int i = 0; i < probeSideColumnCount; i++) {
106+
vectors.add(
107+
ValueVectors.slice(
108+
allocator, probeSideIndices, dictionaryProvider.lookup(i + buildSideColumnCount)));
109+
}
110+
111+
try (VectorSchemaRoot result =
112+
VectorSchemaRoots.create(vectors, probeSideIndices.getValueCount());
113+
VectorSchemaRoot output =
114+
ScalarExpressionUtils.evaluate(
115+
allocator, dictionaryProvider, result, null, outputExpressions)) {
116+
resultConsumer.consume(
117+
dictionaryProvider.slice(allocator), VectorSchemaRoots.transfer(allocator, output), null);
118+
}
119+
}
120+
121+
public static class Builder implements AutoCloseable {
122+
123+
private final BufferAllocator allocator;
124+
private final Schema buildSideSchema;
125+
private final List<VectorSchemaRoot> buildSideBatches;
126+
127+
public Builder(BufferAllocator allocator, Schema buildSideSchema) {
128+
this.allocator = Objects.requireNonNull(allocator);
129+
this.buildSideSchema = Objects.requireNonNull(buildSideSchema);
130+
this.buildSideBatches = new ArrayList<>();
131+
}
132+
133+
@Override
134+
public void close() {
135+
buildSideBatches.forEach(VectorSchemaRoot::close);
136+
buildSideBatches.clear();
137+
}
138+
139+
public Builder add(@WillClose VectorSchemaRoot buildSideBatch) throws ComputeException {
140+
buildSideBatches.add(buildSideBatch);
141+
return this;
142+
}
143+
144+
public JoinArrayList build(
145+
BufferAllocator allocator,
146+
List<ScalarExpression<?>> outputExpressions,
147+
Schema probeSideSchema,
148+
ResultConsumer resultConsumer)
149+
throws ComputeException {
150+
Preconditions.checkNotNull(allocator);
151+
Preconditions.checkNotNull(probeSideSchema);
152+
Preconditions.checkNotNull(resultConsumer);
153+
154+
try (VectorSchemaRoot buildSideSingleBatch =
155+
VectorSchemaRoots.concat(allocator, buildSideSchema, buildSideBatches)) {
156+
return new JoinArrayList(
157+
allocator,
158+
outputExpressions,
159+
probeSideSchema,
160+
VectorSchemaRoots.transfer(allocator, buildSideSingleBatch),
161+
resultConsumer);
162+
}
163+
}
164+
}
165+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* IGinX - the polystore system with high performance
3+
* Copyright (C) Tsinghua University
4+
5+
*
6+
* This program is free software; you can redistribute it and/or
7+
* modify it under the terms of the GNU Lesser General Public
8+
* License as published by the Free Software Foundation; either
9+
* version 3 of the License, or (at your option) any later version.
10+
*
11+
* This program is distributed in the hope that it will be useful,
12+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14+
* Lesser General Public License for more details.
15+
*
16+
* You should have received a copy of the GNU Lesser General Public License
17+
* along with this program; if not, write to the Free Software Foundation,
18+
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19+
*/
20+
package cn.edu.tsinghua.iginx.engine.physical.memory.execute.executor.binary.stateful;
21+
22+
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.compute.join.JoinArrayList;
23+
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.compute.scalar.expression.ScalarExpression;
24+
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.compute.scalar.expression.ScalarExpressionUtils;
25+
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.compute.util.exception.ComputeException;
26+
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.executor.ExecutorContext;
27+
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.executor.util.Batch;
28+
import cn.edu.tsinghua.iginx.engine.shared.data.read.BatchSchema;
29+
import java.util.ArrayList;
30+
import java.util.List;
31+
import org.apache.arrow.vector.VectorSchemaRoot;
32+
import org.apache.arrow.vector.types.pojo.Field;
33+
import org.apache.arrow.vector.types.pojo.Schema;
34+
35+
/**
36+
* This class is used to execute hash join operation. Left is the build side, and right is the probe
37+
* side.
38+
*/
39+
public class CrossJoinExecutor extends StatefulBinaryExecutor {
40+
41+
private final List<ScalarExpression<?>> outputExpressions;
42+
43+
private final JoinArrayList.Builder joinArrayListBuilder;
44+
private JoinArrayList joinArrayList;
45+
private Schema outputSchema;
46+
47+
public CrossJoinExecutor(
48+
ExecutorContext context,
49+
BatchSchema leftSchema,
50+
BatchSchema rightSchema,
51+
List<ScalarExpression<?>> outputExpressions) {
52+
super(context, leftSchema, rightSchema, 1);
53+
this.outputExpressions = new ArrayList<>(outputExpressions);
54+
this.joinArrayListBuilder =
55+
new JoinArrayList.Builder(context.getAllocator(), getLeftSchema().raw());
56+
}
57+
58+
@Override
59+
public Schema getOutputSchema() throws ComputeException {
60+
if (outputSchema == null) {
61+
List<Field> outputFields = new ArrayList<>();
62+
outputFields.addAll(leftSchema.raw().getFields());
63+
outputFields.addAll(rightSchema.raw().getFields());
64+
this.outputSchema =
65+
ScalarExpressionUtils.getOutputSchema(
66+
context.getAllocator(), outputExpressions, new Schema(outputFields));
67+
}
68+
return outputSchema;
69+
}
70+
71+
@Override
72+
protected String getInfo() {
73+
return "CrossJoin output " + outputExpressions;
74+
}
75+
76+
@Override
77+
public void close() throws ComputeException {
78+
joinArrayListBuilder.close();
79+
if (joinArrayList != null) {
80+
joinArrayList.close();
81+
}
82+
super.close();
83+
}
84+
85+
@Override
86+
public boolean needConsumeRight() throws ComputeException {
87+
return super.needConsumeRight() && joinArrayList != null;
88+
}
89+
90+
@Override
91+
protected void consumeLeftUnchecked(Batch batch) throws ComputeException {
92+
joinArrayListBuilder.add(batch.flattened(context.getAllocator()));
93+
}
94+
95+
@Override
96+
protected void consumeLeftEndUnchecked() throws ComputeException {
97+
joinArrayList =
98+
joinArrayListBuilder.build(
99+
context.getAllocator(),
100+
outputExpressions,
101+
getRightSchema().raw(),
102+
(dictionaryProvider, data, selection) ->
103+
offerResult(Batch.of(data, dictionaryProvider, selection)));
104+
}
105+
106+
@Override
107+
protected void consumeRightUnchecked(Batch batch) throws ComputeException {
108+
try (VectorSchemaRoot batchFlattened = batch.flattened(context.getAllocator())) {
109+
joinArrayList.probe(batchFlattened);
110+
}
111+
}
112+
113+
@Override
114+
protected void consumeRightEndUnchecked() throws ComputeException {}
115+
}

core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/utils/PhysicalJoinUtils.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,21 @@
1919
*/
2020
package cn.edu.tsinghua.iginx.engine.physical.utils;
2121

22-
import cn.edu.tsinghua.iginx.engine.shared.operator.InnerJoin;
23-
import cn.edu.tsinghua.iginx.engine.shared.operator.MarkJoin;
24-
import cn.edu.tsinghua.iginx.engine.shared.operator.OuterJoin;
25-
import cn.edu.tsinghua.iginx.engine.shared.operator.SingleJoin;
22+
import cn.edu.tsinghua.iginx.engine.shared.operator.*;
2623
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType;
2724

2825
public class PhysicalJoinUtils {
2926
private PhysicalJoinUtils() {}
3027

28+
public static CrossJoin reverse(CrossJoin operator) {
29+
return new CrossJoin(
30+
operator.getSourceB(),
31+
operator.getSourceA(),
32+
operator.getPrefixB(),
33+
operator.getPrefixA(),
34+
operator.getExtraJoinPrefix());
35+
}
36+
3137
public static SingleJoin reverse(SingleJoin join) {
3238
return new SingleJoin(
3339
join.getSourceB(),

0 commit comments

Comments
 (0)