Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 2 additions & 0 deletions aggregator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
<!-- Maven to register attached artifact , which we later replace -->
<rapids.shim.jar.phase>initialize</rapids.shim.jar.phase>
<rapids.source.jar.phase>none</rapids.source.jar.phase>
<rapids.aggregator.downstream.refresh.skip>false</rapids.aggregator.downstream.refresh.skip>
</properties>
<dependencies>
<dependency>
Expand Down Expand Up @@ -196,6 +197,7 @@
<goals><goal>run</goal></goals>
<phase>process-classes</phase>
<configuration>
<skip>${rapids.aggregator.downstream.refresh.skip}</skip>
<target>
<taskdef resource="net/sf/antcontrib/antcontrib.properties"/>
<property name="realAggJar"
Expand Down
241 changes: 220 additions & 21 deletions build/buildall
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,25 @@ shopt -s extglob
SKIP_CLEAN=1
BUILD_ALL_DEBUG=0
SCALA213=0
UNSHIM_FAST=0
UNSHIM_PARALLEL_WORLD_ONLY=0
UNSHIM_REUSE_BUILT_JARS=0
UNSHIM_ALLOWLIST_ONLY=0

SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)
SOURCE_DIR=$(cd "$SCRIPT_DIR/.." >/dev/null 2>&1 && pwd)

function first_pom_value() {
local key="$1"
local pom="$2"
sed -n "0,/<$key>/{s|.*<$key>\([^<]*\)</$key>.*|\1|p}" "$pom" | head -n 1
}

function last_pom_value() {
local key="$1"
local pom="$2"
sed -n "s|.*<$key>\([^<]*\)</$key>.*|\1|p" "$pom" | tail -n 1
}

function join_by { local IFS="$1"; shift; echo "$*"; }

Expand Down Expand Up @@ -56,6 +75,14 @@ function print_usage() {
echo " repackage the dist module artifact using installed dependencies"
echo " --scala213"
echo " build 2.13 shims"
echo " --unshim-fast"
echo " skip Maven checks/docs, tests, build metadata, coverage, enforcer, and snapshot refresh for repeated unshim/dist iteration"
echo " --parallel-world-only, --unshim-parallel-world-only"
echo " build analyzer-only parallel-world output without the final Maven dist invocation"
echo " --unshim-reuse-built-jars"
echo " with --unshim-fast --parallel-world-only, skip shim Maven builds and reuse existing target jars"
echo " --unshim-allowlist-only"
echo " imply --unshim-fast --parallel-world-only --unshim-reuse-built-jars and require only unshim allowlist changes"
}

function bloopInstall() {
Expand Down Expand Up @@ -148,6 +175,25 @@ case "$1" in
SCALA213=1
;;

--unshim-fast|--fast-unshim)
UNSHIM_FAST=1
;;

--parallel-world-only|--unshim-parallel-world-only)
UNSHIM_PARALLEL_WORLD_ONLY=1
;;

--unshim-reuse-built-jars)
UNSHIM_REUSE_BUILT_JARS=1
;;

--unshim-allowlist-only)
UNSHIM_ALLOWLIST_ONLY=1
UNSHIM_FAST=1
UNSHIM_PARALLEL_WORLD_ONLY=1
UNSHIM_REUSE_BUILT_JARS=1
;;

--rebuild-dist-only)
SKIP_DIST_DEPS="1"
MODULE="dist"
Expand All @@ -174,14 +220,62 @@ if [[ "$DIST_PROFILE" == *Scala213 ]]; then
SCALA213=1
fi

if [[ "$UNSHIM_PARALLEL_WORLD_ONLY" == "1" ]]; then
FINAL_OP="generate-resources"
MODULE="${MODULE:-dist}"
fi

