Skip to content

Commit

Permalink
[FLINK-9311] [pubsub] Add unit and integration tests for PubSub conne…
Browse files Browse the repository at this point in the history
…ctors
  • Loading branch information
nielsbasjes authored and Xeli committed Sep 2, 2018
1 parent 5062218 commit 1e9e267
Show file tree
Hide file tree
Showing 23 changed files with 1,387 additions and 162 deletions.
86 changes: 35 additions & 51 deletions flink-connectors/flink-connector-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,20 @@ under the License.

<packaging>jar</packaging>

<properties>
<pubsub.version>1.37.1</pubsub.version>
</properties>
<!-- This is the way we get a consistent set of versions of the Google tools -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bom</artifactId>
<version>0.53.0-alpha</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
Expand All @@ -51,7 +59,29 @@ under the License.
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>${pubsub.version}</version>
<!-- Version is pulled from google-cloud-bom -->
<exclusions>
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>

<dependency>
Expand All @@ -69,51 +99,5 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-flink</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadeTestJar>false</shadeTestJar>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>com.google.guava</pattern>
<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.guava</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.common.base</pattern>
<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.common.base</shadedPattern>
</relocation>
<relocation>
<pattern>io.grpc.auth</pattern>
<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.auth</shadedPattern>
</relocation>
<relocation>
<pattern>io.grpc.protobuf</pattern>
<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.protobuf</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.pubsub;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.pubsub;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.pubsub.v1.PubsubMessage;

import java.io.IOException;

class BoundedPubSubSource<OUT> extends PubSubSource<OUT> {
/**
* A bounded PubSub Source, similar to {@link PubSubSource} but this will stop at some point. For example after a period of idle or and after n amount of messages have been received.
*
*/
public class BoundedPubSubSource<OUT> extends PubSubSource<OUT> {
private Bound<OUT> bound;

private BoundedPubSubSource() {
Expand All @@ -28,11 +49,19 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
bound.receivedMessage();
}

/**
* Creates a {@link BoundedPubSubSourceBuilder}.
* @param <OUT> Type of Object which will be read by the produced {@link BoundedPubSubSource}
*/
@SuppressWarnings("unchecked")
public static <OUT> BoundedPubSubSourceBuilder<OUT, ? extends PubSubSource, ? extends BoundedPubSubSourceBuilder> newBuilder() {
return new BoundedPubSubSourceBuilder<>(new BoundedPubSubSource<OUT>());
}

/**
* Builder to create BoundedPubSubSource.
* @param <OUT> Type of Object which will be read by the BoundedPubSubSource
*/
@SuppressWarnings("unchecked")
public static class BoundedPubSubSourceBuilder<OUT, PSS extends BoundedPubSubSource<OUT>, BUILDER extends BoundedPubSubSourceBuilder<OUT, PSS, BUILDER>> extends PubSubSourceBuilder<OUT, PSS, BUILDER> {
private Long boundedByAmountOfMessages;
Expand All @@ -52,7 +81,7 @@ public BUILDER boundedByTimeSinceLastMessage(long timeSinceLastMessage) {
return (BUILDER) this;
}

private Bound <OUT> createBound() {
private Bound<OUT> createBound() {
if (boundedByAmountOfMessages != null && boundedByTimeSinceLastMessage != null) {
return Bound.boundByAmountOfMessagesOrTimeSinceLastMessage(boundedByAmountOfMessages, boundedByTimeSinceLastMessage);
}
Expand Down
Loading

0 comments on commit 1e9e267

Please sign in to comment.