From cf369a0f24489788cf2ae54b034e25b57b5ac0f7 Mon Sep 17 00:00:00 2001 From: hailin0 Date: Wed, 18 Dec 2024 17:25:27 +0800 Subject: [PATCH] [Feature][API] Support timestamp with timezone offset --- docs/en/concept/schema-feature.md | 39 ++++---- docs/zh/concept/schema-feature.md | 39 ++++---- .../SeaTunnelDataTypeConvertorUtil.java | 2 + .../table/converter/BasicDataConverter.java | 89 +++++++++++++++++++ .../seatunnel/api/table/type/ArrayType.java | 3 + .../api/table/type/LocalTimeType.java | 3 + .../seatunnel/api/table/type/MapType.java | 1 + .../api/table/type/SeaTunnelRow.java | 3 + .../seatunnel/api/table/type/SqlType.java | 1 + .../assertion/excecutor/AssertExecutor.java | 1 + .../fake/source/FakeDataGenerator.java | 26 ++++++ .../src/test/resources/fake_to_assert.conf | 10 +++ .../serialization/RowConverter.java | 1 + .../serialization/InternalRowConverter.java | 22 ++++- .../serialization/SeaTunnelRowConverter.java | 10 +++ .../spark/utils/OffsetDateTimeUtils.java | 50 +++++++++++ .../spark/utils/TypeConverterUtils.java | 26 ++++-- 17 files changed, 279 insertions(+), 47 deletions(-) create mode 100644 seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/OffsetDateTimeUtils.java diff --git a/docs/en/concept/schema-feature.md b/docs/en/concept/schema-feature.md index 3a4e83e06e57..5521275386d6 100644 --- a/docs/en/concept/schema-feature.md +++ b/docs/en/concept/schema-feature.md @@ -70,25 +70,26 @@ columns = [ #### What type supported at now -| Data type | Value type in Java | Description | -|:----------|:---------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| string | `java.lang.String` | string | -| boolean | `java.lang.Boolean` | boolean | -| tinyint | `java.lang.Byte` | -128 to 127 regular. 0 to 255 unsigned*. Specify the maximum number of digits in parentheses. | -| smallint | `java.lang.Short` | -32768 to 32767 General. 0 to 65535 unsigned*. Specify the maximum number of digits in parentheses. | -| int | `java.lang.Integer` | All numbers from -2,147,483,648 to 2,147,483,647 are allowed. | -| bigint | `java.lang.Long` | All numbers between -9,223,372,036,854,775,808 and 9,223,372,036,854,775,807 are allowed. | -| float | `java.lang.Float` | Float-precision numeric data from -1.79E+308 to 1.79E+308. | -| double | `java.lang.Double` | Double precision floating point. Handle most decimals. | -| decimal | `java.math.BigDecimal` | Double type stored as a string, allowing a fixed decimal point. | -| null | `java.lang.Void` | null | -| bytes | `byte[]` | bytes | -| date | `java.time.LocalDate` | Only the date is stored. From January 1, 0001 to December 31, 9999. | -| time | `java.time.LocalTime` | Only store time. Accuracy is 100 nanoseconds. | -| timestamp | `java.time.LocalDateTime` | Stores a unique number that is updated whenever a row is created or modified. timestamp is based on the internal clock and does not correspond to real time. There can only be one timestamp variable per table. | -| row | `org.apache.seatunnel.api.table.type.SeaTunnelRow` | Row type, can be nested. | -| map | `java.util.Map` | A Map is an object that maps keys to values. The key type includes `int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time` `timestamp` `null` , and the value type includes `int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time` `timestamp` `null` `array` `map` `row`. | -| array | `ValueType[]` | A array is a data type that represents a collection of elements. The element type includes `int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double`. | +| Data type | Value type in Java | Description | +|:-------------|:---------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| string | `java.lang.String` | string | +| boolean | `java.lang.Boolean` | boolean | +| tinyint | `java.lang.Byte` | -128 to 127 regular. 0 to 255 unsigned*. Specify the maximum number of digits in parentheses. | +| smallint | `java.lang.Short` | -32768 to 32767 General. 0 to 65535 unsigned*. Specify the maximum number of digits in parentheses. | +| int | `java.lang.Integer` | All numbers from -2,147,483,648 to 2,147,483,647 are allowed. | +| bigint | `java.lang.Long` | All numbers between -9,223,372,036,854,775,808 and 9,223,372,036,854,775,807 are allowed. | +| float | `java.lang.Float` | Float-precision numeric data from -1.79E+308 to 1.79E+308. | +| double | `java.lang.Double` | Double precision floating point. Handle most decimals. | +| decimal | `java.math.BigDecimal` | Double type stored as a string, allowing a fixed decimal point. | +| null | `java.lang.Void` | null | +| bytes | `byte[]` | bytes | +| date | `java.time.LocalDate` | Only the date is stored. From January 1, 0001 to December 31, 9999. | +| time | `java.time.LocalTime` | Only store time. Accuracy is 100 nanoseconds. | +| timestamp | `java.time.LocalDateTime` | Stores a unique number that is updated whenever a row is created or modified. timestamp is based on the internal clock and does not correspond to real time. There can only be one timestamp variable per table. | +| timestamp_tz | `java.time.OffsetDateTime` | Stores a unique number that is updated whenever a row is created or modified. timestamp is based on the internal clock and does not correspond to real time. There can only be one timestamp variable per table. | +| row | `org.apache.seatunnel.api.table.type.SeaTunnelRow` | Row type, can be nested. | +| map | `java.util.Map` | A Map is an object that maps keys to values. The key type includes `int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time` `timestamp` `null` , and the value type includes `int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time` `timestamp` `null` `array` `map` `row`. | +| array | `ValueType[]` | A array is a data type that represents a collection of elements. The element type includes `int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double`. | #### How to declare type supported diff --git a/docs/zh/concept/schema-feature.md b/docs/zh/concept/schema-feature.md index b504d264f83e..0659328fb20d 100644 --- a/docs/zh/concept/schema-feature.md +++ b/docs/zh/concept/schema-feature.md @@ -70,25 +70,26 @@ columns = [ #### 目前支持哪些类型 -| 数据类型 | Java中的值类型 | 描述 | -|:----------|:---------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| string | `java.lang.String` | 字符串 | -| boolean | `java.lang.Boolean` | 布尔 | -| tinyint | `java.lang.Byte` | 常规-128 至 127 。 0 到 255 无符号*。 指定括号中的最大位数。 | -| smallint | `java.lang.Short` | 常规-32768 至 32767。 0 到 65535 无符号*。 指定括号中的最大位数。 | -| int | `java.lang.Integer` | 允许从 -2,147,483,648 到 2,147,483,647 的所有数字。 | -| bigint | `java.lang.Long` | 允许 -9,223,372,036,854,775,808 和 9,223,372,036,854,775,807 之间的所有数字。 | -| float | `java.lang.Float` | 从-1.79E+308 到 1.79E+308浮点精度数值数据。 | -| double | `java.lang.Double` | 双精度浮点。 处理大多数小数。 | -| decimal | `java.math.BigDecimal` | Double 类型存储为字符串,允许固定小数点。 | -| null | `java.lang.Void` | null | -| bytes | `byte[]` | 字节。 | -| date | `java.time.LocalDate` | 仅存储日期。从0001年1月1日到9999 年 12 月 31 日。 | -| time | `java.time.LocalTime` | 仅存储时间。精度为 100 纳秒。 | -| timestamp | `java.time.LocalDateTime` | 存储一个唯一的编号,每当创建或修改行时都会更新该编号。 时间戳基于内部时钟,与实际时间不对应。 每个表只能有一个时间戳变量。 | -| row | `org.apache.seatunnel.api.table.type.SeaTunnelRow` | 行类型,可以嵌套。 | -| map | `java.util.Map` | Map 是将键映射到值的对象。 键类型包括: `int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time` `timestamp` `null` , and the value type includes `int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time` `timestamp` `null` `array` `map` `row`. | -| array | `ValueType[]` | 数组是一种表示元素集合的数据类型。 元素类型包括: `int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double`. | +| 数据类型 | Java中的值类型 | 描述 | +|:-------------|:---------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| string | `java.lang.String` | 字符串 | +| boolean | `java.lang.Boolean` | 布尔 | +| tinyint | `java.lang.Byte` | 常规-128 至 127 。 0 到 255 无符号*。 指定括号中的最大位数。 | +| smallint | `java.lang.Short` | 常规-32768 至 32767。 0 到 65535 无符号*。 指定括号中的最大位数。 | +| int | `java.lang.Integer` | 允许从 -2,147,483,648 到 2,147,483,647 的所有数字。 | +| bigint | `java.lang.Long` | 允许 -9,223,372,036,854,775,808 和 9,223,372,036,854,775,807 之间的所有数字。 | +| float | `java.lang.Float` | 从-1.79E+308 到 1.79E+308浮点精度数值数据。 | +| double | `java.lang.Double` | 双精度浮点。 处理大多数小数。 | +| decimal | `java.math.BigDecimal` | Double 类型存储为字符串,允许固定小数点。 | +| null | `java.lang.Void` | null | +| bytes | `byte[]` | 字节。 | +| date | `java.time.LocalDate` | 仅存储日期。从0001年1月1日到9999 年 12 月 31 日。 | +| time | `java.time.LocalTime` | 仅存储时间。精度为 100 纳秒。 | +| timestamp | `java.time.LocalDateTime` | 存储一个唯一的编号,每当创建或修改行时都会更新该编号。 时间戳基于内部时钟,与实际时间不对应。 每个表只能有一个时间戳变量。 | +| timestamp_tz | `java.time.OffsetDateTime` | 存储一个唯一的编号,每当创建或修改行时都会更新该编号。 时间戳基于内部时钟,与实际时间不对应。 每个表只能有一个时间戳变量。 | +| row | `org.apache.seatunnel.api.table.type.SeaTunnelRow` | 行类型,可以嵌套。 | +| map | `java.util.Map` | Map 是将键映射到值的对象。 键类型包括: `int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time` `timestamp` `null` , and the value type includes `int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time` `timestamp` `null` `array` `map` `row`. | +| array | `ValueType[]` | 数组是一种表示元素集合的数据类型。 元素类型包括: `int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double`. | #### 如何声明支持的类型 diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java index 8230ca32eed8..c5109f2e14ed 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java @@ -79,6 +79,8 @@ public static SeaTunnelDataType deserializeSeaTunnelDataType( return LocalTimeType.LOCAL_TIME_TYPE; case TIMESTAMP: return LocalTimeType.LOCAL_DATE_TIME_TYPE; + case TIMESTAMP_TZ: + return LocalTimeType.OFFSET_DATE_TIME_TYPE; case MAP: return parseMapType(field, columnType); case BINARY_VECTOR: diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/converter/BasicDataConverter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/converter/BasicDataConverter.java index bae76cb57838..d134ed713c3e 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/converter/BasicDataConverter.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/converter/BasicDataConverter.java @@ -74,6 +74,8 @@ default Object convert(SeaTunnelDataType typeDefine, Object value) { return convertTime(value); case TIMESTAMP: return convertLocalDateTime(value); + case TIMESTAMP_TZ: + return convertOffsetDateTime(value); case BYTES: return convertBytes(value); case STRING: @@ -123,6 +125,8 @@ default Object convert(T typeDefine, Column columnDefine, Object value) { return convertTime(typeDefine, value); case TIMESTAMP: return convertLocalDateTime(typeDefine, value); + case TIMESTAMP_TZ: + return convertOffsetDateTime(typeDefine, value); case BYTES: return convertBytes(typeDefine, value); case STRING: @@ -434,6 +438,50 @@ default LocalDateTime convertLocalDateTime(T typeDefine, Object value) + typeDefine); } + default OffsetDateTime convertOffsetDateTime(T typeDefine, Object value) + throws UnsupportedOperationException { + if (value instanceof OffsetDateTime) { + return (OffsetDateTime) value; + } + if (value instanceof LocalDateTime) { + return ((LocalDateTime) value).atZone(ZoneId.systemDefault()).toOffsetDateTime(); + } + if (value instanceof Instant) { + return ((Instant) value).atZone(ZoneId.systemDefault()).toOffsetDateTime(); + } + if (value instanceof java.sql.Date) { + return ((java.sql.Date) value) + .toLocalDate() + .atTime(LocalTime.MIDNIGHT) + .atZone(ZoneId.systemDefault()) + .toOffsetDateTime(); + } + if (value instanceof java.sql.Timestamp) { + return ((java.sql.Timestamp) value) + .toInstant() + .atZone(ZoneId.systemDefault()) + .toOffsetDateTime(); + } + if (value instanceof Date) { + return ((Date) value).toInstant().atZone(ZoneId.systemDefault()).toOffsetDateTime(); + } + if (value instanceof LocalDate) { + return ((LocalDate) value) + .atTime(LocalTime.MIDNIGHT) + .atZone(ZoneId.systemDefault()) + .toOffsetDateTime(); + } + if (value instanceof String) { + return OffsetDateTime.parse((String) value); + } + + throw new UnsupportedOperationException( + "Unsupported convert " + + value.getClass() + + " to OffsetDateTime, typeDefine: " + + typeDefine); + } + default LocalDateTime convertLocalDateTime(T typeDefine, Instant value) { return convertLocalDateTime(value); } @@ -482,6 +530,47 @@ default LocalDateTime convertLocalDateTime(Object value) throws UnsupportedOpera "Unsupported convert " + value.getClass() + " to LocalDateTime"); } + default OffsetDateTime convertOffsetDateTime(Object value) + throws UnsupportedOperationException { + if (value instanceof OffsetDateTime) { + return (OffsetDateTime) value; + } + if (value instanceof LocalDateTime) { + return ((LocalDateTime) value).atZone(ZoneId.systemDefault()).toOffsetDateTime(); + } + if (value instanceof Instant) { + return ((Instant) value).atZone(ZoneId.systemDefault()).toOffsetDateTime(); + } + if (value instanceof java.sql.Date) { + return ((java.sql.Date) value) + .toLocalDate() + .atTime(LocalTime.MIDNIGHT) + .atZone(ZoneId.systemDefault()) + .toOffsetDateTime(); + } + if (value instanceof java.sql.Timestamp) { + return ((java.sql.Timestamp) value) + .toInstant() + .atZone(ZoneId.systemDefault()) + .toOffsetDateTime(); + } + if (value instanceof Date) { + return ((Date) value).toInstant().atZone(ZoneId.systemDefault()).toOffsetDateTime(); + } + if (value instanceof LocalDate) { + return ((LocalDate) value) + .atTime(LocalTime.MIDNIGHT) + .atZone(ZoneId.systemDefault()) + .toOffsetDateTime(); + } + if (value instanceof String) { + return OffsetDateTime.parse((String) value); + } + + throw new UnsupportedOperationException( + "Unsupported convert " + value.getClass() + " to LocalDateTime"); + } + default LocalDateTime convertLocalDateTime(Instant value) { return value.atZone(ZoneId.systemDefault()).toLocalDateTime(); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java index 6b20d85a2d17..36c3362108af 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java @@ -48,6 +48,9 @@ public class ArrayType implements SeaTunnelDataType { public static final ArrayType LOCAL_DATE_TIME_ARRAY_TYPE = new ArrayType(LocalTimeType[].class, LocalTimeType.LOCAL_DATE_TIME_TYPE); + public static final ArrayType OFFSET_DATE_TIME_ARRAY_TYPE = + new ArrayType(LocalTimeType[].class, LocalTimeType.OFFSET_DATE_TIME_TYPE); + // -------------------------------------------------------------------------------------------- private final Class arrayClass; diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/LocalTimeType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/LocalTimeType.java index 9976d0d2856b..e5213531b06c 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/LocalTimeType.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/LocalTimeType.java @@ -20,6 +20,7 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.OffsetDateTime; import java.time.temporal.Temporal; import java.util.Objects; @@ -32,6 +33,8 @@ public class LocalTimeType implements SeaTunnelDataType { new LocalTimeType<>(LocalTime.class, SqlType.TIME); public static final LocalTimeType LOCAL_DATE_TIME_TYPE = new LocalTimeType<>(LocalDateTime.class, SqlType.TIMESTAMP); + public static final LocalTimeType OFFSET_DATE_TIME_TYPE = + new LocalTimeType<>(OffsetDateTime.class, SqlType.TIMESTAMP_TZ); private final Class typeClass; private final SqlType sqlType; diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java index c217e2416b41..3e19b3fec5ef 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java @@ -40,6 +40,7 @@ public class MapType implements CompositeType> { SqlType.DATE, SqlType.TIME, SqlType.TIMESTAMP, + SqlType.TIMESTAMP_TZ, SqlType.FLOAT, SqlType.DOUBLE, SqlType.STRING, diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java index 84e172f2dfd9..39f61aee5d00 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java @@ -155,6 +155,7 @@ private int getBytesForValue(Object v, SeaTunnelDataType dataType) { case TIME: return 12; case TIMESTAMP: + case TIMESTAMP_TZ: return 48; case FLOAT_VECTOR: case FLOAT16_VECTOR: @@ -177,6 +178,7 @@ private int getBytesForValue(Object v, SeaTunnelDataType dataType) { case TIME: return ((Object[]) v).length * 12; case TIMESTAMP: + case TIMESTAMP_TZ: return ((Object[]) v).length * 48; default: throw new UnsupportedOperationException( @@ -285,6 +287,7 @@ private int getBytesForValue(Object v) { case "LocalTime": return 12; case "LocalDateTime": + case "OffsetDateTime": return 48; case "String[]": return getBytesForArray(v, BasicType.STRING_TYPE); diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java index e33ceb8d3ce5..eba5a7d4b545 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java @@ -35,6 +35,7 @@ public enum SqlType { DATE, TIME, TIMESTAMP, + TIMESTAMP_TZ, BINARY_VECTOR, FLOAT_VECTOR, FLOAT16_VECTOR, diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java index cc1e12a80f70..53f8a40194e1 100644 --- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java +++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java @@ -218,6 +218,7 @@ private boolean compareValue(Object value, SeaTunnelDataType type, Object con case DECIMAL: case TIME: case TIMESTAMP: + case TIMESTAMP_TZ: case DATE: default: return value.equals(confValue); diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java index 4f385754d597..382795c5bbc3 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java @@ -48,6 +48,8 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.HashMap; @@ -184,6 +186,10 @@ private String getNewValueForField(SqlType sqlType, String fieldValue) { return fieldValue.equalsIgnoreCase(CURRENT_TIMESTAMP) ? LocalDateTime.now().toString() : null; + case TIMESTAMP_TZ: + return fieldValue.equalsIgnoreCase(CURRENT_TIMESTAMP) + ? OffsetDateTime.now().toString() + : null; default: return null; } @@ -292,6 +298,26 @@ private Object randomColumnValue(Column column) { : dateTimeFormatter); }, fakeDataRandomUtils::randomLocalDateTime); + case TIMESTAMP_TZ: + return value( + column, + defaultValue -> { + if (defaultValue.equalsIgnoreCase(CURRENT_TIMESTAMP)) { + return OffsetDateTime.now(); + } + DateTimeFormatter dateTimeFormatter = + DateTimeUtils.matchDateTimeFormatter(defaultValue); + return OffsetDateTime.parse( + defaultValue, + dateTimeFormatter == null + ? DateTimeFormatter.ISO_OFFSET_DATE_TIME + : dateTimeFormatter); + }, + c -> + fakeDataRandomUtils + .randomLocalDateTime(c) + .atZone(ZoneId.systemDefault()) + .toOffsetDateTime()); case ROW: SeaTunnelDataType[] fieldTypes = ((SeaTunnelRowType) fieldType).getFieldTypes(); Object[] objects = new Object[fieldTypes.length]; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert.conf index 9c60ed73c3f4..7542bea0b21c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert.conf @@ -60,6 +60,7 @@ source { c_date = date c_decimal = "decimal(38, 18)" c_timestamp = timestamp + c_timestamp_tz = timestamp_tz } } } @@ -112,6 +113,15 @@ sink { rule_type = NOT_NULL } ] + }, + { + field_name = c_timestamp_tz + field_type = timestamp_tz + field_value = [ + { + rule_type = NOT_NULL + } + ] } ] } diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java index c19c42281101..a954abaa7dad 100644 --- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java +++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java @@ -87,6 +87,7 @@ protected boolean validate(Object field, SeaTunnelDataType dataType) { case DATE: case TIME: case TIMESTAMP: + case TIMESTAMP_TZ: case FLOAT: case DOUBLE: case STRING: diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java index 7e2ee4ef877b..1521409f594d 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.translation.serialization.RowConverter; import org.apache.seatunnel.translation.spark.utils.InstantConverterUtils; +import org.apache.seatunnel.translation.spark.utils.OffsetDateTimeUtils; import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils; import org.apache.spark.sql.catalyst.InternalRow; @@ -48,8 +49,10 @@ import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; +import scala.Some; import scala.Tuple2; import scala.collection.immutable.HashMap.HashTrieMap; +import scala.collection.immutable.List; import scala.collection.mutable.WrappedArray; import java.io.IOException; @@ -60,6 +63,7 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.OffsetDateTime; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -101,6 +105,8 @@ private static Object convert(Object field, SeaTunnelDataType dataType) { case TIMESTAMP: return InstantConverterUtils.toEpochMicro( Timestamp.valueOf((LocalDateTime) field).toInstant()); + case TIMESTAMP_TZ: + return Decimal.apply(OffsetDateTimeUtils.toBigDecimal((OffsetDateTime) field)); case MAP: return convertMap((Map) field, (MapType) dataType); case STRING: @@ -127,8 +133,8 @@ private static Object convert(Object field, SeaTunnelDataType dataType) { } return ArrayData.toArrayData(field); default: - if (field instanceof scala.Some) { - return ((scala.Some) field).get(); + if (field instanceof Some) { + return ((Some) field).get(); } return field; } @@ -216,8 +222,8 @@ private static Map reconvertMap( Map newMap = new LinkedHashMap<>(num); SeaTunnelDataType keyType = mapType.getKeyType(); SeaTunnelDataType valueType = mapType.getValueType(); - scala.collection.immutable.List keyList = hashTrieMap.keySet().toList(); - scala.collection.immutable.List valueList = hashTrieMap.values().toList(); + List keyList = hashTrieMap.keySet().toList(); + List valueList = hashTrieMap.values().toList(); for (int i = 0; i < num; i++) { Object key = keyList.apply(i); Object value = valueList.apply(i); @@ -298,6 +304,14 @@ private static Object reconvert(Object field, SeaTunnelDataType dataType) { } return Timestamp.from(InstantConverterUtils.ofEpochMicro((long) field)) .toLocalDateTime(); + case TIMESTAMP_TZ: + BigDecimal timeWithDecimal = null; + if (field instanceof Decimal) { + timeWithDecimal = ((Decimal) field).toJavaBigDecimal(); + } else if (field instanceof BigDecimal) { + timeWithDecimal = (BigDecimal) field; + } + return OffsetDateTimeUtils.toOffsetDateTime(timeWithDecimal); case MAP: if (field instanceof MapData) { return reconvertMap((MapData) field, (MapType) dataType); diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java index d76aa61e4652..697193690124 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.translation.serialization.RowConverter; +import org.apache.seatunnel.translation.spark.utils.OffsetDateTimeUtils; import org.apache.spark.sql.catalyst.expressions.GenericRow; import org.apache.spark.unsafe.types.UTF8String; @@ -34,11 +35,13 @@ import scala.collection.mutable.WrappedArray; import java.io.IOException; +import java.math.BigDecimal; import java.sql.Date; import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.OffsetDateTime; import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; @@ -93,6 +96,11 @@ private Object convert(Object field, SeaTunnelDataType dataType) { return Date.valueOf((LocalDate) field); case TIMESTAMP: return Timestamp.valueOf((LocalDateTime) field); + case TIMESTAMP_TZ: + if (field instanceof BigDecimal) { + return field; + } + return OffsetDateTimeUtils.toBigDecimal((OffsetDateTime) field); case TIME: if (field instanceof LocalTime) { return ((LocalTime) field).toNanoOfDay(); @@ -202,6 +210,8 @@ private Object reconvert(Object field, SeaTunnelDataType dataType) { return ((Date) field).toLocalDate(); case TIMESTAMP: return ((Timestamp) field).toLocalDateTime(); + case TIMESTAMP_TZ: + return OffsetDateTimeUtils.toOffsetDateTime((BigDecimal) field); case TIME: if (field instanceof Timestamp) { return ((Timestamp) field).toLocalDateTime().toLocalTime(); diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/OffsetDateTimeUtils.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/OffsetDateTimeUtils.java new file mode 100644 index 000000000000..ea5a58654497 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/OffsetDateTimeUtils.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.spark.utils; + +import org.apache.spark.sql.types.DecimalType; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; + +public class OffsetDateTimeUtils { + public static final String LOGICAL_TIMESTAMP_WITH_OFFSET_TYPE_FLAG = + "logical_timestamp_with_offset_type"; + + // epochMilli length 13, timezone offset length 5 + public static final DecimalType OFFSET_DATETIME_WITH_DECIMAL = new DecimalType(18, 5); + + public static BigDecimal toBigDecimal(OffsetDateTime time) { + return new BigDecimal( + time.toInstant().toEpochMilli() + "." + time.getOffset().getTotalSeconds()); + } + + public static OffsetDateTime toOffsetDateTime(BigDecimal timeWithDecimal) { + BigInteger epochMilli = + timeWithDecimal.unscaledValue().divide(BigInteger.TEN.pow(timeWithDecimal.scale())); + BigInteger offset = + timeWithDecimal + .unscaledValue() + .remainder(BigInteger.TEN.pow(timeWithDecimal.scale())); + return Instant.ofEpochMilli(epochMilli.longValue()) + .atOffset(ZoneOffset.ofTotalSeconds(offset.intValue())); + } +} diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java index 9b40f984c7af..45ead72e6ad6 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java @@ -60,7 +60,6 @@ public class TypeConverterUtils { TO_SEA_TUNNEL_TYPES.put(DataTypes.DoubleType, BasicType.DOUBLE_TYPE); TO_SEA_TUNNEL_TYPES.put(DataTypes.BinaryType, PrimitiveByteArrayType.INSTANCE); TO_SEA_TUNNEL_TYPES.put(DataTypes.DateType, LocalTimeType.LOCAL_DATE_TYPE); - TO_SEA_TUNNEL_TYPES.put(DataTypes.TimestampType, LocalTimeType.LOCAL_DATE_TIME_TYPE); } private TypeConverterUtils() { @@ -97,6 +96,8 @@ public static DataType convert(SeaTunnelDataType dataType) { return DataTypes.LongType; case TIMESTAMP: return DataTypes.TimestampType; + case TIMESTAMP_TZ: + return OffsetDateTimeUtils.OFFSET_DATETIME_WITH_DECIMAL; case ARRAY: return DataTypes.createArrayType( convert(((ArrayType) dataType).getElementType())); @@ -120,10 +121,19 @@ private static StructType convert(SeaTunnelRowType rowType) { StructField[] fields = new StructField[rowType.getFieldNames().length]; for (int i = 0; i < rowType.getFieldNames().length; i++) { SeaTunnelDataType fieldType = rowType.getFieldTypes()[i]; - Metadata metadata = - fieldType.getSqlType() == SqlType.TIME - ? new MetadataBuilder().putBoolean(LOGICAL_TIME_TYPE_FLAG, true).build() - : Metadata.empty(); + Metadata metadata; + if (fieldType.getSqlType() == SqlType.TIME) { + metadata = new MetadataBuilder().putBoolean(LOGICAL_TIME_TYPE_FLAG, true).build(); + } else if (fieldType.getSqlType() == SqlType.TIMESTAMP_TZ) { + metadata = + new MetadataBuilder() + .putBoolean( + OffsetDateTimeUtils.LOGICAL_TIMESTAMP_WITH_OFFSET_TYPE_FLAG, + true) + .build(); + } else { + metadata = Metadata.empty(); + } fields[i] = new StructField(rowType.getFieldNames()[i], convert(fieldType), true, metadata); @@ -204,6 +214,12 @@ private static SeaTunnelRowType convert(StructType structType) { && metadata.contains(LOGICAL_TIME_TYPE_FLAG) && metadata.getBoolean(LOGICAL_TIME_TYPE_FLAG)) { fieldTypes[i] = LocalTimeType.LOCAL_TIME_TYPE; + } else if (metadata != null + && metadata.contains( + OffsetDateTimeUtils.LOGICAL_TIMESTAMP_WITH_OFFSET_TYPE_FLAG) + && metadata.getBoolean( + OffsetDateTimeUtils.LOGICAL_TIMESTAMP_WITH_OFFSET_TYPE_FLAG)) { + fieldTypes[i] = LocalTimeType.OFFSET_DATE_TIME_TYPE; } else { fieldTypes[i] = convert(structFields[i].dataType()); }