Skip to content

Commit c7888b2

Browse files
committed
Fix missing file extension in Hive connector output files
1 parent 9ef74ae commit c7888b2

File tree

16 files changed

+97
-36
lines changed

16 files changed

+97
-36
lines changed

lib/trino-metastore/src/main/java/io/trino/metastore/StorageFormat.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,20 @@
2828
@Immutable
2929
public class StorageFormat
3030
{
31-
public static final StorageFormat NULL_STORAGE_FORMAT = new StorageFormat(null, null, null);
31+
public static final StorageFormat NULL_STORAGE_FORMAT = new StorageFormat(null, null, null, null);
3232
public static final StorageFormat VIEW_STORAGE_FORMAT = NULL_STORAGE_FORMAT;
3333

3434
private final String serde;
3535
private final String inputFormat;
3636
private final String outputFormat;
37+
private final String fileExtension;
3738

38-
private StorageFormat(String serde, String inputFormat, String outputFormat)
39+
private StorageFormat(String serde, String inputFormat, String outputFormat, String fileExtension)
3940
{
4041
this.serde = serde;
4142
this.inputFormat = inputFormat;
4243
this.outputFormat = outputFormat;
44+
this.fileExtension = fileExtension;
4345
}
4446

4547
public String getSerde()
@@ -66,6 +68,14 @@ public String getOutputFormat()
6668
return outputFormat;
6769
}
6870

71+
public String getFileExtension()
72+
{
73+
if (fileExtension == null) {
74+
throw new TrinoException(HIVE_UNSUPPORTED_FORMAT, "FileExtension is not present in StorageFormat");
75+
}
76+
return fileExtension;
77+
}
78+
6979
@JsonProperty("serde")
7080
public String getSerDeNullable()
7181
{
@@ -84,24 +94,32 @@ public String getOutputFormatNullable()
8494
return outputFormat;
8595
}
8696

87-
public static StorageFormat create(String serde, String inputFormat, String outputFormat)
97+
@JsonProperty("fileExtension")
98+
public String getFileExtensionNullable()
99+
{
100+
return fileExtension;
101+
}
102+
103+
public static StorageFormat create(String serde, String inputFormat, String outputFormat, String fileExtension)
88104
{
89105
return new StorageFormat(
90106
requireNonNull(serde, "serde is null"),
91107
requireNonNull(inputFormat, "inputFormat is null"),
92-
requireNonNull(outputFormat, "outputFormat is null"));
108+
requireNonNull(outputFormat, "outputFormat is null"),
109+
requireNonNull(fileExtension, "fileExtension is null"));
93110
}
94111

95112
@JsonCreator
96113
public static StorageFormat createNullable(
97114
@JsonProperty("serde") String serde,
98115
@JsonProperty("inputFormat") String inputFormat,
99-
@JsonProperty("outputFormat") String outputFormat)
116+
@JsonProperty("outputFormat") String outputFormat,
117+
@JsonProperty("fileExtension") String fileExtension)
100118
{
101-
if (serde == null && inputFormat == null && outputFormat == null) {
119+
if (serde == null && inputFormat == null && outputFormat == null && fileExtension == null) {
102120
return NULL_STORAGE_FORMAT;
103121
}
104-
return new StorageFormat(serde, inputFormat, outputFormat);
122+
return new StorageFormat(serde, inputFormat, outputFormat, fileExtension);
105123
}
106124

107125
@Override
@@ -116,13 +134,14 @@ public boolean equals(Object o)
116134
StorageFormat that = (StorageFormat) o;
117135
return Objects.equals(serde, that.serde) &&
118136
Objects.equals(inputFormat, that.inputFormat) &&
119-
Objects.equals(outputFormat, that.outputFormat);
137+
Objects.equals(outputFormat, that.outputFormat) &&
138+
Objects.equals(fileExtension, that.fileExtension);
120139
}
121140

122141
@Override
123142
public int hashCode()
124143
{
125-
return Objects.hash(serde, inputFormat, outputFormat);
144+
return Objects.hash(serde, inputFormat, outputFormat, fileExtension);
126145
}
127146

