Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make projected parquet collection schema forward compatible with the given file schema #1921

Open
wants to merge 34 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e373ea2
forward list compat
mickjermsurawong-stripe Sep 27, 2019
721a490
bump travis to openjdk8
mickjermsurawong-stripe Sep 28, 2019
0adbfae
fail when projecting on list of struct
mickjermsurawong-stripe Sep 29, 2019
84c119a
recurse only on elements
mickjermsurawong-stripe Sep 29, 2019
6bf7652
check for non-optional extra fields for projection
mickjermsurawong-stripe Sep 30, 2019
018f267
move to scala
mickjermsurawong-stripe Sep 30, 2019
ae89c06
migrate test to different class
mickjermsurawong-stripe Sep 30, 2019
ca67991
handle map legacy format
mickjermsurawong-stripe Sep 30, 2019
b7363b8
format import
mickjermsurawong-stripe Sep 30, 2019
2996317
improve docs
mickjermsurawong-stripe Sep 30, 2019
e02d0a8
fix style warning
mickjermsurawong-stripe Sep 30, 2019
4d40c06
address PR feedback
mickjermsurawong-stripe Oct 1, 2019
6e551e5
undo import wildcard
mickjermsurawong-stripe Oct 1, 2019
436def8
fix warning descenents of sealed traits
mickjermsurawong-stripe Oct 1, 2019
7ba2305
remove duplicate test
mickjermsurawong-stripe Oct 1, 2019
d2f2fb5
improve test names and remove one duplicate
mickjermsurawong-stripe Oct 1, 2019
9c96d23
add docs
mickjermsurawong-stripe Oct 1, 2019
09bd4dd
Check on schema type mismatch
mickjermsurawong-stripe Oct 3, 2019
83b6898
explicit rename from source/target to projected read schema and file …
mickjermsurawong-stripe Oct 3, 2019
d0ed5d6
support creating _tuple format and generalize compat in all directions
mickjermsurawong-stripe Oct 3, 2019
5de8b64
support legacy spark list of nullable elements
mickjermsurawong-stripe Oct 3, 2019
db937c3
improve docs
mickjermsurawong-stripe Oct 3, 2019
0c38af8
add tests for all supported list compat conversions
mickjermsurawong-stripe Oct 3, 2019
3700a85
improve docs
mickjermsurawong-stripe Oct 4, 2019
3b7255b
remove classtag inference
mickjermsurawong-stripe Oct 4, 2019
3a9be2e
rename schema name and use consistent method
mickjermsurawong-stripe Oct 4, 2019
5b5cf2b
file rename to drop "forward" compat
mickjermsurawong-stripe Oct 4, 2019
e29c7e9
test rename and make variables consistent
mickjermsurawong-stripe Oct 4, 2019
cd7e69c
make names file/read oriented
mickjermsurawong-stripe Oct 4, 2019
b05f13c
improve test make sure formatted type is still compatible with given …
mickjermsurawong-stripe Oct 4, 2019
7bb0382
check for field optional/required
mickjermsurawong-stripe Oct 4, 2019
6365db8
Review suggestions for https://github.com/twitter/scalding/pull/1921 …
joshrosen-stripe Oct 5, 2019
b6c8a92
improve code coverage and remove dead code after restructuring
mickjermsurawong-stripe Oct 5, 2019
579905c
auto-format from running sbt test
mickjermsurawong-stripe Oct 5, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,349 @@
package com.twitter.scalding.parquet.scrooge;

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Stack;

