Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ hs_err_pid*
# Ignore java-version and idea files.
.java-version
.idea
.vscode

# Ignore Gradle project-specific cache directory
.gradle
Expand Down
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
<spark.version.prefix>3.4</spark.version.prefix>
<iceberg.version>1.4.2</iceberg.version>
<delta.version>2.4.0</delta.version>
<paimon.version>1.2.0</paimon.version>
<jackson.version>2.18.2</jackson.version>
<spotless.version>2.43.0</spotless.version>
<apache.rat.version>0.16.1</apache.rat.version>
Expand Down Expand Up @@ -333,6 +334,13 @@
<version>${delta.hive.version}</version>
</dependency>

<!-- Paimon -->
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-bundle</artifactId>
<version>${paimon.version}</version>
</dependency>

<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ public class TableFormat {
public static final String HUDI = "HUDI";
public static final String ICEBERG = "ICEBERG";
public static final String DELTA = "DELTA";
public static final String PAIMON = "PAIMON";
public static final String PARQUET = "PARQUET";

public static String[] values() {
return new String[] {"HUDI", "ICEBERG", "DELTA"};
return new String[] {"HUDI", "ICEBERG", "DELTA", "PAIMON"};
}
}
12 changes: 12 additions & 0 deletions xtable-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@
<scope>test</scope>
</dependency>

<!-- Paimon dependencies -->
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-bundle</artifactId>
</dependency>
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-spark-${spark.version.prefix}</artifactId>
<version>${paimon.version}</version>
<scope>test</scope>
</dependency>

<!-- Hadoop dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.paimon;

import static org.apache.xtable.model.storage.DataLayoutStrategy.HIVE_STYLE_PARTITION;

import java.io.IOException;
import java.time.Instant;
import java.util.List;

import lombok.extern.log4j.Log4j2;

import org.apache.paimon.Snapshot;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotManager;

import org.apache.xtable.exception.ReadException;
import org.apache.xtable.model.*;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.DataLayoutStrategy;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.spi.extractor.ConversionSource;

@Log4j2
public class PaimonConversionSource implements ConversionSource<Snapshot> {

  private final FileStoreTable paimonTable;
  private final SchemaManager schemaManager;
  private final SnapshotManager snapshotManager;

  private final PaimonDataFileExtractor dataFileExtractor = PaimonDataFileExtractor.getInstance();
  private final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance();
  private final PaimonPartitionExtractor partitionSpecExtractor =
      PaimonPartitionExtractor.getInstance();

  public PaimonConversionSource(FileStoreTable paimonTable) {
    this.paimonTable = paimonTable;
    this.schemaManager = paimonTable.schemaManager();
    this.snapshotManager = paimonTable.snapshotManager();
  }

  /**
   * Builds the {@link InternalTable} view of the Paimon table as of the given snapshot.
   *
   * @param snapshot the Paimon snapshot to describe
   * @return the internal table representation at that snapshot
   */
  @Override
  public InternalTable getTable(Snapshot snapshot) {
    TableSchema paimonSchema = schemaManager.schema(snapshot.schemaId());
    InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema);

    List<String> partitionKeys = paimonTable.partitionKeys();
    List<InternalPartitionField> partitioningFields =
        partitionSpecExtractor.toInternalPartitionFields(partitionKeys, internalSchema);

    // Paimon lays out partitions as key=value directories, so any partitioned table
    // maps to the hive-style layout strategy.
    DataLayoutStrategy dataLayoutStrategy =
        partitioningFields.isEmpty() ? DataLayoutStrategy.FLAT : HIVE_STYLE_PARTITION;

