Skip to content

Commit

Permalink
stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
ericm-db committed Dec 23, 2024
1 parent 61e99cb commit 03c6297
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchemaUtils._
import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, StateStoreColFamilySchema}
import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, RangeKeyScanStateEncoderSpec, StateStoreColFamilySchema}
import org.apache.spark.sql.types._

object StateStoreColumnFamilySchemaUtils {
Expand Down Expand Up @@ -112,4 +112,15 @@ object StateStoreColumnFamilySchemaUtils {
valSchema,
Some(PrefixKeyScanStateEncoderSpec(keySchema, 1)))
}

/**
 * Assembles the [[StateStoreColFamilySchema]] describing a timer secondary-index
 * column family.
 *
 * The key encoder is a [[RangeKeyScanStateEncoderSpec]] constructed from `keySchema`
 * with `Seq(0)` as its second argument.
 * NOTE(review): `Seq(0)` presumably marks the first key column as the range-scan
 * ordering column -- confirm against RangeKeyScanStateEncoderSpec.
 *
 * @param stateName name of the column family, forwarded unchanged to the schema
 * @param keySchema schema of the key row; also handed to the encoder spec
 * @param valSchema schema of the value row
 * @return the column family schema wrapping the arguments and the encoder spec
 */
def getSecIndexTimerStateSchema(
    stateName: String,
    keySchema: StructType,
    valSchema: StructType): StateStoreColFamilySchema = {
  // Name the encoder spec first so the constructor call below stays on one line.
  val rangeScanSpec = RangeKeyScanStateEncoderSpec(keySchema, Seq(0))
  StateStoreColFamilySchema(stateName, keySchema, valSchema, Some(rangeScanSpec))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,12 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi
val colFamilySchema = StateStoreColumnFamilySchemaUtils.
getTimerStateSchema(stateName, timerEncoder.schemaForKeyRow, timerEncoder.schemaForValueRow)
columnFamilySchemas.put(stateName, colFamilySchema)
val tsToKeyCFName = TimerStateUtils.getTimerStateSecIndexName(timeMode.toString)
val secondaryColFamilySchema = StateStoreColumnFamilySchemaUtils.
getSecIndexTimerStateSchema(
tsToKeyCFName, timerEncoder.keySchemaForSecIndex, timerEncoder.schemaForValueRow)
columnFamilySchemas.put(stateName, colFamilySchema)
columnFamilySchemas.put(tsToKeyCFName, secondaryColFamilySchema)
val stateVariableInfo = TransformWithStateVariableUtils.getTimerState(stateName)
stateVariableInfos.put(stateName, stateVariableInfo)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ object TimerStateUtils {
TimerStateUtils.PROC_TIMERS_STATE_NAME + TimerStateUtils.KEY_TO_TIMESTAMP_CF
}
}

/**
 * Returns the name of the timestamp-to-key column family for the given time mode.
 *
 * The result is `EVENT_TIMERS_STATE_NAME + TIMESTAMP_TO_KEY_CF` when `timeMode`
 * equals `TimeMode.EventTime.toString`, and
 * `PROC_TIMERS_STATE_NAME + TIMESTAMP_TO_KEY_CF` otherwise.
 *
 * @param timeMode string form of the time mode; asserted to be either
 *                 `TimeMode.EventTime.toString` or `TimeMode.ProcessingTime.toString`
 * @return the column family name for the timer secondary index
 */
def getTimerStateSecIndexName(timeMode: String): String = {
  // Evaluate the event-time comparison once and reuse it for the assert and the choice.
  val isEventTime = timeMode == TimeMode.EventTime.toString
  assert(isEventTime || timeMode == TimeMode.ProcessingTime.toString)

  val timersStateName =
    if (isEventTime) TimerStateUtils.EVENT_TIMERS_STATE_NAME
    else TimerStateUtils.PROC_TIMERS_STATE_NAME
  timersStateName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1766,7 +1766,6 @@ class SingleValueStateEncoder(
columnFamilyInfo = None,
supportSchemaEvolution = dataEncoder.supportsSchemaEvolution
) with RocksDBValueStateEncoder with Logging {
assert(currentSchemaId.isDefined)

override def getCurrentSchemaId: Option[Short] = currentSchemaId

Expand All @@ -1783,8 +1782,7 @@ class SingleValueStateEncoder(
}
// First decode the metadata prefixes
val prefix = decodeStateRowPrefix(valueBytes)
logError(s"### prefix: $prefix")
logError(s"### currentSchemaId: ${currentSchemaId}")

val data = decodeStateRowData(valueBytes)
// Decode the actual value using either Avro or UnsafeRow
dataEncoder.decodeValue(data, prefix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,6 @@ class TransformWithStateSuite extends StateStoreMetricsTest
CheckNewAnswer(("a", "1"), ("c", "1"))
)

logError(s"### starting query 2")
val result2 = inputData.toDS()
.groupByKey(x => x)
.transformWithState(new RunningCountStatefulProcessor(),
Expand Down

0 comments on commit 03c6297

Please sign in to comment.