/**
* Compatibility class to convert parquet schema of legacy type to standard one
* namely 3-level list structure as recommended in
* https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
*
* More specifically this handles converting from parquet file created by
* {{@code org.apache.parquet.thrift.ThriftSchemaConvertVisitor}} which always suffix
* list element with "_tuple".
*/
public class ParquetListFormatForwardCompatibility {
mickjermsurawong-stripe marked this conversation as resolved.
Show resolved Hide resolved

private static List<Rule> RULES = Arrays.asList(
new RulePrimitiveElement(),
new RulePrimitiveArray(),

new RuleGroupElement(),
new RuleGroupArray(),

new RuleGroupTuple(),
new RuleStandardThreeLevel());

/**
* Rule describes how to match a repeated type, how to decompose them, and reconstruct a
* repeated type.
*/
abstract static public class Rule {
mickjermsurawong-stripe marked this conversation as resolved.
Show resolved Hide resolved
public Type elementType(Type repeatedType) {
if (repeatedType.isPrimitive()) {
return repeatedType;
} else {
return firstField(repeatedType.asGroupType());
}
}

public Boolean isElementRequired(Type repeatedType) {
return true;
}

public String elementName(Type repeatedType) {
return this.elementType(repeatedType).getName();
}

public OriginalType elementOriginalType(Type repeatedType) {
return this.elementType(repeatedType).getOriginalType();
}

abstract Boolean check(Type type);

abstract Type createCompliantRepeatedType(Type type, String name, Boolean isElementRequired, OriginalType originalType);

}

static class RulePrimitiveElement extends Rule {

public String constantElementName() {
return "element";
}

@Override
public Boolean check(Type repeatedType) {
return repeatedType.isPrimitive() && repeatedType.getName().equals(this.constantElementName());
}

@Override
public Type createCompliantRepeatedType(Type type, String name, Boolean isElementRequired, OriginalType originalType) {
if (!isElementRequired) {
throw new IllegalArgumentException("Rule 1 can only take required element");
mickjermsurawong-stripe marked this conversation as resolved.
Show resolved Hide resolved
}
if (!type.isPrimitive()) {
throw new IllegalArgumentException(
String.format("Rule 1 cannot take primitive type, but is given %s", type));
}
return new PrimitiveType(
Type.Repetition.REPEATED,
type.asPrimitiveType().getPrimitiveTypeName(),
this.constantElementName(),
originalType
);
}

}

static class RulePrimitiveArray extends RulePrimitiveElement {
@Override
public String constantElementName() {
return "array";
}
}

static class RuleGroupElement extends Rule {
public String constantElementName() {
return "element";
}

public Type elementType(Type repeatedType) {
return repeatedType;
}

@Override
public String elementName(Type repeatedType) {
return this.constantElementName();
}

@Override
public Boolean check(Type repeatedType) {
if (repeatedType.isPrimitive()) {
return false;
} else {
GroupType repeatedGroup = repeatedType.asGroupType();
return repeatedGroup.getFields().size() > 0 && repeatedGroup.getName().equals(this.constantElementName());
}
}

@Override
public Type createCompliantRepeatedType(Type type, String name, Boolean isElementRequired, OriginalType originalType) {
if (type.isPrimitive()) {
return new GroupType(
Type.Repetition.REPEATED,
this.constantElementName(),
type
);
} else {
return new GroupType(
Type.Repetition.REPEATED,
this.constantElementName(),
type.asGroupType().getFields()
);
}
}
}

static class RuleGroupArray extends RuleGroupElement {
@Override
public String constantElementName() {
return "array";
}
}

static class RuleGroupTuple extends Rule {

@Override
public Boolean check(Type repeatedType) {
return repeatedType.getName().endsWith("_tuple");
}

public Type elementType(Type repeatedType) {
return repeatedType;
}

@Override
public Type createCompliantRepeatedType(Type type, String name, Boolean isElementRequired, OriginalType originalType) {
if (!type.isPrimitive()) {
throw new IllegalArgumentException(String.format(
"Rule 3 can only take group type, but found %s", type));
}
if (!name.endsWith("_tuple")) {
name = name + "_tuple";
}
return new PrimitiveType(
Type.Repetition.REPEATED,
type.asPrimitiveType().getPrimitiveTypeName(),
name,
originalType
);
}
}

static class RuleStandardThreeLevel extends Rule {
@Override
public Boolean check(Type repeatedField) {
if (repeatedField.isPrimitive() || !repeatedField.getName().equals("list")) {
return false;
}
Type elementType = firstField(repeatedField.asGroupType());
return elementType.getName().equals("element");
}

@Override
public String elementName(Type repeatedType) {
return "element";
}

@Override
public Type createCompliantRepeatedType(Type type, String name, Boolean isElementRequired, OriginalType originalType) {
Type elementType = null;
if (type.isPrimitive()) {
elementType = new PrimitiveType(
isElementRequired ? Type.Repetition.REQUIRED : Type.Repetition.OPTIONAL,
type.asPrimitiveType().getPrimitiveTypeName(),
"element",
originalType
);
} else {
elementType = new GroupType(
isElementRequired ? Type.Repetition.REQUIRED : Type.Repetition.OPTIONAL,
"element",
// we cannot flatten `list`
type.asGroupType().getName().equals("list") ?
Arrays.asList(type) :
type.asGroupType().getFields()
);
}

return new GroupType(
Type.Repetition.REPEATED,
"list",
Arrays.asList(elementType)
);
}
}

private static org.apache.parquet.schema.Type firstField(GroupType groupType) {
return groupType.getFields().get(0);
}

private static boolean isGroupList(Type projection) {
if (projection.isPrimitive()) {
return false;
}
GroupType groupProjection = projection.asGroupType();
return groupProjection.getOriginalType() == OriginalType.LIST &&
groupProjection.getFieldCount() == 1 &&
groupProjection.getFields().get(0).isRepetition(Type.Repetition.REPEATED);
}

/**
* Resolve list format in forward compatible way.
* @param fileType file type which has new format
* @param projection projection type which has legacy format
* @return projection schema in the new format.
*/
public Type resolveTypeFormat(Type fileType, Type projection) {
mickjermsurawong-stripe marked this conversation as resolved.
Show resolved Hide resolved
mickjermsurawong-stripe marked this conversation as resolved.
Show resolved Hide resolved
if (projection.isPrimitive() || fileType.isPrimitive()) {
return projection;
}
ParquetListFormatForwardCompatibility compatibility = new ParquetListFormatForwardCompatibility();
mickjermsurawong-stripe marked this conversation as resolved.
Show resolved Hide resolved

GroupType groupFile = fileType.asGroupType();
GroupType groupProjection = projection.asGroupType();

GroupUnwrapped unwrappedFile = unwrapGroup(groupFile, new Stack<GroupType>());
GroupUnwrapped unwrappedProjection = unwrapGroup(groupProjection, new Stack<GroupType>());

Type repeatedFile = unwrappedFile.repeatedType;
Type repeatedProjection = unwrappedProjection.repeatedType;

if (repeatedProjection != null && repeatedFile != null) {
// Recurse on the repeated content. This is to handle nested list
Type repeatedResolved = resolveTypeFormat(repeatedFile, repeatedProjection);
// Make projected structure compatible with file type
Type repeatedFormatted = compatibility
.makeForwardCompatible(repeatedFile, repeatedResolved);

// Wrap back the groups, this contain field name and whether it's optional/required
Type resolvedGroupType = repeatedFormatted;
while (!unwrappedProjection.wrappers.isEmpty()) {
resolvedGroupType = unwrappedProjection.wrappers.pop().withNewFields(resolvedGroupType);
}
return resolvedGroupType;
} else {
List<Type> fields = new ArrayList<Type>();
for (Type projected : groupProjection.getFields()) {
if (!projected.isPrimitive()) {
// The file type field must be a group type too
int fieldIndex = groupFile.getFieldIndex(projected.getName());
Type fileField = groupFile.getFields().get(fieldIndex);
fields.add(resolveTypeFormat(fileField.asGroupType(), projected.asGroupType()));
} else {
fields.add(projected);
}
}
return groupProjection.withNewFields(fields);
}
}

private Rule findFirstRule(Type repeatedType, String debuggingTypeSource) {
mickjermsurawong-stripe marked this conversation as resolved.
Show resolved Hide resolved
Rule matchedRule = null;
for (Rule rule : RULES) {
if (rule.check(repeatedType)) {
matchedRule = rule;
break;
}
}
if (matchedRule == null) {
throw new RuntimeException(String.format(
"Unable to find matching rule for %s schema:\n%s", debuggingTypeSource, repeatedType));
}
return matchedRule;
}

private Type makeForwardCompatible(Type repeatedFileType, Type repeatedProjectedType) {
mickjermsurawong-stripe marked this conversation as resolved.
Show resolved Hide resolved
Rule fileTypeRule = findFirstRule(repeatedFileType, "file");
Rule projectedTypeRule = findFirstRule(repeatedProjectedType, "projected");

if (projectedTypeRule == fileTypeRule) {
return repeatedProjectedType;
}

String elementName = projectedTypeRule.elementName(repeatedProjectedType);
Type elementType = projectedTypeRule.elementType(repeatedProjectedType);
Boolean isElementRequired = projectedTypeRule.isElementRequired(repeatedProjectedType);
OriginalType elementOriginalType = projectedTypeRule.elementOriginalType(repeatedProjectedType);

return fileTypeRule.createCompliantRepeatedType(
elementType,
elementName,
isElementRequired,
elementOriginalType);
}

private static class GroupUnwrapped {
Stack<GroupType> wrappers;
Type repeatedType;

public GroupUnwrapped(Stack<GroupType> wrappers, Type repeatedType) {
this.wrappers = wrappers;
this.repeatedType = repeatedType;
}
}

private static GroupUnwrapped unwrapGroup(Type type, Stack<GroupType> wrappers) {
Type ptr = type;
// only wrapper for list with size one, so we can wrap repeated type later
while (!ptr.isPrimitive()) {
wrappers.push(ptr.asGroupType());
if (isGroupList(ptr)) {
// when it is repeated
return new GroupUnwrapped(wrappers, ptr.asGroupType().getFields().get(0));
} else if (ptr.asGroupType().getFields().size() == 1){
ptr = ptr.asGroupType().getFields().get(0);
} else {
break;
}
}
return new GroupUnwrapped(wrappers, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@
import org.apache.parquet.thrift.projection.ThriftProjectionException;
import org.apache.parquet.thrift.struct.ThriftType;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;

/**
* Read support for Scrooge
Expand Down Expand Up @@ -132,7 +130,8 @@ public static MessageType getSchemaForRead(MessageType fileMessageType, String p
*/
public static MessageType getSchemaForRead(MessageType fileMessageType, MessageType projectedMessageType) {
assertGroupsAreCompatible(fileMessageType, projectedMessageType);
return projectedMessageType;
Type resolved = new ParquetListFormatForwardCompatibility().resolveTypeFormat(fileMessageType, projectedMessageType);
return new MessageType(projectedMessageType.getName(), resolved.asGroupType().getFields());
}

/**
Expand Down
Loading