From b81b0826a5a0c8f3d616803d12cbc756e88b27c9 Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Mon, 19 May 2025 21:36:40 +0800
Subject: [PATCH 1/3] [jvm-packages] ExternalMemory: Overlap the caching time
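
Writing the Arrow IPC cache file used to happen synchronously inside
convertTable, so every batch stalled the pipeline until its write
finished. Hand the write off to a small thread pool instead, so that
caching overlaps with feeding batches to the QuantileDMatrix, and make
loadTable wait for the corresponding Future before reading the file
back.

A minimal sketch of the pattern, for illustration only (the names here,
e.g. OverlappedCacheSketch, writeToDisk, and readFromDisk, are
placeholders rather than the cudf Arrow IPC API):

  import java.nio.file.{Files, Paths}
  import java.util.concurrent.Executors
  import scala.collection.mutable
  import scala.concurrent.{ExecutionContext, Future}

  object OverlappedCacheSketch {
    private val executor = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
    // Each cache-file path mapped to its in-flight background write
    private val pending = mutable.HashMap.empty[String, Future[Boolean]]

    private def writeToDisk(data: Array[Byte], path: String): Unit =
      Files.write(Paths.get(path), data)

    private def readFromDisk(path: String): Array[Byte] =
      Files.readAllBytes(Paths.get(path))

    // Producer: schedule the write and return immediately, so the caller
    // can keep producing batches while the disk write runs.
    def cache(data: Array[Byte], path: String): Unit =
      pending += (path -> Future { writeToDisk(data, path); true })

    // Consumer: poll the Future (120 x 50ms = 6s), then read the file,
    // mirroring checkAndWaitCachingDone below.
    def load(path: String): Array[Byte] = {
      val future = pending.getOrElse(path, sys.error(s"no caching task for $path"))
      var attempts = 0
      while (!future.isCompleted && attempts < 120) {
        Thread.sleep(50)
        attempts += 1
      }
      if (!future.isCompleted) sys.error(s"caching $path is not finished")
      readFromDisk(path)
    }
  }

The new Table built in convertTable increments the columns' reference
counts, which keeps the device buffers alive until the background
thread closes that Table (the withResource(table) in cacheTableThread).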
---
 .../scala/spark/ExternalMemory.scala | 83 ++++++++++++++++---
 1 file changed, 70 insertions(+), 13 deletions(-)

diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala
index 735941e679c9..24fb9dec017e 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala
+++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala
@@ -18,10 +18,14 @@ package ml.dmlc.xgboost4j.scala.spark
 
 import java.io.File
 import java.nio.file.{Files, Paths}
+import java.util.concurrent.Executors
 
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
 
 import ai.rapids.cudf._
+import org.apache.commons.logging.LogFactory
 
 import ml.dmlc.xgboost4j.java.{ColumnBatch, CudfColumnBatch}
 import ml.dmlc.xgboost4j.scala.spark.Utils.withResource
@@ -61,14 +65,23 @@ private[spark] trait ExternalMemory[T] extends Iterator[Table] with AutoCloseabl
 }
 
 // The data will be cached into disk.
-private[spark] class DiskExternalMemoryIterator(val path: String) extends ExternalMemory[String] {
+private[spark] class DiskExternalMemoryIterator(val parent: String) extends ExternalMemory[String] {
+
+  private val logger = LogFactory.getLog("XGBoostSparkGpuPlugin")
 
   private lazy val root = {
-    val tmp = path + "/xgboost"
+    val tmp = parent + "/xgboost"
     createDirectory(tmp)
     tmp
   }
 
+  logger.info(s"DiskExternalMemoryIterator created directory $root")
+
+  // Maps each cache-file path to the Future of its background caching task
+  private val taskFutures: mutable.HashMap[String, Future[Boolean]] = mutable.HashMap.empty
+  private val executor = Executors.newFixedThreadPool(2)
+  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
+
   private var counter = 0
 
   private def createDirectory(dirPath: String): Unit = {
@@ -78,6 +91,31 @@ private[spark] class DiskExternalMemoryIterator(val path: String) extends Extern
     }
   }
 
+  /**
+   * Cache the table to disk on a background thread
+   *
+   * @param table to be cached
+   * @param path where to cache the table
+   */
+  private def cacheTableThread(table: Table, path: String): Future[Boolean] = {
+    Future {
+      withResource(table) { _ =>
+        try {
+          val names = (1 to table.getNumberOfColumns).map(_.toString)
+          val options = ArrowIPCWriterOptions.builder().withColumnNames(names: _*).build()
+          withResource(Table.writeArrowIPCChunked(options, new File(path))) { writer =>
+            writer.write(table)
+          }
+          true
+        } catch {
+          case e: Throwable =>
+            logger.error(s"Failed to cache table to $path", e)
+            throw e
+        }
+      }
+    }
+  }
+
   /**
    * Convert the table to file path which will be cached
    *
@@ -85,13 +123,13 @@ private[spark] class DiskExternalMemoryIterator(val path: String) extends Extern
    * @return the content
    */
   override def convertTable(table: Table): String = {
-    val names = (1 to table.getNumberOfColumns).map(_.toString)
-    val options = ArrowIPCWriterOptions.builder().withColumnNames(names: _*).build()
-    val path = root + "/table_" + counter + "_" + System.nanoTime();
+    val path = root + "/table_" + counter + "_" + System.nanoTime()
     counter += 1
-    withResource(Table.writeArrowIPCChunked(options, new File(path))) { writer =>
-      writer.write(table)
-    }
+
+    // Increase the reference count of the columns so they aren't recycled prematurely
+    val newTable = new Table((0 until table.getNumberOfColumns).map(table.getColumn): _*)
+    val future = cacheTableThread(newTable, path)
+    taskFutures += (path -> future)
     path
   }
@@ -106,17 +144,35 @@ private[spark] class DiskExternalMemoryIterator(val path: String) extends Extern
     }
   }
 
+  private def checkAndWaitCachingDone(path: String): Unit = {
+    var count = 1
+    // Wait up to 6s for the caching to finish.
+    // TODO: make the timeout configurable
+    while (count < 120) {
+      val futureOpt = taskFutures.get(path)
+      if (futureOpt.isDefined && futureOpt.get.isCompleted) {
+        return
+      }
+      count += 1
+      Thread.sleep(50)
+    }
+    throw new RuntimeException(s"Caching $path is not finished")
+  }
+
   /**
    * Load the path from disk to the Table
    *
-   * @param name to be loaded
+   * @param path to be loaded
    * @return Table
    */
-  override def loadTable(name: String): Table = {
-    val file = new File(name)
+  override def loadTable(path: String): Table = {
+    val file = new File(path)
     if (!file.exists()) {
-      throw new RuntimeException(s"The cache file ${name} doesn't exist" )
+      throw new RuntimeException(s"The cache file ${path} doesn't exist")
     }
+
+    checkAndWaitCachingDone(path)
+
     try {
       withResource(Table.readArrowIPCChunked(file)) { reader =>
         val tables = ArrayBuffer.empty[Table]
@@ -147,6 +203,7 @@ private[spark] class DiskExternalMemoryIterator(val path: String) extends Extern
   }
 
   override def close(): Unit = {
+    executor.shutdown()
     buffers.foreach { path =>
       val file = new File(path)
       if (file.exists()) {
@@ -169,7 +226,7 @@ private[spark] object ExternalMemory {
    *
    * The first round iteration gets the input batch that will be
    * 1. cached in the external memory
-   * 2. fed in QuantilDmatrix
+   * 2. fed in QuantileDMatrix
    * The second round iteration returns the cached batch got from external memory.
    *
    * @param input the spark input iterator

From 6008ef3572128842896aa15343e634f87490f36c Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Wed, 21 May 2025 14:29:36 +0800
Subject: [PATCH 2/3] Fix loading a cache file that has not been written yet
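
With the write now running in the background, loadTable can be called
before the writer thread has even created the cache file, so the
up-front file.exists() check failed spuriously. Drop that early check
and instead require, inside the wait loop, both that the caching Future
has completed and that the file is present; the wait itself moves into
loadTable's try block.

Roughly, the readiness condition becomes the following (a sketch;
`futures` stands in for the iterator's taskFutures map):

  import java.io.File
  import scala.concurrent.Future

  def cachingDone(path: String, futures: collection.Map[String, Future[Boolean]]): Boolean =
    futures.get(path).exists(_.isCompleted) && new File(path).exists()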
---
 .../dmlc/xgboost4j/scala/spark/ExternalMemory.scala | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala
index 24fb9dec017e..da0990c5b4b3 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala
+++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala
@@ -150,13 +150,14 @@ private[spark] class DiskExternalMemoryIterator(val parent: String) extends Exte
     // TODO: make the timeout configurable
     while (count < 120) {
       val futureOpt = taskFutures.get(path)
-      if (futureOpt.isDefined && futureOpt.get.isCompleted) {
+      val exist = new File(path).exists()
+      if (futureOpt.isDefined && futureOpt.get.isCompleted && exist) {
         return
       }
       count += 1
       Thread.sleep(50)
     }
-    throw new RuntimeException(s"Caching $path is not finished")
+    throw new RuntimeException(s"The cache file $path does not exist")
   }
 
   /**
@@ -167,13 +168,10 @@ private[spark] class DiskExternalMemoryIterator(val parent: String) extends Exte
    */
   override def loadTable(path: String): Table = {
     val file = new File(path)
-    if (!file.exists()) {
-      throw new RuntimeException(s"The cache file ${path} doesn't exist")
-    }
-
-    checkAndWaitCachingDone(path)
 
     try {
+      checkAndWaitCachingDone(path)
+
       withResource(Table.readArrowIPCChunked(file)) { reader =>
         val tables = ArrayBuffer.empty[Table]
         closeOnExcept(tables) { tables =>

From 795e151d853ccb1249f0e981462072dd478721bb Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Wed, 21 May 2025 15:41:17 +0800
Subject: [PATCH 3/3] Replace the caching-done polling loop with Await.result
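
Awaiting the Future directly is simpler and stricter than the 50ms
polling loop: it blocks only as long as the write actually takes,
rethrows whatever exception the caching thread failed with, and raises
java.util.concurrent.TimeoutException if the write has not finished
within 6 seconds.

A small illustration of that error propagation (the "disk full"
failure is invented for the example):

  import scala.concurrent.{Await, ExecutionContext, Future}
  import scala.concurrent.duration.DurationInt
  import scala.util.Try
  import ExecutionContext.Implicits.global

  val failing = Future[Boolean] { throw new IllegalStateException("disk full") }
  val outcome = Try(Await.result(failing, 6.seconds))
  // outcome is Failure(java.lang.IllegalStateException: disk full):
  // the worker's own exception surfaces, instead of the generic
  // "cache file does not exist" raised by the old polling loop.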
---
 .../scala/spark/ExternalMemory.scala | 21 +++++++++----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala
index da0990c5b4b3..f6247bfd4f0b 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala
+++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala
@@ -22,7 +22,8 @@ import java.util.concurrent.Executors
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration.DurationInt
 
 import ai.rapids.cudf._
 import org.apache.commons.logging.LogFactory
@@ -145,19 +146,17 @@ private[spark] class DiskExternalMemoryIterator(val parent: String) extends Exte
   }
 
   private def checkAndWaitCachingDone(path: String): Unit = {
-    var count = 1
+    val futureOpt = taskFutures.get(path)
+    if (futureOpt.isEmpty) {
+      throw new RuntimeException(s"Failed to find the caching process for $path")
+    }
     // Wait up to 6s for the caching to finish.
     // TODO: make the timeout configurable
-    while (count < 120) {
-      val futureOpt = taskFutures.get(path)
-      val exist = new File(path).exists()
-      if (futureOpt.isDefined && futureOpt.get.isCompleted && exist) {
-        return
-      }
-      count += 1
-      Thread.sleep(50)
-    }
-    throw new RuntimeException(s"The cache file $path does not exist")
+    // Await.result throws a TimeoutException if the caching is not done in time
+    val success = Await.result(futureOpt.get, 6.seconds)
+    if (!success) { // Failed to cache
+      throw new RuntimeException(s"Failed to cache table to $path")
+    }
   }
 
   /**