Skip to content

Commit

Permalink
imploded registry by ipman
Browse files Browse the repository at this point in the history
  • Loading branch information
ipipman committed Apr 21, 2024
1 parent d1e8d4f commit 1c77d35
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,13 @@ static <T> T httpGet(String url, Class<T> clazz) {


@SneakyThrows
@SuppressWarnings("unused")
static <T> T httpGet(String url, TypeReference<T> typeReference) {
log.debug(" =====>>>>>> httpGet: " + url);
String respJson = Default.get(url);
log.debug(" =====>>>>>> respJson: " + respJson);
return JSON.parseObject(respJson, typeReference);
}


/**
* 使用HTTP POST方法发送请求,并将其解析为指定的Java类型。
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package cn.ipman.rpc.core.registry.ipman;

import lombok.extern.slf4j.Slf4j;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Description for this class
*
* @Author IpMan
* @Date 2024/4/21 20:12
*/
@Slf4j
public class IpManHeathChecker {

// 注册中心探活间隔, 5s
final int interval = 5_000;

final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

static final DateTimeFormatter DTF = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

public void check(Callback callback) {
executor.scheduleWithFixedDelay(() -> {
log.debug(" schedule to check ipman registry ... [{}]", DTF.format(LocalDateTime.now()));
try {
callback.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, interval, interval, TimeUnit.MILLISECONDS);
}


@FunctionalInterface
public interface Callback {
void call() throws Exception;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package cn.ipman.rpc.core.registry.ipman;

import cn.ipman.rpc.core.api.RegistryCenter;
import cn.ipman.rpc.core.consumer.HttpInvoker;
import cn.ipman.rpc.core.meta.InstanceMeta;
import cn.ipman.rpc.core.meta.ServiceMeta;
import cn.ipman.rpc.core.registry.ChangedListener;
import cn.ipman.rpc.core.registry.Event;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;

Expand Down Expand Up @@ -38,25 +42,67 @@ public void stop() {
@Override
public void register(ServiceMeta service, InstanceMeta instance) {
log.info(" ====>>>> [IpMan-Registry] : register instance {} to {}", instance.toHttpUrl(), service.toPath());
InstanceMeta inst = HttpInvoker.httpPost(JSON.toJSONString(instance), regPath(service), InstanceMeta.class);
log.info(" ====>>>> [IpMan-Registry] : registered {} success", inst);
}

@Override
public void unregister(ServiceMeta service, InstanceMeta instance) {

log.info(" ====>>>> [IpMan-Registry] : unregister instance {} to {}", instance.toHttpUrl(), service.toPath());
InstanceMeta inst = HttpInvoker.httpPost(JSON.toJSONString(instance), unRegPath(service), InstanceMeta.class);
log.info(" ====>>>> [IpMan-Registry] : unregistered {} success", inst);
}

@Override
public List<InstanceMeta> fetchAll(ServiceMeta service) {
return null;
log.info(" ====>>>> [IpMan-Registry] : find all instances for {}", service.toPath());
List<InstanceMeta> instances = HttpInvoker.httpGet(findAllPath(service), new TypeReference<List<InstanceMeta>>() {
});
log.info(" ====>>>> [IpMan-Registry] : findAll = {}", instances);
return instances;
}

IpManHeathChecker heathChecker = new IpManHeathChecker();

@Override
public void subscribe(ServiceMeta service, ChangedListener listener) {
// 每隔5s, 去注册中心获取最新版本号,如果版本号大于当前版本, 就从注册中心同步最新实例的信息
heathChecker.check(() -> {
// 获取注册中心, 最新的版本号
String versionPath = versionPath(service);
Long newVersion = HttpInvoker.httpGet(versionPath, Long.class);
Long version = VERSIONS.getOrDefault(service.toPath(), -1L);
log.debug(" ====>>>> [{}] newVersion:{} oldVersion:{}", service.toPath(), newVersion, version);

// 如果版本号大于当前版本, 就从注册中心同步最新实例的信息
if (newVersion > version) {
log.info(" ====>>>> version changed [{}] newVersion:{} oldVersion:{}", service.toPath(), newVersion, version);
List<InstanceMeta> instanceMetas = fetchAll(service);
log.info(" ====>>>> version {} fetch all and fire: {}", newVersion, instanceMetas);
listener.fire(new Event(instanceMetas));
VERSIONS.put(service.toPath(), newVersion);
}
});
}

@Override
public void unsubscribe() {

}

private String regPath(ServiceMeta service) {
return server + "/reg?service=" + service.toPath();
}

private String unRegPath(ServiceMeta service) {
return server + "/unreg?service=" + service.toPath();
}

private String findAllPath(ServiceMeta service) {
return server + "/findAll?service=" + service.toPath();
}

private String versionPath(ServiceMeta service) {
return server + "/version?service=" + service.toPath();
}
}

0 comments on commit 1c77d35

Please sign in to comment.