
Commit 0ab1947

araika committed
HIVE-20889: Support timestamp-micros in AvroSerDe
1 parent 3520f4f commit 0ab1947

File tree

11 files changed
+123 -11 lines changed

common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java

+17
@@ -162,6 +162,11 @@ public long toEpochMilli() {
     return localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
   }

+  public long toEpochMicro() {
+    return localDateTime.toEpochSecond(ZoneOffset.UTC) * 1_000_000
+        + localDateTime.getNano() / 1000;
+  }
+
   public long toEpochMilli(ZoneId id) {
     return localDateTime.atZone(id).toInstant().toEpochMilli();
   }
@@ -236,6 +241,18 @@ public static Timestamp ofEpochMilli(long epochMilli, int nanos) {
         .withNano(nanos));
   }

+  public static Timestamp ofEpochMicro(long epochMicro) {
+    int nanos = (int) ((epochMicro % 1000000) * 1000);
+    epochMicro -= nanos / 1_000_000;
+
+    Instant instant = Instant.ofEpochSecond(
+        epochMicro / 1_000_000,
+        nanos
+    );
+
+    return new Timestamp(LocalDateTime.ofInstant(instant, ZoneOffset.UTC));
+  }
+
   public void setNanos(int nanos) {
     localDateTime = localDateTime.withNano(nanos);
   }
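
The two accessors above make microsecond round trips possible outside the SerDe as well. A minimal sketch, not part of the commit (the class name is invented; Timestamp is the Hive common type modified in this file):

import org.apache.hadoop.hive.common.type.Timestamp;

public class EpochMicroRoundTripSketch {
  public static void main(String[] args) {
    // Same literal the new qtest below uses.
    Timestamp ts = Timestamp.valueOf("2024-08-09 14:08:26.326107");

    // toEpochMicro(): epoch seconds * 1_000_000 plus nanos / 1000.
    long micros = ts.toEpochMicro();

    // ofEpochMicro() rebuilds the Timestamp, keeping the sub-millisecond digits.
    Timestamp back = Timestamp.ofEpochMicro(micros);

    System.out.println(micros); // microseconds since 1970-01-01 00:00:00 UTC
    System.out.println(back);   // 2024-08-09 14:08:26.326107
  }
}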

New file (+3)

@@ -0,0 +1,3 @@
+CREATE EXTERNAL TABLE hive_test(`dt` timestamp) STORED AS AVRO;
+INSERT INTO hive_test VALUES (cast('2024-08-09 14:08:26.326107' as timestamp));
+SELECT * FROM hive_test;

New file (+26)

@@ -0,0 +1,26 @@
+PREHOOK: query: CREATE EXTERNAL TABLE hive_test(`dt` timestamp) STORED AS AVRO
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@hive_test
+POSTHOOK: query: CREATE EXTERNAL TABLE hive_test(`dt` timestamp) STORED AS AVRO
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@hive_test
+PREHOOK: query: INSERT INTO hive_test VALUES (cast('2024-08-09 14:08:26.326107' as timestamp))
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@hive_test
+POSTHOOK: query: INSERT INTO hive_test VALUES (cast('2024-08-09 14:08:26.326107' as timestamp))
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@hive_test
+POSTHOOK: Lineage: hive_test.dt SCRIPT []
+PREHOOK: query: SELECT * FROM hive_test
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hive_test
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM hive_test
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hive_test
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+2024-08-09 14:08:26.326107

serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java

+15-4
@@ -31,6 +31,7 @@
 import java.util.Map;
 import java.util.TimeZone;

+import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericData;
@@ -388,11 +389,21 @@ private Object deserializePrimitive(Object datum, Schema fileSchema, Schema reco
           skipProlepticConversion = HiveConf.ConfVars.HIVE_AVRO_PROLEPTIC_GREGORIAN_DEFAULT.defaultBoolVal;
         }
       }
-      Timestamp timestamp = TimestampTZUtil.convertTimestampToZone(
-          Timestamp.ofEpochMilli((Long) datum), ZoneOffset.UTC, convertToTimeZone, legacyConversion);
-      if (!skipProlepticConversion) {
+      LogicalType logicalType = recordSchema.getLogicalType();
+      Timestamp timestamp;
+      if (logicalType != null && logicalType.getName().equals(AvroSerDe.TIMESTAMP_TYPE_NAME_MICROS)) {
+        timestamp = Timestamp.ofEpochMicro((Long) datum);
+      } else {
+        timestamp = Timestamp.ofEpochMilli((Long) datum);
+      }
+      timestamp = TimestampTZUtil.convertTimestampToZone(
+          timestamp, ZoneOffset.UTC, convertToTimeZone, legacyConversion);
+      if (!skipProlepticConversion && logicalType.getName().equals(AvroSerDe.TIMESTAMP_TYPE_NAME_MICROS)) {
+        timestamp = Timestamp.ofEpochMicro(
+            CalendarUtils.convertTimeToProlepticMicros(timestamp.toEpochMicro()));
+      } else if (!skipProlepticConversion) {
         timestamp = Timestamp.ofEpochMilli(
-            CalendarUtils.convertTimeToProleptic(timestamp.toEpochMilli()));
+            CalendarUtils.convertTimeToProleptic(timestamp.toEpochMilli()));
       }
       return timestamp;
     }

serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java

+2-1
@@ -57,7 +57,8 @@ public class AvroSerDe extends AbstractSerDe {
   public static final String CHAR_TYPE_NAME = "char";
   public static final String VARCHAR_TYPE_NAME = "varchar";
   public static final String DATE_TYPE_NAME = "date";
-  public static final String TIMESTAMP_TYPE_NAME = "timestamp-millis";
+  public static final String TIMESTAMP_TYPE_NAME_MILLIS = "timestamp-millis";
+  public static final String TIMESTAMP_TYPE_NAME_MICROS = "timestamp-micros";
   public static final String WRITER_TIME_ZONE = "writer.time.zone";
   public static final String WRITER_PROLEPTIC = "writer.proleptic";
   public static final String WRITER_ZONE_CONVERSION_LEGACY = "writer.zone.conversion.legacy";

serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java

+13-2
@@ -17,13 +17,16 @@
  */
 package org.apache.hadoop.hive.serde2.avro;

+import java.time.Instant;
+import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;

+import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
@@ -231,10 +234,18 @@ private Object serializePrimitive(TypeInfo typeInfo, PrimitiveObjectInspector fi
     case TIMESTAMP:
       Timestamp timestamp =
           ((TimestampObjectInspector) fieldOI).getPrimitiveJavaObject(structFieldData);
+      LogicalType logicalType = schema.getLogicalType();
+      if (logicalType != null && logicalType.getName().equals(AvroSerDe.TIMESTAMP_TYPE_NAME_MICROS)) {
+        long micros = defaultProleptic ? timestamp.toEpochMicro() :
+            CalendarUtils.convertTimeToProlepticMicros(timestamp.toEpochMicro());
+        timestamp = TimestampTZUtil.convertTimestampToZone(
+            Timestamp.ofEpochMicro(micros), TimeZone.getDefault().toZoneId(), ZoneOffset.UTC, legacyConversion);
+        return timestamp.toEpochMicro();
+      }
       long millis = defaultProleptic ? timestamp.toEpochMilli() :
-          CalendarUtils.convertTimeToHybrid(timestamp.toEpochMilli());
+          CalendarUtils.convertTimeToHybrid(timestamp.toEpochMilli());
       timestamp = TimestampTZUtil.convertTimestampToZone(
-          Timestamp.ofEpochMilli(millis), TimeZone.getDefault().toZoneId(), ZoneOffset.UTC, legacyConversion);
+          Timestamp.ofEpochMilli(millis), TimeZone.getDefault().toZoneId(), ZoneOffset.UTC, legacyConversion);
       return timestamp.toEpochMilli();
     case UNKNOWN:
       throw new AvroSerdeException("Received UNKNOWN primitive category.");
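
Why the separate micros branch matters: encoding the same Timestamp as epoch milliseconds drops the sub-millisecond digits, while the microsecond encoding keeps them. A small sketch, not part of the commit (class name invented; only Timestamp methods shown in this diff are used):

import org.apache.hadoop.hive.common.type.Timestamp;

public class TimestampPrecisionSketch {
  public static void main(String[] args) {
    Timestamp ts = Timestamp.valueOf("2024-08-09 14:08:26.326107");

    long millis = ts.toEpochMilli(); // sub-millisecond digits are truncated
    long micros = ts.toEpochMicro(); // full microsecond precision retained

    System.out.println(Timestamp.ofEpochMilli(millis)); // 2024-08-09 14:08:26.326
    System.out.println(Timestamp.ofEpochMicro(micros)); // 2024-08-09 14:08:26.326107
  }
}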

serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java

+2-1
@@ -180,7 +180,8 @@ public static TypeInfo generateTypeInfo(Schema schema,
     }

     if (type == LONG &&
-        AvroSerDe.TIMESTAMP_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+        (AvroSerDe.TIMESTAMP_TYPE_NAME_MILLIS.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE)) ||
+         AvroSerDe.TIMESTAMP_TYPE_NAME_MICROS.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE)))) {
       return TypeInfoFactory.timestampTypeInfo;
     }

serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java

+1-1
@@ -159,7 +159,7 @@ private Schema createAvroPrimitive(TypeInfo typeInfo) {
     case TIMESTAMP:
       schema = AvroSerdeUtils.getSchemaFor("{" +
           "\"type\":\"" + AvroSerDe.AVRO_LONG_TYPE_NAME + "\"," +
-          "\"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME + "\"}");
+          "\"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME_MICROS + "\"}");
       break;
     case VOID:
       schema = Schema.create(Schema.Type.NULL);
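
With this change a Hive timestamp column is declared to Avro as a long carrying the timestamp-micros logical type. A throwaway sketch of the schema string that createAvroPrimitive now builds, assuming AvroSerDe.AVRO_LONG_TYPE_NAME resolves to "long" (class name invented):

public class TimestampSchemaSketch {
  public static void main(String[] args) {
    String avroLongTypeName = "long";        // AvroSerDe.AVRO_LONG_TYPE_NAME (assumed value)
    String logicalType = "timestamp-micros"; // AvroSerDe.TIMESTAMP_TYPE_NAME_MICROS
    String schemaJson = "{" +
        "\"type\":\"" + avroLongTypeName + "\"," +
        "\"logicalType\":\"" + logicalType + "\"}";
    System.out.println(schemaJson); // {"type":"long","logicalType":"timestamp-micros"}
  }
}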

serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java

+1-1
@@ -227,7 +227,7 @@ public class TestAvroObjectInspectorGenerator {
       "  \"fields\" : [\n" +
       "    {\"name\":\"timestampField\", " +
       "     \"type\":\"" + AvroSerDe.AVRO_LONG_TYPE_NAME + "\", " +
-      "     \"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME + "\"}" +
+      "     \"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME_MILLIS + "\"}" +
       "  ]\n" +
       "}";

serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java

+1-1
@@ -125,7 +125,7 @@ public void canSerializeDoubles() throws SerDeException, IOException {
   public void canSerializeTimestamps() throws SerDeException, IOException {
     singleFieldTest("timestamp1", Timestamp.valueOf("2011-01-01 00:00:00").toEpochMilli(),
         "\"" + AvroSerDe.AVRO_LONG_TYPE_NAME + "\"," +
-        "\"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME + "\"");
+        "\"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME_MILLIS + "\"");
   }

   @Test

storage-api/src/java/org/apache/hadoop/hive/common/type/CalendarUtils.java

+42
@@ -157,6 +157,27 @@ public static long convertTimeToProleptic(long hybrid) {
     return proleptic;
   }

+  /**
+   * Convert epoch microseconds from the hybrid Julian/Gregorian calendar to the
+   * proleptic Gregorian.
+   * @param hybridMicros Microseconds of epoch in the hybrid Julian/Gregorian
+   * @return Microseconds of epoch in the proleptic Gregorian
+   */
+  public static long convertTimeToProlepticMicros(long hybridMicros) {
+    long prolepticMicros = hybridMicros;
+    long hybridMillis = hybridMicros / 1_000L; // Convert micros to millis
+
+    if (hybridMillis < SWITCHOVER_MILLIS) {
+      String dateStr = HYBRID_TIME_FORMAT.get().format(new Date(hybridMillis));
+      try {
+        prolepticMicros = PROLEPTIC_TIME_FORMAT.get().parse(dateStr).getTime() * 1_000L; // Convert millis back to micros
+      } catch (ParseException e) {
+        throw new IllegalArgumentException("Can't parse " + dateStr, e);
+      }
+    }
+    return prolepticMicros;
+  }
+
   /**
    * Convert epoch millis from the proleptic Gregorian calendar to the hybrid
    * Julian/Gregorian.
@@ -176,6 +197,27 @@ public static long convertTimeToHybrid(long proleptic) {
     return hybrid;
   }

+  /**
+   * Convert epoch microseconds from the proleptic Gregorian calendar to the
+   * hybrid Julian/Gregorian.
+   * @param prolepticMicros Microseconds of epoch in the proleptic Gregorian
+   * @return Microseconds of epoch in the hybrid Julian/Gregorian
+   */
+  public static long convertTimeToHybridMicros(long prolepticMicros) {
+    long hybridMicros = prolepticMicros;
+    long prolepticMillis = prolepticMicros / 1_000L; // Convert micros to millis
+
+    if (prolepticMillis < SWITCHOVER_MILLIS) {
+      String dateStr = PROLEPTIC_TIME_FORMAT.get().format(new Date(prolepticMillis));
+      try {
+        hybridMicros = HYBRID_TIME_FORMAT.get().parse(dateStr).getTime() * 1_000L; // Convert millis back to micros
+      } catch (ParseException e) {
+        throw new IllegalArgumentException("Can't parse " + dateStr, e);
+      }
+    }
+    return hybridMicros;
+  }
+
   /**
    *
    * Formats epoch day to date according to proleptic or hybrid calendar
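
The new microsecond variants delegate to the same millisecond formatters, so they only shift dates before the Gregorian switchover and otherwise agree with the existing millisecond conversion scaled by 1000. A quick sanity-check sketch, not part of the commit (the class name and the epoch constants below are assumptions picked for illustration):

import org.apache.hadoop.hive.common.type.CalendarUtils;

public class CalendarMicrosSketch {
  public static void main(String[] args) {
    // Modern date: at or after the switchover the calendars agree,
    // so the conversion is the identity.
    long modernMicros = 1_723_212_506_326_107L; // 2024-08-09 14:08:26.326107 UTC
    System.out.println(
        CalendarUtils.convertTimeToProlepticMicros(modernMicros) == modernMicros); // true

    // Ancient date: the shift matches the millisecond conversion scaled by 1000.
    long ancientMillis = -62_135_596_800_000L;  // roughly year 0001
    long ancientMicros = ancientMillis * 1_000L;
    System.out.println(
        CalendarUtils.convertTimeToProlepticMicros(ancientMicros)
            == CalendarUtils.convertTimeToProleptic(ancientMillis) * 1_000L);       // true
  }
}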
