Commit 5c7d940

all sorts of stuff fails
ericm-db committed Dec 27, 2024
1 parent ae82712 commit 5c7d940
Showing 20 changed files with 339 additions and 165 deletions.
@@ -270,7 +270,7 @@ class StateMetadataPartitionReader(
operatorStateMetadata.version,
v2.operatorPropertiesJson,
-1, // numColsPrefixKey is not available in OperatorStateMetadataV2
-        Some(stateStoreMetadata.stateSchemaFilePaths(stateStoreMetadata.stateSchemaId))
+        Some(stateStoreMetadata.stateSchemaFilePaths.last)
)
}
}
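Note: the reader used to look up the current schema file by indexing a Map[Short, String] with the store's stateSchemaId; it now takes the last element of an ordered List[String]. A minimal sketch of the assumed ordering contract (file names are illustrative, not the real checkpoint layout):

// Schema files are assumed to be appended in evolution order, so the
// last element is always the newest schema version.
val stateSchemaFilePaths: List[String] = List("0_aaaa", "42_bbbb")
val newestSchemaFile: String = stateSchemaFilePaths.last // "42_bbbb"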
@@ -149,7 +149,8 @@ case class TransformWithStateInPandasExec(
initialStateGroupingAttrs.map(SortOrder(_, Ascending)))

override def operatorStateMetadata(
-      stateSchemaPaths: List[String]): OperatorStateMetadata = {
+      stateSchemaPaths: List[List[String]]
+  ): OperatorStateMetadata = {
getOperatorStateMetadata(stateSchemaPaths, getStateInfo, shortName, timeMode, outputMode)
}
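Note: the operatorStateMetadata signature changes from List[String] to List[List[String]] here and in the other stateful operators below, which suggests one inner list per state store, each carrying that store's schema files in evolution order. A hypothetical value (paths are made up):

// Outer list: one entry per state store of the operator.
// Inner list: that store's schema files, oldest first, newest last.
val stateSchemaPaths: List[List[String]] = List(
  List("0_storeA", "17_storeA"), // store A: schema evolved once
  List("0_storeB")               // store B: unchanged
)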

@@ -196,8 +196,8 @@ trait FlatMapGroupsWithStateExecBase
hadoopConf: Configuration,
batchId: Long,
stateSchemaVersion: Int): List[StateSchemaValidationResult] = {
-    val newStateSchema = List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,
-      groupingAttributes.toStructType, stateManager.stateSchema))
+    val newStateSchema = List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME, 0,
+      groupingAttributes.toStructType, 0, stateManager.stateSchema))
List(StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
newStateSchema, session.sessionState, stateSchemaVersion))
}
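Note: every StateStoreColFamilySchema call site in this commit gains two new numeric arguments, one after the column-family name and one after the key schema. Judging from those call sites, these are schema ids for the key and value schemas, starting at 0 for the initial version. A sketch of the assumed shape of the case class (field names are inferred from the diff, not confirmed by it):

import org.apache.spark.sql.types.StructType

// Inferred shape; KeyStateEncoderSpec is the existing encoder-spec type.
case class StateStoreColFamilySchema(
    colFamilyName: String,
    keySchemaId: Short,    // 0 = initial key schema version
    keySchema: StructType,
    valueSchemaId: Short,  // bumped when the value schema evolves
    valueSchema: StructType,
    keyStateEncoderSpec: Option[KeyStateEncoderSpec] = None,
    userKeyEncoderSchema: Option[StructType] = None)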
@@ -246,9 +246,9 @@ class IncrementalExecution(
} else {
None
}
-        val stateSchemaMapping = ssw.stateSchemaMapping(schemaValidationResult,
+        val stateSchemaList = ssw.stateSchemaList(schemaValidationResult,
oldMetadata)
-        val metadata = ssw.operatorStateMetadata(stateSchemaMapping)
+        val metadata = ssw.operatorStateMetadata(stateSchemaList)
oldMetadata match {
case Some(oldMetadata) => ssw.validateNewMetadata(oldMetadata, metadata)
case None =>
@@ -260,7 +260,7 @@
Some(currentBatchId))
metadataWriter.write(metadata)
if (ssw.supportsSchemaEvolution) {
-          val stateSchemaMetadata = createStateSchemaMetadata(stateSchemaMapping.head)
+          val stateSchemaMetadata = createStateSchemaMetadata(stateSchemaList.head)
stateSchemaMetadatas.put(ssw.getStateInfo.operatorId, stateSchemaMetadata)
// Create new instance with copied fields but updated stateInfo
ssw match {
@@ -279,22 +279,54 @@
}

private def createStateSchemaMetadata(
-      stateSchemaMapping: Map[Short, String]
+      stateSchemaFiles: List[String]
): StateSchemaBroadcast = {
val fm = CheckpointFileManager.create(new Path(checkpointLocation), hadoopConf)
-    val stateSchemas = stateSchemaMapping.flatMap { case (stateSchemaId, stateSchemaPath) =>
-      val inStream = fm.open(new Path(stateSchemaPath))
-      StateSchemaCompatibilityChecker.readSchemaFile(inStream).map { schema =>
-        StateSchemaMetadataKey(
-          schema.colFamilyName, stateSchemaId) ->
-          StateSchemaMetadataValue(
-            schema.valueSchema, SchemaConverters.toAvroType(schema.valueSchema))
-      }.toMap
-    }
-    StateSchemaBroadcast(
-      sparkSession.sparkContext.broadcast(
-        StateSchemaMetadata(stateSchemas.keys.map(_.schemaId).max, stateSchemas)
-      ))

+    // Build up our map of schema metadata
+    val activeSchemas = stateSchemaFiles.zipWithIndex.foldLeft(
+      Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue]) {
+      case (schemas, (stateSchemaFile, schemaIndex)) =>
+        val fsDataInputStream = fm.open(new Path(stateSchemaFile))
+        val colFamilySchemas = StateSchemaCompatibilityChecker.readSchemaFile(fsDataInputStream)
+
+        // For each column family, create metadata entries for both key and value schemas
+        val schemaEntries = colFamilySchemas.flatMap { colFamilySchema =>
+          // Create key schema metadata
+          val keyAvroSchema = SchemaConverters.toAvroType(colFamilySchema.keySchema)
+          val keyEntry = StateSchemaMetadataKey(
+            colFamilySchema.colFamilyName,
+            colFamilySchema.keySchemaId,
+            isKey = true
+          ) -> StateSchemaMetadataValue(
+            colFamilySchema.keySchema,
+            keyAvroSchema
+          )
+
+          // Create value schema metadata
+          val valueAvroSchema = SchemaConverters.toAvroType(colFamilySchema.valueSchema)
+          val valueEntry = StateSchemaMetadataKey(
+            colFamilySchema.colFamilyName,
+            colFamilySchema.valueSchemaId,
+            isKey = false
+          ) -> StateSchemaMetadataValue(
+            colFamilySchema.valueSchema,
+            valueAvroSchema
+          )
+
+          Seq(keyEntry, valueEntry)
+        }
+
+        // Add new entries to our accumulated map
+        schemas ++ schemaEntries.toMap
+    }
+
+    // Create the final metadata and wrap it in a broadcast
+    val metadata = StateSchemaMetadata(
+      activeSchemas = activeSchemas
+    )
+
+    StateSchemaBroadcast(sparkSession.sparkContext.broadcast(metadata))
}

object StateOpIdRule extends SparkPlanPartialRule {
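Note: the rewritten createStateSchemaMetadata registers each column family's key and value schema separately under a (colFamilyName, schemaId, isKey) key and ships the whole map to executors as a broadcast. A hypothetical executor-side lookup against that broadcast (only names visible in the diff above are used; the helper itself is illustrative):

import org.apache.spark.broadcast.Broadcast

// Resolve the writer schema for a value row of the given column family
// that was written under the given schema id (sketch).
def lookupValueSchema(
    metadata: Broadcast[StateSchemaMetadata],
    colFamilyName: String,
    schemaId: Short): Option[StateSchemaMetadataValue] = {
  metadata.value.activeSchemas.get(
    StateSchemaMetadataKey(colFamilyName, schemaId, isKey = false))
}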
@@ -69,8 +69,8 @@ object StateStoreColumnFamilySchemaUtils {
valEncoder: Encoder[T],
hasTtl: Boolean): StateStoreColFamilySchema = {
StateStoreColFamilySchema(
-      stateName,
-      keyEncoder.schema,
+      stateName, 0,
+      keyEncoder.schema, 0,
getValueSchemaWithTTL(valEncoder.schema, hasTtl),
Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema)))
}
@@ -81,8 +81,8 @@ object StateStoreColumnFamilySchemaUtils {
valEncoder: Encoder[T],
hasTtl: Boolean): StateStoreColFamilySchema = {
StateStoreColFamilySchema(
-      stateName,
-      keyEncoder.schema,
+      stateName, 0,
+      keyEncoder.schema, 0,
getValueSchemaWithTTL(valEncoder.schema, hasTtl),
Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema)))
}
@@ -95,8 +95,8 @@
hasTtl: Boolean): StateStoreColFamilySchema = {
val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, userKeyEnc.schema)
StateStoreColFamilySchema(
-      stateName,
-      compositeKeySchema,
+      stateName, 0,
+      compositeKeySchema, 0,
getValueSchemaWithTTL(valEncoder.schema, hasTtl),
Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)),
Some(userKeyEnc.schema))
@@ -107,8 +107,8 @@
keySchema: StructType,
valSchema: StructType): StateStoreColFamilySchema = {
StateStoreColFamilySchema(
-      stateName,
-      keySchema,
+      stateName, 0,
+      keySchema, 0,
valSchema,
Some(PrefixKeyScanStateEncoderSpec(keySchema, 1)))
}
@@ -118,8 +118,8 @@
keySchema: StructType,
valSchema: StructType): StateStoreColFamilySchema = {
StateStoreColFamilySchema(
-      stateName,
-      keySchema,
+      stateName, 0,
+      keySchema, 0,
valSchema,
Some(RangeKeyScanStateEncoderSpec(keySchema, Seq(0))))
}
@@ -228,7 +228,8 @@ case class StreamingSymmetricHashJoinExec(
SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide)

override def operatorStateMetadata(
-      stateSchemaPaths: List[Map[Short, String]] = List.empty): OperatorStateMetadata = {
+      stateSchemaPaths: List[List[String]] = List.empty
+  ): OperatorStateMetadata = {
val info = getStateInfo
val operatorInfo = OperatorInfoV1(info.operatorId, shortName)
val stateStoreInfo =
@@ -263,8 +264,8 @@

// validate and maybe evolve schema for all state stores across both sides of the join
result.map { case (stateStoreName, (keySchema, valueSchema)) =>
-      val newStateSchema = List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,
-        keySchema, valueSchema))
+      val newStateSchema = List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME, 0,
+        keySchema, 0, valueSchema))
StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
newStateSchema, session.sessionState, stateSchemaVersion, storeName = stateStoreName)
}.toList
@@ -462,12 +462,13 @@ case class TransformWithStateExec(
stateSchemaVersion: Int): List[StateSchemaValidationResult] = {
val info = getStateInfo
validateAndWriteStateSchema(hadoopConf, batchId, stateSchemaVersion,
-      info, session, operatorStateMetadataVersion)
+      info, session, operatorStateMetadataVersion, conf.stateStoreEncodingFormat)
}

/** Metadata of this stateful operator and its states stores. */
override def operatorStateMetadata(
-      stateSchemaPaths: List[String]): OperatorStateMetadata = {
+      stateSchemaPaths: List[List[String]]
+  ): OperatorStateMetadata = {
val info = getStateInfo
getOperatorStateMetadata(stateSchemaPaths, info, shortName, timeMode, outputMode)
}
@@ -176,7 +176,7 @@ trait TransformWithStateMetadataUtils extends Logging {
def getStateVariableInfos(): Map[String, TransformWithStateVariableInfo]

def getOperatorStateMetadata(
-      stateSchemaPaths: List[String],
+      stateSchemaPaths: List[List[String]],
info: StatefulOperatorStateInfo,
shortName: String,
timeMode: TimeMode,
@@ -201,7 +201,8 @@
stateSchemaVersion: Int,
info: StatefulOperatorStateInfo,
session: SparkSession,
-      operatorStateMetadataVersion: Int = 2): List[StateSchemaValidationResult] = {
+      operatorStateMetadataVersion: Int = 2,
+      stateStoreEncodingFormat: String = "unsaferow"): List[StateSchemaValidationResult] = {
assert(stateSchemaVersion >= 3)
val newSchemas = getColFamilySchemas()
val stateSchemaDir = stateSchemaDirPath(info)
@@ -223,7 +224,7 @@
case Some(metadata) =>
metadata match {
case v2: OperatorStateMetadataV2 =>
-            Some(new Path(v2.stateStoreInfo.head.stateSchemaFilePath))
+            Some(new Path(v2.stateStoreInfo.head.stateSchemaFilePaths.last))
case _ => None
}
case None => None
@@ -234,7 +235,8 @@
newSchemas.values.toList, session.sessionState, stateSchemaVersion,
storeName = StateStoreId.DEFAULT_STORE_NAME,
oldSchemaFilePath = oldStateSchemaFilePath,
-        newSchemaFilePath = Some(newStateSchemaFilePath)))
+        newSchemaFilePath = Some(newStateSchemaFilePath),
+        schemaEvolutionEnabled = stateStoreEncodingFormat == "avro"))
}

def validateNewMetadataForTWS(
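Note: schema evolution is gated on the store's encoding format: the checker only receives schemaEvolutionEnabled = true when state rows are Avro-encoded, since Avro (unlike the default "unsaferow" binary format) supports writer/reader schema resolution. A minimal sketch of why the Avro path can evolve, using the public SchemaConverters API (the conf plumbing is assumed):

import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.{LongType, StructType}

// An Avro schema derived from the Catalyst value schema can serve as the
// writer schema, so a newer reader schema can still decode old rows.
val valueSchema = new StructType().add("count", LongType)
val avroValueSchema = SchemaConverters.toAvroType(valueSchema)

val stateStoreEncodingFormat = "avro" // assumed to come from SQLConf
val schemaEvolutionEnabled = stateStoreEncodingFormat == "avro"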
@@ -51,8 +51,7 @@ case class StateStoreMetadataV2(
storeName: String,
numColsPrefixKey: Int,
numPartitions: Int,
-    stateSchemaId: Short,
-    stateSchemaFilePaths: Map[Short, String])
+    stateSchemaFilePaths: List[String])
extends StateStoreMetadata with Serializable

object StateStoreMetadataV2 {
@@ -470,7 +469,7 @@
val earliestBatchToKeep = latestMetadata match {
case Some(OperatorStateMetadataV2(_, stateStoreInfo, _)) =>
val ssInfo = stateStoreInfo.head
-        val schemaFilePath = ssInfo.stateSchemaFilePaths.minBy(_._1)._2
+        val schemaFilePath = ssInfo.stateSchemaFilePaths.head
new Path(schemaFilePath).getName.split("_").head.toLong
case _ => 0
}
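Note: with the ordered list, the oldest schema file is now simply head (previously the entry with the smallest schema id in the map), and the purge logic recovers the batch id that wrote it from the file name. Sketch with an illustrative name, assuming the "<batchId>_<uuid>" naming implied by the split above:

// The purge floor is the batch id encoded in the oldest schema file's name.
val oldestSchemaFileName = "42_0f8e2d31" // illustrative "<batchId>_<uuid>"
val earliestBatchToKeep = oldestSchemaFileName.split("_").head.toLong // 42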