Skip to content

Commit

Permalink
stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
ericm-db committed Dec 27, 2024
1 parent eb4e4df commit ae82712
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchemaUtils._
import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, StateStoreColFamilySchema}
import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, RangeKeyScanStateEncoderSpec, StateStoreColFamilySchema}
import org.apache.spark.sql.types._

object StateStoreColumnFamilySchemaUtils {
Expand Down Expand Up @@ -112,4 +112,15 @@ object StateStoreColumnFamilySchemaUtils {
valSchema,
Some(PrefixKeyScanStateEncoderSpec(keySchema, 1)))
}

def getSecIndexTimerStateSchema(
stateName: String,
keySchema: StructType,
valSchema: StructType): StateStoreColFamilySchema = {
StateStoreColFamilySchema(
stateName,
keySchema,
valSchema,
Some(RangeKeyScanStateEncoderSpec(keySchema, Seq(0))))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,12 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi
val colFamilySchema = StateStoreColumnFamilySchemaUtils.
getTimerStateSchema(stateName, timerEncoder.schemaForKeyRow, timerEncoder.schemaForValueRow)
columnFamilySchemas.put(stateName, colFamilySchema)
val tsToKeyCFName = TimerStateUtils.getTimerStateSecIndexName(timeMode.toString)
val secondaryColFamilySchema = StateStoreColumnFamilySchemaUtils.
getSecIndexTimerStateSchema(
tsToKeyCFName, timerEncoder.keySchemaForSecIndex, timerEncoder.schemaForValueRow)
columnFamilySchemas.put(stateName, colFamilySchema)
columnFamilySchemas.put(tsToKeyCFName, secondaryColFamilySchema)
val stateVariableInfo = TransformWithStateVariableUtils.getTimerState(stateName)
stateVariableInfos.put(stateName, stateVariableInfo)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ object TimerStateUtils {
TimerStateUtils.PROC_TIMERS_STATE_NAME + TimerStateUtils.KEY_TO_TIMESTAMP_CF
}
}

def getTimerStateSecIndexName(timeMode: String): String = {
assert(timeMode == TimeMode.EventTime.toString || timeMode == TimeMode.ProcessingTime.toString)
if (timeMode == TimeMode.EventTime.toString) {
TimerStateUtils.EVENT_TIMERS_STATE_NAME + TimerStateUtils.TIMESTAMP_TO_KEY_CF
} else {
TimerStateUtils.PROC_TIMERS_STATE_NAME + TimerStateUtils.TIMESTAMP_TO_KEY_CF
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,6 @@ class TransformWithStateSuite extends StateStoreMetricsTest
CheckNewAnswer(("a", "1"), ("c", "1"))
)

logError(s"### starting query 2")
val result2 = inputData.toDS()
.groupByKey(x => x)
.transformWithState(new RunningCountStatefulProcessor(),
Expand Down

0 comments on commit ae82712

Please sign in to comment.