Skip to content

Commit

Permalink
Remove http data change (#10043)
Browse files Browse the repository at this point in the history
* Remove http way to report data change

* Remove http controller method to receive data change
  • Loading branch information
Daydreamer-ia authored Mar 6, 2023
1 parent 74b81be commit 920fd4c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 255 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import com.alibaba.nacos.config.server.model.SampleResult;
import com.alibaba.nacos.config.server.remote.ConfigChangeListenContext;
import com.alibaba.nacos.config.server.service.LongPollingService;
import com.alibaba.nacos.config.server.service.dump.DumpService;
import com.alibaba.nacos.config.server.service.notify.NotifyService;
import com.alibaba.nacos.config.server.utils.GroupKey2;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionManager;
Expand All @@ -50,44 +48,19 @@
@RequestMapping(Constants.COMMUNICATION_CONTROLLER_PATH)
public class CommunicationController {

private final DumpService dumpService;

private final LongPollingService longPollingService;

private final ConfigChangeListenContext configChangeListenContext;

private final ConnectionManager connectionManager;

public CommunicationController(DumpService dumpService, LongPollingService longPollingService,
public CommunicationController(LongPollingService longPollingService,
ConfigChangeListenContext configChangeListenContext, ConnectionManager connectionManager) {
this.dumpService = dumpService;
this.longPollingService = longPollingService;
this.configChangeListenContext = configChangeListenContext;
this.connectionManager = connectionManager;
}

/**
* Notify the change of config information.
*/
@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
@RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "tag", required = false) String tag) {
dataId = dataId.trim();
group = group.trim();
String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
String isBetaStr = request.getHeader("isBeta");
if (StringUtils.isNotBlank(isBetaStr) && Boolean.parseBoolean(isBetaStr)) {
dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
} else {
dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
}
return true;
}

/**
* Get client config information of subscriber in local machine.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,9 @@
import com.alibaba.nacos.api.config.remote.response.cluster.ConfigChangeClusterSyncResponse;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.utils.NetUtils;
import com.alibaba.nacos.auth.util.AuthHeaderUtil;
import com.alibaba.nacos.common.http.Callback;
import com.alibaba.nacos.common.http.client.NacosAsyncRestTemplate;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.remote.ConfigClusterRpcClientProxy;
Expand All @@ -38,19 +31,13 @@
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberUtil;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.InetUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
Expand All @@ -67,8 +54,6 @@ public class AsyncNotifyService {

private static final Logger LOGGER = LoggerFactory.getLogger(AsyncNotifyService.class);

private final NacosAsyncRestTemplate nacosAsyncRestTemplate = HttpClientManager.getNacosAsyncRestTemplate();

private static final int MIN_RETRY_INTERVAL = 500;

private static final int INCREASE_STEPS = 1000;
Expand Down Expand Up @@ -109,20 +94,12 @@ public void onEvent(Event event) {
Collection<Member> ipList = memberManager.allMembers();

// In fact, any type of queue here can be
Queue<NotifySingleTask> httpQueue = new LinkedList<>();
Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<>();

for (Member member : ipList) {
if (!MemberUtil.isSupportedLongCon(member)) {
httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
evt.isBeta));
} else {
rpcQueue.add(
new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
}
}
if (!httpQueue.isEmpty()) {
ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));
// grpc report data change only
rpcQueue.add(
new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
}
if (!rpcQueue.isEmpty()) {
ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
Expand All @@ -138,52 +115,6 @@ public Class<? extends Event> subscribeType() {
});
}

class AsyncTask implements Runnable {

private Queue<NotifySingleTask> queue;

private NacosAsyncRestTemplate restTemplate;

public AsyncTask(NacosAsyncRestTemplate restTemplate, Queue<NotifySingleTask> queue) {
this.restTemplate = restTemplate;
this.queue = queue;
}

@Override
public void run() {
executeAsyncInvoke();
}

private void executeAsyncInvoke() {
while (!queue.isEmpty()) {
NotifySingleTask task = queue.poll();
String targetIp = task.getTargetIP();
if (memberManager.hasMember(targetIp)) {
// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);
if (unHealthNeedDelay) {
// target ip is unhealthy, then put it in the notification list
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
0, task.target);
// get delay time and set fail count to the task
asyncTaskExecute(task);
} else {
Header header = Header.newInstance();
header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
String.valueOf(task.getLastModified()));
header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
if (task.isBeta) {
header.addParam("isBeta", "true");
}
AuthHeaderUtil.addIdentityToHeader(header);
restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
}
}
}
}
}

class AsyncRpcTask implements Runnable {

private Queue<NotifySingleRpcTask> queue;
Expand Down Expand Up @@ -227,19 +158,14 @@ public void run() {
// get delay time and set fail count to the task
asyncTaskExecute(task);
} else {

if (!MemberUtil.isSupportedLongCon(member)) {
asyncTaskExecute(
new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,
task.getLastModified(), member.getAddress(), task.isBeta));
} else {
try {
configClusterRpcClientProxy
.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
} catch (Exception e) {
MetricsMonitor.getConfigNotifyException().increment();
asyncTaskExecute(task);
}

// grpc report data change only
try {
configClusterRpcClientProxy
.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
} catch (Exception e) {
MetricsMonitor.getConfigNotifyException().increment();
asyncTaskExecute(task);
}

}
Expand Down Expand Up @@ -268,14 +194,6 @@ public NotifySingleRpcTask(String dataId, String group, String tenant, String ta
}
}

private void asyncTaskExecute(NotifySingleTask task) {
int delay = getDelayTime(task);
Queue<NotifySingleTask> queue = new LinkedList<>();
queue.add(task);
AsyncTask asyncTask = new AsyncTask(nacosAsyncRestTemplate, queue);
ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS);
}

private void asyncTaskExecute(NotifySingleRpcTask task) {
int delay = getDelayTime(task);
Queue<NotifySingleRpcTask> queue = new LinkedList<>();
Expand All @@ -284,74 +202,6 @@ private void asyncTaskExecute(NotifySingleRpcTask task) {
ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS);
}

class AsyncNotifyCallBack implements Callback<String> {

private NotifySingleTask task;

public AsyncNotifyCallBack(NotifySingleTask task) {
this.task = task;
}

@Override
public void onReceive(RestResult<String> result) {

long delayed = System.currentTimeMillis() - task.getLastModified();

if (result.ok()) {
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_OK, delayed,
task.target);
} else {
LOGGER.error("[notify-error] target:{} dataId:{} group:{} ts:{} code:{}", task.target, task.getDataId(),
task.getGroup(), task.getLastModified(), result.getCode());
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_ERROR, delayed,
task.target);

//get delay time and set fail count to the task
asyncTaskExecute(task);

LogUtil.NOTIFY_LOG
.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", task.target, task.getDataId(),
task.getGroup(), task.getLastModified());

MetricsMonitor.getConfigNotifyException().increment();
}
}

@Override
public void onError(Throwable ex) {

long delayed = System.currentTimeMillis() - task.getLastModified();
LOGGER.error("[notify-exception] target:{} dataId:{} group:{} ts:{} ex:{}", task.target, task.getDataId(),
task.getGroup(), task.getLastModified(), ex);
ConfigTraceService
.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(),
InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_EXCEPTION, delayed, task.target);

//get delay time and set fail count to the task
asyncTaskExecute(task);
LogUtil.NOTIFY_LOG.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", task.target, task.getDataId(),
task.getGroup(), task.getLastModified());

MetricsMonitor.getConfigNotifyException().increment();
}

@Override
public void onCancel() {

LogUtil.NOTIFY_LOG.error("[notify-exception] target:{} dataId:{} group:{} ts:{} method:{}", task.target,
task.getDataId(), task.getGroup(), task.getLastModified(), "CANCELED");

//get delay time and set fail count to the task
asyncTaskExecute(task);
LogUtil.NOTIFY_LOG.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", task.target, task.getDataId(),
task.getGroup(), task.getLastModified());

MetricsMonitor.getConfigNotifyException().increment();
}
}

class AsyncRpcNotifyCallBack implements RequestCallBack<ConfigChangeClusterSyncResponse> {

private NotifySingleRpcTask task;
Expand Down Expand Up @@ -413,72 +263,7 @@ public void onException(Throwable ex) {
MetricsMonitor.getConfigNotifyException().increment();
}
}

static class NotifySingleTask extends NotifyTask {

private String target;

private String url;

private boolean isBeta;

private static final String URL_PATTERN =
"http://{0}{1}" + Constants.COMMUNICATION_CONTROLLER_PATH + "/dataChange" + "?dataId={2}&group={3}";

private static final String URL_PATTERN_TENANT =
"http://{0}{1}" + Constants.COMMUNICATION_CONTROLLER_PATH + "/dataChange"
+ "?dataId={2}&group={3}&tenant={4}";

private int failCount;

public NotifySingleTask(String dataId, String group, String tenant, long lastModified, String target) {
this(dataId, group, tenant, lastModified, target, false);
}

public NotifySingleTask(String dataId, String group, String tenant, long lastModified, String target,
boolean isBeta) {
this(dataId, group, tenant, null, lastModified, target, isBeta);
}

public NotifySingleTask(String dataId, String group, String tenant, String tag, long lastModified,
String target, boolean isBeta) {
super(dataId, group, tenant, lastModified);
this.target = target;
this.isBeta = isBeta;
try {
dataId = URLEncoder.encode(dataId, Constants.ENCODE);
group = URLEncoder.encode(group, Constants.ENCODE);
} catch (UnsupportedEncodingException e) {
LOGGER.error("URLEncoder encode error", e);
}
if (StringUtils.isBlank(tenant)) {
this.url = MessageFormat.format(URL_PATTERN, target, EnvUtil.getContextPath(), dataId, group);
} else {
this.url = MessageFormat
.format(URL_PATTERN_TENANT, target, EnvUtil.getContextPath(), dataId, group, tenant);
}
if (StringUtils.isNotEmpty(tag)) {
url = url + "&tag=" + tag;
}
failCount = 0;
}

@Override
public void setFailCount(int count) {
this.failCount = count;
}

@Override
public int getFailCount() {
return failCount;
}

public String getTargetIP() {
return target;
}

}


/**
* get delayTime and also set failCount to task; The failure time index increases, so as not to retry invalid tasks
* in the offline scene, which affects the normal synchronization.
Expand Down

0 comments on commit 920fd4c

Please sign in to comment.