Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HIVE-20889: Support timestamp-micros in AvroSerDe #5679

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ public long toEpochMilli() {
return localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
}

public long toEpochMicro() {
return localDateTime.toEpochSecond(ZoneOffset.UTC) * 1_000_000
+ localDateTime.getNano() / 1000;
}

public long toEpochMilli(ZoneId id) {
return localDateTime.atZone(id).toInstant().toEpochMilli();
}
Expand Down Expand Up @@ -236,6 +241,18 @@ public static Timestamp ofEpochMilli(long epochMilli, int nanos) {
.withNano(nanos));
}

public static Timestamp ofEpochMicro(long epochMicro) {
int nanos = (int) ((epochMicro % 1000000) * 1000);
epochMicro -= nanos / 1_000_000;

Instant instant = Instant.ofEpochSecond(
epochMicro / 1_000_000,
nanos
);

return new Timestamp(LocalDateTime.ofInstant(instant, ZoneOffset.UTC));
}

public void setNanos(int nanos) {
localDateTime = localDateTime.withNano(nanos);
}
Expand Down
3 changes: 3 additions & 0 deletions ql/src/test/queries/clientpositive/avro_timestamp_micros.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE EXTERNAL TABLE hive_test(`dt` timestamp) STORED AS AVRO;
INSERT INTO hive_test VALUES (cast('2024-08-09 14:08:26.326107' as timestamp));
SELECT * FROM hive_test;
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
PREHOOK: query: CREATE EXTERNAL TABLE hive_test(`dt` timestamp) STORED AS AVRO
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@hive_test
POSTHOOK: query: CREATE EXTERNAL TABLE hive_test(`dt` timestamp) STORED AS AVRO
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@hive_test
PREHOOK: query: INSERT INTO hive_test VALUES (cast('2024-08-09 14:08:26.326107' as timestamp))
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@hive_test
POSTHOOK: query: INSERT INTO hive_test VALUES (cast('2024-08-09 14:08:26.326107' as timestamp))
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@hive_test
POSTHOOK: Lineage: hive_test.dt SCRIPT []
PREHOOK: query: SELECT * FROM hive_test
PREHOOK: type: QUERY
PREHOOK: Input: default@hive_test
#### A masked pattern was here ####
POSTHOOK: query: SELECT * FROM hive_test
POSTHOOK: type: QUERY
POSTHOOK: Input: default@hive_test
#### A masked pattern was here ####
2024-08-09 14:08:26.326107
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Map;
import java.util.TimeZone;

