Skip to content

Commit a7a1cbf

Browse files
committed
fix DeleteReadTests
1 parent bc33f4c commit a7a1cbf

File tree

3 files changed

+135
-43
lines changed

3 files changed

+135
-43
lines changed

iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/DeleteReadTests.java

Lines changed: 88 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iceberg.data;
2121

22+
import java.io.File;
2223
import java.io.IOException;
2324
import java.util.List;
2425
import java.util.Set;
@@ -44,6 +45,8 @@
4445
import org.junit.Test;
4546
import org.junit.rules.TemporaryFolder;
4647

48+
import static org.assertj.core.api.Assertions.assertThat;
49+
4750
public abstract class DeleteReadTests {
4851
// Schema passed to create tables
4952
public static final Schema SCHEMA = new Schema(
@@ -64,6 +67,12 @@ public abstract class DeleteReadTests {
6467
private List<Record> records = null;
6568
private DataFile dataFile = null;
6669

70+
protected final int formatVersion;
71+
72+
public DeleteReadTests(int formatVersion) {
73+
this.formatVersion = formatVersion;
74+
}
75+
6776
@Before
6877
public void writeTestDataFile() throws IOException {
6978
this.tableName = "test";
@@ -102,6 +111,27 @@ protected boolean expectPruned() {
102111
return true;
103112
}
104113

114+
protected boolean countDeletes() {
115+
return false;
116+
}
117+
118+
/**
119+
* Returns the delete count that checkDeleteCount(long) compares against the expected value.
120+
* This is only called after rowSet(String, Table, String...), and only if countDeletes() is true.
121+
*/
122+
protected long deleteCount() {
123+
return 0L;
124+
}
125+
126+
protected void checkDeleteCount(long expectedDeletes) {
127+
if (countDeletes()) {
128+
long actualDeletes = deleteCount();
129+
assertThat(actualDeletes)
130+
.as("Table should contain expected number of deletes")
131+
.isEqualTo(expectedDeletes);
132+
}
133+
}
134+
105135
@Test
106136
public void testEqualityDeletes() throws IOException {
107137
Schema deleteRowSchema = table.schema().select("data");
@@ -119,7 +149,7 @@ public void testEqualityDeletes() throws IOException {
119149
.addDeletes(eqDeletes)
120150
.commit();
121151

122-
StructLikeSet expected = rowSetWithoutIds(29, 89, 122);
152+
StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122);
123153
StructLikeSet actual = rowSet(tableName, table, "*");
124154

125155
Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -142,7 +172,7 @@ public void testEqualityDeletesWithRequiredEqColumn() throws IOException {
142172
.addDeletes(eqDeletes)
143173
.commit();
144174

145-
StructLikeSet expected = selectColumns(rowSetWithoutIds(29, 89, 122), "id");
175+
StructLikeSet expected = selectColumns(rowSetWithoutIds(table, records, 29, 89, 122), "id");
146176
StructLikeSet actual = rowSet(tableName, table, "id");
147177

148178
if (expectPruned()) {
@@ -180,7 +210,7 @@ public void testEqualityDeletesSpanningMultipleDataFiles() throws IOException {
180210
.addDeletes(eqDeletes)
181211
.commit();
182212

183-
StructLikeSet expected = rowSetWithoutIds(29, 89, 122, 144);
213+
StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122, 144);
184214
StructLikeSet actual = rowSet(tableName, table, "*");
185215

186216
Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -194,15 +224,20 @@ public void testPositionDeletes() throws IOException {
194224
Pair.of(dataFile.path(), 6L) // id = 122
195225
);
196226

197-
Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
198-
table, Files.localOutput(temp.newFile()), Row.of(0), deletes);
227+
Pair<DeleteFile, CharSequenceSet> posDeletes =
228+
FileHelpers.writeDeleteFile(
229+
table,
230+
Files.localOutput(File.createTempFile("junit", null, temp.getRoot())),
231+
Row.of(0),
232+
deletes,
233+
formatVersion);
199234

200235
table.newRowDelta()
201236
.addDeletes(posDeletes.first())
202237
.validateDataFilesExist(posDeletes.second())
203238
.commit();
204239

205-
StructLikeSet expected = rowSetWithoutIds(29, 89, 122);
240+
StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122);
206241
StructLikeSet actual = rowSet(tableName, table, "*");
207242

208243
Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -212,33 +247,47 @@ public void testPositionDeletes() throws IOException {
212247
public void testMixedPositionAndEqualityDeletes() throws IOException {
213248
Schema dataSchema = table.schema().select("data");
214249
Record dataDelete = GenericRecord.create(dataSchema);
215-
List<Record> dataDeletes = Lists.newArrayList(
216-
dataDelete.copy("data", "a"), // id = 29
217-
dataDelete.copy("data", "d"), // id = 89
218-
dataDelete.copy("data", "g") // id = 122
219-
);
220-
221-
DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
222-
table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, dataSchema);
223-
224-
List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
225-
Pair.of(dataFile.path(), 3L), // id = 89
226-
Pair.of(dataFile.path(), 5L) // id = 121
227-
);
228-
229-
Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
230-
table, Files.localOutput(temp.newFile()), Row.of(0), deletes);
231-
232-
table.newRowDelta()
233-
.addDeletes(eqDeletes)
234-
.addDeletes(posDeletes.first())
235-
.validateDataFilesExist(posDeletes.second())
236-
.commit();
237-
238-
StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122);
250+
List<Record> dataDeletes =
251+
Lists.newArrayList(
252+
dataDelete.copy("data", "a"), // id = 29
253+
dataDelete.copy("data", "d"), // id = 89
254+
dataDelete.copy("data", "g") // id = 122
255+
);
256+
257+
DeleteFile eqDeletes =
258+
FileHelpers.writeDeleteFile(
259+
table,
260+
Files.localOutput(File.createTempFile("junit", null, temp.getRoot())),
261+
Row.of(0),
262+
dataDeletes,
263+
dataSchema);
264+
265+
List<Pair<CharSequence, Long>> deletes =
266+
Lists.newArrayList(
267+
Pair.of(dataFile.location(), 3L), // id = 89
268+
Pair.of(dataFile.location(), 5L) // id = 121
269+
);
270+
271+
Pair<DeleteFile, CharSequenceSet> posDeletes =
272+
FileHelpers.writeDeleteFile(
273+
table,
274+
Files.localOutput(File.createTempFile("junit", null, temp.getRoot())),
275+
Row.of(0),
276+
deletes,
277+
formatVersion);
278+
279+
table
280+
.newRowDelta()
281+
.addDeletes(eqDeletes)
282+
.addDeletes(posDeletes.first())
283+
.validateDataFilesExist(posDeletes.second())
284+
.commit();
285+
286+
StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 121, 122);
239287
StructLikeSet actual = rowSet(tableName, table, "*");
240288

241-
Assert.assertEquals("Table should contain expected rows", expected, actual);
289+
assertThat(actual).as("Table should contain expected rows").isEqualTo(expected);
290+
checkDeleteCount(4L);
242291
}
243292

244293
@Test
@@ -269,7 +318,7 @@ public void testMultipleEqualityDeleteSchemas() throws IOException {
269318
.addDeletes(idEqDeletes)
270319
.commit();
271320

272-
StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122);
321+
StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 121, 122);
273322
StructLikeSet actual = rowSet(tableName, table, "*");
274323

