Skip to content

Fix reading specifying format for date partition projection in Hive connector #25817

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.time.temporal.TemporalAccessor;
import java.time.temporal.TemporalQueries;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -47,16 +47,19 @@
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.COLUMN_PROJECTION_RANGE;
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.getProjectionPropertyRequiredValue;
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.getProjectionPropertyValue;
import static io.trino.plugin.hive.util.HiveWriteUtils.toDatePartitionValue;
import static io.trino.plugin.hive.util.HiveWriteUtils.toTimestampPartitionValue;
import static io.trino.spi.predicate.Domain.singleValue;
import static java.lang.String.format;
import static java.time.ZoneOffset.UTC;
import static java.time.temporal.ChronoUnit.DAYS;
import static java.time.temporal.ChronoUnit.HOURS;
import static java.time.temporal.ChronoUnit.MINUTES;
import static java.time.temporal.ChronoUnit.MONTHS;
import static java.time.temporal.ChronoUnit.SECONDS;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.nonNull;
import static java.util.Objects.requireNonNull;
import static java.util.TimeZone.getTimeZone;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

final class DateProjection
Expand All @@ -71,9 +74,10 @@ final class DateProjection
private static final Pattern DATE_RANGE_BOUND_EXPRESSION_PATTERN = Pattern.compile("^\\s*NOW\\s*(([+-])\\s*([0-9]+)\\s*(DAY|HOUR|MINUTE|SECOND)S?\\s*)?$");

private final String columnName;
private final DateFormat dateFormat;
private final Supplier<Instant> leftBound;
private final Supplier<Instant> rightBound;
private final Type columnType;
private final DateTimeFormatter dateFormat;
private final Instant leftBound;
private final Instant rightBound;
private final int interval;
private final ChronoUnit intervalUnit;

Expand All @@ -86,6 +90,7 @@ public DateProjection(String columnName, Type columnType, Map<String, Object> co
}

this.columnName = requireNonNull(columnName, "columnName is null");
this.columnType = requireNonNull(columnType, "columnType is null");

String dateFormatPattern = getProjectionPropertyRequiredValue(
columnName,
Expand All @@ -104,14 +109,11 @@ public DateProjection(String columnName, Type columnType, Map<String, Object> co
throw invalidRangeProperty(columnName, dateFormatPattern, Optional.empty());
}

SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatPattern);
dateFormat.setLenient(false);
dateFormat.setTimeZone(getTimeZone(UTC_TIME_ZONE_ID));
this.dateFormat = requireNonNull(dateFormat, "dateFormatPattern is null");
this.dateFormat = DateTimeFormatter.ofPattern(dateFormatPattern, ENGLISH);