import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
Expand Down Expand Up @@ -388,11 +389,21 @@ private Object deserializePrimitive(Object datum, Schema fileSchema, Schema reco
skipProlepticConversion = HiveConf.ConfVars.HIVE_AVRO_PROLEPTIC_GREGORIAN_DEFAULT.defaultBoolVal;
}
}
Timestamp timestamp = TimestampTZUtil.convertTimestampToZone(
Timestamp.ofEpochMilli((Long) datum), ZoneOffset.UTC, convertToTimeZone, legacyConversion);
if (!skipProlepticConversion) {
LogicalType logicalType = fileSchema.getLogicalType();
Timestamp timestamp;
if (logicalType != null && logicalType.getName().equals(AvroSerDe.TIMESTAMP_TYPE_NAME_MICROS)) {
timestamp = Timestamp.ofEpochMicro((Long) datum);
} else {
timestamp = Timestamp.ofEpochMilli((Long) datum);
}
timestamp = TimestampTZUtil.convertTimestampToZone(
timestamp, ZoneOffset.UTC, convertToTimeZone, legacyConversion);
if (!skipProlepticConversion && logicalType != null && logicalType.getName().equals(AvroSerDe.TIMESTAMP_TYPE_NAME_MICROS)) {
timestamp = Timestamp.ofEpochMicro(
CalendarUtils.convertTimeToProlepticMicros(timestamp.toEpochMicro()));
} else if (!skipProlepticConversion) {
timestamp = Timestamp.ofEpochMilli(
CalendarUtils.convertTimeToProleptic(timestamp.toEpochMilli()));
CalendarUtils.convertTimeToProleptic(timestamp.toEpochMilli()));
}
return timestamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public class AvroSerDe extends AbstractSerDe {
public static final String CHAR_TYPE_NAME = "char";
public static final String VARCHAR_TYPE_NAME = "varchar";
public static final String DATE_TYPE_NAME = "date";
public static final String TIMESTAMP_TYPE_NAME = "timestamp-millis";
public static final String TIMESTAMP_TYPE_NAME_MILLIS = "timestamp-millis";
public static final String TIMESTAMP_TYPE_NAME_MICROS = "timestamp-micros";
public static final String WRITER_TIME_ZONE = "writer.time.zone";
public static final String WRITER_PROLEPTIC = "writer.proleptic";
public static final String WRITER_ZONE_CONVERSION_LEGACY = "writer.zone.conversion.legacy";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;
import java.util.TimeZone;

import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
Expand Down Expand Up @@ -231,10 +232,18 @@ private Object serializePrimitive(TypeInfo typeInfo, PrimitiveObjectInspector fi
case TIMESTAMP:
Timestamp timestamp =
((TimestampObjectInspector) fieldOI).getPrimitiveJavaObject(structFieldData);
LogicalType logicalType = schema.getLogicalType();
if (logicalType != null && logicalType.getName().equals(AvroSerDe.TIMESTAMP_TYPE_NAME_MICROS)) {
long micros = defaultProleptic ? timestamp.toEpochMicro() :
CalendarUtils.convertTimeToHybridMicros(timestamp.toEpochMicro());
timestamp = TimestampTZUtil.convertTimestampToZone(
Timestamp.ofEpochMicro(micros), TimeZone.getDefault().toZoneId(), ZoneOffset.UTC, legacyConversion);
return timestamp.toEpochMicro();
}
long millis = defaultProleptic ? timestamp.toEpochMilli() :
CalendarUtils.convertTimeToHybrid(timestamp.toEpochMilli());
CalendarUtils.convertTimeToHybrid(timestamp.toEpochMilli());
timestamp = TimestampTZUtil.convertTimestampToZone(
Timestamp.ofEpochMilli(millis), TimeZone.getDefault().toZoneId(), ZoneOffset.UTC, legacyConversion);
Timestamp.ofEpochMilli(millis), TimeZone.getDefault().toZoneId(), ZoneOffset.UTC, legacyConversion);
return timestamp.toEpochMilli();
case UNKNOWN:
throw new AvroSerdeException("Received UNKNOWN primitive category.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ public static TypeInfo generateTypeInfo(Schema schema,
}

if (type == LONG &&
AvroSerDe.TIMESTAMP_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
(AvroSerDe.TIMESTAMP_TYPE_NAME_MILLIS.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE)) ||
AvroSerDe.TIMESTAMP_TYPE_NAME_MICROS.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE)))) {
return TypeInfoFactory.timestampTypeInfo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private Schema createAvroPrimitive(TypeInfo typeInfo) {
case TIMESTAMP:
schema = AvroSerdeUtils.getSchemaFor("{" +
"\"type\":\"" + AvroSerDe.AVRO_LONG_TYPE_NAME + "\"," +
"\"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME + "\"}");
"\"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME_MICROS + "\"}");
break;
case VOID:
schema = Schema.create(Schema.Type.NULL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public class TestAvroObjectInspectorGenerator {
" \"fields\" : [\n" +
" {\"name\":\"timestampField\", " +
" \"type\":\"" + AvroSerDe.AVRO_LONG_TYPE_NAME + "\", " +
" \"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME + "\"}" +
" \"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME_MILLIS + "\"}" +
" ]\n" +
"}";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void canSerializeDoubles() throws SerDeException, IOException {
public void canSerializeTimestamps() throws SerDeException, IOException {
singleFieldTest("timestamp1", Timestamp.valueOf("2011-01-01 00:00:00").toEpochMilli(),
"\"" + AvroSerDe.AVRO_LONG_TYPE_NAME + "\"," +
"\"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME + "\"");
"\"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME_MILLIS + "\"");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public void createAvroDateSchema() {
public void createAvroTimestampSchema() {
final String specificSchema = "{" +
"\"type\":\"long\"," +
"\"logicalType\":\"timestamp-millis\"}";
"\"logicalType\":\"timestamp-micros\"}";
String expectedSchema = genSchema(specificSchema);

Assert.assertEquals("Test for timestamp in avro schema failed",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,27 @@ public static long convertTimeToProleptic(long hybrid) {
return proleptic;
}

/**
* Convert epoch microseconds from the hybrid Julian/Gregorian calendar to the
* proleptic Gregorian.
* @param hybridMicros Microseconds of epoch in the hybrid Julian/Gregorian
* @return Microseconds of epoch in the proleptic Gregorian
*/
public static long convertTimeToProlepticMicros(long hybridMicros) {
long prolepticMicros = hybridMicros;
long hybridMillis = hybridMicros / 1_000L; // Convert micros to millis

if (hybridMillis < SWITCHOVER_MILLIS) {
String dateStr = HYBRID_TIME_FORMAT.get().format(new Date(hybridMillis));
try {
prolepticMicros = PROLEPTIC_TIME_FORMAT.get().parse(dateStr).getTime() * 1_000L; // Convert millis back to micros
} catch (ParseException e) {
throw new IllegalArgumentException("Can't parse " + dateStr, e);
}
}
return prolepticMicros;
}

/**
* Convert epoch millis from the proleptic Gregorian calendar to the hybrid
* Julian/Gregorian.
Expand All @@ -176,6 +197,27 @@ public static long convertTimeToHybrid(long proleptic) {
return hybrid;
}

/**
* Convert epoch microseconds from the proleptic Gregorian calendar to the
* hybrid Julian/Gregorian.
* @param prolepticMicros Microseconds of epoch in the proleptic Gregorian
* @return Microseconds of epoch in the hybrid Julian/Gregorian
*/
public static long convertTimeToHybridMicros(long prolepticMicros) {
long hybridMicros = prolepticMicros;
long prolepticMillis = prolepticMicros / 1_000L; // Convert micros to millis

if (prolepticMillis < SWITCHOVER_MILLIS) {
String dateStr = PROLEPTIC_TIME_FORMAT.get().format(new Date(prolepticMillis));
try {
hybridMicros = HYBRID_TIME_FORMAT.get().parse(dateStr).getTime() * 1_000L; // Convert millis back to micros
} catch (ParseException e) {
throw new IllegalArgumentException("Can't parse " + dateStr, e);
}
}
return hybridMicros;
}

/**
*
* Formats epoch day to date according to proleptic or hybrid calendar
Expand Down
Loading