diff --git a/config/src/main/java/com/alibaba/nacos/config/server/controller/CommunicationController.java b/config/src/main/java/com/alibaba/nacos/config/server/controller/CommunicationController.java index a32119d6abf..b1c7574684e 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/controller/CommunicationController.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/controller/CommunicationController.java @@ -22,8 +22,6 @@ import com.alibaba.nacos.config.server.model.SampleResult; import com.alibaba.nacos.config.server.remote.ConfigChangeListenContext; import com.alibaba.nacos.config.server.service.LongPollingService; -import com.alibaba.nacos.config.server.service.dump.DumpService; -import com.alibaba.nacos.config.server.service.notify.NotifyService; import com.alibaba.nacos.config.server.utils.GroupKey2; import com.alibaba.nacos.core.remote.Connection; import com.alibaba.nacos.core.remote.ConnectionManager; @@ -50,44 +48,19 @@ @RequestMapping(Constants.COMMUNICATION_CONTROLLER_PATH) public class CommunicationController { - private final DumpService dumpService; - private final LongPollingService longPollingService; private final ConfigChangeListenContext configChangeListenContext; private final ConnectionManager connectionManager; - public CommunicationController(DumpService dumpService, LongPollingService longPollingService, + public CommunicationController(LongPollingService longPollingService, ConfigChangeListenContext configChangeListenContext, ConnectionManager connectionManager) { - this.dumpService = dumpService; this.longPollingService = longPollingService; this.configChangeListenContext = configChangeListenContext; this.connectionManager = connectionManager; } - /** - * Notify the change of config information. - */ - @GetMapping("/dataChange") - public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId, - @RequestParam("group") String group, - @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, - @RequestParam(value = "tag", required = false) String tag) { - dataId = dataId.trim(); - group = group.trim(); - String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED); - long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified); - String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP); - String isBetaStr = request.getHeader("isBeta"); - if (StringUtils.isNotBlank(isBetaStr) && Boolean.parseBoolean(isBetaStr)) { - dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true); - } else { - dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp); - } - return true; - } - /** * Get client config information of subscriber in local machine. */ diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java index 06c08be197b..64836982a54 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java @@ -20,16 +20,9 @@ import com.alibaba.nacos.api.config.remote.response.cluster.ConfigChangeClusterSyncResponse; import com.alibaba.nacos.api.remote.RequestCallBack; import com.alibaba.nacos.api.utils.NetUtils; -import com.alibaba.nacos.auth.util.AuthHeaderUtil; -import com.alibaba.nacos.common.http.Callback; -import com.alibaba.nacos.common.http.client.NacosAsyncRestTemplate; -import com.alibaba.nacos.common.http.param.Header; -import com.alibaba.nacos.common.http.param.Query; -import com.alibaba.nacos.common.model.RestResult; import com.alibaba.nacos.common.notify.Event; import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.notify.listener.Subscriber; -import com.alibaba.nacos.config.server.constant.Constants; import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent; import com.alibaba.nacos.config.server.monitor.MetricsMonitor; import com.alibaba.nacos.config.server.remote.ConfigClusterRpcClientProxy; @@ -38,19 +31,13 @@ import com.alibaba.nacos.config.server.utils.ConfigExecutor; import com.alibaba.nacos.config.server.utils.LogUtil; import com.alibaba.nacos.core.cluster.Member; -import com.alibaba.nacos.core.cluster.MemberUtil; import com.alibaba.nacos.core.cluster.ServerMemberManager; -import com.alibaba.nacos.sys.env.EnvUtil; import com.alibaba.nacos.sys.utils.InetUtils; -import com.alibaba.nacos.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; -import java.text.MessageFormat; import java.util.Collection; import java.util.LinkedList; import java.util.Queue; @@ -67,8 +54,6 @@ public class AsyncNotifyService { private static final Logger LOGGER = LoggerFactory.getLogger(AsyncNotifyService.class); - private final NacosAsyncRestTemplate nacosAsyncRestTemplate = HttpClientManager.getNacosAsyncRestTemplate(); - private static final int MIN_RETRY_INTERVAL = 500; private static final int INCREASE_STEPS = 1000; @@ -109,20 +94,12 @@ public void onEvent(Event event) { Collection ipList = memberManager.allMembers(); // In fact, any type of queue here can be - Queue httpQueue = new LinkedList<>(); Queue rpcQueue = new LinkedList<>(); for (Member member : ipList) { - if (!MemberUtil.isSupportedLongCon(member)) { - httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), - evt.isBeta)); - } else { - rpcQueue.add( - new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member)); - } - } - if (!httpQueue.isEmpty()) { - ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue)); + // grpc report data change only + rpcQueue.add( + new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member)); } if (!rpcQueue.isEmpty()) { ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue)); @@ -138,52 +115,6 @@ public Class subscribeType() { }); } - class AsyncTask implements Runnable { - - private Queue queue; - - private NacosAsyncRestTemplate restTemplate; - - public AsyncTask(NacosAsyncRestTemplate restTemplate, Queue queue) { - this.restTemplate = restTemplate; - this.queue = queue; - } - - @Override - public void run() { - executeAsyncInvoke(); - } - - private void executeAsyncInvoke() { - while (!queue.isEmpty()) { - NotifySingleTask task = queue.poll(); - String targetIp = task.getTargetIP(); - if (memberManager.hasMember(targetIp)) { - // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify - boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp); - if (unHealthNeedDelay) { - // target ip is unhealthy, then put it in the notification list - ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, - task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH, - 0, task.target); - // get delay time and set fail count to the task - asyncTaskExecute(task); - } else { - Header header = Header.newInstance(); - header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, - String.valueOf(task.getLastModified())); - header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP()); - if (task.isBeta) { - header.addParam("isBeta", "true"); - } - AuthHeaderUtil.addIdentityToHeader(header); - restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task)); - } - } - } - } - } - class AsyncRpcTask implements Runnable { private Queue queue; @@ -227,19 +158,14 @@ public void run() { // get delay time and set fail count to the task asyncTaskExecute(task); } else { - - if (!MemberUtil.isSupportedLongCon(member)) { - asyncTaskExecute( - new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag, - task.getLastModified(), member.getAddress(), task.isBeta)); - } else { - try { - configClusterRpcClientProxy - .syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task)); - } catch (Exception e) { - MetricsMonitor.getConfigNotifyException().increment(); - asyncTaskExecute(task); - } + + // grpc report data change only + try { + configClusterRpcClientProxy + .syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task)); + } catch (Exception e) { + MetricsMonitor.getConfigNotifyException().increment(); + asyncTaskExecute(task); } } @@ -268,14 +194,6 @@ public NotifySingleRpcTask(String dataId, String group, String tenant, String ta } } - private void asyncTaskExecute(NotifySingleTask task) { - int delay = getDelayTime(task); - Queue queue = new LinkedList<>(); - queue.add(task); - AsyncTask asyncTask = new AsyncTask(nacosAsyncRestTemplate, queue); - ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS); - } - private void asyncTaskExecute(NotifySingleRpcTask task) { int delay = getDelayTime(task); Queue queue = new LinkedList<>(); @@ -284,74 +202,6 @@ private void asyncTaskExecute(NotifySingleRpcTask task) { ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS); } - class AsyncNotifyCallBack implements Callback { - - private NotifySingleTask task; - - public AsyncNotifyCallBack(NotifySingleTask task) { - this.task = task; - } - - @Override - public void onReceive(RestResult result) { - - long delayed = System.currentTimeMillis() - task.getLastModified(); - - if (result.ok()) { - ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, - task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_OK, delayed, - task.target); - } else { - LOGGER.error("[notify-error] target:{} dataId:{} group:{} ts:{} code:{}", task.target, task.getDataId(), - task.getGroup(), task.getLastModified(), result.getCode()); - ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, - task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_ERROR, delayed, - task.target); - - //get delay time and set fail count to the task - asyncTaskExecute(task); - - LogUtil.NOTIFY_LOG - .error("[notify-retry] target:{} dataId:{} group:{} ts:{}", task.target, task.getDataId(), - task.getGroup(), task.getLastModified()); - - MetricsMonitor.getConfigNotifyException().increment(); - } - } - - @Override - public void onError(Throwable ex) { - - long delayed = System.currentTimeMillis() - task.getLastModified(); - LOGGER.error("[notify-exception] target:{} dataId:{} group:{} ts:{} ex:{}", task.target, task.getDataId(), - task.getGroup(), task.getLastModified(), ex); - ConfigTraceService - .logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), - InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_EXCEPTION, delayed, task.target); - - //get delay time and set fail count to the task - asyncTaskExecute(task); - LogUtil.NOTIFY_LOG.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", task.target, task.getDataId(), - task.getGroup(), task.getLastModified()); - - MetricsMonitor.getConfigNotifyException().increment(); - } - - @Override - public void onCancel() { - - LogUtil.NOTIFY_LOG.error("[notify-exception] target:{} dataId:{} group:{} ts:{} method:{}", task.target, - task.getDataId(), task.getGroup(), task.getLastModified(), "CANCELED"); - - //get delay time and set fail count to the task - asyncTaskExecute(task); - LogUtil.NOTIFY_LOG.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", task.target, task.getDataId(), - task.getGroup(), task.getLastModified()); - - MetricsMonitor.getConfigNotifyException().increment(); - } - } - class AsyncRpcNotifyCallBack implements RequestCallBack { private NotifySingleRpcTask task; @@ -413,72 +263,7 @@ public void onException(Throwable ex) { MetricsMonitor.getConfigNotifyException().increment(); } } - - static class NotifySingleTask extends NotifyTask { - - private String target; - - private String url; - - private boolean isBeta; - - private static final String URL_PATTERN = - "http://{0}{1}" + Constants.COMMUNICATION_CONTROLLER_PATH + "/dataChange" + "?dataId={2}&group={3}"; - - private static final String URL_PATTERN_TENANT = - "http://{0}{1}" + Constants.COMMUNICATION_CONTROLLER_PATH + "/dataChange" - + "?dataId={2}&group={3}&tenant={4}"; - - private int failCount; - - public NotifySingleTask(String dataId, String group, String tenant, long lastModified, String target) { - this(dataId, group, tenant, lastModified, target, false); - } - - public NotifySingleTask(String dataId, String group, String tenant, long lastModified, String target, - boolean isBeta) { - this(dataId, group, tenant, null, lastModified, target, isBeta); - } - - public NotifySingleTask(String dataId, String group, String tenant, String tag, long lastModified, - String target, boolean isBeta) { - super(dataId, group, tenant, lastModified); - this.target = target; - this.isBeta = isBeta; - try { - dataId = URLEncoder.encode(dataId, Constants.ENCODE); - group = URLEncoder.encode(group, Constants.ENCODE); - } catch (UnsupportedEncodingException e) { - LOGGER.error("URLEncoder encode error", e); - } - if (StringUtils.isBlank(tenant)) { - this.url = MessageFormat.format(URL_PATTERN, target, EnvUtil.getContextPath(), dataId, group); - } else { - this.url = MessageFormat - .format(URL_PATTERN_TENANT, target, EnvUtil.getContextPath(), dataId, group, tenant); - } - if (StringUtils.isNotEmpty(tag)) { - url = url + "&tag=" + tag; - } - failCount = 0; - } - - @Override - public void setFailCount(int count) { - this.failCount = count; - } - - @Override - public int getFailCount() { - return failCount; - } - - public String getTargetIP() { - return target; - } - - } - + /** * get delayTime and also set failCount to task; The failure time index increases, so as not to retry invalid tasks * in the offline scene, which affects the normal synchronization.