Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/subscribeServices.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ int main() {
//Interval for poller to check the status of subscribed services(unit:Ms), 30000 by default
//Here we set it to 5000 to see the output more quick
props[PropertyKeyConst::SUBSCRIPTION_POLL_INTERVAL] = "5000";
props[PropertyKeyConst::UDP_RECEIVER_PORT] = "0";
INacosServiceFactory *factory = NacosFactoryFactory::getNacosFactory(props);
ResourceGuard <INacosServiceFactory> _guardFactory(factory);
NamingService *n = factory->CreateNamingService();
Expand Down
6 changes: 3 additions & 3 deletions src/factory/NacosServiceFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ NamingService *NacosServiceFactory::CreateNamingService() NACOS_THROW(NacosExcep
EventDispatcher *eventDispatcher = new EventDispatcher();
objectConfigData->_eventDispatcher = eventDispatcher;

SubscriptionPoller *subscriptionPoller = new SubscriptionPoller(objectConfigData);
objectConfigData->_subscriptionPoller = subscriptionPoller;

UdpNamingServiceListener *udpNamingServiceListener = new UdpNamingServiceListener(objectConfigData);
objectConfigData->_udpNamingServiceListener = udpNamingServiceListener;

SubscriptionPoller *subscriptionPoller = new SubscriptionPoller(objectConfigData);
objectConfigData->_subscriptionPoller = subscriptionPoller;

HostReactor *hostReactor = new HostReactor(objectConfigData);
objectConfigData->_hostReactor = hostReactor;

Expand Down
2 changes: 2 additions & 0 deletions src/naming/subscribe/HostReactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "HostReactor.h"
#include "src/json/JSON.h"
#include "src/log/Logger.h"
#include "src/utils/NamingUtils.h"
#include "src/naming/subscribe/EventDispatcher.h"

Expand Down Expand Up @@ -43,6 +44,7 @@ void HostReactor::processServiceJson(const NacosString &json) {
if (newServiceInfo) {
changeAdvice.added = true;
changeAdvice.newServiceInfo = serviceInfo;
log_debug("processServiceJson_key: %s\n", key.c_str());
_objectConfigData->_eventDispatcher->notifyDirectly(changeAdvice);
} else {//service info is updated
ChangeAdvice::compareChange(oldServiceInfo, serviceInfo, changeAdvice);
Expand Down
51 changes: 47 additions & 4 deletions src/naming/subscribe/SubscriptionPoller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ SubscriptionPoller::SubscriptionPoller(ObjectConfigData *objectConfigData)
_pollingInterval = atoi(_objectConfigData->_appConfigManager->get(PropertyKeyConst::SUBSCRIPTION_POLL_INTERVAL).c_str());
_udpPort = atoi(_objectConfigData->_appConfigManager->get(PropertyKeyConst::UDP_RECEIVER_PORT).c_str());
_started = false;

pthread_mutex_init(&_pollMutex, NULL);
pthread_cond_init(&_pollCond, NULL);
}

SubscriptionPoller::~SubscriptionPoller()
Expand All @@ -26,6 +29,9 @@ SubscriptionPoller::~SubscriptionPoller()
delete _pollingThread;
_pollingThread = NULL;
}

pthread_mutex_destroy(&_pollMutex);
pthread_cond_destroy(&_pollCond);
}

bool SubscriptionPoller::addPollItem(const NacosString &serviceName, const NacosString &groupName, const NacosString &clusters)
Expand All @@ -37,14 +43,23 @@ bool SubscriptionPoller::addPollItem(const NacosString &serviceName, const Nacos
NacosString name = NamingUtils::getGroupedName(serviceName, groupName);
NacosString key = ServiceInfo::getKey(name, clusters);

bool added = false;
{
WriteGuard __writeGuard(rwLock);
if (pollingList.count(key) > 0) {
return false;
}
pollingList[key] = pd;
return true;
added = true;
}

if (added) {
pthread_mutex_lock(&_pollMutex);
pthread_cond_signal(&_pollCond);
pthread_mutex_unlock(&_pollMutex);
}

return added;
}

bool SubscriptionPoller::removePollItem(const NacosString &serviceName, const NacosString &groupName, const NacosString &clusters)
Expand Down Expand Up @@ -95,8 +110,22 @@ void *SubscriptionPoller::pollingThreadFunc(void *parm)
log_debug("Copied polling list, size = %d\n", copiedList.size());
}
if (copiedList.empty()) {
log_debug("PollingList is empty, hibernating...\n", copiedList.size());
sleep(thisObj->_pollingInterval / 1000);
log_debug("PollingList is empty, waiting for new items...\n");
pthread_mutex_lock(&thisObj->_pollMutex);

{
ReadGuard __readGuard(thisObj->rwLock);
if (!thisObj->pollingList.empty()) {
pthread_mutex_unlock(&thisObj->_pollMutex);
continue;
}
}

struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += thisObj->_pollingInterval / 1000;
pthread_cond_timedwait(&thisObj->_pollCond, &thisObj->_pollMutex, &ts);
pthread_mutex_unlock(&thisObj->_pollMutex);
continue;
}

