
Commit 931bdec

copy PartitionStatsHandler: ORC is not supported by the internal readers/writers, so we fall back to AVRO
1 parent: b31892c

File tree

3 files changed

+927
-0
lines changed
Lines changed: 280 additions & 0 deletions
@@ -0,0 +1,280 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.data;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.PartitionStats;
import org.apache.iceberg.PartitionStatsUtil;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.InternalReader;
import org.apache.iceberg.data.parquet.InternalWriter;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.types.Types.LongType;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;

/**
 * Computes, writes and reads the {@link PartitionStatisticsFile}. Uses generic readers and writers
 * to support writing and reading the stats in the table's default format.
 */
public class PartitionStatsHandler {

  private PartitionStatsHandler() {
  }

  public static final int PARTITION_FIELD_ID = 0;
  public static final String PARTITION_FIELD_NAME = "partition";
  public static final NestedField SPEC_ID = NestedField.required(1, "spec_id", IntegerType.get());
  public static final NestedField DATA_RECORD_COUNT =
      NestedField.required(2, "data_record_count", LongType.get());
  public static final NestedField DATA_FILE_COUNT =
      NestedField.required(3, "data_file_count", IntegerType.get());
  public static final NestedField TOTAL_DATA_FILE_SIZE_IN_BYTES =
      NestedField.required(4, "total_data_file_size_in_bytes", LongType.get());
  public static final NestedField POSITION_DELETE_RECORD_COUNT =
      NestedField.optional(5, "position_delete_record_count", LongType.get());
  public static final NestedField POSITION_DELETE_FILE_COUNT =
      NestedField.optional(6, "position_delete_file_count", IntegerType.get());
  public static final NestedField EQUALITY_DELETE_RECORD_COUNT =
      NestedField.optional(7, "equality_delete_record_count", LongType.get());
  public static final NestedField EQUALITY_DELETE_FILE_COUNT =
      NestedField.optional(8, "equality_delete_file_count", IntegerType.get());
  public static final NestedField TOTAL_RECORD_COUNT =
      NestedField.optional(9, "total_record_count", LongType.get());
  public static final NestedField LAST_UPDATED_AT =
      NestedField.optional(10, "last_updated_at", LongType.get());
  public static final NestedField LAST_UPDATED_SNAPSHOT_ID =
      NestedField.optional(11, "last_updated_snapshot_id", LongType.get());

  /**
   * Generates the partition stats file schema based on a combined partition type which considers
   * all specs in a table.
   *
   * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link
   *     Partitioning#partitionType(Table)}.
   * @return a schema that corresponds to the provided unified partition type.
   */
  public static Schema schema(StructType unifiedPartitionType) {
    Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
    return new Schema(
        NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
        SPEC_ID,
        DATA_RECORD_COUNT,
        DATA_FILE_COUNT,
        TOTAL_DATA_FILE_SIZE_IN_BYTES,
        POSITION_DELETE_RECORD_COUNT,
        POSITION_DELETE_FILE_COUNT,
        EQUALITY_DELETE_RECORD_COUNT,
        EQUALITY_DELETE_FILE_COUNT,
        TOTAL_RECORD_COUNT,
        LAST_UPDATED_AT,
        LAST_UPDATED_SNAPSHOT_ID);
  }

  /**
   * Computes and writes the {@link PartitionStatisticsFile} for a given table's current snapshot.
   *
   * @param table The {@link Table} for which the partition statistics are computed.
   * @return {@link PartitionStatisticsFile} for the current snapshot, or null if no statistics are
   *     present.
   */
  public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) throws IOException {
    if (table.currentSnapshot() == null) {
      return null;
    }

    return computeAndWriteStatsFile(table, table.currentSnapshot().snapshotId());
  }

  /**
   * Computes and writes the {@link PartitionStatisticsFile} for a given table and snapshot.
   *
   * @param table The {@link Table} for which the partition statistics are computed.
   * @param snapshotId snapshot for which partition statistics are computed.
   * @return {@link PartitionStatisticsFile} for the given snapshot, or null if no statistics are
   *     present.
   */
  public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long snapshotId)
      throws IOException {
    Snapshot snapshot = table.snapshot(snapshotId);
    Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", snapshotId);

    Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, snapshot);
    if (stats.isEmpty()) {
      return null;
    }

    StructType partitionType = Partitioning.partitionType(table);
    List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, partitionType);
    return writePartitionStatsFile(
        table, snapshot.snapshotId(), schema(partitionType), sortedStats);
  }

  @VisibleForTesting
  static PartitionStatisticsFile writePartitionStatsFile(
      Table table, long snapshotId, Schema dataSchema, Iterable<PartitionStats> records)
      throws IOException {
    FileFormat fileFormat =
        FileFormat.fromString(
            table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));

    OutputFile outputFile = newPartitionStatsFile(table, fileFormat, snapshotId);

    try (DataWriter<StructLike> writer = dataWriter(dataSchema, outputFile, fileFormat)) {
      records.iterator().forEachRemaining(writer::write);
    }

    return ImmutableGenericPartitionStatisticsFile.builder()
        .snapshotId(snapshotId)
        .path(outputFile.location())
        .fileSizeInBytes(outputFile.toInputFile().getLength())
        .build();
  }

  /**
   * Reads partition statistics from the specified {@link InputFile} using the given schema.
   *
   * @param schema The {@link Schema} of the partition statistics file.
   * @param inputFile An {@link InputFile} pointing to the partition stats file.
   */
  public static CloseableIterable<PartitionStats> readPartitionStatsFile(
      Schema schema, InputFile inputFile) {
    CloseableIterable<StructLike> records = dataReader(schema, inputFile);
    return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats);
  }

  private static OutputFile newPartitionStatsFile(
      Table table, FileFormat fileFormat, long snapshotId) {
    Preconditions.checkArgument(
        table instanceof HasTableOperations,
        "Table must have operations to retrieve metadata location");

    return table
        .io()
        .newOutputFile(
            ((HasTableOperations) table)
                .operations()
                .metadataFileLocation(
                    fileFormat.addExtension(
                        String.format(
                            Locale.ROOT, "partition-stats-%d-%s", snapshotId, UUID.randomUUID()))));
  }

  private static DataWriter<StructLike> dataWriter(
      Schema dataSchema, OutputFile outputFile, FileFormat fileFormat) throws IOException {
    switch (fileFormat) {
      case PARQUET:
        return Parquet.writeData(outputFile)
            .schema(dataSchema)
            .createWriterFunc(InternalWriter::createWriter)
            .withSpec(PartitionSpec.unpartitioned())
            .build();
      case ORC:
        // Internal writers are not supported for ORC yet. Temporarily, we fall back to AVRO.
      case AVRO:
        return Avro.writeData(outputFile)
            .schema(dataSchema)
            .createWriterFunc(org.apache.iceberg.avro.InternalWriter::create)
            .withSpec(PartitionSpec.unpartitioned())
            .build();
      default:
        throw new UnsupportedOperationException("Unsupported file format: " + fileFormat.name());
    }
  }

  private static CloseableIterable<StructLike> dataReader(Schema schema, InputFile inputFile) {
    FileFormat fileFormat = FileFormat.fromFileName(inputFile.location());
    Preconditions.checkArgument(
        fileFormat != null, "Unable to determine format of file: %s", inputFile.location());

    switch (fileFormat) {
      case PARQUET:
        return Parquet.read(inputFile)
            .project(schema)
            .createReaderFunc(
                fileSchema ->
                    org.apache.iceberg.data.parquet.InternalReader.create(schema, fileSchema))
            .build();
      case ORC:
        // Internal readers are not supported for ORC yet. Temporarily, we fall back to AVRO.
      case AVRO:
        return Avro.read(inputFile)
            .project(schema)
            .createReaderFunc(fileSchema -> InternalReader.create(schema))
            .build();
      default:
        throw new UnsupportedOperationException("Unsupported file format: " + fileFormat.name());
    }
  }

  private static PartitionStats recordToPartitionStats(StructLike record) {
    PartitionStats stats =
        new PartitionStats(
            record.get(PARTITION_FIELD_ID, StructLike.class),
            record.get(SPEC_ID.fieldId(), Integer.class));
    stats.set(DATA_RECORD_COUNT.fieldId(), record.get(DATA_RECORD_COUNT.fieldId(), Long.class));
    stats.set(DATA_FILE_COUNT.fieldId(), record.get(DATA_FILE_COUNT.fieldId(), Integer.class));
    stats.set(
        TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(),
        record.get(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), Long.class));
    stats.set(
        POSITION_DELETE_RECORD_COUNT.fieldId(),
        record.get(POSITION_DELETE_RECORD_COUNT.fieldId(), Long.class));
    stats.set(
        POSITION_DELETE_FILE_COUNT.fieldId(),
        record.get(POSITION_DELETE_FILE_COUNT.fieldId(), Integer.class));
    stats.set(
        EQUALITY_DELETE_RECORD_COUNT.fieldId(),
        record.get(EQUALITY_DELETE_RECORD_COUNT.fieldId(), Long.class));
    stats.set(
        EQUALITY_DELETE_FILE_COUNT.fieldId(),
        record.get(EQUALITY_DELETE_FILE_COUNT.fieldId(), Integer.class));
    stats.set(TOTAL_RECORD_COUNT.fieldId(), record.get(TOTAL_RECORD_COUNT.fieldId(), Long.class));
    stats.set(LAST_UPDATED_AT.fieldId(), record.get(LAST_UPDATED_AT.fieldId(), Long.class));
    stats.set(
        LAST_UPDATED_SNAPSHOT_ID.fieldId(),
        record.get(LAST_UPDATED_SNAPSHOT_ID.fieldId(), Long.class));
    return stats;
  }
}
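
For reviewers, a minimal usage sketch of the copied handler. The PartitionStatsExample class and computeAndReadBack helper below are illustrative and not part of this commit; they assume an already-loaded, partitioned Iceberg table on an Iceberg version where Table.updatePartitionStatistics() is available.

import java.io.IOException;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.PartitionStats;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.PartitionStatsHandler;
import org.apache.iceberg.io.CloseableIterable;

public class PartitionStatsExample {

  // "table" is assumed to be an already-loaded, partitioned Iceberg table.
  static void computeAndReadBack(Table table) throws IOException {
    // Compute stats for the current snapshot; null means no snapshot or no stats.
    PartitionStatisticsFile statsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);
    if (statsFile == null) {
      return;
    }

    // Register the stats file in the table metadata.
    table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();

    // Read the stats back; the schema is derived from the unified partition type.
    Schema statsSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table));
    try (CloseableIterable<PartitionStats> stats =
        PartitionStatsHandler.readPartitionStatsFile(
            statsSchema, table.io().newInputFile(statsFile.path()))) {
      stats.forEach(s -> System.out.println(s.dataRecordCount() + " rows in spec " + s.specId()));
    }
  }
}

Note on the fallback the commit message describes: on a table whose write.format.default is orc, dataWriter falls through from ORC to the Avro path, so the stats file gets an .orc extension but Avro content; this stays self-consistent because dataReader derives the format from the same file name and falls through to Avro.read in the same way.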

iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java

Lines changed: 42 additions & 0 deletions
@@ -175,6 +175,48 @@ public static DataFile writeDataFile(Table table, OutputFile out, StructLike par
        .build();
  }

  public static DeleteFile writePosDeleteFile(
      Table table, OutputFile out, StructLike partition, List<PositionDelete<?>> deletes)
      throws IOException {
    return writePosDeleteFile(table, out, partition, deletes, 2);
  }

  public static DeleteFile writePosDeleteFile(
      Table table,
      OutputFile out,
      StructLike partition,
      List<PositionDelete<?>> deletes,
      int formatVersion)
      throws IOException {
    if (formatVersion >= 3) {
      OutputFileFactory fileFactory =
          OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
      DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null);
      try (DVFileWriter closeableWriter = writer) {
        for (PositionDelete<?> delete : deletes) {
          closeableWriter.delete(delete.path().toString(), delete.pos(), table.spec(), partition);
        }
      }

      return Iterables.getOnlyElement(writer.result().deleteFiles());
    } else {
      FileWriterFactory<Record> factory =
          GenericFileWriterFactory.builderFor(table)
              .positionDeleteRowSchema(table.schema())
              .build();

      PositionDeleteWriter<?> writer =
          factory.newPositionDeleteWriter(encrypt(out), table.spec(), partition);
      try (Closeable toClose = writer) {
        for (PositionDelete delete : deletes) {
          writer.write(delete);
        }
      }

      return writer.toDeleteFile();
    }
  }

  private static EncryptedOutputFile encrypt(OutputFile out) {
    return EncryptedFiles.encryptedOutput(out, EncryptionKeyMetadata.EMPTY);
  }
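
The new helper dispatches on format version: v3 and above write the position deletes as a deletion vector in a Puffin file, while older tables get a classic position-delete file. A hedged sketch of a caller, assuming a v2 test table; the PosDeleteExample class, the method name, and the file location are illustrative only:

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class PosDeleteExample {

  // "table" and "out" are assumed to come from the surrounding test setup.
  static DeleteFile deleteFirstRows(Table table, OutputFile out, String dataFileLocation)
      throws IOException {
    // Mark positions 0 and 1 of the given data file as deleted (no row payload).
    List<PositionDelete<?>> deletes =
        Lists.newArrayList(
            PositionDelete.create().set(dataFileLocation, 0L, null),
            PositionDelete.create().set(dataFileLocation, 1L, null));

    // Defaults to format version 2, i.e. a classic position-delete file;
    // passing 3 as the extra argument would produce a Puffin deletion vector instead.
    return FileHelpers.writePosDeleteFile(table, out, null, deletes);
  }
}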
