Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
e04aaf4
feat(datetime): add spark.comet.exec.datetime.engine config [skip ci]
andygrove May 14, 2026
dbebb6e
docs(datetime): polish COMET_DATETIME_ENGINE doc wording [skip ci]
andygrove May 14, 2026
0e084c0
feat(datetime): add DateTimeFieldUDF abstract base for JVM UDFs [skip…
andygrove May 14, 2026
24e1de7
feat(datetime): add HourUDF for JVM-side hour extraction [skip ci]
andygrove May 14, 2026
f74e5b4
refactor(datetime): move DateTimeFieldUDFSuite to spark module [skip ci]
andygrove May 14, 2026
2f092c8
refactor(datetime): revert scalatest setup in common/pom.xml [skip ci]
andygrove May 14, 2026
27cd822
feat(datetime): add MinuteUDF for JVM-side minute extraction [skip ci]
andygrove May 14, 2026
703e9c4
feat(datetime): add SecondUDF for JVM-side second extraction [skip ci]
andygrove May 14, 2026
77fe65b
feat(datetime): route Hour through JVM UDF when engine=java [skip ci]
andygrove May 14, 2026
f0117db
feat(datetime): route Minute through JVM UDF when engine=java [skip ci]
andygrove May 14, 2026
92b3428
feat(datetime): route Second through JVM UDF when engine=java [skip ci]
andygrove May 14, 2026
987c7e0
test(datetime): cover DST fall-back boundary in JVM UDF path [skip ci]
andygrove May 14, 2026
c081811
test(datetime): assert default engine remains rust [skip ci]
andygrove May 14, 2026
d70e9b5
docs(datetime): document spark.comet.exec.datetime.engine [skip ci]
andygrove May 14, 2026
8e77243
fix(datetime): reject non-timestamp inputs explicitly when engine=java
andygrove May 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,23 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val DATETIME_ENGINE_RUST = "rust"
val DATETIME_ENGINE_JAVA = "java"

val COMET_DATETIME_ENGINE: ConfigEntry[String] =
conf("spark.comet.exec.datetime.engine")
.category(CATEGORY_EXEC)
.doc(
"Selects the engine used to evaluate supported date/time expressions. " +
s"`$DATETIME_ENGINE_RUST` uses the native DataFusion datetime engine. " +
s"`$DATETIME_ENGINE_JAVA` is experimental and routes through a JVM-side " +
"UDF for bit-exact Spark semantics, at the cost of JNI roundtrips per " +
"batch. Expressions routed when set to java: hour, minute, and second.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set(DATETIME_ENGINE_RUST, DATETIME_ENGINE_JAVA))
.createWithDefault(DATETIME_ENGINE_RUST)

val COMET_METRICS_UPDATE_INTERVAL: ConfigEntry[Long] =
conf("spark.comet.metrics.updateInterval")
.category(CATEGORY_EXEC)
Expand Down
96 changes: 96 additions & 0 deletions common/src/main/scala/org/apache/comet/udf/DateTimeFieldUDF.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.udf

import java.nio.charset.StandardCharsets
import java.time.{Instant, LocalDateTime, ZoneId, ZoneOffset}

import org.apache.arrow.vector.{IntVector, TimeStampMicroTZVector, TimeStampMicroVector, ValueVector, VarCharVector}

import org.apache.comet.CometArrowAllocator

/**
* Shared logic for date/time field-extraction UDFs (hour/minute/second).
*
* Inputs:
* - inputs(0): TimeStampMicroTZVector (for TimestampType) or TimeStampMicroVector (for
* TimestampNTZType) holding the timestamp column.
* - inputs(1): VarCharVector, length-1, holding the session timezone id. Used only for
* TimestampType. NTZ ignores it (wall-clock semantics).
*
* Output: IntVector of length `numRows` holding the extracted field. Null timestamps produce null
* output.
*/
abstract class DateTimeFieldUDF extends CometUDF {

protected def extract(dt: LocalDateTime): Int

override def evaluate(inputs: Array[ValueVector], numRows: Int): ValueVector = {
require(
inputs.length == 2,
s"${getClass.getSimpleName} expects 2 inputs, got ${inputs.length}")
val tsCol = inputs(0)
val tzVec = inputs(1).asInstanceOf[VarCharVector]
require(
tzVec.getValueCount >= 1 && !tzVec.isNull(0),
s"${getClass.getSimpleName} requires a non-null scalar timezone")
val zone = ZoneId.of(new String(tzVec.get(0), StandardCharsets.UTF_8))

val out = new IntVector(s"${getClass.getSimpleName}_result", CometArrowAllocator)
out.allocateNew(numRows)

tsCol match {
case tz: TimeStampMicroTZVector =>
var i = 0
while (i < numRows) {
if (tz.isNull(i)) {
out.setNull(i)
} else {
val micros = tz.get(i)
val instant = Instant.ofEpochSecond(
Math.floorDiv(micros, 1000000L),
Math.floorMod(micros, 1000000L) * 1000L)
out.set(i, extract(LocalDateTime.ofInstant(instant, zone)))
}
i += 1
}
case ntz: TimeStampMicroVector =>
var i = 0
while (i < numRows) {
if (ntz.isNull(i)) {
out.setNull(i)
} else {
val micros = ntz.get(i)
val seconds = Math.floorDiv(micros, 1000000L)
val nanos = (Math.floorMod(micros, 1000000L) * 1000L).toInt
val dt = LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC)
out.set(i, extract(dt))
}
i += 1
}
case other =>
throw new IllegalArgumentException(
s"${getClass.getSimpleName}: unsupported timestamp vector type: " +
other.getClass.getName)
}
out.setValueCount(numRows)
out
}
}
26 changes: 26 additions & 0 deletions common/src/main/scala/org/apache/comet/udf/HourUDF.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.udf

import java.time.LocalDateTime

class HourUDF extends DateTimeFieldUDF {
override protected def extract(dt: LocalDateTime): Int = dt.getHour
}
26 changes: 26 additions & 0 deletions common/src/main/scala/org/apache/comet/udf/MinuteUDF.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.udf

import java.time.LocalDateTime

class MinuteUDF extends DateTimeFieldUDF {
override protected def extract(dt: LocalDateTime): Int = dt.getMinute
}
26 changes: 26 additions & 0 deletions common/src/main/scala/org/apache/comet/udf/SecondUDF.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.udf

import java.time.LocalDateTime

class SecondUDF extends DateTimeFieldUDF {
override protected def extract(dt: LocalDateTime): Int = dt.getSecond
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,24 @@ under the License.
timezone is UTC. TimestampNTZ inputs are handled correctly (timezone-independent truncation).
[#2649](https://github.com/apache/datafusion-comet/issues/2649)

## Engine Selection (experimental)

Comet supports two engines for evaluating date/time field-extraction expressions
(`hour`, `minute`, `second`). The choice is governed by
`spark.comet.exec.datetime.engine`:

- `rust` (default): native DataFusion implementations. Fastest, but applies timezone
conversion to `TimestampNTZ` inputs, which differs from Spark's semantics
([#3180](https://github.com/apache/datafusion-comet/issues/3180)). Comet falls back
to Spark for incompatible cases by default.
- `java` (experimental): routes the affected expressions through a JVM-side UDF that
uses `java.time` directly, producing bit-exact Spark results for all input types.
Incurs a JNI round-trip per native batch.

This is a prototype scoped to `hour`, `minute`, and `second`. The model is expected
to extend to other date/time expressions in follow-up work
([#4311](https://github.com/apache/datafusion-comet/issues/4311)).

## Date and Time Functions

Comet's native implementation of date and time functions may produce different results than Spark for dates
Expand Down
Loading