Expand Down Expand Up @@ -130,7 +159,21 @@ void *SubscriptionPoller::pollingThreadFunc(void *parm)
}

log_debug("Polling process finished, hibernating...\n");
sleep(thisObj->_pollingInterval / 1000);
pthread_mutex_lock(&thisObj->_pollMutex);

{
ReadGuard __readGuard(thisObj->rwLock);
if (thisObj->pollingList.size() != copiedList.size()) {
pthread_mutex_unlock(&thisObj->_pollMutex);
continue;
}
}

struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += thisObj->_pollingInterval / 1000;
pthread_cond_timedwait(&thisObj->_pollCond, &thisObj->_pollMutex, &ts);
pthread_mutex_unlock(&thisObj->_pollMutex);
}
log_debug("Polling thread for NamingService exited normally.\n");
return NULL;
Expand Down
4 changes: 4 additions & 0 deletions src/naming/subscribe/SubscriptionPoller.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "src/naming/NamingProxy.h"
#include "EventDispatcher.h"
#include "src/factory/ObjectConfigData.h"
#include <pthread.h>

namespace nacos{
struct PollingData
Expand All @@ -29,6 +30,9 @@ class SubscriptionPoller
volatile bool _started;
ObjectConfigData *_objectConfigData;

pthread_cond_t _pollCond;
pthread_mutex_t _pollMutex;

SubscriptionPoller();

static void *pollingThreadFunc(void *parm);
Expand Down
21 changes: 17 additions & 4 deletions src/naming/subscribe/UdpNamingServiceListener.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <errno.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>

Expand All @@ -15,7 +16,7 @@ using namespace std;

namespace nacos {

void UdpNamingServiceListener::initializeUdpListener() NACOS_THROW(NacosException) {
int UdpNamingServiceListener::initializeUdpListener() NACOS_THROW(NacosException) {
log_debug("in thread UdpNamingServiceListener::initializeUdpListener()\n");
// Creating socket file descriptor
if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 ) {
Expand All @@ -27,7 +28,7 @@ void UdpNamingServiceListener::initializeUdpListener() NACOS_THROW(NacosExceptio
// Filling client information
cliaddr.sin_family = AF_INET; // IPv4
cliaddr.sin_addr.s_addr = INADDR_ANY;
cliaddr.sin_port = htons(udpReceiverPort);
cliaddr.sin_port = htons(0);
log_debug("udp receiver port = %d\n", cliaddr.sin_port);

// Bind the socket with the server address
Expand All @@ -37,7 +38,17 @@ void UdpNamingServiceListener::initializeUdpListener() NACOS_THROW(NacosExceptio
throw NacosException(NacosException::UNABLE_TO_CREATE_SOCKET, "Unable to bind");
}

// 获取实际绑定的端口号
struct sockaddr_in bound_addr;
socklen_t addr_len = sizeof(bound_addr);
if (getsockname(sockfd, (struct sockaddr*)&bound_addr, &addr_len) < 0) {
perror("getsockname 失败");
close(sockfd);
exit(EXIT_FAILURE);
}

log_debug("socket bound\n");
return ntohs(bound_addr.sin_port);
}

bool UdpNamingServiceListener::unGzip(char *inBuffer, size_t inSize) {
Expand Down Expand Up @@ -85,7 +96,7 @@ bool UdpNamingServiceListener::unGzip(char *inBuffer, size_t inSize) {
void *UdpNamingServiceListener::listenerThreadFunc(void *param) {
UdpNamingServiceListener *thisObj = (UdpNamingServiceListener*)param;
log_debug("in thread UdpNamingServiceListener::listenerThreadFunc()\n");
thisObj->initializeUdpListener();
// thisObj->initializeUdpListener();
while (thisObj->_started) {
int ret;//also data_len

Expand Down Expand Up @@ -158,7 +169,9 @@ UdpNamingServiceListener::UdpNamingServiceListener(ObjectConfigData *objectConfi
_listenerThread = NULL;
_started = false;
_objectConfigData = objectConfigData;
udpReceiverPort = atoi(_objectConfigData->_appConfigManager->get(PropertyKeyConst::UDP_RECEIVER_PORT).c_str());
int udp_port = initializeUdpListener();
// udpReceiverPort = atoi(_objectConfigData->_appConfigManager->get(PropertyKeyConst::UDP_RECEIVER_PORT).c_str());
objectConfigData->_appConfigManager->set(PropertyKeyConst::UDP_RECEIVER_PORT, NacosStringOps::valueOf(udp_port));
log_debug("udpReceiverPort is %d\n", udpReceiverPort);
_listenerThread = new Thread(objectConfigData->name + "UDPListener", listenerThreadFunc, (void*)this);
}
Expand Down
2 changes: 1 addition & 1 deletion src/naming/subscribe/UdpNamingServiceListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class UdpNamingServiceListener {
char uncompressedData[UDP_MSS * 10];
Thread *_listenerThread;

void initializeUdpListener() NACOS_THROW(NacosException);
int initializeUdpListener() NACOS_THROW(NacosException);
static void *listenerThreadFunc(void *param);
bool unGzip(char *inBuffer, size_t inSize);
public:
Expand Down