Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ public B addProperty(String key, String value) {
return (B) this;
}

public B setInternalProperties(Map<String, String> internalProperties) {
public B setInternalProperties(@Nonnull Map<String, String> internalProperties) {
this.internalProperties = new HashMap<>(internalProperties);
return (B) this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.base.Preconditions;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
Expand All @@ -47,6 +50,40 @@ public class IcebergTableLikeEntity extends TableLikeEntity {
public static final String LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY =
"last-notification-timestamp";

/*
* The following constants are copied from the TableMetadataParser in Iceberg
* They represent the keys used in the table metadata JSON file.
*/

public static final String FORMAT_VERSION = "format-version";
public static final String TABLE_UUID = "table-uuid";
public static final String LOCATION = "location";
public static final String LAST_SEQUENCE_NUMBER = "last-sequence-number";
public static final String LAST_UPDATED_MILLIS = "last-updated-ms";
public static final String LAST_COLUMN_ID = "last-column-id";
public static final String SCHEMA = "schema";
public static final String SCHEMAS = "schemas";
public static final String CURRENT_SCHEMA_ID = "current-schema-id";
public static final String PARTITION_SPEC = "partition-spec";
public static final String PARTITION_SPECS = "partition-specs";
public static final String DEFAULT_SPEC_ID = "default-spec-id";
public static final String LAST_PARTITION_ID = "last-partition-id";
public static final String DEFAULT_SORT_ORDER_ID = "default-sort-order-id";
public static final String SORT_ORDERS = "sort-orders";
public static final String PROPERTIES = "properties";
public static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
public static final String REFS = "refs";
public static final String SNAPSHOTS = "snapshots";
public static final String SNAPSHOT_ID = "snapshot-id";
public static final String TIMESTAMP_MS = "timestamp-ms";
public static final String SNAPSHOT_LOG = "snapshot-log";
public static final String METADATA_FILE = "metadata-file";
public static final String METADATA_LOG = "metadata-log";
public static final String STATISTICS = "statistics";
public static final String PARTITION_STATISTICS = "partition-statistics";
public static final String ENCRYPTION_KEYS = "encryption-keys";
public static final String NEXT_ROW_ID = "next-row-id";

public IcebergTableLikeEntity(PolarisBaseEntity sourceEntity) {
super(sourceEntity);
PolarisEntitySubType subType = getSubType();
Expand Down Expand Up @@ -83,6 +120,24 @@ public String getBaseLocation() {
}

public static class Builder extends PolarisEntity.BaseBuilder<IcebergTableLikeEntity, Builder> {

public Builder(
PolarisEntitySubType subType,
TableIdentifier identifier,
Map<String, String> properties,
Map<String, String> internalProperties,
String metadataLocation) {
super();
setType(PolarisEntityType.TABLE_LIKE);
setSubType(subType);
setProperties(properties);
setInternalProperties(internalProperties);
// order here matters. properties and internal properties must be set prior to the following
// properties, which merely update the map, whereas the above calls replace the map entirely.
setTableIdentifier(identifier);
setMetadataLocation(metadataLocation);
}

public Builder(
PolarisEntitySubType subType, TableIdentifier identifier, String metadataLocation) {
super();
Expand Down Expand Up @@ -121,6 +176,29 @@ public Builder setBaseLocation(String location) {
return this;
}

@Override
public Builder setInternalProperties(@Nonnull Map<String, String> internalProperties) {
// ensure we carry forward the parent namespace and metadata location if already set.
// however, we allow for overriding them if explicitly specified in the provided map.
Map<String, String> newInternalProperties = new HashMap<>();
if (this.internalProperties.get(NamespaceEntity.PARENT_NAMESPACE_KEY) != null) {
newInternalProperties.put(
NamespaceEntity.PARENT_NAMESPACE_KEY,
this.internalProperties.get(NamespaceEntity.PARENT_NAMESPACE_KEY));
}
if (this.internalProperties.get(METADATA_LOCATION_KEY) != null) {
newInternalProperties.put(
METADATA_LOCATION_KEY, this.internalProperties.get(METADATA_LOCATION_KEY));
}
if (this.internalProperties.get(LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY) != null) {
newInternalProperties.put(
LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY,
this.internalProperties.get(LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY));
}
newInternalProperties.putAll(internalProperties);
return super.setInternalProperties(newInternalProperties);
}

public Builder setMetadataLocation(String location) {
internalProperties.put(METADATA_LOCATION_KEY, location);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void testExactWeightCalculation() {
"location",
"{\"a\": \"b\"}",
Optional.of("{\"c\": \"d\", \"e\": \"f\"}")));
Assertions.assertThat(preciseWeight).isEqualTo(1090);
Assertions.assertThat(preciseWeight).isEqualTo(1183); // :( this is hard-coded
}

private static Map<String, String> getPropertiesMap(String properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1566,14 +1566,19 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
"Generic table with same name already exists: %s", tableIdentifier);
}
}
Map<String, String> storedProperties = buildTableMetadataPropertiesMap(metadata);
IcebergTableLikeEntity entity =
IcebergTableLikeEntity.of(resolvedPath == null ? null : resolvedPath.getRawLeafEntity());
String existingLocation;
if (null == entity) {
existingLocation = null;
entity =
new IcebergTableLikeEntity.Builder(
PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier, newLocation)
PolarisEntitySubType.ICEBERG_TABLE,
tableIdentifier,
Map.of(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: If we're using the builder pattern, why have rich constructor parameters? Why not call .setABC()?

In this case the Map is empty, so this parameter is redundant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is in setting the internalProperties map in the builder methods because we have these helper methods, such as setMetadataLocation that aren't fields themselves, but modify entries in the map. If we do call the following, we're fine

builder
    .setInternalProperties(props)
    .setMetadataLocation(newLocation)
    .build();

but if we reverse the order and call the following, we're broken:

builder
    .setMetadataLocation(newLocation)
    .setInternalProperties(props)
    .build();

It's not obvious from the caller's perspective, but setMetadataLocation modifies the underlying map, then the setInternalProperties call completely overwrites the map, losing the value set in the previous call.

With the existing constructor, it's impossible to order the setting of the metadataLocation and the internalProperties via the builder methods. If we used the builder for all properties all the time, it would be fine, but because we pass in the newLocation parameter as a constructor arg, if we set the internalProperties field using the builder method, we lose the location parameter we just passed in.

Copy link
Contributor

@dimas-b dimas-b Oct 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, TIL 🤔 However, having this kind of effects in the codebase it pretty risky in the long term maintenance perspective, IMHO. Would it be reasonable to refactor the builders / related code to allow for more intuitive usage (in another PR, of course)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was thinking about making metadataLocation and other settable map entries into distinct fields in the Builder so that they can just be added to the map in the build call, but... yeah, a future PR

storedProperties,
newLocation)
.setCatalogId(getCatalogId())
.setBaseLocation(metadata.location())
.setId(
Expand All @@ -1583,6 +1588,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
existingLocation = entity.getMetadataLocation();
entity =
new IcebergTableLikeEntity.Builder(entity)
.setInternalProperties(storedProperties)
.setBaseLocation(metadata.location())
.setMetadataLocation(newLocation)
.build();
Expand Down Expand Up @@ -1712,6 +1718,41 @@ private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
}
}

private static Map<String, String> buildTableMetadataPropertiesMap(TableMetadata metadata) {
Map<String, String> storedProperties = new HashMap<>();
storedProperties.put(IcebergTableLikeEntity.LOCATION, metadata.location());
storedProperties.put(
IcebergTableLikeEntity.FORMAT_VERSION, String.valueOf(metadata.formatVersion()));
storedProperties.put(IcebergTableLikeEntity.TABLE_UUID, metadata.uuid());
storedProperties.put(
IcebergTableLikeEntity.CURRENT_SCHEMA_ID, String.valueOf(metadata.currentSchemaId()));
if (metadata.currentSnapshot() != null) {
storedProperties.put(
IcebergTableLikeEntity.CURRENT_SNAPSHOT_ID,
String.valueOf(metadata.currentSnapshot().snapshotId()));
}
storedProperties.put(
IcebergTableLikeEntity.LAST_COLUMN_ID, String.valueOf(metadata.lastColumnId()));
storedProperties.put(IcebergTableLikeEntity.NEXT_ROW_ID, String.valueOf(metadata.nextRowId()));
storedProperties.put(
IcebergTableLikeEntity.LAST_SEQUENCE_NUMBER, String.valueOf(metadata.lastSequenceNumber()));
storedProperties.put(
IcebergTableLikeEntity.LAST_UPDATED_MILLIS, String.valueOf(metadata.lastUpdatedMillis()));
if (metadata.sortOrder() != null) {
storedProperties.put(
IcebergTableLikeEntity.DEFAULT_SORT_ORDER_ID,
String.valueOf(metadata.defaultSortOrderId()));
}
if (metadata.spec() != null) {
storedProperties.put(
IcebergTableLikeEntity.DEFAULT_SPEC_ID, String.valueOf(metadata.defaultSpecId()));
storedProperties.put(
IcebergTableLikeEntity.LAST_PARTITION_ID,
String.valueOf(metadata.lastAssignedPartitionId()));
}
return storedProperties;
}

/**
* An implementation of {@link ViewOperations} that integrates with {@link IcebergCatalog}. Much
* of this code was originally copied from {@link org.apache.iceberg.view.BaseViewOperations}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.lang.reflect.Method;
import java.time.Clock;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -62,6 +63,7 @@
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
Expand Down Expand Up @@ -103,12 +105,14 @@
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.CatalogEntity;
import org.apache.polaris.core.entity.NamespaceEntity;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntity;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PrincipalEntity;
import org.apache.polaris.core.entity.TaskEntity;
import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
import org.apache.polaris.core.exceptions.CommitConflictException;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
Expand Down Expand Up @@ -154,6 +158,7 @@
import org.apache.polaris.service.types.TableUpdateNotification;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
import org.assertj.core.configuration.PreferredAssumptionException;
Expand Down Expand Up @@ -2282,6 +2287,96 @@ public void testTableOperationsDoesNotRefreshAfterCommit(boolean updateMetadataO
}
}

@Test
public void testTableInternalPropertiesStoredOnCommit() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Non-blocker] Since we are testing the extraction of metadata fields, it would be good if we could add/or parametrize this test with different format-version (1,2,3). Technically the TableMetadata should be format-version agnostic, but a sanity check would be useful in case there's some upstream bug : )

Assumptions.assumeTrue(
requiresNamespaceCreate(),
"Only applicable if namespaces must be created before adding children");

catalog.createNamespace(NS);
catalog.buildTable(TABLE, SCHEMA).create();
catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
Table afterAppend = catalog.loadTable(TABLE);
EntityResult schemaResult =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
EntityResult schemaResult =
EntityResult namespaceResult =

[nit] Let's use namespace here to be consistent with polaris' terminology

metaStoreManager.readEntityByName(
polarisContext,
List.of(catalogEntity),
PolarisEntityType.NAMESPACE,
PolarisEntitySubType.NULL_SUBTYPE,
NS.toString());
Assertions.assertThat(schemaResult).returns(true, EntityResult::isSuccess);
EntityResult tableResult =
metaStoreManager.readEntityByName(
polarisContext,
List.of(catalogEntity, schemaResult.getEntity()),
PolarisEntityType.TABLE_LIKE,
PolarisEntitySubType.ICEBERG_TABLE,
TABLE.name());
Assertions.assertThat(tableResult)
.returns(true, EntityResult::isSuccess)
.extracting(er -> PolarisEntity.of(er.getEntity()))
.extracting(PolarisEntity::getInternalPropertiesAsMap)
.asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class))
.containsEntry(NamespaceEntity.PARENT_NAMESPACE_KEY, NS.toString())
.containsEntry(
IcebergTableLikeEntity.CURRENT_SNAPSHOT_ID,
String.valueOf(afterAppend.currentSnapshot().snapshotId()))
.containsEntry(IcebergTableLikeEntity.LOCATION, afterAppend.location())
.containsEntry(IcebergTableLikeEntity.TABLE_UUID, afterAppend.uuid().toString())
.containsEntry(
IcebergTableLikeEntity.CURRENT_SCHEMA_ID,
String.valueOf(afterAppend.schema().schemaId()))
.containsEntry(
IcebergTableLikeEntity.LAST_COLUMN_ID,
afterAppend.schema().columns().stream()
.max(Comparator.comparing(Types.NestedField::fieldId))
.map(Types.NestedField::fieldId)
.orElse(0)
.toString())
.containsEntry(
IcebergTableLikeEntity.LAST_SEQUENCE_NUMBER,
String.valueOf(afterAppend.currentSnapshot().sequenceNumber()));

catalog.loadTable(TABLE).refresh();
catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_B).commit();
validatePropertiesUpdated(
schemaResult,
IcebergTableLikeEntity.CURRENT_SNAPSHOT_ID,
tbl -> String.valueOf(tbl.currentSnapshot().snapshotId()));

catalog.loadTable(TABLE).refresh();
catalog.loadTable(TABLE).updateSchema().addColumn("new_col", Types.LongType.get()).commit();
validatePropertiesUpdated(
schemaResult,
IcebergTableLikeEntity.CURRENT_SCHEMA_ID,
tbl -> String.valueOf(tbl.schema().schemaId()));

catalog.loadTable(TABLE).refresh();
catalog.loadTable(TABLE).replaceSortOrder().desc("new_col", NullOrder.NULLS_FIRST).commit();
validatePropertiesUpdated(
schemaResult,
IcebergTableLikeEntity.DEFAULT_SORT_ORDER_ID,
table -> String.valueOf(table.sortOrder().orderId()));
}

private void validatePropertiesUpdated(
EntityResult schemaResult, String key, Function<Table, String> expectedValue) {
Table afterUpdate = catalog.loadTable(TABLE);
EntityResult tableResult =
metaStoreManager.readEntityByName(
polarisContext,
List.of(catalogEntity, schemaResult.getEntity()),
PolarisEntityType.TABLE_LIKE,
PolarisEntitySubType.ICEBERG_TABLE,
TABLE.name());
Assertions.assertThat(tableResult)
.returns(true, EntityResult::isSuccess)
.extracting(er -> PolarisEntity.of(er.getEntity()))
.extracting(PolarisEntity::getInternalPropertiesAsMap)
.asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class))
.containsEntry(key, expectedValue.apply(afterUpdate));
}

@Test
public void testEventsAreEmitted() {
IcebergCatalog catalog = catalog();
Expand Down