diff --git a/pom.xml b/pom.xml index 62ce8617e..8f2c67885 100644 --- a/pom.xml +++ b/pom.xml @@ -123,6 +123,11 @@ junit-jupiter ${junit.version} + + com.github.tomakehurst + wiremock-jre8 + ${wiremock.version} + org.slf4j slf4j-simple @@ -147,6 +152,7 @@ 4.1.119.Final 5.12.1 1.7.21 + 3.0.1 UTF-8 diff --git a/pushy/pom.xml b/pushy/pom.xml index 10289282d..a1b443925 100644 --- a/pushy/pom.xml +++ b/pushy/pom.xml @@ -67,6 +67,11 @@ slf4j-simple test + + com.github.tomakehurst + wiremock-jre8 + test + io.netty netty-transport-native-epoll diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/AbstractApnsChannelFactory.java b/pushy/src/main/java/com/eatthepath/pushy/apns/AbstractApnsChannelFactory.java new file mode 100644 index 000000000..a52c57a8a --- /dev/null +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/AbstractApnsChannelFactory.java @@ -0,0 +1,161 @@ +package com.eatthepath.pushy.apns; + +import com.eatthepath.pushy.apns.proxy.ProxyHandlerFactory; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.*; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslHandler; +import io.netty.resolver.AddressResolverGroup; +import io.netty.resolver.NoopAddressResolverGroup; +import io.netty.util.AttributeKey; +import io.netty.util.ReferenceCounted; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.PromiseNotifier; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; +import java.io.Closeable; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * An APNs channel factory creates new channels connected to an APNs server. Channels constructed by this factory are + * intended for use in an {@link ApnsChannelPool}. + */ +abstract class AbstractApnsChannelFactory implements PooledObjectFactory, Closeable { + + private final SslContext sslContext; + private final AtomicBoolean hasReleasedSslContext = new AtomicBoolean(false); + + private final AddressResolverGroup addressResolverGroup; + + private final Bootstrap bootstrapTemplate; + + private final AtomicLong currentDelaySeconds = new AtomicLong(0); + + private static final long MIN_CONNECT_DELAY_SECONDS = 1; + private static final long MAX_CONNECT_DELAY_SECONDS = 60; + + static final AttributeKey> CHANNEL_READY_PROMISE_ATTRIBUTE_KEY = + AttributeKey.valueOf(ApnsNotificationChannelFactory.class, "channelReadyPromise"); + + AbstractApnsChannelFactory(final InetSocketAddress serverAddress, + final SslContext sslContext, + final ProxyHandlerFactory proxyHandlerFactory, + final boolean hostnameVerificationEnabled, + final Duration connectionTimeout, + final ApnsClientResources clientResources) { + + this.sslContext = sslContext; + + if (this.sslContext instanceof ReferenceCounted) { + ((ReferenceCounted) this.sslContext).retain(); + } + + this.addressResolverGroup = proxyHandlerFactory != null + ? NoopAddressResolverGroup.INSTANCE + : clientResources.getRoundRobinDnsAddressResolverGroup(); + + this.bootstrapTemplate = new Bootstrap(); + this.bootstrapTemplate.group(clientResources.getEventLoopGroup()); + this.bootstrapTemplate.option(ChannelOption.TCP_NODELAY, true); + this.bootstrapTemplate.remoteAddress(serverAddress); + this.bootstrapTemplate.resolver(this.addressResolverGroup); + + if (connectionTimeout != null) { + this.bootstrapTemplate.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connectionTimeout.toMillis()); + } + + this.bootstrapTemplate.handler(new ChannelInitializer() { + + @Override + protected void initChannel(final SocketChannel channel) { + final String authority = serverAddress.getHostName(); + final SslHandler sslHandler = sslContext.newHandler(channel.alloc(), authority, serverAddress.getPort()); + + if (hostnameVerificationEnabled) { + final SSLEngine sslEngine = sslHandler.engine(); + final SSLParameters sslParameters = sslEngine.getSSLParameters(); + sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); + sslEngine.setSSLParameters(sslParameters); + } + + constructPipeline(sslHandler, channel.pipeline()); + } + }); + } + + protected abstract void constructPipeline(final SslHandler sslHandler, final ChannelPipeline pipeline); + + /** + * Creates and connects a new channel. The initial connection attempt may be delayed to accommodate exponential + * back-off requirements. + * + * @param channelReadyPromise the promise to be notified when a channel has been created and connected to the APNs + * server + * + * @return a future that will be notified once a channel has been created and connected to the APNs server + */ + @Override + public Future create(final Promise channelReadyPromise) { + final long delay = this.currentDelaySeconds.get(); + + channelReadyPromise.addListener(future -> { + final long updatedDelay = future.isSuccess() ? 0 : + Math.max(Math.min(delay * 2, MAX_CONNECT_DELAY_SECONDS), MIN_CONNECT_DELAY_SECONDS); + + this.currentDelaySeconds.compareAndSet(delay, updatedDelay); + }); + + + this.bootstrapTemplate.config().group().schedule(() -> { + final Bootstrap bootstrap = this.bootstrapTemplate.clone() + .channelFactory(new AugmentingReflectiveChannelFactory<>( + ClientChannelClassUtil.getSocketChannelClass(this.bootstrapTemplate.config().group()), + CHANNEL_READY_PROMISE_ATTRIBUTE_KEY, channelReadyPromise)); + + final ChannelFuture connectFuture = bootstrap.connect(); + + connectFuture.addListener(future -> { + if (!future.isSuccess()) { + channelReadyPromise.tryFailure(future.cause()); + } + }); + }, delay, TimeUnit.SECONDS); + + return channelReadyPromise; + } + + /** + * Destroys a channel by closing it. + * + * @param channel the channel to destroy + * @param promise the promise to notify when the channel has been destroyed + * + * @return a future that will be notified when the channel has been destroyed + */ + @Override + public Future destroy(final Channel channel, final Promise promise) { + channel.close().addListener(new PromiseNotifier<>(promise)); + return promise; + } + + @Override + public void close() { + try { + this.addressResolverGroup.close(); + } finally { + if (this.sslContext instanceof ReferenceCounted) { + if (this.hasReleasedSslContext.compareAndSet(false, true)) { + ((ReferenceCounted) this.sslContext).release(); + } + } + } + } +} \ No newline at end of file diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelFactory.java b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelFactory.java deleted file mode 100644 index a3845743f..000000000 --- a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelFactory.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Copyright (c) 2020 Jon Chambers - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -package com.eatthepath.pushy.apns; - -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.*; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.flush.FlushConsolidationHandler; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslHandler; -import io.netty.handler.timeout.IdleStateHandler; -import io.netty.resolver.AddressResolverGroup; -import io.netty.resolver.NoopAddressResolverGroup; -import io.netty.util.AttributeKey; -import io.netty.util.ReferenceCounted; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; - -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLParameters; -import java.io.Closeable; -import java.net.SocketAddress; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -/** - * An APNs channel factory creates new channels connected to an APNs server. Channels constructed by this factory are - * intended for use in an {@link ApnsChannelPool}. - */ -class ApnsChannelFactory implements PooledObjectFactory, Closeable { - - private final SslContext sslContext; - private final AtomicBoolean hasReleasedSslContext = new AtomicBoolean(false); - - private final AddressResolverGroup addressResolverGroup; - - private final Bootstrap bootstrapTemplate; - - private final AtomicLong currentDelaySeconds = new AtomicLong(0); - - private static final long MIN_CONNECT_DELAY_SECONDS = 1; - private static final long MAX_CONNECT_DELAY_SECONDS = 60; - - static final AttributeKey> CHANNEL_READY_PROMISE_ATTRIBUTE_KEY = - AttributeKey.valueOf(ApnsChannelFactory.class, "channelReadyPromise"); - - ApnsChannelFactory(final ApnsClientConfiguration clientConfiguration, - final ApnsClientResources clientResources) { - - this.sslContext = clientConfiguration.getSslContext(); - - if (this.sslContext instanceof ReferenceCounted) { - ((ReferenceCounted) this.sslContext).retain(); - } - - this.addressResolverGroup = clientConfiguration.getProxyHandlerFactory().isPresent() - ? NoopAddressResolverGroup.INSTANCE - : clientResources.getRoundRobinDnsAddressResolverGroup(); - - this.bootstrapTemplate = new Bootstrap(); - this.bootstrapTemplate.group(clientResources.getEventLoopGroup()); - this.bootstrapTemplate.option(ChannelOption.TCP_NODELAY, true); - this.bootstrapTemplate.remoteAddress(clientConfiguration.getApnsServerAddress()); - this.bootstrapTemplate.resolver(this.addressResolverGroup); - - clientConfiguration.getConnectionTimeout().ifPresent(timeout -> - this.bootstrapTemplate.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) timeout.toMillis())); - - this.bootstrapTemplate.handler(new ChannelInitializer() { - - @Override - protected void initChannel(final SocketChannel channel) { - final String authority = clientConfiguration.getApnsServerAddress().getHostName(); - final SslHandler sslHandler = sslContext.newHandler(channel.alloc(), authority, clientConfiguration.getApnsServerAddress().getPort()); - - if (clientConfiguration.isHostnameVerificationEnabled()) { - final SSLEngine sslEngine = sslHandler.engine(); - final SSLParameters sslParameters = sslEngine.getSSLParameters(); - sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); - sslEngine.setSSLParameters(sslParameters); - } - - final ApnsClientHandler apnsClientHandler; - { - final ApnsClientHandler.ApnsClientHandlerBuilder clientHandlerBuilder; - - if (clientConfiguration.getSigningKey().isPresent()) { - clientHandlerBuilder = new TokenAuthenticationApnsClientHandler.TokenAuthenticationApnsClientHandlerBuilder() - .signingKey(clientConfiguration.getSigningKey().get()) - .tokenExpiration(clientConfiguration.getTokenExpiration()) - .authority(authority); - } else { - clientHandlerBuilder = new ApnsClientHandler.ApnsClientHandlerBuilder() - .authority(authority); - } - - clientConfiguration.getFrameLogger().ifPresent(clientHandlerBuilder::frameLogger); - - apnsClientHandler = clientHandlerBuilder.build(); - - clientConfiguration.getGracefulShutdownTimeout().ifPresent(timeout -> - apnsClientHandler.gracefulShutdownTimeoutMillis(timeout.toMillis())); - } - - final ChannelPipeline pipeline = channel.pipeline(); - - clientConfiguration.getProxyHandlerFactory().ifPresent(proxyHandlerFactory -> - pipeline.addFirst(proxyHandlerFactory.createProxyHandler())); - - pipeline.addLast(sslHandler); - pipeline.addLast(new FlushConsolidationHandler(FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true)); - pipeline.addLast(new IdleStateHandler(clientConfiguration.getCloseAfterIdleDuration().toMillis(), 0, 0, TimeUnit.MILLISECONDS)); - pipeline.addLast(apnsClientHandler); - } - }); - } - - /** - * Creates and connects a new channel. The initial connection attempt may be delayed to accommodate exponential - * back-off requirements. - * - * @param channelReadyPromise the promise to be notified when a channel has been created and connected to the APNs - * server - * - * @return a future that will be notified once a channel has been created and connected to the APNs server - */ - @Override - public Future create(final Promise channelReadyPromise) { - final long delay = this.currentDelaySeconds.get(); - - channelReadyPromise.addListener(future -> { - final long updatedDelay = future.isSuccess() ? 0 : - Math.max(Math.min(delay * 2, MAX_CONNECT_DELAY_SECONDS), MIN_CONNECT_DELAY_SECONDS); - - ApnsChannelFactory.this.currentDelaySeconds.compareAndSet(delay, updatedDelay); - }); - - - this.bootstrapTemplate.config().group().schedule(() -> { - final Bootstrap bootstrap = ApnsChannelFactory.this.bootstrapTemplate.clone() - .channelFactory(new AugmentingReflectiveChannelFactory<>( - ClientChannelClassUtil.getSocketChannelClass(ApnsChannelFactory.this.bootstrapTemplate.config().group()), - CHANNEL_READY_PROMISE_ATTRIBUTE_KEY, channelReadyPromise)); - - final ChannelFuture connectFuture = bootstrap.connect(); - - connectFuture.addListener(future -> { - if (!future.isSuccess()) { - channelReadyPromise.tryFailure(future.cause()); - } - }); - }, delay, TimeUnit.SECONDS); - - return channelReadyPromise; - } - - /** - * Destroys a channel by closing it. - * - * @param channel the channel to destroy - * @param promise the promise to notify when the channel has been destroyed - * - * @return a future that will be notified when the channel has been destroyed - */ - @Override - public Future destroy(final Channel channel, final Promise promise) { - channel.close().addListener(new PromiseNotifier<>(promise)); - return promise; - } - - @Override - public void close() { - try { - this.addressResolverGroup.close(); - } finally { - if (this.sslContext instanceof ReferenceCounted) { - if (this.hasReleasedSslContext.compareAndSet(false, true)) { - ((ReferenceCounted) this.sslContext).release(); - } - } - } - } -} diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelManagementChannelFactory.java b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelManagementChannelFactory.java new file mode 100644 index 000000000..b53b9505a --- /dev/null +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelManagementChannelFactory.java @@ -0,0 +1,53 @@ +package com.eatthepath.pushy.apns; + +import io.netty.channel.ChannelPipeline; +import io.netty.handler.ssl.SslHandler; + +/** + * An APNs channel factory creates new channels connected to an APNs channel management server. Channels constructed by + * this factory are intended for use in an {@link ApnsChannelPool}. + */ +class ApnsChannelManagementChannelFactory extends AbstractApnsChannelFactory { + + private final ApnsClientConfiguration clientConfiguration; + + ApnsChannelManagementChannelFactory(final ApnsClientConfiguration clientConfiguration, + final ApnsClientResources clientResources) { + + super(clientConfiguration.getApnsServerAddress(), + clientConfiguration.getSslContext(), + clientConfiguration.getProxyHandlerFactory().orElse(null), + clientConfiguration.isHostnameVerificationEnabled(), + clientConfiguration.getConnectionTimeout().orElse(null), + clientResources); + + this.clientConfiguration = clientConfiguration; + } + + protected void constructPipeline(final SslHandler sslHandler, final ChannelPipeline pipeline) { + final String authority = clientConfiguration.getApnsServerAddress().getHostName(); + + final ApnsChannelManagementHandler apnsClientHandler; + { + final ApnsChannelManagementHandler.ApnsChannelManagementHandlerBuilder channelManagementHandlerBuilder; + + channelManagementHandlerBuilder = new ApnsChannelManagementHandler.ApnsChannelManagementHandlerBuilder() + .signingKey(clientConfiguration.getSigningKey().get()) + .tokenExpiration(clientConfiguration.getTokenExpiration()) + .authority(authority); + + clientConfiguration.getFrameLogger().ifPresent(channelManagementHandlerBuilder::frameLogger); + + apnsClientHandler = channelManagementHandlerBuilder.build(); + + clientConfiguration.getGracefulShutdownTimeout().ifPresent(timeout -> + apnsClientHandler.gracefulShutdownTimeoutMillis(timeout.toMillis())); + } + + clientConfiguration.getProxyHandlerFactory().ifPresent(proxyHandlerFactory -> + pipeline.addFirst(proxyHandlerFactory.createProxyHandler())); + + pipeline.addLast(sslHandler); + pipeline.addLast(apnsClientHandler); + } +} \ No newline at end of file diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelManagementClient.java b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelManagementClient.java new file mode 100644 index 000000000..d164f8971 --- /dev/null +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelManagementClient.java @@ -0,0 +1,355 @@ +package com.eatthepath.pushy.apns; + +import com.eatthepath.json.JsonParser; +import com.eatthepath.json.JsonSerializer; +import com.eatthepath.uuid.FastUUID; +import io.netty.channel.Channel; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.EmptyHttp2Headers; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.util.AsciiString; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.text.ParseException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** + * An APNs channel management client issues requests to an APNs channel management server. APNs channel management + * clients share credentials with their parent {@link ApnsClient}. + * + * @see Sending channel management requests to APNs + */ +class ApnsChannelManagementClient { + + private static final Logger log = LoggerFactory.getLogger(ApnsChannelManagementClient.class); + private final ApnsChannelPool channelPool; + + private static final AsciiString PATH_PREFIX = AsciiString.of("/1/apps/"); + private static final AsciiString SINGLE_CHANNEL_PATH_SUFFIX = AsciiString.of("/channels"); + private static final AsciiString ALL_CHANNELS_PATH_SUFFIX = AsciiString.of("/all-channels"); + + private static final AsciiString CHANNEL_ID_HEADER = AsciiString.of("apns-channel-id"); + + private static final String CHANNELS_BODY_KEY = "channels"; + private static final String MESSAGE_STORAGE_POLICY_BODY_KEY = "message-storage-policy"; + private static final String PUSH_TYPE_BODY_KEY = "push-type"; + + private static final ApnsChannelPoolMetricsListener NO_OP_METRICS_LISTENER = new ApnsChannelPoolMetricsListener() { + + @Override + public void handleConnectionAdded() { + } + + @Override + public void handleConnectionRemoved() { + } + + @Override + public void handleConnectionCreationFailed() { + } + }; + + private static class SimpleCreateChannelResponse implements CreateChannelResponse { + + private final String channelId; + + private final int status; + private final UUID requestId; + + private SimpleCreateChannelResponse(final String channelId, final int status, final UUID requestId) { + this.channelId = channelId; + this.status = status; + this.requestId = requestId; + } + + @Override + public String getChannelId() { + return channelId; + } + + @Override + public int getStatus() { + return status; + } + + @Override + public UUID getRequestId() { + return requestId; + } + } + + private static class SimpleGetChannelConfigurationResponse implements GetChannelConfigurationResponse { + + private final MessageStoragePolicy messageStoragePolicy; + + private final int status; + private final UUID requestId; + + private SimpleGetChannelConfigurationResponse(final MessageStoragePolicy messageStoragePolicy, + final int status, + final UUID requestId) { + + this.messageStoragePolicy = messageStoragePolicy; + this.status = status; + this.requestId = requestId; + } + + @Override + public MessageStoragePolicy getMessageStoragePolicy() { + return messageStoragePolicy; + } + + public int getStatus() { + return status; + } + + public UUID getRequestId() { + return requestId; + } + } + + private static class SimpleDeleteChannelResponse implements DeleteChannelResponse { + + private final int status; + private final UUID requestId; + + private SimpleDeleteChannelResponse(final int status, final UUID requestId) { + this.status = status; + this.requestId = requestId; + } + + @Override + public int getStatus() { + return status; + } + + @Override + public UUID getRequestId() { + return requestId; + } + } + + private static class SimpleGetChannelIdsResponse implements GetChannelIdsResponse { + + private final List channelIds; + + private final int status; + private final UUID requestId; + + private SimpleGetChannelIdsResponse(final List channelIds, final int status, final UUID requestId) { + this.channelIds = channelIds; + this.status = status; + this.requestId = requestId; + } + + @Override + public List getChannelIds() { + return channelIds; + } + + @Override + public int getStatus() { + return status; + } + + @Override + public UUID getRequestId() { + return requestId; + } + } + + ApnsChannelManagementClient(final ApnsClientConfiguration clientConfiguration, final ApnsClientResources clientResources) { + final ApnsChannelManagementChannelFactory channelFactory = + new ApnsChannelManagementChannelFactory(clientConfiguration, clientResources); + + this.channelPool = new ApnsChannelPool(channelFactory, + 1, + clientResources.getEventLoopGroup().next(), + NO_OP_METRICS_LISTENER); + } + + public CompletableFuture createChannel(final String bundleId, + final MessageStoragePolicy messageStoragePolicy, + final UUID apnsRequestId) { + + final Map requestBody = new HashMap<>(); + requestBody.put(MESSAGE_STORAGE_POLICY_BODY_KEY, messageStoragePolicy.getCode()); + requestBody.put(PUSH_TYPE_BODY_KEY, "LiveActivity"); + + final ApnsChannelManagementRequest request = new ApnsChannelManagementRequest(HttpMethod.POST, + EmptyHttp2Headers.INSTANCE, + PATH_PREFIX.concat(bundleId).concat(SINGLE_CHANNEL_PATH_SUFFIX), + JsonSerializer.writeJsonTextAsString(requestBody), + apnsRequestId); + + sendRequest(request); + + return request.getResponseFuture() + .thenApply(http2Response -> { + final HttpResponseStatus status = HttpResponseStatus.parseLine(http2Response.getHeaders().status()); + final UUID apnsRequestIdFromResponse = getApnsRequestId(http2Response.getHeaders()); + + if (status.code() == 201) { + return new SimpleCreateChannelResponse(http2Response.getHeaders().get(CHANNEL_ID_HEADER).toString(), + status.code(), + apnsRequestIdFromResponse); + } else { + throw new ChannelManagementException(status.code(), apnsRequestIdFromResponse, getErrorReason(http2Response)); + } + }); + } + + public CompletableFuture getChannelConfiguration(final String bundleId, + final String channelId, + final UUID apnsRequestId) { + + final Http2Headers headers = new DefaultHttp2Headers(); + headers.add(CHANNEL_ID_HEADER, channelId); + + final ApnsChannelManagementRequest request = new ApnsChannelManagementRequest(HttpMethod.GET, + headers, + PATH_PREFIX.concat(bundleId).concat(SINGLE_CHANNEL_PATH_SUFFIX), + null, + apnsRequestId); + + sendRequest(request); + + return request.getResponseFuture() + .thenApply(http2Response -> { + final HttpResponseStatus status = HttpResponseStatus.parseLine(http2Response.getHeaders().status()); + final UUID apnsRequestIdFromResponse = getApnsRequestId(http2Response.getHeaders()); + + if (status.code() == 200) { + final Map parsedResponse; + + try { + parsedResponse = new JsonParser().parseJsonObject(new String(http2Response.getData(), StandardCharsets.UTF_8)); + } catch (final ParseException e) { + throw new CompletionException(e); + } + + final MessageStoragePolicy messageStoragePolicy = + MessageStoragePolicy.getFromCode(((Long) parsedResponse.get(MESSAGE_STORAGE_POLICY_BODY_KEY)).intValue()); + + return new SimpleGetChannelConfigurationResponse(messageStoragePolicy, status.code(), apnsRequestIdFromResponse); + } else { + throw new ChannelManagementException(status.code(), apnsRequestIdFromResponse, getErrorReason(http2Response)); + } + }); + } + + public CompletableFuture deleteChannel(final String bundleId, + final String channelId, + final UUID apnsRequestId) { + + final Http2Headers headers = new DefaultHttp2Headers(); + headers.add(CHANNEL_ID_HEADER, channelId); + + final ApnsChannelManagementRequest request = new ApnsChannelManagementRequest(HttpMethod.DELETE, + headers, + PATH_PREFIX.concat(bundleId).concat(SINGLE_CHANNEL_PATH_SUFFIX), + null, + apnsRequestId); + + sendRequest(request); + + return request.getResponseFuture() + .thenApply(http2Response -> { + final HttpResponseStatus status = HttpResponseStatus.parseLine(http2Response.getHeaders().status()); + final UUID apnsRequestIdFromResponse = getApnsRequestId(http2Response.getHeaders()); + + if (status.code() == 204) { + return new SimpleDeleteChannelResponse(status.code(), apnsRequestIdFromResponse); + } else { + throw new ChannelManagementException(status.code(), apnsRequestIdFromResponse, getErrorReason(http2Response)); + } + }); + } + + public CompletableFuture getChannelIds(final String bundleId, final UUID apnsRequestId) { + final ApnsChannelManagementRequest request = new ApnsChannelManagementRequest(HttpMethod.GET, + EmptyHttp2Headers.INSTANCE, + PATH_PREFIX.concat(bundleId).concat(ALL_CHANNELS_PATH_SUFFIX), + null, + apnsRequestId); + + sendRequest(request); + + return request.getResponseFuture() + .thenApply(http2Response -> { + final HttpResponseStatus status = HttpResponseStatus.parseLine(http2Response.getHeaders().status()); + final UUID apnsRequestIdFromResponse = getApnsRequestId(http2Response.getHeaders()); + + if (status.code() == 200) { + final Map parsedResponse; + + try { + parsedResponse = new JsonParser().parseJsonObject(new String(http2Response.getData(), StandardCharsets.UTF_8)); + } catch (final ParseException e) { + throw new CompletionException(e); + } + + @SuppressWarnings("unchecked") final List channelIds = + (List) parsedResponse.get(CHANNELS_BODY_KEY); + + return new SimpleGetChannelIdsResponse(channelIds, status.code(), apnsRequestIdFromResponse); + } else { + throw new ChannelManagementException(status.code(), apnsRequestIdFromResponse, getErrorReason(http2Response)); + } + }); + } + + private void sendRequest(final ApnsChannelManagementRequest request) { + this.channelPool.acquire().addListener((GenericFutureListener>) acquireFuture -> { + if (acquireFuture.isSuccess()) { + final Channel channel = acquireFuture.getNow(); + + channel.writeAndFlush(request); + channelPool.release(channel); + } else { + request.getResponseFuture().completeExceptionally(acquireFuture.cause()); + } + }); + } + + private static UUID getApnsRequestId(final Http2Headers headers) { + final CharSequence uuidSequence = headers.get(ApnsChannelManagementHandler.APNS_REQUEST_ID_HEADER); + + try { + return uuidSequence != null ? FastUUID.parseUUID(uuidSequence) : null; + } catch (final IllegalArgumentException e) { + return null; + } + } + + private static String getErrorReason(final Http2Response http2Response) { + if (http2Response.getData() == null) { + return null; + } + + try { + final Map parsedResponse = + new JsonParser().parseJsonObject(new String(http2Response.getData(), StandardCharsets.UTF_8)); + + return (String) parsedResponse.get("reason"); + } catch (final Exception e) { + log.warn("Could not extract error reason from response", e); + return null; + } + } + + Future close() { + return channelPool.close(); + } +} diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelManagementHandler.java b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelManagementHandler.java new file mode 100644 index 000000000..129a96d13 --- /dev/null +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelManagementHandler.java @@ -0,0 +1,455 @@ +/* + * Copyright (c) 2020 Jon Chambers + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package com.eatthepath.pushy.apns; + +import com.eatthepath.pushy.apns.auth.ApnsSigningKey; +import com.eatthepath.pushy.apns.auth.AuthenticationToken; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.HttpScheme; +import io.netty.handler.codec.http2.*; +import io.netty.util.AsciiString; +import io.netty.util.collection.IntObjectHashMap; +import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.PromiseCombiner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +class ApnsChannelManagementHandler extends Http2ConnectionHandler implements Http2FrameListener, Http2Connection.Listener { + + private final Map unattachedRequestsByStreamId = new IntObjectHashMap<>(); + + private final Http2Connection.PropertyKey responseHeadersPropertyKey; + private final Http2Connection.PropertyKey responseDataPropertyKey; + private final Http2Connection.PropertyKey requestPropertyKey; + private final Http2Connection.PropertyKey streamErrorCausePropertyKey; + + private final ApnsSigningKey signingKey; + private AuthenticationToken authenticationToken; + + private final Duration tokenExpiration; + private ScheduledFuture tokenExpirationFuture; + + private final String authority; + + private Throwable connectionErrorCause; + + static final AsciiString APNS_REQUEST_ID_HEADER = new AsciiString("apns-request-id"); + private static final AsciiString APNS_AUTHORIZATION_HEADER = new AsciiString("authorization"); + + private static final IOException STREAMS_EXHAUSTED_EXCEPTION = + new IOException("HTTP/2 streams exhausted; closing connection."); + + private static final IOException STREAM_CLOSED_BEFORE_REPLY_EXCEPTION = + new IOException("Stream closed before a reply was received"); + + private static final Logger log = LoggerFactory.getLogger(ApnsChannelManagementHandler.class); + + public static class ApnsChannelManagementHandlerBuilder extends AbstractHttp2ConnectionHandlerBuilder { + + private String authority; + private ApnsSigningKey signingKey; + private Duration tokenExpiration; + + ApnsChannelManagementHandlerBuilder authority(final String authority) { + this.authority = authority; + return this; + } + + String authority() { + return this.authority; + } + + @Override + public ApnsChannelManagementHandlerBuilder frameLogger(final Http2FrameLogger frameLogger) { + return super.frameLogger(frameLogger); + } + + @Override + public Http2FrameLogger frameLogger() { + return super.frameLogger(); + } + + public ApnsChannelManagementHandlerBuilder signingKey(final ApnsSigningKey signingKey) { + this.signingKey = signingKey; + return this; + } + + public ApnsSigningKey signingKey() { + return this.signingKey; + } + + public ApnsChannelManagementHandlerBuilder tokenExpiration(final Duration tokenExpiration) { + this.tokenExpiration = tokenExpiration; + return this; + } + + public Duration tokenExpiration() { + return this.tokenExpiration; + } + + @Override + protected final boolean isServer() { + return false; + } + + @Override + protected boolean encoderEnforceMaxConcurrentStreams() { + return true; + } + + @Override + public ApnsChannelManagementHandler build(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder, final Http2Settings initialSettings) { + Objects.requireNonNull(this.authority(), "Authority must be set before building an ApnsChannelManagementHandler."); + + final ApnsChannelManagementHandler handler = new ApnsChannelManagementHandler(decoder, encoder, initialSettings, this.authority(), this.signingKey(), this.tokenExpiration()); + this.frameListener(handler); + return handler; + } + + @Override + public ApnsChannelManagementHandler build() { + return super.build(); + } + } + + ApnsChannelManagementHandler(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder, final Http2Settings initialSettings, final String authority, final ApnsSigningKey signingKey, final Duration tokenExpiration) { + super(decoder, encoder, initialSettings); + + this.authority = authority; + this.signingKey = signingKey; + this.tokenExpiration = tokenExpiration; + + this.responseHeadersPropertyKey = this.connection().newKey(); + this.responseDataPropertyKey = this.connection().newKey(); + this.requestPropertyKey = this.connection().newKey(); + this.streamErrorCausePropertyKey = this.connection().newKey(); + + this.connection().addListener(this); + } + + @Override + public void write(final ChannelHandlerContext context, final Object message, final ChannelPromise writePromise) { + if (message instanceof ApnsChannelManagementRequest) { + final ApnsChannelManagementRequest channelManagementRequest = (ApnsChannelManagementRequest) message; + + writePromise.addListener(future -> { + if (!future.isSuccess()) { + log.trace("Failed to write push notification.", future.cause()); + channelManagementRequest.getResponseFuture().completeExceptionally(future.cause()); + } + }); + + this.writeRequest(context, channelManagementRequest, writePromise); + } else { + // This should never happen, but in case some foreign debris winds up in the pipeline, just pass it through. + log.error("Unexpected object in pipeline: {}", message); + context.write(message, writePromise); + } + } + + private void retryRequestFromStream(final ChannelHandlerContext context, final int streamId) { + final Http2Stream stream = this.connection().stream(streamId); + + final ApnsChannelManagementRequest request = stream.removeProperty(this.requestPropertyKey); + + final ChannelPromise writePromise = context.channel().newPromise(); + this.writeRequest(context, request, writePromise); + } + + private void writeRequest(final ChannelHandlerContext context, final ApnsChannelManagementRequest request, final ChannelPromise writePromise) { + if (context.channel().isActive()) { + final int streamId = this.connection().local().incrementAndGetNextStreamId(); + + if (streamId > 0) { + // We'll attach the request and response promise to the stream as soon as the stream is created. + // Because we're using a StreamBufferingEncoder under the hood, there's no guarantee as to when the stream + // will actually be created, and so we attach these in the onStreamAdded listener to make sure everything + // is happening in a predictable order. + this.unattachedRequestsByStreamId.put(streamId, request); + + final Http2Headers headers = getHeadersForRequest(request, context, streamId); + + if (request.getPayload() == null) { + this.encoder().writeHeaders(context, streamId, headers, 0, true, writePromise); + log.trace("Wrote headers on stream {}: {}", streamId, headers); + } else { + final ChannelPromise headersPromise = context.newPromise(); + this.encoder().writeHeaders(context, streamId, headers, 0, false, headersPromise); + log.trace("Wrote headers on stream {}: {}", streamId, headers); + + final ByteBuf payloadBuffer = Unpooled.wrappedBuffer(request.getPayload().getBytes(StandardCharsets.UTF_8)); + + final ChannelPromise dataPromise = context.newPromise(); + this.encoder().writeData(context, streamId, payloadBuffer, 0, true, dataPromise); + log.trace("Wrote payload on stream {}: {}", streamId, request.getPayload()); + + final PromiseCombiner promiseCombiner = new PromiseCombiner(context.executor()); + promiseCombiner.addAll(headersPromise, dataPromise); + promiseCombiner.finish(writePromise); + } + } else { + // This is very unlikely, but in the event that we run out of stream IDs, we need to open a new + // connection. Just closing the context should be enough; automatic reconnection should take things + // from there. + writePromise.tryFailure(STREAMS_EXHAUSTED_EXCEPTION); + context.channel().close(); + } + } else { + writePromise.tryFailure(STREAM_CLOSED_BEFORE_REPLY_EXCEPTION); + } + } + + protected Http2Headers getHeadersForRequest(final ApnsChannelManagementRequest request, final ChannelHandlerContext context, final int streamId) { + final Http2Headers headers = new DefaultHttp2Headers() + .scheme(HttpScheme.HTTPS.name()) + .authority(this.authority) + .method(request.getMethod().asciiName()) + .path(request.getPath()); + + if (request.getApnsRequestId() != null) { + headers.add(APNS_REQUEST_ID_HEADER, request.getApnsRequestId().toString()); + } + + if (this.authenticationToken == null) { + log.debug("Generated a new authentication token for channel {} at stream {}", context.channel(), streamId); + this.authenticationToken = new AuthenticationToken(this.signingKey, Instant.now()); + + tokenExpirationFuture = context.executor().schedule(() -> { + log.debug("Authentication token for channel {} has expired", context.channel()); + this.authenticationToken = null; + }, this.tokenExpiration.toMillis(), TimeUnit.MILLISECONDS); + + headers.add(APNS_AUTHORIZATION_HEADER, this.authenticationToken.getAuthorizationHeader()); + } + + return headers.add(request.getHeaders()); + } + + @Override + public int onDataRead(final ChannelHandlerContext context, final int streamId, final ByteBuf data, final int padding, final boolean endOfStream) { + final int bytesProcessed = data.readableBytes() + padding; + + final Http2Stream stream = this.connection().stream(streamId); + + ((CompositeByteBuf) stream.getProperty(this.responseDataPropertyKey)).addComponent(true, data.retain()); + + if (endOfStream) { + this.handleEndOfStream(stream); + } + + return bytesProcessed; + } + + @Override + public void onHeadersRead(final ChannelHandlerContext context, final int streamId, final Http2Headers headers, final int streamDependency, final short weight, final boolean exclusive, final int padding, final boolean endOfStream) { + this.onHeadersRead(context, streamId, headers, padding, endOfStream); + } + + @Override + public void onHeadersRead(final ChannelHandlerContext context, final int streamId, final Http2Headers headers, final int padding, final boolean endOfStream) { + final Http2Stream stream = this.connection().stream(streamId); + stream.setProperty(this.responseHeadersPropertyKey, headers); + + if (endOfStream) { + this.handleEndOfStream(stream); + } + } + + private void handleEndOfStream(final Http2Stream stream) { + final ApnsChannelManagementRequest request = stream.getProperty(this.requestPropertyKey); + + request.getResponseFuture().complete(new Http2Response(stream.getProperty(this.responseHeadersPropertyKey), + ByteBufUtil.getBytes(stream.getProperty(this.responseDataPropertyKey)))); + } + + @Override + public void onPriorityRead(final ChannelHandlerContext ctx, final int streamId, final int streamDependency, final short weight, final boolean exclusive) { + } + + @Override + public void onRstStreamRead(final ChannelHandlerContext context, final int streamId, final long errorCode) { + if (errorCode == Http2Error.REFUSED_STREAM.code()) { + // This can happen if the server reduces MAX_CONCURRENT_STREAMS while we already have notifications in + // flight. We may get multiple RST_STREAM frames per stream since we send multiple frames (HEADERS and + // DATA) for each push notification, but we should only get one REFUSED_STREAM error; the rest should all be + // STREAM_CLOSED. + this.retryRequestFromStream(context, streamId); + } + } + + @Override + public void onSettingsAckRead(final ChannelHandlerContext ctx) { + } + + @Override + public void onSettingsRead(final ChannelHandlerContext context, final Http2Settings settings) { + log.debug("Received settings from APNs gateway: {}", settings); + + // Always try to mark the "channel ready" promise as a success after we receive a SETTINGS frame. If it's the + // first SETTINGS frame, we know all handshaking and connection setup is done and the channel is ready to use. + // If it's a subsequent SETTINGS frame, this will have no effect. + getChannelReadyPromise(context.channel()).trySuccess(context.channel()); + } + + @Override + public void onPingRead(final ChannelHandlerContext ctx, final long pingData) { + } + + @Override + public void onPingAckRead(final ChannelHandlerContext context, final long pingData) { + } + + @Override + public void onPushPromiseRead(final ChannelHandlerContext ctx, final int streamId, final int promisedStreamId, final Http2Headers headers, final int padding) { + } + + @Override + public void onGoAwayRead(final ChannelHandlerContext context, final int lastStreamId, final long errorCode, final ByteBuf debugData) { + log.info("Received GOAWAY from APNs channel management server: {}", debugData.toString(StandardCharsets.UTF_8)); + context.close(); + } + + @Override + public void onWindowUpdateRead(final ChannelHandlerContext ctx, final int streamId, final int windowSizeIncrement) { + } + + @Override + public void onUnknownFrame(final ChannelHandlerContext ctx, final byte frameType, final int streamId, final Http2Flags flags, final ByteBuf payload) { + } + + @Override + public void onStreamAdded(final Http2Stream stream) { + stream.setProperty(this.requestPropertyKey, this.unattachedRequestsByStreamId.remove(stream.id())); + stream.setProperty(this.responseDataPropertyKey, Unpooled.compositeBuffer()); + } + + @Override + public void onStreamActive(final Http2Stream stream) { + } + + @Override + public void onStreamHalfClosed(final Http2Stream stream) { + } + + @Override + public void onStreamClosed(final Http2Stream stream) { + // Always try to fail promises associated with closed streams; most of the time, this should fail silently, but + // in cases of unexpected closure, it will make sure that nothing gets left hanging. + final ApnsChannelManagementRequest request = stream.getProperty(this.requestPropertyKey); + + if (request != null) { + final Throwable cause; + + if (stream.getProperty(this.streamErrorCausePropertyKey) != null) { + cause = stream.getProperty(this.streamErrorCausePropertyKey); + } else if (this.connectionErrorCause != null) { + cause = this.connectionErrorCause; + } else { + cause = STREAM_CLOSED_BEFORE_REPLY_EXCEPTION; + } + + request.getResponseFuture().completeExceptionally(cause); + } + } + + @Override + public void onStreamRemoved(final Http2Stream stream) { + stream.removeProperty(this.responseHeadersPropertyKey); + stream.removeProperty(this.requestPropertyKey); + + final CompositeByteBuf responseData = stream.removeProperty(this.responseDataPropertyKey); + responseData.release(); + } + + @Override + public void onGoAwaySent(final int lastStreamId, final long errorCode, final ByteBuf debugData) { + } + + @Override + public void onGoAwayReceived(final int lastStreamId, final long errorCode, final ByteBuf debugData) { + } + + @Override + protected void onStreamError(final ChannelHandlerContext context, final boolean isOutbound, final Throwable cause, final Http2Exception.StreamException streamException) { + final Http2Stream stream = this.connection().stream(streamException.streamId()); + + // The affected stream may already be closed (or was never open in the first place) + if (stream != null) { + stream.setProperty(this.streamErrorCausePropertyKey, streamException); + } + + super.onStreamError(context, isOutbound, cause, streamException); + } + + @Override + protected void onConnectionError(final ChannelHandlerContext context, final boolean isOutbound, final Throwable cause, final Http2Exception http2Exception) { + this.connectionErrorCause = http2Exception != null ? http2Exception : cause; + + super.onConnectionError(context, isOutbound, cause, http2Exception); + } + + @Override + public void channelInactive(final ChannelHandlerContext context) throws Exception { + for (final ApnsChannelManagementRequest request : this.unattachedRequestsByStreamId.values()) { + request.getResponseFuture().completeExceptionally(STREAM_CLOSED_BEFORE_REPLY_EXCEPTION); + } + + this.unattachedRequestsByStreamId.clear(); + + if (getChannelReadyPromise(context.channel()).tryFailure(STREAM_CLOSED_BEFORE_REPLY_EXCEPTION)) { + log.debug("Channel became inactive before SETTINGS frame received"); + } + + if (this.tokenExpirationFuture != null) { + this.tokenExpirationFuture.cancel(false); + this.tokenExpirationFuture = null; + } + + super.channelInactive(context); + } + + public void exceptionCaught(final ChannelHandlerContext context, final Throwable cause) { + // Always try to fail the "channel ready" promise if we catch an exception; in some cases, these may happen + // after a connection has already become ready, in which case the failure attempt will have no effect. + getChannelReadyPromise(context.channel()).tryFailure(cause); + } + + private Promise getChannelReadyPromise(final Channel channel) { + return channel.attr(AbstractApnsChannelFactory.CHANNEL_READY_PROMISE_ATTRIBUTE_KEY).get(); + } +} diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelManagementRequest.java b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelManagementRequest.java new file mode 100644 index 000000000..84d28b926 --- /dev/null +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelManagementRequest.java @@ -0,0 +1,55 @@ +package com.eatthepath.pushy.apns; + +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.util.AsciiString; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +class ApnsChannelManagementRequest { + private final HttpMethod method; + private final Http2Headers headers; + private final AsciiString path; + private final String payload; + private final UUID apnsRequestId; + private final CompletableFuture responseFuture; + + ApnsChannelManagementRequest(final HttpMethod method, + final Http2Headers headers, + final AsciiString path, + final String payload, + final UUID apnsRequestId) { + + this.method = method; + this.headers = headers; + this.path = path; + this.payload = payload; + this.apnsRequestId = apnsRequestId; + this.responseFuture = new CompletableFuture<>(); + } + + public HttpMethod getMethod() { + return method; + } + + public AsciiString getPath() { + return path; + } + + public Http2Headers getHeaders() { + return headers; + } + + public String getPayload() { + return payload; + } + + public UUID getApnsRequestId() { + return apnsRequestId; + } + + public CompletableFuture getResponseFuture() { + return responseFuture; + } +} diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelPool.java b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelPool.java index f1c3ebd09..b0bd01f76 100644 --- a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelPool.java +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelPool.java @@ -36,7 +36,7 @@ import java.util.Set; /** - *

A pool of channels connected to an APNs server. Channel pools use a {@link ApnsChannelFactory} to create + *

A pool of channels connected to an APNs server. Channel pools use a {@link ApnsNotificationChannelFactory} to create * connections (up to a given maximum capacity) on demand.

* *

Callers acquire channels from the pool via the {@link ApnsChannelPool#acquire()} method, and must return them to diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClient.java b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClient.java index 6e119448c..eec7a3492 100644 --- a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClient.java +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClient.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -58,10 +59,12 @@ * result, clients do not need to be "started" explicitly, and are ready to begin sending notifications as soon * as they're constructed.

* - *

Notifications sent by a client to an APNs server are sent asynchronously. A - * {@link CompletableFuture} is returned immediately when a notification is sent, but will not complete until the - * attempt to send the notification has failed, the notification has been accepted by the APNs server, or the - * notification has been rejected by the APNs server.

+ *

Notifications sent by a client to an APNs server are sent asynchronously. A {@link CompletableFuture} is returned + * immediately when a notification is sent, but will not complete until the attempt to send the notification has failed, + * the notification has been accepted by the APNs server, or the notification has been rejected by the APNs server.

+ * + *

APNs clients also allow for management of broadcast push notification channels. Channel management requests use + * the same credentials and client resources as requests to send push notifications.

* *

APNs clients are intended to be long-lived, persistent resources. They are also inherently thread-safe and can be * shared across many threads in a complex application. Callers must shut them down via the {@link ApnsClient#close()} @@ -79,6 +82,8 @@ public class ApnsClient { private final ApnsChannelPool channelPool; + private final ApnsChannelManagementClient channelManagementClient; + private final ApnsClientMetricsListener metricsListener; private final AtomicBoolean isClosed = new AtomicBoolean(false); @@ -90,7 +95,6 @@ public class ApnsClient { private static class NoopApnsClientMetricsListener implements ApnsClientMetricsListener { - @Override public void handleWriteFailure(final String topic) { } @@ -129,7 +133,7 @@ public void handleConnectionCreationFailed() { this.metricsListener = clientConfiguration.getMetricsListener() .orElseGet(NoopApnsClientMetricsListener::new); - final ApnsChannelFactory channelFactory = new ApnsChannelFactory(clientConfiguration, this.clientResources); + final ApnsNotificationChannelFactory channelFactory = new ApnsNotificationChannelFactory(clientConfiguration, this.clientResources); final ApnsChannelPoolMetricsListener channelPoolMetricsListener = new ApnsChannelPoolMetricsListener() { @@ -153,6 +157,8 @@ public void handleConnectionCreationFailed() { clientConfiguration.getConcurrentConnections(), this.clientResources.getEventLoopGroup().next(), channelPoolMetricsListener); + + this.channelManagementClient = new ApnsChannelManagementClient(clientConfiguration, this.clientResources); } /** @@ -216,6 +222,92 @@ public PushNotificationFutureIn order to support several simultaneous events, you can maintain up to 10,000 channels for your app + * in the development and production environment, respectively. Once the Live Activity event is complete, and you no + * longer plan to use the channel for any subsequent updates, delete the channel to avoid going over the allocated + * channel limit. + * + * @param bundleId the bundle ID for the app for which to create a new channel + * @param messageStoragePolicy the message storage policy for the new channel + * @param apnsRequestId a unique identifier for this request; may be {@code null}, in which case the remote server + * will generate a unique ID for this request + * + * @return a future that completes when the channel has been created + * + * @see Sending channel management requests to APN + * + * @since 0.16 + */ + public CompletableFuture createChannel(final String bundleId, + final MessageStoragePolicy messageStoragePolicy, + final UUID apnsRequestId) { + + return channelManagementClient.createChannel(bundleId, messageStoragePolicy, apnsRequestId); + } + + /** + * Retrieves the configuration for a given broadcast notification channel. + * + * @param bundleId the bundle ID for the app with which the channel is associated + * @param channelId the channel ID for which to retrieve configuration details + * @param apnsRequestId a unique identifier for this request; may be {@code null}, in which case the remote server + * will generate a unique ID for this request + * + * @return a future that yields configuration details for the given channel + * + * @see Sending channel management requests to APN + * + * @since 0.16 + */ + public CompletableFuture getChannelConfiguration(final String bundleId, + final String channelId, + final UUID apnsRequestId) { + + return channelManagementClient.getChannelConfiguration(bundleId, channelId, apnsRequestId); + } + + /** + * Deletes a broadcast notification channel. + * + * @param bundleId the bundle ID for the app with which the channel is associated + * @param channelId the ID of the channel to delete + * @param apnsRequestId a unique identifier for this request; may be {@code null}, in which case the remote server + * will generate a unique ID for this request + * + * @return a future that completes when the given channel has been deleted + * + * @see Sending channel management requests to APN + * + * @since 0.16 + */ + public CompletableFuture deleteChannel(final String bundleId, + final String channelId, + final UUID apnsRequestId) { + + return channelManagementClient.deleteChannel(bundleId, channelId, apnsRequestId); + } + + /** + * Retrieves a list of all broadcast notification channel IDs associated with the given bundle ID. + * + * @param bundleId the bundle ID for the app for which to retrieve a list of broadcast notification channel IDs + * @param apnsRequestId a unique identifier for this request; may be {@code null}, in which case the remote server + * will generate a unique ID for this request + * + * @return a future that yields a list of all broadcast notification channel IDs for the given bundle ID + * + * @see Sending channel management requests to APN + * + * @since 0.16 + */ + public CompletableFuture getChannelIds(final String bundleId, final UUID apnsRequestId) { + return channelManagementClient.getChannelIds(bundleId, apnsRequestId); + } + /** *

Gracefully shuts down the client, closing all connections and releasing all persistent resources. The * disconnection process will wait until notifications that have been sent to the APNs server have been either @@ -242,13 +334,14 @@ public CompletableFuture close() { if (this.isClosed.compareAndSet(false, true)) { closeFuture = new CompletableFuture<>(); - this.channelPool.close().addListener((GenericFutureListener>) closePoolFuture -> { - if (ApnsClient.this.shouldShutDownClientResources) { - ApnsClient.this.clientResources.shutdownGracefully().addListener(future -> closeFuture.complete(null)); - } else { - closeFuture.complete(null); - } - }); + this.channelManagementClient.close().addListener(closeChannelManagementClientFuture -> + this.channelPool.close().addListener((GenericFutureListener>) closePoolFuture -> { + if (ApnsClient.this.shouldShutDownClientResources) { + ApnsClient.this.clientResources.shutdownGracefully().addListener(future -> closeFuture.complete(null)); + } else { + closeFuture.complete(null); + } + })); } else { closeFuture = CompletableFuture.completedFuture(null); } diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientBuilder.java b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientBuilder.java index 2b8377094..06f2dd1a8 100644 --- a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientBuilder.java +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientBuilder.java @@ -576,26 +576,14 @@ public ApnsClientBuilder setUseAlpn(boolean useAlpn) { return this; } - /** - * Constructs a new {@link ApnsClient} with the previously-set configuration. - * - * @return a new ApnsClient instance with the previously-set configuration - * - * @throws SSLException if an SSL context could not be created for the new client for any reason - * @throws IllegalStateException if this method is called without specifying an APNs server address, if this method - * is called without providing TLS credentials or a signing key, or if this method is called with both TLS - * credentials and a signing key - * - * @since 0.8 - */ - public ApnsClient build() throws SSLException { + ApnsClientConfiguration buildClientConfiguration() throws SSLException { if (this.apnsServerAddress == null) { throw new IllegalStateException("No APNs server address specified."); } if (this.clientCertificate == null && this.privateKey == null && this.signingKey == null) { throw new IllegalStateException("No client credentials specified; either TLS credentials (a " + - "certificate/private key) or an APNs signing key must be provided before building a client."); + "certificate/private key) or an APNs signing key must be provided before building a client."); } else if ((this.clientCertificate != null || this.privateKey != null) && this.signingKey != null) { throw new IllegalStateException("Clients may not have both a signing key and TLS credentials."); } @@ -613,18 +601,18 @@ public ApnsClient build() throws SSLException { } final SslContextBuilder sslContextBuilder = SslContextBuilder.forClient() - .sslProvider(sslProvider) - .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE); + .sslProvider(sslProvider) + .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE); if (useAlpn) { sslContextBuilder.applicationProtocolConfig( - new ApplicationProtocolConfig( - ApplicationProtocolConfig.Protocol.ALPN, - // NO_ADVERTISE is currently the only mode supported by both OpenSsl and JDK providers. - ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, - // ACCEPT is currently the only mode supported by both OpenSsl and JDK providers. - ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, - ApplicationProtocolNames.HTTP_2)); + new ApplicationProtocolConfig( + ApplicationProtocolConfig.Protocol.ALPN, + // NO_ADVERTISE is currently the only mode supported by both OpenSsl and JDK providers. + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + // ACCEPT is currently the only mode supported by both OpenSsl and JDK providers. + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2)); } if (this.clientCertificate != null && this.privateKey != null) { @@ -643,25 +631,38 @@ public ApnsClient build() throws SSLException { } try { - final ApnsClientConfiguration clientConfiguration = - new ApnsClientConfiguration(this.apnsServerAddress, - sslContext, - this.enableHostnameVerification, - this.signingKey, - this.tokenExpiration, - this.proxyHandlerFactory, - this.connectionTimeout, - this.closeAfterIdleDuration, - this.gracefulShutdownTimeout, - this.concurrentConnections, - this.metricsListener, - this.frameLogger); - - return new ApnsClient(clientConfiguration, this.apnsClientResources); + return new ApnsClientConfiguration(this.apnsServerAddress, + sslContext, + this.enableHostnameVerification, + this.signingKey, + this.tokenExpiration, + this.proxyHandlerFactory, + this.connectionTimeout, + this.closeAfterIdleDuration, + this.gracefulShutdownTimeout, + this.concurrentConnections, + this.metricsListener, + this.frameLogger); } finally { if (sslContext instanceof ReferenceCounted) { ((ReferenceCounted) sslContext).release(); } } } + + /** + * Constructs a new {@link ApnsClient} with the previously-set configuration. + * + * @return a new ApnsClient instance with the previously-set configuration + * + * @throws SSLException if an SSL context could not be created for the new client for any reason + * @throws IllegalStateException if this method is called without specifying an APNs server address, if this method + * is called without providing TLS credentials or a signing key, or if this method is called with both TLS + * credentials and a signing key + * + * @since 0.8 + */ + public ApnsClient build() throws SSLException { + return new ApnsClient(buildClientConfiguration(), this.apnsClientResources); + } } diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientHandler.java b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientHandler.java index 790221521..59fd3062b 100644 --- a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientHandler.java +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientHandler.java @@ -490,6 +490,6 @@ public void exceptionCaught(final ChannelHandlerContext context, final Throwable } private Promise getChannelReadyPromise(final Channel channel) { - return channel.attr(ApnsChannelFactory.CHANNEL_READY_PROMISE_ATTRIBUTE_KEY).get(); + return channel.attr(ApnsNotificationChannelFactory.CHANNEL_READY_PROMISE_ATTRIBUTE_KEY).get(); } } diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsNotificationChannelFactory.java b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsNotificationChannelFactory.java new file mode 100644 index 000000000..4c095244a --- /dev/null +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsNotificationChannelFactory.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2020 Jon Chambers + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package com.eatthepath.pushy.apns; + +import io.netty.channel.ChannelPipeline; +import io.netty.handler.flush.FlushConsolidationHandler; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.timeout.IdleStateHandler; + +import java.util.concurrent.TimeUnit; + +/** + * An APNs channel factory creates new channels connected to an APNs server. Channels constructed by this factory are + * intended for use in an {@link ApnsChannelPool}. + */ +class ApnsNotificationChannelFactory extends AbstractApnsChannelFactory { + + private final ApnsClientConfiguration clientConfiguration; + + ApnsNotificationChannelFactory(final ApnsClientConfiguration clientConfiguration, + final ApnsClientResources clientResources) { + + super(clientConfiguration.getApnsServerAddress(), + clientConfiguration.getSslContext(), + clientConfiguration.getProxyHandlerFactory().orElse(null), + clientConfiguration.isHostnameVerificationEnabled(), + clientConfiguration.getConnectionTimeout().orElse(null), + clientResources); + + this.clientConfiguration = clientConfiguration; + } + + protected void constructPipeline(final SslHandler sslHandler, final ChannelPipeline pipeline) { + final String authority = clientConfiguration.getApnsServerAddress().getHostName(); + + final ApnsClientHandler apnsClientHandler; + { + final ApnsClientHandler.ApnsClientHandlerBuilder clientHandlerBuilder; + + if (clientConfiguration.getSigningKey().isPresent()) { + clientHandlerBuilder = new TokenAuthenticationApnsClientHandler.TokenAuthenticationApnsClientHandlerBuilder() + .signingKey(clientConfiguration.getSigningKey().get()) + .tokenExpiration(clientConfiguration.getTokenExpiration()) + .authority(authority); + } else { + clientHandlerBuilder = new ApnsClientHandler.ApnsClientHandlerBuilder() + .authority(authority); + } + + clientConfiguration.getFrameLogger().ifPresent(clientHandlerBuilder::frameLogger); + + apnsClientHandler = clientHandlerBuilder.build(); + + clientConfiguration.getGracefulShutdownTimeout().ifPresent(timeout -> + apnsClientHandler.gracefulShutdownTimeoutMillis(timeout.toMillis())); + } + + clientConfiguration.getProxyHandlerFactory().ifPresent(proxyHandlerFactory -> + pipeline.addFirst(proxyHandlerFactory.createProxyHandler())); + + pipeline.addLast(sslHandler); + pipeline.addLast(new FlushConsolidationHandler(FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true)); + pipeline.addLast(new IdleStateHandler(clientConfiguration.getCloseAfterIdleDuration().toMillis(), 0, 0, TimeUnit.MILLISECONDS)); + pipeline.addLast(apnsClientHandler); + } +} diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/ChannelManagementException.java b/pushy/src/main/java/com/eatthepath/pushy/apns/ChannelManagementException.java new file mode 100644 index 000000000..00fa044e7 --- /dev/null +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/ChannelManagementException.java @@ -0,0 +1,56 @@ +package com.eatthepath.pushy.apns; + +import java.util.UUID; + +/** + * A channel management exception indicates that a request to interact with a broadcast push notification channel was + * received, acknowledged, and ultimately rejected by the APNs server. + * + * @see Handling error responses from Apple Push Notification service + */ +public class ChannelManagementException extends RuntimeException { + + private final int status; + private final UUID apnsRequestId; + private final String reason; + + /** + * Constructs a new channel management exception with the given status code, request ID, and rejection reason. + * + * @param status the HTTP status code returned by the APNs server + * @param apnsRequestId the unique identifier for the request that was rejected + * @param reason an APNs-specific rejection reason provided by the APNs server + */ + public ChannelManagementException(final int status, final UUID apnsRequestId, final String reason) { + this.status = status; + this.apnsRequestId = apnsRequestId; + this.reason = reason; + } + + /** + * Returns the HTTP status code returned by the APNs server + * + * @return the HTTP status code returned by the APNs server + */ + public int getStatus() { + return status; + } + + /** + * Returns a unique identifier for the request that was rejected. + * + * @return a unique identifier for the request that was rejected + */ + public UUID getApnsRequestId() { + return apnsRequestId; + } + + /** + * Returns an APNs-specific rejection reason provided by the APNs server. + * + * @return an APNs-specific rejection reason provided by the APNs server + */ + public String getReason() { + return reason; + } +} diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/ChannelManagementResponse.java b/pushy/src/main/java/com/eatthepath/pushy/apns/ChannelManagementResponse.java new file mode 100644 index 000000000..6dbd760e3 --- /dev/null +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/ChannelManagementResponse.java @@ -0,0 +1,26 @@ +package com.eatthepath.pushy.apns; + +import java.util.UUID; + +/** + * A response from the APNs broadcast channel management system. + * + * @since 0.16 + */ +public interface ChannelManagementResponse { + + /** + * Returns the unique identifier (which may be assigned by the caller or by the server if the caller does not provide + * a unique identifier) for the original request. + * + * @return the unique identifier for the original request + */ + UUID getRequestId(); + + /** + * Returns the HTTP status code returned by the APNs server. + * + * @return the HTTP status code returned by the APNs server + */ + int getStatus(); +} diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/CreateChannelResponse.java b/pushy/src/main/java/com/eatthepath/pushy/apns/CreateChannelResponse.java new file mode 100644 index 000000000..4ea0ffa5a --- /dev/null +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/CreateChannelResponse.java @@ -0,0 +1,16 @@ +package com.eatthepath.pushy.apns; + +/** + * A response to a "create channel" request sent to the APNs broadcast channel management system. + * + * @since 0.16 + */ +public interface CreateChannelResponse extends ChannelManagementResponse { + + /** + * Returns the base64-encoded representation of the newly-created channel ID. + * + * @return the base64-encoded representation of the newly-created channel ID + */ + String getChannelId(); +} diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/DeleteChannelResponse.java b/pushy/src/main/java/com/eatthepath/pushy/apns/DeleteChannelResponse.java new file mode 100644 index 000000000..587312df2 --- /dev/null +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/DeleteChannelResponse.java @@ -0,0 +1,9 @@ +package com.eatthepath.pushy.apns; + +/** + * A response to a "delete channel" request sent to the APNs broadcast notification channel management system. + * + * @since 0.16 + */ +public interface DeleteChannelResponse extends ChannelManagementResponse { +} diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/GetChannelConfigurationResponse.java b/pushy/src/main/java/com/eatthepath/pushy/apns/GetChannelConfigurationResponse.java new file mode 100644 index 000000000..cdc40bc66 --- /dev/null +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/GetChannelConfigurationResponse.java @@ -0,0 +1,17 @@ +package com.eatthepath.pushy.apns; + +/** + * A response to a "get channel configuration" request sent to the APNs broadcast notification channel management + * system. + * + * @since 0.16 + */ +public interface GetChannelConfigurationResponse extends ChannelManagementResponse { + + /** + * Returns the message storage policy for the channel named in the original request. + * + * @return the message storage policy for the channel named in the original request + */ + MessageStoragePolicy getMessageStoragePolicy(); +} diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/GetChannelIdsResponse.java b/pushy/src/main/java/com/eatthepath/pushy/apns/GetChannelIdsResponse.java new file mode 100644 index 000000000..f94f63ec3 --- /dev/null +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/GetChannelIdsResponse.java @@ -0,0 +1,19 @@ +package com.eatthepath.pushy.apns; + +import java.util.List; +import java.util.Optional; + +/** + * A response to a "list all channel IDs" request sent to the APNS broadcast notification channel management system. + * + * @since 0.16 + */ +public interface GetChannelIdsResponse extends ChannelManagementResponse { + + /** + * Returns a list of all active channel IDs for the bundle ID named in the original request. + * + * @return a list of all active channel IDs for the bundle ID named in the original request + */ + List getChannelIds(); +} diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/Http2Response.java b/pushy/src/main/java/com/eatthepath/pushy/apns/Http2Response.java new file mode 100644 index 000000000..a306ddc1a --- /dev/null +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/Http2Response.java @@ -0,0 +1,21 @@ +package com.eatthepath.pushy.apns; + +import io.netty.handler.codec.http2.Http2Headers; + +class Http2Response { + private final Http2Headers headers; + private final byte[] data; + + public Http2Response(final Http2Headers headers, final byte[] data) { + this.headers = headers; + this.data = data; + } + + public Http2Headers getHeaders() { + return headers; + } + + public byte[] getData() { + return data; + } +} diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/MessageStoragePolicy.java b/pushy/src/main/java/com/eatthepath/pushy/apns/MessageStoragePolicy.java new file mode 100644 index 000000000..5edae4772 --- /dev/null +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/MessageStoragePolicy.java @@ -0,0 +1,58 @@ +package com.eatthepath.pushy.apns; + +import java.util.UUID; + +/** + * An enumeration of message storage policies for APNs broadcast notification channels. + * + * @see Sending broadcast push notification requests to APNs + * @see ApnsClient#createChannel(String, MessageStoragePolicy, UUID) + * @see ApnsClient#getChannelConfiguration(String, String, UUID) + * @see ApnsPushNotification#getExpiration() + * + * @since 0.16 + */ +public enum MessageStoragePolicy { + + /** + * Indicates that a broadcast notification channel should not store and forward notifications if they cannot be + * delivered immediately. As the documentation notes: + * + *

Providing a nonzero expiration for a channel created with the No Message Stored storage policy results + * in message rejection.
+ */ + NO_MESSAGE_STORED(0), + + /** + * Indicates that a broadcast notification channel may attempt to store and forward notifications if they cannot be + * delivered immediately. As the broadcast notification documentation explains: + * + *
As a best-effort service, APNs may reorder notifications you send on the same channel. If APNs can’t + * deliver a notification immediately, it may store the notification based on the channel’s message storage policy + * specified during channel creation. Notifications with Medium and Low apns-priority might get grouped and delivered + * in bursts to the person’s device. APNs may also throttle your notifications and, in some cases, not deliver them. + * The exact behavior is determined by the way the person interacts with your application and the power state of the + * device.
+ */ + MOST_RECENT_MESSAGE_STORED(1); + + private final int code; + + MessageStoragePolicy(final int code) { + this.code = code; + } + + int getCode() { + return this.code; + } + + static MessageStoragePolicy getFromCode(final int code) { + for (final MessageStoragePolicy policy : MessageStoragePolicy.values()) { + if (policy.getCode() == code) { + return policy; + } + } + + throw new IllegalArgumentException(String.format("No message storage policy found with code %d", code)); + } +} diff --git a/pushy/src/test/java/com/eatthepath/pushy/apns/ApnsChannelManagementClientTest.java b/pushy/src/test/java/com/eatthepath/pushy/apns/ApnsChannelManagementClientTest.java new file mode 100644 index 000000000..5b3874a58 --- /dev/null +++ b/pushy/src/test/java/com/eatthepath/pushy/apns/ApnsChannelManagementClientTest.java @@ -0,0 +1,320 @@ +package com.eatthepath.pushy.apns; + +import com.eatthepath.json.JsonSerializer; +import com.eatthepath.pushy.apns.auth.ApnsSigningKey; +import com.eatthepath.pushy.apns.auth.KeyPairUtil; +import com.github.tomakehurst.wiremock.junit5.WireMockExtension; +import com.github.tomakehurst.wiremock.matching.RequestPatternBuilder; +import io.netty.channel.nio.NioEventLoopGroup; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import javax.net.ssl.SSLException; +import java.nio.charset.StandardCharsets; +import java.security.InvalidKeyException; +import java.security.KeyPair; +import java.security.NoSuchAlgorithmException; +import java.security.interfaces.ECPrivateKey; +import java.util.*; +import java.util.concurrent.CompletionException; +import java.util.stream.Stream; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; +import static org.junit.jupiter.api.Assertions.*; + +class ApnsChannelManagementClientTest { + + private ApnsChannelManagementClient channelManagementClient; + + private static ApnsClientResources CLIENT_RESOURCES; + + @RegisterExtension + static WireMockExtension wireMockExtension = WireMockExtension.newInstance() + .options(wireMockConfig() + .keystoreType("PKCS12") + .keystorePath("server.p12") + .keystorePassword("pushy-test") + .keyManagerPassword("pushy-test") + .dynamicHttpsPort()) + .build(); + + protected static final String TEAM_ID = "team-id"; + protected static final String KEY_ID = "key-id"; + + private static final String CA_CERTIFICATE_FILENAME = "/ca.pem"; + + @BeforeAll + public static void setUpBeforeClass() { + CLIENT_RESOURCES = new ApnsClientResources(new NioEventLoopGroup(1)); + } + + @BeforeEach + void setUp() throws NoSuchAlgorithmException, InvalidKeyException, SSLException { + final KeyPair keyPair = KeyPairUtil.generateKeyPair(); + final ApnsSigningKey signingKey = new ApnsSigningKey(KEY_ID, TEAM_ID, (ECPrivateKey) keyPair.getPrivate()); + + final ApnsClientBuilder clientBuilder = new ApnsClientBuilder() + .setApnsServer("localhost", wireMockExtension.getRuntimeInfo().getHttpsPort()) + .setTrustedServerCertificateChain(getClass().getResourceAsStream(CA_CERTIFICATE_FILENAME)) + .setSigningKey(signingKey) + .setApnsClientResources(CLIENT_RESOURCES) + .setUseAlpn(true); + + channelManagementClient = + new ApnsChannelManagementClient(clientBuilder.buildClientConfiguration(), CLIENT_RESOURCES); + } + + @AfterEach + void tearDown() throws InterruptedException { + channelManagementClient.close().await(); + } + + @AfterAll + public static void tearDownAfterAll() throws Exception { + CLIENT_RESOURCES.shutdownGracefully().await(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void createChannel(final boolean specifyApnsRequestId) { + final String bundleId = "com.example.Test"; + final UUID apnsRequestId = UUID.randomUUID(); + final String channelId = Base64.getEncoder().encodeToString("channel-id".getBytes(StandardCharsets.UTF_8)); + + stubFor(post("/1/apps/com.example.Test/channels") + .willReturn(status(201) + .withHeader("apns-channel-id", channelId) + .withHeader("apns-request-id", apnsRequestId.toString()))); + + final CreateChannelResponse createChannelResponse = channelManagementClient.createChannel(bundleId, + MessageStoragePolicy.MOST_RECENT_MESSAGE_STORED, + specifyApnsRequestId ? apnsRequestId : null) + .join(); + + assertEquals(201, createChannelResponse.getStatus()); + assertEquals(apnsRequestId, createChannelResponse.getRequestId()); + assertEquals(channelId, createChannelResponse.getChannelId()); + + RequestPatternBuilder requestPatternBuilder = + postRequestedFor(urlEqualTo(String.format("/1/apps/%s/channels", bundleId))) + .withHeader("authorization", matching("bearer .+")) + .withRequestBody(equalToJson("{\"message-storage-policy\":1, \"push-type\":\"LiveActivity\"}")); + + if (specifyApnsRequestId) { + requestPatternBuilder = requestPatternBuilder.withHeader("apns-request-id", equalTo(apnsRequestId.toString())); + } + + verify(requestPatternBuilder); + } + + @Test + void createChannelError() { + final String bundleId = "com.example.Test"; + final UUID apnsRequestId = UUID.randomUUID(); + + stubFor(post("/1/apps/com.example.Test/channels") + .willReturn(badRequest() + .withHeader("apns-request-id", apnsRequestId.toString()) + .withBody("{\"reason\":\"BadChannelId\"}"))); + + final CompletionException completionException = assertThrows(CompletionException.class, () -> + channelManagementClient.createChannel(bundleId, MessageStoragePolicy.MOST_RECENT_MESSAGE_STORED, apnsRequestId).join()); + + final ChannelManagementException channelManagementException = + assertInstanceOf(ChannelManagementException.class, completionException.getCause()); + + assertEquals(400, channelManagementException.getStatus()); + assertEquals(apnsRequestId, channelManagementException.getApnsRequestId()); + assertEquals("BadChannelId", channelManagementException.getReason()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void getChannelConfiguration(final boolean specifyApnsRequestId) { + final String bundleId = "com.example.Test"; + final UUID apnsRequestId = UUID.randomUUID(); + final String channelId = Base64.getEncoder().encodeToString("channel-id".getBytes(StandardCharsets.UTF_8)); + + stubFor(get("/1/apps/com.example.Test/channels") + .willReturn(ok("{\"message-storage-policy\":1, \"push-type\":\"LiveActivity\"}") + .withHeader("apns-request-id", apnsRequestId.toString()))); + + final GetChannelConfigurationResponse getChannelConfigurationResponse = + channelManagementClient.getChannelConfiguration( + bundleId, + channelId, + specifyApnsRequestId ? apnsRequestId : null) + .join(); + + assertEquals(200, getChannelConfigurationResponse.getStatus()); + assertEquals(apnsRequestId, getChannelConfigurationResponse.getRequestId()); + assertEquals(MessageStoragePolicy.MOST_RECENT_MESSAGE_STORED, getChannelConfigurationResponse.getMessageStoragePolicy()); + + RequestPatternBuilder requestPatternBuilder = + getRequestedFor(urlEqualTo(String.format("/1/apps/%s/channels", bundleId))) + .withHeader("authorization", matching("bearer .+")) + .withHeader("apns-channel-id", equalTo(channelId)); + + if (specifyApnsRequestId) { + requestPatternBuilder = requestPatternBuilder.withHeader("apns-request-id", equalTo(apnsRequestId.toString())); + } + + verify(requestPatternBuilder); + } + + @Test + void getChannelConfigurationError() { + final String bundleId = "com.example.Test"; + final UUID apnsRequestId = UUID.randomUUID(); + final String channelId = Base64.getEncoder().encodeToString("channel-id".getBytes(StandardCharsets.UTF_8)); + + stubFor(get("/1/apps/com.example.Test/channels") + .willReturn(badRequest() + .withHeader("apns-request-id", apnsRequestId.toString()) + .withBody("{\"reason\":\"BadChannelId\"}"))); + + final CompletionException completionException = assertThrows(CompletionException.class, () -> + channelManagementClient.getChannelConfiguration(bundleId, channelId, apnsRequestId).join()); + + final ChannelManagementException channelManagementException = + assertInstanceOf(ChannelManagementException.class, completionException.getCause()); + + assertEquals(400, channelManagementException.getStatus()); + assertEquals(apnsRequestId, channelManagementException.getApnsRequestId()); + assertEquals("BadChannelId", channelManagementException.getReason()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void deleteChannel(final boolean specifyApnsRequestId) { + final String bundleId = "com.example.Test"; + final UUID apnsRequestId = UUID.randomUUID(); + final String channelId = Base64.getEncoder().encodeToString("channel-id".getBytes(StandardCharsets.UTF_8)); + + stubFor(delete("/1/apps/com.example.Test/channels") + .willReturn(noContent() + .withHeader("apns-request-id", apnsRequestId.toString()))); + + final DeleteChannelResponse deleteChannelResponse = channelManagementClient.deleteChannel(bundleId, + channelId, + specifyApnsRequestId ? apnsRequestId : null) + .join(); + + assertEquals(204, deleteChannelResponse.getStatus()); + assertEquals(apnsRequestId, deleteChannelResponse.getRequestId()); + + RequestPatternBuilder requestPatternBuilder = + deleteRequestedFor(urlEqualTo(String.format("/1/apps/%s/channels", bundleId))) + .withHeader("authorization", matching("bearer .+")) + .withHeader("apns-channel-id", equalTo(channelId)); + + if (specifyApnsRequestId) { + requestPatternBuilder = requestPatternBuilder.withHeader("apns-request-id", equalTo(apnsRequestId.toString())); + } + + verify(requestPatternBuilder); + } + + @Test + void deleteChannelError() { + final String bundleId = "com.example.Test"; + final UUID apnsRequestId = UUID.randomUUID(); + final String channelId = Base64.getEncoder().encodeToString("channel-id".getBytes(StandardCharsets.UTF_8)); + + stubFor(delete("/1/apps/com.example.Test/channels") + .willReturn(badRequest() + .withHeader("apns-request-id", apnsRequestId.toString()) + .withBody("{\"reason\":\"BadChannelId\"}"))); + + final CompletionException completionException = assertThrows(CompletionException.class, () -> + channelManagementClient.deleteChannel(bundleId, channelId, apnsRequestId).join()); + + final ChannelManagementException channelManagementException = + assertInstanceOf(ChannelManagementException.class, completionException.getCause()); + + assertEquals(400, channelManagementException.getStatus()); + assertEquals(apnsRequestId, channelManagementException.getApnsRequestId()); + assertEquals("BadChannelId", channelManagementException.getReason()); + } + + @ParameterizedTest + @MethodSource + void getChannelIds(final boolean specifyApnsRequestId, final List channelIds) { + final String bundleId = "com.example.Test"; + final UUID apnsRequestId = UUID.randomUUID(); + + final Map> channelResponse = new HashMap<>(); + channelResponse.put("channels", channelIds); + + stubFor(get("/1/apps/com.example.Test/all-channels") + .willReturn(ok(JsonSerializer.writeJsonTextAsString(channelResponse)) + .withHeader("apns-request-id", apnsRequestId.toString()))); + + final GetChannelIdsResponse getChannelIdsResponse = channelManagementClient.getChannelIds(bundleId, + specifyApnsRequestId ? apnsRequestId : null) + .join(); + + assertEquals(200, getChannelIdsResponse.getStatus()); + assertEquals(apnsRequestId, getChannelIdsResponse.getRequestId()); + assertEquals(channelIds, getChannelIdsResponse.getChannelIds()); + + RequestPatternBuilder requestPatternBuilder = + getRequestedFor(urlEqualTo(String.format("/1/apps/%s/all-channels", bundleId))) + .withHeader("authorization", matching("bearer .+")); + + if (specifyApnsRequestId) { + requestPatternBuilder = requestPatternBuilder.withHeader("apns-request-id", equalTo(apnsRequestId.toString())); + } + + verify(requestPatternBuilder); + } + + private static Stream getChannelIds() { + return Stream.of( + Arguments.argumentSet("Single channel, specified request ID", + true, generateChannelIds(1)), + + Arguments.argumentSet("Single channel, unspecified request ID", + false, generateChannelIds(1)), + + // The upstream service has a limit of 10,000 active channels + Arguments.argumentSet("Large channel batch, specified request ID", + true, generateChannelIds(10_000))); + } + + private static List generateChannelIds(final int channelIdCount) { + final List channelIds = new ArrayList<>(channelIdCount); + + for (int i = 0; i < channelIdCount; i++) { + channelIds.add(Base64.getEncoder().encodeToString(("channel-id-" + i).getBytes(StandardCharsets.UTF_8))); + } + + return channelIds; + } + + @Test + void getChannelIdsError() { + final String bundleId = "com.example.Test"; + final UUID apnsRequestId = UUID.randomUUID(); + + stubFor(get("/1/apps/com.example.Test/all-channels") + .willReturn(badRequest() + .withHeader("apns-request-id", apnsRequestId.toString()) + .withBody("{\"reason\":\"BadChannelId\"}"))); + + final CompletionException completionException = assertThrows(CompletionException.class, () -> + channelManagementClient.getChannelIds(bundleId, apnsRequestId).join()); + + final ChannelManagementException channelManagementException = + assertInstanceOf(ChannelManagementException.class, completionException.getCause()); + + assertEquals(400, channelManagementException.getStatus()); + assertEquals(apnsRequestId, channelManagementException.getApnsRequestId()); + assertEquals("BadChannelId", channelManagementException.getReason()); + } +}