diff --git a/HeterogeneousCore/SonicCore/BuildFile.xml b/HeterogeneousCore/SonicCore/BuildFile.xml
index b0d5e2a08b98f..5208c91638f37 100644
--- a/HeterogeneousCore/SonicCore/BuildFile.xml
+++ b/HeterogeneousCore/SonicCore/BuildFile.xml
@@ -2,6 +2,7 @@
+
diff --git a/HeterogeneousCore/SonicCore/interface/RetryActionBase.h b/HeterogeneousCore/SonicCore/interface/RetryActionBase.h
new file mode 100644
index 0000000000000..e3fc0bbb8af9a
--- /dev/null
+++ b/HeterogeneousCore/SonicCore/interface/RetryActionBase.h
@@ -0,0 +1,35 @@
+#ifndef HeterogeneousCore_SonicCore_RetryActionBase
+#define HeterogeneousCore_SonicCore_RetryActionBase
+
+#include "FWCore/PluginManager/interface/PluginFactory.h"
+#include "FWCore/ParameterSet/interface/ParameterSet.h"
+#include "HeterogeneousCore/SonicCore/interface/SonicClientBase.h"
+#include
+#include
+
+// Base class for retry actions
+class RetryActionBase {
+public:
+ RetryActionBase(const edm::ParameterSet& conf, SonicClientBase* client);
+ virtual ~RetryActionBase() = default;
+
+ bool shouldRetry() const { return shouldRetry_; } // Getter for shouldRetry_
+
+ virtual void retry() = 0; // Pure virtual function for execution logic
+ virtual void start() = 0; // Pure virtual function for execution logic for initialization
+
+protected:
+ void eval(); // interface for calling evaluate in client
+
+protected:
+ SonicClientBase* client_;
+ bool shouldRetry_; // Flag to track if further retries should happen
+};
+
+// Define the factory for creating retry actions
+using RetryActionFactory =
+ edmplugin::PluginFactory;
+
+#endif
+
+#define DEFINE_RETRY_ACTION(type) DEFINE_EDM_PLUGIN(RetryActionFactory, type, #type);
diff --git a/HeterogeneousCore/SonicCore/interface/SonicClientBase.h b/HeterogeneousCore/SonicCore/interface/SonicClientBase.h
index 47caaae8b2052..45a089701ed12 100644
--- a/HeterogeneousCore/SonicCore/interface/SonicClientBase.h
+++ b/HeterogeneousCore/SonicCore/interface/SonicClientBase.h
@@ -9,12 +9,15 @@
#include "HeterogeneousCore/SonicCore/interface/SonicDispatcherPseudoAsync.h"
#include
+#include
#include
#include
#include
enum class SonicMode { Sync = 1, Async = 2, PseudoAsync = 3 };
+class RetryActionBase;
+
class SonicClientBase {
public:
//constructor
@@ -54,14 +57,23 @@ class SonicClientBase {
SonicMode mode_;
bool verbose_;
std::unique_ptr dispatcher_;
- unsigned allowedTries_, tries_;
+ unsigned totalTries_;
std::optional holder_;
+ // Use a unique_ptr with a custom deleter to avoid incomplete type issues
+ struct RetryDeleter {
+ void operator()(RetryActionBase* ptr) const;
+ };
+
+ using RetryActionPtr = std::unique_ptr;
+ std::vector retryActions_;
+
//for logging/debugging
std::string debugName_, clientName_, fullDebugName_;
friend class SonicDispatcher;
friend class SonicDispatcherPseudoAsync;
+ friend class RetryActionBase;
};
#endif
diff --git a/HeterogeneousCore/SonicCore/plugins/BuildFile.xml b/HeterogeneousCore/SonicCore/plugins/BuildFile.xml
new file mode 100644
index 0000000000000..0ecf2187a0f82
--- /dev/null
+++ b/HeterogeneousCore/SonicCore/plugins/BuildFile.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
diff --git a/HeterogeneousCore/SonicCore/plugins/RetrySameServerAction.cc b/HeterogeneousCore/SonicCore/plugins/RetrySameServerAction.cc
new file mode 100644
index 0000000000000..9877013b93d5b
--- /dev/null
+++ b/HeterogeneousCore/SonicCore/plugins/RetrySameServerAction.cc
@@ -0,0 +1,30 @@
+#include "HeterogeneousCore/SonicCore/interface/RetryActionBase.h"
+#include "HeterogeneousCore/SonicCore/interface/SonicClientBase.h"
+
+class RetrySameServerAction : public RetryActionBase {
+public:
+ RetrySameServerAction(const edm::ParameterSet& pset, SonicClientBase* client)
+ : RetryActionBase(pset, client), allowedTries_(pset.getUntrackedParameter("allowedTries", 0)) {}
+
+ void start() override { tries_ = 0; };
+
+protected:
+ void retry() override;
+
+private:
+ unsigned allowedTries_, tries_;
+};
+
+void RetrySameServerAction::retry() {
+ ++tries_;
+ //if max retries has not been exceeded, call evaluate again
+ if (tries_ < allowedTries_) {
+ eval();
+ return;
+ } else {
+ shouldRetry_ = false; // Flip flag when max retries are reached
+ edm::LogInfo("RetrySameServerAction") << "Max retry attempts reached. No further retries.";
+ }
+}
+
+DEFINE_RETRY_ACTION(RetrySameServerAction)
diff --git a/HeterogeneousCore/SonicCore/src/RetryActionBase.cc b/HeterogeneousCore/SonicCore/src/RetryActionBase.cc
new file mode 100644
index 0000000000000..41b9a6186da2b
--- /dev/null
+++ b/HeterogeneousCore/SonicCore/src/RetryActionBase.cc
@@ -0,0 +1,15 @@
+#include "HeterogeneousCore/SonicCore/interface/RetryActionBase.h"
+
+// Constructor implementation
+RetryActionBase::RetryActionBase(const edm::ParameterSet& conf, SonicClientBase* client)
+ : client_(client), shouldRetry_(true) {}
+
+void RetryActionBase::eval() {
+ if (client_) {
+ client_->evaluate();
+ } else {
+ edm::LogError("RetryActionBase") << "Client pointer is null, cannot evaluate.";
+ }
+}
+
+EDM_REGISTER_PLUGINFACTORY(RetryActionFactory, "RetryActionFactory");
diff --git a/HeterogeneousCore/SonicCore/src/SonicClientBase.cc b/HeterogeneousCore/SonicCore/src/SonicClientBase.cc
index 745c51f17aaf3..9949d9d1f2ea2 100644
--- a/HeterogeneousCore/SonicCore/src/SonicClientBase.cc
+++ b/HeterogeneousCore/SonicCore/src/SonicClientBase.cc
@@ -1,18 +1,33 @@
#include "HeterogeneousCore/SonicCore/interface/SonicClientBase.h"
+#include "HeterogeneousCore/SonicCore/interface/RetryActionBase.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/ParameterSet/interface/allowedValues.h"
+// Custom deleter implementation
+void SonicClientBase::RetryDeleter::operator()(RetryActionBase* ptr) const { delete ptr; }
+
SonicClientBase::SonicClientBase(const edm::ParameterSet& params,
const std::string& debugName,
const std::string& clientName)
- : allowedTries_(params.getUntrackedParameter("allowedTries", 0)),
- debugName_(debugName),
- clientName_(clientName),
- fullDebugName_(debugName_) {
+ : debugName_(debugName), clientName_(clientName), fullDebugName_(debugName_) {
if (!clientName_.empty())
fullDebugName_ += ":" + clientName_;
+ const auto& retryPSetList = params.getParameter>("Retry");
std::string modeName(params.getParameter("mode"));
+
+ for (const auto& retryPSet : retryPSetList) {
+ const std::string& actionType = retryPSet.getParameter("retryType");
+
+ auto retryAction = RetryActionFactory::get()->create(actionType, retryPSet, this);
+ if (retryAction) {
+ //Convert to RetryActionPtr Type from raw pointer of retryAction
+ retryActions_.emplace_back(RetryActionPtr(retryAction.release()));
+ } else {
+ throw cms::Exception("Configuration") << "Unknown Retry type " << actionType << " for SonicClient: " << modeName;
+ }
+ }
+
if (modeName == "Sync")
setMode(SonicMode::Sync);
else if (modeName == "Async")
@@ -40,24 +55,30 @@ void SonicClientBase::start(edm::WaitingTaskWithArenaHolder holder) {
holder_ = std::move(holder);
}
-void SonicClientBase::start() { tries_ = 0; }
+void SonicClientBase::start() {
+ totalTries_ = 0;
+ // initialize all actions
+ for (const auto& action : retryActions_) {
+ action->start();
+ }
+}
void SonicClientBase::finish(bool success, std::exception_ptr eptr) {
//retries are only allowed if no exception was raised
if (!success and !eptr) {
- ++tries_;
- //if max retries has not been exceeded, call evaluate again
- if (tries_ < allowedTries_) {
- evaluate();
- //avoid calling doneWaiting() twice
- return;
- }
- //prepare an exception if exceeded
- else {
- edm::Exception ex(edm::errors::ExternalFailure);
- ex << "SonicCallFailed: call failed after max " << tries_ << " tries";
- eptr = make_exception_ptr(ex);
+ ++totalTries_;
+ for (const auto& action : retryActions_) {
+ if (action->shouldRetry()) {
+ action->retry(); // Call retry only if shouldRetry_ is true
+ return;
+ }
}
+ //prepare an exception if no more retries left
+ edm::LogInfo("SonicClientBase") << "SonicCallFailed: call failed, no retry actions available after " << totalTries_
+ << " tries.";
+ edm::Exception ex(edm::errors::ExternalFailure);
+ ex << "SonicCallFailed: call failed, no retry actions available after " << totalTries_ << " tries.";
+ eptr = make_exception_ptr(ex);
}
if (holder_) {
holder_->doneWaiting(eptr);
@@ -74,7 +95,20 @@ void SonicClientBase::fillBasePSetDescription(edm::ParameterSetDescription& desc
//restrict allowed values
desc.ifValue(edm::ParameterDescription("mode", "PseudoAsync", true),
edm::allowedValues("Sync", "Async", "PseudoAsync"));
- if (allowRetry)
- desc.addUntracked("allowedTries", 0);
+ if (allowRetry) {
+ // Defines the structure of each entry in the VPSet
+ edm::ParameterSetDescription retryDesc;
+ retryDesc.add("retryType", "RetrySameServerAction");
+ retryDesc.addUntracked("allowedTries", 0);
+
+ // Define a default retry action
+ edm::ParameterSet defaultRetry;
+ defaultRetry.addParameter("retryType", "RetrySameServerAction");
+ defaultRetry.addUntrackedParameter("allowedTries", 0);
+
+ // Add the VPSet with the default retry action
+ desc.addVPSet("Retry", retryDesc, {defaultRetry});
+ }
+ desc.add("sonicClientBase", desc);
desc.addUntracked("verbose", false);
}
diff --git a/HeterogeneousCore/SonicCore/test/BuildFile.xml b/HeterogeneousCore/SonicCore/test/BuildFile.xml
index 04b2bcb20df2f..11e8f860b5818 100644
--- a/HeterogeneousCore/SonicCore/test/BuildFile.xml
+++ b/HeterogeneousCore/SonicCore/test/BuildFile.xml
@@ -1,6 +1,6 @@
-
-
+
+
diff --git a/HeterogeneousCore/SonicCore/test/DummyClient.h b/HeterogeneousCore/SonicCore/test/DummyClient.h
index ccef888ad9f7d..6504843926c0a 100644
--- a/HeterogeneousCore/SonicCore/test/DummyClient.h
+++ b/HeterogeneousCore/SonicCore/test/DummyClient.h
@@ -36,7 +36,7 @@ class DummyClient : public SonicClient {
this->output_ = this->input_ * factor_;
//simulate a failure
- if (this->tries_ < fails_)
+ if (this->totalTries_ < fails_)
this->finish(false);
else
this->finish(true);
diff --git a/HeterogeneousCore/SonicCore/test/sonicTestAna_cfg.py b/HeterogeneousCore/SonicCore/test/sonicTestAna_cfg.py
index 11c23c6cdfcc9..b8b66db34abd9 100644
--- a/HeterogeneousCore/SonicCore/test/sonicTestAna_cfg.py
+++ b/HeterogeneousCore/SonicCore/test/sonicTestAna_cfg.py
@@ -1,4 +1,5 @@
import FWCore.ParameterSet.Config as cms
+from FWCore.ParameterSet.VarParsing import VarParsing
process = cms.Process("Test")
diff --git a/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py b/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py
index 614297d86e3bb..bf7b44cb01519 100644
--- a/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py
+++ b/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py
@@ -1,11 +1,13 @@
import FWCore.ParameterSet.Config as cms
-from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
+from FWCore.ParameterSet.VarParsing import VarParsing
-_allowedModuleTypes = ["Producer","Filter"]
-parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
-parser.add_argument("--moduleType", type=str, required=True, choices=_allowedModuleTypes, help="Type of module to test")
-options = parser.parse_args()
+options = VarParsing()
+options.register("moduleType","", VarParsing.multiplicity.singleton, VarParsing.varType.string)
+options.parseArguments()
+_allowedModuleTypes = ["Producer","Filter"]
+if options.moduleType not in ["Producer","Filter"]:
+ raise ValueError("Unknown module type: {} (allowed: {})".format(options.moduleType,_allowedModuleTypes))
_moduleName = "SonicDummy"+options.moduleType
_moduleClass = getattr(cms,"ED"+options.moduleType)
@@ -17,15 +19,19 @@
process.options.numberOfThreads = 2
process.options.numberOfStreams = 0
-
process.dummySync = _moduleClass(_moduleName,
input = cms.int32(1),
Client = cms.PSet(
mode = cms.string("Sync"),
factor = cms.int32(-1),
wait = cms.int32(10),
- allowedTries = cms.untracked.uint32(0),
fails = cms.uint32(0),
+ Retry = cms.VPSet(
+ cms.PSet(
+ retryType = cms.string('RetrySameServerAction'),
+ allowedTries = cms.untracked.uint32(0)
+ )
+ )
),
)
@@ -35,8 +41,14 @@
mode = cms.string("PseudoAsync"),
factor = cms.int32(2),
wait = cms.int32(10),
- allowedTries = cms.untracked.uint32(0),
fails = cms.uint32(0),
+ Retry = cms.VPSet(
+ cms.PSet(
+ retryType = cms.string('RetrySameServerAction'),
+ allowedTries = cms.untracked.uint32(0)
+ )
+ )
+
),
)
@@ -46,32 +58,53 @@
mode = cms.string("Async"),
factor = cms.int32(5),
wait = cms.int32(10),
- allowedTries = cms.untracked.uint32(0),
fails = cms.uint32(0),
+ Retry = cms.VPSet(
+ cms.PSet(
+ retryType = cms.string('RetrySameServerAction'),
+ allowedTries = cms.untracked.uint32(0)
+ )
+ )
),
)
process.dummySyncRetry = process.dummySync.clone(
Client = dict(
wait = 2,
- allowedTries = 2,
fails = 1,
+ Retry = cms.VPSet(
+ cms.PSet(
+ retryType = cms.string('RetrySameServerAction'),
+ allowedTries = cms.untracked.uint32(2)
+ )
+ )
+
)
)
process.dummyPseudoAsyncRetry = process.dummyPseudoAsync.clone(
Client = dict(
wait = 2,
- allowedTries = 2,
fails = 1,
+ Retry = cms.VPSet(
+ cms.PSet(
+ retryType = cms.string('RetrySameServerAction'),
+ allowedTries = cms.untracked.uint32(2)
+ )
+ )
)
)
process.dummyAsyncRetry = process.dummyAsync.clone(
Client = dict(
wait = 2,
- allowedTries = 2,
fails = 1,
+ Retry = cms.VPSet(
+ cms.PSet(
+ allowedTries = cms.untracked.uint32(2),
+ retryType = cms.string('RetrySameServerAction')
+ )
+ )
)
)
diff --git a/HeterogeneousCore/SonicTriton/BuildFile.xml b/HeterogeneousCore/SonicTriton/BuildFile.xml
index b93d51e711e87..4af38d69d89e9 100644
--- a/HeterogeneousCore/SonicTriton/BuildFile.xml
+++ b/HeterogeneousCore/SonicTriton/BuildFile.xml
@@ -10,6 +10,7 @@
+
-
+
diff --git a/HeterogeneousCore/SonicTriton/README.md b/HeterogeneousCore/SonicTriton/README.md
index 88058ed88289b..488566c937caf 100644
--- a/HeterogeneousCore/SonicTriton/README.md
+++ b/HeterogeneousCore/SonicTriton/README.md
@@ -71,7 +71,7 @@ There are specific local input and output containers that should be used in prod
Here, `T` is a primitive type, and the two aliases listed below are passed to `TritonInputData::toServer()`
and returned by `TritonOutputData::fromServer()`, respectively:
* `TritonInputContainer = std::shared_ptr> = std::shared_ptr>>`
-* `TritonOutput = std::vector>`
+* `TritonOutput = std::vector>`
The `TritonInputContainer` object should be created using the helper function described below.
It expects one vector per batch entry (i.e. the size of the outer vector is the batch size (rectangular case) or number of entries (ragged case)).
diff --git a/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h b/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h
new file mode 100644
index 0000000000000..af7720b90cb0b
--- /dev/null
+++ b/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h
@@ -0,0 +1,32 @@
+#ifndef HeterogeneousCore_SonicTriton_RetryActionDiffServer_h
+#define HeterogeneousCore_SonicTriton_RetryActionDiffServer_h
+
+#include "HeterogeneousCore/SonicCore/interface/RetryActionBase.h"
+
+/**
+ * @class RetryActionDiffServer
+ * @brief A concrete implementation of RetryActionBase that attempts to retry an inference
+ * request on a different Triton server.
+ *
+ * This class provides a fallback mechanism. If an initial inference request fails
+ * (e.g., due to server unavailability or a model-specific error), this action will be
+ * triggered. It queries the central TritonService to select an alternative server (e.g.,
+ * the fallback server when available) and instructs the TritonClient to reconnect to
+ * that server for the retry attempt. This action is designed for one-time use per
+ * inference call; after the retry attempt, it disables itself until the next `start()`
+ * call.
+ */
+
+class RetryActionDiffServer : public RetryActionBase {
+public:
+ RetryActionDiffServer(const edm::ParameterSet& conf, SonicClientBase* client);
+ ~RetryActionDiffServer() override = default;
+
+ void retry() override;
+ void start() override;
+
+private:
+};
+
+#endif
+
diff --git a/HeterogeneousCore/SonicTriton/interface/TritonClient.h b/HeterogeneousCore/SonicTriton/interface/TritonClient.h
index df8f9b559427c..9e21b646508e9 100644
--- a/HeterogeneousCore/SonicTriton/interface/TritonClient.h
+++ b/HeterogeneousCore/SonicTriton/interface/TritonClient.h
@@ -1,102 +1,107 @@
-#ifndef HeterogeneousCore_SonicTriton_TritonClient
-#define HeterogeneousCore_SonicTriton_TritonClient
-
-#include "FWCore/ParameterSet/interface/ParameterSet.h"
-#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
-#include "FWCore/ServiceRegistry/interface/ServiceToken.h"
-#include "HeterogeneousCore/SonicCore/interface/SonicClient.h"
-#include "HeterogeneousCore/SonicTriton/interface/TritonData.h"
-#include "HeterogeneousCore/SonicTriton/interface/TritonService.h"
-
-#include