Skip to content

Commit fb83e32

Browse files
committed
Added missing files.
1 parent 91805e2 commit fb83e32

31 files changed

+154
-375
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#
2+
# Configuration file for log4cxx
3+
# can be used for unit test
4+
# by launching next command before unit tests:
5+
# export LSST_LOG_CONFIG=$HOME/.lsst/log4cxx.unittest.properties
6+
#
7+
8+
#log4j.rootLogger=INFO, CONSOLE
9+
log4j.rootLogger=DEBUG, CONSOLE
10+
#log4j.rootLogger=WARN, CONSOLE
11+
12+
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
13+
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
14+
#log4j.appender.CONSOLE.layout.ConversionPattern=[%d{yyyy-MM-ddTHH:mm:ss.SSSZ}] [%t] %-5p %c{2} (%F:%L) - %m%n
15+
log4j.appender.CONSOLE.layout.ConversionPattern=[%d{ddTHH:mm:ss.SSSZ}] [%t] %-5p %c{2} (%F:%L) - %m%n
16+
17+
# Tune log at the module level
18+
#log4j.logger.lsst.qserv.util=DEBUG

core/modules/loader/BufferUdp.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ MsgElement::Ptr BufferUdp::readFromSocket(boost::asio::ip::tcp::socket& socket,
4848

4949
// If there's something in the buffer already, get it and return.
5050
// This can happen when the previous read of socket read multiple elements.
51-
MsgElement::Ptr msgElem = _safeRetrieve("1readFromSocket&&&" + note);
51+
MsgElement::Ptr msgElem = _safeRetrieve("1readFromSocket" + note);
5252
if (msgElem != nullptr) {
5353
return msgElem;
5454
}
@@ -69,7 +69,7 @@ MsgElement::Ptr BufferUdp::readFromSocket(boost::asio::ip::tcp::socket& socket,
6969

7070
/// Try to retrieve an element (there's no guarantee that an entire element got read in a single read.
7171
// Store original cursor positions so they can be restored if the read fails.
72-
msgElem = _safeRetrieve("2readFromSocket&&&" + note);
72+
msgElem = _safeRetrieve("2readFromSocket" + note);
7373
if (msgElem != nullptr) {
7474
return msgElem;
7575
}
@@ -117,11 +117,11 @@ void BufferUdp::advanceReadCursor(size_t len) {
117117
}
118118

119119

120-
std::shared_ptr<MsgElement> BufferUdp::_safeRetrieve(std::string const& note) { // &&& delete note, maybe
120+
std::shared_ptr<MsgElement> BufferUdp::_safeRetrieve(std::string const& note) {
121121
auto wCursorOriginal = _wCursor;
122122
auto rCursorOriginal = _rCursor;
123123
// throwOnMissing=false since missing data is possible with TCP.
124-
MsgElement::Ptr msgElem = MsgElement::retrieve(*this, note + " _safeRetrieve &&&", false);
124+
MsgElement::Ptr msgElem = MsgElement::retrieve(*this, note + " _safeRetrieve", false);
125125
if (msgElem != nullptr) {
126126
return msgElem;
127127
} else {

core/modules/loader/BufferUdp.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class MsgElement;
4646

4747
/// A buffer for reading and writing. Nothing can be read from the buffer until
4848
/// something has been written to it.
49-
/// TODO: rename BufferUdp is not really accurate anymore. &&&
49+
/// TODO: rename BufferUdp is not really accurate anymore.
5050
class BufferUdp {
5151
public:
5252
using Ptr = std::shared_ptr<BufferUdp>;

core/modules/loader/Central.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ void Central::_checkDoList() {
7777
while(_loop) {
7878
// Run and then sleep for a second. TODO A more advanced timer should be used
7979
doList->checkList();
80-
LOGS(_log, LOG_LVL_INFO, "&&& SLEEP");
8180
usleep(_loopSleepTime);
8281
}
8382
}

core/modules/loader/CentralClient.cc

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -49,36 +49,18 @@ namespace lsst {
4949
namespace qserv {
5050
namespace loader {
5151

52-
/* &&&
53-
CentralClient::CentralClient(boost::asio::io_service& ioService_,
54-
std::string const& hostName, ClientConfig const& cfg)
55-
: Central(ioService_, cfg.getMasterHost(), cfg.getMasterPortUdp(), cfg.getThreadPoolSize(), cfg.getLoopSleepTime(), cfg.getIOThreads()),
56-
_hostName(hostName), _udpPort(cfg.getClientPortUdp()),
57-
_defWorkerHost(cfg.getDefWorkerHost()),
58-
_defWorkerPortUdp(cfg.getDefWorkerPortUdp()),
59-
_doListMaxLookups(cfg.getMaxLookups()),
60-
_doListMaxInserts(cfg.getMaxInserts()),
61-
_maxRequestSleepTime(cfg.getMaxRequestSleepTime()) {
62-
}
63-
*/
52+
6453
CentralClient::CentralClient(boost::asio::io_service& ioService_,
6554
std::string const& hostName, ClientConfig const& cfg)
6655
: CentralFollower(ioService_, hostName, cfg.getMasterHost(), cfg.getMasterPortUdp(),
6756
cfg.getThreadPoolSize(),cfg.getLoopSleepTime(), cfg.getIOThreads(), cfg.getClientPortUdp()),
68-
// &&& _hostName(hostName),
69-
// &&& _udpPort(cfg.getClientPortUdp()),
7057
_defWorkerHost(cfg.getDefWorkerHost()),
7158
_defWorkerPortUdp(cfg.getDefWorkerPortUdp()),
7259
_doListMaxLookups(cfg.getMaxLookups()),
7360
_doListMaxInserts(cfg.getMaxInserts()),
7461
_maxRequestSleepTime(cfg.getMaxRequestSleepTime()) {
7562
}
7663

77-
/* &&&
78-
void CentralClient::start() {
79-
_server = std::make_shared<ClientServer>(ioService, _hostName, _udpPort, this);
80-
}
81-
*/
8264

8365
void CentralClient::startService() {
8466
_server = std::make_shared<ClientServer>(ioService, _hostName, _udpPort, this);
@@ -92,7 +74,7 @@ CentralClient::~CentralClient() {
9274
void CentralClient::handleKeyLookup(LoaderMsg const& inMsg, BufferUdp::Ptr const& data) {
9375
LOGS(_log, LOG_LVL_DEBUG, "CentralClient::handleKeyLookup");
9476

95-
auto const sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data, " CentralClient::handleKeyLookup&&& "));
77+
auto const sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data, " CentralClient::handleKeyLookup "));
9678
if (sData == nullptr) {
9779
LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyLookup Failed to parse list");
9880
return;
@@ -128,14 +110,14 @@ void CentralClient::_handleKeyLookup(LoaderMsg const& inMsg, std::unique_ptr<pro
128110
_waitingKeyLookupMap.erase(iter);
129111
}
130112
keyLookupOneShot->keyInfoComplete(key, chunkInfo.chunk, chunkInfo.subchunk, protoData->success());
131-
LOGS(_log, LOG_LVL_WARN, "&&&INFO Successful KEY_LOOKUP key=" << key << " " << chunkInfo);
113+
LOGS(_log, LOG_LVL_INFO, "Successful KEY_LOOKUP key=" << key << " " << chunkInfo);
132114
}
133115

134116

135117
void CentralClient::handleKeyInsertComplete(LoaderMsg const& inMsg, BufferUdp::Ptr const& data) {
136118
LOGS(_log, LOG_LVL_DEBUG, "CentralClient::handleKeyInsertComplete");
137119

138-
auto sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data, " CentralClient::handleKeyInsertComplete&&& "));
120+
auto sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data, " CentralClient::handleKeyInsertComplete "));
139121
if (sData == nullptr) {
140122
LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyInsertComplete Failed to retrieve element");
141123
return;
@@ -173,7 +155,7 @@ void CentralClient::_handleKeyInsertComplete(LoaderMsg const& inMsg, std::unique
173155
mapSize = _waitingKeyInsertMap.size();
174156
}
175157
keyInsertOneShot->keyInsertComplete();
176-
LOGS(_log, LOG_LVL_WARN, "&&&INFO Successful KEY_INSERT_COMPLETE key=" << key << " " << chunkInfo <<
158+
LOGS(_log, LOG_LVL_INFO, "Successful KEY_INSERT_COMPLETE key=" << key << " " << chunkInfo <<
177159
" mapSize=" << mapSize);
178160
}
179161

@@ -199,11 +181,10 @@ KeyInfoData::Ptr CentralClient::keyInsertReq(CompositeKey const& key, int chunk,
199181
size_t sz = _waitingKeyInsertMap.size();
200182
lck.unlock();
201183
if (loopCount % 100 == 0) {
202-
LOGS(_log, LOG_LVL_INFO, "keyInsertReq waiting key=" << key <<
184+
LOGS(_log, LOG_LVL_DEBUG, "keyInsertReq waiting key=" << key <<
203185
"size=" << sz << " loopCount=" << loopCount);
204186
}
205187
// Let the CPU do something else while waiting for some requests to finish.
206-
LOGS(_log, LOG_LVL_INFO, "&&& SLEEP");
207188
usleep(_maxRequestSleepTime);
208189
++loopCount;
209190
lck.lock();
@@ -294,7 +275,6 @@ KeyInfoData::Ptr CentralClient::keyLookupReq(CompositeKey const& key) {
294275
"size=" << sz << " loopCount=" << loopCount);
295276
}
296277
// Let the CPU do something else while waiting for some requests to finish.
297-
LOGS(_log, LOG_LVL_INFO, "&&& SLEEP");
298278
usleep(_maxRequestSleepTime);
299279
sleptForMicroSec += _maxRequestSleepTime;
300280
++loopCount;

core/modules/loader/CentralClient.h

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -67,20 +67,17 @@ class KeyInfoData : public util::Tracker {
6767
/// so replies to its request can be sent directly back to it.
6868
/// 'Central' provides access to the master and a DoList for handling requests.
6969
/// TODO Maybe base this on CentralWorker or have a common base class?
70-
// &&& class CentralClient : public Central {
7170
class CentralClient : public CentralFollower {
7271
public:
7372
/// The client needs to know the master's IP and its own IP.
7473
CentralClient(boost::asio::io_service& ioService_,
7574
std::string const& hostName, ClientConfig const& cfg);
7675

77-
// &&&void start();
7876
void startService() override;
7977

8078
~CentralClient() override;
8179

8280
std::string const& getHostName() const { return _hostName; }
83-
// &&& int getUdpPort() const { return _udpPort; }
8481
int getTcpPort() const { return 0; } ///< No tcp port at this time.
8582

8683
/// @return the default worker's host name.
@@ -161,17 +158,10 @@ class CentralClient : public CentralFollower {
161158
CentralClient* central;
162159
};
163160

164-
165-
/// TODO The worker IP becomes default worker as it should be able to get
166-
/// that information from the master in the future. DM-16555
167-
// &&& const std::string _hostName;
168-
// &&& const int _udpPort;
169-
170161
// If const is removed, these will need mutex protection.
171162
const std::string _defWorkerHost; ///< Default worker host
172163
const int _defWorkerPortUdp; ///< Default worker UDP port
173164

174-
175165
size_t const _doListMaxLookups = 1000; ///< Maximum number of concurrent lookups in DoList, set by config
176166
size_t const _doListMaxInserts = 1000; ///< Maximum number of concurrent inserts in DoList, set by config
177167
/// Time to sleep between checking requests when at max length, set by config

core/modules/loader/CentralFollower.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ void CentralFollower::startMonitoring() {
6565

6666
bool CentralFollower::workerInfoReceive(BufferUdp::Ptr const& data) {
6767
// Open the data protobuffer and add it to our list.
68-
StringElement::Ptr sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data, " CentralFollower::workerInfoReceive&&& "));
68+
StringElement::Ptr sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data, "CentralFollower::workerInfoReceive"));
6969
if (sData == nullptr) {
7070
LOGS(_log, LOG_LVL_WARN, "CentralFollower::workerInfoRecieve Failed to parse list");
7171
return false;

core/modules/loader/CentralFollower.h

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,6 @@ class WorkerKeysInfo;
4747

4848
namespace loader {
4949

50-
51-
// &&& class CentralWorkerDoListItem;
52-
5350
/// This class is used a base central class for servers that need to get
5451
/// lists of of worker from the master.
5552
/// CentralFollower provides no service on its own. The derived classes must:
@@ -61,20 +58,6 @@ class CentralFollower : public Central {
6158
public:
6259
typedef std::pair<CompositeKey, ChunkSubchunk> CompKeyPair;
6360

64-
/* &&&
65-
enum SocketStatus {
66-
VOID0 = 0,
67-
STARTING1,
68-
ESTABLISHED2
69-
};
70-
71-
enum Direction {
72-
NONE0 = 0,
73-
TORIGHT1,
74-
FROMRIGHT2
75-
};
76-
*/
77-
7861
CentralFollower(boost::asio::io_service& ioService,
7962
std::string const& hostName_, std::string const& masterHost, int masterPortUdp,
8063
int threadPoolSize, int loopSleepTime, int ioThreads, int fPortUdp)
@@ -107,34 +90,23 @@ class CentralFollower : public Central {
10790

10891
std::string getOurLogId() const override { return "CentralFollower"; }
10992

110-
// &&& friend CentralWorkerDoListItem;
111-
11293
protected: // &&& make some or all private again
11394

11495
/// Real workers need to check this for initial ranges.
11596
virtual void checkForThisWorkerValues(uint32_t wId, std::string const& ip,
11697
int portUdp, int portTcp, KeyRange& strRange) {};
117-
/// &&& This function is needed to fill the map. On real workers, CentralWorker
118-
/// needs to do additional work to set its id.
98+
/// This function is needed to fill the map. On real workers, CentralWorker
99+
/// needs to do additional work to set its own id.
119100
void _workerInfoReceive(std::unique_ptr<proto::WorkerListItem>& protoBuf);
120101

121-
/// See workerWorkerKeysInfoReq(...)
122-
// &&& void _workerWorkerKeysInfoReq(LoaderMsg const& inMsg);
123-
124102
const std::string _hostName;
125103
const int _udpPort;
126-
// &&& const int _tcpPort;
127104

128-
WWorkerList::Ptr _wWorkerList{std::make_shared<WWorkerList>(this)}; ///< Maps of workers.
105+
/// Maps of workers with their key ranges.
106+
WWorkerList::Ptr _wWorkerList{std::make_shared<WWorkerList>(this)};
129107

130-
/// The DoListItem that makes sure _monitor() is run. &&& needs to ask master for worker map occasionally
131-
// &&& replace with item to refresh _wWorkerList (see CentralWorker::_startMonitoring)// std::shared_ptr<CentralWorkerDoListItem> _centralWorkerDoListItem;
132108
};
133109

134-
135-
136-
137-
138110
}}} // namespace lsst::qserv::loader
139111

140112
#endif // LSST_QSERV_LOADER_CENTRAL_FOLLOWER_H

core/modules/loader/CentralMaster.cc

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,6 @@ namespace lsst {
4343
namespace qserv {
4444
namespace loader {
4545

46-
/* &&&
47-
void CentralMaster::start() {
48-
_server = std::make_shared<MasterServer>(ioService, getMasterHostName(), getMasterPort(), this);
49-
}
50-
*/
5146
void CentralMaster::startService() {
5247
_server = std::make_shared<MasterServer>(ioService, getMasterHostName(), getMasterPort(), this);
5348
}

core/modules/loader/CentralMaster.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ class CentralMaster : public Central {
5959
_maxKeysPerWorker(cfg.getMaxKeysPerWorker()) {}
6060

6161
/// Open the UDP port. This can throw boost::system::system_error.
62-
// &&&void start();
6362
void startService() override;
6463

6564
~CentralMaster() override { _mWorkerList.reset(); }

0 commit comments

Comments
 (0)