Skip to content

Commit af8efa8

Browse files
committed
Fix reading specifying format for date partition projection
Transform user-specified date partition projection format into Hive-compatible format for partition consistency.
1 parent 538068e commit af8efa8

File tree

3 files changed

+98
-10
lines changed

3 files changed

+98
-10
lines changed

plugin/trino-hive/src/main/java/io/trino/plugin/hive/projection/DateProjection.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.COLUMN_PROJECTION_RANGE;
4848
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.getProjectionPropertyRequiredValue;
4949
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.getProjectionPropertyValue;
50+
import static io.trino.plugin.hive.util.HiveWriteUtils.toDatePartitionValue;
51+
import static io.trino.plugin.hive.util.HiveWriteUtils.toTimestampPartitionValue;
5052
import static io.trino.spi.predicate.Domain.singleValue;
5153
import static java.lang.String.format;
5254
import static java.time.ZoneOffset.UTC;
@@ -72,6 +74,7 @@ final class DateProjection
7274
private static final Pattern DATE_RANGE_BOUND_EXPRESSION_PATTERN = Pattern.compile("^\\s*NOW\\s*(([+-])\\s*([0-9]+)\\s*(DAY|HOUR|MINUTE|SECOND)S?\\s*)?$");
7375

7476
private final String columnName;
77+
private final Type columnType;
7578
private final DateTimeFormatter dateFormat;
7679
private final Instant leftBound;
7780
private final Instant rightBound;
@@ -87,6 +90,7 @@ public DateProjection(String columnName, Type columnType, Map<String, Object> co
8790
}
8891

8992
this.columnName = requireNonNull(columnName, "columnName is null");
93+
this.columnType = requireNonNull(columnType, "columnType is null");
9094

9195
String dateFormatPattern = getProjectionPropertyRequiredValue(
9296
columnName,
@@ -138,7 +142,7 @@ public List<String> getProjectedValues(Optional<Domain> partitionValueFilter)
138142

139143
Instant currentValue = leftBound;
140144
while (!currentValue.isAfter(rightBound)) {
141-
String currentValueFormatted = formatValue(currentValue);
145+
String currentValueFormatted = formatValueToHiveCompatibleFormat(currentValue);
142146
if (isValueInDomain(partitionValueFilter, currentValue, currentValueFormatted)) {
143147
builder.add(currentValueFormatted);
144148
}
@@ -152,7 +156,8 @@ public List<String> getProjectedValues(Optional<Domain> partitionValueFilter)
152156

153157
private Instant adjustBoundToDateFormat(Instant value)
154158
{
155-
String formatted = formatValue(value.with(ChronoField.MILLI_OF_SECOND, 0));
159+
LocalDateTime localDateTime = LocalDateTime.ofInstant(value.with(ChronoField.MILLI_OF_SECOND, 0), UTC_TIME_ZONE_ID);
160+
String formatted = localDateTime.format(dateFormat);
156161
try {
157162
return parse(formatted, dateFormat);
158163
}
@@ -161,10 +166,25 @@ private Instant adjustBoundToDateFormat(Instant value)
161166
}
162167
}
163168

164-
private String formatValue(Instant current)
169+
// TODO: Remove this method when we support write to Hive partition value with respect the
170+
// the user defined format. Currently, we are using the default Hive format. Only reading
171+
// and generating partition values is done with respect to the user defined format.
172+
private String formatValueToHiveCompatibleFormat(Instant current)
165173
{
166-
LocalDateTime localDateTime = LocalDateTime.ofInstant(current, UTC_TIME_ZONE_ID);
167-
return localDateTime.format(dateFormat);
174+
if (columnType instanceof VarcharType) {
175+
LocalDateTime localDateTime = LocalDateTime.ofInstant(current, UTC_TIME_ZONE_ID);
176+
return localDateTime.format(dateFormat);
177+
}
178+
179+
if (columnType instanceof DateType) {
180+
return toDatePartitionValue(MILLISECONDS.toDays(current.toEpochMilli()));
181+
}
182+
183+
if (columnType instanceof TimestampType timestampType && timestampType.isShort()) {
184+
return toTimestampPartitionValue(MILLISECONDS.toMicros(current.toEpochMilli()));
185+
}
186+
187+
throw new InvalidProjectionException(columnName, columnType);
168188
}
169189

170190
private boolean isValueInDomain(Optional<Domain> valueDomain, Instant value, String formattedValue)

plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveWriteUtils.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,20 +140,27 @@ private static String toPartitionValue(Type type, Block block, int position)
140140
return padSpaces(charType.getSlice(block, position), charType).toStringUtf8();
141141
}
142142
if (DATE.equals(type)) {
143-
return LocalDate.ofEpochDay(DATE.getInt(block, position)).format(HIVE_DATE_FORMATTER);
143+
return toDatePartitionValue(DATE.getInt(block, position));
144144
}
145145
if (TIMESTAMP_MILLIS.equals(type)) {
146-
long epochMicros = type.getLong(block, position);
147-
long epochSeconds = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
148-
int nanosOfSecond = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
149-
return LocalDateTime.ofEpochSecond(epochSeconds, nanosOfSecond, ZoneOffset.UTC).format(HIVE_TIMESTAMP_FORMATTER);
146+
return toTimestampPartitionValue(TIMESTAMP_MILLIS.getLong(block, position));
150147
}
151148
if (type instanceof DecimalType decimalType) {
152149
return readBigDecimal(decimalType, block, position).stripTrailingZeros().toPlainString();
153150
}
154151
throw new TrinoException(NOT_SUPPORTED, "Unsupported type for partition: " + type);
155152
}
156153