MVN=${MVN:-"mvn"}
# include options to mvn command
export MVN="$MVN -Dmaven.wagon.http.retryHandler.count=3 ${MVN_OPT}"
if [[ "$UNSHIM_FAST" == "1" ]]; then
export MAVEN_REFRESH_OPT="--no-snapshot-updates"
export MVN_FAST_SKIP_OPTS="-Dmaven.test.skip=true -Drat.skip=true -Dmaven.scalastyle.skip=true -Dmaven.scaladoc.skip=true -Dmaven.javadoc.skip=true -Ddist.jar.compress=false -Djacoco.skip=true -Denforcer.skip=true -Drapids.build.info.skip=true -Dignore.shim.revisions.check=true"
else
export MAVEN_REFRESH_OPT="-U"
export MVN_FAST_SKIP_OPTS=""
fi
export UNSHIM_FAST
export UNSHIM_PARALLEL_WORLD_ONLY
export UNSHIM_ALLOWLIST_ONLY

if [[ "$UNSHIM_REUSE_BUILT_JARS" == "1" && \
( "$UNSHIM_FAST" != "1" || "$UNSHIM_PARALLEL_WORLD_ONLY" != "1" ) ]]; then
echo >&2 "--unshim-reuse-built-jars requires --unshim-fast --parallel-world-only"
exit 1
fi

if [[ "$UNSHIM_ALLOWLIST_ONLY" == "1" ]] && \
git -C "$SOURCE_DIR" rev-parse --is-inside-work-tree >/dev/null 2>&1; then
ALLOWLIST_ONLY_DIRTY=$(
{
git -C "$SOURCE_DIR" diff --name-only -- \
. \
':(exclude)dist/unshimmed-common-from-single-shim.txt' \
':(exclude)dist/unshimmed-from-each-spark3xx.txt' \
':(exclude)dist/keep-in-spark-shared.txt'
git -C "$SOURCE_DIR" diff --cached --name-only -- \
. \
':(exclude)dist/unshimmed-common-from-single-shim.txt' \
':(exclude)dist/unshimmed-from-each-spark3xx.txt' \
':(exclude)dist/keep-in-spark-shared.txt'
} | sort -u
)
if [[ -n "$ALLOWLIST_ONLY_DIRTY" ]]; then
echo >&2 "--unshim-allowlist-only can only reuse jars when tracked changes are limited to dist/unshimmed*.txt or dist/keep-in-spark-shared.txt"
echo >&2 "$ALLOWLIST_ONLY_DIRTY"
exit 1
fi
fi

if [[ "$SCALA213" == "1" ]]; then
POM_FILE="scala2.13/pom.xml"
export MVN="$MVN -f scala2.13/"
$(dirname $0)/make-scala-version-build-files.sh 2.13
if [[ "$UNSHIM_FAST" == "1" && -f "$POM_FILE" ]]; then
echo "Unshim fast: reusing existing Scala 2.13 POMs"
else
"$SCRIPT_DIR"/make-scala-version-build-files.sh 2.13
fi
else
POM_FILE="pom.xml"
fi
Expand Down Expand Up @@ -216,7 +310,26 @@ case $DIST_PROFILE in
esac

echo "Spark versions involved: ${SPARK_SHIM_VERSIONS[@]} ..."
export MVN_BASE_DIR=$($MVN help:evaluate -Dexpression=project.basedir -q -DforceStdout)
if [[ "$UNSHIM_FAST" == "1" ]]; then
if [[ "$SCALA213" == "1" ]]; then
export MVN_BASE_DIR="$SOURCE_DIR/scala2.13"
else
export MVN_BASE_DIR="$SOURCE_DIR"
fi
export RAPIDS_PROJECT_VERSION=$(first_pom_value version "$POM_FILE")
export RAPIDS_SCALA_BINARY_VERSION=$(last_pom_value scala.binary.version "$POM_FILE")
else
export MVN_BASE_DIR=$($MVN help:evaluate -Dexpression=project.basedir -q -DforceStdout)
fi

