partitionColNames, RowData rowData) {
+ return partitionColNames.stream()
+ .map(
+ name -> {
+ final int partitionValueColIdx = rowType.indexOf(name);
+ return new Object[] {
+ name, Conversions.FlinkToDelta.data(rowType, rowData, partitionValueColIdx)
+ };
+ })
+ .collect(Collectors.toMap(o -> (String) o[0], o -> (Literal) o[1]));
+ }
+
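+ /**
+ * Converts the value at {@code colIdx} of the given Flink {@link RowData} into the corresponding
+ * Delta Kernel {@link Literal}, based on the Delta data type declared for that column in {@code
+ * rowType}.
+ */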
+ public static Literal data(StructType rowType, RowData rowData, int colIdx) {
+ final StructField field = rowType.at(colIdx);
+ final DataType dataType = field.getDataType();
+ if (dataType.equivalent(io.delta.kernel.types.IntegerType.INTEGER)) {
+ return Literal.ofInt(rowData.getInt(colIdx));
+ } else if (dataType.equivalent(io.delta.kernel.types.LongType.LONG)) {
+ return Literal.ofLong(rowData.getLong(colIdx));
+ } else if (dataType.equivalent(io.delta.kernel.types.StringType.STRING)) {
+ return Literal.ofString(rowData.getString(colIdx).toString());
+ } else if (dataType.equivalent(io.delta.kernel.types.DoubleType.DOUBLE)) {
+ return Literal.ofDouble(rowData.getDouble(colIdx));
+ } else if (dataType.equivalent(io.delta.kernel.types.FloatType.FLOAT)) {
+ return Literal.ofFloat(rowData.getFloat(colIdx));
+ } else if (dataType instanceof io.delta.kernel.types.DecimalType) {
+ io.delta.kernel.types.DecimalType decimalType =
+ (io.delta.kernel.types.DecimalType) dataType;
+ int precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ return Literal.ofDecimal(
+ rowData.getDecimal(colIdx, precision, scale).toBigDecimal(), precision, scale);
+ } else if (dataType.equivalent(io.delta.kernel.types.DateType.DATE)) {
+ return Literal.ofDate(rowData.getInt(colIdx));
+ } else if (dataType.equivalent(io.delta.kernel.types.TimestampType.TIMESTAMP)) {
+ return Literal.ofTimestamp(rowData.getLong(colIdx));
+ } else if (dataType.equivalent(io.delta.kernel.types.TimestampNTZType.TIMESTAMP_NTZ)) {
+ return Literal.ofTimestampNtz(rowData.getLong(colIdx));
+ } else {
+ throw new UnsupportedOperationException("Unsupported data type: " + dataType);
+ }
+ }
+ }
+}
diff --git a/connectors/flink/v1.20/src/main/java/io/delta/flink/sink/DeltaCommittable.java b/connectors/flink/v1.20/src/main/java/io/delta/flink/sink/DeltaCommittable.java
new file mode 100644
index 00000000000..d617c0ccf12
--- /dev/null
+++ b/connectors/flink/v1.20/src/main/java/io/delta/flink/sink/DeltaCommittable.java
@@ -0,0 +1,142 @@
+package io.delta.flink.sink;
+
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.internal.json.JsonUtils;
+import io.delta.kernel.internal.actions.SingleAction;
+import io.delta.kernel.internal.util.Preconditions;
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * A {@code DeltaCommittable} represents a unit of pending work produced by a Delta sink writer that
+ * is ready to be committed to a Delta table.
+ *
+ * <p>{@code DeltaCommittable} instances are emitted by {@link
+ * org.apache.flink.api.connector.sink2.SinkWriter} implementations during checkpointing and are
+ * later consumed by a {@link DeltaCommitter} or global committer to finalize changes in the Delta
+ * transaction log.
+ *
+ *
+ * <p>Each committable encapsulates:
+ *
+ * <ul>
+ * <li>one or more Delta actions (e.g., {@code AddFile} actions) produced by a writer,
+ * <li>checkpoint-scoped context that allows the commit process to be retried safely.
+ * </ul>
+ *
+ * <p>During recovery or retries, the same {@code DeltaCommittable} may be delivered multiple times
+ * to the committer. Implementations must therefore ensure that committing a committable is either
+ * idempotent or protected by higher-level deduplication mechanisms (for example, checkpoint
+ * tracking stored in the Delta table metadata).
+ *
+ *
+ * <p>{@code DeltaCommittable} is a transport object only; it does not perform I/O or commit
+ * operations itself. All side effects are applied by the corresponding committer.
+ *
+ *
+ * <p>This class is typically serialized and checkpointed by Flink and must therefore remain stable
+ * and backward-compatible across versions of the connector.
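+ *
+ * <p>Illustrative round trip through the checkpoint serializer (the digest and action list are
+ * placeholders; both calls declare {@code IOException}):
+ *
+ * <pre>{@code
+ * DeltaCommittable committable =
+ *     new DeltaCommittable(jobId, operatorId, checkpointId, schemaDigest, deltaActions);
+ * byte[] bytes = new DeltaCommittable.Serializer().serialize(committable);
+ * DeltaCommittable restored = new DeltaCommittable.Serializer().deserialize(1, bytes);
+ * }</pre>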
+ */
+public class DeltaCommittable {
+ private final String jobId;
+ private final String operatorId;
+ private final long checkpointId;
+ private final String schemaDigest;
+ private final List<Row> deltaActions;
+
+ public DeltaCommittable(
+ String jobId,
+ String operatorId,
+ long checkpointId,
+ String schemaDigest,
+ List<Row> deltaActions) {
+ this.jobId = jobId;
+ this.operatorId = operatorId;
+ this.checkpointId = checkpointId;
+ this.schemaDigest = schemaDigest;
+ this.deltaActions = deltaActions;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public String getOperatorId() {
+ return operatorId;
+ }
+
+ public long getCheckpointId() {
+ return checkpointId;
+ }
+
+ public String getSchemaDigest() {
+ return schemaDigest;
+ }
+
+ public List<Row> getDeltaActions() {
+ return deltaActions;
+ }
+
+ @Override
+ public String toString() {
+ return "DeltaCommittable{"
+ + "jobId='"
+ + jobId
+ + '\''
+ + ", operatorId='"
+ + operatorId
+ + '\''
+ + ", checkpointId="
+ + checkpointId
+ + ", schemaDigest="
+ + schemaDigest
+ + ", deltaActions="
+ + deltaActions.stream().map(JsonUtils::rowToJson).collect(Collectors.joining(","))
+ + '}';
+ }
+
+ static class Serializer implements SimpleVersionedSerializer<DeltaCommittable> {
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
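+ // Serialized layout: jobId, operatorId, checkpointId, schemaDigest, the number of actions, and
+ // then each action row encoded as a JSON string.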
+ @Override
+ public byte[] serialize(DeltaCommittable obj) throws IOException {
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(bos)) {
+ out.writeUTF(obj.getJobId());
+ out.writeUTF(obj.getOperatorId());
+ out.writeLong(obj.getCheckpointId());
+ out.writeUTF(obj.getSchemaDigest());
+ out.writeInt(obj.getDeltaActions().size());
+ for (Row row : obj.getDeltaActions()) {
+ Preconditions.checkArgument(
+ row.getSchema().equivalent(SingleAction.FULL_SCHEMA), "Need to be an action");
+ out.writeUTF(JsonUtils.rowToJson(row));
+ }
+ out.flush();
+ out.close();
+ return bos.toByteArray();
+ }
+ }
+
+ @Override
+ public DeltaCommittable deserialize(int version, byte[] serialized) throws IOException {
+ try (ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
+ ObjectInputStream in = new ObjectInputStream(bis)) {
+ final String jobId = in.readUTF();
+ final String operatorId = in.readUTF();
+ final long checkpointId = in.readLong();
+ final String schemaDigest = in.readUTF();
+ final int numActions = in.readInt();
+ List<Row> actions = new ArrayList<>(numActions);
+ for (int i = 0; i < numActions; i++) {
+ final String actionJson = in.readUTF();
+ actions.add(JsonUtils.rowFromJson(actionJson, SingleAction.FULL_SCHEMA));
+ }
+ return new DeltaCommittable(jobId, operatorId, checkpointId, schemaDigest, actions);
+ }
+ }
+ }
+}
diff --git a/connectors/flink/v1.20/src/main/java/io/delta/flink/sink/DeltaCommitter.java b/connectors/flink/v1.20/src/main/java/io/delta/flink/sink/DeltaCommitter.java
new file mode 100644
index 00000000000..22b0da6eb33
--- /dev/null
+++ b/connectors/flink/v1.20/src/main/java/io/delta/flink/sink/DeltaCommitter.java
@@ -0,0 +1,144 @@
+package io.delta.flink.sink;
+
+import io.delta.flink.table.DeltaTable;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.internal.util.Preconditions;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.utils.CloseableIterable;
+import io.delta.kernel.utils.CloseableIterator;
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Committer is responsible for committing the data staged by the CommittingSinkWriter in the
+ * second step of a two-phase commit protocol.
+ *
+ * <p>A commit must be idempotent: if a failure occurs in Flink during the commit phase, Flink will
+ * restart from the last successful checkpoint and re-attempt to commit all committables. There are
+ * two failure cases:
+ *
+ * <ul>
+ * <li>Flink fails before completing checkpoint N. In this case, Flink discards all committables
+ * related to checkpoint N and restarts from reading the rows after checkpoint N-1. Flink calls the
+ * writer and committer to re-create the committables, and the old committables are simply
+ * discarded. Since the changes in checkpoint N are not written to the Delta table, no special
+ * handling is needed in DeltaSink/DeltaCommitter.
+ * <li>Flink fails after completing checkpoint N. In this case, the changes in checkpoint N have
+ * been written to the Delta table. Flink will load the committables from the persisted checkpoint
+ * N and replay them, which would insert the changes in checkpoint N twice into the Delta table as
+ * duplicate add files. We rely on Delta to deduplicate these add files; see {@link
+ * io.delta.kernel.TransactionBuilder#withTransactionId}.
+ * </ul>
+ *
+ * <p>NOTE: Unlike the IcebergCommitter, which writes the checkpoint ID into the snapshot to
+ * prevent a data file from being added to the table twice, the DeltaCommitter relies on the Delta
+ * protocol to handle duplicate files. Thus we don't explicitly write the jobId/checkpointId into
+ * the Delta log.
+ */
+public class DeltaCommitter implements Committer<DeltaCommittable> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DeltaCommitter.class);
+
+ // All committables should have the same job id as the committer.
+ // For simplicity, we get the job id from the constructor.
+ private String jobId;
+ private DeltaTable deltaTable;
+
+ private SinkCommitterMetricGroup metricGroup;
+
+ private DeltaCommitter(
+ String jobId, DeltaTable deltaTable, SinkCommitterMetricGroup metricGroup) {
+ this.jobId = jobId;
+ this.deltaTable = deltaTable;
+ this.metricGroup = metricGroup;
+ }
+
+ @Override
+ public void commit(Collection<CommitRequest<DeltaCommittable>> committables)
+ throws IOException, InterruptedException {
+ LOG.debug("Starting commit");
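+ // Commit checkpoint by checkpoint, in ascending checkpoint order.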
+ sortCommittablesByCheckpointId(committables).forEach(this::commitForSingleCheckpointId);
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+ private void commitForSingleCheckpointId(
+ long checkpointId, List<CommitRequest<DeltaCommittable>> committables) {
+ LOG.debug("Committing {} committables on checkpoint {}", committables.size(), checkpointId);
+
+ deltaTable.refresh();
+ String latestSchemaDigest = new DeltaSchemaDigest(deltaTable.getSchema()).sha256();
+ Set<String> committingSchemaDigest =
+ committables.stream()
+ .map(CommitRequest::getCommittable)
+ .map(DeltaCommittable::getSchemaDigest)
+ .collect(Collectors.toSet());
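+ // Every committable in this batch must have been written against the current table schema;
+ // fail fast if the table schema has changed since the data was written.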
+ Preconditions.checkArgument(
+ committingSchemaDigest.size() == 1 && committingSchemaDigest.contains(latestSchemaDigest),
+ "The committing schema is different from the latest table schema");
+
+ final CloseableIterable<Row> dataActions =
+ new CloseableIterable<Row>() {
+ @Override
+ public CloseableIterator<Row> iterator() {
+ return Utils.toCloseableIterator(
+ committables.stream()
+ .flatMap(req -> req.getCommittable().getDeltaActions().stream())
+ .iterator());
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Nothing to close
+ }
+ };
+
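+ // Replayed committables may re-add the same files; per the class-level Javadoc, Delta is relied
+ // upon to deduplicate them, so this commit is safe to retry.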
+ deltaTable.commit(dataActions);
+ }
+
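+ // Groups the commit requests by checkpoint id; the TreeMap keeps checkpoints in ascending order
+ // so that older checkpoints are committed first.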
+ private TreeMap<Long, List<CommitRequest<DeltaCommittable>>> sortCommittablesByCheckpointId(
+ Collection<CommitRequest<DeltaCommittable>> committables) {
+ return committables.stream()
+ .collect(
+ Collectors.groupingBy(
+ commitRequest -> commitRequest.getCommittable().getCheckpointId(),
+ TreeMap::new,
+ Collectors.toList()));
+ }
+
+ public static final class Builder {
+ private String jobId;
+ private DeltaTable deltaTable;
+ private SinkCommitterMetricGroup metricGroup;
+
+ public Builder() {}
+
+ public Builder withJobId(String jobId) {
+ this.jobId = jobId;
+ return this;
+ }
+
+ public Builder withDeltaTable(DeltaTable deltaTable) {
+ this.deltaTable = deltaTable;
+ return this;
+ }
+
+ public Builder withMetricGroup(SinkCommitterMetricGroup metricGroup) {
+ this.metricGroup = metricGroup;
+ return this;
+ }
+
+ public DeltaCommitter build() {
+ Objects.requireNonNull(jobId, "jobId must not be null");
+ Objects.requireNonNull(deltaTable, "deltaTable must not be null");
+ Objects.requireNonNull(metricGroup, "metricGroup must not be null");
+
+ return new DeltaCommitter(jobId, deltaTable, metricGroup);
+ }
+ }
+}
diff --git a/connectors/flink/v1.20/src/main/java/io/delta/flink/sink/DeltaSchemaDigest.java b/connectors/flink/v1.20/src/main/java/io/delta/flink/sink/DeltaSchemaDigest.java
new file mode 100644
index 00000000000..fcedb297b37
--- /dev/null
+++ b/connectors/flink/v1.20/src/main/java/io/delta/flink/sink/DeltaSchemaDigest.java
@@ -0,0 +1,52 @@
+package io.delta.flink.sink;
+
+import io.delta.kernel.types.StructType;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * {@code DeltaSchemaDigest} computes a deterministic, compact digest representation of a Delta
+ * table schema.
+ *
+ * <p>The digest captures the essential structural aspects of a schema (such as column names, data
+ * types, nullability, and nested field structure) so that two schemas with the same logical
+ * definition produce the same digest value.
+ *
+ *
+ * <p>This abstraction is primarily used to enable efficient schema evolution detection without
+ * performing expensive deep schema comparisons. By comparing digests, callers can quickly determine
+ * whether a schema has changed in a way that may require action (for example, triggering a schema
+ * update, compatibility check, or commit-time validation).
+ *
+ *
+ * <p>{@code DeltaSchemaDigest} is intended for internal use by components involved in schema
+ * tracking, commit coordination, or metadata validation, and does not replace full schema
+ * inspection when detailed differences are required.
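+ *
+ * <p>For example (illustrative), detecting a schema change across a table refresh:
+ *
+ * <pre>{@code
+ * String before = new DeltaSchemaDigest(table.getSchema()).sha256();
+ * table.refresh();
+ * String after = new DeltaSchemaDigest(table.getSchema()).sha256();
+ * boolean schemaChanged = !before.equals(after);
+ * }</pre>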
+ */
+public class DeltaSchemaDigest {
+ private final StructType schema;
+
+ public DeltaSchemaDigest(StructType schema) {
+ this.schema = schema;
+ }
+
+ /**
+ * Generates a SHA-256 digest of the schema's JSON representation.
+ *
+ * @return the schema digest as a lowercase hex-encoded SHA-256 string
+ */
+ public String sha256() {
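+ // Hash the canonical JSON form of the schema and hex-encode the resulting digest.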
+ byte[] schemaBytes = schema.toJson().getBytes(StandardCharsets.UTF_8);
+ try {
+ MessageDigest digest = MessageDigest.getInstance("SHA-256");
+ byte[] digestBytes = digest.digest(schemaBytes);
+ StringBuilder sb = new StringBuilder(digestBytes.length * 2);
+ for (byte b : digestBytes) {
+ sb.append(String.format("%02x", b));
+ }
+ return sb.toString();
+ } catch (NoSuchAlgorithmException e) {
+ // SHA-256 is guaranteed to exist in Java
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/connectors/flink/v1.20/src/main/java/io/delta/flink/sink/DeltaSink.java b/connectors/flink/v1.20/src/main/java/io/delta/flink/sink/DeltaSink.java
new file mode 100644
index 00000000000..c5ac9f2b42f
--- /dev/null
+++ b/connectors/flink/v1.20/src/main/java/io/delta/flink/sink/DeltaSink.java
@@ -0,0 +1,253 @@
+package io.delta.flink.sink;
+
+import io.delta.flink.table.*;
+import io.delta.kernel.internal.util.Preconditions;
+import io.delta.kernel.types.StructType;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.*;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.*;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Flink Sink V2 API offers different hooks for inserting custom topologies into the sink. We
+ * use the following:
+ *
+ *
+ * <ul>
+ * <li>{@link SupportsPreWriteTopology}, which redistributes the data to the writers,
+ * <li>{@link org.apache.flink.api.connector.sink2.SinkWriter}, which writes data/delete files and
+ * generates the {@link DeltaWriterResult} objects for the files,
+ * <li>{@link SupportsPreCommitTopology}, which we use to place the {@link
+ * DeltaWriterResultAggregator} that merges the individual {@link
+ * org.apache.flink.api.connector.sink2.SinkWriter}'s {@link DeltaWriterResult}s into a single
+ * {@link DeltaCommittable},
+ * <li>{@link DeltaCommitter}, which commits the incoming {@link DeltaCommittable}s to the Delta
+ * table,
+ * <li>{@link SupportsPostCommitTopology}, which we could use for incremental compaction later.
+ * This is not implemented yet.
+ * </ul>
+ *
+ * <p>The job graph looks like this:
+ *
+ * <pre>{@code
+ * Flink sink
+ * +-----------------------------------------------------------------------------------+
+ * | |
+ * +-------+ | +----------+ +-------------+ +---------------+ |
+ * | Map 1 | ==> | | writer 1 | | committer 1 | ---> | post commit 1 | |
+ * +-------+ | +----------+ +-------------+ +---------------+ |
+ * | \ / \ |
+ * | DeltaWriterResults DeltaCommittables \ |
+ * | \ / \ |
+ * +-------+ | +----------+ \ +-------------------+ / \ +---------------+ |
+ * | Map 2 | ==> | | writer 2 | --->| commit aggregator | | post commit 2 | |
+ * +-------+ | +----------+ +-------------------+ +---------------+ |
+ * | Commit only on |
+ * | a single committer |
+ * +-----------------------------------------------------------------------------------+
+ * }</pre>
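+ *
+ * <p>A minimal usage sketch (the input stream, schema, table path, and partition column are
+ * illustrative):
+ *
+ * <pre>{@code
+ * DataStream<RowData> rows = ...;
+ * DeltaSink sink =
+ *     new DeltaSink.Builder()
+ *         .withTablePath("/path/to/delta-table")   // hypothetical path
+ *         .withFlinkSchema(rowType)
+ *         .withPartitionColNames(List.of("date"))
+ *         .build();
+ * rows.sinkTo(sink);
+ * }</pre>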
+ */
+@Experimental
+public class DeltaSink
+ implements Sink<RowData>,
+ SupportsCommitter<DeltaCommittable>,
+ SupportsPreCommitTopology<DeltaWriterResult, DeltaCommittable>,
+ SupportsPreWriteTopology<RowData>,
+ SupportsPostCommitTopology<DeltaCommittable> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DeltaSink.class);
+
+ private final DeltaTable deltaTable;
+
+ public DeltaSink(DeltaTable deltaTable) {
+ this.deltaTable = deltaTable;
+ }
+
+ @Override
+ public SinkWriter<RowData> createWriter(InitContext context) throws IOException {
+ return new DeltaSinkWriter.Builder()
+ .withJobId(context.getJobInfo().getJobId().toString())
+ .withSubtaskId(context.getTaskInfo().getIndexOfThisSubtask())
+ .withAttemptNumber(context.getTaskInfo().getAttemptNumber())
+ .withDeltaTable(deltaTable)
+ .withMetricGroup(context.metricGroup())
+ .build();
+ }
+
+ @Override
+ public SinkWriter<RowData> createWriter(WriterInitContext context) throws IOException {
+ return new DeltaSinkWriter.Builder()
+ .withJobId(context.getJobInfo().getJobId().toString())
+ .withSubtaskId(context.getTaskInfo().getIndexOfThisSubtask())
+ .withAttemptNumber(context.getTaskInfo().getAttemptNumber())
+ .withDeltaTable(deltaTable)
+ .withMetricGroup(context.metricGroup())
+ .build();
+ }
+
+ @Override
+ public Committer<DeltaCommittable> createCommitter(CommitterInitContext context)
+ throws IOException {
+ return new DeltaCommitter.Builder()
+ .withJobId(context.getJobInfo().getJobId().toString())
+ .withDeltaTable(deltaTable)
+ .withMetricGroup(context.metricGroup())
+ .build();
+ }
+
+ /**
+ * This method ensures that all rows with the same partitionHash will be sent to the same {@link
+ * DeltaSinkWriter}. It makes no promises about how many unique partition hashes a {@link
+ * DeltaSinkWriter} will handle (it may even be zero).
+ *
+ * <p>TODO: This design may cause an imbalanced workload if the data distribution is skewed.
+ */
+ @Override
+ public DataStream<RowData> addPreWriteTopology(DataStream<RowData> inputDataStream) {
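+ // Key by the hash of the row's partition values (rendered as strings) so that all rows belonging
+ // to the same partition land on the same writer subtask.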
+ return inputDataStream.keyBy(
+ (KeySelector<RowData, Integer>)
+ value ->
+ Conversions.FlinkToDelta.partitionValues(
+ deltaTable.getSchema(), deltaTable.getPartitionColumns(), value)
+ .entrySet().stream()
+ .collect(
+ Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().toString()))
+ .hashCode());
+ }
+
+ @Override
+ public DataStream<CommittableMessage<DeltaCommittable>> addPreCommitTopology(
+ DataStream<CommittableMessage<DeltaWriterResult>> writerResults) {
+ TypeInformation<CommittableMessage<DeltaCommittable>> typeInformation =
+ CommittableMessageTypeInfo.of(this::getCommittableSerializer);
+ String uid = String.format("DeltaSink preCommit aggregator: %s", deltaTable.getId());
+ // global() sends every writer result to the single aggregator subtask below.
+ return writerResults
+ .global()
+ .transform(uid, typeInformation, new DeltaWriterResultAggregator())
+ .uid(uid)
+ .setParallelism(1)
+ .setMaxParallelism(1)
+ // global() forces all output records to be sent to subtask 0 of the downstream committer
+ // operator, ensuring the commit happens in only one committer subtask. Once upstream Flink
+ // provides the capability of setting the committer operator parallelism to 1, this can be
+ // removed.
+ .global();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<DeltaWriterResult> getWriteResultSerializer() {
+ return new DeltaWriterResult.Serializer();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<DeltaCommittable> getCommittableSerializer() {
+ return new DeltaCommittable.Serializer();
+ }
+
+ @Override
+ public void addPostCommitTopology(DataStream<CommittableMessage<DeltaCommittable>> committables) {
+ committables.global().process(new PostCommitOperator()).uid("DeltaSink postCommit processor");
+ }
+
+ public static class Builder {
+ private DeltaTable deltaTable;
+ // For file-based tables
+ private String tablePath;
+ private RowType flinkSchema;
+ private List<String> partitionColNames;
+ // For catalog-based tables
+ private String tableId;
+ private String catalogEndpoint;
+ private String catalogToken;
+ private Map<String, String> configurations;
+
+ public Builder withDeltaTable(DeltaTable deltaTable) {
+ this.deltaTable = deltaTable;
+ return this;
+ }
+
+ // For file-based tables
+ public Builder withTablePath(String tablePath) {
+ this.tablePath = tablePath;
+ return this;
+ }
+
+ public Builder withFlinkSchema(RowType flinkSchema) {
+ this.flinkSchema = flinkSchema;
+ return this;
+ }
+
+ public Builder withPartitionColNames(List<String> partitionColNames) {
+ this.partitionColNames = partitionColNames;
+ return this;
+ }
+
+ // For catalog-based tables
+ public Builder withTableId(String tableId) {
+ this.tableId = tableId;
+ return this;
+ }
+
+ public Builder withCatalogEndpoint(String catalogEndpoint) {
+ this.catalogEndpoint = catalogEndpoint;
+ return this;
+ }
+
+ public Builder withCatalogToken(String catalogToken) {
+ this.catalogToken = catalogToken;
+ return this;
+ }
+
+ public Builder withConfigurations(Map<String, String> configurations) {
+ this.configurations = configurations;
+ return this;
+ }
+
+ public DeltaSink build() {
+ if (configurations == null) {
+ configurations = Map.of();
+ }
+ if (deltaTable == null) {
+ // Exactly one of tablePath or tableId must be set.
+ Preconditions.checkArgument(
+ (tablePath != null) ^ (tableId != null), "Use either tablePath or tableId");
+ if (tablePath != null) {
+ // File-based table
+ StructType tableSchema = null;
+ if (flinkSchema != null) {
+ tableSchema = Conversions.FlinkToDelta.schema(flinkSchema);
+ }
+ deltaTable =
+ new HadoopTable(
+ URI.create(tablePath), configurations, tableSchema, partitionColNames);
+ } else {
+ // Catalog-based table
+ Objects.requireNonNull(catalogEndpoint, "catalogEndpoint must not be null for catalog-based tables");
+ Objects.requireNonNull(catalogToken, "catalogToken must not be null for catalog-based tables");
+ Map<String, String> finalConf = new HashMap<>(configurations);
+ finalConf.put(CCv2Table.CATALOG_ENDPOINT, catalogEndpoint);
+ finalConf.put(CCv2Table.CATALOG_TOKEN, catalogToken);
+ // TODO Support separated endpoints for catalog and table
+ Catalog restCatalog = new RESTCatalog(catalogEndpoint, catalogToken);
+ deltaTable = new CCv2Table(restCatalog, tableId, finalConf);
+ }
+ }
+ return new DeltaSink(deltaTable);
+ }
+ }
+}
diff --git a/connectors/flink/v1.20/src/main/java/io/delta/flink/sink/DeltaSinkWriter.java b/connectors/flink/v1.20/src/main/java/io/delta/flink/sink/DeltaSinkWriter.java
new file mode 100644
index 00000000000..e4321beb7c6
--- /dev/null
+++ b/connectors/flink/v1.20/src/main/java/io/delta/flink/sink/DeltaSinkWriter.java
@@ -0,0 +1,164 @@
+package io.delta.flink.sink;
+
+import io.delta.flink.table.DeltaTable;
+import io.delta.kernel.expressions.Literal;
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
+import org.apache.flink.table.data.RowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Delta writer implementation based on Flink’s Sink V2 Connector API.
+ *
+ * <p>This writer is responsible for writing incoming records to the target Delta table storage and
+ * producing {@link DeltaWriterResult} objects that describe the data written by this writer since
+ * the last successful checkpoint.
+ *
+ *
+ * <p>At each checkpoint, the writer emits a {@code DeltaWriterResult} containing the Delta {@code
+ * AddFile} actions (and any other relevant actions) generated during that checkpoint interval.
+ * These results are subsequently aggregated and committed by the downstream committer components to
+ * create a new Delta table version.
+ *
+ *
+ * <p>This implementation follows Flink’s checkpointing and fault-tolerance model:
+ *
+ *
+ * <ul>
+ * <li>Writes are buffered and tracked per checkpoint,
+ * <li>{@code DeltaWriterResult}s are emitted during checkpoint preparation, and
+ * <li>Commit responsibility is delegated to the committer to ensure correctness and exactly-once
+ * or at-least-once semantics.
+ * </ul>
+ *
+ * <p>The writer does not perform table commits directly. Instead, it focuses solely on producing
+ * durable data files and describing their effects via {@code DeltaWriterResult}, allowing commit
+ * coordination and deduplication to be handled centrally.
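+ *
+ * <p>Conceptual per-checkpoint flow, following the Flink {@code CommittingSinkWriter} contract
+ * (simplified sketch; exception handling omitted):
+ *
+ * <pre>{@code
+ * writer.write(rowData, context);                                  // buffer/route each row
+ * Collection<DeltaWriterResult> results = writer.prepareCommit();  // called at checkpoint time
+ * // the results are aggregated into a DeltaCommittable and committed downstream
+ * }</pre>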
+ */
+public class DeltaSinkWriter implements CommittingSinkWriter<RowData, DeltaWriterResult> {
+ private static final Logger LOG = LoggerFactory.getLogger(DeltaSinkWriter.class);
+
+ private final String jobId;
+ private final int subtaskId;
+ private final int attemptNumber;
+
+ private final DeltaTable deltaTable;
+
+ private final Map