Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids;

import java.io.Serializable;
import java.util.Objects;

import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode;
import org.apache.spark.sql.catalyst.expressions.aggregate.Complete$;
import org.apache.spark.sql.catalyst.expressions.aggregate.Final$;
import org.apache.spark.sql.catalyst.expressions.aggregate.Partial$;
import org.apache.spark.sql.catalyst.expressions.aggregate.PartialMerge$;

import scala.collection.Seq;

/**
* Information on the aggregation modes being used.
*/
public class AggregateModeInfo implements Serializable {
private static final long serialVersionUID = 1L;

private final Seq<AggregateMode> uniqueModes;
private final boolean hasPartialMode;
private final boolean hasPartialMergeMode;
private final boolean hasFinalMode;
private final boolean hasCompleteMode;

public AggregateModeInfo(
Seq<AggregateMode> uniqueModes,
boolean hasPartialMode,
boolean hasPartialMergeMode,
boolean hasFinalMode,
boolean hasCompleteMode) {
this.uniqueModes = uniqueModes;
this.hasPartialMode = hasPartialMode;
this.hasPartialMergeMode = hasPartialMergeMode;
this.hasFinalMode = hasFinalMode;
this.hasCompleteMode = hasCompleteMode;
}

public static AggregateModeInfo from(Seq<AggregateMode> uniqueModes) {
return new AggregateModeInfo(
uniqueModes,
uniqueModes.contains(Partial$.MODULE$),
uniqueModes.contains(PartialMerge$.MODULE$),
uniqueModes.contains(Final$.MODULE$),
uniqueModes.contains(Complete$.MODULE$));
}

public Seq<AggregateMode> uniqueModes() {
return uniqueModes;
}

public boolean hasPartialMode() {
return hasPartialMode;
}

public boolean hasPartialMergeMode() {
return hasPartialMergeMode;
}

public boolean hasFinalMode() {
return hasFinalMode;
}

public boolean hasCompleteMode() {
return hasCompleteMode;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof AggregateModeInfo)) {
return false;
}
AggregateModeInfo that = (AggregateModeInfo) other;
return hasPartialMode == that.hasPartialMode
&& hasPartialMergeMode == that.hasPartialMergeMode
&& hasFinalMode == that.hasFinalMode
&& hasCompleteMode == that.hasCompleteMode
&& Objects.equals(uniqueModes, that.uniqueModes);
}

@Override
public int hashCode() {
return Objects.hash(
uniqueModes, hasPartialMode, hasPartialMergeMode, hasFinalMode, hasCompleteMode);
}

@Override
public String toString() {
return "AggregateModeInfo(" + uniqueModes + "," + hasPartialMode + ","
+ hasPartialMergeMode + "," + hasFinalMode + "," + hasCompleteMode + ")";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.io.async;

import java.io.Serializable;
import java.util.Objects;

/**
* Scheduling and execution timings for an async task.
*/
public class AsyncMetrics implements Serializable {
private static final long serialVersionUID = 1L;

private final long scheduleTimeMs;
private final long executionTimeMs;

public AsyncMetrics(long scheduleTimeMs, long executionTimeMs) {
this.scheduleTimeMs = scheduleTimeMs;
this.executionTimeMs = executionTimeMs;
}

public long scheduleTimeMs() {
return scheduleTimeMs;
}

public long executionTimeMs() {
return executionTimeMs;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof AsyncMetrics)) {
return false;
}
AsyncMetrics that = (AsyncMetrics) other;
return scheduleTimeMs == that.scheduleTimeMs
&& executionTimeMs == that.executionTimeMs;
}

@Override
public int hashCode() {
return Objects.hash(scheduleTimeMs, executionTimeMs);
}

@Override
public String toString() {
return "AsyncMetrics(" + scheduleTimeMs + "," + executionTimeMs + ")";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.io.async;

import java.io.Serializable;
import java.util.Objects;

/**
* Mutable throttling counters updated by ThrottlingExecutor.
*/
public class ThrottlingExecutorStats implements Serializable {
private static final long serialVersionUID = 1L;

public int numTasksScheduled;
public long accumulatedThrottleTimeNs;
public long minThrottleTimeNs;
public long maxThrottleTimeNs;

public ThrottlingExecutorStats(
int numTasksScheduled,
long accumulatedThrottleTimeNs,
long minThrottleTimeNs,
long maxThrottleTimeNs) {
this.numTasksScheduled = numTasksScheduled;
this.accumulatedThrottleTimeNs = accumulatedThrottleTimeNs;
this.minThrottleTimeNs = minThrottleTimeNs;
this.maxThrottleTimeNs = maxThrottleTimeNs;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof ThrottlingExecutorStats)) {
return false;
}
ThrottlingExecutorStats that = (ThrottlingExecutorStats) other;
return numTasksScheduled == that.numTasksScheduled
&& accumulatedThrottleTimeNs == that.accumulatedThrottleTimeNs
&& minThrottleTimeNs == that.minThrottleTimeNs
&& maxThrottleTimeNs == that.maxThrottleTimeNs;
}

@Override
public int hashCode() {
return Objects.hash(
numTasksScheduled, accumulatedThrottleTimeNs, minThrottleTimeNs, maxThrottleTimeNs);
}

@Override
public String toString() {
return "ThrottlingExecutorStats(" + numTasksScheduled + ","
+ accumulatedThrottleTimeNs + "," + minThrottleTimeNs + ","
+ maxThrottleTimeNs + ")";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2020-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shuffle;

import java.util.Objects;

/** Byte range for a block. */
public final class BlockRange<T extends BlockWithSize> {
private final T block;
private final long rangeStart;
private final long rangeEnd;

public BlockRange(T block, long rangeStart, long rangeEnd) {
if (rangeStart >= rangeEnd) {
throw new IllegalArgumentException(
"requirement failed: Instantiated a BlockRange with invalid boundaries: " +
rangeStart + " to " + rangeEnd);
}
this.block = block;
this.rangeStart = rangeStart;
this.rangeEnd = rangeEnd;
}

public T block() {
return block;
}

public long rangeStart() {
return rangeStart;
}

public long rangeEnd() {
return rangeEnd;
}

public long rangeSize() {
return rangeEnd - rangeStart;
}

public boolean isComplete() {
return rangeEnd == block.size();
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof BlockRange)) {
return false;
}
BlockRange<?> other = (BlockRange<?>) obj;
return rangeStart == other.rangeStart &&
rangeEnd == other.rangeEnd &&
Objects.equals(block, other.block);
}

@Override
public int hashCode() {
return Objects.hash(block, rangeStart, rangeEnd);
}

@Override
public String toString() {
return "BlockRange(" + block + "," + rangeStart + "," + rangeEnd + ")";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2020-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shuffle;

/** Block-like value that can report its size in bytes. */
public interface BlockWithSize {
long size();
}
Loading