Make projected parquet collection schema forward compatible with the given file schema #1921
base: develop
Hi @johnynek, I'd just like to ask for a quick look at whether this is the right strategy, and whether this is something we would like to push to the public one as well.
Thanks for the PR.
Have we run this internally to do some smoke tests that it can read old data that was previously written?
I added some general comments. I really don't know the latest with parquet. Maybe @isnotinvain or @julienledem can comment?
cc @ttim
Also, you need to bump the |
Thanks @johnynek. Ah, I haven't done any tests internally on actual data, purely unit tests for now. Will follow up with that. I can port all of this to Scala and will make the resolver stateless; yup, the object instantiation isn't necessary.
Hi @isnotinvain or @julienledem, I've addressed the first round of general feedback and added support for map schema compat as well. Would appreciate your review here.
I left some initial review comments, primarily focused on understanding (and potentially simplifying) two of the larger and more complex pattern matches at the heart of the converter.
I also left a couple of minor style nits, but those can be addressed later (in a final polish pass) once we're confident in the rest of the design.
I haven't finished looking at this yet, though: I still need to go through ParquetListFormatRule and also need to take a closer look at the test cases.
  repeatedListType: Option[Type] = None,
  repeatedMapType: Option[Type] = None)

private def unwrapGroup(typ: Type, wrappers: Seq[GroupType] = Seq()): GroupUnwrapped = {
Perhaps a naive question, but what does "wrapped" or "wrapper" refer to in this case? Could you add a comment explaining this? That might make the following code easier to review.
I've now simplified this to extract only the repeated type, and let the recursive call in the main method formatForwardCompatibleType handle multiple layers instead.
This now becomes a sealed trait for the group type itself, with a helper method to get its repeated type:
private[scrooge] sealed trait CollectionGroup {
def groupType: GroupType
def repeatedType: Type
}
  }
}

def isGroupList(projection: Type): Boolean = {
To confirm my understanding:
This is matching on things like
required group my_list (LIST) {
repeated group list {
optional binary element (UTF8);
}
}
where groupProjection here refers to the top-most type, and the groupProjection.getFields.get(0).isRepetition(Type.Repetition.REPEATED) check would apply to the second-level element (repeated group list)?
Yup, that's correct on groupProjection, and the repeated field is the second level.
The part after repeated could differ depending on the format, e.g.
required group my_list (LIST) {
repeated int32 [element|array];
}
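To make the shape check discussed above concrete, here is a minimal sketch on a simplified schema model. The `Node`, `Group`, and `Primitive` types are hypothetical stand-ins for parquet's types, and the assumption that the check also requires a LIST annotation and exactly one child is mine, not confirmed by the PR; only the "first field is repeated" part comes from the thread.

```scala
object GroupListSketch {
  sealed trait Repetition
  case object Required extends Repetition
  case object Optional extends Repetition
  case object Repeated extends Repetition

  // Hypothetical, simplified stand-ins for parquet's Type hierarchy.
  sealed trait Node {
    def name: String
    def repetition: Repetition
  }
  final case class Primitive(name: String, repetition: Repetition, typeName: String) extends Node
  final case class Group(name: String, repetition: Repetition, isList: Boolean, fields: List[Node]) extends Node

  // Assumed rule: a LIST-annotated group with exactly one child, and that
  // second-level child is repeated (group or primitive, depending on format).
  def isGroupList(projection: Node): Boolean = projection match {
    case Group(_, _, true, List(secondLevel)) => secondLevel.repetition == Repeated
    case _                                    => false
  }

  // required group my_list (LIST) { repeated group list { optional binary element; } }
  val threeLevel: Node = Group("my_list", Required, isList = true, fields = List(
    Group("list", Repeated, isList = false, fields = List(Primitive("element", Optional, "binary")))))

  // required group my_list (LIST) { repeated int32 element; }
  val twoLevel: Node = Group("my_list", Required, isList = true, fields = List(
    Primitive("element", Repeated, "int32")))
}
```

Both example schemas pass the check because only the repetition of the second level is inspected, not what sits below it.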
    repeatedListType=None,
    repeatedMapType=None
  )
} else if (typ.asGroupType.getFieldCount != 1) {
In this else if branch and the else branch I noticed we have typ.asGroupType calls everywhere. To avoid that and to make it clear that !typ.isPrimitive implies that typ is a group type, could we instead do something like
if (typ.isPrimitive) {
  // ...
} else {
  val groupTyp = typ.asGroupType
  if (groupTyp.getFieldCount != 1) {
    //...
  } else {
    //...
  }
}
We might be able to further improve clarity by changing the sign of the second condition, doing
if (typ.isPrimitive) {
  // ...
} else {
  val groupTyp = typ.asGroupType
  if (groupTyp.getFieldCount == 1) { // <--- inverted condition
    //...
  } else {
    //...
  }
}
which then simplifies or clarifies the
// Note the field count is strictly 1 here
comment present in the current code.
We could then add a comment in the else branch saying something like
// The group contains more than one field, so ... <whatever that implies>
This is maybe nitpicky, but I'm trying to walk through each case here to make sure that I understand what all of the branches correspond to, so I'm considering whether a different ordering or presentation of the cases can make it easier to understand and verify.
Simplified this into two methods that extract the list group and the map group instead.
 * this is because the subset fields of source node and its optional/required must
 * be maintained in the formatted result.
 */
private def formatForwardCompatibleType(sourceType: Type, targetType: Type): Type = {
private[scrooge] def isElementRequired(repeatedType: Type): Boolean

private[scrooge] def check(typ: Type): Boolean
From the name, it's a bit non-obvious what this method does. Is this actually something like appliesToType(typ: Type), which returns true if the type is a list of this format and false otherwise?
+1 on naming this something more descriptive
Updated as suggested
  def name: String
}

private[scrooge] object Source extends SourceOrTarget {
what's the advantage of distinguishing between Source and Target? is it so we can define supported directionalities in terms of parquet spec conversion? I'm wondering if we're being too specific here and wind up seeing a direction that we didn't engineer for, and we fail
I guess since scalding is going to be stuck on the older parquet spec, this would not happen.
ack. Will see how I can do end-to-end testing in this repo.
Ah, the only thing I don't support here is the target type of _tuple format, which would then be compatible with the thrift-generated projection source.
override def check(repeatedType: Type): Boolean = repeatedType.getName.endsWith("_tuple")

override def elementName(repeatedType: Type): String = {
  repeatedType.getName.substring(0, repeatedType.getName.length - 6)
nit: would like docs or variable/method renaming to explain this substring indexing a little more clearly
I've now made _tuple a variable, so the length is calculated from it more explicitly.
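As a rough illustration of that change, the suffix can live in one named value so that no magic `6` appears in the indexing. This sketch works on plain field names rather than parquet `Type` objects, and the names `TupleSuffix` and `appliesTo` are hypothetical, not the PR's actual identifiers.

```scala
object TupleSuffixSketch {
  // Suffix of the thrift-generated repeated field name discussed above.
  val TupleSuffix: String = "_tuple"

  // True when the repeated field's name carries the suffix.
  def appliesTo(repeatedName: String): Boolean =
    repeatedName.endsWith(TupleSuffix)

  // The element name is the repeated field's name with the suffix removed.
  def elementName(repeatedName: String): String = {
    require(appliesTo(repeatedName), s"$repeatedName does not end with $TupleSuffix")
    repeatedName.substring(0, repeatedName.length - TupleSuffix.length)
  }
}
```

For example, `TupleSuffixSketch.elementName("country_codes_tuple")` yields `"country_codes"`.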
import org.apache.parquet.thrift.struct.ThriftType.{ListType, MapType, StructType}
import org.scalatest.{Matchers, WordSpec}

class ParquetCollectionFormatForwardCompatibilityTests extends WordSpec with Matchers {
ooc how hard would it be to add some actual thrift fixtures based tests in here?
ack. Will see how I can do end-to-end testing in this repo.
@joshrosen-stripe, @tchow-stripe |
This review is incomplete, I'll finish later this evening. I've got some rename suggestions and some possible bugs so far.
  sourceGroup.groupType.withNewFields(formattedRepeated)
}

private def findCollectionGroup(typ: Type): Option[CollectionGroup] = {
Naming nit:
- private def findCollectionGroup(typ: Type): Option[CollectionGroup] = {
+ private def extractCollectionGroup(typ: Type): Option[CollectionGroup] = {
"find" implies that the return values will be a subset of the input, while "extract" (which the underlying functions use) suggests that the whole input will be considered.
  formatForwardCompatibleCollectionGroup[ListGroup](sourceGroup, targetGroup)
case (Some(sourceGroup: MapGroup), Some(targetGroup: MapGroup)) =>
  formatForwardCompatibleCollectionGroup[MapGroup](sourceGroup, targetGroup)
case _ if sourceType.isPrimitive || targetType.isPrimitive => // Base case
I would explicitly check and throw if one is a primitive but not the other. As written this will silently pass through certain schema mismatches.
Ack
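One way the suggested explicit check could look, sketched on a hypothetical two-case schema model (`PrimitiveNode` and `GroupNode` are illustrative stand-ins, not parquet classes, and the exception type is an assumption):

```scala
object PrimitiveMismatchSketch {
  // Hypothetical, simplified stand-ins for parquet's primitive and group types.
  sealed trait SchemaNode { def name: String }
  final case class PrimitiveNode(name: String, typeName: String) extends SchemaNode
  final case class GroupNode(name: String, fields: List[SchemaNode]) extends SchemaNode

  // Fails loudly when exactly one side is primitive, instead of letting the
  // mismatch fall through an `||` base case.
  def checkSameKind(projected: SchemaNode, file: SchemaNode): Unit =
    (projected, file) match {
      case (_: PrimitiveNode, _: PrimitiveNode) => ()
      case (_: GroupNode, _: GroupNode)         => ()
      case _ =>
        throw new IllegalArgumentException(
          s"Schema mismatch: ${projected.name} and ${file.name} differ in primitive/group kind")
    }
}
```

With a check like this, the base case can then require that both sides are primitive (`&&`) rather than either one.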
 * this is because the subset fields of source node and its optional/required must
 * be maintained in the formatted result.
 */
private def formatForwardCompatibleType(sourceType: Type, targetType: Type): Type = {
- private def formatForwardCompatibleType(sourceType: Type, targetType: Type): Type = {
+ private def formatForwardCompatibleType(fileType: Type, projectionType: Type): Type = {
"source" and "target" are very generic names and this function is really only used for one thing. It might make sense to rename them accordingly to make reasoning about the correctness here easier.
From conversation thread below:
By that I mean we want to convert the projection source schema to be in the format of the target file schema.
val targetGroup = targetType.asGroupType
val resultFields = sourceGroup.getFields.asScala.map { sourceField =>
  if (!targetGroup.containsField(sourceField.getName)) {
    if (!sourceField.isRepetition(Repetition.OPTIONAL)) {
This check is incorrect, I think. "source" == the file's schema, yes (see above renaming suggestion)? If so, then the optionality of a field which is not included in the projection is not important. We should be iterating over the fields in the projected (target) schema and checking the optionality of the projected fields if they're missing from the file.
Ah no, source here is actually the projection, and target is the file's schema.
By that I mean we want to convert the projection source schema to be in the format of the target file schema.
Given this confusion, I think it makes sense to rename this.
on this point, I think naming convention along the lines of: fileSchema, readSchema, projectedFileSchema, projectedReadSchema all help clarify which is which
One potential simplification and some rename/docs nits. Looking good!
  fileGroup.repeatedType,
  projectFileType(_, _))
// Respect optional/required from the projected read group.
projectedReadGroup.groupType.withNewFields(projectedFileRepeatedType)
Could this end up letting through cases where the file's field is optional and the projection's field is required? If so we should explicitly detect and throw for that (probably in projectFileType).
Added the assertion at the "struct projection" level, which has access to the field-level optional/required.
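The kind of field-level assertion described here might look roughly like the following. This is a sketch only: `Field`, `Rep`, and `assertCompatible` are hypothetical names on a simplified model, and the PR's real assertion may differ in both placement and exception type.

```scala
object RepetitionCheckSketch {
  sealed trait Rep
  case object Required extends Rep
  case object Optional extends Rep

  // Hypothetical stand-in for a struct field with its repetition.
  final case class Field(name: String, rep: Rep)

  // Rejects the case raised in review: the projection demands a value that
  // the file only provides optionally.
  def assertCompatible(projectedRead: Field, file: Field): Unit =
    if (projectedRead.rep == Required && file.rep == Optional) {
      throw new IllegalArgumentException(
        s"Field ${projectedRead.name} is required in the projection but optional in the file")
    }
}
```

The opposite direction (optional in the projection, required in the file) is allowed through in this sketch, since every file value still satisfies an optional read.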
override val rules: Seq[ParquetListFormatRule] = Seq(
  PrimitiveElementRule, PrimitiveArrayRule,
  GroupElementRule, GroupArrayRule,
  TupleRule, StandardRule
Soooo many cases... That said, I like this approach. Makes it painfully clear the variants that we're supporting.
private def projectFileGroup[T <: CollectionGroup](projectedReadGroup: T,
    fileGroup: T,
    fieldContext: FieldContext)(implicit t: ClassTag[T]): GroupType = {

  val (formatter, updatedFieldContext) = t.runtimeClass.asInstanceOf[Class[T]] match {
    case c if c == classOf[MapGroup] =>
      (ParquetMapFormatter, fieldContext.copy(nestedListLevel = fieldContext.nestedListLevel + 1))
    case c if c == classOf[ListGroup] =>
      (ParquetListFormatter, fieldContext.copy(nestedListLevel = fieldContext.nestedListLevel + 1))
  }
Let's refactor this to avoid reflection and implicit magic. E.g.:
- private def projectFileGroup[T <: CollectionGroup](projectedReadGroup: T,
-     fileGroup: T,
-     fieldContext: FieldContext)(implicit t: ClassTag[T]): GroupType = {
-   val (formatter, updatedFieldContext) = t.runtimeClass.asInstanceOf[Class[T]] match {
-     case c if c == classOf[MapGroup] =>
-       (ParquetMapFormatter, fieldContext.copy(nestedListLevel = fieldContext.nestedListLevel + 1))
-     case c if c == classOf[ListGroup] =>
-       (ParquetListFormatter, fieldContext.copy(nestedListLevel = fieldContext.nestedListLevel + 1))
-   }
+ private def projectFileGroup(projectedReadGroup: T, fileGroup: T, fieldContext: FieldContext, formatter: ParquetCollectionFormatter): GroupType = {
Updated to
def projectFileGroup(fileGroup: CollectionGroup,
projectedReadGroup: CollectionGroup,
fieldContext: FieldContext,
formatter: ParquetCollectionFormatter)
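A self-contained sketch of the reflection-free shape: the caller already pattern matches on the group kind, so it can pass the matching formatter as a plain argument. All types here are simplified, hypothetical stand-ins (no `FieldContext`, and `format` just adopts the file's repeated name), so this only illustrates the dispatch, not the PR's actual formatting logic.

```scala
object ExplicitFormatterSketch {
  // Hypothetical stand-ins for the PR's CollectionGroup hierarchy.
  sealed trait CollectionGroup { def repeatedName: String }
  final case class ListGroup(repeatedName: String) extends CollectionGroup
  final case class MapGroup(repeatedName: String) extends CollectionGroup

  // Hypothetical stand-in for ParquetCollectionFormatter.
  trait CollectionFormatter {
    def format(fileGroup: CollectionGroup, readGroup: CollectionGroup): CollectionGroup
  }
  object ListFormatter extends CollectionFormatter {
    def format(fileGroup: CollectionGroup, readGroup: CollectionGroup): CollectionGroup =
      ListGroup(fileGroup.repeatedName)
  }
  object MapFormatter extends CollectionFormatter {
    def format(fileGroup: CollectionGroup, readGroup: CollectionGroup): CollectionGroup =
      MapGroup(fileGroup.repeatedName)
  }

  // No ClassTag and no runtimeClass comparison: the formatter is an argument.
  def projectFileGroup(fileGroup: CollectionGroup,
                       projectedReadGroup: CollectionGroup,
                       formatter: CollectionFormatter): CollectionGroup =
    formatter.format(fileGroup, projectedReadGroup)

  // The caller chooses the formatter in the same match that identifies the kind.
  def project(file: CollectionGroup, read: CollectionGroup): Option[CollectionGroup] =
    (file, read) match {
      case (f: ListGroup, r: ListGroup) => Some(projectFileGroup(f, r, ListFormatter))
      case (f: MapGroup, r: MapGroup)   => Some(projectFileGroup(f, r, MapFormatter))
      case _                            => None
    }
}
```

Because the compiler sees the formatter choice next to the pattern match, adding a new group kind only requires a new case rather than another runtime class comparison.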
Thanks @xton, addressed the feedback.
private def projectFileType(fileType: Type, projectedReadType: Type, fieldContext: FieldContext): Type = {
  (extractCollectionGroup(projectedReadType), extractCollectionGroup(fileType)) match {
    case _ if projectedReadType.isPrimitive && fileType.isPrimitive =>
      projectedReadType
Do we need to check that both types are the same primitive type? Or is that handled elsewhere? I imagine that certain types of primitive-type-changing projections might be supported (e.g. widening an int to a long) but others might not be (e.g. converting a string to an int).
That type compatibility is handled in a separate place here, so by the time this is called we should already have type compatibility.
Lines 133 to 136 in 73ce124
public static MessageType getSchemaForRead(MessageType fileMessageType, MessageType projectedMessageType) {
  assertGroupsAreCompatible(fileMessageType, projectedMessageType);
  return projectedMessageType;
}
def formatCompatibleRepeatedType(fileRepeatedType: Type,
    readRepeatedType: Type,
    fieldContext: FieldContext,
    recursiveSolver: (Type, Type, FieldContext) => Type): Type
In practice, I think this is always invoked with the projectFileType method defined above?
Yup, it is. My thought is to make this recursive call explicit from the caller side, so it's clear when reading within this format compatibility layer, instead of jumping into the respective formatter, which then cyclically refers back to the caller's method.
* Add more comments; add explicit else conditions to some if cases; lift base case out of pattern match
* Formatting changes
* More minor formatting.
Hi @isnotinvain, I'd like to follow up on this if you have more feedback here, please.
Hi @isnotinvain, another bump on this please. Updates from our side: we have actually implemented this internally (via subclassing of
Without this patch, a scalding job reading Spark job output can either fail hard or falsely read an empty collection.
_tuple
scalding/scalding-parquet-scrooge/src/main/java/com/twitter/scalding/parquet/scrooge/ScroogeReadSupport.java
Lines 98 to 101 in 59d9327
array
, projection fails because it has name_tuple
which is incompatible. For example, given file schemas to represent List<String> country_codes
will fail with the generated projection
This PR introduces ParquetListFormatForwardCompatibility to convert thrift-generated schemas to compliant ones. The difficulties here are:
The strategy here is to implement different rules for each format. Each rule tells us how to decompose the list schema and how to reconstruct it again.