diff --git a/Configuration/PyReleaseValidation/python/upgradeWorkflowComponents.py b/Configuration/PyReleaseValidation/python/upgradeWorkflowComponents.py index d6dd761188b14..e180616080563 100644 --- a/Configuration/PyReleaseValidation/python/upgradeWorkflowComponents.py +++ b/Configuration/PyReleaseValidation/python/upgradeWorkflowComponents.py @@ -2501,7 +2501,7 @@ class UpgradeWorkflow_SonicTriton(UpgradeWorkflow): def setup_(self, step, stepName, stepDict, k, properties): stepDict[stepName][k] = merge([{'--procModifiers': 'allSonicTriton'}, stepDict[step][k]]) def condition(self, fragment, stepList, key, hasHarvest): - return (fragment=='TTbar_13' and '2021' in key) \ + return ((fragment=='TTbar_13' or fragment=='TTbar_14TeV') and '2021' in key) \ or (fragment=='TTbar_14TeV' and '2026' in key) upgradeWFs['SonicTriton'] = UpgradeWorkflow_SonicTriton( steps = [ diff --git a/Configuration/PyReleaseValidation/scripts/runTheMatrix.py b/Configuration/PyReleaseValidation/scripts/runTheMatrix.py index 610460b31f02d..05bad56169e65 100755 --- a/Configuration/PyReleaseValidation/scripts/runTheMatrix.py +++ b/Configuration/PyReleaseValidation/scripts/runTheMatrix.py @@ -35,7 +35,7 @@ def runSelected(opt): ret = 0 if opt.show: mrd.show(opt.testList, opt.extended, opt.cafVeto) - if opt.testList : print('testListected items:', opt.testList) + if opt.testList : print('selected items:', opt.testList) else: mRunnerHi = MatrixRunner(mrd.workFlows, opt.nProcs, opt.nThreads) ret = mRunnerHi.runTests(opt) diff --git a/HeterogeneousCore/SonicTriton/README.md b/HeterogeneousCore/SonicTriton/README.md index 7eed0f67989fa..4050f36beaf0a 100644 --- a/HeterogeneousCore/SonicTriton/README.md +++ b/HeterogeneousCore/SonicTriton/README.md @@ -29,6 +29,7 @@ The model information from the server can be printed by enabling `verbose` outpu * `modelConfigPath`: path to `config.pbtxt` file for the model (using `edm::FileInPath`) * `preferredServer`: name of preferred server, for testing (see 
[Services](#services) below) * `timeout`: maximum allowed time for a request (disabled with 0) +* `timeoutUnit`: seconds, milliseconds, or microseconds (default: seconds) * `outputs`: optional, specify which output(s) the server should send * `verbose`: enable verbose printouts (default: false) * `useSharedMemory`: enable use of shared memory (see [below](#shared-memory)) with local servers (default: true) @@ -132,6 +133,7 @@ The script has three operations (`start`, `stop`, `check`) and the following opt * `-C [dir]`: directory containing Nvidia compatibility drivers (checks CMSSW_BASE by default if available) * `-D`: dry run: print container commands rather than executing them * `-d`: use Docker instead of Apptainer +* `-E [path]`: include extra path(s) for executables (default: /cvmfs/oasis.opensciencegrid.org/mis/apptainer/current/bin) * `-f`: force reuse of (possibly) existing container instance * `-g`: use GPU instead of CPU * `-i` [name]`: server image name (default: fastml/triton-torchgeo:22.07-py3-geometric) diff --git a/HeterogeneousCore/SonicTriton/interface/TritonClient.h b/HeterogeneousCore/SonicTriton/interface/TritonClient.h index 833d329417d18..4661c3e57c54c 100644 --- a/HeterogeneousCore/SonicTriton/interface/TritonClient.h +++ b/HeterogeneousCore/SonicTriton/interface/TritonClient.h @@ -48,6 +48,7 @@ class TritonClient : public SonicClient { void resetBatchMode(); void reset() override; TritonServerType serverType() const { return serverType_; } + bool isLocal() const { return isLocal_; } //for fillDescriptions static void fillPSetDescription(edm::ParameterSetDescription& iDesc); @@ -78,6 +79,7 @@ class TritonClient : public SonicClient { bool verbose_; bool useSharedMemory_; TritonServerType serverType_; + bool isLocal_; grpc_compression_algorithm compressionAlgo_; triton::client::Headers headers_; diff --git a/HeterogeneousCore/SonicTriton/interface/TritonException.h b/HeterogeneousCore/SonicTriton/interface/TritonException.h index 
418a98fc1bd8e..3066c969a8e14 100644 --- a/HeterogeneousCore/SonicTriton/interface/TritonException.h +++ b/HeterogeneousCore/SonicTriton/interface/TritonException.h @@ -7,7 +7,7 @@ class TritonException : public cms::Exception { public: - explicit TritonException(std::string const& aCategory); + explicit TritonException(std::string const& aCategory, bool signal = false); void convertToWarning() const; }; diff --git a/HeterogeneousCore/SonicTriton/interface/TritonMemResource.h b/HeterogeneousCore/SonicTriton/interface/TritonMemResource.h index 9ccd27fd0c0cf..a1352fc25b7ee 100644 --- a/HeterogeneousCore/SonicTriton/interface/TritonMemResource.h +++ b/HeterogeneousCore/SonicTriton/interface/TritonMemResource.h @@ -19,6 +19,7 @@ class TritonMemResource { uint8_t* addr() { return addr_; } size_t size() const { return size_; } virtual void close() {} + void closeSafe(); //used for input virtual void copyInput(const void* values, size_t offset, unsigned entry) {} //used for output diff --git a/HeterogeneousCore/SonicTriton/interface/TritonService.h b/HeterogeneousCore/SonicTriton/interface/TritonService.h index 98d0d410924b8..f4d6093695dad 100644 --- a/HeterogeneousCore/SonicTriton/interface/TritonService.h +++ b/HeterogeneousCore/SonicTriton/interface/TritonService.h @@ -10,6 +10,7 @@ #include #include #include +#include #include "grpc_client.h" @@ -112,6 +113,7 @@ class TritonService { void addModel(const std::string& modelName, const std::string& path); Server serverInfo(const std::string& model, const std::string& preferred = "") const; const std::string& pid() const { return pid_; } + void notifyCallStatus(bool status) const; static void fillDescriptions(edm::ConfigurationDescriptions& descriptions); @@ -132,6 +134,7 @@ class TritonService { unsigned currentModuleId_; bool allowAddModel_; bool startedFallback_; + mutable std::atomic callFails_; std::string pid_; std::unordered_map unservedModels_; //this represents a many:many:many map diff --git 
a/HeterogeneousCore/SonicTriton/interface/triton_utils.h b/HeterogeneousCore/SonicTriton/interface/triton_utils.h index d6c7612a5159c..01ac1ef0f0b0f 100644 --- a/HeterogeneousCore/SonicTriton/interface/triton_utils.h +++ b/HeterogeneousCore/SonicTriton/interface/triton_utils.h @@ -1,6 +1,7 @@ #ifndef HeterogeneousCore_SonicTriton_triton_utils #define HeterogeneousCore_SonicTriton_triton_utils +#include "FWCore/Utilities/interface/Exception.h" #include "FWCore/Utilities/interface/Span.h" #include "HeterogeneousCore/SonicTriton/interface/TritonException.h" @@ -19,6 +20,8 @@ namespace triton_utils { bool checkType(inference::DataType dtype) { return false; } + //turn CMS exceptions into warnings + void convertToWarning(const cms::Exception& e); } // namespace triton_utils //explicit specializations (inlined) @@ -72,11 +75,11 @@ inline bool triton_utils::checkType(inference::DataType dtype) { //helper to turn triton error into exception //implemented as a macro to avoid constructing the MSG string for successful function calls -#define TRITON_THROW_IF_ERROR(X, MSG) \ - { \ - triton::client::Error err = (X); \ - if (!err.IsOk()) \ - throw TritonException("TritonFailure") << (MSG) << (err.Message().empty() ? "" : ": " + err.Message()); \ +#define TRITON_THROW_IF_ERROR(X, MSG, NOTIFY) \ + { \ + triton::client::Error err = (X); \ + if (!err.IsOk()) \ + throw TritonException("TritonFailure", NOTIFY) << (MSG) << (err.Message().empty() ? 
"" : ": " + err.Message()); \ } extern template std::string triton_utils::printColl(const edm::Span::const_iterator>& coll, diff --git a/HeterogeneousCore/SonicTriton/scripts/cmsTriton b/HeterogeneousCore/SonicTriton/scripts/cmsTriton index 9c84be2b62616..3949d6f21826b 100755 --- a/HeterogeneousCore/SonicTriton/scripts/cmsTriton +++ b/HeterogeneousCore/SonicTriton/scripts/cmsTriton @@ -5,7 +5,7 @@ USEDOCKER="" GPU="" VERBOSE="" VERBOSE_ARGS="--log-verbose=1 --log-error=1 --log-warning=1 --log-info=1" -WTIME=300 +WTIME=600 SERVER=triton_server_instance RETRIES=3 REPOS=() @@ -23,6 +23,7 @@ NPORTS=3 IMAGE=fastml/triton-torchgeo:22.07-py3-geometric SANDBOX="" COMPAT_USR="" +EXTRAPATH=/cvmfs/oasis.opensciencegrid.org/mis/apptainer/current/bin get_sandbox(){ if [ -z "$SANDBOX" ]; then @@ -41,6 +42,7 @@ usage() { $ECHO "-C [dir] \t directory containing Nvidia compatibility drivers (checks CMSSW_BASE by default if available)" $ECHO "-D \t dry run: print container commands rather than executing them" $ECHO "-d \t use Docker instead of Apptainer" + $ECHO "-E [path] \t include extra path(s) for executables (default: ${EXTRAPATH})" $ECHO "-f \t force reuse of (possibly) existing container instance" $ECHO "-g \t use GPU instead of CPU" $ECHO "-i [name] \t server image name (default: ${IMAGE})" @@ -131,6 +133,11 @@ else TMPDIR=$(readlink -f $TMPDIR) fi +# update path +if [ -n "$EXTRAPATH" ]; then + export PATH="${EXTRAPATH}:${PATH}" +fi + # find executables if [ -n "$USEDOCKER" ]; then if [ -z "$DOCKER" ]; then @@ -149,7 +156,6 @@ else fi fi - SANDBOX=$(get_sandbox) SANDBOX=$(readlink -f ${SANDBOX}) LOG="log_${SERVER}.log" @@ -160,6 +166,12 @@ SEGFAULT_INDICATOR="Address already in use" EXTRA="" COMPAT_SCRIPT=/etc/shinit_v2 +THREADCONTROL="" +# do not apply thread control settings if GPU use is requested +if [ "$INSTANCES" -gt 0 ] && [ -z "$GPU" ]; then + THREADCONTROL=true +fi + compute_ports(){ # compute derived port numbers export HTTPPORT=$BASEPORT @@ -341,7 +353,7 @@ 
wait_server(){ list_models(){ # make list of model repositories LOCALMODELREPO="local_model_repo" - if [ "$INSTANCES" -gt 0 ]; then + if [ -n "$THREADCONTROL" ]; then if [ -d "$TMPDIR/$LOCALMODELREPO" ]; then #Want to start with a fresh copy of model files in case this directory already exists with local edits rm -rf $TMPDIR/$LOCALMODELREPO @@ -359,7 +371,7 @@ list_models(){ if [ -f "$MODEL" ]; then MODEL="$(dirname "$MODEL")" fi - if [ "$INSTANCES" -gt 0 ]; then + if [ -n "$THREADCONTROL" ]; then $DRYRUN cmsTritonConfigTool threadcontrol -c ${MODEL}/config.pbtxt --copy $TMPDIR/$LOCALMODELREPO --nThreads $INSTANCES TOOL_EXIT=$? if [ "$TOOL_EXIT" -ne 0 ]; then @@ -370,7 +382,7 @@ list_models(){ REPOS+=("$(dirname "$MODEL")") fi done - if [ "$INSTANCES" -gt 0 ]; then + if [ -n "$THREADCONTROL" ]; then REPOS=$TMPDIR/$LOCALMODELREPO else for ((r=0; r < ${#REPOS[@]}; r++)); do @@ -394,6 +406,7 @@ auto_stop(){ fi PCOUNTER=0 PMAX=5 + # builtin wait is not used here because it can only monitor a child process, not a parent process while [ "$PCOUNTER" -le "$PMAX" ]; do if ! kill -0 $PARENTPID >& /dev/null; then PCOUNTER=$((PCOUNTER+1)) @@ -415,13 +428,11 @@ auto_stop(){ $STOP_FN # move logs out of tmp dir - if [ -z "$DRYRUN" ]; then - if [ -n "$VERBOSE" ]; then - mv "$LOG" "$TOPDIR" - # only keep non-empty log - if [ -s "$STOPLOG" ]; then - mv "$STOPLOG" "$TOPDIR" - fi + if [ -z "$DRYRUN" ] && [ -n "$VERBOSE" ]; then + mv "$LOG" "$TOPDIR" + # only keep non-empty log + if [ -s "$STOPLOG" ]; then + mv "$STOPLOG" "$TOPDIR" fi fi @@ -569,11 +580,16 @@ elif [ "$OP" == start ]; then START_EXIT=0 for ((counter=0; counter < ${RETRIES}; counter++)); do make_tmp - #If we plan on editing model configs, must repull files into /tmp/local_model_repo, which is deleted upon retry - if [ "$counter" -eq 0 ] || [ "$INSTANCES" -gt 0 ]; then list_models; fi - check_drivers - DRIVER_EXIT=$? 
- if [ "$DRIVER_EXIT" -ne 0 ]; then exit $DRIVER_EXIT; fi + + # if we plan on editing model configs, must copy files into /tmp/local_model_repo, which is deleted upon retry + if [ "$counter" -eq 0 ] || [ -n "$THREADCONTROL" ]; then list_models; fi + + # only need to check drivers if using GPU + if [ -n "$GPU" ]; then + check_drivers + DRIVER_EXIT=$? + if [ "$DRIVER_EXIT" -ne 0 ]; then exit $DRIVER_EXIT; fi + fi $START_FN START_EXIT=$? diff --git a/HeterogeneousCore/SonicTriton/scripts/cmsTritonConfigTool b/HeterogeneousCore/SonicTriton/scripts/cmsTritonConfigTool index 9a1ef54c57da6..00c08742dd5f9 100755 --- a/HeterogeneousCore/SonicTriton/scripts/cmsTritonConfigTool +++ b/HeterogeneousCore/SonicTriton/scripts/cmsTritonConfigTool @@ -261,9 +261,11 @@ def cfg_checksum(args): missing = [] from glob import glob - config_dir = os.path.dirname(args.config) + # evaluate symbolic links + config_dir = os.path.realpath(os.path.dirname(args.config)) for filename in glob(os.path.join(config_dir,"*/*")): - if os.path.islink(os.path.dirname(filename)): continue + # evaluate symbolic links again + filename = os.path.realpath(filename) checksum = get_checksum(filename) # key = algorithm:[filename relative to config.pbtxt dir] filename = os.path.relpath(filename, config_dir) @@ -324,10 +326,12 @@ def cfg_versioncheck(args): missing = [] for path in os.environ['CMSSW_SEARCH_PATH'].split(':'): - for dirpath, dirnames, filenames in os.walk(path): + if args.verbose: print("Checking: "+path) + for dirpath, dirnames, filenames in os.walk(path, followlinks=True): for filename in filenames: if filename=="config.pbtxt": filepath = os.path.join(dirpath,filename) + if args.verbose: print(filepath) checksum_args = Namespace( config=filepath, should_return=True, copy=False, json=False, defaults=False, view=False, @@ -435,6 +439,7 @@ if __name__=="__main__": parser_checksum.set_defaults(func=cfg_checksum) parser_versioncheck = subparsers.add_parser("versioncheck", 
parents=[_parser_checksum_update], help="check all model checksums") + parser_versioncheck.add_argument("--verbose", default=False, action="store_true", help="verbose output (show all files checked)") parser_versioncheck.set_defaults(func=cfg_versioncheck) _parser_copy_req = ArgumentParser(add_help=False, parents=[_parser_copy_view]) diff --git a/HeterogeneousCore/SonicTriton/src/TritonClient.cc b/HeterogeneousCore/SonicTriton/src/TritonClient.cc index 201ad40d35a0e..76fd670bb66bc 100644 --- a/HeterogeneousCore/SonicTriton/src/TritonClient.cc +++ b/HeterogeneousCore/SonicTriton/src/TritonClient.cc @@ -1,5 +1,6 @@ #include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/ParameterSet/interface/FileInPath.h" +#include "FWCore/ParameterSet/interface/allowedValues.h" #include "FWCore/ServiceRegistry/interface/Service.h" #include "FWCore/Utilities/interface/Exception.h" #include "HeterogeneousCore/SonicTriton/interface/TritonClient.h" @@ -71,16 +72,29 @@ TritonClient::TritonClient(const edm::ParameterSet& params, const std::string& d //todo: could enforce async mode otherwise (unless mode was specified by user?) 
if (serverType_ == TritonServerType::LocalCPU) setMode(SonicMode::Sync); + isLocal_ = serverType_ == TritonServerType::LocalCPU or serverType_ == TritonServerType::LocalGPU; //connect to the server TRITON_THROW_IF_ERROR( tc::InferenceServerGrpcClient::Create(&client_, server.url, false, server.useSsl, server.sslOptions), - "TritonClient(): unable to create inference context"); + "TritonClient(): unable to create inference context", + isLocal_); //set options options_[0].model_version_ = params.getParameter("modelVersion"); - //convert seconds to microseconds - options_[0].client_timeout_ = params.getUntrackedParameter("timeout") * 1e6; + options_[0].client_timeout_ = params.getUntrackedParameter("timeout"); + //convert to microseconds + const auto& timeoutUnit = params.getUntrackedParameter("timeoutUnit"); + unsigned conversion = 1; + if (timeoutUnit == "seconds") + conversion = 1e6; + else if (timeoutUnit == "milliseconds") + conversion = 1e3; + else if (timeoutUnit == "microseconds") + conversion = 1; + else + throw cms::Exception("Configuration") << "Unknown timeout unit: " << timeoutUnit; + options_[0].client_timeout_ *= conversion; //get fixed parameters from local config inference::ModelConfig localModelConfig; @@ -110,7 +124,8 @@ TritonClient::TritonClient(const edm::ParameterSet& params, const std::string& d //compare model checksums to remote config to enforce versioning inference::ModelConfigResponse modelConfigResponse; TRITON_THROW_IF_ERROR(client_->ModelConfig(&modelConfigResponse, options_[0].model_name_, options_[0].model_version_), - "TritonClient(): unable to get model config"); + "TritonClient(): unable to get model config", + isLocal_); inference::ModelConfig remoteModelConfig(modelConfigResponse.config()); std::map> checksums; @@ -140,7 +155,8 @@ TritonClient::TritonClient(const edm::ParameterSet& params, const std::string& d //get model info inference::ModelMetadataResponse modelMetadata; 
TRITON_THROW_IF_ERROR(client_->ModelMetadata(&modelMetadata, options_[0].model_name_, options_[0].model_version_), - "TritonClient(): unable to get model metadata"); + "TritonClient(): unable to get model metadata", + isLocal_); //get input and output (which know their sizes) const auto& nicInputs = modelMetadata.inputs(); @@ -329,8 +345,8 @@ void TritonClient::getResults(const std::vector //set shape here before output becomes const if (output.variableDims()) { std::vector tmp_shape; - TRITON_THROW_IF_ERROR(result->Shape(oname, &tmp_shape), - "getResults(): unable to get output shape for " + oname); + TRITON_THROW_IF_ERROR( + result->Shape(oname, &tmp_shape), "getResults(): unable to get output shape for " + oname, false); if (!noOuterDim_) tmp_shape.erase(tmp_shape.begin()); output.setShape(tmp_shape, i); @@ -346,6 +362,12 @@ void TritonClient::getResults(const std::vector //default case for sync and pseudo async void TritonClient::evaluate() { + //undo previous signal from TritonException + if (tries_ > 0) { + edm::Service ts; + ts->notifyCallStatus(true); + } + //in case there is nothing to process if (batchSize() == 0) { //call getResults on an empty vector @@ -400,43 +422,45 @@ void TritonClient::evaluate() { if (mode_ == SonicMode::Async) { //non-blocking call success = handle_exception([&]() { - TRITON_THROW_IF_ERROR( - client_->AsyncInferMulti( - [start_status, this](std::vector resultsTmp) { - //immediately convert to shared_ptr - const auto& results = convertToShared(resultsTmp); - //check results - for (auto ptr : results) { - auto success = handle_exception( - [&]() { TRITON_THROW_IF_ERROR(ptr->RequestStatus(), "evaluate(): unable to get result(s)"); }); - if (!success) - return; - } - - if (verbose()) { - inference::ModelStatistics end_status; - auto success = handle_exception([&]() { end_status = getServerSideStatus(); }); - if (!success) - return; - - const auto& stats = summarizeServerStats(start_status, end_status); - reportServerSideStats(stats); 
- } - - //check result - auto success = handle_exception([&]() { getResults(results); }); - if (!success) - return; - - //finish - finish(true); - }, - options_, - inputsTriton, - outputsTriton, - headers_, - compressionAlgo_), - "evaluate(): unable to launch async run"); + TRITON_THROW_IF_ERROR(client_->AsyncInferMulti( + [start_status, this](std::vector resultsTmp) { + //immediately convert to shared_ptr + const auto& results = convertToShared(resultsTmp); + //check results + for (auto ptr : results) { + auto success = handle_exception([&]() { + TRITON_THROW_IF_ERROR( + ptr->RequestStatus(), "evaluate(): unable to get result(s)", isLocal_); + }); + if (!success) + return; + } + + if (verbose()) { + inference::ModelStatistics end_status; + auto success = handle_exception([&]() { end_status = getServerSideStatus(); }); + if (!success) + return; + + const auto& stats = summarizeServerStats(start_status, end_status); + reportServerSideStats(stats); + } + + //check result + auto success = handle_exception([&]() { getResults(results); }); + if (!success) + return; + + //finish + finish(true); + }, + options_, + inputsTriton, + outputsTriton, + headers_, + compressionAlgo_), + "evaluate(): unable to launch async run", + isLocal_); }); if (!success) return; @@ -446,7 +470,8 @@ void TritonClient::evaluate() { success = handle_exception([&]() { TRITON_THROW_IF_ERROR( client_->InferMulti(&resultsTmp, options_, inputsTriton, outputsTriton, headers_, compressionAlgo_), - "evaluate(): unable to run and/or get result"); + "evaluate(): unable to run and/or get result", + isLocal_); }); //immediately convert to shared_ptr const auto& results = convertToShared(resultsTmp); @@ -533,7 +558,8 @@ inference::ModelStatistics TritonClient::getServerSideStatus() const { if (verbose_) { inference::ModelStatisticsResponse resp; TRITON_THROW_IF_ERROR(client_->ModelInferenceStatistics(&resp, options_[0].model_name_, options_[0].model_version_), - "getServerSideStatus(): unable to get model 
statistics"); + "getServerSideStatus(): unable to get model statistics", + isLocal_); return *(resp.model_stats().begin()); } return inference::ModelStatistics{}; @@ -549,6 +575,8 @@ void TritonClient::fillPSetDescription(edm::ParameterSetDescription& iDesc) { //server parameters should not affect the physics results descClient.addUntracked("preferredServer", ""); descClient.addUntracked("timeout"); + descClient.ifValue(edm::ParameterDescription("timeoutUnit", "seconds", false), + edm::allowedValues("seconds", "milliseconds", "microseconds")); descClient.addUntracked("useSharedMemory", true); descClient.addUntracked("compression", ""); descClient.addUntracked>("outputs", {}); diff --git a/HeterogeneousCore/SonicTriton/src/TritonException.cc b/HeterogeneousCore/SonicTriton/src/TritonException.cc index ee160ffe957c2..d1a08dd653fee 100644 --- a/HeterogeneousCore/SonicTriton/src/TritonException.cc +++ b/HeterogeneousCore/SonicTriton/src/TritonException.cc @@ -1,6 +1,13 @@ #include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ServiceRegistry/interface/Service.h" #include "HeterogeneousCore/SonicTriton/interface/TritonException.h" +#include "HeterogeneousCore/SonicTriton/interface/TritonService.h" -TritonException::TritonException(std::string const& aCategory) : cms::Exception(aCategory) {} +TritonException::TritonException(std::string const& aCategory, bool signal) : cms::Exception(aCategory) { + if (signal) { + edm::Service ts; + ts->notifyCallStatus(false); + } +} void TritonException::convertToWarning() const { edm::LogWarning(category()) << explainSelf(); } diff --git a/HeterogeneousCore/SonicTriton/src/TritonMemResource.cc b/HeterogeneousCore/SonicTriton/src/TritonMemResource.cc index 102f28eda6601..3f6ef96681309 100644 --- a/HeterogeneousCore/SonicTriton/src/TritonMemResource.cc +++ b/HeterogeneousCore/SonicTriton/src/TritonMemResource.cc @@ -19,7 +19,21 @@ template void TritonMemResource::set() { for (auto& entry : data_->entries_) { 
TRITON_THROW_IF_ERROR(entry.data_->SetSharedMemory(name_, entry.totalByteSize_, entry.offset_), - "unable to set shared memory (" + name_ + ")"); + "unable to set shared memory (" + name_ + ")", + true); + } +} + +template +void TritonMemResource::closeSafe() { + CMS_SA_ALLOW try { close(); } catch (TritonException& e) { + e.convertToWarning(); + } catch (cms::Exception& e) { + triton_utils::convertToWarning(e); + } catch (std::exception& e) { + edm::LogWarning("UnknownFailure") << e.what(); + } catch (...) { + edm::LogWarning("UnknownFailure") << "An unknown exception was thrown"; } } @@ -35,7 +49,8 @@ void TritonInputHeapResource::copyInput(const void* values, size_t offset, unsig (data_->entries_.size() > 1 ? std::to_string(entry) : data_->entries_[entry].byteSizePerBatch_ ? std::to_string(offset / data_->entries_[entry].byteSizePerBatch_) - : "")); + : ""), + false); } template <> @@ -45,7 +60,8 @@ void TritonOutputHeapResource::copyOutput() { size_t contentByteSizeEntry(0); if (entry.totalByteSize_ > 0) TRITON_THROW_IF_ERROR(entry.result_->RawData(data_->name_, &entry.output_, &contentByteSizeEntry), - data_->name_ + " fromServer(): unable to get raw"); + data_->name_ + " fromServer(): unable to get raw", + false); contentByteSize += contentByteSizeEntry; } if (contentByteSize != data_->totalByteSize_) { @@ -87,12 +103,13 @@ TritonCpuShmResource::TritonCpuShmResource(TritonData* data, const std:: throw cms::Exception("TritonError") << "unable to close descriptor for shared memory key: " + this->name_; TRITON_THROW_IF_ERROR(this->data_->client()->RegisterSystemSharedMemory(this->name_, this->name_, this->size_), - "unable to register shared memory region: " + this->name_); + "unable to register shared memory region: " + this->name_, + true); } template TritonCpuShmResource::~TritonCpuShmResource() { - close(); + this->closeSafe(); } template @@ -101,7 +118,8 @@ void TritonCpuShmResource::close() { return; 
TRITON_THROW_IF_ERROR(this->data_->client()->UnregisterSystemSharedMemory(this->name_), - "unable to unregister shared memory region: " + this->name_); + "unable to unregister shared memory region: " + this->name_, + true); //unmap int tmp_fd = munmap(this->addr_, this->size_); @@ -143,12 +161,13 @@ TritonGpuShmResource::TritonGpuShmResource(TritonData* data, const std:: cudaCheck(cudaMalloc((void**)&this->addr_, this->size_), "unable to allocate GPU memory for key: " + this->name_); cudaCheck(cudaIpcGetMemHandle(handle_.get(), this->addr_), "unable to get IPC handle for key: " + this->name_); TRITON_THROW_IF_ERROR(this->data_->client()->RegisterCudaSharedMemory(this->name_, *handle_, deviceId_, this->size_), - "unable to register CUDA shared memory region: " + this->name_); + "unable to register CUDA shared memory region: " + this->name_, + true); } template TritonGpuShmResource::~TritonGpuShmResource() { - close(); + this->closeSafe(); } template @@ -156,7 +175,8 @@ void TritonGpuShmResource::close() { if (this->closed_) return; TRITON_THROW_IF_ERROR(this->data_->client()->UnregisterCudaSharedMemory(this->name_), - "unable to unregister CUDA shared memory region: " + this->name_); + "unable to unregister CUDA shared memory region: " + this->name_, + true); cudaCheck(cudaFree(this->addr_), "unable to free GPU memory for key: " + this->name_); this->closed_ = true; } diff --git a/HeterogeneousCore/SonicTriton/src/TritonService.cc b/HeterogeneousCore/SonicTriton/src/TritonService.cc index 8a34b6197130c..53b94f767062b 100644 --- a/HeterogeneousCore/SonicTriton/src/TritonService.cc +++ b/HeterogeneousCore/SonicTriton/src/TritonService.cc @@ -61,6 +61,7 @@ TritonService::TritonService(const edm::ParameterSet& pset, edm::ActivityRegistr currentModuleId_(0), allowAddModel_(false), startedFallback_(false), + callFails_(0), pid_(std::to_string(::getpid())) { //module construction is assumed to be serial (correct at the time this code was written) @@ -104,20 +105,22 @@ 
TritonService::TritonService(const edm::ParameterSet& pset, edm::ActivityRegistr std::unique_ptr client; TRITON_THROW_IF_ERROR( tc::InferenceServerGrpcClient::Create(&client, server.url, false, server.useSsl, server.sslOptions), - "TritonService(): unable to create inference context for " + serverName + " (" + server.url + ")"); + "TritonService(): unable to create inference context for " + serverName + " (" + server.url + ")", + false); if (verbose_) { inference::ServerMetadataResponse serverMetaResponse; TRITON_THROW_IF_ERROR(client->ServerMetadata(&serverMetaResponse), - "TritonService(): unable to get metadata for " + serverName + " (" + server.url + ")"); + "TritonService(): unable to get metadata for " + serverName + " (" + server.url + ")", + false); edm::LogInfo("TritonService") << "Server " << serverName << ": url = " << server.url << ", version = " << serverMetaResponse.version(); } inference::RepositoryIndexResponse repoIndexResponse; - TRITON_THROW_IF_ERROR( - client->ModelRepositoryIndex(&repoIndexResponse), - "TritonService(): unable to get repository index for " + serverName + " (" + server.url + ")"); + TRITON_THROW_IF_ERROR(client->ModelRepositoryIndex(&repoIndexResponse), + "TritonService(): unable to get repository index for " + serverName + " (" + server.url + ")", + false); //servers keep track of models and vice versa if (verbose_) @@ -297,20 +300,39 @@ void TritonService::preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm:: << output; } +void TritonService::notifyCallStatus(bool status) const { + if (status) + --callFails_; + else + ++callFails_; +} + void TritonService::postEndJob() { if (!startedFallback_) return; - std::string command = fallbackOpts_.command + " stop"; + std::string command = fallbackOpts_.command; + //prevent log cleanup during server stop + if (callFails_ > 0) + command += " -c"; + command += " stop"; if (verbose_) edm::LogInfo("TritonService") << command; const auto& [output, rv] = execSys(command); - if (rv != 
0) { + if (rv != 0 or callFails_ > 0) { + //print logs if cmsRun is currently exiting because of a TritonException edm::LogError("TritonService") << output; printFallbackServerLog(); - throw cms::Exception("FallbackFailed") - << "TritonService: Stopping the fallback server failed with exit code " << rv; + if (rv != 0) { + std::string stopCat("FallbackFailed"); + std::string stopMsg = fmt::format("TritonService: Stopping the fallback server failed with exit code {}", rv); + //avoid throwing if the stack is already unwinding + if (callFails_ > 0) + edm::LogWarning(stopCat) << stopMsg; + else + throw cms::Exception(stopCat) << stopMsg; + } } else if (verbose_) { edm::LogInfo("TritonService") << output; printFallbackServerLog(); diff --git a/HeterogeneousCore/SonicTriton/src/triton_utils.cc b/HeterogeneousCore/SonicTriton/src/triton_utils.cc index a71190d951e46..322dddf133381 100644 --- a/HeterogeneousCore/SonicTriton/src/triton_utils.cc +++ b/HeterogeneousCore/SonicTriton/src/triton_utils.cc @@ -1,3 +1,4 @@ +#include "FWCore/MessageLogger/interface/MessageLogger.h" #include "HeterogeneousCore/SonicTriton/interface/triton_utils.h" #include @@ -15,6 +16,7 @@ namespace triton_utils { return msg.str(); } + void convertToWarning(const cms::Exception& e) { edm::LogWarning(e.category()) << e.explainSelf(); } } // namespace triton_utils template std::string triton_utils::printColl(const edm::Span::const_iterator>& coll, diff --git a/HeterogeneousCore/SonicTriton/test/README.md b/HeterogeneousCore/SonicTriton/test/README.md index 2249a3ebd6ac1..5ace852ba0153 100644 --- a/HeterogeneousCore/SonicTriton/test/README.md +++ b/HeterogeneousCore/SonicTriton/test/README.md @@ -16,17 +16,17 @@ The local server will use Apptainer with CPU by default; if a local Nvidia GPU i Run the image test: ``` -cmsRun tritonTest_cfg.py maxEvents=1 modules=TritonImageProducer,TritonImageProducer models=inception_graphdef,densenet_onnx +cmsRun tritonTest_cfg.py --maxEvents 1 --modules 
TritonImageProducer TritonImageProducer --models inception_graphdef densenet_onnx ``` Run the identity test with ragged batching: ``` -cmsRun tritonTest_cfg.py maxEvents=1 modules=TritonIdentityProducer models=ragged_io +cmsRun tritonTest_cfg.py --maxEvents 1 --modules TritonIdentityProducer --models ragged_io ``` Run the graph test: ``` -cmsRun tritonTest_cfg.py maxEvents=1 modules=TritonGraphProducer +cmsRun tritonTest_cfg.py --maxEvents 1 --modules TritonGraphProducer ``` ## Caveats diff --git a/HeterogeneousCore/SonicTriton/test/tritonTest_cfg.py b/HeterogeneousCore/SonicTriton/test/tritonTest_cfg.py index 1773e252cfae2..fa891adb88721 100644 --- a/HeterogeneousCore/SonicTriton/test/tritonTest_cfg.py +++ b/HeterogeneousCore/SonicTriton/test/tritonTest_cfg.py @@ -1,6 +1,6 @@ -from FWCore.ParameterSet.VarParsing import VarParsing import FWCore.ParameterSet.Config as cms import os, sys, json +from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter # module/model correspondence models = { @@ -16,30 +16,34 @@ allowed_compression = ["none","deflate","gzip"] allowed_devices = ["auto","cpu","gpu"] -options = VarParsing() -options.register("maxEvents", -1, VarParsing.multiplicity.singleton, VarParsing.varType.int, "Number of events to process (-1 for all)") -options.register("serverName", "default", VarParsing.multiplicity.singleton, VarParsing.varType.string, "name for server (used internally)") -options.register("address", "", VarParsing.multiplicity.singleton, VarParsing.varType.string, "server address") -options.register("port", 8001, VarParsing.multiplicity.singleton, VarParsing.varType.int, "server port") -options.register("timeout", 30, VarParsing.multiplicity.singleton, VarParsing.varType.int, "timeout for requests") -options.register("params", "", VarParsing.multiplicity.singleton, VarParsing.varType.string, "json file containing server address/port") -options.register("threads", 1, VarParsing.multiplicity.singleton, VarParsing.varType.int, "number of 
threads") -options.register("streams", 0, VarParsing.multiplicity.singleton, VarParsing.varType.int, "number of streams") -options.register("modules", "TritonGraphProducer", VarParsing.multiplicity.list, VarParsing.varType.string, "list of modules to run (choices: {})".format(', '.join(models))) -options.register("models","gat_test", VarParsing.multiplicity.list, VarParsing.varType.string, "list of models (same length as modules, or just 1 entry if all modules use same model)") -options.register("mode","Async", VarParsing.multiplicity.singleton, VarParsing.varType.string, "mode for client (choices: {})".format(', '.join(allowed_modes))) -options.register("verbose", False, VarParsing.multiplicity.singleton, VarParsing.varType.bool, "enable verbose output") -options.register("brief", False, VarParsing.multiplicity.singleton, VarParsing.varType.bool, "briefer output for graph modules") -options.register("fallbackName", "", VarParsing.multiplicity.singleton, VarParsing.varType.string, "name for fallback server") -options.register("unittest", False, VarParsing.multiplicity.singleton, VarParsing.varType.bool, "unit test mode: reduce input sizes") -options.register("testother", False, VarParsing.multiplicity.singleton, VarParsing.varType.bool, "also test gRPC communication if shared memory enabled, or vice versa") -options.register("shm", True, VarParsing.multiplicity.singleton, VarParsing.varType.bool, "enable shared memory") -options.register("compression", "", VarParsing.multiplicity.singleton, VarParsing.varType.string, "enable I/O compression (choices: {})".format(', '.join(allowed_compression))) -options.register("ssl", False, VarParsing.multiplicity.singleton, VarParsing.varType.bool, "enable SSL authentication for server communication") -options.register("device","auto", VarParsing.multiplicity.singleton, VarParsing.varType.string, "specify device for fallback server (choices: {})".format(', '.join(allowed_devices))) -options.register("docker", False, 
VarParsing.multiplicity.singleton, VarParsing.varType.bool, "use Docker for fallback server") -options.register("tries", 0, VarParsing.multiplicity.singleton, VarParsing.varType.int, "number of retries for failed request") -options.parseArguments() +parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) +parser.add_argument("--maxEvents", default=-1, type=int, help="Number of events to process (-1 for all)") +parser.add_argument("--serverName", default="default", type=str, help="name for server (used internally)") +parser.add_argument("--address", default="", type=str, help="server address") +parser.add_argument("--port", default=8001, type=int, help="server port") +parser.add_argument("--timeout", default=30, type=int, help="timeout for requests") +parser.add_argument("--timeoutUnit", default="seconds", type=str, help="unit for timeout (seconds, milliseconds, or microseconds)") +parser.add_argument("--params", default="", type=str, help="json file containing server address/port") +parser.add_argument("--threads", default=1, type=int, help="number of threads") +parser.add_argument("--streams", default=0, type=int, help="number of streams") +parser.add_argument("--modules", metavar=("MODULES"), default=["TritonGraphProducer"], nargs='+', type=str, choices=list(models), help="list of modules to run (choices: %(choices)s)") +parser.add_argument("--models", default=["gat_test"], nargs='+', type=str, help="list of models (same length as modules, or just 1 entry if all modules use same model)") +parser.add_argument("--mode", default="Async", type=str, choices=allowed_modes, help="mode for client") +parser.add_argument("--verbose", default=False, action="store_true", help="enable all verbose output") +parser.add_argument("--verboseClient", default=False, action="store_true", help="enable verbose output for clients") +parser.add_argument("--verboseServer", default=False, action="store_true", help="enable verbose output for server") +parser.add_argument("--verboseService", default=False, 
action="store_true", help="enable verbose output for TritonService") +parser.add_argument("--brief", default=False, action="store_true", help="briefer output for graph modules") +parser.add_argument("--fallbackName", default="", type=str, help="name for fallback server") +parser.add_argument("--unittest", default=False, action="store_true", help="unit test mode: reduce input sizes") +parser.add_argument("--testother", default=False, action="store_true", help="also test gRPC communication if shared memory enabled, or vice versa") +parser.add_argument("--noShm", default=False, action="store_true", help="disable shared memory") +parser.add_argument("--compression", default="", type=str, choices=allowed_compression, help="enable I/O compression") +parser.add_argument("--ssl", default=False, action="store_true", help="enable SSL authentication for server communication") +parser.add_argument("--device", default="auto", type=str.lower, choices=allowed_devices, help="specify device for fallback server") +parser.add_argument("--docker", default=False, action="store_true", help="use Docker for fallback server") +parser.add_argument("--tries", default=0, type=int, help="number of retries for failed request") +options = parser.parse_args() if len(options.params)>0: with open(options.params,'r') as pfile: @@ -51,28 +55,12 @@ # check models and modules if len(options.modules)!=len(options.models): -    # assigning to VarParsing.multiplicity.list actually appends to existing value(s) -    if len(options.models)==1: options.models = [options.models[0]]*(len(options.modules)-1) +    if len(options.models)==1: options.models = [options.models[0]]*(len(options.modules)) else: raise ValueError("Arguments for modules and models must have same length") for im,module in enumerate(options.modules): -    if module not in models: -        raise ValueError("Unknown module: {}".format(module)) model = options.models[im] if model not in models[module]: raise ValueError("Unsupported model {} for module 
{}".format(model,module)) -# check modes -if options.mode not in allowed_modes: - raise ValueError("Unknown mode: {}".format(options.mode)) - -# check compression -if len(options.compression)>0 and options.compression not in allowed_compression: - raise ValueError("Unknown compression setting: {}".format(options.compression)) - -# check devices -options.device = options.device.lower() -if options.device not in allowed_devices: - raise ValueError("Unknown device: {}".format(options.device)) - from Configuration.ProcessModifiers.enableSonicTriton_cff import enableSonicTriton process = cms.Process('tritonTest',enableSonicTriton) @@ -82,8 +71,8 @@ process.source = cms.Source("EmptySource") -process.TritonService.verbose = options.verbose -process.TritonService.fallback.verbose = options.verbose +process.TritonService.verbose = options.verbose or options.verboseService +process.TritonService.fallback.verbose = options.verbose or options.verboseServer process.TritonService.fallback.useDocker = options.docker if len(options.fallbackName)>0: process.TritonService.fallback.instanceBaseName = options.fallbackName @@ -122,12 +111,13 @@ mode = cms.string(options.mode), preferredServer = cms.untracked.string(""), timeout = cms.untracked.uint32(options.timeout), + timeoutUnit = cms.untracked.string(options.timeoutUnit), modelName = cms.string(model), modelVersion = cms.string(""), modelConfigPath = cms.FileInPath("HeterogeneousCore/SonicTriton/data/models/{}/config.pbtxt".format(model)), - verbose = cms.untracked.bool(options.verbose), + verbose = cms.untracked.bool(options.verbose or options.verboseClient), allowedTries = cms.untracked.uint32(options.tries), - useSharedMemory = cms.untracked.bool(options.shm), + useSharedMemory = cms.untracked.bool(not options.noShm), compression = cms.untracked.string(options.compression), ) ) diff --git a/HeterogeneousCore/SonicTriton/test/unittest.sh b/HeterogeneousCore/SonicTriton/test/unittest.sh index aacb1b699df33..87cab09bf4969 100755 
--- a/HeterogeneousCore/SonicTriton/test/unittest.sh +++ b/HeterogeneousCore/SonicTriton/test/unittest.sh @@ -50,7 +50,7 @@ fi fallbackName=triton_server_instance_${DEVICE} tmpFile=$(mktemp -p ${LOCALTOP} SonicTritonTestXXXXXXXX.log) -cmsRun ${LOCALTOP}/src/HeterogeneousCore/SonicTriton/test/tritonTest_cfg.py modules=TritonGraphProducer,TritonGraphFilter,TritonGraphAnalyzer maxEvents=2 unittest=1 verbose=1 device=${DEVICE} testother=1 fallbackName=${fallbackName} >& $tmpFile +cmsRun ${LOCALTOP}/src/HeterogeneousCore/SonicTriton/test/tritonTest_cfg.py --modules TritonGraphProducer TritonGraphFilter TritonGraphAnalyzer --maxEvents 2 --unittest --verbose --device ${DEVICE} --testother --fallbackName ${fallbackName} >& $tmpFile CMSEXIT=$? cat $tmpFile