diff --git a/.gitignore b/.gitignore
index 5a59990d7..3e0130df7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -26,6 +26,7 @@ hs_err_pid*
# Ignore java-version and idea files.
.java-version
.idea
+.vscode
# Ignore Gradle project-specific cache directory
.gradle
diff --git a/pom.xml b/pom.xml
index ff80b9544..e26fa0356 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
3.4
1.4.2
2.4.0
+ 1.2.0
2.18.2
2.43.0
0.16.1
@@ -333,6 +334,13 @@
${delta.hive.version}
+
+
+ org.apache.paimon
+ paimon-bundle
+ ${paimon.version}
+
+
org.apache.spark
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
index 9d89de6aa..9ea7943a7 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
@@ -27,9 +27,10 @@ public class TableFormat {
public static final String HUDI = "HUDI";
public static final String ICEBERG = "ICEBERG";
public static final String DELTA = "DELTA";
+ public static final String PAIMON = "PAIMON";
public static final String PARQUET = "PARQUET";
public static String[] values() {
- return new String[] {"HUDI", "ICEBERG", "DELTA"};
+ return new String[] {"HUDI", "ICEBERG", "DELTA", "PAIMON"};
}
}
diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
index 6bd5282c7..b27eafd34 100644
--- a/xtable-core/pom.xml
+++ b/xtable-core/pom.xml
@@ -110,6 +110,18 @@
test
+
+
+ org.apache.paimon
+ paimon-bundle
+
+
+ org.apache.paimon
+ paimon-spark-${spark.version.prefix}
+ ${paimon.version}
+ test
+
+
org.apache.hadoop
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java
new file mode 100644
index 000000000..aa0ac299e
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import static org.apache.xtable.model.storage.DataLayoutStrategy.HIVE_STYLE_PARTITION;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Log4j2
+public class PaimonConversionSource implements ConversionSource {
+
+ private final FileStoreTable paimonTable;
+ private final SchemaManager schemaManager;
+ private final SnapshotManager snapshotManager;
+
+ private final PaimonDataFileExtractor dataFileExtractor = PaimonDataFileExtractor.getInstance();
+ private final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance();
+ private final PaimonPartitionExtractor partitionSpecExtractor =
+ PaimonPartitionExtractor.getInstance();
+
+ public PaimonConversionSource(FileStoreTable paimonTable) {
+ this.paimonTable = paimonTable;
+ this.schemaManager = paimonTable.schemaManager();
+ this.snapshotManager = paimonTable.snapshotManager();
+ }
+
+ @Override
+ public InternalTable getTable(Snapshot snapshot) {
+ TableSchema paimonSchema = schemaManager.schema(snapshot.schemaId());
+ InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema);
+
+ List partitionKeys = paimonTable.partitionKeys();
+ List partitioningFields =
+ partitionSpecExtractor.toInternalPartitionFields(partitionKeys, internalSchema);
+
+ DataLayoutStrategy dataLayoutStrategy =
+ partitioningFields.isEmpty() ? DataLayoutStrategy.FLAT : HIVE_STYLE_PARTITION;
+
+ return InternalTable.builder()
+ .name(paimonTable.name())
+ .tableFormat(TableFormat.PAIMON)
+ .readSchema(internalSchema)
+ .layoutStrategy(dataLayoutStrategy)
+ .basePath(paimonTable.location().toString())
+ .partitioningFields(partitioningFields)
+ .latestCommitTime(Instant.ofEpochMilli(snapshot.timeMillis()))
+ .latestMetadataPath(snapshotManager.snapshotPath(snapshot.id()).toString())
+ .build();
+ }
+
+ @Override
+ public InternalTable getCurrentTable() {
+ SnapshotManager snapshotManager = paimonTable.snapshotManager();
+ Snapshot snapshot = snapshotManager.latestSnapshot();
+ if (snapshot == null) {
+ throw new ReadException("No snapshots found for table " + paimonTable.name());
+ }
+ return getTable(snapshot);
+ }
+
+ @Override
+ public InternalSnapshot getCurrentSnapshot() {
+ SnapshotManager snapshotManager = paimonTable.snapshotManager();
+ Snapshot snapshot = snapshotManager.latestSnapshot();
+ if (snapshot == null) {
+ throw new ReadException("No snapshots found for table " + paimonTable.name());
+ }
+
+ InternalTable internalTable = getTable(snapshot);
+ List internalDataFiles =
+ dataFileExtractor.toInternalDataFiles(paimonTable, snapshot);
+
+ return InternalSnapshot.builder()
+ .table(internalTable)
+ .version(Long.toString(snapshot.timeMillis()))
+ .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
+ // TODO : Implement pending commits extraction, required for incremental sync
+ .sourceIdentifier(getCommitIdentifier(snapshot))
+ .build();
+ }
+
+ @Override
+ public TableChange getTableChangeForCommit(Snapshot snapshot) {
+ throw new UnsupportedOperationException("Incremental Sync is not supported yet.");
+ }
+
+ @Override
+ public CommitsBacklog getCommitsBacklog(
+ InstantsForIncrementalSync instantsForIncrementalSync) {
+ throw new UnsupportedOperationException("Incremental Sync is not supported yet.");
+ }
+
+ @Override
+ public boolean isIncrementalSyncSafeFrom(Instant instant) {
+ return false; // Incremental sync is not supported yet
+ }
+
+ @Override
+ public String getCommitIdentifier(Snapshot snapshot) {
+ return Long.toString(snapshot.commitIdentifier());
+ }
+
+ @Override
+ public void close() throws IOException {}
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSourceProvider.java
new file mode 100644
index 000000000..82a28efc8
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSourceProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import java.io.IOException;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+public class PaimonConversionSourceProvider extends ConversionSourceProvider {
+ @Override
+ public ConversionSource getConversionSourceInstance(SourceTable sourceTableConfig) {
+ try {
+ Options catalogOptions = new Options();
+ CatalogContext context = CatalogContext.create(catalogOptions, hadoopConf);
+
+ Path path = new Path(sourceTableConfig.getDataPath());
+ FileIO fileIO = FileIO.get(path, context);
+ FileStoreTable paimonTable = FileStoreTableFactory.create(fileIO, path);
+
+ return new PaimonConversionSource(paimonTable);
+ } catch (IOException e) {
+ throw new ReadException(e.getMessage());
+ }
+ }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
new file mode 100644
index 000000000..578d4423e
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import java.util.*;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.storage.InternalDataFile;
+
+public class PaimonDataFileExtractor {
+
+ private final PaimonPartitionExtractor partitionExtractor =
+ PaimonPartitionExtractor.getInstance();
+
+ private static final PaimonDataFileExtractor INSTANCE = new PaimonDataFileExtractor();
+
+ public static PaimonDataFileExtractor getInstance() {
+ return INSTANCE;
+ }
+
+ public List toInternalDataFiles(FileStoreTable table, Snapshot snapshot) {
+ List result = new ArrayList<>();
+ Iterator manifestEntryIterator =
+ newSnapshotReader(table, snapshot).readFileIterator();
+ while (manifestEntryIterator.hasNext()) {
+ result.add(toInternalDataFile(table, manifestEntryIterator.next()));
+ }
+ return result;
+ }
+
+ private InternalDataFile toInternalDataFile(FileStoreTable table, ManifestEntry entry) {
+ return InternalDataFile.builder()
+ .physicalPath(toFullPhysicalPath(table, entry))
+ .fileSizeBytes(entry.file().fileSize())
+ .lastModified(entry.file().creationTimeEpochMillis())
+ .recordCount(entry.file().rowCount())
+ .partitionValues(partitionExtractor.toPartitionValues(table, entry.partition()))
+ .columnStats(toColumnStats(entry.file()))
+ .build();
+ }
+
+ private String toFullPhysicalPath(FileStoreTable table, ManifestEntry entry) {
+ String basePath = table.location().toString();
+ String bucketPath = "bucket-" + entry.bucket();
+ String filePath = entry.file().fileName();
+
+ Optional partitionPath = partitionExtractor.toPartitionPath(table, entry.partition());
+ if (partitionPath.isPresent()) {
+ return String.join("/", basePath, partitionPath.get(), bucketPath, filePath);
+ } else {
+ return String.join("/", basePath, bucketPath, filePath);
+ }
+ }
+
+ private List toColumnStats(DataFileMeta file) {
+ // TODO: Implement logic to extract column stats from the file meta
+ return Collections.emptyList();
+ }
+
+ private SnapshotReader newSnapshotReader(FileStoreTable table, Snapshot snapshot) {
+ // If the table has primary keys, we read only the top level files
+ // which means we can only consider fully compacted files.
+ if (!table.schema().primaryKeys().isEmpty()) {
+ return table
+ .newSnapshotReader()
+ .withLevel(table.coreOptions().numLevels() - 1)
+ .withSnapshot(snapshot);
+ } else {
+ return table.newSnapshotReader().withSnapshot(snapshot);
+ }
+ }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java
new file mode 100644
index 000000000..0c497470e
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+
+/** Extracts partition spec for Paimon as identity transforms on partition keys. */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class PaimonPartitionExtractor {
+
+ private final PaimonSchemaExtractor paimonSchemaExtractor = PaimonSchemaExtractor.getInstance();
+
+ private static final PaimonPartitionExtractor INSTANCE = new PaimonPartitionExtractor();
+
+ public static PaimonPartitionExtractor getInstance() {
+ return INSTANCE;
+ }
+
+ public List toInternalPartitionFields(
+ List partitionKeys, InternalSchema schema) {
+ if (partitionKeys == null || partitionKeys.isEmpty()) {
+ return Collections.emptyList();
+ }
+ return partitionKeys.stream()
+ .map(key -> toPartitionField(key, schema))
+ .collect(Collectors.toList());
+ }
+
+ public List toPartitionValues(FileStoreTable table, BinaryRow partition) {
+ InternalRowPartitionComputer partitionComputer = newPartitionComputer(table);
+ InternalSchema internalSchema = paimonSchemaExtractor.toInternalSchema(table.schema());
+
+ List partitionValues = new ArrayList<>();
+ for (Map.Entry entry :
+ partitionComputer.generatePartValues(partition).entrySet()) {
+ PartitionValue partitionValue =
+ PartitionValue.builder()
+ .partitionField(toPartitionField(entry.getKey(), internalSchema))
+ .range(Range.scalar(entry.getValue()))
+ .build();
+ partitionValues.add(partitionValue);
+ }
+ return partitionValues;
+ }
+
+ public Optional toPartitionPath(FileStoreTable table, BinaryRow partition) {
+ InternalRowPartitionComputer partitionComputer = newPartitionComputer(table);
+ return partitionComputer.generatePartValues(partition).entrySet().stream()
+ .map(e -> e.getKey() + "=" + e.getValue())
+ .reduce((a, b) -> a + "/" + b);
+ }
+
+ private InternalPartitionField toPartitionField(String key, InternalSchema schema) {
+ InternalField sourceField =
+ findField(schema, key)
+ .orElseThrow(
+ () -> new IllegalArgumentException("Partition key not found in schema: " + key));
+ return InternalPartitionField.builder()
+ .sourceField(sourceField)
+ .transformType(PartitionTransformType.VALUE)
+ .build();
+ }
+
+ private Optional findField(InternalSchema schema, String path) {
+ return schema.getAllFields().stream().filter(f -> f.getPath().equals(path)).findFirst();
+ }
+
+ private InternalRowPartitionComputer newPartitionComputer(FileStoreTable table) {
+ return new InternalRowPartitionComputer(
+ table.coreOptions().partitionDefaultName(),
+ table.store().partitionType(),
+ table.partitionKeys().toArray(new String[0]),
+ table.coreOptions().legacyPartitionName());
+ }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonSchemaExtractor.java
new file mode 100644
index 000000000..8bb669bec
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonSchemaExtractor.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+
+import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.schema.SchemaUtils;
+
+/** Converts Paimon RowType to XTable InternalSchema. */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class PaimonSchemaExtractor {
+ private static final PaimonSchemaExtractor INSTANCE = new PaimonSchemaExtractor();
+
+ public static PaimonSchemaExtractor getInstance() {
+ return INSTANCE;
+ }
+
+ public InternalSchema toInternalSchema(TableSchema paimonSchema) {
+ RowType rowType = paimonSchema.logicalRowType();
+ List fields = toInternalFields(rowType);
+ return InternalSchema.builder()
+ .name("record")
+ .dataType(InternalType.RECORD)
+ .fields(fields)
+ .recordKeyFields(primaryKeyFields(paimonSchema, fields))
+ .build();
+ }
+
+ private List primaryKeyFields(
+ TableSchema paimonSchema, List internalFields) {
+ List keys = paimonSchema.primaryKeys();
+ return internalFields.stream()
+ .filter(f -> keys.contains(f.getName()))
+ .collect(Collectors.toList());
+ }
+
+ private List toInternalFields(RowType rowType) {
+ List fields = new ArrayList<>(rowType.getFieldCount());
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ DataField dataField = rowType.getFields().get(i);
+ InternalField internalField =
+ InternalField.builder()
+ .name(dataField.name())
+ .fieldId(dataField.id())
+ .parentPath(null)
+ .schema(
+ fromPaimonType(dataField.type(), dataField.name(), dataField.type().isNullable()))
+ .defaultValue(
+ dataField.type().isNullable() ? InternalField.Constants.NULL_DEFAULT_VALUE : null)
+ .build();
+ fields.add(internalField);
+ }
+ return fields;
+ }
+
+ private InternalSchema fromPaimonType(DataType type, String fieldPath, boolean nullable) {
+ InternalType internalType;
+ List fields = null;
+ Map metadata = null;
+ if (type instanceof CharType || type instanceof VarCharType) {
+ internalType = InternalType.STRING;
+ } else if (type instanceof BooleanType) {
+ internalType = InternalType.BOOLEAN;
+ } else if (type instanceof TinyIntType
+ || type instanceof SmallIntType
+ || type instanceof IntType) {
+ internalType = InternalType.INT;
+ } else if (type instanceof BigIntType) {
+ internalType = InternalType.LONG;
+ } else if (type instanceof FloatType) {
+ internalType = InternalType.FLOAT;
+ } else if (type instanceof DoubleType) {
+ internalType = InternalType.DOUBLE;
+ } else if (type instanceof BinaryType || type instanceof VarBinaryType) {
+ internalType = InternalType.BYTES;
+ } else if (type instanceof DateType) {
+ internalType = InternalType.DATE;
+ } else if (type instanceof TimestampType || type instanceof LocalZonedTimestampType) {
+ internalType = InternalType.TIMESTAMP;
+ metadata =
+ Collections.singletonMap(
+ InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
+ } else if (type instanceof DecimalType) {
+ DecimalType d = (DecimalType) type;
+ metadata = new HashMap<>(2, 1.0f);
+ metadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, d.getPrecision());
+ metadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, d.getScale());
+ internalType = InternalType.DECIMAL;
+ } else if (type instanceof RowType) {
+ RowType rt = (RowType) type;
+ List nested = new ArrayList<>(rt.getFieldCount());
+ for (DataField df : rt.getFields()) {
+ nested.add(
+ InternalField.builder()
+ .name(df.name())
+ .fieldId(df.id())
+ .parentPath(fieldPath)
+ .schema(
+ fromPaimonType(
+ df.type(),
+ SchemaUtils.getFullyQualifiedPath(fieldPath, df.name()),
+ df.type().isNullable()))
+ .defaultValue(
+ df.type().isNullable() ? InternalField.Constants.NULL_DEFAULT_VALUE : null)
+ .build());
+ }
+ fields = nested;
+ internalType = InternalType.RECORD;
+ } else if (type instanceof ArrayType) {
+ ArrayType at = (ArrayType) type;
+ InternalSchema elementSchema =
+ fromPaimonType(
+ at.getElementType(),
+ SchemaUtils.getFullyQualifiedPath(
+ fieldPath, InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME),
+ at.getElementType().isNullable());
+ InternalField elementField =
+ InternalField.builder()
+ .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
+ .parentPath(fieldPath)
+ .schema(elementSchema)
+ .build();
+ fields = Collections.singletonList(elementField);
+ internalType = InternalType.LIST;
+ } else if (type instanceof MapType) {
+ MapType mt = (MapType) type;
+ InternalSchema keySchema =
+ fromPaimonType(
+ mt.getKeyType(),
+ SchemaUtils.getFullyQualifiedPath(
+ fieldPath, InternalField.Constants.MAP_KEY_FIELD_NAME),
+ false);
+ InternalField keyField =
+ InternalField.builder()
+ .name(InternalField.Constants.MAP_KEY_FIELD_NAME)
+ .parentPath(fieldPath)
+ .schema(keySchema)
+ .build();
+ InternalSchema valueSchema =
+ fromPaimonType(
+ mt.getValueType(),
+ SchemaUtils.getFullyQualifiedPath(
+ fieldPath, InternalField.Constants.MAP_VALUE_FIELD_NAME),
+ mt.getValueType().isNullable());
+ InternalField valueField =
+ InternalField.builder()
+ .name(InternalField.Constants.MAP_VALUE_FIELD_NAME)
+ .parentPath(fieldPath)
+ .schema(valueSchema)
+ .build();
+ fields = Arrays.asList(keyField, valueField);
+ internalType = InternalType.MAP;
+ } else {
+ throw new NotSupportedException("Unsupported Paimon type: " + type.asSQLString());
+ }
+
+ return InternalSchema.builder()
+ .name(type.asSQLString())
+ .dataType(internalType)
+ .isNullable(nullable)
+ .metadata(metadata)
+ .fields(fields)
+ .build();
+ }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
index 14395e0db..a5670eacd 100644
--- a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
@@ -21,6 +21,7 @@
import static org.apache.xtable.model.storage.TableFormat.DELTA;
import static org.apache.xtable.model.storage.TableFormat.HUDI;
import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.model.storage.TableFormat.PAIMON;
import static org.apache.xtable.model.storage.TableFormat.PARQUET;
import java.nio.file.Path;
@@ -91,6 +92,9 @@ static GenericTable getInstance(
case ICEBERG:
return TestIcebergTable.forStandardSchemaAndPartitioning(
tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration());
+ case PAIMON:
+ return TestPaimonTable.createTable(
+ tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration(), false);
default:
throw new IllegalArgumentException("Unsupported source format: " + sourceFormat);
}
@@ -113,6 +117,9 @@ static GenericTable getInstanceWithAdditionalColumns(
case ICEBERG:
return TestIcebergTable.forSchemaWithAdditionalColumnsAndPartitioning(
tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration());
+ case PAIMON:
+ return TestPaimonTable.createTable(
+ tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration(), true);
default:
throw new IllegalArgumentException("Unsupported source format: " + sourceFormat);
}
diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index bda54c0f6..1ffaa5369 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -24,6 +24,7 @@
import static org.apache.xtable.model.storage.TableFormat.DELTA;
import static org.apache.xtable.model.storage.TableFormat.HUDI;
import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.model.storage.TableFormat.PAIMON;
import static org.apache.xtable.model.storage.TableFormat.PARQUET;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -66,6 +67,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.CleanupMode;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -104,9 +106,12 @@
import org.apache.xtable.iceberg.TestIcebergDataHelper;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;
+import org.apache.xtable.paimon.PaimonConversionSourceProvider;
public class ITConversionController {
- @TempDir public static Path tempDir;
+ @TempDir(cleanup = CleanupMode.NEVER) // TODO remove CleanupMode.NEVER after debugging
+ public static Path tempDir;
+
private static final DateTimeFormatter DATE_FORMAT =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("UTC"));
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -123,6 +128,15 @@ public static void setupOnce() {
.sparkContext()
.hadoopConfiguration()
.set("parquet.avro.write-old-list-structure", "false");
+ sparkSession
+ .sparkContext()
+ .conf()
+ .set(
+ "spark.sql.extensions",
+ "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+ .set("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
+ .set("spark.sql.catalog.paimon.warehouse", tempDir.toUri().toString());
+
jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
}
@@ -142,10 +156,13 @@ private static Stream testCasesWithPartitioningAndSyncModes() {
private static Stream generateTestParametersForFormatsSyncModesAndPartitioning() {
List arguments = new ArrayList<>();
- for (String sourceTableFormat : Arrays.asList(HUDI, DELTA, ICEBERG)) {
+ for (String sourceFormat : Arrays.asList(HUDI, DELTA, ICEBERG, PAIMON)) {
for (SyncMode syncMode : SyncMode.values()) {
+ if (sourceFormat.equals(PAIMON) && syncMode == SyncMode.INCREMENTAL)
+ continue; // Paimon does not support incremental sync yet
+
for (boolean isPartitioned : new boolean[] {true, false}) {
- arguments.add(Arguments.of(sourceTableFormat, syncMode, isPartitioned));
+ arguments.add(Arguments.of(sourceFormat, syncMode, isPartitioned));
}
}
}
@@ -170,23 +187,37 @@ private static Stream testCasesWithSyncModes() {
}
private ConversionSourceProvider> getConversionSourceProvider(String sourceTableFormat) {
- if (sourceTableFormat.equalsIgnoreCase(HUDI)) {
- ConversionSourceProvider hudiConversionSourceProvider =
- new HudiConversionSourceProvider();
- hudiConversionSourceProvider.init(jsc.hadoopConfiguration());
- return hudiConversionSourceProvider;
- } else if (sourceTableFormat.equalsIgnoreCase(DELTA)) {
- ConversionSourceProvider deltaConversionSourceProvider =
- new DeltaConversionSourceProvider();
- deltaConversionSourceProvider.init(jsc.hadoopConfiguration());
- return deltaConversionSourceProvider;
- } else if (sourceTableFormat.equalsIgnoreCase(ICEBERG)) {
- ConversionSourceProvider icebergConversionSourceProvider =
- new IcebergConversionSourceProvider();
- icebergConversionSourceProvider.init(jsc.hadoopConfiguration());
- return icebergConversionSourceProvider;
- } else {
- throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
+ switch (sourceTableFormat.toUpperCase()) {
+ case HUDI:
+ {
+ ConversionSourceProvider hudiConversionSourceProvider =
+ new HudiConversionSourceProvider();
+ hudiConversionSourceProvider.init(jsc.hadoopConfiguration());
+ return hudiConversionSourceProvider;
+ }
+ case DELTA:
+ {
+ ConversionSourceProvider deltaConversionSourceProvider =
+ new DeltaConversionSourceProvider();
+ deltaConversionSourceProvider.init(jsc.hadoopConfiguration());
+ return deltaConversionSourceProvider;
+ }
+ case ICEBERG:
+ {
+ ConversionSourceProvider icebergConversionSourceProvider =
+ new IcebergConversionSourceProvider();
+ icebergConversionSourceProvider.init(jsc.hadoopConfiguration());
+ return icebergConversionSourceProvider;
+ }
+ case PAIMON:
+ {
+ ConversionSourceProvider paimonConversionSourceProvider =
+ new PaimonConversionSourceProvider();
+ paimonConversionSourceProvider.init(jsc.hadoopConfiguration());
+ return paimonConversionSourceProvider;
+ }
+ default:
+ throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
}
}
@@ -486,11 +517,9 @@ public void testTimeTravelQueries(String sourceTableFormat) throws Exception {
private static List getOtherFormats(String sourceTableFormat) {
return Arrays.stream(TableFormat.values())
- .filter(
- format ->
- !format.equals(sourceTableFormat)
- && !format.equals(
- PARQUET)) // excluded file formats because upset, insert etc. not supported
+ .filter(fmt -> !fmt.equals(sourceTableFormat))
+ .filter(fmt -> !fmt.equals(PAIMON)) // Paimon target is not supported yet
+ .filter(fmt -> !fmt.equals(PARQUET)) // upserts/inserts are not supported in Parquet
.collect(Collectors.toList());
}
@@ -911,34 +940,34 @@ private void checkDatasetEquivalence(
}));
String[] selectColumnsArr = sourceTable.getColumnsToSelect().toArray(new String[] {});
- List dataset1Rows = sourceRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
+ List sourceRowsList = sourceRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
targetRowsByFormat.forEach(
- (format, targetRows) -> {
- List dataset2Rows =
+ (targetFormat, targetRows) -> {
+ List targetRowsList =
targetRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
assertEquals(
- dataset1Rows.size(),
- dataset2Rows.size(),
+ sourceRowsList.size(),
+ targetRowsList.size(),
String.format(
"Datasets have different row counts when reading from Spark. Source: %s, Target: %s",
- sourceFormat, format));
+ sourceFormat, targetFormat));
// sanity check the count to ensure test is set up properly
if (expectedCount != null) {
- assertEquals(expectedCount, dataset1Rows.size());
+ assertEquals(expectedCount, sourceRowsList.size());
} else {
// if count is not known ahead of time, ensure datasets are non-empty
- assertFalse(dataset1Rows.isEmpty());
+ assertFalse(sourceRowsList.isEmpty());
}
- if (containsUUIDFields(dataset1Rows) && containsUUIDFields(dataset2Rows)) {
- compareDatasetWithUUID(dataset1Rows, dataset2Rows);
+ if (containsUUIDFields(sourceRowsList) && containsUUIDFields(targetRowsList)) {
+ compareDatasetWithUUID(sourceRowsList, targetRowsList);
} else {
assertEquals(
- dataset1Rows,
- dataset2Rows,
+ sourceRowsList,
+ targetRowsList,
String.format(
"Datasets are not equivalent when reading from Spark. Source: %s, Target: %s",
- sourceFormat, format));
+ sourceFormat, targetFormat));
}
});
}
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java b/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java
new file mode 100644
index 000000000..551020074
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.manifest.BucketEntry;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.ParameterUtils;
+
+public class TestPaimonTable implements GenericTable {
+
+ private final Random random = new Random();
+ private final FileStoreTable paimonTable;
+ private final String partitionField;
+
+ public TestPaimonTable(FileStoreTable paimonTable, String partitionField) {
+ this.paimonTable = paimonTable;
+ this.partitionField = partitionField;
+ }
+
+ public static GenericTable createTable(
+ String tableName,
+ String partitionField,
+ Path tempDir,
+ Configuration hadoopConf,
+ boolean additionalColumns) {
+ String basePath = initBasePath(tempDir, tableName);
+ Catalog catalog = createFilesystemCatalog(basePath, hadoopConf);
+ FileStoreTable paimonTable = createTable(catalog, partitionField, additionalColumns);
+
+ System.out.println(
+ "Initialized Paimon test table at base path: "
+ + basePath
+ + " with partition field: "
+ + partitionField
+ + " and additional columns: "
+ + additionalColumns);
+
+ return new TestPaimonTable(paimonTable, partitionField);
+ }
+
+ public static Catalog createFilesystemCatalog(String basePath, Configuration hadoopConf) {
+ CatalogContext context = CatalogContext.create(new org.apache.paimon.fs.Path(basePath));
+ return CatalogFactory.createCatalog(context);
+ }
+
+ public static FileStoreTable createTable(
+ Catalog catalog, String partitionField, boolean additionalColumns) {
+ try {
+ catalog.createDatabase("test_db", true);
+ Identifier identifier = Identifier.create("test_db", "test_table");
+ Schema schema = buildSchema(partitionField, additionalColumns);
+ catalog.createTable(identifier, schema, true);
+ return (FileStoreTable) catalog.getTable(identifier);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static Schema buildSchema(String partitionField, boolean additionalColumns) {
+ Schema.Builder builder =
+ Schema.newBuilder()
+ .primaryKey("id")
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("value", DataTypes.DOUBLE())
+ .column("created_at", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+ .column("updated_at", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+ .column("is_active", DataTypes.BOOLEAN())
+ .column("description", DataTypes.VARCHAR(255))
+ .option("bucket", "1")
+ .option("bucket-key", "id")
+ .option("full-compaction.delta-commits", "1");
+
+ if (partitionField != null) {
+ builder
+ .primaryKey("id", partitionField)
+ .column(partitionField, DataTypes.STRING())
+ .partitionKeys(partitionField);
+ }
+
+ if (additionalColumns) {
+ builder.column("extra_info", DataTypes.STRING()).column("extra_value", DataTypes.DOUBLE());
+ }
+
+ return builder.build();
+ }
+
+ private GenericRow buildGenericRow(int rowIdx, TableSchema schema, String partitionValue) {
+ List
+
+
+ org.apache.paimon
+ paimon-bundle
+
+
+ org.apache.paimon
+ paimon-spark-${spark.version.prefix}
+ ${paimon.version}
+ test
+
org.apache.parquet
diff --git a/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java b/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java
index 0e7a7e26b..c53aa52ea 100644
--- a/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java
+++ b/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java
@@ -22,6 +22,7 @@
import static org.apache.xtable.model.storage.TableFormat.DELTA;
import static org.apache.xtable.model.storage.TableFormat.HUDI;
import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.model.storage.TableFormat.PAIMON;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -97,6 +98,15 @@ public static void setupOnce() {
.sparkContext()
.hadoopConfiguration()
.set("parquet.avro.write-old-list-structure", "false");
+ sparkSession
+ .sparkContext()
+ .conf()
+ .set(
+ "spark.sql.extensions",
+ "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+ .set("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
+ .set("spark.sql.catalog.paimon.warehouse", tempDir.toUri().toString());
+
jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
} catch (IOException e) {
throw new RuntimeException(e);
@@ -116,14 +126,18 @@ public void setUp() {
new DeltaConversionSourceProvider();
ConversionSourceProvider icebergConversionSourceProvider =
new IcebergConversionSourceProvider();
+ ConversionSourceProvider paimonConversionSourceProvider =
+ new org.apache.xtable.paimon.PaimonConversionSourceProvider();
hudiConversionSourceProvider.init(jsc.hadoopConfiguration());
deltaConversionSourceProvider.init(jsc.hadoopConfiguration());
icebergConversionSourceProvider.init(jsc.hadoopConfiguration());
+ paimonConversionSourceProvider.init(jsc.hadoopConfiguration());
sourceProviders.put(HUDI, hudiConversionSourceProvider);
sourceProviders.put(DELTA, deltaConversionSourceProvider);
sourceProviders.put(ICEBERG, icebergConversionSourceProvider);
+ sourceProviders.put(PAIMON, paimonConversionSourceProvider);
this.conversionService =
new ConversionService(
@@ -232,7 +246,7 @@ public void testVariousOperations(String sourceTableFormat, boolean isPartitione
private static Stream generateTestParametersFormatsAndPartitioning() {
List arguments = new ArrayList<>();
- for (String sourceTableFormat : Arrays.asList(HUDI, DELTA, ICEBERG)) {
+ for (String sourceTableFormat : Arrays.asList(HUDI, DELTA, ICEBERG, PAIMON)) {
for (boolean isPartitioned : new boolean[] {true, false}) {
arguments.add(Arguments.of(sourceTableFormat, isPartitioned));
}
@@ -243,6 +257,7 @@ private static Stream generateTestParametersFormatsAndPartitioning()
protected static List getOtherFormats(String sourceTableFormat) {
return Arrays.stream(TableFormat.values())
.filter(format -> !format.equals(sourceTableFormat))
+ .filter(format -> !format.equals(PAIMON)) // Paimon target not supported yet
.collect(Collectors.toList());
}