Make projected parquet collection schema forward compatible with the given file schema #1921

Open
wants to merge 34 commits into base: develop

Commits (34)
e373ea2
forward list compat
mickjermsurawong-stripe Sep 27, 2019
721a490
bump travis to openjdk8
mickjermsurawong-stripe Sep 28, 2019
0adbfae
fail when projecting on list of struct
mickjermsurawong-stripe Sep 29, 2019
84c119a
recurse only on elements
mickjermsurawong-stripe Sep 29, 2019
6bf7652
check for non-optional extra fields for projection
mickjermsurawong-stripe Sep 30, 2019
018f267
move to scala
mickjermsurawong-stripe Sep 30, 2019
ae89c06
migrate test to different class
mickjermsurawong-stripe Sep 30, 2019
ca67991
handle map legacy format
mickjermsurawong-stripe Sep 30, 2019
b7363b8
format import
mickjermsurawong-stripe Sep 30, 2019
2996317
improve docs
mickjermsurawong-stripe Sep 30, 2019
e02d0a8
fix style warning
mickjermsurawong-stripe Sep 30, 2019
4d40c06
address PR feedback
mickjermsurawong-stripe Oct 1, 2019
6e551e5
undo import wildcard
mickjermsurawong-stripe Oct 1, 2019
436def8
fix warning descenents of sealed traits
mickjermsurawong-stripe Oct 1, 2019
7ba2305
remove duplicate test
mickjermsurawong-stripe Oct 1, 2019
d2f2fb5
improve test names and remove one duplicate
mickjermsurawong-stripe Oct 1, 2019
9c96d23
add docs
mickjermsurawong-stripe Oct 1, 2019
09bd4dd
Check on schema type mismatch
mickjermsurawong-stripe Oct 3, 2019
83b6898
explicit rename from source/target to projected read schema and file …
mickjermsurawong-stripe Oct 3, 2019
d0ed5d6
support creating _tuple format and generalize compat in all directions
mickjermsurawong-stripe Oct 3, 2019
5de8b64
support legacy spark list of nullable elements
mickjermsurawong-stripe Oct 3, 2019
db937c3
improve docs
mickjermsurawong-stripe Oct 3, 2019
0c38af8
add tests for all supported list compat conversions
mickjermsurawong-stripe Oct 3, 2019
3700a85
improve docs
mickjermsurawong-stripe Oct 4, 2019
3b7255b
remove classtag inference
mickjermsurawong-stripe Oct 4, 2019
3a9be2e
rename schema name and use consistent method
mickjermsurawong-stripe Oct 4, 2019
5b5cf2b
file rename to drop "forward" compat
mickjermsurawong-stripe Oct 4, 2019
e29c7e9
test rename and make variables consistent
mickjermsurawong-stripe Oct 4, 2019
cd7e69c
make names file/read oriented
mickjermsurawong-stripe Oct 4, 2019
b05f13c
improve test make sure formatted type is still compatible with given …
mickjermsurawong-stripe Oct 4, 2019
7bb0382
check for field optional/required
mickjermsurawong-stripe Oct 4, 2019
6365db8
Review suggestions for https://github.com/twitter/scalding/pull/1921 …
joshrosen-stripe Oct 5, 2019
b6c8a92
improve code coverage and remove dead code after restructuring
mickjermsurawong-stripe Oct 5, 2019
579905c
auto-format from running sbt test
mickjermsurawong-stripe Oct 5, 2019
.travis.yml (2 changes: 1 addition & 1 deletion)
@@ -1,5 +1,5 @@
language: scala
-jdk: oraclejdk8
+jdk: openjdk8
sudo: false

before_install:
@@ -132,7 +132,9 @@ public static MessageType getSchemaForRead(MessageType fileMessageType, String p
*/
public static MessageType getSchemaForRead(MessageType fileMessageType, MessageType projectedMessageType) {
assertGroupsAreCompatible(fileMessageType, projectedMessageType);
-return projectedMessageType;
+return ParquetCollectionFormatCompatibility.projectFileSchema(
+  fileMessageType, projectedMessageType
+);
}

/**
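To make the change to getSchemaForRead concrete, here is a minimal, illustrative sketch (not part of this PR) of the before/after behavior. It assumes org.apache.parquet.schema.MessageTypeParser and hand-written example schemas; the record and field names (FileRecord, ReadRecord, names, names_tuple) are hypothetical, and the layouts shown are only representative of the standard three-level list format versus the `_tuple` format mentioned in the new class's documentation below.

import org.apache.parquet.schema.MessageTypeParser

// File schema: a list written in the standard three-level layout.
val fileSchema = MessageTypeParser.parseMessageType(
  """message FileRecord {
    |  required int32 id;
    |  optional group names (LIST) {
    |    repeated group list {
    |      optional binary element (UTF8);
    |    }
    |  }
    |}""".stripMargin)

// Projected read schema: only `names` is requested, and the Thrift-derived
// projection describes the list in the legacy `_tuple` layout.
val projectedReadSchema = MessageTypeParser.parseMessageType(
  """message ReadRecord {
    |  optional group names (LIST) {
    |    repeated binary names_tuple (UTF8);
    |  }
    |}""".stripMargin)

// Before this PR, getSchemaForRead(fileSchema, projectedReadSchema) returned
// projectedReadSchema verbatim, i.e. in a layout the file does not use.
// With this PR it returns
// ParquetCollectionFormatCompatibility.projectFileSchema(fileSchema, projectedReadSchema):
// the same single requested field, but with the repeated part expressed in the
// file's standard layout.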
@@ -0,0 +1,179 @@
package com.twitter.scalding.parquet.scrooge

import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{ GroupType, MessageType, Type }
import org.apache.parquet.thrift.DecodingSchemaMismatchException
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._

/**
* Project the file schema according to the projected read schema, which may use a different
* format for collection groups (lists and maps). This is currently used in [[ScroogeReadSupport]],
* where the projected read schema can come from:
* 1) a Thrift struct via [[org.apache.parquet.thrift.ThriftSchemaConvertVisitor]], which always
* describes lists with the `_tuple` format and maps with the `MAP_KEY_VALUE` annotation, or
* 2) a user-supplied schema string via the config key
* [[org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA]].
*
* By definition, the projected read schema is a "sub-graph" of the file schema in terms of field
* names. (We do allow an optional field in the projected read schema to appear in the projected
* file schema even if the file schema does not contain it.)
* The graphs of the two schemas may, however, differ for list and map types because of the
* multiple legacy formats and the canonical one. This class supports conversion in all directions.
*
* The projection strategy is:
* 1) traverse the two schemas and keep only the fields present in the read schema,
* 2) find collection types, indicated by a `repeated` type, and delegate them to the respective
* list/map formatter,
* 3) wrap the formatted repeated type back in the group type from the projected read schema, so
* that the optional/required repetition stays the same as in the projected read schema.
*/
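// An illustrative example of the strategy above (hypothetical schemas, not part of the PR):
// given a file schema
//
//   message FileRecord {
//     required int32 a;
//     required group b { required int32 x; required int32 y; }
//   }
//
// and a projected read schema
//
//   message ReadRecord {
//     required int32 a;
//     optional int32 c;
//   }
//
// the projected file schema keeps only the requested fields: `a` taken from the file (so any
// list or map inside a requested field keeps the file's layout), plus the extra optional
// field `c`, which is allowed even though the file does not contain it; `b` is dropped.
// A required or repeated extra field would instead fail with DecodingSchemaMismatchException.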
private[scrooge] object ParquetCollectionFormatCompatibility {

private val logger = LoggerFactory.getLogger(getClass)

/**
* Project the file schema to contain the same fields as the given projected read schema.
* The result is a projected file schema with the same optional/required repetition as the
* projected read schema, but with the collection formats of the file schema.
*
* @param fileSchema file schema to be projected
* @param projectedReadSchema read schema specifying field projection
*/
def projectFileSchema(fileSchema: MessageType, projectedReadSchema: MessageType): MessageType = {
val projectedFileSchema = projectFileType(fileSchema, projectedReadSchema, FieldContext()).asGroupType()
logger.debug(s"Projected read schema:\n${projectedReadSchema}\n" +
s"File schema:\n${fileSchema}\n" +
s"Projected file schema:\n${projectedFileSchema}")
new MessageType(projectedFileSchema.getName, projectedFileSchema.getFields)
}

/**
* Main recursion to get projected file type. Traverse given schemas, filter out unneeded
* fields, and format read schema's list/map node to file schema's structure.
* The formatting of a repeated type is not a one-to-one node swap, because we also have to
* handle projection and possibly nested collection types inside the repeated type.
*/
private def projectFileType(fileType: Type, projectedReadType: Type, fieldContext: FieldContext): Type = {
if (projectedReadType.isPrimitive || fileType.isPrimitive) {
// Base-cases to handle primitive types:
if (projectedReadType.isPrimitive && fileType.isPrimitive) {
// The field is a primitive in both schemas
projectedReadType
joshrosen-stripe commented on Oct 4, 2019:

Do we need to check that both types are the same primitive type? Or is that handled elsewhere? I imagine that certain types of primitive-type-changing projections might be supported (e.g. widening an int to a long) but others might not be (e.g. converting a string to an int).

Reply:

That type compatibility is handled in a separate place, so by the time this is called we should already have type compatibility:

public static MessageType getSchemaForRead(MessageType fileMessageType, MessageType projectedMessageType) {
assertGroupsAreCompatible(fileMessageType, projectedMessageType);
return projectedMessageType;
}

} else {
// The field is primitive in one schema but non-primitive in the other
throw new DecodingSchemaMismatchException(
s"Found schema mismatch between projected read type:\n$projectedReadType\n" +
s"and file type:\n${fileType}")
}
} else {
// Recursive cases to handle non-primitives (lists, maps, and structs):
(extractCollectionGroup(projectedReadType.asGroupType()), extractCollectionGroup(fileType.asGroupType())) match {
case (Some(projectedReadGroup: ListGroup), Some(fileGroup: ListGroup)) =>
projectFileGroup(fileGroup, projectedReadGroup, fieldContext.copy(nestedListLevel = fieldContext.nestedListLevel + 1), formatter = ParquetListFormatter)
case (Some(projectedReadGroup: MapGroup), Some(fileGroup: MapGroup)) =>
projectFileGroup(fileGroup, projectedReadGroup, fieldContext, formatter = ParquetMapFormatter)
case _ => // Struct projection
val projectedReadGroupType = projectedReadType.asGroupType
val fileGroupType = fileType.asGroupType
val projectedReadFields = projectedReadGroupType.getFields.asScala.map { projectedReadField =>
if (!fileGroupType.containsField(projectedReadField.getName)) {
// The projected read schema includes a field which is missing from the file schema.
if (projectedReadField.isRepetition(Repetition.OPTIONAL)) {
// The missing field is optional in the projected read schema. Since the file schema
// doesn't contain this field there are no collection compatibility concerns to worry
// about and we can simply use the supplied schema:
projectedReadField
} else {
// The missing field is repeated or required, which is an error:
throw new DecodingSchemaMismatchException(
s"Found non-optional projected read field ${projectedReadField.getName}:\n$projectedReadField\n\n" +
s"not present in the given file group type:\n${fileGroupType}")
}
} else {
// The field is present in both schemas, so first check that the schemas specify compatible repetition
// values for the field, then recursively process the fields:
val fileFieldIndex = fileGroupType.getFieldIndex(projectedReadField.getName)
val fileField = fileGroupType.getFields.get(fileFieldIndex)
if (fileField.isRepetition(Repetition.OPTIONAL) && projectedReadField.isRepetition(Repetition.REQUIRED)) {
// The field is optional in the file schema but required in the projected read schema; this is an error:
throw new DecodingSchemaMismatchException(
s"Found required projected read field ${projectedReadField.getName}:\n$projectedReadField\n\n" +
s"on optional file field:\n${fileField}")
} else {
// The field's repetitions are compatible in both schemas (e.g. optional in both schemas or required
// in both), so recursively process the field:
projectFileType(fileField, projectedReadField, FieldContext(projectedReadField.getName))
}
}
}
projectedReadGroupType.withNewFields(projectedReadFields.asJava)
}
}
}

private def projectFileGroup(fileGroup: CollectionGroup,
projectedReadGroup: CollectionGroup,
fieldContext: FieldContext,
formatter: ParquetCollectionFormatter): GroupType = {
val projectedFileRepeatedType = formatter.formatCompatibleRepeatedType(
fileGroup.repeatedType,
projectedReadGroup.repeatedType,
fieldContext,
projectFileType)
// Respect optional/required from the projected read group.
projectedReadGroup.groupType.withNewFields(projectedFileRepeatedType)
}

private def extractCollectionGroup(typ: GroupType): Option[CollectionGroup] = {
ParquetListFormatter.extractGroup(typ).orElse(ParquetMapFormatter.extractGroup(typ))
}
}

private[scrooge] trait ParquetCollectionFormatter {
/**
* Format the read schema's repeated type in the structure of the file schema's repeated type.
*
* @param fileRepeatedType repeated type from which the formatted result gets its structure
* @param readRepeatedType repeated type from which the formatted result gets its content
* @param fieldContext name and list-nesting context of the enclosing field
* @param recursiveSolver solver for the inner content of the repeated type
* @return formatted result
*/
def formatCompatibleRepeatedType(fileRepeatedType: Type,
readRepeatedType: Type,
fieldContext: FieldContext,
recursiveSolver: (Type, Type, FieldContext) => Type): Type

/**
* Extract a collection group containing the repeated type, in whichever supported format it appears.
*/
def extractGroup(typ: GroupType): Option[CollectionGroup]
}

/**
* Helper class to carry contextual information about the enclosing field. Currently only
* list formatting needs the extra context.
* @param name field name
* @param nestedListLevel number of list levels entered so far (incremented for each nested list)
*/
private[scrooge] case class FieldContext(name: String = "", nestedListLevel: Int = 0)

private[scrooge] sealed trait CollectionGroup {
/**
* Type for the collection.
* For example, given the schema,
* required group my_list (LIST) {
* repeated group list {
* optional binary element (UTF8);
* }
* }
* [[groupType]] refers to this whole schema
* [[repeatedType]] refers to inner `repeated` schema
*/
def groupType: GroupType

def repeatedType: Type
}

private[scrooge] sealed case class MapGroup(groupType: GroupType, repeatedType: Type) extends CollectionGroup

private[scrooge] sealed case class ListGroup(groupType: GroupType, repeatedType: Type) extends CollectionGroup
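The formatter contract above can be illustrated with a minimal, hypothetical implementation. This is a sketch, not one of the formatters added in this PR (the real ParquetListFormatter and ParquetMapFormatter also translate between the legacy and canonical layouts); the object name is invented, it assumes the same package, and it only shows how an implementation recognizes a collection group and hands the repeated content back to the recursive solver.

package com.twitter.scalding.parquet.scrooge

import org.apache.parquet.schema.OriginalType
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{ GroupType, Type }

// Hypothetical pass-through list formatter: it assumes both repeated types already
// share the same layout, so it performs no layout translation of its own.
private[scrooge] object PassThroughListFormatter extends ParquetCollectionFormatter {

  override def formatCompatibleRepeatedType(fileRepeatedType: Type,
                                            readRepeatedType: Type,
                                            fieldContext: FieldContext,
                                            recursiveSolver: (Type, Type, FieldContext) => Type): Type = {
    // Hand the two repeated types back to the main recursion, which projects their
    // contents as ordinary groups (handling field projection and nested collections).
    recursiveSolver(fileRepeatedType, readRepeatedType, fieldContext)
  }

  override def extractGroup(typ: GroupType): Option[CollectionGroup] = {
    // Treat a group annotated with LIST whose single child is `repeated` as a list group.
    if (typ.getOriginalType == OriginalType.LIST &&
      typ.getFieldCount == 1 &&
      typ.getType(0).isRepetition(Repetition.REPEATED)) {
      Some(ListGroup(typ, typ.getType(0)))
    } else {
      None
    }
  }
}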