Skip to content

Commit

Permalink
Decouple and refactor things
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Oct 31, 2023
1 parent 76d8bc5 commit 9b20f26
Show file tree
Hide file tree
Showing 18 changed files with 179 additions and 138 deletions.
3 changes: 2 additions & 1 deletion src/main/java/io/vertx/serviceresolver/dns/DnsResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@

import io.vertx.serviceresolver.ServiceResolver;
import io.vertx.serviceresolver.dns.impl.DnsResolverImpl;
import io.vertx.serviceresolver.impl.ResolverImpl;
import io.vertx.serviceresolver.impl.ServiceResolverImpl;

public interface DnsResolver {

static ServiceResolver create(DnsResolverOptions options) {
return new ServiceResolverImpl((vertx, lookup) -> new DnsResolverImpl(vertx, options, lookup.loadBalancer));
return new ServiceResolverImpl((vertx, lookup) -> new ResolverImpl<>(vertx, lookup.loadBalancer, new DnsResolverImpl(options)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import io.vertx.codegen.annotations.DataObject;
import io.vertx.core.json.JsonObject;
import io.vertx.serviceresolver.srv.SrvResolverOptionsConverter;

@DataObject(generateConverter = true, publicConverter = false)
public class DnsResolverOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,26 @@
import io.vertx.core.Vertx;
import io.vertx.core.dns.AddressResolverOptions;
import io.vertx.core.impl.AddressResolver;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.Address;
import io.vertx.core.net.SocketAddress;
import io.vertx.serviceresolver.ServiceAddress;
import io.vertx.serviceresolver.dns.DnsResolverOptions;
import io.vertx.serviceresolver.impl.ResolverBase;
import io.vertx.serviceresolver.impl.ResolverPlugin;
import io.vertx.serviceresolver.loadbalancing.LoadBalancer;
import io.vertx.serviceresolver.srv.SrvResolver;

import java.util.Collections;

public class DnsResolverImpl extends ResolverBase<SocketAddress, SocketAddress, DnsServiceState> implements SrvResolver {
public class DnsResolverImpl implements ResolverPlugin<SocketAddress, SocketAddress, DnsServiceState> {

private final DnsResolverOptions options;
private AddressResolver dnsResolver;

public DnsResolverImpl(Vertx vertx, DnsResolverOptions options, LoadBalancer loadBalancer) {
super(vertx, loadBalancer);
public DnsResolverImpl(DnsResolverOptions options) {
this.options = options;
}

@Override
public void init(Vertx vertx) {
AddressResolverOptions o = new AddressResolverOptions();
o.setServers(Collections.singletonList(options.getHost() + ":" + options.getPort()));
dnsResolver = new AddressResolver(vertx, o);
Expand All @@ -49,11 +51,11 @@ public SocketAddress addressOfEndpoint(SocketAddress endpoint) {
}

@Override
public Future<DnsServiceState> resolve(SocketAddress address) {
public Future<DnsServiceState> resolve(LoadBalancer loadBalancer, SocketAddress address) {
Promise<DnsServiceState> promise = Promise.promise();
dnsResolver.resolveHostnameAll(address.host(), ar -> {
if (ar.succeeded()) {
promise.complete(new DnsServiceState(address, 100, ar.result(), loadBalancer));
promise.complete(new DnsServiceState(address, 100, ar.result(), loadBalancer.selector()));
} else {
promise.fail(ar.cause());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,27 @@
package io.vertx.serviceresolver.dns.impl;

import io.vertx.core.net.SocketAddress;
import io.vertx.serviceresolver.impl.ServiceState;
import io.vertx.serviceresolver.impl.ResolverState;
import io.vertx.serviceresolver.loadbalancing.EndpointSelector;
import io.vertx.serviceresolver.loadbalancing.LoadBalancer;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

class DnsServiceState extends ServiceState<SocketAddress> {
class DnsServiceState extends ResolverState<SocketAddress> {

final long timestamp;


DnsServiceState(SocketAddress address, long timestamp, List<InetSocketAddress> addresses, LoadBalancer loadBalancer) {
super(address.hostName(), loadBalancer);
DnsServiceState(SocketAddress address, long timestamp, List<InetSocketAddress> addresses, EndpointSelector endpointSelector) {
super(endpointSelector);

List<SocketAddress> endpoints = new ArrayList<>();
for (InetSocketAddress addr : addresses) {
add(SocketAddress.inetSocketAddress(address.port(), addr.getAddress().getHostAddress()));
endpoints.add(SocketAddress.inetSocketAddress(address.port(), addr.getAddress().getHostAddress()));
}
set(endpoints);

this.timestamp = timestamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@

public final class EndpointImpl<E> implements Endpoint {

final ServiceState<E> state;
final E value;
final LongAdder numberOfInflightRequests = new LongAdder();
final LongAdder numberOfRequests = new LongAdder();
final LongAdder numberOfFailures = new LongAdder();
final AtomicLong minResponseTime = new AtomicLong(Long.MAX_VALUE);
final AtomicLong maxResponseTime = new AtomicLong(0);

EndpointImpl(ServiceState<E> state, E value) {
this.state = state;
EndpointImpl(E value) {
this.value = value;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,52 +10,69 @@
*/
package io.vertx.serviceresolver.impl;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.net.Address;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.net.AddressResolver;
import io.vertx.serviceresolver.loadbalancing.Endpoint;
import io.vertx.serviceresolver.ServiceAddress;
import io.vertx.serviceresolver.loadbalancing.LoadBalancer;

public abstract class ResolverBase<A extends Address, E, T extends ServiceState<E>> implements AddressResolver<T, A, RequestMetric<E>, EndpointImpl<E>> {
public final class ResolverImpl<A extends Address, E, S extends ResolverState<E>> implements AddressResolver<S, A, RequestMetric<E>, EndpointImpl<E>> {

protected final Vertx vertx;
protected final LoadBalancer loadBalancer;
private final Vertx vertx;
private final LoadBalancer loadBalancer;
private final ResolverPlugin<A, E, S> plugin;

public ResolverBase(Vertx vertx, LoadBalancer loadBalancer) {
public ResolverImpl(Vertx vertx, LoadBalancer loadBalancer, ResolverPlugin<A, E, S> plugin) {

if (loadBalancer == null) {
loadBalancer = LoadBalancer.ROUND_ROBIN;
}

this.vertx = vertx;
this.loadBalancer = loadBalancer;
this.plugin = plugin;

plugin.init(vertx);
}

@Override
public A tryCast(Address address) {
return plugin.tryCast(address);
}

@Override
public EndpointImpl<E> pickEndpoint(T state) {
public Future<S> resolve(A address) {
return plugin.resolve(loadBalancer, address);
}

@Override
public void dispose(S state) {
plugin.dispose(state);
}

@Override
public EndpointImpl<E> pickEndpoint(S state) {
return (EndpointImpl<E>) state.pickAddress();
}

@Override
public boolean isValid(T state) {
public boolean isValid(S state) {
return state.isValid();
}

@Override
public final SocketAddress addressOf(EndpointImpl<E> endpoint) {
return addressOfEndpoint(endpoint.get());
public SocketAddress addressOf(EndpointImpl<E> endpoint) {
return plugin.addressOfEndpoint(endpoint.get());
}

public abstract SocketAddress addressOfEndpoint(E endpoint);

@Override
public void removeAddress(T state, EndpointImpl<E> endpoint) {
public void removeAddress(S state, EndpointImpl<E> endpoint) {
removeAddress(state, (Endpoint) endpoint);
}

public void removeAddress(T state, Endpoint endpoint) {
public void removeAddress(S state, Endpoint endpoint) {

}

Expand Down
21 changes: 21 additions & 0 deletions src/main/java/io/vertx/serviceresolver/impl/ResolverPlugin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.vertx.serviceresolver.impl;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.net.Address;
import io.vertx.core.net.SocketAddress;
import io.vertx.serviceresolver.loadbalancing.LoadBalancer;

public interface ResolverPlugin<A extends Address, E, S extends ResolverState<E>> {

void init(Vertx vertx);

SocketAddress addressOfEndpoint(E endpoint);

A tryCast(Address address);

Future<S> resolve(LoadBalancer loadBalancer, A address);

void dispose(S state);

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,20 @@

import io.vertx.serviceresolver.loadbalancing.Endpoint;
import io.vertx.serviceresolver.loadbalancing.EndpointSelector;
import io.vertx.serviceresolver.loadbalancing.LoadBalancer;

import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public abstract class ServiceState<E> {
public abstract class ResolverState<E> {

public final String name;
private final AtomicReference<List<EndpointImpl<E>>> endpoints = new AtomicReference<>(Collections.emptyList());
private final EndpointSelector selector;

public ServiceState(String name, LoadBalancer loadBalancer) {
this.name = name;
this.selector = loadBalancer.selector();
public ResolverState(EndpointSelector selector) {
this.selector = selector;
}

Endpoint pickAddress() {
Expand All @@ -39,10 +37,14 @@ Endpoint pickAddress() {
}
}

protected boolean isValid() {
return true;
}

public final void add(E endpoint) {
while (true) {
List<EndpointImpl<E>> list = endpoints.get();
EndpointImpl<E> e = new EndpointImpl<>(this, endpoint);
EndpointImpl<E> e = new EndpointImpl<>(endpoint);
List<EndpointImpl<E>> copy;
if (list.isEmpty()) {
copy = Collections.singletonList(e);
Expand All @@ -56,23 +58,26 @@ public final void add(E endpoint) {
}
}

public void clearEndpoints() {
endpoints.set(Collections.emptyList());
}

public List<EndpointImpl<E>> endpoints() {
return endpoints.get();
public final List<E> endpoints() {
List<EndpointImpl<E>> list = endpoints.get();
return new AbstractList<E>() {
@Override
public E get(int index) {
EndpointImpl<E> e = list.get(index);
return e != null ? e.get() : null;
}
@Override
public int size() {
return list.size();
}
};
}

public final void add(List<E> endpoints) {
for (E endpoint : endpoints) {
add(endpoint);
public final void set(List<E> endpoints) {
List<EndpointImpl<E>> list = new ArrayList<>();
for (E e : endpoints) {
list.add(new EndpointImpl<>(e));
}
this.endpoints.set(list);
}

protected boolean isValid() {
return true;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package io.vertx.serviceresolver.kube;

import io.vertx.serviceresolver.ServiceResolver;
import io.vertx.serviceresolver.impl.ResolverImpl;
import io.vertx.serviceresolver.impl.ServiceResolverImpl;
import io.vertx.serviceresolver.kube.impl.KubeResolverImpl;

Expand All @@ -34,6 +35,6 @@ static ServiceResolver create() {
* @return the resolver
*/
static ServiceResolver create(KubeResolverOptions options) {
return new ServiceResolverImpl((vertx, lookup) -> new KubeResolverImpl(vertx, lookup.loadBalancer, options));
return new ServiceResolverImpl((vertx, lookup) -> new ResolverImpl<>(vertx, lookup.loadBalancer, new KubeResolverImpl(options)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,36 @@
import io.vertx.core.net.Address;
import io.vertx.core.net.SocketAddress;
import io.vertx.serviceresolver.ServiceAddress;
import io.vertx.serviceresolver.impl.ResolverBase;
import io.vertx.serviceresolver.impl.ResolverPlugin;
import io.vertx.serviceresolver.kube.KubeResolverOptions;
import io.vertx.serviceresolver.loadbalancing.LoadBalancer;

import static io.vertx.core.http.HttpMethod.GET;

public class KubeResolverImpl extends ResolverBase<ServiceAddress, SocketAddress, KubeServiceState> {
public class KubeResolverImpl implements ResolverPlugin<ServiceAddress, SocketAddress, KubeServiceState> {

final KubeResolverOptions options;
final String host;
final int port;
final WebSocketClient wsClient;
final HttpClient httpClient;
Vertx vertx;
WebSocketClient wsClient;
HttpClient httpClient;
final String namespace;
final String bearerToken;

public KubeResolverImpl(Vertx vertx,
LoadBalancer loadBalancer,
KubeResolverOptions options) {
super(vertx, loadBalancer);

HttpClientOptions httpClientOptions = options.getHttpClientOptions();
WebSocketClientOptions wsClientOptions = options.getWebSocketClientOptions();

public KubeResolverImpl(KubeResolverOptions options) {
this.options = options;
this.namespace = options.getNamespace();
this.host = options.getHost();
this.port = options.getPort();
this.bearerToken = options.getBearerToken();
}

@Override
public void init(Vertx vertx) {
HttpClientOptions httpClientOptions = options.getHttpClientOptions();
WebSocketClientOptions wsClientOptions = options.getWebSocketClientOptions();
this.vertx = vertx;
this.wsClient = vertx.createWebSocketClient(wsClientOptions == null ? new WebSocketClientOptions() : wsClientOptions);
this.httpClient = vertx.createHttpClient(httpClientOptions == null ? new HttpClientOptions() : httpClientOptions);
}
Expand All @@ -56,7 +59,7 @@ public ServiceAddress tryCast(Address address) {
}

@Override
public Future<KubeServiceState> resolve(ServiceAddress serviceName) {
public Future<KubeServiceState> resolve(LoadBalancer loadBalancer, ServiceAddress serviceName) {
return httpClient
.request(GET, port, host, "/api/v1/namespaces/" + namespace + "/endpoints")
.compose(req -> {
Expand All @@ -80,7 +83,7 @@ public Future<KubeServiceState> resolve(ServiceAddress serviceName) {
});
}).map(response -> {
String resourceVersion = response.getJsonObject("metadata").getString("resourceVersion");
KubeServiceState state = new KubeServiceState(this, vertx, resourceVersion, serviceName.name(), loadBalancer);
KubeServiceState state = new KubeServiceState(this, vertx, resourceVersion, serviceName.name(), loadBalancer.selector());
JsonArray items = response.getJsonArray("items");
for (int i = 0;i < items.size();i++) {
JsonObject item = items.getJsonObject(i);
Expand Down
Loading

0 comments on commit 9b20f26

Please sign in to comment.