Commit d3199f8

refactor DeltaTable interface

1 parent f1a7a12 commit d3199f8

16 files changed: +550 -503 lines
DeltaTable.java (new file)

Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
+package io.delta.flink;
+
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterable;
+import io.delta.kernel.utils.CloseableIterator;
+
+/**
+ * An interface for accessing a Delta table. It exposes table metadata such as
+ * the schema and partition columns, and declares methods for reading and
+ * writing table content.
+ */
+public interface DeltaTable extends Serializable {
+
+  /**
+   * An ID that uniquely identifies the table.
+   */
+  String getId();
+
+  StructType getSchema();
+
+  List<String> getPartitionColumns();
+
+  /**
+   * Commit a new version to the table.
+   * @param actions actions to be committed
+   */
+  void commit(CloseableIterable<Row> actions);
+
+  /**
+   * Write parquet files and create the associated AddFile actions.
+   *
+   * @param pathSuffix string to be appended to the table path
+   * @param data row data to be written
+   * @param partitionValues partition values
+   * @return rows representing the written actions
+   */
+  CloseableIterator<Row> writeParquet(
+      String pathSuffix,
+      CloseableIterator<FilteredColumnarBatch> data,
+      Map<String, Literal> partitionValues) throws IOException;
+}
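
To make the contract concrete, here is a minimal usage sketch, not part of the commit: a sink component stages parquet files through writeParquet and then commits the returned action rows. The "date" partition column, the pathSuffix value, and the helper class name are invented for illustration; CloseableIterable.inMemoryIterable is the kernel utility for wrapping an iterator.

import io.delta.flink.DeltaTable;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.utils.CloseableIterable;
import io.delta.kernel.utils.CloseableIterator;

import java.io.IOException;
import java.util.Map;

public class DeltaTableUsageSketch {

  // Stages one batch of rows into the table, then commits the resulting version.
  static void writeAndCommit(
      DeltaTable table, CloseableIterator<FilteredColumnarBatch> data) throws IOException {
    // Partition values are kernel Literals keyed by partition column name
    // ("date" is a made-up column for this sketch).
    Map<String, Literal> partitionValues = Map.of("date", Literal.ofDate(19000));

    // writeParquet stages files under <tablePath>/<pathSuffix> and returns
    // the action rows (e.g. AddFile) describing what was written.
    CloseableIterator<Row> actions = table.writeParquet("part-0000", data, partitionValues);

    // Wrap the actions and commit them as a new table version.
    table.commit(CloseableIterable.inMemoryIterable(actions));
  }
}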

connectors/flink/v1.20/src/main/java/io/delta/flink/sink/Conversions.java

Lines changed: 33 additions & 32 deletions

@@ -61,48 +61,49 @@ public static DataType dataType(LogicalType flinkType) {
   }
 
   public static Map<String, Literal> partitionValues(
-      RowType rowType, RowData rowData, Collection<String> partitionColNames) {
+      StructType rowType, Collection<String> partitionColNames, RowData rowData) {
     return partitionColNames.stream()
         .map(
             name -> {
-              final int partitionValueColIdx = rowType.getFieldIndex(name);
+              final int partitionValueColIdx = rowType.indexOf(name);
              return new Object[] {
                name, Conversions.FlinkToDelta.data(rowType, rowData, partitionValueColIdx)
              };
            })
        .collect(Collectors.toMap(o -> (String) o[0], o -> (Literal) o[1]));
  }
 
-  public static Literal data(RowType rowType, RowData rowData, int colIdx) {
-    final LogicalType flinkType = rowType.getTypeAt(colIdx);
-    final LogicalTypeRoot typeRoot = flinkType.getTypeRoot();
-    switch (typeRoot) {
-      case INTEGER:
-        return Literal.ofInt(rowData.getInt(colIdx));
-      case BIGINT:
-        return Literal.ofLong(rowData.getLong(colIdx));
-      case VARCHAR:
-      case CHAR:
-        return Literal.ofString(rowData.getString(colIdx).toString());
-      case DOUBLE:
-        return Literal.ofDouble(rowData.getDouble(colIdx));
-      case FLOAT:
-        return Literal.ofFloat(rowData.getFloat(colIdx));
-      case DECIMAL:
-        DecimalType decimalType = (DecimalType) flinkType;
-        int precision = decimalType.getPrecision();
-        int scale = decimalType.getScale();
-        return Literal.ofDecimal(
-            rowData.getDecimal(colIdx, precision, scale).toBigDecimal(), precision, scale);
-      case DATE:
-        return Literal.ofDate(rowData.getInt(colIdx));
-      case TIMESTAMP_WITH_TIME_ZONE:
-        return Literal.ofTimestamp(rowData.getLong(colIdx));
-      case TIMESTAMP_WITHOUT_TIME_ZONE:
-        return Literal.ofTimestampNtz(rowData.getLong(colIdx));
-      default:
-        throw new UnsupportedOperationException(
-            String.format("Type not supported: %s", flinkType));
+  public static Literal data(StructType rowType, RowData rowData, int colIdx) {
+    final StructField field = rowType.at(colIdx);
+    final DataType dataType = field.getDataType();
+    if (dataType.equivalent(IntegerType.INTEGER)) {
+      return Literal.ofInt(rowData.getInt(colIdx));
+    } else if (dataType.equivalent(LongType.LONG)) {
+      return Literal.ofLong(rowData.getLong(colIdx));
+    } else if (dataType.equivalent(StringType.STRING)) {
+      return Literal.ofString(rowData.getString(colIdx).toString());
+    } else if (dataType.equivalent(DoubleType.DOUBLE)) {
+      return Literal.ofDouble(rowData.getDouble(colIdx));
+    } else if (dataType.equivalent(FloatType.FLOAT)) {
+      return Literal.ofFloat(rowData.getFloat(colIdx));
+    } else if (dataType instanceof io.delta.kernel.types.DecimalType) {
+      io.delta.kernel.types.DecimalType decimalType =
+          (io.delta.kernel.types.DecimalType) dataType;
+      int precision = decimalType.getPrecision();
+      int scale = decimalType.getScale();
+      return Literal.ofDecimal(
+          rowData.getDecimal(colIdx, precision, scale).toBigDecimal(),
+          precision,
+          scale
+      );
+    } else if (dataType.equivalent(DateType.DATE)) {
+      return Literal.ofDate(rowData.getInt(colIdx));
+    } else if (dataType.equivalent(TimestampType.TIMESTAMP)) {
+      return Literal.ofTimestamp(rowData.getLong(colIdx));
+    } else if (dataType.equivalent(TimestampNTZType.TIMESTAMP_NTZ)) {
+      return Literal.ofTimestampNtz(rowData.getLong(colIdx));
+    } else {
+      throw new UnsupportedOperationException("Unsupported data type: " + dataType);
     }
   }
 }
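
As a rough illustration of the new kernel-typed signature: the schema and row values below are fabricated, and the exact nesting of the partitionValues helper inside Conversions may differ from this sketch. The point is that callers now pass a kernel StructType rather than a Flink RowType.

import io.delta.kernel.expressions.Literal;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

import java.util.List;
import java.util.Map;

public class ConversionsSketch {
  public static void main(String[] args) {
    // The schema is now a kernel StructType instead of a Flink RowType.
    StructType rowType = new StructType()
        .add("id", IntegerType.INTEGER)
        .add("country", StringType.STRING);

    // A sample Flink row matching that schema.
    RowData row = GenericRowData.of(42, StringData.fromString("NL"));

    // Map of partition column name -> kernel Literal for that column's value.
    Map<String, Literal> partitionValues =
        Conversions.partitionValues(rowType, List.of("country"), row);

    System.out.println(partitionValues);
  }
}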

connectors/flink/v1.20/src/main/java/io/delta/flink/sink/DeltaCommittable.java

Lines changed: 2 additions & 1 deletion

@@ -4,11 +4,12 @@
 import io.delta.kernel.defaults.internal.json.JsonUtils;
 import io.delta.kernel.internal.actions.SingleAction;
 import io.delta.kernel.internal.util.Preconditions;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
 import java.io.*;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 public class DeltaCommittable {
   private final String jobId;
DeltaCommitter.java

Lines changed: 19 additions & 94 deletions

@@ -1,24 +1,23 @@
 package io.delta.flink.sink;
 
-import io.delta.kernel.*;
+import io.delta.flink.DeltaTable;
 import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.internal.json.JsonUtils;
 import io.delta.kernel.engine.Engine;
-import io.delta.kernel.exceptions.TableNotFoundException;
-import io.delta.kernel.internal.data.TransactionStateRow;
-import io.delta.kernel.internal.util.Preconditions;
 import io.delta.kernel.internal.util.Utils;
-import io.delta.kernel.types.StructType;
 import io.delta.kernel.utils.CloseableIterable;
 import io.delta.kernel.utils.CloseableIterator;
-import java.io.IOException;
-import java.util.*;
-import java.util.stream.Collectors;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
 /**
  * The Committer is responsible for committing the data staged by the CommittingSinkWriter in the
  * second step of a two-phase commit protocol.

@@ -51,38 +50,23 @@ public class DeltaCommitter implements Committer<DeltaCommittable> {
   // All committables should have the same job id as the committer.
   // For simplicity, we get the job id from the constructor.
   private String jobId;
-  private Engine engine;
-  private Table table;
-  private final Row committerContext;
+  private DeltaTable deltaTable;
 
   private SinkCommitterMetricGroup metricGroup;
 
-  private boolean creatingNewTable;
-
   private DeltaCommitter(
-      String jobId,
-      Engine engine,
-      Table table,
-      Row committerContext,
-      SinkCommitterMetricGroup metricGroup) {
+      String jobId,
+      DeltaTable deltaTable,
+      SinkCommitterMetricGroup metricGroup) {
     this.jobId = jobId;
-    this.engine = engine;
-    this.table = table;
-    this.committerContext = committerContext;
-
+    this.deltaTable = deltaTable;
     this.metricGroup = metricGroup;
   }
 
   @Override
   public void commit(Collection<CommitRequest<DeltaCommittable>> committables)
       throws IOException, InterruptedException {
     LOG.debug("Starting commit");
-    try {
-      table.getLatestSnapshot(engine);
-      creatingNewTable = false;
-    } catch (TableNotFoundException e) {
-      creatingNewTable = true;
-    }
     sortCommittablesByCheckpointId(committables).forEach(this::commitForSingleCheckpointId);
   }

@@ -93,44 +77,6 @@ private void commitForSingleCheckpointId(
       long checkpointId, List<CommitRequest<DeltaCommittable>> committables) {
     LOG.debug("Committing {} committables on checkpoint {}", committables.size(), checkpointId);
 
-    TransactionBuilder txnBuilder =
-        table
-            .createTransactionBuilder(
-                engine,
-                "DeltaSink/Kernel",
-                creatingNewTable ? Operation.CREATE_TABLE : Operation.WRITE)
-            .withTransactionId(engine, jobId, checkpointId);
-
-    if (creatingNewTable) {
-      // For a new table set the table schema in the transaction builder
-      txnBuilder =
-          txnBuilder
-              .withSchema(engine, TransactionStateRow.getLogicalSchema(committerContext))
-              .withPartitionColumns(
-                  engine, TransactionStateRow.getPartitionColumnsList(committerContext));
-    }
-    final Transaction txn = txnBuilder.build(engine);
-
-    // We check the table's latest schema is still the same as committer schema.
-    // The check is delayed here to detect external modification to the table schema.
-    if (!creatingNewTable) {
-      final Snapshot readSnapshot = table.getSnapshotAsOfVersion(engine, txn.getReadTableVersion());
-      final StructType tableSchema = txn.getSchema(engine);
-      final StructType committerSchema = TransactionStateRow.getLogicalSchema(committerContext);
-      Preconditions.checkArgument(
-          readSnapshot.getPath().equals(TransactionStateRow.getTablePath(this.committerContext)),
-          String.format(
-              "Committer path does not match the latest table path."
-                  + "Table path: %s, Committer path: %s",
-              readSnapshot.getPath(), TransactionStateRow.getTablePath(this.committerContext)));
-      Preconditions.checkArgument(
-          committerSchema.equivalent(tableSchema),
-          String.format(
-              "DeltaSink does not support schema evolution. "
-                  + "Table schema: %s, Committer schema: %s",
-              tableSchema, committerSchema));
-    }
-
     final CloseableIterable<Row> dataActions =
         new CloseableIterable<Row>() {
           @Override

@@ -147,8 +93,7 @@ public void close() throws IOException {
           }
         };
 
-    txn.commit(engine, dataActions);
-    creatingNewTable = false;
+    deltaTable.commit(dataActions);
   }
 
   private TreeMap<Long, List<CommitRequest<DeltaCommittable>>> sortCommittablesByCheckpointId(

@@ -163,9 +108,7 @@ private TreeMap<Long, List<CommitRequest<DeltaCommittable>>> sortCommittablesByCheckpointId(
 
   public static final class Builder {
     private String jobId;
-    private Engine engine;
-    private Table table;
-    private Row committerContext;
+    private DeltaTable deltaTable;
     private SinkCommitterMetricGroup metricGroup;
 
     public Builder() {}

@@ -175,24 +118,8 @@ public Builder withJobId(String jobId) {
       return this;
     }
 
-    public Builder withEngine(Engine engine) {
-      this.engine = engine;
-      return this;
-    }
-
-    public Builder withTable(Table table) {
-      this.table = table;
-      return this;
-    }
-
-    public Builder withCommitterContext(Row committerContext) {
-      this.committerContext = committerContext;
-      return this;
-    }
-
-    public Builder withCommitterContext(String committerContextJson) {
-      this.committerContext =
-          JsonUtils.rowFromJson(committerContextJson, TransactionStateRow.SCHEMA);
+    public Builder withDeltaTable(DeltaTable deltaTable) {
+      this.deltaTable = deltaTable;
       return this;
     }

@@ -203,12 +130,10 @@ public Builder withMetricGroup(SinkCommitterMetricGroup metricGroup) {
 
     public DeltaCommitter build() {
      Objects.requireNonNull(jobId, "jobId must not be null");
-      Objects.requireNonNull(engine, "engine must not be null");
-      Objects.requireNonNull(table, "table must not be null");
-      Objects.requireNonNull(committerContext, "committerContext must not be null");
+      Objects.requireNonNull(deltaTable, "deltaTable must not be null");
       Objects.requireNonNull(metricGroup, "metricGroup must not be null");
 
-      return new DeltaCommitter(jobId, engine, table, committerContext, metricGroup);
+      return new DeltaCommitter(jobId, deltaTable, metricGroup);
     }
   }
 }
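
The net effect on construction: callers now hand the committer a single DeltaTable instead of an Engine, a kernel Table, and a committer-context row. A minimal sketch of the new wiring, using only the builder methods visible in this diff; the jobId value is a placeholder and the metric group is assumed to come from the sink's init context:

import io.delta.flink.DeltaTable;
import io.delta.flink.sink.DeltaCommitter;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;

public class CommitterWiringSketch {

  // Builds a DeltaCommitter against the refactored DeltaTable abstraction.
  static DeltaCommitter createCommitter(
      DeltaTable deltaTable, SinkCommitterMetricGroup metricGroup) {
    return new DeltaCommitter.Builder()
        .withJobId("job-0001")         // placeholder job id
        .withDeltaTable(deltaTable)    // replaces withEngine/withTable/withCommitterContext
        .withMetricGroup(metricGroup)
        .build();
  }
}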
