forked from Xeli/flink
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FLINK-10127] [core] Add TypeInformation for java.time.Instant.
This closes apache#6549.
- Loading branch information
Showing
8 changed files
with
354 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
106 changes: 106 additions & 0 deletions
106
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantComparator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.api.common.typeutils.base; | ||
|
||
import org.apache.flink.annotation.Internal; | ||
import org.apache.flink.api.common.typeutils.TypeComparator; | ||
import org.apache.flink.core.memory.DataInputView; | ||
import org.apache.flink.core.memory.MemorySegment; | ||
|
||
import java.io.IOException; | ||
import java.time.Instant; | ||
|
||
/** | ||
* Comparator for comparing Java Instant. | ||
*/ | ||
@Internal | ||
public final class InstantComparator extends BasicTypeComparator<Instant>{ | ||
|
||
private static final long serialVersionUID = 1L; | ||
private static final long SECONDS_MIN_VALUE = Instant.MIN.getEpochSecond(); | ||
|
||
public InstantComparator(boolean ascending) { | ||
super(ascending); | ||
} | ||
|
||
@Override | ||
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { | ||
final long lSeconds = firstSource.readLong(); | ||
final long rSeconds = secondSource.readLong(); | ||
final int comp; | ||
if (lSeconds == rSeconds) { | ||
final int lNanos = firstSource.readInt(); | ||
final int rNanos = secondSource.readInt(); | ||
comp = (lNanos < rNanos ? -1 : (lNanos == rNanos ? 0 : 1)); | ||
} else { | ||
comp = lSeconds < rSeconds ? -1 : 1; | ||
} | ||
return ascendingComparison ? comp : -comp; | ||
} | ||
|
||
@Override | ||
public boolean supportsNormalizedKey() { | ||
return true; | ||
} | ||
|
||
@Override | ||
public int getNormalizeKeyLen() { | ||
return InstantSerializer.SECONDS_BYTES + InstantSerializer.NANOS_BYTES; | ||
} | ||
|
||
@Override | ||
public boolean isNormalizedKeyPrefixOnly(int keyBytes) { | ||
return keyBytes < getNormalizeKeyLen(); | ||
} | ||
|
||
@Override | ||
public void putNormalizedKey(Instant record, MemorySegment target, int offset, int numBytes) { | ||
final int secondsBytes = InstantSerializer.SECONDS_BYTES; | ||
final long normalizedSeconds = record.getEpochSecond() - SECONDS_MIN_VALUE; | ||
if (numBytes >= secondsBytes) { | ||
target.putLongBigEndian(offset, normalizedSeconds); | ||
offset += secondsBytes; | ||
numBytes -= secondsBytes; | ||
|
||
final int nanosBytes = InstantSerializer.NANOS_BYTES; | ||
if (numBytes >= nanosBytes) { | ||
target.putIntBigEndian(offset, record.getNano()); | ||
offset += nanosBytes; | ||
numBytes -= nanosBytes; | ||
for (int i = 0; i < numBytes; i++) { | ||
target.put(offset + i, (byte) 0); | ||
} | ||
} else { | ||
final int nanos = record.getNano(); | ||
for (int i = 0; i < numBytes; i++) { | ||
target.put(offset + i, (byte) (nanos >>> ((3 - i) << 3))); | ||
} | ||
} | ||
} else { | ||
for (int i = 0; i < numBytes; i++) { | ||
target.put(offset + i, (byte) (normalizedSeconds >>> ((7 - i) << 3))); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public TypeComparator<Instant> duplicate() { | ||
return new InstantComparator(ascendingComparison); | ||
} | ||
} |
105 changes: 105 additions & 0 deletions
105
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.api.common.typeutils.base; | ||
|
||
import org.apache.flink.annotation.Internal; | ||
import org.apache.flink.core.memory.DataInputView; | ||
import org.apache.flink.core.memory.DataOutputView; | ||
|
||
import java.io.IOException; | ||
import java.time.Instant; | ||
|
||
/** | ||
* Serializer for serializing/deserializing Instant values including null values. | ||
*/ | ||
@Internal | ||
public final class InstantSerializer extends TypeSerializerSingleton<Instant> { | ||
static final int SECONDS_BYTES = Long.BYTES; | ||
static final int NANOS_BYTES = Integer.BYTES; | ||
|
||
private static final long NULL_SECONDS = Long.MIN_VALUE; | ||
//Nanos of normal Instant is between 0 and 999,999,999, | ||
//therefore we can use Integer.MIN_VALUE to represent NULL Instant | ||
//regardless supported range of seconds | ||
private static final int NULL_NANOS = Integer.MIN_VALUE; | ||
|
||
public static final InstantSerializer INSTANCE = new InstantSerializer(); | ||
|
||
@Override | ||
public boolean isImmutableType() { | ||
return true; | ||
} | ||
|
||
@Override | ||
public Instant createInstance() { | ||
return Instant.EPOCH; | ||
} | ||
|
||
@Override | ||
public Instant copy(Instant from) { | ||
return from; | ||
} | ||
|
||
@Override | ||
public Instant copy(Instant from, Instant reuse) { | ||
return from; | ||
} | ||
|
||
@Override | ||
public int getLength() { | ||
return SECONDS_BYTES + NANOS_BYTES; | ||
} | ||
|
||
@Override | ||
public void serialize(Instant record, DataOutputView target) throws IOException { | ||
if (record == null) { | ||
target.writeLong(NULL_SECONDS); | ||
target.writeInt(NULL_NANOS); | ||
} else { | ||
target.writeLong(record.getEpochSecond()); | ||
target.writeInt(record.getNano()); | ||
} | ||
} | ||
|
||
@Override | ||
public Instant deserialize(DataInputView source) throws IOException { | ||
final long seconds = source.readLong(); | ||
final int nanos = source.readInt(); | ||
if (seconds == NULL_SECONDS && nanos == NULL_NANOS) { | ||
return null; | ||
} | ||
return Instant.ofEpochSecond(seconds, nanos); | ||
} | ||
|
||
@Override | ||
public Instant deserialize(Instant reuse, DataInputView source) throws IOException { | ||
return deserialize(source); | ||
} | ||
|
||
@Override | ||
public void copy(DataInputView source, DataOutputView target) throws IOException { | ||
target.writeLong(source.readLong()); | ||
target.writeInt(source.readInt()); | ||
} | ||
|
||
@Override | ||
public boolean canEqual(Object obj) { | ||
return obj instanceof InstantSerializer; | ||
} | ||
} |
56 changes: 56 additions & 0 deletions
56
...-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantComparatorTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.api.common.typeutils.base; | ||
|
||
import org.apache.flink.api.common.typeutils.ComparatorTestBase; | ||
import org.apache.flink.api.common.typeutils.TypeComparator; | ||
import org.apache.flink.api.common.typeutils.TypeSerializer; | ||
|
||
import java.time.Instant; | ||
|
||
/** | ||
* A test for the {@link InstantComparator}. | ||
*/ | ||
public class InstantComparatorTest extends ComparatorTestBase<Instant> { | ||
|
||
@Override | ||
protected TypeComparator<Instant> createComparator(boolean ascending) { | ||
return new InstantComparator(ascending); | ||
} | ||
|
||
@Override | ||
protected TypeSerializer<Instant> createSerializer() { | ||
return new InstantSerializer(); | ||
} | ||
|
||
@Override | ||
protected Instant[] getSortedTestData() { | ||
return new Instant[] { | ||
Instant.EPOCH, | ||
Instant.parse("1970-01-01T00:00:00.001Z"), | ||
Instant.parse("1990-10-14T02:42:25.123Z"), | ||
Instant.parse("1990-10-14T02:42:25.123000001Z"), | ||
Instant.parse("1990-10-14T02:42:25.123000002Z"), | ||
Instant.parse("2013-08-12T14:15:59.478Z"), | ||
Instant.parse("2013-08-12T14:15:59.479Z"), | ||
Instant.parse("2040-05-12T18:00:45.999Z"), | ||
Instant.MAX | ||
}; | ||
} | ||
} |
67 changes: 67 additions & 0 deletions
67
...-core/src/test/java/org/apache/flink/api/common/typeutils/base/InstantSerializerTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.api.common.typeutils.base; | ||
|
||
import org.apache.flink.api.common.typeutils.SerializerTestBase; | ||
import org.apache.flink.api.common.typeutils.TypeSerializer; | ||
|
||
import java.time.Instant; | ||
import java.util.Random; | ||
|
||
/** | ||
* A test for the {@link InstantSerializer}. | ||
*/ | ||
public class InstantSerializerTest extends SerializerTestBase<Instant> { | ||
@Override | ||
protected TypeSerializer<Instant> createSerializer() { | ||
return new InstantSerializer(); | ||
} | ||
|
||
@Override | ||
protected int getLength() { | ||
return 12; | ||
} | ||
|
||
@Override | ||
protected Class<Instant> getTypeClass() { | ||
return Instant.class; | ||
} | ||
|
||
|
||
private static long rndSeconds(Random rnd) { | ||
return (long) (Instant.MIN.getEpochSecond() | ||
+ rnd.nextDouble() * (Instant.MAX.getEpochSecond() - Instant.MIN.getEpochSecond())); | ||
} | ||
|
||
private static int rndNanos(Random rnd) { | ||
return (int) (rnd.nextDouble() * 999999999); | ||
} | ||
|
||
@Override | ||
protected Instant[] getTestData() { | ||
final Random rnd = new Random(874597969123412341L); | ||
|
||
return new Instant[] { | ||
Instant.EPOCH, Instant.MIN, Instant.MAX, | ||
Instant.ofEpochSecond(rndSeconds(rnd), rndNanos(rnd)), | ||
Instant.ofEpochSecond(1534135584,949495), | ||
Instant.ofEpochSecond(56090783) | ||
}; | ||
} | ||
} |
Oops, something went wrong.