Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
Expand All @@ -35,8 +36,6 @@
* Convert Object to java.time.LocalDateTime using instanceof evaluation and optional format pattern for DateTimeFormatter
*/
class ObjectLocalDateTimeFieldConverter implements FieldConverter<Object, LocalDateTime> {
private static final long YEAR_TEN_THOUSAND = 253_402_300_800_000L;

private static final TemporalQuery<LocalDateTime> LOCAL_DATE_TIME_TEMPORAL_QUERY = new LocalDateTimeQuery();

/**
Expand Down Expand Up @@ -65,13 +64,14 @@ public LocalDateTime convertField(final Object field, final Optional<String> pat
final Instant instant = Instant.ofEpochMilli(date.getTime());
return ofInstant(instant);
}
case BigDecimal bd -> {
return toLocalDateTime(bd);
}
case Long value -> {
return toLocalDateTime(value);
}
case final Number number -> {
// If value is a floating point number, we consider it as seconds since epoch plus a decimal part for fractions of a second.
if (field instanceof Double || field instanceof Float) {
return toLocalDateTime(number.doubleValue());
}

return toLocalDateTime(number.longValue());
return toLocalDateTime(number.toString());
}
case String ignored -> {
final String string = field.toString().trim();
Expand Down Expand Up @@ -99,44 +99,30 @@ public LocalDateTime convertField(final Object field, final Optional<String> pat

private LocalDateTime tryParseAsNumber(final String value, final String fieldName) {
try {
// If decimal, treat as a double and convert to seconds and nanoseconds.
if (value.contains(".")) {
final double number = Double.parseDouble(value);
return toLocalDateTime(number);
}

// attempt to parse as a long value
final long number = Long.parseLong(value);
return toLocalDateTime(number);
return toLocalDateTime(value);
} catch (final NumberFormatException e) {
throw new FieldConversionException(LocalDateTime.class, value, fieldName, e);
}
}

private LocalDateTime toLocalDateTime(final double secondsSinceEpoch) {
// Determine the number of micros past the second by subtracting the number of seconds from the decimal value and multiplying by 1 million.
final double micros = 1_000_000 * (secondsSinceEpoch - (long) secondsSinceEpoch);
// Convert micros to nanos. Note that we perform this as a separate operation, rather than multiplying by 1_000,000,000 in order to avoid
// issues that occur with rounding at high precision.
final long nanos = (long) micros * 1000L;
private LocalDateTime toLocalDateTime(final BigDecimal epochMilliseconds) {
final BigDecimal[] parts = epochMilliseconds.divideAndRemainder(BigDecimal.ONE);

return toLocalDateTime((long) secondsSinceEpoch, nanos);
}
final long wholeMillis = parts[0].longValueExact();
final BigDecimal fractionalMillis = parts[1];

final long nanos = fractionalMillis.multiply(BigDecimal.valueOf(1_000_000)).longValue();

private LocalDateTime toLocalDateTime(final long epochSeconds, final long nanosPastSecond) {
final Instant instant = Instant.ofEpochSecond(epochSeconds).plusNanos(nanosPastSecond);
final Instant instant = Instant.ofEpochMilli(wholeMillis).plusNanos(nanos);
return ofInstant(instant);
}

private LocalDateTime toLocalDateTime(final long value) {
if (value > YEAR_TEN_THOUSAND) {
// Value is too large. Assume microseconds instead of milliseconds.
final Instant microsInstant = Instant.ofEpochSecond(value / 1_000_000, (value % 1_000_000) * 1_000);
return ofInstant(microsInstant);
}

final Instant instant = Instant.ofEpochMilli(value);
private LocalDateTime toLocalDateTime(final String epochMilliseconds) {
return toLocalDateTime(new BigDecimal(epochMilliseconds));
}

private LocalDateTime toLocalDateTime(final Long epochMilliseconds) {
final Instant instant = Instant.ofEpochMilli(epochMilliseconds);
return ofInstant(instant);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.junit.jupiter.api.Test;

import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
Expand All @@ -28,49 +29,75 @@

public class TestObjectLocalDateTimeFieldConverter {
private static final String FIELD_NAME = "test";

private static final long MILLIS_TIMESTAMP_LONG = 1707238288351L;
private static final long MICROS_TIMESTAMP_LONG = 1707238288351567L;
private static final String MICROS_TIMESTAMP_STRING = Long.toString(MICROS_TIMESTAMP_LONG);
private static final double MICROS_TIMESTAMP_DOUBLE = ((double) MICROS_TIMESTAMP_LONG) / 1000000D;
private static final long NANOS_AFTER_SECOND = 351567000L;
private static final long NANO_SECONDS_FROM_TIMESTAMP = (MILLIS_TIMESTAMP_LONG % 1000) * 1_000_000L;
private static final long EXTRA_NANO_SECONDS = 351567L;

private static final BigDecimal MILLIS_TIMESTAMP = BigDecimal.valueOf(MILLIS_TIMESTAMP_LONG);
private static final BigDecimal MILLIS_TIMESTAMP_FRACTIONAL = MILLIS_TIMESTAMP.add(BigDecimal.valueOf(EXTRA_NANO_SECONDS, 6));

private static final String MILLIS_TIMESTAMP_STRING = Long.toString(MILLIS_TIMESTAMP_LONG);
private static final String MILLIS_TIMESTAMP_FRACTIONAL_STRING = MILLIS_TIMESTAMP_FRACTIONAL.toString();

private static final Instant INSTANT_MILLIS_PRECISION = Instant.ofEpochMilli(MILLIS_TIMESTAMP_LONG);
// Create an instant to represent the same time as the microsecond precision timestamp. We add nanoseconds after second but then have to subtract the milliseconds after the second that are already
// present in the MILLIS_TIMESTAMP_LONG value.
private static final Instant INSTANT_MICROS_PRECISION = Instant.ofEpochMilli(MILLIS_TIMESTAMP_LONG).plusNanos(NANOS_AFTER_SECOND).minusMillis(MILLIS_TIMESTAMP_LONG % 1000);
private static final Instant INSTANT_MICROS_PRECISION = Instant.ofEpochMilli(MILLIS_TIMESTAMP_LONG).plusNanos(EXTRA_NANO_SECONDS);

private static final LocalDateTime LOCAL_DATE_TIME_MILLIS_PRECISION = LocalDateTime.ofInstant(INSTANT_MILLIS_PRECISION, ZoneId.systemDefault());
private static final LocalDateTime LOCAL_DATE_TIME_MICROS_PRECISION = LocalDateTime.ofInstant(INSTANT_MICROS_PRECISION, ZoneId.systemDefault());

private final ObjectLocalDateTimeFieldConverter converter = new ObjectLocalDateTimeFieldConverter();


@Test
public void testConvertTimestampMillis() {
public void testConvertTimestampLong() {
final LocalDateTime result = converter.convertField(MILLIS_TIMESTAMP_LONG, Optional.empty(), FIELD_NAME);
assertEquals(LOCAL_DATE_TIME_MILLIS_PRECISION, result);
}

@Test
public void testConvertTimestampMicros() {
final LocalDateTime result = converter.convertField(MICROS_TIMESTAMP_LONG, Optional.empty(), FIELD_NAME);
assertEquals(MILLIS_TIMESTAMP_LONG, result.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
public void testConvertTimestampString() {
final LocalDateTime result = converter.convertField(MILLIS_TIMESTAMP_STRING, Optional.empty(), FIELD_NAME);
assertEquals(LOCAL_DATE_TIME_MILLIS_PRECISION, result);
}

final Instant resultInstant = result.atZone(ZoneId.systemDefault()).toInstant();
assertEquals(NANOS_AFTER_SECOND, resultInstant.getNano());
@Test
public void testConvertTimestampBigDecimal() {
final LocalDateTime result = converter.convertField(MILLIS_TIMESTAMP_FRACTIONAL, Optional.empty(), FIELD_NAME);
assertEquals(LOCAL_DATE_TIME_MICROS_PRECISION, result);
assertEquals(NANO_SECONDS_FROM_TIMESTAMP + EXTRA_NANO_SECONDS, result.getNano());
}

@Test
public void testDoubleAsEpochSeconds() {
final LocalDateTime result = converter.convertField(MICROS_TIMESTAMP_DOUBLE, Optional.empty(), FIELD_NAME);
public void testConvertTimestampStringPrecise() {
final LocalDateTime result = converter.convertField(MILLIS_TIMESTAMP_FRACTIONAL_STRING, Optional.empty(), FIELD_NAME);
assertEquals(LOCAL_DATE_TIME_MICROS_PRECISION, result);
assertEquals(NANOS_AFTER_SECOND, result.getNano(), 1D);
}

@Test
public void testDoubleAsEpochSecondsAsString() {
final LocalDateTime result = converter.convertField(MICROS_TIMESTAMP_STRING, Optional.empty(), FIELD_NAME);
public void testConvertTimestampDouble() {
// Less precise timestamp than other tests as double is less precise than BigDecimal
final double timestamp = 1764673335503.607;

final BigDecimal bd = new BigDecimal(Double.toString(timestamp));
final BigDecimal[] parts = bd.divideAndRemainder(BigDecimal.ONE);

final long millis = parts[0].longValueExact();
final long nanos = parts[1].multiply(BigDecimal.valueOf(1_000_000)).longValue();

final Instant instant = Instant.ofEpochMilli(millis).plusNanos(nanos);
final LocalDateTime date = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
final LocalDateTime result = converter.convertField(timestamp, Optional.empty(), FIELD_NAME);

assertEquals(date, result);
}

@Test
public void testConvertTimestampOversized() {
// Ensure we truncate extra fractional digits beyond what Java time can represent
final String timestamp = MILLIS_TIMESTAMP_FRACTIONAL_STRING + "123456";
final LocalDateTime result = converter.convertField(timestamp, Optional.empty(), FIELD_NAME);
assertEquals(LOCAL_DATE_TIME_MICROS_PRECISION, result);
final double expectedNanos = 351567000L;
assertEquals(expectedNanos, result.getNano(), 1D);
}

@Test
Expand All @@ -81,8 +108,14 @@ public void testWithDateFormatMillisPrecision() {
}

@Test
public void testWithDateFormatMicrosecondPrecision() {
final LocalDateTime result = converter.convertField(MICROS_TIMESTAMP_LONG, Optional.of("yyyy-MM-dd'T'HH:mm:ss.SSSSSS"), FIELD_NAME);
public void testWithDateFormatMillisecond() {
final LocalDateTime result = converter.convertField(MILLIS_TIMESTAMP_STRING, Optional.of("yyyy-MM-dd'T'HH:mm:ss.SSSSSS"), FIELD_NAME);
assertEquals(LOCAL_DATE_TIME_MILLIS_PRECISION, result);
}

@Test
public void testWithDateFormatMillisecondPrecision() {
final LocalDateTime result = converter.convertField(MILLIS_TIMESTAMP_FRACTIONAL_STRING, Optional.of("yyyy-MM-dd'T'HH:mm:ss.SSSSSS"), FIELD_NAME);
assertEquals(LOCAL_DATE_TIME_MICROS_PRECISION, result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.junit.jupiter.api.Test;
import org.testcontainers.postgresql.PostgreSQLContainer;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
Expand All @@ -50,11 +51,16 @@
public class PutDatabaseRecordIT {

private final long MILLIS_TIMESTAMP_LONG = 1707238288351L;
private final long MICROS_TIMESTAMP_LONG = 1707238288351567L;
private final String MICROS_TIMESTAMP_FORMATTED = "2024-02-06 11:51:28.351567";
private final double MICROS_TIMESTAMP_DOUBLE = ((double) MICROS_TIMESTAMP_LONG) / 1000000D;
private final long NANOS_AFTER_SECOND = 351567000L;
private final Instant INSTANT_MICROS_PRECISION = Instant.ofEpochMilli(MILLIS_TIMESTAMP_LONG).plusNanos(NANOS_AFTER_SECOND).minusMillis(MILLIS_TIMESTAMP_LONG % 1000);

private final long EXTRA_NANO_SECONDS = 351000L; // Max precision PostgreSQL supports

private final BigDecimal MILLIS_TIMESTAMP = BigDecimal.valueOf(MILLIS_TIMESTAMP_LONG);
private final BigDecimal MILLIS_TIMESTAMP_FRACTIONAL = MILLIS_TIMESTAMP.add(BigDecimal.valueOf(EXTRA_NANO_SECONDS, 6));

private final Instant INSTANT_MILLIS_PRECISION = Instant.ofEpochMilli(MILLIS_TIMESTAMP_LONG);
private final Instant INSTANT_MICROS_PRECISION = Instant.ofEpochMilli(MILLIS_TIMESTAMP_LONG).plusNanos(EXTRA_NANO_SECONDS);


private static final String SIMPLE_INPUT_RECORD = """
{
Expand Down Expand Up @@ -205,31 +211,20 @@ public void testWithStringTimestampUsingMicros() throws SQLException {
}

@Test
public void testWithNumericTimestampUsingMicros() throws SQLException {
runner.enqueue(createJson(MICROS_TIMESTAMP_LONG));
public void testWithNumericTimestampFullMilliseconds() throws SQLException {
runner.enqueue(createJson(MILLIS_TIMESTAMP_LONG));
runner.run();
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS, 1);

final Map<String, Object> results = getResults();
final Timestamp lastTransactionTime = (Timestamp) results.get("lasttransactiontime");
assertEquals(INSTANT_MICROS_PRECISION, lastTransactionTime.toInstant());
assertEquals(INSTANT_MILLIS_PRECISION, lastTransactionTime.toInstant());
}


@Test
public void testWithDecimalTimestampUsingMicros() throws SQLException {
runner.enqueue(createJson(Double.toString(MICROS_TIMESTAMP_DOUBLE)));
runner.run();
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS, 1);

final Map<String, Object> results = getResults();
final Timestamp lastTransactionTime = (Timestamp) results.get("lasttransactiontime");
assertEquals(INSTANT_MICROS_PRECISION, lastTransactionTime.toInstant());
}

@Test
public void testWithDecimalTimestampUsingMicrosAsString() throws SQLException {
runner.enqueue(createJson(Double.toString(MICROS_TIMESTAMP_DOUBLE)));
public void testWithStringTimestampFractionalMilliseconds() throws SQLException {
runner.enqueue(createJson(MILLIS_TIMESTAMP_FRACTIONAL.toString()));
runner.run();
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS, 1);

Expand Down
Loading