if [[ "$UNSHIM_PARALLEL_WORLD_ONLY" == "1" ]]; then
echo "Unshim parallel-world-only: preparing analyzer-only output and skipping JNI unpack, shimplify, and reduced POM generation"
MVN_FAST_SKIP_OPTS="$MVN_FAST_SKIP_OPTS -Drapids.jni.unpack.skip=true -Drapids.shimplify.skip=true -Drapids.parallel.world.skip.reduced.pom=true -Drapids.aggregator.downstream.refresh.skip=true"
elif [[ "$UNSHIM_FAST" == "1" && -d "$MVN_BASE_DIR/dist/target/jni-deps" ]]; then
echo "Unshim fast: reusing existing JNI deps from $MVN_BASE_DIR/dist/target/jni-deps"
MVN_FAST_SKIP_OPTS="$MVN_FAST_SKIP_OPTS -Drapids.jni.unpack.skip=true"
fi
export MVN_FAST_SKIP_OPTS

if [[ "$GEN_BLOOP" == "true" ]]; then
bloopInstall
Expand All @@ -237,9 +350,45 @@ fi

echo "Building a combined dist jar with Shims for ${SPARK_SHIM_VERSIONS[@]} ..."

function refresh_fast_aggregator_jar() {
[[ "$UNSHIM_FAST" == "1" ]] || return 0
local BUILD_VER=$1
local agg_dir="$MVN_BASE_DIR/aggregator/target/spark$BUILD_VER"
local agg_base="rapids-4-spark-aggregator_${RAPIDS_SCALA_BINARY_VERSION}-${RAPIDS_PROJECT_VERSION}"
local shaded_jar="$agg_dir/${agg_base}-shaded.jar"
local downstream_jar="$agg_dir/${agg_base}-spark$BUILD_VER.jar"
if [[ ! -f "$shaded_jar" ]]; then
echo >&2 "Expected shaded aggregator jar missing: $shaded_jar"
exit 255
fi
if [[ -f "$downstream_jar" ]] && cmp -s "$shaded_jar" "$downstream_jar"; then
return 0
fi
cp -p "$shaded_jar" "$downstream_jar"
}
export -f refresh_fast_aggregator_jar

function verify_reusable_unshim_artifacts() {
local BUILD_VER=$1
local classifier="spark$BUILD_VER"
local api_base="rapids-4-spark-sql-plugin-api_${RAPIDS_SCALA_BINARY_VERSION}-${RAPIDS_PROJECT_VERSION}"
local agg_base="rapids-4-spark-aggregator_${RAPIDS_SCALA_BINARY_VERSION}-${RAPIDS_PROJECT_VERSION}"
local api_jar="$MVN_BASE_DIR/sql-plugin-api/target/$classifier/${api_base}-$classifier.jar"
local agg_shaded_jar="$MVN_BASE_DIR/aggregator/target/$classifier/${agg_base}-shaded.jar"
local jar_path
for jar_path in "$api_jar" "$agg_shaded_jar"; do
if [[ ! -f "$jar_path" ]]; then
echo >&2 "Expected reusable unshim artifact missing: $jar_path"
echo >&2 "Re-run without --unshim-reuse-built-jars after source or dependency changes."
exit 255
fi
done
}
export -f verify_reusable_unshim_artifacts

