Skip to content

Commit 9d0a07f

Browse files
author
hhakimi53
authored
Don't bandon topo update in backpressure n logging updates (#47)
1 parent ddca160 commit 9d0a07f

File tree

1 file changed

+39
-9
lines changed

1 file changed

+39
-9
lines changed

src/ClientImpl.cpp

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1112,34 +1112,56 @@ bool ClientImpl::drain() throw (voltdb::Exception, voltdb::NoConnectionsExceptio
11121112
class TopoUpdateCallback : public voltdb::ProcedureCallback
11131113
{
11141114
public:
1115-
TopoUpdateCallback(Distributer *dist):m_dist(dist){}
1115+
TopoUpdateCallback(Distributer *dist, ClientLogger *logger) : m_dist(dist), m_logger(logger) {}
1116+
11161117
bool callback(InvocationResponse response) throw (voltdb::Exception)
11171118
{
11181119
if (response.failure()){
1119-
std::cerr << "Failure response TopoUpdateCallback::callback: " << response.statusString() << std::endl;
1120+
std::ostringstream os;
1121+
os << "Failure response TopoUpdateCallback::callback: " << response.statusString();
1122+
if (m_logger) {
1123+
m_logger->log(ClientLogger::ERROR, os.str());
1124+
}
1125+
else {
1126+
std::cerr << os.str() << std::endl;
1127+
}
11201128
return false;
11211129
}
11221130
m_dist->updateAffinityTopology(response.results());
11231131
return true;
11241132
}
1133+
1134+
bool allowAbandon() const {return false;}
1135+
11251136
private:
11261137
Distributer *m_dist;
1138+
ClientLogger *m_logger;
11271139
};
11281140

11291141
class SubscribeCallback : public voltdb::ProcedureCallback
11301142
{
11311143
public:
1132-
SubscribeCallback(){}
1144+
SubscribeCallback(ClientLogger *logger) : m_logger(logger) {}
1145+
11331146
bool callback(InvocationResponse response) throw (voltdb::Exception)
11341147
{
11351148
if (response.failure()) {
1136-
std::cerr << "Failure response SubscribeCallback::callback: " << response.statusString() << std::endl;
1149+
std::ostringstream os;
1150+
os << "Failure response SubscribeCallback::callback: " << response.statusString();
1151+
if (m_logger) {
1152+
m_logger->log(ClientLogger::ERROR, os.str());
1153+
}
1154+
else {
1155+
std::cerr << os.str()<< std::endl;
1156+
}
11371157
return false;
11381158
}
11391159
return true;
11401160
}
11411161

11421162
bool allowAbandon() const {return false;}
1163+
private:
1164+
ClientLogger *m_logger;
11431165
};
11441166

11451167
/*
@@ -1148,12 +1170,19 @@ class SubscribeCallback : public voltdb::ProcedureCallback
11481170
class ProcUpdateCallback : public voltdb::ProcedureCallback
11491171
{
11501172
public:
1151-
ProcUpdateCallback(Distributer *dist):m_dist(dist) {}
1173+
ProcUpdateCallback(Distributer *dist, ClientLogger *logger) : m_dist(dist), m_logger(logger) {}
11521174

11531175
bool callback(InvocationResponse response) throw (voltdb::Exception)
11541176
{
11551177
if (response.failure()) {
1156-
std::cerr << "Failure response SubscribeCallback::callback: " << response.statusString() << std::endl;
1178+
std::ostringstream os;
1179+
os << "Failure response SubscribeCallback::callback: " << response.statusString();
1180+
if (m_logger) {
1181+
m_logger->log(ClientLogger::ERROR, os.str());
1182+
}
1183+
else {
1184+
std::cerr << os.str() << std::endl;
1185+
}
11571186
return false;
11581187
}
11591188
m_dist->updateProcedurePartitioning(response.results());
@@ -1164,6 +1193,7 @@ class ProcUpdateCallback : public voltdb::ProcedureCallback
11641193

11651194
private:
11661195
Distributer *m_dist;
1196+
ClientLogger *m_logger;
11671197
};
11681198

11691199

@@ -1176,7 +1206,7 @@ void ClientImpl::updateHashinator(){
11761206
voltdb::ParameterSet* params = systemCatalogProc.params();
11771207
params->addString("PROCEDURES");
11781208

1179-
boost::shared_ptr<ProcUpdateCallback> procCallback(new ProcUpdateCallback(&m_distributer));
1209+
boost::shared_ptr<ProcUpdateCallback> procCallback(new ProcUpdateCallback(&m_distributer, m_pLogger));
11801210
invoke(systemCatalogProc, procCallback);
11811211

11821212
parameterTypes.resize(2);
@@ -1187,7 +1217,7 @@ void ClientImpl::updateHashinator(){
11871217
params = statisticsProc.params();
11881218
params->addString("TOPO").addInt32(0);
11891219

1190-
boost::shared_ptr<TopoUpdateCallback> topoCallback(new TopoUpdateCallback(&m_distributer));
1220+
boost::shared_ptr<TopoUpdateCallback> topoCallback(new TopoUpdateCallback(&m_distributer, m_pLogger));
11911221

11921222
invoke(statisticsProc, topoCallback);
11931223
}
@@ -1200,7 +1230,7 @@ void ClientImpl::subscribeToTopologyNotifications(){
12001230
voltdb::ParameterSet* params = statisticsProc.params();
12011231
params->addString("TOPOLOGY");
12021232

1203-
boost::shared_ptr<SubscribeCallback> topoCallback(new SubscribeCallback());
1233+
boost::shared_ptr<SubscribeCallback> topoCallback(new SubscribeCallback(m_pLogger));
12041234

12051235
invoke(statisticsProc, topoCallback);
12061236
}

0 commit comments

Comments
 (0)