128147
@Override
@@ -132,6 +151,7 @@ public String toString()
132151
.add("serde", serde)
133152
.add("inputFormat", inputFormat)
134153
.add("outputFormat", outputFormat)
154+
.add("fileExtension", fileExtension)
135155
.toString();
136156
}
137157
}

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,8 @@ public class DeltaLakeMetadata
379379
public static final StorageFormat DELTA_STORAGE_FORMAT = create(
380380
LAZY_SIMPLE_SERDE_CLASS,
381381
SEQUENCEFILE_INPUT_FORMAT_CLASS,
382-
HIVE_SEQUENCEFILE_OUTPUT_FORMAT_CLASS);
382+
HIVE_SEQUENCEFILE_OUTPUT_FORMAT_CLASS,
383+
".seq");
383384
public static final String CREATE_TABLE_AS_OPERATION = "CREATE TABLE AS SELECT";
384385
public static final String CREATE_OR_REPLACE_TABLE_AS_OPERATION = "CREATE OR REPLACE TABLE AS SELECT";
385386
public static final String CREATE_TABLE_OPERATION = "CREATE TABLE";

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import io.trino.metastore.Table;
2929
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
3030
import io.trino.plugin.hive.HiveCompressionCodec;
31+
import io.trino.plugin.hive.HiveStorageFormat;
32+
import io.trino.plugin.hive.HiveWriterFactory;
3133
import io.trino.plugin.tpch.TpchPlugin;
3234
import io.trino.spi.connector.ColumnMetadata;
3335
import io.trino.sql.planner.plan.FilterNode;
@@ -103,6 +105,7 @@
103105
import static java.util.Map.entry;
104106
import static java.util.concurrent.TimeUnit.MILLISECONDS;
105107
import static java.util.concurrent.TimeUnit.SECONDS;
108+
import static org.assertj.core.api.Assertions.as;
106109
import static org.assertj.core.api.Assertions.assertThat;
107110
import static org.assertj.core.api.Assertions.assertThatThrownBy;
108111
import static org.junit.jupiter.api.Assumptions.abort;

plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,57 +61,70 @@ public enum HiveStorageFormat
6161
ORC(
6262
ORC_SERDE_CLASS,
6363
ORC_INPUT_FORMAT_CLASS,
64-
ORC_OUTPUT_FORMAT_CLASS),
64+
ORC_OUTPUT_FORMAT_CLASS,
65+
".orc"),
6566
PARQUET(
6667
PARQUET_HIVE_SERDE_CLASS,
6768
MAPRED_PARQUET_INPUT_FORMAT_CLASS,
68-
MAPRED_PARQUET_OUTPUT_FORMAT_CLASS),
69+
MAPRED_PARQUET_OUTPUT_FORMAT_CLASS,
70+
".parquet"),
6971
AVRO(
7072
AVRO_SERDE_CLASS,
7173
AVRO_CONTAINER_INPUT_FORMAT_CLASS,
72-
AVRO_CONTAINER_OUTPUT_FORMAT_CLASS),
74+
AVRO_CONTAINER_OUTPUT_FORMAT_CLASS,
75+
".avro"),
7376
RCBINARY(
7477
LAZY_BINARY_COLUMNAR_SERDE_CLASS,
7578
RCFILE_INPUT_FORMAT_CLASS,
76-
RCFILE_OUTPUT_FORMAT_CLASS),
79+
RCFILE_OUTPUT_FORMAT_CLASS,
80+
".rc"),
7781
RCTEXT(
7882
COLUMNAR_SERDE_CLASS,
7983
RCFILE_INPUT_FORMAT_CLASS,
80-
RCFILE_OUTPUT_FORMAT_CLASS),
84+
RCFILE_OUTPUT_FORMAT_CLASS,
85+
".rc"),
8186
SEQUENCEFILE(
8287
LAZY_SIMPLE_SERDE_CLASS,
8388
SEQUENCEFILE_INPUT_FORMAT_CLASS,
84-
HIVE_SEQUENCEFILE_OUTPUT_FORMAT_CLASS),
89+
HIVE_SEQUENCEFILE_OUTPUT_FORMAT_CLASS,
90+
".seq"),
8591
JSON(
8692
JSON_SERDE_CLASS,
8793
TEXT_INPUT_FORMAT_CLASS,
88-
HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS),
94+
HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS,
95+
".json"),
8996
OPENX_JSON(
9097
OPENX_JSON_SERDE_CLASS,
9198
TEXT_INPUT_FORMAT_CLASS,
92-
HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS),
99+
HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS,
100+
".json"),
93101
TEXTFILE(
94102
LAZY_SIMPLE_SERDE_CLASS,
95103
TEXT_INPUT_FORMAT_CLASS,
96-
HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS),
104+
HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS,
105+
".txt"),
97106
CSV(
98107
OPENCSV_SERDE_CLASS,
99108
TEXT_INPUT_FORMAT_CLASS,
100-
HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS),
109+
HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS,
110+
".csv"),
101111
REGEX(
102112
REGEX_SERDE_CLASS,
103113
TEXT_INPUT_FORMAT_CLASS,
104-
HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS);
114+
HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS,
115+
"");
105116

