46 changes: 46 additions & 0 deletions build.sbt
@@ -72,6 +72,7 @@ val scalaTestVersionForConnectors = "3.0.8"
val parquet4sVersion = "1.9.4"
val protoVersion = "3.25.1"
val grpcVersion = "1.62.2"
val flink120Version = "1.20.3"

// For Java 11 use the following on command line
// sbt 'set targetJvm := "11"' [commands]
@@ -1278,6 +1279,51 @@ lazy val hudi = (project in file("hudi"))
TestParallelization.settings
)

lazy val flink = (project in file("connectors/flink/v1.20"))
// .dependsOn(kernelApi)
.dependsOn(kernelDefaults)
.dependsOn(kernelUnityCatalog)
.settings(
name := "delta-flink-v1.20",
commonSettings,
releaseSettings,
javafmtCheckSettings,
scalafmtCheckSettings,
publishArtifact := scalaBinaryVersion.value == "2.12", // only publish once
autoScalaLibrary := false, // exclude scala-library from dependencies
Compile / unmanagedJars += (kernelApi / Compile / packageBin).value,
Test / unmanagedJars += (kernelApi / Compile / packageBin).value,

// Make sure the shaded JAR is produced before we compile/run tests
Compile / compile := (Compile / compile).dependsOn(kernelApi / Compile / packageBin).value,
Test / test := (Test / test).dependsOn(kernelApi / Compile / packageBin).value,
Test / unmanagedJars += (kernelApi / Test / packageBin).value,

Test / publishArtifact := false,
Test / javaOptions ++= Seq(
"--add-opens=java.base/java.util=ALL-UNNAMED" // for Flink with Java 17.
),
crossPaths := false,
libraryDependencies ++= Seq(
"org.apache.flink" % "flink-core" % flink120Version % "provided",
"org.apache.flink" % "flink-table-common" % flink120Version % "provided",
"org.apache.flink" % "flink-streaming-java" % flink120Version % "provided",
"io.unitycatalog" % "unitycatalog-client" % "0.3.0",

"org.apache.flink" % "flink-test-utils" % flink120Version % "test",
"org.scalatest" %% "scalatest" % "3.2.19" % "test",
"org.apache.flink" % "flink-clients" % flink120Version % "test",
"org.apache.flink" % "flink-table-api-java-bridge" % flink120Version % Test,
"org.apache.flink" % "flink-table-planner-loader" % flink120Version % Test,
"org.apache.flink" % "flink-table-runtime" % flink120Version % Test,
"org.apache.flink" % "flink-test-utils-junit" % flink120Version % Test,
"org.slf4j" % "slf4j-log4j12" % "2.0.17" % "test",
// The dependency below is only needed for end-to-end (E2E) integration tests that run against
// a real Unity Catalog (UC) endpoint.
"org.apache.hadoop" % "hadoop-aws" % hadoopVersion % "test",
)
)

lazy val goldenTables = (project in file("connectors/golden-tables"))
.disablePlugins(JavaFormatterPlugin, ScalafmtPlugin)
.settings(
@@ -0,0 +1,107 @@
package io.delta.flink.sink;

import io.delta.kernel.expressions.Literal;
import io.delta.kernel.types.*;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

/**
 * Conversions between Flink table types and Delta Kernel types: {@link RowType} schemas to
 * {@link StructType}, Flink {@link LogicalType}s to Kernel {@link DataType}s, and {@link RowData}
 * column values to Kernel {@link Literal}s (including partition values).
 */
public class Conversions {
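/** Flink-to-Delta direction: schema, scalar type, and row-value conversions. */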
public static class FlinkToDelta {
public static StructType schema(RowType rowType) {
List<StructField> fields =
rowType.getFields().stream()
.map(
rowField -> {
DataType rowFieldType = dataType(rowField.getType());
return new StructField(
rowField.getName(), rowFieldType, rowField.getType().isNullable());
})
.collect(Collectors.toList());

return new StructType(fields);
}

public static DataType dataType(LogicalType flinkType) {
final LogicalTypeRoot typeRoot = flinkType.getTypeRoot();
switch (typeRoot) {
case INTEGER:
return IntegerType.INTEGER;
case VARCHAR:
case CHAR:
return StringType.STRING;
case BIGINT:
return LongType.LONG;
case DOUBLE:
return DoubleType.DOUBLE;
case FLOAT:
return FloatType.FLOAT;
case DECIMAL:
DecimalType decimalType = (DecimalType) flinkType;
int precision = decimalType.getPrecision();
int scale = decimalType.getScale();
return new io.delta.kernel.types.DecimalType(precision, scale);
case DATE:
return DateType.DATE;
case TIMESTAMP_WITH_TIME_ZONE:
return TimestampType.TIMESTAMP;
case TIMESTAMP_WITHOUT_TIME_ZONE:
return TimestampNTZType.TIMESTAMP_NTZ;
default:
throw new UnsupportedOperationException(
String.format("Type not supported: %s", flinkType));
}
}

public static Map<String, Literal> partitionValues(
StructType rowType, Collection<String> partitionColNames, RowData rowData) {
return partitionColNames.stream()
.map(
name -> {
final int partitionValueColIdx = rowType.indexOf(name);
return new Object[] {
name, Conversions.FlinkToDelta.data(rowType, rowData, partitionValueColIdx)
};
})
.collect(Collectors.toMap(o -> (String) o[0], o -> (Literal) o[1]));
}

public static Literal data(StructType rowType, RowData rowData, int colIdx) {
final StructField field = rowType.at(colIdx);
final DataType dataType = field.getDataType();
if (dataType.equivalent(IntegerType.INTEGER)) {
return Literal.ofInt(rowData.getInt(colIdx));
} else if (dataType.equivalent(LongType.LONG)) {
return Literal.ofLong(rowData.getLong(colIdx));
} else if (dataType.equivalent(StringType.STRING)) {
return Literal.ofString(rowData.getString(colIdx).toString());
} else if (dataType.equivalent(DoubleType.DOUBLE)) {
return Literal.ofDouble(rowData.getDouble(colIdx));
} else if (dataType.equivalent(FloatType.FLOAT)) {
return Literal.ofFloat(rowData.getFloat(colIdx));
} else if (dataType instanceof io.delta.kernel.types.DecimalType) {
io.delta.kernel.types.DecimalType decimalType =
(io.delta.kernel.types.DecimalType) dataType;
int precision = decimalType.getPrecision();
int scale = decimalType.getScale();
return Literal.ofDecimal(
rowData.getDecimal(colIdx, precision, scale).toBigDecimal(), precision, scale);
} else if (dataType.equivalent(DateType.DATE)) {
return Literal.ofDate(rowData.getInt(colIdx));
} else if (dataType.equivalent(TimestampType.TIMESTAMP)) {
return Literal.ofTimestamp(rowData.getLong(colIdx));
} else if (dataType.equivalent(TimestampNTZType.TIMESTAMP_NTZ)) {
return Literal.ofTimestampNtz(rowData.getLong(colIdx));
} else {
throw new UnsupportedOperationException("Unsupported data type: " + dataType);
}
}
}
}
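Not part of the diff: a minimal usage sketch of the conversion helpers above, assuming Flink 1.20 table classes on the classpath. The example class, column names, and sample values are illustrative only.

package io.delta.flink.sink;

import io.delta.kernel.expressions.Literal;
import io.delta.kernel.types.StructType;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class ConversionsExample {
  public static void main(String[] args) {
    // A two-column Flink row type: id INT, country STRING
    RowType rowType =
        new RowType(
            Arrays.asList(
                new RowType.RowField("id", new IntType()),
                new RowType.RowField("country", new VarCharType(VarCharType.MAX_LENGTH))));

    // Flink RowType -> Delta Kernel StructType
    StructType deltaSchema = Conversions.FlinkToDelta.schema(rowType);

    // One record; "country" is treated as the partition column
    GenericRowData row = GenericRowData.of(42, StringData.fromString("US"));
    Map<String, Literal> partitionValues =
        Conversions.FlinkToDelta.partitionValues(
            deltaSchema, Collections.singletonList("country"), row);

    System.out.println(deltaSchema);
    System.out.println(partitionValues); // partition column name -> Kernel Literal
  }
}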
@@ -0,0 +1,101 @@
package io.delta.flink.sink;

import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.internal.json.JsonUtils;
import io.delta.kernel.internal.actions.SingleAction;
import io.delta.kernel.internal.util.Preconditions;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.core.io.SimpleVersionedSerializer;

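/**
 * The Delta Kernel action {@link Row}s produced for one checkpoint, tagged with the Flink job id,
 * operator id, and checkpoint id that produced them. The nested {@link Serializer} round-trips
 * instances through Flink's {@link SimpleVersionedSerializer} contract.
 */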
public class DeltaCommittable {
private final String jobId;
private final String operatorId;
private final long checkpointId;
private final List<Row> deltaActions;

public DeltaCommittable(
String jobId, String operatorId, long checkpointId, List<Row> deltaActions) {
this.jobId = jobId;
this.operatorId = operatorId;
this.checkpointId = checkpointId;
this.deltaActions = deltaActions;
}

public String getJobId() {
return jobId;
}

public String getOperatorId() {
return operatorId;
}

public long getCheckpointId() {
return checkpointId;
}

public List<Row> getDeltaActions() {
return deltaActions;
}

@Override
public String toString() {
return "DeltaCommittable{"
+ "jobId='"
+ jobId
+ '\''
+ ", operatorId='"
+ operatorId
+ '\''
+ ", checkpointId="
+ checkpointId
+ ", deltaActions="
+ deltaActions.stream().map(JsonUtils::rowToJson).collect(Collectors.joining(","))
+ '}';
}

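/**
 * {@link SimpleVersionedSerializer} for {@link DeltaCommittable}: writes the job/operator ids,
 * the checkpoint id, and each action row as Delta Kernel JSON, and reads them back against
 * {@link SingleAction#FULL_SCHEMA}.
 */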
static class Serializer implements SimpleVersionedSerializer<DeltaCommittable> {
@Override
public int getVersion() {
return 1;
}

@Override
public byte[] serialize(DeltaCommittable obj) throws IOException {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(bos)) {
out.writeUTF(obj.getJobId());
out.writeUTF(obj.getOperatorId());
out.writeLong(obj.getCheckpointId());
out.writeInt(obj.getDeltaActions().size());
for (Row row : obj.getDeltaActions()) {
Preconditions.checkArgument(
row.getSchema().equivalent(SingleAction.FULL_SCHEMA), "Row must be a Delta single action");
out.writeUTF(JsonUtils.rowToJson(row));
}
out.flush(); // flush so bos holds the full payload; try-with-resources closes the streams
return bos.toByteArray();
}
}

@Override
public DeltaCommittable deserialize(int version, byte[] serialized) throws IOException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
ObjectInputStream in = new ObjectInputStream(bis)) {
final String jobId = in.readUTF();
final String operatorId = in.readUTF();
final long checkpointId = in.readLong();
final int numActions = in.readInt();
List<Row> actions = new ArrayList<>(numActions);
for (int i = 0; i < numActions; i++) {
final String actionJson = in.readUTF();
actions.add(JsonUtils.rowFromJson(actionJson, SingleAction.FULL_SCHEMA));
}
return new DeltaCommittable(jobId, operatorId, checkpointId, actions);
}
}
}
}
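Also not part of the diff: a sketch of a DeltaCommittable round-tripping through its versioned serializer. The class name, ids, and the empty action list are illustrative; the example sits in io.delta.flink.sink because Serializer is package-private.

package io.delta.flink.sink;

import java.io.IOException;
import java.util.Collections;

public class DeltaCommittableRoundTrip {
  public static void main(String[] args) throws IOException {
    // Writer identity plus the Delta actions for one checkpoint (empty here for brevity).
    DeltaCommittable committable =
        new DeltaCommittable("job-1", "operator-1", 7L, Collections.emptyList());

    DeltaCommittable.Serializer serializer = new DeltaCommittable.Serializer();
    byte[] bytes = serializer.serialize(committable);
    DeltaCommittable restored = serializer.deserialize(serializer.getVersion(), bytes);

    // Job id, operator id, checkpoint id, and the JSON-encoded action rows all survive
    // the round trip.
    System.out.println(restored);
  }
}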