diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEntity.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEntity.java index 98701af458..1cd4d59473 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEntity.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEntity.java @@ -411,7 +411,7 @@ public B addProperty(String key, String value) { return (B) this; } - public B setInternalProperties(Map internalProperties) { + public B setInternalProperties(@Nonnull Map internalProperties) { this.internalProperties = new HashMap<>(internalProperties); return (B) this; } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/table/IcebergTableLikeEntity.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/table/IcebergTableLikeEntity.java index ea7af63638..f1b7567792 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/entity/table/IcebergTableLikeEntity.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/table/IcebergTableLikeEntity.java @@ -20,7 +20,10 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.base.Preconditions; +import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; @@ -47,6 +50,40 @@ public class IcebergTableLikeEntity extends TableLikeEntity { public static final String LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY = "last-notification-timestamp"; + /* + * The following constants are copied from the TableMetadataParser in Iceberg. + * They represent the keys used in the table metadata JSON file. 
+ */ + + public static final String FORMAT_VERSION = "format-version"; + public static final String TABLE_UUID = "table-uuid"; + public static final String LOCATION = "location"; + public static final String LAST_SEQUENCE_NUMBER = "last-sequence-number"; + public static final String LAST_UPDATED_MILLIS = "last-updated-ms"; + public static final String LAST_COLUMN_ID = "last-column-id"; + public static final String SCHEMA = "schema"; + public static final String SCHEMAS = "schemas"; + public static final String CURRENT_SCHEMA_ID = "current-schema-id"; + public static final String PARTITION_SPEC = "partition-spec"; + public static final String PARTITION_SPECS = "partition-specs"; + public static final String DEFAULT_SPEC_ID = "default-spec-id"; + public static final String LAST_PARTITION_ID = "last-partition-id"; + public static final String DEFAULT_SORT_ORDER_ID = "default-sort-order-id"; + public static final String SORT_ORDERS = "sort-orders"; + public static final String PROPERTIES = "properties"; + public static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id"; + public static final String REFS = "refs"; + public static final String SNAPSHOTS = "snapshots"; + public static final String SNAPSHOT_ID = "snapshot-id"; + public static final String TIMESTAMP_MS = "timestamp-ms"; + public static final String SNAPSHOT_LOG = "snapshot-log"; + public static final String METADATA_FILE = "metadata-file"; + public static final String METADATA_LOG = "metadata-log"; + public static final String STATISTICS = "statistics"; + public static final String PARTITION_STATISTICS = "partition-statistics"; + public static final String ENCRYPTION_KEYS = "encryption-keys"; + public static final String NEXT_ROW_ID = "next-row-id"; + public IcebergTableLikeEntity(PolarisBaseEntity sourceEntity) { super(sourceEntity); PolarisEntitySubType subType = getSubType(); @@ -83,6 +120,24 @@ public String getBaseLocation() { } public static class Builder extends PolarisEntity.BaseBuilder { + + 
public Builder( + PolarisEntitySubType subType, + TableIdentifier identifier, + Map properties, + Map internalProperties, + String metadataLocation) { + super(); + setType(PolarisEntityType.TABLE_LIKE); + setSubType(subType); + setProperties(properties); + setInternalProperties(internalProperties); + // Order matters here: the two calls above replace their maps entirely, so they must run + // before the setters below, which merely add entries to the existing maps. + setTableIdentifier(identifier); + setMetadataLocation(metadataLocation); + } + public Builder( PolarisEntitySubType subType, TableIdentifier identifier, String metadataLocation) { super(); @@ -121,6 +176,29 @@ public Builder setBaseLocation(String location) { return this; } + @Override + public Builder setInternalProperties(@Nonnull Map internalProperties) { + // Carry forward the parent namespace, metadata location and last admitted notification + // timestamp if already set; values in the provided map take precedence. 
+ Map newInternalProperties = new HashMap<>(); + if (this.internalProperties.get(NamespaceEntity.PARENT_NAMESPACE_KEY) != null) { + newInternalProperties.put( + NamespaceEntity.PARENT_NAMESPACE_KEY, + this.internalProperties.get(NamespaceEntity.PARENT_NAMESPACE_KEY)); + } + if (this.internalProperties.get(METADATA_LOCATION_KEY) != null) { + newInternalProperties.put( + METADATA_LOCATION_KEY, this.internalProperties.get(METADATA_LOCATION_KEY)); + } + if (this.internalProperties.get(LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY) != null) { + newInternalProperties.put( + LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY, + this.internalProperties.get(LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY)); + } + newInternalProperties.putAll(internalProperties); + return super.setInternalProperties(newInternalProperties); + } + public Builder setMetadataLocation(String location) { internalProperties.put(METADATA_LOCATION_KEY, location); return this; diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/EntityWeigherTest.java b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/EntityWeigherTest.java index 109a521723..780fbc9448 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/EntityWeigherTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/EntityWeigherTest.java @@ -124,7 +124,7 @@ public void testExactWeightCalculation() { "location", "{\"a\": \"b\"}", Optional.of("{\"c\": \"d\", \"e\": \"f\"}"))); - Assertions.assertThat(preciseWeight).isEqualTo(1090); + Assertions.assertThat(preciseWeight).isEqualTo(1183); // hard-coded; update when weights change } private static Map getPropertiesMap(String properties) { diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index 8d237fb9e2..f02c617154 100644 --- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -1566,6 +1566,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { "Generic table with same name already exists: %s", tableIdentifier); } } + Map storedProperties = buildTableMetadataPropertiesMap(metadata); IcebergTableLikeEntity entity = IcebergTableLikeEntity.of(resolvedPath == null ? null : resolvedPath.getRawLeafEntity()); String existingLocation; @@ -1573,7 +1574,11 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { existingLocation = null; entity = new IcebergTableLikeEntity.Builder( - PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier, newLocation) + PolarisEntitySubType.ICEBERG_TABLE, + tableIdentifier, + Map.of(), + storedProperties, + newLocation) .setCatalogId(getCatalogId()) .setBaseLocation(metadata.location()) .setId( @@ -1583,6 +1588,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { existingLocation = entity.getMetadataLocation(); entity = new IcebergTableLikeEntity.Builder(entity) + .setInternalProperties(storedProperties) .setBaseLocation(metadata.location()) .setMetadataLocation(newLocation) .build(); @@ -1712,6 +1718,41 @@ private String newTableMetadataFilePath(TableMetadata meta, int newVersion) { } } + private static Map buildTableMetadataPropertiesMap(TableMetadata metadata) { + Map storedProperties = new HashMap<>(); + storedProperties.put(IcebergTableLikeEntity.LOCATION, metadata.location()); + storedProperties.put( + IcebergTableLikeEntity.FORMAT_VERSION, String.valueOf(metadata.formatVersion())); + storedProperties.put(IcebergTableLikeEntity.TABLE_UUID, metadata.uuid()); + storedProperties.put( + IcebergTableLikeEntity.CURRENT_SCHEMA_ID, String.valueOf(metadata.currentSchemaId())); + if (metadata.currentSnapshot() != null) { + storedProperties.put( + 
IcebergTableLikeEntity.CURRENT_SNAPSHOT_ID, + String.valueOf(metadata.currentSnapshot().snapshotId())); + } + storedProperties.put( + IcebergTableLikeEntity.LAST_COLUMN_ID, String.valueOf(metadata.lastColumnId())); + storedProperties.put(IcebergTableLikeEntity.NEXT_ROW_ID, String.valueOf(metadata.nextRowId())); + storedProperties.put( + IcebergTableLikeEntity.LAST_SEQUENCE_NUMBER, String.valueOf(metadata.lastSequenceNumber())); + storedProperties.put( + IcebergTableLikeEntity.LAST_UPDATED_MILLIS, String.valueOf(metadata.lastUpdatedMillis())); + if (metadata.sortOrder() != null) { + storedProperties.put( + IcebergTableLikeEntity.DEFAULT_SORT_ORDER_ID, + String.valueOf(metadata.defaultSortOrderId())); + } + if (metadata.spec() != null) { + storedProperties.put( + IcebergTableLikeEntity.DEFAULT_SPEC_ID, String.valueOf(metadata.defaultSpecId())); + storedProperties.put( + IcebergTableLikeEntity.LAST_PARTITION_ID, + String.valueOf(metadata.lastAssignedPartitionId())); + } + return storedProperties; + } + /** * An implementation of {@link ViewOperations} that integrates with {@link IcebergCatalog}. Much * of this code was originally copied from {@link org.apache.iceberg.view.BaseViewOperations}. 
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java index 1c9a43898c..46d7f55183 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java @@ -44,6 +44,7 @@ import java.lang.reflect.Method; import java.time.Clock; import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -62,6 +63,7 @@ import org.apache.iceberg.FileMetadata; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.NullOrder; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.RowDelta; import org.apache.iceberg.Schema; @@ -103,12 +105,14 @@ import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.CatalogEntity; +import org.apache.polaris.core.entity.NamespaceEntity; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.PrincipalEntity; import org.apache.polaris.core.entity.TaskEntity; +import org.apache.polaris.core.entity.table.IcebergTableLikeEntity; import org.apache.polaris.core.exceptions.CommitConflictException; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; @@ -154,6 +158,7 @@ import org.apache.polaris.service.types.TableUpdateNotification; import org.assertj.core.api.AbstractCollectionAssert; import org.assertj.core.api.Assertions; +import 
org.assertj.core.api.InstanceOfAssertFactories; import org.assertj.core.api.ListAssert; import org.assertj.core.api.ThrowableAssert.ThrowingCallable; import org.assertj.core.configuration.PreferredAssumptionException; @@ -2282,6 +2287,96 @@ public void testTableOperationsDoesNotRefreshAfterCommit(boolean updateMetadataO } } + @Test + public void testTableInternalPropertiesStoredOnCommit() { + Assumptions.assumeTrue( + requiresNamespaceCreate(), + "Only applicable if namespaces must be created before adding children"); + + catalog.createNamespace(NS); + catalog.buildTable(TABLE, SCHEMA).create(); + catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit(); + Table afterAppend = catalog.loadTable(TABLE); + EntityResult schemaResult = + metaStoreManager.readEntityByName( + polarisContext, + List.of(catalogEntity), + PolarisEntityType.NAMESPACE, + PolarisEntitySubType.NULL_SUBTYPE, + NS.toString()); + Assertions.assertThat(schemaResult).returns(true, EntityResult::isSuccess); + EntityResult tableResult = + metaStoreManager.readEntityByName( + polarisContext, + List.of(catalogEntity, schemaResult.getEntity()), + PolarisEntityType.TABLE_LIKE, + PolarisEntitySubType.ICEBERG_TABLE, + TABLE.name()); + Assertions.assertThat(tableResult) + .returns(true, EntityResult::isSuccess) + .extracting(er -> PolarisEntity.of(er.getEntity())) + .extracting(PolarisEntity::getInternalPropertiesAsMap) + .asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class)) + .containsEntry(NamespaceEntity.PARENT_NAMESPACE_KEY, NS.toString()) + .containsEntry( + IcebergTableLikeEntity.CURRENT_SNAPSHOT_ID, + String.valueOf(afterAppend.currentSnapshot().snapshotId())) + .containsEntry(IcebergTableLikeEntity.LOCATION, afterAppend.location()) + .containsEntry(IcebergTableLikeEntity.TABLE_UUID, afterAppend.uuid().toString()) + .containsEntry( + IcebergTableLikeEntity.CURRENT_SCHEMA_ID, + String.valueOf(afterAppend.schema().schemaId())) + .containsEntry( + 
IcebergTableLikeEntity.LAST_COLUMN_ID, + afterAppend.schema().columns().stream() + .max(Comparator.comparing(Types.NestedField::fieldId)) + .map(Types.NestedField::fieldId) + .orElse(0) + .toString()) + .containsEntry( + IcebergTableLikeEntity.LAST_SEQUENCE_NUMBER, + String.valueOf(afterAppend.currentSnapshot().sequenceNumber())); + + catalog.loadTable(TABLE).refresh(); + catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_B).commit(); + validatePropertiesUpdated( + schemaResult, + IcebergTableLikeEntity.CURRENT_SNAPSHOT_ID, + tbl -> String.valueOf(tbl.currentSnapshot().snapshotId())); + + catalog.loadTable(TABLE).refresh(); + catalog.loadTable(TABLE).updateSchema().addColumn("new_col", Types.LongType.get()).commit(); + validatePropertiesUpdated( + schemaResult, + IcebergTableLikeEntity.CURRENT_SCHEMA_ID, + tbl -> String.valueOf(tbl.schema().schemaId())); + + catalog.loadTable(TABLE).refresh(); + catalog.loadTable(TABLE).replaceSortOrder().desc("new_col", NullOrder.NULLS_FIRST).commit(); + validatePropertiesUpdated( + schemaResult, + IcebergTableLikeEntity.DEFAULT_SORT_ORDER_ID, + table -> String.valueOf(table.sortOrder().orderId())); + } + + private void validatePropertiesUpdated( + EntityResult schemaResult, String key, Function expectedValue) { + Table afterUpdate = catalog.loadTable(TABLE); + EntityResult tableResult = + metaStoreManager.readEntityByName( + polarisContext, + List.of(catalogEntity, schemaResult.getEntity()), + PolarisEntityType.TABLE_LIKE, + PolarisEntitySubType.ICEBERG_TABLE, + TABLE.name()); + Assertions.assertThat(tableResult) + .returns(true, EntityResult::isSuccess) + .extracting(er -> PolarisEntity.of(er.getEntity())) + .extracting(PolarisEntity::getInternalPropertiesAsMap) + .asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class)) + .containsEntry(key, expectedValue.apply(afterUpdate)); + } + @Test public void testEventsAreEmitted() { IcebergCatalog catalog = catalog();