feat: rand expression support #1199
Conversation
Codecov Report: All modified and coverable lines are covered by tests ✅

@@ Coverage Diff @@
##               main    #1199      +/-  ##
============================================
- Coverage     34.78%   34.11%   -0.67%
+ Complexity      957      925      -32
============================================
  Files           115      115
  Lines         43569    43586     +17
  Branches       9528     9556     +28
============================================
- Hits          15155    14870    -285
- Misses        25449    25763    +314
+ Partials       2965     2953     -12

View full report in Codecov by Sentry.
Thanks @akupchinskiy. I plan on reviewing this after the holidays.
Are the partition-related changes necessary for this PR? Otherwise, it might be better to reduce the scope to just the …
native/spark-expr/src/rand.rs
Outdated
const DOUBLE_UNIT: f64 = 1.1102230246251565e-16;
const SPARK_MURMUR_ARRAY_SEED: u32 = 0x3c074a61;
It would be really helpful if you could add documentation / references around these constants.
Added doc comments with all the references.
native/spark-expr/src/rand.rs
Outdated
match self.seed.evaluate(batch)? {
    ColumnarValue::Scalar(seed) => self.evaluate_batch(seed, batch.num_rows()),
    ColumnarValue::Array(_arr) => Err(DataFusionError::NotImplemented(format!(
        "Only literal seeds are not supported for {}",
Error message seems to have a typo
Thx, fixed
@@ -317,7 +317,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
     // query plan, we need to defer stream initialization to first time execution.
     if exec_context.root_op.is_none() {
         let start = Instant::now();
-        let planner = PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx))
+        let planner = PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx), partition)
This part is interesting. Is there any reason the partition is not used in the Comet native physical planner? It is definitely used in the DF physical plan during plan node execution: https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/execution_plan.rs#L371
The Spark partition index is erased, for some reason, when a native DF plan is sent for execution: https://github.com/apache/datafusion-comet/blob/main/native/core/src/execution/jni_api.rs#L496
There is a handful of expressions besides rand() relying on the partition index. All of them implement a nondeterministic trait providing a hook method to initialize state before a partition is evaluated in the Spark runtime. Encapsulation-wise, I agree that the scope of the partition exposure should be limited, but I could not find another way to extract it other than making it part of the planner struct.
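The hook described above can be sketched as follows. This is an illustrative sketch only, not Comet's actual API: the trait name, method name, and the `Rand` fields are assumptions. The per-partition seeding rule (`seed + partition_index`) mirrors how Spark itself initializes `rand()` per partition.

```rust
use std::fmt::Debug;

/// Hypothetical trait (names are illustrative, not Comet's real API) for
/// expressions whose output depends on the Spark partition index.
trait Nondeterministic: Debug {
    /// Called once per partition, before any batch of that partition is
    /// evaluated, so the expression can derive partition-local state.
    fn initialize(&mut self, partition_index: usize);
}

#[derive(Debug)]
struct Rand {
    init_seed: i64,
    /// Partition-local RNG seed, set by `initialize`.
    state: Option<i64>,
}

impl Nondeterministic for Rand {
    fn initialize(&mut self, partition_index: usize) {
        // Spark seeds rand() per partition as seed + partitionIndex,
        // which is what makes the partition index necessary here.
        self.state = Some(self.init_seed + partition_index as i64);
    }
}
```

With such a hook, the planner only needs to thread the partition index through once at plan-build time, which is why it ends up stored on the planner struct.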
native/spark-expr/src/rand.rs
Outdated
/// Adoption of the XOR-shift algorithm used in Apache Spark.
/// See: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala

/// Normalization multiplier used in mapping from a random i64 value to the f64 interval [0.0, 1.0).
/// Corresponds to the java implementation: https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/util/Random.java#L302
/// Due to the lack of hexadecimal float literal support in rust, scientific notation is used instead.
const DOUBLE_UNIT: f64 = 1.1102230246251565e-16;

/// Spark-compatible initial seed which is actually a part of the scala standard library murmurhash3 implementation.
/// References:
/// https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala#L63
/// https://github.com/scala/scala/blob/2.13.x/src/library/scala/util/hashing/MurmurHash3.scala#L331
const SPARK_MURMUR_ARRAY_SEED: u32 = 0x3c074a61;
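To make the role of `DOUBLE_UNIT` concrete, here is a minimal sketch (not the PR's code) of how Spark's `XORShiftRandom` step and `java.util.Random#nextDouble` combine 26 + 27 random bits into a 53-bit mantissa and scale it into [0.0, 1.0); the struct and method names are assumptions for illustration.

```rust
/// 1.0 / 2^53, matching java.util.Random's DOUBLE_UNIT.
const DOUBLE_UNIT: f64 = 1.1102230246251565e-16;

struct XorShiftRandom {
    seed: i64,
}

impl XorShiftRandom {
    /// XOR-shift step as in Spark's XORShiftRandom: mutate the state,
    /// then keep the low `bits` bits of the new state.
    fn next(&mut self, bits: u32) -> i32 {
        let mut next_seed = self.seed ^ (self.seed << 21);
        next_seed ^= ((next_seed as u64) >> 35) as i64; // unsigned shift, like Scala's >>>
        next_seed ^= next_seed << 4;
        self.seed = next_seed;
        (next_seed & ((1i64 << bits) - 1)) as i32
    }

    /// Combine 26 high + 27 low bits into a 53-bit integer, then multiply
    /// by DOUBLE_UNIT to land in [0.0, 1.0).
    fn next_f64(&mut self) -> f64 {
        let high = (self.next(26) as i64) << 27;
        let low = self.next(27) as i64;
        ((high + low) as f64) * DOUBLE_UNIT
    }
}
```

Because the 53-bit value is strictly less than 2^53 and `DOUBLE_UNIT = 1/2^53`, the product never reaches 1.0, which is exactly the half-open interval `rand()` promises.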
Please replace the links with permalinks: when the file moves, the link will no longer be valid, and if the logic changes we can tell when.
Which issue does this PR close?
Closes #1198
Rationale for this change
Support for the Spark rand() expression.
What changes are included in this PR?
How are these changes tested?
Spark compatibility tests and an expression correctness test are included in the PR.