ids) {
+ StructType structType = schema.asStruct();
+ return new StructProjection(structType, TypeUtil.project(structType, ids));
+ }
+
+ /**
+ * Creates a projecting wrapper for {@link StructLike} rows.
+ *
+ * This projection does not work with repeated types like lists and maps.
+ *
+ * @param dataSchema schema of rows wrapped by this projection
+ * @param projectedSchema result schema of the projected rows
+ * @return a wrapper to project rows
+ */
+ public static StructProjection create(Schema dataSchema, Schema projectedSchema) {
+ return new StructProjection(dataSchema.asStruct(), projectedSchema.asStruct());
+ }
+
+ /**
+ * Creates a projecting wrapper for {@link StructLike} rows.
+ *
+ *
This projection does not work with repeated types like lists and maps.
+ *
+ * @param structType type of rows wrapped by this projection
+ * @param projectedStructType result type of the projected rows
+ * @return a wrapper to project rows
+ */
+ public static StructProjection create(StructType structType, StructType projectedStructType) {
+ return new StructProjection(structType, projectedStructType);
+ }
+
+ /**
+ * Creates a projecting wrapper for {@link StructLike} rows.
+ *
+ *
This projection allows missing fields and does not work with repeated types like lists and
+ * maps.
+ *
+ * @param structType type of rows wrapped by this projection
+ * @param projectedStructType result type of the projected rows
+ * @return a wrapper to project rows
+ */
+ public static StructProjection createAllowMissing(
+ StructType structType, StructType projectedStructType) {
+ return new StructProjection(structType, projectedStructType, true);
+ }
+
+ private final StructType type;
+ private final int[] positionMap;
+ private final StructProjection[] nestedProjections;
+ private StructLike struct;
+
+ private StructProjection(
+ StructType type, int[] positionMap, StructProjection[] nestedProjections) {
+ this.type = type;
+ this.positionMap = positionMap;
+ this.nestedProjections = nestedProjections;
+ }
+
+ private StructProjection(StructType structType, StructType projection) {
+ this(structType, projection, false);
+ }
+
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ private StructProjection(StructType structType, StructType projection, boolean allowMissing) {
+ this.type = projection;
+ this.positionMap = new int[projection.fields().size()];
+ this.nestedProjections = new StructProjection[projection.fields().size()];
+
+ // set up the projection positions and any nested projections that are needed
+ List dataFields = structType.fields();
+ for (int pos = 0; pos < positionMap.length; pos += 1) {
+ Types.NestedField projectedField = projection.fields().get(pos);
+
+ boolean found = false;
+ for (int i = 0; !found && i < dataFields.size(); i += 1) {
+ Types.NestedField dataField = dataFields.get(i);
+ if (projectedField.fieldId() == dataField.fieldId()) {
+ found = true;
+ positionMap[pos] = i;
+ switch (projectedField.type().typeId()) {
+ case STRUCT:
+ nestedProjections[pos] =
+ new StructProjection(
+ dataField.type().asStructType(), projectedField.type().asStructType());
+ break;
+ case MAP:
+ MapType projectedMap = projectedField.type().asMapType();
+ MapType originalMap = dataField.type().asMapType();
+
+ boolean keyProjectable =
+ !projectedMap.keyType().isNestedType() ||
+ projectedMap.keyType().equals(originalMap.keyType());
+ boolean valueProjectable =
+ !projectedMap.valueType().isNestedType() ||
+ projectedMap.valueType().equals(originalMap.valueType());
+ Preconditions.checkArgument(
+ keyProjectable && valueProjectable,
+ "Cannot project a partial map key or value struct. Trying to project %s out of %s",
+ projectedField,
+ dataField);
+
+ nestedProjections[pos] = null;
+ break;
+ case LIST:
+ ListType projectedList = projectedField.type().asListType();
+ ListType originalList = dataField.type().asListType();
+
+ boolean elementProjectable =
+ !projectedList.elementType().isNestedType() ||
+ projectedList.elementType().equals(originalList.elementType());
+ Preconditions.checkArgument(
+ elementProjectable,
+ "Cannot project a partial list element struct. Trying to project %s out of %s",
+ projectedField,
+ dataField);
+
+ nestedProjections[pos] = null;
+ break;
+ default:
+ nestedProjections[pos] = null;
+ }
+ }
+ }
+
+ if (!found && projectedField.isOptional() && allowMissing) {
+ positionMap[pos] = -1;
+ nestedProjections[pos] = null;
+ } else if (!found) {
+ throw new IllegalArgumentException(
+ String.format("Cannot find field %s in %s", projectedField, structType));
+ }
+ }
+ }
+
+ public int projectedFields() {
+ return (int) Ints.asList(positionMap).stream().filter(val -> val != -1).count();
+ }
+
+ public StructProjection wrap(StructLike newStruct) {
+ this.struct = newStruct;
+ return this;
+ }
+
+ public StructProjection copyFor(StructLike newStruct) {
+ return new StructProjection(type, positionMap, nestedProjections).wrap(newStruct);
+ }
+
+ @Override
+ public int size() {
+ return type.fields().size();
+ }
+
+ @Override
+ public T get(int pos, Class javaClass) {
+ // struct can be null if wrap is not called first before the get call
+ // or if a null struct is wrapped.
+ if (struct == null) {
+ return null;
+ }
+
+ int structPos = positionMap[pos];
+ if (nestedProjections[pos] != null) {
+ StructLike nestedStruct = struct.get(structPos, StructLike.class);
+ if (nestedStruct == null) {
+ return null;
+ }
+
+ return javaClass.cast(nestedProjections[pos].wrap(nestedStruct));
+ }
+
+ if (structPos != -1) {
+ return struct.get(structPos, javaClass);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void set(int pos, T value) {
+ throw new UnsupportedOperationException("Cannot set fields in a TypeProjection");
+ }
+}
diff --git a/iceberg/patched-iceberg-core/pom.xml b/iceberg/patched-iceberg-core/pom.xml
index a894ef254a53..5180036b7cf5 100644
--- a/iceberg/patched-iceberg-core/pom.xml
+++ b/iceberg/patched-iceberg-core/pom.xml
@@ -94,6 +94,8 @@
**/HadoopInputFile.class
**/HadoopTableOperations.class
+ **/StructLikeMap.class
+ **/StructLikeWrapper.class
org.apache.iceberg.avro.ValueReaders.class
org.apache.iceberg.avro.ValueWriters.class
org.apache.iceberg.BaseScan.class
diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/PartitionsTable.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 654b82faf095..a1f31e28443a 100644
--- a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -23,16 +23,17 @@
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.Comparator;
import java.util.List;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.StructLikeMap;
-
-// TODO: remove class once upgraded to Iceberg v1.7.0
+import org.apache.iceberg.util.StructProjection;
/** A {@link Table} implementation that exposes a table's partitions as rows. */
public class PartitionsTable extends BaseMetadataTable {
@@ -168,21 +169,26 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
private static Iterable partitions(Table table, StaticTableScan scan) {
Types.StructType partitionType = Partitioning.partitionType(table);
- PartitionMap partitions = new PartitionMap(partitionType);
+
+ StructLikeMap partitions =
+ StructLikeMap.create(partitionType, new PartitionComparator(partitionType));
+
try (CloseableIterable>> entries = planEntries(scan)) {
for (ManifestEntry extends ContentFile>> entry : entries) {
Snapshot snapshot = table.snapshot(entry.snapshotId());
ContentFile> file = entry.file();
- StructLike partition =
+ StructLike key =
PartitionUtil.coercePartition(
partitionType, table.specs().get(file.specId()), file.partition());
- partitions.get(partition).update(file, snapshot);
+ partitions
+ .computeIfAbsent(key, () -> new Partition(key, partitionType))
+ .update(file, snapshot);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
- return partitions.all();
+ return partitions.values();
}
@VisibleForTesting
@@ -241,26 +247,25 @@ private class PartitionsScan extends StaticTableScan {
}
}
- static class PartitionMap {
- private final StructLikeMap partitions;
- private final Types.StructType keyType;
+ private static class PartitionComparator implements Comparator {
+ private Comparator comparator;
- PartitionMap(Types.StructType type) {
- this.partitions = StructLikeMap.create(type);
- this.keyType = type;
+ private PartitionComparator(Types.StructType struct) {
+ this.comparator = Comparators.forType(struct);
}
- Partition get(StructLike key) {
- Partition partition = partitions.get(key);
- if (partition == null) {
- partition = new Partition(key, keyType);
- partitions.put(key, partition);
+ @Override
+ public int compare(StructLike o1, StructLike o2) {
+ if (o1 instanceof StructProjection && o2 instanceof StructProjection) {
+ int cmp =
+ Integer.compare(
+ ((StructProjection) o1).projectedFields(),
+ ((StructProjection) o2).projectedFields());
+ if (cmp != 0) {
+ return cmp;
+ }
}
- return partition;
- }
-
- Iterable all() {
- return partitions.values();
+ return comparator.compare(o1, o2);
}
}
@@ -293,27 +298,25 @@ void update(ContentFile> file, Snapshot snapshot) {
if (snapshot != null) {
long snapshotCommitTime = snapshot.timestampMillis() * 1000;
if (this.lastUpdatedAt == null || snapshotCommitTime > this.lastUpdatedAt) {
+ this.specId = file.specId();
+
this.lastUpdatedAt = snapshotCommitTime;
this.lastUpdatedSnapshotId = snapshot.snapshotId();
}
}
-
switch (file.content()) {
case DATA:
this.dataRecordCount += file.recordCount();
this.dataFileCount += 1;
- this.specId = file.specId();
this.dataFileSizeInBytes += file.fileSizeInBytes();
break;
case POSITION_DELETES:
this.posDeleteRecordCount += file.recordCount();
this.posDeleteFileCount += 1;
- this.specId = file.specId();
break;
case EQUALITY_DELETES:
this.eqDeleteRecordCount += file.recordCount();
this.eqDeleteFileCount += 1;
- this.specId = file.specId();
break;
default:
throw new UnsupportedOperationException(
@@ -322,15 +325,9 @@ void update(ContentFile> file, Snapshot snapshot) {
}
/** Needed because StructProjection is not serializable */
- private PartitionData toPartitionData(StructLike key, Types.StructType keyType) {
- PartitionData data = new PartitionData(keyType);
- for (int i = 0; i < keyType.fields().size(); i++) {
- Object val = key.get(i, keyType.fields().get(i).type().typeId().javaClass());
- if (val != null) {
- data.set(i, val);
- }
- }
- return data;
+ private static PartitionData toPartitionData(StructLike key, Types.StructType keyType) {
+ PartitionData keyTemplate = new PartitionData(keyType);
+ return keyTemplate.copyFor(key);
}
}
}
diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeMap.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeMap.java
new file mode 100644
index 000000000000..0efc9e44680c
--- /dev/null
+++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeMap.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
+
+public class StructLikeMap extends AbstractMap implements Map {
+
+ public static StructLikeMap create(
+ Types.StructType type, Comparator comparator) {
+ return new StructLikeMap<>(type, comparator);
+ }
+
+ public static StructLikeMap create(Types.StructType type) {
+ return create(type, Comparators.forType(type));
+ }
+
+ private final Types.StructType type;
+ private final Map wrapperMap;
+ private final ThreadLocal wrappers;
+
+ private StructLikeMap(Types.StructType type, Comparator comparator) {
+ this.type = type;
+ this.wrapperMap = Maps.newHashMap();
+ this.wrappers = ThreadLocal.withInitial(() -> StructLikeWrapper.forType(type, comparator));
+ }
+
+ @Override
+ public int size() {
+ return wrapperMap.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return wrapperMap.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ if (key instanceof StructLike || key == null) {
+ StructLikeWrapper wrapper = wrappers.get();
+ boolean result = wrapperMap.containsKey(wrapper.set((StructLike) key));
+ wrapper.set(null); // don't hold a reference to the key.
+ return result;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return wrapperMap.containsValue(value);
+ }
+
+ @Override
+ public T get(Object key) {
+ if (key instanceof StructLike || key == null) {
+ StructLikeWrapper wrapper = wrappers.get();
+ T value = wrapperMap.get(wrapper.set((StructLike) key));
+ wrapper.set(null); // don't hold a reference to the key.
+ return value;
+ }
+ return null;
+ }
+
+ @Override
+ public T put(StructLike key, T value) {
+ return wrapperMap.put(wrappers.get().copyFor(key), value);
+ }
+
+ @Override
+ public T remove(Object key) {
+ if (key instanceof StructLike || key == null) {
+ StructLikeWrapper wrapper = wrappers.get();
+ T value = wrapperMap.remove(wrapper.set((StructLike) key));
+ wrapper.set(null); // don't hold a reference to the key.
+ return value;
+ }
+ return null;
+ }
+
+ @Override
+ public void clear() {
+ wrapperMap.clear();
+ }
+
+ @Override
+ public Set keySet() {
+ StructLikeSet keySet = StructLikeSet.create(type);
+ for (StructLikeWrapper wrapper : wrapperMap.keySet()) {
+ keySet.add(wrapper.get());
+ }
+ return keySet;
+ }
+
+ @Override
+ public Collection values() {
+ return wrapperMap.values();
+ }
+
+ @Override
+ public Set> entrySet() {
+ Set> entrySet = Sets.newHashSet();
+ for (Entry entry : wrapperMap.entrySet()) {
+ entrySet.add(new StructLikeEntry<>(entry));
+ }
+ return entrySet;
+ }
+
+ public T computeIfAbsent(StructLike struct, Supplier valueSupplier) {
+ return wrapperMap.computeIfAbsent(wrappers.get().copyFor(struct), key -> valueSupplier.get());
+ }
+
+ private static class StructLikeEntry implements Entry {
+
+ private final Entry inner;
+
+ private StructLikeEntry(Entry inner) {
+ this.inner = inner;
+ }
+
+ @Override
+ public StructLike getKey() {
+ return inner.getKey().get();
+ }
+
+ @Override
+ public R getValue() {
+ return inner.getValue();
+ }
+
+ @Override
+ public int hashCode() {
+ return inner.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ StructLikeEntry> that = (StructLikeEntry>) o;
+ return inner.equals(that.inner);
+ }
+
+ @Override
+ public R setValue(R value) {
+ throw new UnsupportedOperationException("Does not support setValue.");
+ }
+ }
+
+ public StructLikeMap transformValues(Function func) {
+ StructLikeMap result = create(type);
+ wrapperMap.forEach((key, value) -> result.put(key.get(), func.apply(value)));
+ return result;
+ }
+}
diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java
new file mode 100644
index 000000000000..3dbb91a43ce3
--- /dev/null
+++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.Comparator;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.JavaHash;
+import org.apache.iceberg.types.Types;
+
+/** Wrapper to adapt StructLike for use in maps and sets by implementing equals and hashCode. */
+public class StructLikeWrapper {
+
+ public static StructLikeWrapper forType(
+ Types.StructType type, Comparator comparator) {
+ return new StructLikeWrapper(comparator, JavaHash.forType(type));
+ }
+
+ public static StructLikeWrapper forType(Types.StructType type) {
+ return forType(type, Comparators.forType(type));
+ }
+
+ private final Comparator comparator;
+ private final JavaHash structHash;
+ private Integer hashCode;
+ private StructLike struct;
+
+ private StructLikeWrapper(Comparator comparator, JavaHash structHash) {
+ this.comparator = comparator;
+ this.structHash = structHash;
+ this.hashCode = null;
+ }
+
+ /**
+ * Creates a copy of this wrapper that wraps a struct.
+ *
+ * This is equivalent to {@code new StructLikeWrapper(type).set(newStruct)} but is cheaper
+ * because no analysis of the type is necessary.
+ *
+ * @param newStruct a {@link StructLike} row
+ * @return a copy of this wrapper wrapping the give struct
+ */
+ public StructLikeWrapper copyFor(StructLike newStruct) {
+ return new StructLikeWrapper(comparator, structHash).set(newStruct);
+ }
+
+ public StructLikeWrapper set(StructLike newStruct) {
+ this.struct = newStruct;
+ this.hashCode = null;
+ return this;
+ }
+
+ public StructLike get() {
+ return struct;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ } else if (!(other instanceof StructLikeWrapper)) {
+ return false;
+ }
+
+ StructLikeWrapper that = (StructLikeWrapper) other;
+
+ if (this.struct == that.struct) {
+ return true;
+ }
+
+ if (this.struct == null ^ that.struct == null) {
+ return false;
+ }
+
+ return comparator.compare(this.struct, that.struct) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ if (hashCode == null) {
+ this.hashCode = structHash.hash(struct);
+ }
+
+ return hashCode;
+ }
+}