Skip to content

Commit f6058b3

Browse files
committed
Allow row column for equality delete
1 parent fd2b81e commit f6058b3

File tree

8 files changed: +145 -20 lines changed

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ public ConnectorPageSource createPageSource(
349349
if (!deletes.isEmpty()) {
350350
Supplier<Optional<RowPredicate>> deletePredicate = memoize(() -> getDeleteManager(partitionSpec, partitionData)
351351
.getDeletePredicate(
352+
session,
352353
path,
353354
dataSequenceNumber,
354355
deletes,

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFilter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@
1414
package io.trino.plugin.iceberg.delete;
1515

1616
import io.trino.plugin.iceberg.IcebergColumnHandle;
17+
import io.trino.spi.connector.ConnectorSession;
1718

1819
import java.util.List;
1920

2021
public interface DeleteFilter
2122
{
22-
RowPredicate createPredicate(List<IcebergColumnHandle> columns, long dataSequenceNumber);
23+
RowPredicate createPredicate(ConnectorSession session, List<IcebergColumnHandle> columns, long dataSequenceNumber);
2324
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.trino.plugin.iceberg.delete.EqualityDeleteFilter.EqualityDeleteFilterBuilder;
2525
import io.trino.spi.TrinoException;
2626
import io.trino.spi.connector.ConnectorPageSource;
27+
import io.trino.spi.connector.ConnectorSession;
2728
import io.trino.spi.predicate.Domain;
2829
import io.trino.spi.predicate.NullableValue;
2930
import io.trino.spi.predicate.Range;
@@ -69,6 +70,7 @@ public DeleteManager(TypeManager typeManager)
6970
}
7071

7172
public Optional<RowPredicate> getDeletePredicate(
73+
ConnectorSession session,
7274
String dataFilePath,
7375
long dataSequenceNumber,
7476
List<DeleteFile> deleteFiles,
@@ -92,9 +94,9 @@ public Optional<RowPredicate> getDeletePredicate(
9294
}
9395

9496
Optional<RowPredicate> positionDeletes = createPositionDeleteFilter(dataFilePath, positionDeleteFiles, readerPageSourceWithRowPositions, deletePageSourceProvider)
95-
.map(filter -> filter.createPredicate(readColumns, dataSequenceNumber));
96-
Optional<RowPredicate> equalityDeletes = createEqualityDeleteFilter(equalityDeleteFiles, tableSchema, deletePageSourceProvider).stream()
97-
.map(filter -> filter.createPredicate(readColumns, dataSequenceNumber))
97+
.map(filter -> filter.createPredicate(session, readColumns, dataSequenceNumber));
98+
Optional<RowPredicate> equalityDeletes = createEqualityDeleteFilter(session, equalityDeleteFiles, tableSchema, deletePageSourceProvider).stream()
99+
.map(filter -> filter.createPredicate(session, readColumns, dataSequenceNumber))
98100
.reduce(RowPredicate::and);
99101

100102
if (positionDeletes.isEmpty()) {
@@ -168,7 +170,7 @@ private static boolean shouldLoadPositionDeleteFile(DeleteFile deleteFile, Optio
168170
(positionUpperBound.isEmpty() || positionUpperBound.get() >= startRowPosition.get());
169171
}
170172

171-
private List<EqualityDeleteFilter> createEqualityDeleteFilter(List<DeleteFile> equalityDeleteFiles, Schema schema, DeletePageSourceProvider deletePageSourceProvider)
173+
private List<EqualityDeleteFilter> createEqualityDeleteFilter(ConnectorSession session, List<DeleteFile> equalityDeleteFiles, Schema schema, DeletePageSourceProvider deletePageSourceProvider)
172174
{
173175
if (equalityDeleteFiles.isEmpty()) {
174176
return List.of();
@@ -189,7 +191,7 @@ private List<EqualityDeleteFilter> createEqualityDeleteFilter(List<DeleteFile> e
189191
EqualityDeleteFilterBuilder builder = equalityDeleteFiltersBySchema.computeIfAbsent(fieldIds, _ -> EqualityDeleteFilter.builder(schemaFromHandles(deleteColumns)));
190192
deleteFilters.add(builder);
191193

192-
ListenableFuture<?> loadFuture = builder.readEqualityDeletes(deleteFile, deleteColumns, deletePageSourceProvider);
194+
ListenableFuture<?> loadFuture = builder.readEqualityDeletes(session, deleteFile, deleteColumns, deletePageSourceProvider);
193195
if (loadFuture.state() != SUCCESS) {
194196
pendingLoads.add(loadFuture);
195197
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.trino.plugin.iceberg.delete.DeleteManager.DeletePageSourceProvider;
2222
import io.trino.spi.TrinoException;
2323
import io.trino.spi.connector.ConnectorPageSource;
24+
import io.trino.spi.connector.ConnectorSession;
2425
import io.trino.spi.connector.SourcePage;
2526
import io.trino.spi.predicate.TupleDomain;
2627
import io.trino.spi.type.Type;
@@ -53,7 +54,7 @@ private EqualityDeleteFilter(Schema deleteSchema, Map<StructLikeWrapper, DataSeq
5354
}
5455

5556
@Override
56-
public RowPredicate createPredicate(List<IcebergColumnHandle> columns, long splitDataSequenceNumber)
57+
public RowPredicate createPredicate(ConnectorSession session, List<IcebergColumnHandle> columns, long splitDataSequenceNumber)
5758
{
5859
StructType fileStructType = structTypeFromHandles(columns);
5960
StructType deleteStructType = deleteSchema.asStruct();
@@ -68,7 +69,7 @@ public RowPredicate createPredicate(List<IcebergColumnHandle> columns, long spli
6869
.toArray(Type[]::new);
6970

7071
return (page, position) -> {
71-
StructProjection row = projection.wrap(new LazyTrinoRow(types, page, position));
72+
StructProjection row = projection.wrap(new LazyTrinoRow(session, types, page, position));
7273
DataSequenceNumber maxDeleteVersion = deletedRows.get(structLikeWrapper.set(row));
7374
// clear reference to avoid memory leak
7475
structLikeWrapper.set(null);
@@ -94,19 +95,19 @@ private EqualityDeleteFilterBuilder(Schema deleteSchema)
9495
this.deletedRows = new ConcurrentHashMap<>();
9596
}
9697

97-
public ListenableFuture<?> readEqualityDeletes(DeleteFile deleteFile, List<IcebergColumnHandle> deleteColumns, DeletePageSourceProvider deletePageSourceProvider)
98+
public ListenableFuture<?> readEqualityDeletes(ConnectorSession session, DeleteFile deleteFile, List<IcebergColumnHandle> deleteColumns, DeletePageSourceProvider deletePageSourceProvider)
9899
{
99100
verify(deleteColumns.size() == deleteSchema.columns().size(), "delete columns size doesn't match delete schema size");
100101

101102
// ensure only one thread loads the file
102103
ListenableFutureTask<?> futureTask = loadingFiles.computeIfAbsent(
103104
deleteFile.path(),
104-
key -> ListenableFutureTask.create(() -> readEqualityDeletesInternal(deleteFile, deleteColumns, deletePageSourceProvider), null));
105+
key -> ListenableFutureTask.create(() -> readEqualityDeletesInternal(session, deleteFile, deleteColumns, deletePageSourceProvider), null));
105106
futureTask.run();
106107
return Futures.nonCancellationPropagating(futureTask);
107108
}
108109

109-
private void readEqualityDeletesInternal(DeleteFile deleteFile, List<IcebergColumnHandle> deleteColumns, DeletePageSourceProvider deletePageSourceProvider)
110+
private void readEqualityDeletesInternal(ConnectorSession session, DeleteFile deleteFile, List<IcebergColumnHandle> deleteColumns, DeletePageSourceProvider deletePageSourceProvider)
110111
{
111112
DataSequenceNumber sequenceNumber = new DataSequenceNumber(deleteFile.dataSequenceNumber());
112113
try (ConnectorPageSource pageSource = deletePageSourceProvider.openDeletes(deleteFile, deleteColumns, TupleDomain.all())) {
@@ -122,7 +123,7 @@ private void readEqualityDeletesInternal(DeleteFile deleteFile, List<IcebergColu
122123
}
123124

124125
for (int position = 0; position < page.getPositionCount(); position++) {
125-
TrinoRow row = new TrinoRow(types, page, position);
126+
TrinoRow row = new TrinoRow(session, types, page, position);
126127
deletedRows.merge(wrapper.copyFor(row), sequenceNumber, (existing, newValue) -> {
127128
if (existing.dataSequenceNumber() > newValue.dataSequenceNumber()) {
128129
return existing;

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/LazyTrinoRow.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@
1313
*/
1414
package io.trino.plugin.iceberg.delete;
1515

16+
import io.trino.spi.connector.ConnectorSession;
1617
import io.trino.spi.connector.SourcePage;
1718
import io.trino.spi.type.Type;
1819
import org.apache.iceberg.StructLike;
1920

2021
import static com.google.common.base.Preconditions.checkArgument;
2122
import static com.google.common.base.Preconditions.checkElementIndex;
22-
import static io.trino.plugin.iceberg.IcebergPageSink.getIcebergValue;
23+
import static io.trino.plugin.iceberg.delete.TrinoRow.getObjectValue;
2324
import static java.util.Objects.requireNonNull;
2425

2526
/**
@@ -28,13 +29,15 @@
2829
final class LazyTrinoRow
2930
implements StructLike
3031
{
32+
private final ConnectorSession session;
3133
private final Type[] types;
3234
private final SourcePage page;
3335
private final int position;
3436
private final Object[] values;
3537

36-
public LazyTrinoRow(Type[] types, SourcePage page, int position)
38+
public LazyTrinoRow(ConnectorSession session, Type[] types, SourcePage page, int position)
3739
{
40+
this.session = requireNonNull(session, "session is null");
3841
checkArgument(types.length == page.getChannelCount(), "mismatched types for page");
3942
this.types = requireNonNull(types, "types is null");
4043
this.page = requireNonNull(page, "page is null");
@@ -68,7 +71,7 @@ private Object get(int i)
6871
return value;
6972
}
7073

71-
value = getIcebergValue(page.getBlock(i), position, types[i]);
74+
value = getObjectValue(session, page.getBlock(i), position, types[i]);
7275
values[i] = value;
7376
return value;
7477
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFilter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.trino.plugin.iceberg.IcebergColumnHandle;
1818
import io.trino.spi.block.Block;
1919
import io.trino.spi.connector.ConnectorPageSource;
20+
import io.trino.spi.connector.ConnectorSession;
2021
import io.trino.spi.connector.SourcePage;
2122
import org.roaringbitmap.longlong.ImmutableLongBitmapDataProvider;
2223
import org.roaringbitmap.longlong.LongBitmapDataProvider;
@@ -39,7 +40,7 @@ public PositionDeleteFilter(ImmutableLongBitmapDataProvider deletedRows)
3940
}
4041

4142
@Override
42-
public RowPredicate createPredicate(List<IcebergColumnHandle> columns, long dataSequenceNumber)
43+
public RowPredicate createPredicate(ConnectorSession session, List<IcebergColumnHandle> columns, long dataSequenceNumber)
4344
{
4445
int filePosChannel = rowPositionChannel(columns);
4546
return (page, position) -> {

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoRow.java

Lines changed: 107 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,26 +13,49 @@
1313
*/
1414
package io.trino.plugin.iceberg.delete;
1515

16+
import io.trino.spi.block.Block;
17+
import io.trino.spi.connector.ConnectorSession;
1618
import io.trino.spi.connector.SourcePage;
19+
import io.trino.spi.type.DecimalType;
20+
import io.trino.spi.type.RowType;
1721
import io.trino.spi.type.Type;
22+
import io.trino.spi.type.VarbinaryType;
23+
import io.trino.spi.type.VarcharType;
1824
import org.apache.iceberg.StructLike;
1925

2026
import java.util.Arrays;
27+
import java.util.List;
2128

2229
import static com.google.common.base.Preconditions.checkArgument;
23-
import static io.trino.plugin.iceberg.IcebergPageSink.getIcebergValue;
30+
import static io.trino.plugin.iceberg.util.Timestamps.getTimestampTz;
31+
import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToMicros;
32+
import static io.trino.spi.type.BigintType.BIGINT;
33+
import static io.trino.spi.type.BooleanType.BOOLEAN;
34+
import static io.trino.spi.type.DateType.DATE;
35+
import static io.trino.spi.type.Decimals.readBigDecimal;
36+
import static io.trino.spi.type.DoubleType.DOUBLE;
37+
import static io.trino.spi.type.IntegerType.INTEGER;
38+
import static io.trino.spi.type.RealType.REAL;
39+
import static io.trino.spi.type.SmallintType.SMALLINT;
40+
import static io.trino.spi.type.TimeType.TIME_MICROS;
41+
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
42+
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
43+
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND;
44+
import static io.trino.spi.type.TinyintType.TINYINT;
45+
import static io.trino.spi.type.UuidType.UUID;
46+
import static io.trino.spi.type.UuidType.trinoUuidToJavaUuid;
2447

25-
final class TrinoRow
48+
public final class TrinoRow
2649
implements StructLike
2750
{
2851
private final Object[] values;
2952

30-
public TrinoRow(Type[] types, SourcePage page, int position)
53+
public TrinoRow(ConnectorSession session, Type[] types, SourcePage page, int position)
3154
{
3255
checkArgument(types.length == page.getChannelCount(), "mismatched types for page");
3356
values = new Object[types.length];
3457
for (int i = 0; i < values.length; i++) {
35-
values[i] = getIcebergValue(page.getBlock(i), position, types[i]);
58+
values[i] = getObjectValue(session, page.getBlock(i), position, types[i]);
3659
}
3760
}
3861

@@ -59,4 +82,84 @@ public String toString()
5982
{
6083
return "TrinoRow" + Arrays.toString(values);
6184
}
85+
86+
static Object getObjectValue(ConnectorSession session, Block block, int position, Type type)
87+
{
88+
if (block.isNull(position)) {
89+
return null;
90+
}
91+
if (type.equals(BIGINT)) {
92+
return BIGINT.getLong(block, position);
93+
}
94+
if (type.equals(TINYINT)) {
95+
return (int) TINYINT.getByte(block, position);
96+
}
97+
if (type.equals(SMALLINT)) {
98+
return (int) SMALLINT.getShort(block, position);
99+
}
100+
if (type.equals(INTEGER)) {
101+
return INTEGER.getInt(block, position);
102+
}
103+
if (type.equals(DATE)) {
104+
return DATE.getInt(block, position);
105+
}
106+
if (type.equals(BOOLEAN)) {
107+
return BOOLEAN.getBoolean(block, position);
108+
}
109+
if (type instanceof DecimalType decimalType) {
110+
return readBigDecimal(decimalType, block, position);
111+
}
112+
if (type.equals(REAL)) {
113+
return REAL.getFloat(block, position);
114+
}
115+
if (type.equals(DOUBLE)) {
116+
return DOUBLE.getDouble(block, position);
117+
}
118+
if (type.equals(TIME_MICROS)) {
119+
return TIME_MICROS.getLong(block, position) / PICOSECONDS_PER_MICROSECOND;
120+
}
121+
if (type.equals(TIMESTAMP_MICROS)) {
122+
return TIMESTAMP_MICROS.getLong(block, position);
123+
}
124+
if (type.equals(TIMESTAMP_TZ_MICROS)) {
125+
return timestampTzToMicros(getTimestampTz(block, position));
126+
}
127+
if (type instanceof VarbinaryType varbinaryType) {
128+
return varbinaryType.getSlice(block, position).toByteBuffer();
129+
}
130+
if (type instanceof VarcharType varcharType) {
131+
return varcharType.getSlice(block, position).toStringUtf8();
132+
}
133+
if (type.equals(UUID)) {
134+
return trinoUuidToJavaUuid(UUID.getSlice(block, position));
135+
}
136+
if (type instanceof RowType rowType) {
137+
List<Object> values = (List<Object>) rowType.getObjectValue(session, block, position);
138+
return rowData(values.toArray());
139+
}
140+
throw new UnsupportedOperationException("Type not supported as partition column: " + type.getDisplayName());
141+
}
142+
143+
public static StructLike rowData(Object... values)
144+
{
145+
return new StructLike() {
146+
@Override
147+
public int size()
148+
{
149+
return values.length;
150+
}
151+
152+
@Override
153+
public <T> T get(int i, Class<T> aClass)
154+
{
155+
return aClass.cast(values[i]);
156+
}
157+
158+
@Override
159+
public <T> void set(int i, T t)
160+
{
161+
throw new UnsupportedOperationException("Testing StructLike does not support setting values.");
162+
}
163+
};
164+
}
62165
}

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore;
9696
import static io.trino.plugin.iceberg.IcebergTestUtils.getMetadataFileAndUpdatedMillis;
9797
import static io.trino.plugin.iceberg.IcebergTestUtils.getTrinoCatalog;
98+
import static io.trino.plugin.iceberg.delete.TrinoRow.rowData;
9899
import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTable;
99100
import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTableWithSchema;
100101
import static io.trino.spi.type.BigintType.BIGINT;
@@ -293,6 +294,18 @@ public void testV2TableWithEqualityDeleteWhenColumnIsNested()
293294
assertQuery("SELECT array_column[1], map_column[1], row_column.x FROM " + tableName, "SELECT 1, 2, 1 FROM nation WHERE regionkey != 1");
294295
}
295296

297+
@Test
298+
public void testV2TableWithEqualityDeleteWhenColumnIsRow()
299+
throws Exception
300+
{
301+
String tableName = "test_v2_equality_delete_column_is_row" + randomNameSuffix();
302+
assertUpdate("CREATE TABLE " + tableName + " AS " +
303+
"SELECT CAST(ROW(1, 2) AS ROW(x BIGINT, y DOUBLE)) row_column, regionkey FROM tpch.tiny.nation", 25);
304+
Table icebergTable = loadTable(tableName);
305+
writeEqualityDeleteToNationTable(icebergTable, Optional.of(icebergTable.spec()), Optional.empty(), ImmutableMap.of("row_column", rowData(1L, 2.0)));
306+
assertQueryReturnsEmptyResult("SELECT row_column.x FROM " + tableName);
307+
}
308+
296309
@Test
297310
public void testParquetMissingFieldId()
298311
throws Exception

0 commit comments

Comments
 (0)