diff --git a/examples/subscribeServices.cpp b/examples/subscribeServices.cpp index aa7dbe2..7ad5c6b 100644 --- a/examples/subscribeServices.cpp +++ b/examples/subscribeServices.cpp @@ -28,6 +28,7 @@ int main() { //Interval for poller to check the status of subscribed services(unit:Ms), 30000 by default //Here we set it to 5000 to see the output more quick props[PropertyKeyConst::SUBSCRIPTION_POLL_INTERVAL] = "5000"; + props[PropertyKeyConst::UDP_RECEIVER_PORT] = "0"; INacosServiceFactory *factory = NacosFactoryFactory::getNacosFactory(props); ResourceGuard _guardFactory(factory); NamingService *n = factory->CreateNamingService(); diff --git a/src/factory/NacosServiceFactory.cpp b/src/factory/NacosServiceFactory.cpp index 09513fc..0402876 100644 --- a/src/factory/NacosServiceFactory.cpp +++ b/src/factory/NacosServiceFactory.cpp @@ -107,12 +107,12 @@ NamingService *NacosServiceFactory::CreateNamingService() NACOS_THROW(NacosExcep EventDispatcher *eventDispatcher = new EventDispatcher(); objectConfigData->_eventDispatcher = eventDispatcher; - SubscriptionPoller *subscriptionPoller = new SubscriptionPoller(objectConfigData); - objectConfigData->_subscriptionPoller = subscriptionPoller; - UdpNamingServiceListener *udpNamingServiceListener = new UdpNamingServiceListener(objectConfigData); objectConfigData->_udpNamingServiceListener = udpNamingServiceListener; + SubscriptionPoller *subscriptionPoller = new SubscriptionPoller(objectConfigData); + objectConfigData->_subscriptionPoller = subscriptionPoller; + HostReactor *hostReactor = new HostReactor(objectConfigData); objectConfigData->_hostReactor = hostReactor; diff --git a/src/naming/subscribe/HostReactor.cpp b/src/naming/subscribe/HostReactor.cpp index a411cf9..fd74ab7 100644 --- a/src/naming/subscribe/HostReactor.cpp +++ b/src/naming/subscribe/HostReactor.cpp @@ -4,6 +4,7 @@ #include "HostReactor.h" #include "src/json/JSON.h" +#include "src/log/Logger.h" #include "src/utils/NamingUtils.h" #include "src/naming/subscribe/EventDispatcher.h" @@ -43,6 +44,7 @@ void HostReactor::processServiceJson(const NacosString &json) { if (newServiceInfo) { changeAdvice.added = true; changeAdvice.newServiceInfo = serviceInfo; + log_debug("processServiceJson_key: %s\n", key.c_str()); _objectConfigData->_eventDispatcher->notifyDirectly(changeAdvice); } else {//service info is updated ChangeAdvice::compareChange(oldServiceInfo, serviceInfo, changeAdvice); diff --git a/src/naming/subscribe/SubscriptionPoller.cpp b/src/naming/subscribe/SubscriptionPoller.cpp index 579ee4f..df4c2b6 100644 --- a/src/naming/subscribe/SubscriptionPoller.cpp +++ b/src/naming/subscribe/SubscriptionPoller.cpp @@ -14,6 +14,9 @@ SubscriptionPoller::SubscriptionPoller(ObjectConfigData *objectConfigData) _pollingInterval = atoi(_objectConfigData->_appConfigManager->get(PropertyKeyConst::SUBSCRIPTION_POLL_INTERVAL).c_str()); _udpPort = atoi(_objectConfigData->_appConfigManager->get(PropertyKeyConst::UDP_RECEIVER_PORT).c_str()); _started = false; + + pthread_mutex_init(&_pollMutex, NULL); + pthread_cond_init(&_pollCond, NULL); } SubscriptionPoller::~SubscriptionPoller() @@ -26,6 +29,9 @@ SubscriptionPoller::~SubscriptionPoller() delete _pollingThread; _pollingThread = NULL; } + + pthread_mutex_destroy(&_pollMutex); + pthread_cond_destroy(&_pollCond); } bool SubscriptionPoller::addPollItem(const NacosString &serviceName, const NacosString &groupName, const NacosString &clusters) @@ -37,14 +43,23 @@ bool SubscriptionPoller::addPollItem(const NacosString &serviceName, const Nacos NacosString name = NamingUtils::getGroupedName(serviceName, groupName); NacosString key = ServiceInfo::getKey(name, clusters); + bool added = false; { WriteGuard __writeGuard(rwLock); if (pollingList.count(key) > 0) { return false; } pollingList[key] = pd; - return true; + added = true; } + + if (added) { + pthread_mutex_lock(&_pollMutex); + pthread_cond_signal(&_pollCond); + pthread_mutex_unlock(&_pollMutex); + } + + return added; } bool SubscriptionPoller::removePollItem(const NacosString &serviceName, const NacosString &groupName, const NacosString &clusters) @@ -95,8 +110,22 @@ void *SubscriptionPoller::pollingThreadFunc(void *parm) log_debug("Copied polling list, size = %d\n", copiedList.size()); } if (copiedList.empty()) { - log_debug("PollingList is empty, hibernating...\n", copiedList.size()); - sleep(thisObj->_pollingInterval / 1000); + log_debug("PollingList is empty, waiting for new items...\n"); + pthread_mutex_lock(&thisObj->_pollMutex); + + { + ReadGuard __readGuard(thisObj->rwLock); + if (!thisObj->pollingList.empty()) { + pthread_mutex_unlock(&thisObj->_pollMutex); + continue; + } + } + + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += thisObj->_pollingInterval / 1000; + pthread_cond_timedwait(&thisObj->_pollCond, &thisObj->_pollMutex, &ts); + pthread_mutex_unlock(&thisObj->_pollMutex); continue; } @@ -130,7 +159,21 @@ void *SubscriptionPoller::pollingThreadFunc(void *parm) } log_debug("Polling process finished, hibernating...\n"); - sleep(thisObj->_pollingInterval / 1000); + pthread_mutex_lock(&thisObj->_pollMutex); + + { + ReadGuard __readGuard(thisObj->rwLock); + if (thisObj->pollingList.size() != copiedList.size()) { + pthread_mutex_unlock(&thisObj->_pollMutex); + continue; + } + } + + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += thisObj->_pollingInterval / 1000; + pthread_cond_timedwait(&thisObj->_pollCond, &thisObj->_pollMutex, &ts); + pthread_mutex_unlock(&thisObj->_pollMutex); } log_debug("Polling thread for NamingService exited normally.\n"); return NULL; diff --git a/src/naming/subscribe/SubscriptionPoller.h b/src/naming/subscribe/SubscriptionPoller.h index 20e1139..d32c816 100644 --- a/src/naming/subscribe/SubscriptionPoller.h +++ b/src/naming/subscribe/SubscriptionPoller.h @@ -10,6 +10,7 @@ #include "src/naming/NamingProxy.h" #include "EventDispatcher.h" #include "src/factory/ObjectConfigData.h" +#include namespace nacos{ struct PollingData @@ -29,6 +30,9 @@ class SubscriptionPoller volatile bool _started; ObjectConfigData *_objectConfigData; + pthread_cond_t _pollCond; + pthread_mutex_t _pollMutex; + SubscriptionPoller(); static void *pollingThreadFunc(void *parm); diff --git a/src/naming/subscribe/UdpNamingServiceListener.cpp b/src/naming/subscribe/UdpNamingServiceListener.cpp index 245db1a..bdf3c27 100644 --- a/src/naming/subscribe/UdpNamingServiceListener.cpp +++ b/src/naming/subscribe/UdpNamingServiceListener.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -15,7 +16,7 @@ using namespace std; namespace nacos { -void UdpNamingServiceListener::initializeUdpListener() NACOS_THROW(NacosException) { +int UdpNamingServiceListener::initializeUdpListener() NACOS_THROW(NacosException) { log_debug("in thread UdpNamingServiceListener::initializeUdpListener()\n"); // Creating socket file descriptor if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 ) { @@ -27,7 +28,7 @@ void UdpNamingServiceListener::initializeUdpListener() NACOS_THROW(NacosExceptio // Filling client information cliaddr.sin_family = AF_INET; // IPv4 cliaddr.sin_addr.s_addr = INADDR_ANY; - cliaddr.sin_port = htons(udpReceiverPort); + cliaddr.sin_port = htons(0); log_debug("udp receiver port = %d\n", cliaddr.sin_port); // Bind the socket with the server address @@ -37,7 +38,17 @@ void UdpNamingServiceListener::initializeUdpListener() NACOS_THROW(NacosExceptio throw NacosException(NacosException::UNABLE_TO_CREATE_SOCKET, "Unable to bind"); } + // 获取实际绑定的端口号 + struct sockaddr_in bound_addr; + socklen_t addr_len = sizeof(bound_addr); + if (getsockname(sockfd, (struct sockaddr*)&bound_addr, &addr_len) < 0) { + perror("getsockname 失败"); + close(sockfd); + exit(EXIT_FAILURE); + } + log_debug("socket bound\n"); + return ntohs(bound_addr.sin_port); } bool UdpNamingServiceListener::unGzip(char *inBuffer, size_t inSize) { @@ -85,7 +96,7 @@ bool UdpNamingServiceListener::unGzip(char *inBuffer, size_t inSize) { void *UdpNamingServiceListener::listenerThreadFunc(void *param) { UdpNamingServiceListener *thisObj = (UdpNamingServiceListener*)param; log_debug("in thread UdpNamingServiceListener::listenerThreadFunc()\n"); - thisObj->initializeUdpListener(); + // thisObj->initializeUdpListener(); while (thisObj->_started) { int ret;//also data_len @@ -158,7 +169,9 @@ UdpNamingServiceListener::UdpNamingServiceListener(ObjectConfigData *objectConfi _listenerThread = NULL; _started = false; _objectConfigData = objectConfigData; - udpReceiverPort = atoi(_objectConfigData->_appConfigManager->get(PropertyKeyConst::UDP_RECEIVER_PORT).c_str()); + int udp_port = initializeUdpListener(); + // udpReceiverPort = atoi(_objectConfigData->_appConfigManager->get(PropertyKeyConst::UDP_RECEIVER_PORT).c_str()); + objectConfigData->_appConfigManager->set(PropertyKeyConst::UDP_RECEIVER_PORT, NacosStringOps::valueOf(udp_port)); log_debug("udpReceiverPort is %d\n", udpReceiverPort); _listenerThread = new Thread(objectConfigData->name + "UDPListener", listenerThreadFunc, (void*)this); } diff --git a/src/naming/subscribe/UdpNamingServiceListener.h b/src/naming/subscribe/UdpNamingServiceListener.h index 6252dcd..d34b54f 100644 --- a/src/naming/subscribe/UdpNamingServiceListener.h +++ b/src/naming/subscribe/UdpNamingServiceListener.h @@ -35,7 +35,7 @@ class UdpNamingServiceListener { char uncompressedData[UDP_MSS * 10]; Thread *_listenerThread; - void initializeUdpListener() NACOS_THROW(NacosException); + int initializeUdpListener() NACOS_THROW(NacosException); static void *listenerThreadFunc(void *param); bool unGzip(char *inBuffer, size_t inSize); public: