Skip to content

Commit

Permalink
refactor registry checker
Browse files Browse the repository at this point in the history
  • Loading branch information
ipipman committed Apr 26, 2024
1 parent 7e6508e commit 594bb7d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package cn.ipman.rpc.core.registry;

@FunctionalInterface
public interface Callback {
void call() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import cn.ipman.rpc.core.consumer.HttpInvoker;
import cn.ipman.rpc.core.meta.InstanceMeta;
import cn.ipman.rpc.core.meta.ServiceMeta;
import cn.ipman.rpc.core.registry.Callback;
import cn.ipman.rpc.core.registry.ChangedListener;
import cn.ipman.rpc.core.registry.Event;
import com.alibaba.fastjson.JSON;
Expand Down Expand Up @@ -32,7 +33,7 @@ public class IpManRegistryCenter implements RegistryCenter {
private final static String UN_REG_PATH = "/unreg";
private final static String FIND_ALL_PATH = "/findall";
private final static String VERSION_PATH = "/version";
private final static String RENEW_PATH = "/renew";
private final static String RENEW_PATH = "/renews";

@Value("${registry-ipman.servers}")
String server;
Expand All @@ -53,7 +54,18 @@ public void start() {
versionChecker = new IpManRegistryExecutor(1_000, 5_000, TimeUnit.MILLISECONDS);
// 定期将服务实例上报给注册中心, 避免被注册中心认为服务已死, 5s一次
heathChecker = new IpManRegistryExecutor(5, 5, TimeUnit.SECONDS);
heathChecker.executor(() -> RENEWS.keySet().forEach(
heathChecker.executor(providerChecker());
}

@Override
public void stop() {
log.info(" ====>>>> [IpMan-Registry] : stop with server: {}", server);
versionChecker.gracefulShutdown();
heathChecker.gracefulShutdown();
}

private Callback providerChecker() {
return () -> RENEWS.keySet().forEach(
instance -> {
// 根据所有实例, 找到对应服务, 触发renew进行服务健康状态上报, 做探活
try {
Expand All @@ -64,16 +76,9 @@ public void start() {
} catch (Exception e) {
log.error(" ====>>>> [IpMan-Registry] call registry leader error");
}
}
));
});
}

@Override
public void stop() {
log.info(" ====>>>> [IpMan-Registry] : stop with server: {}", server);
versionChecker.gracefulShutdown();
heathChecker.gracefulShutdown();
}

@Override
public void register(ServiceMeta service, InstanceMeta instance) {
Expand Down Expand Up @@ -103,7 +108,11 @@ public List<InstanceMeta> fetchAll(ServiceMeta service) {
@Override
public void subscribe(ServiceMeta service, ChangedListener listener) {
// 每隔5s, 去注册中心获取最新版本号,如果版本号大于当前版本, 就从注册中心同步最新实例的信息
versionChecker.executor(() -> {
versionChecker.executor(consumerChecker(service, listener));
}

private Callback consumerChecker(ServiceMeta service, ChangedListener listener) {
return () -> {
try {
// 获取注册中心, 最新的版本号
Long newVersion = HttpInvoker.httpGet(versionPath(service), Long.class);
Expand All @@ -121,7 +130,7 @@ public void subscribe(ServiceMeta service, ChangedListener listener) {
} catch (Exception e) {
log.error(" ====>>>> [IpMan-Registry] call registry leader error");
}
});
};
}

@Override
Expand All @@ -130,7 +139,6 @@ public void unsubscribe() {
}



private String regPath(ServiceMeta service) {
return path(REG_PATH, service);
}
Expand All @@ -147,7 +155,7 @@ private String versionPath(ServiceMeta service) {
return path(VERSION_PATH, service);
}

private String getReNewPath(List<ServiceMeta> serviceList){
private String getReNewPath(List<ServiceMeta> serviceList) {
return path(RENEW_PATH, serviceList);
}

Expand All @@ -159,7 +167,7 @@ private String path(String context, ServiceMeta service) {
return server + context + "?service=" + service.toPath();
}

private String args(List<ServiceMeta> serviceList){
private String args(List<ServiceMeta> serviceList) {
StringBuilder sb = new StringBuilder();
for (ServiceMeta service : serviceList) {
sb.append(service.toPath()).append(",");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cn.ipman.rpc.core.registry.ipman;

import cn.ipman.rpc.core.registry.Callback;
import lombok.extern.slf4j.Slf4j;

import java.time.LocalDateTime;
Expand Down Expand Up @@ -56,8 +57,4 @@ public void executor(Callback callback) {
}, initialDelay, delay, unit);
}

@FunctionalInterface
public interface Callback {
void call() throws Exception;
}
}

0 comments on commit 594bb7d

Please sign in to comment.