106117
private final String serde;
107118
private final String inputFormat;
108119
private final String outputFormat;
120+
private final String fileExtension;
109121

110-
HiveStorageFormat(String serde, String inputFormat, String outputFormat)
122+
HiveStorageFormat(String serde, String inputFormat, String outputFormat, String fileExtension)
111123
{
112124
this.serde = requireNonNull(serde, "serde is null");
113125
this.inputFormat = requireNonNull(inputFormat, "inputFormat is null");
114126
this.outputFormat = requireNonNull(outputFormat, "outputFormat is null");
127+
this.fileExtension = requireNonNull(fileExtension, "fileExtension is null");
115128
}
116129

117130
public String getSerde()
@@ -140,7 +153,7 @@ public boolean isSplittable(String path)
140153

141154
public StorageFormat toStorageFormat()
142155
{
143-
return StorageFormat.create(serde, inputFormat, outputFormat);
156+
return StorageFormat.create(serde, inputFormat, outputFormat, fileExtension);
144157
}
145158

146159
public void validateColumns(List<HiveColumnHandle> handles)

plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -675,7 +675,7 @@ public static int getBucketFromFileName(String fileName)
675675
public static String getFileExtension(HiveCompressionCodec compression, StorageFormat format)
676676
{
677677
// text format files must have the correct extension when compressed
678-
return compression.getHiveCompressionKind()
678+
return format.getFileExtension() + compression.getHiveCompressionKind()
679679
.filter(_ -> format.getOutputFormat().equals(HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS))
680680
.map(CompressionKind::getFileExtension)
681681
.orElse("");

plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueConverter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ public final class GlueConverter
108108
StorageFormat.create(
109109
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
110110
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
111-
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
111+
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
112+
".parquet"),
112113
Optional.empty(),
113114
Optional.empty(),
114115
false,
@@ -324,7 +325,7 @@ private static Storage fromGlueStorage(StorageDescriptor sd, String tablePartiti
324325

325326
SerDeInfo serdeInfo = requireNonNull(sd.serdeInfo(), () -> "StorageDescriptor SerDeInfo is null: " + tablePartitionName);
326327
return new Storage(
327-
StorageFormat.createNullable(serdeInfo.serializationLibrary(), sd.inputFormat(), sd.outputFormat()),
328+
StorageFormat.createNullable(serdeInfo.serializationLibrary(), sd.inputFormat(), sd.outputFormat(), ""),
328329
Optional.ofNullable(sd.location()),
329330
bucketProperty,
330331
sd.skewedInfo() != null && !sd.skewedInfo().skewedColumnNames().isEmpty(),

plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -674,7 +674,7 @@ private static void fromMetastoreApiStorageDescriptor(
674674
throw new TrinoException(HIVE_INVALID_METADATA, "Table storage descriptor is missing SerDe info");
675675
}
676676

677-
builder.setStorageFormat(StorageFormat.createNullable(serdeInfo.getSerializationLib(), storageDescriptor.getInputFormat(), storageDescriptor.getOutputFormat()))
677+
builder.setStorageFormat(StorageFormat.createNullable(serdeInfo.getSerializationLib(), storageDescriptor.getInputFormat(), storageDescriptor.getOutputFormat(), null))
678678
.setLocation(nullToEmpty(storageDescriptor.getLocation()))
679679
.setBucketProperty(createBucketProperty(storageDescriptor, tablePartitionName))
680680
.setSkewed(storageDescriptor.isSetSkewedInfo() && storageDescriptor.getSkewedInfo().isSetSkewedColNames() && !storageDescriptor.getSkewedInfo().getSkewedColNames().isEmpty())

plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9402,6 +9402,26 @@ public void testFlushMetadataDisabled()
94029402
assertQuerySucceeds("CALL system.flush_metadata_cache()");
94039403
}
94049404

9405+
@Test
9406+
public void testWriteFileExtension()
9407+
{
9408+
testWithAllStorageFormats(this::testWriteFileExtension);
9409+
}
9410+
9411+
private void testWriteFileExtension(Session session, HiveStorageFormat storageFormat)
9412+
{
9413+
String tableName = "test_write_file_extension_" + randomNameSuffix();
9414+
assertUpdate(session, format("CREATE TABLE %s (id BIGINT) WITH (format = '%s')", tableName, storageFormat));
9415+
assertUpdate(session, format("INSERT INTO %s VALUES(1)", tableName), 1);
9416+
assertQuery(session, format("SELECT * FROM %s", tableName), "VALUES(1)");
9417+
9418+
assertThat((String) computeActual("SELECT \"$path\" FROM " + tableName).getOnlyValue())
9419+
// TODO: get HiveCompressionCodec from session
9420+
.endsWith(HiveWriterFactory.getFileExtension(HiveCompressionCodec.ZSTD, storageFormat.toStorageFormat()));
9421+
9422+
assertUpdate(session, format("DROP TABLE %s", tableName));
9423+
}
9424+
94059425
private static final Set<HiveStorageFormat> NAMED_COLUMN_ONLY_FORMATS = ImmutableSet.of(HiveStorageFormat.AVRO, HiveStorageFormat.JSON);
94069426

94079427
private Session getParallelWriteSession(Session baseSession)

plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveMetadataListing.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public class TestHiveMetadataListing
6363
Optional.of("comment"),
6464
ImmutableMap.of());
6565
private static final Storage TABLE_STORAGE = new Storage(
66-
StorageFormat.create("serde", "input", "output"),
66+
StorageFormat.create("serde", "input", "output", ".ext"),
6767
Optional.of("location"),
6868
Optional.of(new HiveBucketProperty(ImmutableList.of("column"), 10, ImmutableList.of(new SortingColumn("column", SortingColumn.Order.ASCENDING)))),
6969
true,

plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class TestTransactionScopeCachingDirectoryLister
5353
Optional.of("comment"),
5454
Map.of());
5555
private static final Storage TABLE_STORAGE = new Storage(
56-
StorageFormat.create("serde", "input", "output"),
56+
StorageFormat.create("serde", "input", "output", ".ext"),
5757
Optional.of("location"),
5858
Optional.of(new HiveBucketProperty(ImmutableList.of("column"), 10, ImmutableList.of(new SortingColumn("column", SortingColumn.Order.ASCENDING)))),
5959
true,

plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestStorage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class TestStorage
2929
public void testRoundTrip()
3030
{
3131
Storage storage = Storage.builder()
32-
.setStorageFormat(StorageFormat.create("abc", "in", "out"))
32+
.setStorageFormat(StorageFormat.create("abc", "in", "out", ".ext"))
3333
.setLocation("/test")
3434
.build();
3535

plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/file/TestFileHiveMetastore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public void testPreserveHudiInputFormat()
8989
.setOwnerType(Optional.empty());
9090
getMetastore().createDatabase(database.build());
9191

92-
StorageFormat storageFormat = StorageFormat.create(PARQUET_HIVE_SERDE_CLASS, HUDI_PARQUET_INPUT_FORMAT, MAPRED_PARQUET_OUTPUT_FORMAT_CLASS);
92+
StorageFormat storageFormat = StorageFormat.create(PARQUET_HIVE_SERDE_CLASS, HUDI_PARQUET_INPUT_FORMAT, MAPRED_PARQUET_OUTPUT_FORMAT_CLASS, ".parquet");
9393

9494
String tableName = "some_table_name" + randomNameSuffix();
9595
Table table = Table.builder()

plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestGlueConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class TestGlueConverter
7575
.setPartitionColumns(ImmutableList.of(new Column("table-partition", HIVE_STRING, Optional.of("table partition column comment"), Map.of())))
7676
.setViewOriginalText(Optional.of("originalText"))
7777
.setViewExpandedText(Optional.of("expandedText"))
78-
.withStorage(storage -> storage.setStorageFormat(StorageFormat.create("TableSerdeLib", "TableInputFormat", "TableOutputFormat"))
78+
.withStorage(storage -> storage.setStorageFormat(StorageFormat.create("TableSerdeLib", "TableInputFormat", "TableOutputFormat", ".ext"))
7979
.setLocation("/test-table")
8080
.setBucketProperty(Optional.empty())
8181
.setSerdeParameters(ImmutableMap.of())).build();
@@ -86,7 +86,7 @@ class TestGlueConverter
8686
.setValues(ImmutableList.of("val1"))
8787
.setColumns(ImmutableList.of(new Column("partition-data", HIVE_STRING, Optional.of("partition data column comment"), Map.of())))
8888
.setParameters(ImmutableMap.of())
89-
.withStorage(storage -> storage.setStorageFormat(StorageFormat.create("PartitionSerdeLib", "PartitionInputFormat", "PartitionOutputFormat"))
89+
.withStorage(storage -> storage.setStorageFormat(StorageFormat.create("PartitionSerdeLib", "PartitionInputFormat", "PartitionOutputFormat", ".ext"))
9090
.setLocation("/test-table/partition")
9191
.setBucketProperty(Optional.empty())
9292
.setSerdeParameters(ImmutableMap.of())).build();

plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/ResourceHudiTablesInitializer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ private void createTable(
9797
StorageFormat storageFormat = StorageFormat.create(
9898
PARQUET_HIVE_SERDE_CLASS,
9999
HUDI_PARQUET_INPUT_FORMAT,
100-
MAPRED_PARQUET_OUTPUT_FORMAT_CLASS);
100+
MAPRED_PARQUET_OUTPUT_FORMAT_CLASS,
101+
".parquet");
101102

102103
Table table = Table.builder()
103104
.setDatabaseName(schemaName)

plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/TpchHudiTablesInitializer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,8 @@ private static Table createTableDefinition(String schemaName, TpchTable<?> table
194194
StorageFormat storageFormat = StorageFormat.create(
195195
PARQUET_HIVE_SERDE_CLASS,
196196
HUDI_PARQUET_INPUT_FORMAT,
197-
MAPRED_PARQUET_OUTPUT_FORMAT_CLASS);
197+
MAPRED_PARQUET_OUTPUT_FORMAT_CLASS,
198+
".parquet");
198199

199200
return Table.builder()
200201
.setDatabaseName(schemaName)

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ public abstract class AbstractIcebergTableOperations
7777
public static final StorageFormat ICEBERG_METASTORE_STORAGE_FORMAT = StorageFormat.create(
7878
LAZY_SIMPLE_SERDE_CLASS,
7979
FILE_INPUT_FORMAT_CLASS,
80-
FILE_OUTPUT_FORMAT_CLASS);
80+
FILE_OUTPUT_FORMAT_CLASS,
81+
"");
8182

8283
protected final ConnectorSession session;
8384
protected final String database;

0 commit comments

Comments
 (0)