275324
Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -306,7 +355,7 @@ public void testEqualityDeleteByNull() throws IOException {
306355
.addDeletes(eqDeletes)
307356
.commit();
308357

309-
StructLikeSet expected = rowSetWithoutIds(131);
358+
StructLikeSet expected = rowSetWithoutIds(table, records, 131);
310359
StructLikeSet actual = rowSet(tableName, table, "*");
311360

312361
Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -321,12 +370,14 @@ private StructLikeSet selectColumns(StructLikeSet rows, String... columns) {
321370
return set;
322371
}
323372

324-
private StructLikeSet rowSetWithoutIds(int... idsToRemove) {
373+
protected static StructLikeSet rowSetWithoutIds(
374+
Table table, List<Record> recordList, int... idsToRemove) {
325375
Set<Integer> deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove));
326376
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
327-
records.stream()
328-
.filter(row -> !deletedIds.contains(row.getField("id")))
329-
.forEach(set::add);
377+
recordList.stream()
378+
.filter(row -> !deletedIds.contains(row.getField("id")))
379+
.map(record -> new InternalRecordWrapper(table.schema().asStruct()).wrap(record))
380+
.forEach(set::add);
330381
return set;
331382
}
332383
}

iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.iceberg.Schema;
3131
import org.apache.iceberg.StructLike;
3232
import org.apache.iceberg.Table;
33+
import org.apache.iceberg.deletes.BaseDVFileWriter;
34+
import org.apache.iceberg.deletes.DVFileWriter;
3335
import org.apache.iceberg.deletes.EqualityDeleteWriter;
3436
import org.apache.iceberg.deletes.PositionDelete;
3537
import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -38,7 +40,10 @@
3840
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
3941
import org.apache.iceberg.io.FileAppender;
4042
import org.apache.iceberg.io.FileAppenderFactory;
43+
import org.apache.iceberg.io.FileWriterFactory;
4144
import org.apache.iceberg.io.OutputFile;
45+
import org.apache.iceberg.io.OutputFileFactory;
46+
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
4247
import org.apache.iceberg.types.Types;
4348
import org.apache.iceberg.util.CharSequenceSet;
4449
import org.apache.iceberg.util.Pair;
@@ -95,6 +100,43 @@ public static DeleteFile writeDeleteFile(Table table, OutputFile out, StructLike
95100
return writer.toDeleteFile();
96101
}
97102