154+
public static String toDatePartitionValue(long epochDays) {
155+
return LocalDate.ofEpochDay(epochDays).format(HIVE_DATE_FORMATTER);
156+
}
157+
158+
public static String toTimestampPartitionValue(long epochMicros) {
159+
long epochSeconds = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
160+
int nanosOfSecond = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
161+
return LocalDateTime.ofEpochSecond(epochSeconds, nanosOfSecond, ZoneOffset.UTC).format(HIVE_TIMESTAMP_FORMATTER);
162+
}
163+
157164
public static void checkTableIsWritable(Table table, boolean writesToNonManagedTablesEnabled)
158165
{
159166
if (table.getTableType().equals(MATERIALIZED_VIEW.name())) {

plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -917,6 +917,67 @@ public void testIntegerPartitionProjectionOnIntegerColumnWithDefaults()
917917
"VALUES ('POLAND_1'), ('POLAND_2'), ('CZECH_1'), ('CZECH_2')");
918918
}
919919

920+
@Test
921+
public void testDatePartitionProjectionWithFormat()
922+
{
923+
String tableName = "partition_projection_custom_date" + randomNameSuffix();
924+
String fullyQualifiedTestTableName = getFullyQualifiedTestTableName(tableName);
925+
926+
computeActual(
927+
"CREATE TABLE " + fullyQualifiedTestTableName + " ( " +
928+
" name varchar(25), " +
929+
" comment varchar(152), " +
930+
" dt DATE WITH (" +
931+
" partition_projection_format='yyyy/MM/dd',\n" +
932+
" partition_projection_interval=1,\n" +
933+
" partition_projection_interval_unit='DAYS', \n" +
934+
" partition_projection_range=ARRAY['2025/01/01', '2025/01/30'], \n" +
935+
" partition_projection_type='date'" +
936+
" ), " +
937+
" ts timestamp WITH (" +
938+
" partition_projection_type='date', " +
939+
" partition_projection_format='yyyy/M/dd HH@mm@ss', " +
940+
" partition_projection_range=ARRAY['2025/1/20 00@00@00', '2025/1/21 00@00@00'], " +
941+
" partition_projection_interval=1, " +
942+
" partition_projection_interval_unit='HOURS'" +
943+
" )" +
944+
") WITH ( " +
945+
" partitioned_by=ARRAY['dt','ts'], " +
946+
" partition_projection_enabled=true " +
947+
")");
948+
949+
assertThat(
950+
hiveMinioDataLake
951+
.runOnHive("SHOW TBLPROPERTIES " + getHiveTestTableName(tableName)))
952+
.containsPattern("[ |]+projection\\.enabled[ |]+true[ |]+")
953+
.containsPattern("[ |]+projection\\.dt\\.type[ |]+date[ |]+")
954+
.containsPattern("[ |]+projection\\.dt\\.format[ |]+yyyy/MM/dd[ |]+")
955+
.containsPattern("[ |]+projection\\.dt\\.interval.unit[ |]+days[ |]+")
956+
.containsPattern("[ |]+projection\\.dt\\.range[ |]+2025/01/01,2025/01/30[ |]+")
957+
.containsPattern("[ |]+projection\\.ts\\.type[ |]+date[ |]+")
958+
.containsPattern("[ |]+projection\\.ts\\.format[ |]+yyyy/M/dd HH@mm@ss[ |]+")
959+
.containsPattern("[ |]+projection\\.ts\\.interval.unit[ |]+hours[ |]+")
960+
.containsPattern("[ |]+projection\\.ts\\.range[ |]+2025/1/20 00@00@00,2025/1/21 00@00@00[ |]+");
961+
962+
computeActual(createInsertStatement(
963+
fullyQualifiedTestTableName,
964+
ImmutableList.of(
965+
ImmutableList.of("'POLAND_1'", "'Comment'", "DATE '2025-1-23'", "TIMESTAMP '2025-1-20 01:00:00'"),
966+
ImmutableList.of("'POLAND_2'", "'Comment'", "DATE '2025-1-23'", "TIMESTAMP '2025-1-20 02:00:00'"),
967+
ImmutableList.of("'CZECH_2'", "'Comment'", "DATE '2025-1-24'", "TIMESTAMP '2025-1-20 10:00:00'"))));
968+
969+
assertQuery("SELECT * FROM " + fullyQualifiedTestTableName + " WHERE name = 'POLAND_1'", "VALUES ('POLAND_1', 'Comment', DATE '2025-01-23', TIMESTAMP '2025-01-20 01:00:00')");
970+
971+
assertQuery("SELECT * FROM " + fullyQualifiedTestTableName + " WHERE dt = DATE '2025-01-23'",
972+
"VALUES ('POLAND_1', 'Comment', DATE '2025-01-23', TIMESTAMP '2025-01-20 01:00:00')," +
973+
"('POLAND_2', 'Comment', DATE '2025-01-23', TIMESTAMP '2025-01-20 02:00:00')");
974+
assertQuery("SELECT * FROM " + fullyQualifiedTestTableName + " WHERE dt = DATE '2025-01-23' AND ts > TIMESTAMP '2025-1-20 01:00:00'",
975+
"VALUES ('POLAND_2', 'Comment', DATE '2025-01-23', TIMESTAMP '2025-01-20 02:00:00')");
976+
assertQuery("SELECT * FROM " + fullyQualifiedTestTableName + " WHERE dt IN (DATE '2025-01-23', DATE '2025-01-24', DATE '2025-01-25') AND ts > TIMESTAMP '2025-1-20 01:00:00'",
977+
"VALUES ('POLAND_2', 'Comment', DATE '2025-01-23', TIMESTAMP '2025-01-20 02:00:00')," +
978+
"('CZECH_2', 'Comment', DATE '2025-1-24', TIMESTAMP '2025-1-20 10:00:00')");
979+
}
980+
920981
@Test
921982
public void testDatePartitionProjectionOnDateColumnWithDefaults()
922983
{

0 commit comments

Comments
 (0)