Skip to content

Commit 4d95df5

Browse files
committed
Add GrpcHealthCheckedEndpointGroupBuilder
Motivation: Add `GrpcHealthCheckedEndpointGroupBuilder` which builds a health checked endpoint group whose health comes from a [standard gRPC health check service result](https://grpc.io/docs/guides/health-checking/). Modifications: * Adds `GrpcHealthCheckedEndpointGroupBuilder` which extends `AbstractHealthCheckedEndpointGroupBuilder` and creates a new health check function * Adds `GrpcHealthChecker` which is the health check function that creates and uses a gRPC `HealthGrpc` stub to check the gRPC health service on the endpoint. If the health check response is `SERVING`, it is healthy. It is unhealthy if the response is not `SERVING` or if there was a request failure. * Adds tests. Result: * A user can create a health checked endpoint group that is backed by a gRPC health check service. * Closes line#5930 Closes
1 parent a2fd1d3 commit 4d95df5

File tree

6 files changed

+474
-0
lines changed

6 files changed

+474
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2025 LY Corporation
3+
*
4+
* LY Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.linecorp.armeria.client.grpc.endpoint.healthcheck;
17+
18+
import static java.util.Objects.requireNonNull;
19+
20+
import java.util.function.Function;
21+
22+
import com.linecorp.armeria.client.endpoint.EndpointGroup;
23+
import com.linecorp.armeria.client.endpoint.healthcheck.AbstractHealthCheckedEndpointGroupBuilder;
24+
import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckerContext;
25+
import com.linecorp.armeria.common.annotation.Nullable;
26+
import com.linecorp.armeria.common.util.AsyncCloseable;
27+
import com.linecorp.armeria.internal.client.grpc.GrpcHealthChecker;
28+
29+
/**
30+
* Builds a health checked endpoint group whose health comes from a standard gRPC health check service.
31+
*/
32+
public final class GrpcHealthCheckedEndpointGroupBuilder
33+
extends AbstractHealthCheckedEndpointGroupBuilder<GrpcHealthCheckedEndpointGroupBuilder> {
34+
35+
private @Nullable String service;
36+
37+
GrpcHealthCheckedEndpointGroupBuilder(EndpointGroup delegate) {
38+
super(delegate);
39+
}
40+
41+
/**
42+
* Returns a {@link GrpcHealthCheckedEndpointGroupBuilder} that builds a health checked
43+
* endpoint group with the specified {@link EndpointGroup}.
44+
*/
45+
public static GrpcHealthCheckedEndpointGroupBuilder builder(EndpointGroup delegate) {
46+
return new GrpcHealthCheckedEndpointGroupBuilder(requireNonNull(delegate));
47+
}
48+
49+
/**
50+
* Sets the optional service field of the gRPC health check request.
51+
*/
52+
public GrpcHealthCheckedEndpointGroupBuilder service(@Nullable String service) {
53+
this.service = service;
54+
return this;
55+
}
56+
57+
@Override
58+
protected Function<? super HealthCheckerContext, ? extends AsyncCloseable> newCheckerFactory() {
59+
return new GrpcHealthCheckerFactory(service);
60+
}
61+
62+
private static final class GrpcHealthCheckerFactory
63+
implements Function<HealthCheckerContext, AsyncCloseable> {
64+
65+
private final @Nullable String service;
66+
67+
private GrpcHealthCheckerFactory(@Nullable String service) {
68+
this.service = service;
69+
}
70+
71+
@Override
72+
public AsyncCloseable apply(HealthCheckerContext ctx) {
73+
final GrpcHealthChecker healthChecker = new GrpcHealthChecker(ctx, ctx.endpoint(),
74+
ctx.protocol(), service);
75+
healthChecker.start();
76+
return healthChecker;
77+
}
78+
}
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2025 LY Corporation
3+
*
4+
* LY Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
/**
18+
* gRPC health checked endpoint.
19+
*/
20+
@NonNullByDefault
21+
package com.linecorp.armeria.client.grpc.endpoint.healthcheck;
22+
23+
import com.linecorp.armeria.common.annotation.NonNullByDefault;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright 2025 LY Corporation
3+
*
4+
* LY Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.linecorp.armeria.internal.client.grpc;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
import java.util.concurrent.locks.ReentrantLock;
20+
21+
import com.google.common.annotations.VisibleForTesting;
22+
23+
import com.linecorp.armeria.client.ClientRequestContext;
24+
import com.linecorp.armeria.client.ClientRequestContextCaptor;
25+
import com.linecorp.armeria.client.Clients;
26+
import com.linecorp.armeria.client.Endpoint;
27+
import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckerContext;
28+
import com.linecorp.armeria.client.grpc.GrpcClients;
29+
import com.linecorp.armeria.common.ResponseHeaders;
30+
import com.linecorp.armeria.common.SessionProtocol;
31+
import com.linecorp.armeria.common.annotation.Nullable;
32+
import com.linecorp.armeria.common.util.AsyncCloseable;
33+
import com.linecorp.armeria.common.util.AsyncCloseableSupport;
34+
import com.linecorp.armeria.internal.common.util.ReentrantShortLock;
35+
36+
import io.grpc.health.v1.HealthCheckRequest;
37+
import io.grpc.health.v1.HealthCheckResponse;
38+
import io.grpc.health.v1.HealthGrpc;
39+
import io.grpc.stub.StreamObserver;
40+
41+
public final class GrpcHealthChecker implements AsyncCloseable {
42+
43+
static final double HEALTHY = 1d;
44+
static final double UNHEALTHY = 0d;
45+
46+
private final HealthCheckerContext ctx;
47+
@Nullable private final String service;
48+
private final HealthGrpc.HealthStub stub;
49+
50+
private final ReentrantLock lock = new ReentrantShortLock();
51+
private final AsyncCloseableSupport closeable = AsyncCloseableSupport.of(this::closeAsync);
52+
53+
public GrpcHealthChecker(HealthCheckerContext ctx, Endpoint endpoint, SessionProtocol sessionProtocol,
54+
@Nullable String service) {
55+
this.ctx = ctx;
56+
this.service = service;
57+
58+
this.stub = GrpcClients.builder(sessionProtocol, endpoint)
59+
.options(ctx.clientOptions())
60+
.build(HealthGrpc.HealthStub.class);
61+
}
62+
63+
public void start() {
64+
check();
65+
}
66+
67+
@VisibleForTesting
68+
void check() {
69+
lock();
70+
try {
71+
final HealthCheckRequest.Builder builder = HealthCheckRequest.newBuilder();
72+
if (this.service != null) {
73+
builder.setService(service);
74+
}
75+
76+
try (ClientRequestContextCaptor reqCtxCaptor = Clients.newContextCaptor()) {
77+
stub.check(builder.build(), new StreamObserver<HealthCheckResponse>() {
78+
@Override
79+
public void onNext(HealthCheckResponse healthCheckResponse) {
80+
final ClientRequestContext reqCtx = reqCtxCaptor.get();
81+
if (healthCheckResponse.getStatus() == HealthCheckResponse.ServingStatus.SERVING) {
82+
ctx.updateHealth(HEALTHY, reqCtx, null, null);
83+
} else {
84+
ctx.updateHealth(UNHEALTHY, reqCtx, null, null);
85+
}
86+
}
87+
88+
@Override
89+
public void onError(Throwable throwable) {
90+
final ClientRequestContext reqCtx = reqCtxCaptor.get();
91+
ctx.updateHealth(UNHEALTHY, reqCtx, ResponseHeaders.of(500), throwable);
92+
}
93+
94+
@Override
95+
public void onCompleted() {
96+
}
97+
});
98+
}
99+
} finally {
100+
unlock();
101+
}
102+
}
103+
104+
@Override
105+
public CompletableFuture<?> closeAsync() {
106+
return closeable.closeAsync();
107+
}
108+
109+
private synchronized void closeAsync(CompletableFuture<?> future) {
110+
future.complete(null);
111+
}
112+
113+
@Override
114+
public void close() {
115+
closeable.close();
116+
}
117+
118+
private void lock() {
119+
lock.lock();
120+
}
121+
122+
private void unlock() {
123+
lock.unlock();
124+
}
125+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2025 LY Corporation
3+
*
4+
* LY Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.linecorp.armeria.client.grpc.endpoint.healthcheck;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.TimeoutException;
23+
24+
import org.junit.jupiter.api.Test;
25+
import org.junit.jupiter.api.extension.RegisterExtension;
26+
27+
import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckedEndpointGroup;
28+
import com.linecorp.armeria.common.SessionProtocol;
29+
import com.linecorp.armeria.common.grpc.HealthGrpcServerExtension;
30+
31+
class GrpcHealthCheckedEndpointGroupBuilderTest {
32+
33+
@RegisterExtension
34+
private static HealthGrpcServerExtension serverExtension = new HealthGrpcServerExtension();
35+
36+
@Test
37+
public void hasHealthyEndpoint() {
38+
serverExtension.setAction(HealthGrpcServerExtension.Action.DO_HEALTHY);
39+
40+
final HealthCheckedEndpointGroup endpointGroup = GrpcHealthCheckedEndpointGroupBuilder
41+
.builder(serverExtension.endpoint(SessionProtocol.H2C))
42+
.build();
43+
44+
assertThat(endpointGroup.whenReady().join()).hasSize(1);
45+
}
46+
47+
@Test
48+
public void empty() throws Exception {
49+
serverExtension.setAction(HealthGrpcServerExtension.Action.DO_UNHEALTHY);
50+
51+
final HealthCheckedEndpointGroup endpointGroup = GrpcHealthCheckedEndpointGroupBuilder
52+
.builder(serverExtension.endpoint(SessionProtocol.H2C))
53+
.build();
54+
55+
assertThatThrownBy(() -> {
56+
// whenReady() will timeout because there are no healthy endpoints
57+
endpointGroup.whenReady().get(1, TimeUnit.SECONDS);
58+
}).isInstanceOf(TimeoutException.class);
59+
}
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2025 LY Corporation
3+
*
4+
* LY Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.linecorp.armeria.common.grpc;
17+
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
21+
import com.google.protobuf.TextFormat;
22+
23+
import com.linecorp.armeria.server.ServerBuilder;
24+
import com.linecorp.armeria.server.grpc.GrpcService;
25+
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
26+
27+
import io.grpc.health.v1.HealthCheckRequest;
28+
import io.grpc.health.v1.HealthCheckResponse;
29+
import io.grpc.health.v1.HealthGrpc;
30+
import io.grpc.stub.StreamObserver;
31+
32+
public class HealthGrpcServerExtension extends ServerExtension {
33+
34+
private static final Logger LOGGER = LoggerFactory.getLogger(HealthGrpcServerExtension.class);
35+
36+
private static final HealthCheckResponse HEALTHY_HEALTH_CHECK_RESPONSE = HealthCheckResponse.newBuilder()
37+
.setStatus(HealthCheckResponse.ServingStatus.SERVING)
38+
.build();
39+
40+
private static final HealthCheckResponse UNHEALTHY_HEALTH_CHECK_RESPONSE = HealthCheckResponse.newBuilder()
41+
.setStatus(HealthCheckResponse.ServingStatus.NOT_SERVING)
42+
.build();
43+
44+
public enum Action {
45+
DO_HEALTHY, DO_UNHEALTHY, DO_TIMEOUT
46+
}
47+
48+
private Action action;
49+
50+
@Override
51+
protected void configure(ServerBuilder sb) throws Exception {
52+
final GrpcService grpcService = GrpcService.builder()
53+
.addService(new HealthGrpc.HealthImplBase() {
54+
@Override
55+
public void check(HealthCheckRequest request,
56+
StreamObserver<HealthCheckResponse> responseObserver) {
57+
LOGGER.debug("Received health check response {}", TextFormat.shortDebugString(request));
58+
59+
if (action == Action.DO_HEALTHY) {
60+
responseObserver.onNext(HEALTHY_HEALTH_CHECK_RESPONSE);
61+
responseObserver.onCompleted();
62+
} else if (action == Action.DO_UNHEALTHY) {
63+
responseObserver.onNext(UNHEALTHY_HEALTH_CHECK_RESPONSE);
64+
responseObserver.onCompleted();
65+
} else if (action == Action.DO_TIMEOUT) {
66+
LOGGER.debug("Not sending a response...");
67+
}
68+
69+
LOGGER.debug("Completed health check response");
70+
}
71+
72+
@Override
73+
public void watch(HealthCheckRequest request,
74+
StreamObserver<HealthCheckResponse> responseObserver) {
75+
throw new UnsupportedOperationException();
76+
}
77+
})
78+
.build();
79+
80+
sb.service(grpcService);
81+
}
82+
83+
public void setAction(Action action) {
84+
this.action = action;
85+
}
86+
}

0 commit comments

Comments
 (0)