function build_single_shim() {
[[ "$BUILD_ALL_DEBUG" == "1" ]] && set -x
BUILD_VER=$1
local BUILD_VER=$1
mkdir -p "$MVN_BASE_DIR/target"
if (( BUILD_PARALLEL == 1 || NUM_SHIMS == 1 )); then
# Single-shim/serial build: stream Maven output live rather than to a log
Expand All @@ -255,8 +404,8 @@ function build_single_shim() {
LOG_FILE="$MVN_BASE_DIR/target/mvn-build-$BUILD_VER.log"
fi

if [[ "$BUILD_VER" == "$BASE_VER" ]]; then
SKIP_CHECKS="false"
if [[ "$BUILD_VER" == "$BASE_VER" && \
( "$UNSHIM_FAST" != "1" || "$UNSHIM_PARALLEL_WORLD_ONLY" != "1" ) ]]; then
# WORKAROUND:
# maven build on L193 currently relies on aggregator dependency which
# will removed by
Expand All @@ -267,24 +416,36 @@ function build_single_shim() {
#
MVN_PHASE="install"
else
SKIP_CHECKS="true"
MVN_PHASE="package"
fi

if [[ "$UNSHIM_FAST" == "1" || "$BUILD_VER" != "$BASE_VER" ]]; then
SKIP_CHECKS="true"
else
SKIP_CHECKS="false"
fi

local BUILD_PROJECTS="tools"
if [[ "$UNSHIM_FAST" == "1" ]]; then
BUILD_PROJECTS="aggregator"
fi

echo "#### REDIRECTING mvn output to ${LOG_FILE:-stdout} ####"
(
if [[ "$LOG_FILE" == "" ]]; then
exec 2>&1 || exit $?
else
exec > "$LOG_FILE" 2>&1 || exit $?
fi
$MVN -U "$MVN_PHASE" \
$MVN $MAVEN_REFRESH_OPT "$MVN_PHASE" \
-DskipTests \
-Dbuildver="$BUILD_VER" \
-Drat.skip="$SKIP_CHECKS" \
-Dmaven.scaladoc.skip \
-Dmaven.scaladoc.skip=true \
-Dmaven.javadoc.skip=true \
-Dmaven.scalastyle.skip="$SKIP_CHECKS" \
-pl tools -am
$MVN_FAST_SKIP_OPTS \
-pl "$BUILD_PROJECTS" -am
) || {
# Only tail when output went to a real log file; for a live stream
# (/dev/tty or existing stdout) the failure output is already on screen.
Expand All @@ -294,6 +455,7 @@ function build_single_shim() {
esac
exit 255
}
refresh_fast_aggregator_jar "$BUILD_VER"
}
export -f build_single_shim

Expand All @@ -310,25 +472,62 @@ export -f build_single_shim
time (
# printf a single buildver array element per line
if [[ "$SKIP_DIST_DEPS" != "1" ]]; then
if [[ "$UNSHIM_REUSE_BUILT_JARS" == "1" ]]; then
echo "Unshim fast: reusing existing per-shim jars and skipping Maven shim builds"
for bv in "${SPARK_SHIM_VERSIONS[@]}"; do
verify_reusable_unshim_artifacts "$bv"
refresh_fast_aggregator_jar "$bv"
done
else
# Execute initialize to download a massive jar for spark-rapids-jni in a single thread to
# avoid repeating this work in parallel
# Initialize sql-plugin-api only to avoid dealing with missing submodule dependencies
#
$MVN initialize -pl sql-plugin-api -am
# avoid repeating this work in parallel. This is unnecessary in unshim-fast modes that skip
# JNI unpacking.
if [[ "$UNSHIM_FAST" == "1" && "$MVN_FAST_SKIP_OPTS" == *"-Drapids.jni.unpack.skip=true"* ]]; then
echo "Unshim fast: skipping serial Maven initialize preflight"
else
# Initialize sql-plugin-api only to avoid dealing with missing submodule dependencies.
$MVN initialize -pl sql-plugin-api -am
fi

printf "%s\n" "${SPARK_SHIM_VERSIONS[@]}" | \
xargs -t -I% -P "$BUILD_PARALLEL" -n 1 \
bash -c 'build_single_shim "$@"' _ %
fi
fi
# This used to resume from dist. However, without including aggregator in the build
# the build does not properly initialize spark.version property via buildver profiles
# in the root pom, and we get a missing spark330 dependency even for --profile=330,331
# where the build does not require it. Moving it to aggregator resolves this issue with
# a negligible increase of the build time by ~2 seconds.
if [[ "$UNSHIM_FAST" == "1" && "$UNSHIM_REUSE_BUILT_JARS" != "1" ]]; then
for bv in "${SPARK_SHIM_VERSIONS[@]}"; do
refresh_fast_aggregator_jar "$bv"
done
fi
# Non-fast builds resume from aggregator so Maven initializes the buildver-derived
# spark.version.classifier before dist resolves its aggregator dependency. The unshim-fast
# dist path can skip that extra aggregator pass because the per-shim builds above already
# installed the base aggregator jar and refreshed all target aggregator jars.
joinShimBuildFrom="aggregator"
INCLUDED_BUILDVERS_OPT=-Dincluded_buildvers=$(join_by , "${SPARK_SHIM_VERSIONS[@]}")
echo "Resuming from $joinShimBuildFrom build only using $BASE_VER"
$MVN $FINAL_OP -rf $joinShimBuildFrom $MODULE_OPT $MVN_PROFILE_OPT $INCLUDED_BUILDVERS_OPT \
if [[ "$UNSHIM_FAST" == "1" && "$MODULE" == "dist" ]]; then
if [[ "$UNSHIM_PARALLEL_WORLD_ONLY" == "1" ]]; then
echo "Unshim fast: assembling parallel-world directly without final Maven dist invocation"
python3 "$SOURCE_DIR/dist/scripts/build-unshim-parallel-world.py" \
--mvn-base-dir "$MVN_BASE_DIR" \
--source-dir "$SOURCE_DIR" \
--project-version "$RAPIDS_PROJECT_VERSION" \
--scala-binary-version "$RAPIDS_SCALA_BINARY_VERSION" \
--buildvers "$(join_by , "${SPARK_SHIM_VERSIONS[@]}")" \
--ignore-shim-revisions-check
exit 0
else
echo "Resuming at dist only using $BASE_VER"
FINAL_RESUME_OPT=""
FINAL_MODULE_OPT="--projects dist"
fi
else
echo "Resuming from $joinShimBuildFrom build only using $BASE_VER"
FINAL_RESUME_OPT="-rf $joinShimBuildFrom"
FINAL_MODULE_OPT="$MODULE_OPT"
fi
$MVN $FINAL_OP $FINAL_RESUME_OPT $FINAL_MODULE_OPT $MVN_PROFILE_OPT $INCLUDED_BUILDVERS_OPT \
-Dbuildver="$BASE_VER" \
-DskipTests -Dmaven.scaladoc.skip
-DskipTests -Dmaven.scaladoc.skip=true -Dmaven.javadoc.skip=true \
$MVN_FAST_SKIP_OPTS
)
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ object GpuCheckDeltaInvariant extends Logging {
ExprChecks.projectOnly(
TypeSig.all,
TypeSig.all,
paramCheck = Seq(ParamCheck("input", TypeSig.all, TypeSig.all))),
paramCheck = Seq(new ParamCheck("input", TypeSig.all, TypeSig.all))),
(c, conf, p, r) => new GpuCheckDeltaInvariantMeta(c, conf, p, r))

def maybeConvertToGpu(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,10 @@ trait DatabricksDeltaProviderBase extends DeltaProviderImplBase {
"Delta RTAS was tagged as unsupported and should not be converted to GPU")
}

protected case class DeltaWriteV1Config(
deltaLog: DeltaLog,
forceOverwrite: Boolean,
options: mutable.HashMap[String, String])
protected class DeltaWriteV1Config(
val deltaLog: DeltaLog,
val forceOverwrite: Boolean,
val options: mutable.HashMap[String, String])

private def extractWriteV1Config(
meta: RapidsMeta[_, _, _],
Expand Down Expand Up @@ -210,7 +210,7 @@ trait DatabricksDeltaProviderBase extends DeltaProviderImplBase {
f.get(outerObj).asInstanceOf[mutable.HashMap[String, String]]
}
if (forceOverwrite.isDefined && options.isDefined) {
Some(DeltaWriteV1Config(deltaLog, forceOverwrite.get, options.get))
Some(new DeltaWriteV1Config(deltaLog, forceOverwrite.get, options.get))
} else {
meta.willNotWorkOnGpu(s"write class has unsupported outer class $outerClass")
None
Expand Down
Loading
Loading