diff --git a/rpcman-core/src/main/java/cn/ipman/rpc/core/consumer/HttpInvoker.java b/rpcman-core/src/main/java/cn/ipman/rpc/core/consumer/HttpInvoker.java index 76356b9..ccf690d 100644 --- a/rpcman-core/src/main/java/cn/ipman/rpc/core/consumer/HttpInvoker.java +++ b/rpcman-core/src/main/java/cn/ipman/rpc/core/consumer/HttpInvoker.java @@ -60,7 +60,6 @@ static T httpGet(String url, Class clazz) { @SneakyThrows - @SuppressWarnings("unused") static T httpGet(String url, TypeReference typeReference) { log.debug(" =====>>>>>> httpGet: " + url); String respJson = Default.get(url); @@ -68,7 +67,6 @@ static T httpGet(String url, TypeReference typeReference) { return JSON.parseObject(respJson, typeReference); } - /** * 使用HTTP POST方法发送请求,并将其解析为指定的Java类型。 * diff --git a/rpcman-core/src/main/java/cn/ipman/rpc/core/registry/ipman/IpManHeathChecker.java b/rpcman-core/src/main/java/cn/ipman/rpc/core/registry/ipman/IpManHeathChecker.java new file mode 100644 index 0000000..662a7e4 --- /dev/null +++ b/rpcman-core/src/main/java/cn/ipman/rpc/core/registry/ipman/IpManHeathChecker.java @@ -0,0 +1,43 @@ +package cn.ipman.rpc.core.registry.ipman; + +import lombok.extern.slf4j.Slf4j; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/4/21 20:12 + */ +@Slf4j +public class IpManHeathChecker { + + // 注册中心探活间隔, 5s + final int interval = 5_000; + + final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + static final DateTimeFormatter DTF = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss"); + + public void check(Callback callback) { + executor.scheduleWithFixedDelay(() -> { + log.debug(" schedule to check ipman registry ... [{}]", DTF.format(LocalDateTime.now())); + try { + callback.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, interval, interval, TimeUnit.MILLISECONDS); + } + + + @FunctionalInterface + public interface Callback { + void call() throws Exception; + } +} diff --git a/rpcman-core/src/main/java/cn/ipman/rpc/core/registry/ipman/IpManRegistryCenter.java b/rpcman-core/src/main/java/cn/ipman/rpc/core/registry/ipman/IpManRegistryCenter.java index e4d3a82..e158ac9 100644 --- a/rpcman-core/src/main/java/cn/ipman/rpc/core/registry/ipman/IpManRegistryCenter.java +++ b/rpcman-core/src/main/java/cn/ipman/rpc/core/registry/ipman/IpManRegistryCenter.java @@ -1,9 +1,13 @@ package cn.ipman.rpc.core.registry.ipman; import cn.ipman.rpc.core.api.RegistryCenter; +import cn.ipman.rpc.core.consumer.HttpInvoker; import cn.ipman.rpc.core.meta.InstanceMeta; import cn.ipman.rpc.core.meta.ServiceMeta; import cn.ipman.rpc.core.registry.ChangedListener; +import cn.ipman.rpc.core.registry.Event; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.TypeReference; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -38,25 +42,67 @@ public void stop() { @Override public void register(ServiceMeta service, InstanceMeta instance) { log.info(" ====>>>> [IpMan-Registry] : register instance {} to {}", instance.toHttpUrl(), service.toPath()); + InstanceMeta inst = HttpInvoker.httpPost(JSON.toJSONString(instance), regPath(service), InstanceMeta.class); + log.info(" ====>>>> [IpMan-Registry] : registered {} success", inst); } @Override public void unregister(ServiceMeta service, InstanceMeta instance) { - + log.info(" ====>>>> [IpMan-Registry] : unregister instance {} to {}", instance.toHttpUrl(), service.toPath()); + InstanceMeta inst = HttpInvoker.httpPost(JSON.toJSONString(instance), unRegPath(service), InstanceMeta.class); + log.info(" ====>>>> [IpMan-Registry] : unregistered {} success", inst); } @Override public List fetchAll(ServiceMeta service) { - return null; + log.info(" ====>>>> [IpMan-Registry] : find all instances for {}", service.toPath()); + List instances = HttpInvoker.httpGet(findAllPath(service), new TypeReference>() { + }); + log.info(" ====>>>> [IpMan-Registry] : findAll = {}", instances); + return instances; } + IpManHeathChecker heathChecker = new IpManHeathChecker(); + @Override public void subscribe(ServiceMeta service, ChangedListener listener) { + // 每隔5s, 去注册中心获取最新版本号,如果版本号大于当前版本, 就从注册中心同步最新实例的信息 + heathChecker.check(() -> { + // 获取注册中心, 最新的版本号 + String versionPath = versionPath(service); + Long newVersion = HttpInvoker.httpGet(versionPath, Long.class); + Long version = VERSIONS.getOrDefault(service.toPath(), -1L); + log.debug(" ====>>>> [{}] newVersion:{} oldVersion:{}", service.toPath(), newVersion, version); + // 如果版本号大于当前版本, 就从注册中心同步最新实例的信息 + if (newVersion > version) { + log.info(" ====>>>> version changed [{}] newVersion:{} oldVersion:{}", service.toPath(), newVersion, version); + List instanceMetas = fetchAll(service); + log.info(" ====>>>> version {} fetch all and fire: {}", newVersion, instanceMetas); + listener.fire(new Event(instanceMetas)); + VERSIONS.put(service.toPath(), newVersion); + } + }); } @Override public void unsubscribe() { } + + private String regPath(ServiceMeta service) { + return server + "/reg?service=" + service.toPath(); + } + + private String unRegPath(ServiceMeta service) { + return server + "/unreg?service=" + service.toPath(); + } + + private String findAllPath(ServiceMeta service) { + return server + "/findAll?service=" + service.toPath(); + } + + private String versionPath(ServiceMeta service) { + return server + "/version?service=" + service.toPath(); + } }