Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
aniket486 committed Jul 20, 2013
0 parents commit b87bbc9
Show file tree
Hide file tree
Showing 22 changed files with 500,969 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
parquet-mr-tests/.classpath
parquet-mr-tests/.project
parquet-mr-tests/.settings/
parquet-mr-tests/target/
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
parquet-compatibility
=====================

compatibility tests to make sur C and Java implementations can read each other

Submodules:
-----------
* parquet-compat
* parquet-testdata
* parquet-compat-$version

parquet-compat
--------------
This stores the test sources. This is not a maven submodule.

parquet-testdata
----------------
stores all the csvs and impala files (todo discuss impala compatibility test)

parquet-compat-$version:
------------------------
pom.xml has dependency on the corresponding version of parquet-mr.
src is a symlink to sources in parquet-compat (../parquet-compat/src).
39 changes: 39 additions & 0 deletions parquet-compat-1.0.0-t1/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>parquet</groupId>
<artifactId>parquet-compat-1.0.0-t1</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>Parquet Compatibility 1.0.0-t1</name>
<url>https://github.com/Parquet/parquet-compatibility</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-column</artifactId>
<version>1.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
1 change: 1 addition & 0 deletions parquet-compat-1.0.0-t1/src
4 changes: 4 additions & 0 deletions parquet-compat-1.0.0/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
*~
.*
!.gitignore
target
39 changes: 39 additions & 0 deletions parquet-compat-1.0.0/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>parquet</groupId>
<artifactId>parquet-compat-1.0.0</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>Parquet Compatibility 1.0.0</name>
<url>https://github.com/Parquet/parquet-compatibility</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-column</artifactId>
<version>1.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
1 change: 1 addition & 0 deletions parquet-compat-1.0.0/src
200 changes: 200 additions & 0 deletions parquet-compat/src/test/java/parquet/compat/test/ConvertUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/**
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package parquet.compat.test;

import static java.lang.String.format;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import parquet.Log;
import parquet.Preconditions;
import parquet.column.page.PageReadStore;
import parquet.example.data.Group;
import parquet.example.data.simple.convert.GroupRecordConverter;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.ParquetReader;
import parquet.hadoop.example.GroupReadSupport;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.ColumnIOFactory;
import parquet.io.MessageColumnIO;
import parquet.io.RecordReader;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;
import parquet.schema.Type;
import parquet.schema.PrimitiveType.PrimitiveTypeName;

/**
 * Utility methods for converting between pipe-delimited CSV files and Parquet
 * files, used by the compatibility tests. Each CSV file is expected to have a
 * sibling {@code .schema} file containing the Parquet message type.
 */
public class ConvertUtils {

  private static final Log LOG = Log.getLog(ConvertUtils.class);

  /** Field delimiter used by the test CSV files. */
  public static final String CSV_DELIMITER = "|";

  // Compiled once: String.split(String) recompiles its regex on every call,
  // which is wasteful inside the per-line conversion loop.
  private static final Pattern CSV_DELIMITER_PATTERN =
      Pattern.compile(Pattern.quote(CSV_DELIMITER));

  /**
   * Reads the whole file at {@code path} into a string, normalizing line
   * endings to the platform line separator.
   *
   * NOTE(review): FileReader uses the platform default charset — confirm the
   * schema files are ASCII or match the platform encoding.
   */
  private static String readFile(String path) throws IOException {
    BufferedReader reader = new BufferedReader(new FileReader(path));
    StringBuilder contents = new StringBuilder();
    try {
      String lineSeparator = System.getProperty("line.separator");
      String line;
      while ((line = reader.readLine()) != null) {
        contents.append(line).append(lineSeparator);
      }
    } finally {
      Utils.closeQuietly(reader);
    }
    return contents.toString();
  }

  /**
   * Returns the raw Parquet schema text for {@code csvFile}, read from the
   * sibling file with the same base name and a {@code .schema} extension
   * (e.g. {@code foo.csv} -> {@code foo.schema}).
   *
   * @param csvFile the CSV file whose schema to look up
   * @return the contents of the matching .schema file
   * @throws IOException if the schema file cannot be read
   */
  public static String getSchema(File csvFile) throws IOException {
    String fileName = csvFile.getName().substring(
        0, csvFile.getName().length() - ".csv".length()) + ".schema";
    File schemaFile = new File(csvFile.getParentFile(), fileName);
    return readFile(schemaFile.getAbsolutePath());
  }