    return InternalTable.builder()
        .name(paimonTable.name())
        .tableFormat(TableFormat.PAIMON)
        .readSchema(internalSchema)
        .layoutStrategy(dataLayoutStrategy)
        .basePath(paimonTable.location().toString())
        .partitioningFields(partitioningFields)
        .latestCommitTime(Instant.ofEpochMilli(snapshot.timeMillis()))
        .latestMetadataPath(snapshotManager.snapshotPath(snapshot.id()).toString())
        .build();
  }

  @Override
  public InternalTable getCurrentTable() {
    return getTable(latestSnapshotOrThrow());
  }

  @Override
  public InternalSnapshot getCurrentSnapshot() {
    Snapshot snapshot = latestSnapshotOrThrow();

    InternalTable internalTable = getTable(snapshot);
    List<InternalDataFile> internalDataFiles =
        dataFileExtractor.toInternalDataFiles(paimonTable, snapshot);

    return InternalSnapshot.builder()
        .table(internalTable)
        .version(Long.toString(snapshot.timeMillis()))
        .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
        // TODO : Implement pending commits extraction, required for incremental sync
        .sourceIdentifier(getCommitIdentifier(snapshot))
        .build();
  }

  /**
   * Returns the latest snapshot of the table.
   *
   * @throws ReadException if the table has no snapshots yet
   */
  private Snapshot latestSnapshotOrThrow() {
    Snapshot snapshot = snapshotManager.latestSnapshot();
    if (snapshot == null) {
      throw new ReadException("No snapshots found for table " + paimonTable.name());
    }
    return snapshot;
  }

  @Override
  public TableChange getTableChangeForCommit(Snapshot snapshot) {
    throw new UnsupportedOperationException("Incremental Sync is not supported yet.");
  }

  @Override
  public CommitsBacklog<Snapshot> getCommitsBacklog(
      InstantsForIncrementalSync instantsForIncrementalSync) {
    throw new UnsupportedOperationException("Incremental Sync is not supported yet.");
  }

  @Override
  public boolean isIncrementalSyncSafeFrom(Instant instant) {
    return false; // Incremental sync is not supported yet
  }

  @Override
  public String getCommitIdentifier(Snapshot snapshot) {
    return Long.toString(snapshot.commitIdentifier());
  }

  @Override
  public void close() throws IOException {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.paimon;

import java.io.IOException;

import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;

import org.apache.xtable.conversion.ConversionSourceProvider;
import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.exception.ReadException;
import org.apache.xtable.spi.extractor.ConversionSource;

public class PaimonConversionSourceProvider extends ConversionSourceProvider<Snapshot> {
  /**
   * Creates a {@link PaimonConversionSource} for the table at the configured data path.
   *
   * <p>The table is loaded directly from the filesystem (no catalog lookup), using the
   * Hadoop configuration supplied to this provider.
   *
   * @param sourceTableConfig configuration pointing at the Paimon table's data path
   * @throws ReadException if the table storage cannot be accessed
   */
  @Override
  public ConversionSource<Snapshot> getConversionSourceInstance(SourceTable sourceTableConfig) {
    try {
      Options catalogOptions = new Options();
      CatalogContext context = CatalogContext.create(catalogOptions, hadoopConf);

      Path path = new Path(sourceTableConfig.getDataPath());
      FileIO fileIO = FileIO.get(path, context);
      FileStoreTable paimonTable = FileStoreTableFactory.create(fileIO, path);

      return new PaimonConversionSource(paimonTable);
    } catch (IOException e) {
      // Preserve the cause — rethrowing only e.getMessage() would discard the
      // underlying IOException's stack trace and make failures hard to diagnose.
      throw new ReadException(e.getMessage(), e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.paimon;

import java.util.*;

import org.apache.paimon.Snapshot;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.snapshot.SnapshotReader;

import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.storage.InternalDataFile;

public class PaimonDataFileExtractor {

  private static final PaimonDataFileExtractor INSTANCE = new PaimonDataFileExtractor();

  private final PaimonPartitionExtractor partitionExtractor =
      PaimonPartitionExtractor.getInstance();

  public static PaimonDataFileExtractor getInstance() {
    return INSTANCE;
  }

  /**
   * Lists every data file visible in the given snapshot and converts each manifest
   * entry into an {@link InternalDataFile}.
   */
  public List<InternalDataFile> toInternalDataFiles(FileStoreTable table, Snapshot snapshot) {
    List<InternalDataFile> files = new ArrayList<>();
    for (Iterator<ManifestEntry> entries = newSnapshotReader(table, snapshot).readFileIterator();
        entries.hasNext(); ) {
      files.add(toInternalDataFile(table, entries.next()));
    }
    return files;
  }

  /** Maps a single manifest entry (one data file) to the internal representation. */
  private InternalDataFile toInternalDataFile(FileStoreTable table, ManifestEntry entry) {
    DataFileMeta meta = entry.file();
    return InternalDataFile.builder()
        .physicalPath(toFullPhysicalPath(table, entry))
        .fileSizeBytes(meta.fileSize())
        .lastModified(meta.creationTimeEpochMillis())
        .recordCount(meta.rowCount())
        .partitionValues(partitionExtractor.toPartitionValues(table, entry.partition()))
        .columnStats(toColumnStats(meta))
        .build();
  }

  /**
   * Reconstructs the absolute file path:
   * {@code <tableLocation>[/<partitionPath>]/bucket-<n>/<fileName>}.
   */
  private String toFullPhysicalPath(FileStoreTable table, ManifestEntry entry) {
    List<String> segments = new ArrayList<>();
    segments.add(table.location().toString());
    partitionExtractor.toPartitionPath(table, entry.partition()).ifPresent(segments::add);
    segments.add("bucket-" + entry.bucket());
    segments.add(entry.file().fileName());
    return String.join("/", segments);
  }

  private List<ColumnStat> toColumnStats(DataFileMeta file) {
    // TODO: Implement logic to extract column stats from the file meta
    return Collections.emptyList();
  }

  /**
   * Builds a snapshot reader pinned to the given snapshot. For primary-key tables only
   * the top level is read, so only fully compacted files are considered.
   */
  private SnapshotReader newSnapshotReader(FileStoreTable table, Snapshot snapshot) {
    SnapshotReader reader = table.newSnapshotReader();
    return table.schema().primaryKeys().isEmpty()
        ? reader.withSnapshot(snapshot)
        : reader.withLevel(table.coreOptions().numLevels() - 1).withSnapshot(snapshot);
  }
}
Loading