3636#include " loader/WorkerConfig.h"
3737#include " proto/loader.pb.h"
3838#include " proto/ProtoImporter.h"
39+ #include " util/Timer.h" // &&&
3940
4041
4142// LSST headers
@@ -126,6 +127,7 @@ void CentralWorker::_monitor() {
126127 usleep (500000 );
127128 return ;
128129 }
130+ LOGS (_log, LOG_LVL_WARN, " &&& CentralWorker::_monitor A" );
129131
130132 // If data gets shifted, check everything again as ranges will have
131133 // changed and there may be a lot more data to shift.
@@ -191,6 +193,7 @@ void CentralWorker::_monitor() {
191193 _sendWorkerKeysInfo (masterAddr, getNextMsgId ());
192194 }
193195 } while (dataShifted);
196+ LOGS (_log, LOG_LVL_WARN, " &&& CentralWorker::_monitor Z" );
194197}
195198
196199
@@ -667,80 +670,6 @@ void CentralWorker::cancelShiftsWithLeftNeighbor() {
667670 }
668671}
669672
670- /* &&&
671- bool CentralWorker::workerInfoReceive(BufferUdp::Ptr const& data) {
672- // Open the data protobuffer and add it to our list.
673- StringElement::Ptr sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data, " CentralWorker::workerInfoReceive&&& "));
674- if (sData == nullptr) {
675- LOGS(_log, LOG_LVL_WARN, "CentralWorker::workerInfoRecieve Failed to parse list");
676- return false;
677- }
678- std::unique_ptr<proto::WorkerListItem> protoList = sData->protoParse<proto::WorkerListItem>();
679- if (protoList == nullptr) {
680- LOGS(_log, LOG_LVL_WARN, "CentralWorker::workerInfoRecieve Failed to parse list");
681- return false;
682- }
683-
684- // TODO: move this call to another thread
685- _workerInfoReceive(protoList);
686- return true;
687- }
688- */
689-
690- /* &&&
691- void CentralWorker::_workerInfoReceive(std::unique_ptr<proto::WorkerListItem>& protoL) {
692- std::unique_ptr<proto::WorkerListItem> protoList(std::move(protoL));
693-
694- // Check the information, if it is our network address, set or check our id.
695- // Then compare it with the map, adding new/changed information.
696- uint32_t wId = protoList->wid();
697- std::string ipUdp("");
698- int portUdp = 0;
699- int portTcp = 0;
700- if (protoList->has_address()) {
701- proto::LdrNetAddress protoAddr = protoList->address();
702- ipUdp = protoAddr.ip();
703- portUdp = protoAddr.udpport();
704- portTcp = protoAddr.tcpport();
705- }
706- KeyRange strRange;
707- if (protoList->has_range()) {
708- proto::WorkerRange protoRange = protoList->range();
709- bool valid = protoRange.valid();
710- if (valid) {
711- CompositeKey min(protoRange.minint(), protoRange.minstr());
712- CompositeKey max(protoRange.maxint(), protoRange.maxstr());
713- bool unlimited = protoRange.maxunlimited();
714- strRange.setMinMax(min, max, unlimited);
715- }
716- }
717-
718- // If the address matches ours, check the name.
719- if (getHostName() == ipUdp && getUdpPort() == portUdp) {
720- if (_isOurIdInvalid()) {
721- LOGS(_log, LOG_LVL_INFO, "Setting our name " << wId);
722- _setOurId(wId);
723- } else if (getOurId() != wId) {
724- LOGS(_log, LOG_LVL_ERROR, "Our wId doesn't match address from master! wId=" <<
725- getOurId() << " from master=" << wId);
726- }
727-
728- // It is this worker. If there is a valid range in the message and our range is not valid,
729- // take the range given as our own.
730- if (strRange.getValid()) {
731- std::lock_guard<std::mutex> lckM(_idMapMtx);
732- if (not _keyRange.getValid()) {
733- LOGS(_log, LOG_LVL_INFO, "Setting our range " << strRange);
734- _keyRange.setMinMax(strRange.getMin(), strRange.getMax(), strRange.getUnlimited());
735- }
736- }
737- }
738-
739- // Make/update entry in map.
740- _wWorkerList->updateEntry(wId, ipUdp, portUdp, portTcp, strRange);
741- }
742- */
743-
744673
745674void CentralWorker::checkForThisWorkerValues (uint32_t wId, std::string const & ip,
746675 int portUdp, int portTcp, KeyRange& strRange) {
@@ -826,6 +755,9 @@ bool CentralWorker::workerKeyInsertReq(LoaderMsg const& inMsg, BufferUdp::Ptr co
826755}
827756
828757
758+ util::Timer lastInsertTimer; // &&&
759+ std::mutex lastInsertTimerMtx; // &&&
760+
829761void CentralWorker::_workerKeyInsertReq (LoaderMsg const & inMsg, std::unique_ptr<proto::KeyInfoInsert>& protoBuf) {
830762 std::unique_ptr<proto::KeyInfoInsert> protoData (std::move (protoBuf));
831763
@@ -850,7 +782,17 @@ void CentralWorker::_workerKeyInsertReq(LoaderMsg const& inMsg, std::unique_ptr<
850782 // Element already found, check file id and row number. Bad if not the same.
851783 // TODO HIGH send back duplicate key mismatch message to the original requester and return
852784 }
853- LOGS (_log, LOG_LVL_INFO, " Key inserted=" << key << " (" << chunkInfo << " )" );
785+ {
786+ std::lock_guard<std::mutex> tLg (lastInsertTimerMtx);
787+ lastInsertTimer.stop ();
788+ auto elapsedInsert = lastInsertTimer.getElapsed ();
789+ if (elapsedInsert > 0.5 ) {
790+ LOGS (_log, LOG_LVL_ERROR, " &&& Longdelay key=" << key << " dlay=" << elapsedInsert);
791+ }
792+ // &&& LOGS(_log, LOG_LVL_INFO, "Key inserted=" << key << "(" << chunkInfo << ")");
793+ LOGS (_log, LOG_LVL_WARN, " &&&INFO Key inserted=" << key << " (" << chunkInfo << " ) dlay=" << elapsedInsert);
794+ lastInsertTimer.start ();
795+ }
854796 // TODO Send this item to the keyLogger (which would then send KEY_INSERT_COMPLETE back to the requester),
855797 // for now this function will send the message back for proof of concept.
856798 LoaderMsg msg (LoaderMsg::KEY_INSERT_COMPLETE, inMsg.msgId ->element , getHostName (), getUdpPort ());
@@ -898,10 +840,10 @@ void CentralWorker::_forwardKeyInsertRequest(NetworkAddress const& targetAddr, L
898840 // The proto buffer should be the same, just need a new message.
899841 int hops = protoData->hops () + 1 ;
900842 if (hops > 4 ) { // TODO replace magic number with variable set via config file.
901- LOGS (_log, LOG_LVL_INFO , " Too many hops, dropping insert request hops=" << hops << " key=" << key);
843+ LOGS (_log, LOG_LVL_WARN , " Too many hops, dropping insert request hops=" << hops << " key=" << key);
902844 return ;
903845 }
904- LOGS (_log, LOG_LVL_INFO , " Forwarding key insert hops=" << hops << " key=" << key);
846+ LOGS (_log, LOG_LVL_WARN , " &&&INFO Forwarding key insert hops=" << hops << " key=" << key);
905847 LoaderMsg msg (LoaderMsg::KEY_INSERT_REQ, inMsg.msgId ->element , getHostName (), getUdpPort ());
906848 BufferUdp msgData;
907849 msg.appendToData (msgData);
0 commit comments