-
Notifications
You must be signed in to change notification settings - Fork 101
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit b87bbc9
Showing
22 changed files
with
500,969 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
parquet-mr-tests/.classpath | ||
parquet-mr-tests/.project | ||
parquet-mr-tests/.settings/ | ||
parquet-mr-tests/target/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
parquet-compatibility | ||
===================== | ||
|
||
compatibility tests to make sure the C and Java implementations can read each other |   ||
|
||
Submodules: | ||
----------- | ||
* parquet-compat | ||
* parquet-testdata | ||
* parquet-compat-$version | ||
|
||
parquet-compat | ||
-------------- | ||
This stores the test sources. This is not a maven submodule. | ||
|
||
parquet-testdata | ||
---------------- | ||
Stores all the CSVs and Impala files (TODO: discuss the Impala compatibility test) |   ||
|
||
parquet-compat-$version: | ||
------------------------ | ||
pom.xml has dependency on the corresponding version of parquet-mr. | ||
src is a symlink to sources in parquet-compat (../parquet-compat/src). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<groupId>parquet</groupId> | ||
<artifactId>parquet-compat-1.0.0-t1</artifactId> | ||
<version>1.0.0-SNAPSHOT</version> | ||
<packaging>jar</packaging> | ||
|
||
<name>Parquet Compatibility 1.0.0-t1</name> | ||
<url>https://github.com/Parquet/parquet-compatibility</url> | ||
|
||
<properties> | ||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | ||
</properties> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>com.twitter</groupId> | ||
<artifactId>parquet-column</artifactId> | ||
<version>1.0.0-SNAPSHOT</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>com.twitter</groupId> | ||
<artifactId>parquet-hadoop</artifactId> | ||
<version>1.0.0-SNAPSHOT</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>junit</groupId> | ||
<artifactId>junit</artifactId> | ||
<version>4.10</version> | ||
<scope>test</scope> | ||
</dependency> | ||
</dependencies> | ||
</project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
../parquet-compat/src |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
*~ | ||
.* | ||
!.gitignore | ||
target |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<groupId>parquet</groupId> | ||
<artifactId>parquet-compat-1.0.0</artifactId> | ||
<version>1.0.0-SNAPSHOT</version> | ||
<packaging>jar</packaging> | ||
|
||
<name>Parquet Compatibility 1.0.0</name> | ||
<url>https://github.com/Parquet/parquet-compatibility</url> | ||
|
||
<properties> | ||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | ||
</properties> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>com.twitter</groupId> | ||
<artifactId>parquet-column</artifactId> | ||
<version>1.0.0-SNAPSHOT</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>com.twitter</groupId> | ||
<artifactId>parquet-hadoop</artifactId> | ||
<version>1.0.0-SNAPSHOT</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>junit</groupId> | ||
<artifactId>junit</artifactId> | ||
<version>4.10</version> | ||
<scope>test</scope> | ||
</dependency> | ||
</dependencies> | ||
</project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
../parquet-compat/src |
200 changes: 200 additions & 0 deletions
200
parquet-compat/src/test/java/parquet/compat/test/ConvertUtils.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,200 @@ | ||
/** | ||
* Copyright 2012 Twitter, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package parquet.compat.test; | ||
|
||
import static java.lang.String.format; | ||
|
||
import java.io.BufferedReader; | ||
import java.io.BufferedWriter; | ||
import java.io.File; | ||
import java.io.FileReader; | ||
import java.io.FileWriter; | ||
import java.io.IOException; | ||
import java.util.Arrays; | ||
import java.util.regex.Pattern; | ||
|
||
import org.apache.hadoop.conf.Configuration; | ||
This comment has been minimized.
Sorry, something went wrong. |
||
import org.apache.hadoop.fs.Path; | ||
|
||
import parquet.Log; | ||
import parquet.Preconditions; | ||
import parquet.column.page.PageReadStore; | ||
import parquet.example.data.Group; | ||
import parquet.example.data.simple.convert.GroupRecordConverter; | ||
import parquet.hadoop.ParquetFileReader; | ||
import parquet.hadoop.ParquetReader; | ||
import parquet.hadoop.example.GroupReadSupport; | ||
import parquet.hadoop.metadata.ParquetMetadata; | ||
import parquet.io.ColumnIOFactory; | ||
import parquet.io.MessageColumnIO; | ||
import parquet.io.RecordReader; | ||
import parquet.schema.MessageType; | ||
import parquet.schema.MessageTypeParser; | ||
import parquet.schema.Type; | ||
import parquet.schema.PrimitiveType.PrimitiveTypeName; | ||
|
||
public class ConvertUtils { | ||
|
||
private static final Log LOG = Log.getLog(ConvertUtils.class); | ||
|
||
public static final String CSV_DELIMITER= "|"; | ||
|
||
private static String readFile(String path) throws IOException { | ||
BufferedReader reader = new BufferedReader(new FileReader(path)); | ||
StringBuilder stringBuilder = new StringBuilder(); | ||
|
||
try { | ||
String line = null; | ||
String ls = System.getProperty("line.separator"); | ||
|
||
while ((line = reader.readLine()) != null ) { | ||
stringBuilder.append(line); | ||
stringBuilder.append(ls); | ||
} | ||
} finally { | ||
Utils.closeQuietly(reader); | ||
} | ||
|
||
return stringBuilder.toString(); | ||
} | ||
|
||
public static String getSchema(File csvFile) throws IOException { | ||
String fileName = csvFile.getName().substring( | ||
0, csvFile.getName().length() - ".csv".length()) + ".schema"; | ||
File schemaFile = new File(csvFile.getParentFile(), fileName); | ||
return readFile(schemaFile.getAbsolutePath()); | ||
} | ||
|
||
public static void convertCsvToParquet(File csvFile, File outputParquetFile) throws IOException { | ||
convertCsvToParquet(csvFile, outputParquetFile, false); | ||
} | ||
|
||
public static void convertCsvToParquet(File csvFile, File outputParquetFile, boolean enableDictionary) throws IOException { | ||
LOG.info("Converting " + csvFile.getName() + " to " + outputParquetFile.getName()); | ||
String rawSchema = getSchema(csvFile); | ||
if(outputParquetFile.exists()) { | ||
throw new IOException("Output file " + outputParquetFile.getAbsolutePath() + | ||
" already exists"); | ||
} | ||
|
||
Path path = new Path(outputParquetFile.toURI()); | ||
|
||
MessageType schema = MessageTypeParser.parseMessageType(rawSchema); | ||
CsvParquetWriter writer = new CsvParquetWriter(path, schema, enableDictionary); | ||
|
||
BufferedReader br = new BufferedReader(new FileReader(csvFile)); | ||
String line; | ||
int lineNumber = 0; | ||
try { | ||
while ((line = br.readLine()) != null) { | ||
String[] fields = line.split(Pattern.quote(CSV_DELIMITER)); | ||
writer.write(Arrays.asList(fields)); | ||
++lineNumber; | ||
} | ||
} finally { | ||
LOG.info("Number of lines: " + lineNumber); | ||
Utils.closeQuietly(br); | ||
Utils.closeQuietly(writer); | ||
} | ||
} | ||
|
||
public static void convertParquetToCSV(File parquetFile, File csvOutputFile) throws IOException { | ||
Preconditions.checkArgument(parquetFile.getName().endsWith(".parquet"), | ||
"parquet file should have .parquet extension"); | ||
Preconditions.checkArgument(csvOutputFile.getName().endsWith(".csv"), | ||
"csv file should have .csv extension"); | ||
Preconditions.checkArgument(!csvOutputFile.exists(), | ||
"Output file " + csvOutputFile.getAbsolutePath() + " already exists"); | ||
|
||
LOG.info("Converting " + parquetFile.getName() + " to " + csvOutputFile.getName()); | ||
|
||
|
||
Path parquetFilePath = new Path(parquetFile.toURI()); | ||
|
||
Configuration configuration = new Configuration(true); | ||
|
||
GroupReadSupport readSupport = new GroupReadSupport(); | ||
ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, parquetFilePath); | ||
MessageType schema = readFooter.getFileMetaData().getSchema(); | ||
|
||
readSupport.init(configuration, null, schema); | ||
BufferedWriter w = new BufferedWriter(new FileWriter(csvOutputFile)); | ||
ParquetReader<Group> reader = new ParquetReader<Group>(parquetFilePath, readSupport); | ||
try{ | ||
Group g = null; | ||
while( (g = reader.read())!= null) { | ||
writeGroup(w, g, schema); | ||
} | ||
} | ||
finally { | ||
Utils.closeQuietly(reader); | ||
Utils.closeQuietly(w); | ||
} | ||
} | ||
|
||
private static void writeGroup(BufferedWriter w, Group g, MessageType schema) | ||
throws IOException{ | ||
for (int j = 0; j < schema.getFieldCount(); j++) { | ||
if (j > 0) { | ||
w.write(CSV_DELIMITER); | ||
} | ||
String valueToString = g.getValueToString(j, 0); | ||
w.write(valueToString); | ||
} | ||
w.write('\n'); | ||
} | ||
|
||
@Deprecated | ||
public static void convertParquetToCSVEx(File parquetFile, File csvOutputFile) throws IOException { | ||
Preconditions.checkArgument(parquetFile.getName().endsWith(".parquet"), | ||
"parquet file should have .parquet extension"); | ||
Preconditions.checkArgument(csvOutputFile.getName().endsWith(".csv"), | ||
"csv file should have .csv extension"); | ||
Preconditions.checkArgument(!csvOutputFile.exists(), | ||
"Output file " + csvOutputFile.getAbsolutePath() + " already exists"); | ||
|
||
LOG.info("Converting " + parquetFile.getName() + " to " + csvOutputFile.getName()); | ||
|
||
Path parquetFilePath = new Path(parquetFile.toURI()); | ||
|
||
Configuration configuration = new Configuration(true); | ||
|
||
// TODO Following can be changed by using ParquetReader instead of ParquetFileReader | ||
ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, parquetFilePath); | ||
MessageType schema = readFooter.getFileMetaData().getSchema(); | ||
ParquetFileReader parquetFileReader = new ParquetFileReader( | ||
configuration, parquetFilePath, readFooter.getBlocks(), schema.getColumns()); | ||
BufferedWriter w = new BufferedWriter(new FileWriter(csvOutputFile)); | ||
PageReadStore pages = null; | ||
try { | ||
while (null != (pages = parquetFileReader.readNextRowGroup())) { | ||
final long rows = pages.getRowCount(); | ||
LOG.info("Number of rows: " + rows); | ||
|
||
final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema); | ||
final RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema)); | ||
for (int i = 0; i < rows; i++) { | ||
final Group g = recordReader.read(); | ||
writeGroup(w, g, schema); | ||
} | ||
} | ||
} finally { | ||
Utils.closeQuietly(parquetFileReader); | ||
Utils.closeQuietly(w); | ||
} | ||
} | ||
|
||
} |
41 changes: 41 additions & 0 deletions
41
parquet-compat/src/test/java/parquet/compat/test/CsvParquetWriter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
/** | ||
* Copyright 2012 Twitter, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package parquet.compat.test; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
|
||
import org.apache.hadoop.fs.Path; | ||
|
||
import parquet.hadoop.ParquetWriter; | ||
import parquet.hadoop.api.WriteSupport; | ||
import parquet.hadoop.metadata.CompressionCodecName; | ||
import parquet.schema.MessageType; | ||
|
||
public class CsvParquetWriter extends ParquetWriter<List<String>> { | ||
|
||
public CsvParquetWriter(Path file, MessageType schema) throws IOException { | ||
this(file, schema, false); | ||
} | ||
|
||
public CsvParquetWriter(Path file, MessageType schema, boolean enableDictionary) throws IOException { | ||
this(file, schema, CompressionCodecName.UNCOMPRESSED, enableDictionary); | ||
} | ||
|
||
public CsvParquetWriter(Path file, MessageType schema, CompressionCodecName codecName, boolean enableDictionary) throws IOException { | ||
super(file, (WriteSupport<List<String>>) new CsvWriteSupport(schema), codecName, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary, false); | ||
} | ||
} |
Oops, something went wrong.
Dependency on Apache Hadoop without any good reason.