leftBound = parseDateRangerBound(columnName, range.get(0), dateFormat);
rightBound = parseDateRangerBound(columnName, range.get(1), dateFormat);
if (!leftBound.get().isBefore(rightBound.get())) {
leftBound = parseDateRangerBound(columnName, range.get(0), dateFormatPattern, dateFormat);
rightBound = parseDateRangerBound(columnName, range.get(1), dateFormatPattern, dateFormat);
if (!leftBound.isBefore(rightBound)) {
throw invalidRangeProperty(columnName, dateFormatPattern, Optional.empty());
}

Expand All @@ -135,12 +137,12 @@ public List<String> getProjectedValues(Optional<Domain> partitionValueFilter)
{
ImmutableList.Builder<String> builder = ImmutableList.builder();

Instant leftBound = adjustBoundToDateFormat(this.leftBound.get());
Instant rightBound = adjustBoundToDateFormat(this.rightBound.get());
Instant leftBound = adjustBoundToDateFormat(this.leftBound);
Instant rightBound = adjustBoundToDateFormat(this.rightBound);

Instant currentValue = leftBound;
while (!currentValue.isAfter(rightBound)) {
String currentValueFormatted = formatValue(currentValue);
String currentValueFormatted = formatValueToHiveCompatibleFormat(currentValue);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is the intended behavior here — when we're enumerating the partition values, we want to respect whatever format is specified for the timestamp and not limit it to the Hive-compatible encoding. Otherwise #25642 isn't actually fixed.

if (isValueInDomain(partitionValueFilter, currentValue, currentValueFormatted)) {
builder.add(currentValueFormatted);
}
Expand All @@ -154,18 +156,35 @@ public List<String> getProjectedValues(Optional<Domain> partitionValueFilter)

private Instant adjustBoundToDateFormat(Instant value)
{
String formatted = formatValue(value.with(ChronoField.MILLI_OF_SECOND, 0));
LocalDateTime localDateTime = LocalDateTime.ofInstant(value.with(ChronoField.MILLI_OF_SECOND, 0), UTC_TIME_ZONE_ID);
String formatted = localDateTime.format(dateFormat);
try {
return dateFormat.parse(formatted).toInstant();
return parse(formatted, dateFormat);
}
catch (ParseException e) {
catch (DateTimeParseException e) {
throw new InvalidProjectionException(formatted, e.getMessage());
}
}

private String formatValue(Instant current)
// TODO: Remove this method once writing Hive partition values respects the user-defined
// format. Currently, writing uses the default Hive format; only reading and generating
// partition values are done with respect to the user-defined format.
private String formatValueToHiveCompatibleFormat(Instant current)
{
return dateFormat.format(new Date(current.toEpochMilli()));
if (columnType instanceof VarcharType) {
LocalDateTime localDateTime = LocalDateTime.ofInstant(current, UTC_TIME_ZONE_ID);
return localDateTime.format(dateFormat);
}

if (columnType instanceof DateType) {
return toDatePartitionValue(MILLISECONDS.toDays(current.toEpochMilli()));
}

if (columnType instanceof TimestampType timestampType && timestampType.isShort()) {
return toTimestampPartitionValue(MILLISECONDS.toMicros(current.toEpochMilli()));
}

throw new InvalidProjectionException(columnName, columnType);
}

private boolean isValueInDomain(Optional<Domain> valueDomain, Instant value, String formattedValue)
Expand Down Expand Up @@ -206,35 +225,42 @@ private static ChronoUnit resolveDefaultChronoUnit(String columnName, String dat
return MONTHS;
}

private static Supplier<Instant> parseDateRangerBound(String columnName, String value, SimpleDateFormat dateFormat)
private static Instant parseDateRangerBound(String columnName, String value, String dateFormatPattern, DateTimeFormatter dateFormat)
{
Matcher matcher = DATE_RANGE_BOUND_EXPRESSION_PATTERN.matcher(value);
if (matcher.matches()) {
String operator = matcher.group(2);
String multiplierString = matcher.group(3);
String unitString = matcher.group(4);
if (nonNull(operator) && nonNull(multiplierString) && nonNull(unitString)) {
unitString = unitString.toUpperCase(Locale.ENGLISH);
return new DateExpressionBound(
Integer.parseInt(multiplierString),
ChronoUnit.valueOf(unitString + "S"),
operator.charAt(0) == '+');
unitString = unitString.toUpperCase(ENGLISH);
int multiplier = Integer.parseInt(multiplierString);
boolean increment = operator.charAt(0) == '+';
ChronoUnit unit = ChronoUnit.valueOf(unitString + "S");
return Instant.now().plus(increment ? multiplier : -multiplier, unit);
}
if (value.trim().equals("NOW")) {
Instant now = Instant.now();
return () -> now;
return Instant.now();
}
throw invalidRangeProperty(columnName, dateFormat.toPattern(), Optional.of("Invalid expression"));
throw invalidRangeProperty(columnName, dateFormatPattern, Optional.of("Invalid expression"));
}

Instant dateBound;
try {
dateBound = dateFormat.parse(value).toInstant();
return parse(value, dateFormat);
}
catch (DateTimeParseException e) {
throw invalidRangeProperty(columnName, dateFormatPattern, Optional.of(e.getMessage()));
}
catch (ParseException e) {
throw invalidRangeProperty(columnName, dateFormat.toPattern(), Optional.of(e.getMessage()));
}

private static Instant parse(String value, DateTimeFormatter dateFormat)
throws DateTimeParseException
{
TemporalAccessor parsed = dateFormat.parse(value);
if (parsed.query(TemporalQueries.localDate()) != null && parsed.query(TemporalQueries.localTime()) == null) {
return LocalDate.from(parsed).atStartOfDay().toInstant(UTC);
}
return () -> dateBound;
return LocalDateTime.from(parsed).toInstant(UTC);
}

private static TrinoException invalidRangeProperty(String columnName, String dateFormatPattern, Optional<String> errorDetail)
Expand All @@ -248,14 +274,4 @@ private static TrinoException invalidRangeProperty(String columnName, String dat
DATE_RANGE_BOUND_EXPRESSION_PATTERN.pattern(),
errorDetail.map(error -> ": " + error).orElse("")));
}

private record DateExpressionBound(int multiplier, ChronoUnit unit, boolean increment)
implements Supplier<Instant>
{
@Override
public Instant get()
{
return Instant.now().plus(increment ? multiplier : -multiplier, unit);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,20 +140,29 @@ private static String toPartitionValue(Type type, Block block, int position)
return padSpaces(charType.getSlice(block, position), charType).toStringUtf8();
}
if (DATE.equals(type)) {
return LocalDate.ofEpochDay(DATE.getInt(block, position)).format(HIVE_DATE_FORMATTER);
return toDatePartitionValue(DATE.getInt(block, position));
}
if (TIMESTAMP_MILLIS.equals(type)) {
long epochMicros = type.getLong(block, position);
long epochSeconds = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
int nanosOfSecond = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
return LocalDateTime.ofEpochSecond(epochSeconds, nanosOfSecond, ZoneOffset.UTC).format(HIVE_TIMESTAMP_FORMATTER);
return toTimestampPartitionValue(TIMESTAMP_MILLIS.getLong(block, position));
}
if (type instanceof DecimalType decimalType) {
return readBigDecimal(decimalType, block, position).stripTrailingZeros().toPlainString();
}
throw new TrinoException(NOT_SUPPORTED, "Unsupported type for partition: " + type);
}

/**
 * Renders an epoch-day count as a Hive-compatible date partition value
 * using the canonical Hive date formatter.
 */
public static String toDatePartitionValue(long epochDays)
{
    LocalDate date = LocalDate.ofEpochDay(epochDays);
    return date.format(HIVE_DATE_FORMATTER);
}

/**
 * Renders an epoch-microsecond timestamp as a Hive-compatible timestamp
 * partition value, interpreting the instant in UTC.
 */
public static String toTimestampPartitionValue(long epochMicros)
{
    // Split the microsecond count into whole seconds and the sub-second
    // remainder; floorDiv/floorMod keep pre-1970 (negative) values correct.
    long wholeSeconds = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
    int nanosOfSecond = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
    LocalDateTime timestamp = LocalDateTime.ofEpochSecond(wholeSeconds, nanosOfSecond, ZoneOffset.UTC);
    return timestamp.format(HIVE_TIMESTAMP_FORMATTER);
}

public static void checkTableIsWritable(Table table, boolean writesToNonManagedTablesEnabled)
{
if (table.getTableType().equals(MATERIALIZED_VIEW.name())) {
Expand Down
Loading