  /** Converts {@code csvFile} to Parquet with dictionary encoding disabled. */
  public static void convertCsvToParquet(File csvFile, File outputParquetFile) throws IOException {
    convertCsvToParquet(csvFile, outputParquetFile, false);
  }

  /**
   * Converts a pipe-delimited CSV file to a Parquet file, using the schema
   * from the sibling {@code .schema} file.
   *
   * @param csvFile input CSV file (must have a sibling .schema file)
   * @param outputParquetFile output file; must not already exist
   * @param enableDictionary whether to enable dictionary encoding in the writer
   * @throws IOException if the output file already exists or on any I/O failure
   */
  public static void convertCsvToParquet(File csvFile, File outputParquetFile,
      boolean enableDictionary) throws IOException {
    LOG.info("Converting " + csvFile.getName() + " to " + outputParquetFile.getName());
    String rawSchema = getSchema(csvFile);
    if (outputParquetFile.exists()) {
      throw new IOException("Output file " + outputParquetFile.getAbsolutePath() +
          " already exists");
    }

    Path path = new Path(outputParquetFile.toURI());

    MessageType schema = MessageTypeParser.parseMessageType(rawSchema);
    CsvParquetWriter writer = new CsvParquetWriter(path, schema, enableDictionary);

    // NOTE(review): FileReader uses the platform default charset — the pom
    // declares UTF-8 sources, but the testdata encoding should be confirmed.
    BufferedReader br = new BufferedReader(new FileReader(csvFile));
    int lineNumber = 0;
    try {
      String line;
      while ((line = br.readLine()) != null) {
        // Equivalent to line.split(Pattern.quote(CSV_DELIMITER)) but without
        // recompiling the pattern per line. NOTE(review): split without a
        // limit drops trailing empty fields, so a line ending in "|" yields
        // one fewer field — confirm the testdata treats "|" as a separator,
        // not a terminator, before changing this to split(line, -1).
        String[] fields = CSV_DELIMITER_PATTERN.split(line);
        writer.write(Arrays.asList(fields));
        ++lineNumber;
      }
    } finally {
      LOG.info("Number of lines: " + lineNumber);
      Utils.closeQuietly(br);
      Utils.closeQuietly(writer);
    }
  }

  /**
   * Converts a Parquet file back to a pipe-delimited CSV file, using the
   * schema stored in the Parquet footer. Reads record-by-record through
   * {@link ParquetReader} with the example {@link GroupReadSupport}.
   *
   * @param parquetFile input file; name must end in {@code .parquet}
   * @param csvOutputFile output file; name must end in {@code .csv} and must not exist
   * @throws IOException on any read or write failure
   */
  public static void convertParquetToCSV(File parquetFile, File csvOutputFile) throws IOException {
    Preconditions.checkArgument(parquetFile.getName().endsWith(".parquet"),
        "parquet file should have .parquet extension");
    Preconditions.checkArgument(csvOutputFile.getName().endsWith(".csv"),
        "csv file should have .csv extension");
    Preconditions.checkArgument(!csvOutputFile.exists(),
        "Output file " + csvOutputFile.getAbsolutePath() + " already exists");

    LOG.info("Converting " + parquetFile.getName() + " to " + csvOutputFile.getName());

    Path parquetFilePath = new Path(parquetFile.toURI());

    Configuration configuration = new Configuration(true);

    GroupReadSupport readSupport = new GroupReadSupport();
    // The schema is taken from the file footer rather than supplied by the
    // caller, so the CSV always reflects what was actually written.
    ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, parquetFilePath);
    MessageType schema = readFooter.getFileMetaData().getSchema();