103+
public static Pair<DeleteFile, CharSequenceSet> writeDeleteFile(
104+
Table table,
105+
OutputFile out,
106+
StructLike partition,
107+
List<Pair<CharSequence, Long>> deletes,
108+
int formatVersion)
109+
throws IOException {
110+
if (formatVersion >= 3) {
111+
OutputFileFactory fileFactory =
112+
OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
113+
DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null);
114+
try (DVFileWriter closeableWriter = writer) {
115+
for (Pair<CharSequence, Long> delete : deletes) {
116+
closeableWriter.delete(
117+
delete.first().toString(), delete.second(), table.spec(), partition);
118+
}
119+
}
120+
121+
return Pair.of(
122+
Iterables.getOnlyElement(writer.result().deleteFiles()),
123+
writer.result().referencedDataFiles());
124+
} else {
125+
FileWriterFactory<Record> factory = GenericFileWriterFactory.builderFor(table).build();
126+
127+
PositionDeleteWriter<Record> writer =
128+
factory.newPositionDeleteWriter(encrypt(out), table.spec(), partition);
129+
PositionDelete<Record> posDelete = PositionDelete.create();
130+
try (Closeable toClose = writer) {
131+
for (Pair<CharSequence, Long> delete : deletes) {
132+
writer.write(posDelete.set(delete.first(), delete.second(), null));
133+
}
134+
}
135+
136+
return Pair.of(writer.toDeleteFile(), writer.referencedDataFiles());
137+
}
138+
}
139+
98140
public static DataFile writeDataFile(Table table, OutputFile out, List<Record> rows) throws IOException {
99141
FileFormat format = defaultFormat(table.properties());
100142
GenericAppenderFactory factory = new GenericAppenderFactory(table.schema());

iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ public class TestInputFormatReaderDeletes extends DeleteReadTests {
4949

5050
// parametrized variables
5151
private final String inputFormat;
52-
private final int formatVersion;
5352
private final FileFormat fileFormat;
5453

5554
@Parameterized.Parameters(name = "fileFormat = {0}, formatVersion = {1}, inputFormat={2}")
@@ -61,8 +60,8 @@ public static Object[][] parameters() {
6160
{ FileFormat.PARQUET, 2, "MapredIcebergInputFormat" },
6261
{ FileFormat.AVRO, 2, "MapredIcebergInputFormat" },
6362
{ FileFormat.ORC, 2, "MapredIcebergInputFormat" },
64-
{FileFormat.PARQUET, 3, "IcebergInputFormat"},
65-
{FileFormat.PARQUET, 3, "MapredIcebergInputFormat"},
63+
{ FileFormat.PARQUET, 3, "IcebergInputFormat" },
64+
{ FileFormat.PARQUET, 3, "MapredIcebergInputFormat" },
6665
};
6766
}
6867

@@ -73,10 +72,10 @@ public void writeTestDataFile() throws IOException {
7372
super.writeTestDataFile();
7473
}
7574

76-
public TestInputFormatReaderDeletes(String inputFormat, int formatVersion, FileFormat fileFormat) {
77-
this.inputFormat = inputFormat;
78-
this.formatVersion = formatVersion;
75+
public TestInputFormatReaderDeletes(FileFormat fileFormat, int formatVersion, String inputFormat) {
76+
super(formatVersion);
7977
this.fileFormat = fileFormat;
78+
this.inputFormat = inputFormat;
8079
}
8180

8281
@Override

0 commit comments

Comments
 (0)