    readSupport.init(configuration, null, schema);
    BufferedWriter w = new BufferedWriter(new FileWriter(csvOutputFile));
    ParquetReader<Group> reader = new ParquetReader<Group>(parquetFilePath, readSupport);
    try {
      Group g = null;
      while ((g = reader.read()) != null) {
        writeGroup(w, g, schema);
      }
    } finally {
      Utils.closeQuietly(reader);
      Utils.closeQuietly(w);
    }
  }

  /**
   * Writes one record as a single pipe-delimited CSV line. Only the first
   * value of each field (repetition index 0) is written.
   */
  private static void writeGroup(BufferedWriter w, Group g, MessageType schema)
      throws IOException {
    for (int j = 0; j < schema.getFieldCount(); j++) {
      if (j > 0) {
        w.write(CSV_DELIMITER);
      }
      String valueToString = g.getValueToString(j, 0);
      w.write(valueToString);
    }
    w.write('\n');
  }

  /**
   * Converts a Parquet file to CSV by iterating row groups and pages directly
   * through {@link ParquetFileReader}.
   *
   * @deprecated use {@link #convertParquetToCSV(File, File)}, which goes
   *     through the higher-level {@link ParquetReader} API instead.
   */
  @Deprecated
  public static void convertParquetToCSVEx(File parquetFile, File csvOutputFile) throws IOException {
    Preconditions.checkArgument(parquetFile.getName().endsWith(".parquet"),
        "parquet file should have .parquet extension");
    Preconditions.checkArgument(csvOutputFile.getName().endsWith(".csv"),
        "csv file should have .csv extension");
    Preconditions.checkArgument(!csvOutputFile.exists(),
        "Output file " + csvOutputFile.getAbsolutePath() + " already exists");

    LOG.info("Converting " + parquetFile.getName() + " to " + csvOutputFile.getName());

    Path parquetFilePath = new Path(parquetFile.toURI());

    Configuration configuration = new Configuration(true);

    // TODO Following can be changed by using ParquetReader instead of ParquetFileReader
    ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, parquetFilePath);
    MessageType schema = readFooter.getFileMetaData().getSchema();
    ParquetFileReader parquetFileReader = new ParquetFileReader(
        configuration, parquetFilePath, readFooter.getBlocks(), schema.getColumns());
    BufferedWriter w = new BufferedWriter(new FileWriter(csvOutputFile));
    PageReadStore pages = null;
    try {
      while (null != (pages = parquetFileReader.readNextRowGroup())) {
        final long rows = pages.getRowCount();
        LOG.info("Number of rows: " + rows);

        final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
        final RecordReader<Group> recordReader =
            columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
        for (int i = 0; i < rows; i++) {
          final Group g = recordReader.read();
          writeGroup(w, g, schema);
        }
      }
    } finally {
      Utils.closeQuietly(parquetFileReader);
      Utils.closeQuietly(w);
    }
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package parquet.compat.test;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.Path;

import parquet.hadoop.ParquetWriter;
import parquet.hadoop.api.WriteSupport;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.schema.MessageType;

/**
 * A {@link ParquetWriter} that accepts each CSV record as a {@code List} of
 * field strings and delegates the actual encoding to {@code CsvWriteSupport}.
 * Used by the compatibility tests to write pipe-delimited CSV data as Parquet.
 */
public class CsvParquetWriter extends ParquetWriter<List<String>> {

/**
 * Creates a writer with no compression and dictionary encoding disabled.
 *
 * @param file destination Parquet file
 * @param schema Parquet message type describing the CSV columns
 * @throws IOException if the underlying writer cannot be created
 */
public CsvParquetWriter(Path file, MessageType schema) throws IOException {
this(file, schema, false);
}

/**
 * Creates a writer with no compression.
 *
 * @param enableDictionary whether to enable dictionary encoding
 * @throws IOException if the underlying writer cannot be created
 */
public CsvParquetWriter(Path file, MessageType schema, boolean enableDictionary) throws IOException {
this(file, schema, CompressionCodecName.UNCOMPRESSED, enableDictionary);
}

/**
 * Creates a writer with an explicit compression codec, using the default
 * block and page sizes; validation is disabled (final {@code false} arg).
 *
 * @param codecName compression codec for the written file
 * @param enableDictionary whether to enable dictionary encoding
 * @throws IOException if the underlying writer cannot be created
 */
public CsvParquetWriter(Path file, MessageType schema, CompressionCodecName codecName, boolean enableDictionary) throws IOException {
// NOTE(review): the cast suggests CsvWriteSupport may not declare
// WriteSupport<List<String>> directly — confirm before removing it.
super(file, (WriteSupport<List<String>>) new CsvWriteSupport(schema), codecName, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary, false);
}
}
Loading

0 comments on commit b87bbc9

Please sign in to comment.