Skip to content

Commit

Permalink
Release 2.0.7
Browse files Browse the repository at this point in the history
  • Loading branch information
predix-adoption-bot committed Jul 10, 2018
1 parent 7b02f26 commit 85c8301
Show file tree
Hide file tree
Showing 52 changed files with 9,699 additions and 123 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Event Hub SDK is a library that helps connect an application to [Event Hub](http
- [Closing Connection](#closing-connection)
- [Automatic Reconnection](#automatic-reconnection)
- [OAuth2 token management](#oauth2-token-management)
- [Debugging](#debugging)
- [Building SDK from repository](#building-sdk-from-repository)
- [Common Issues](#building-sdk-from-repository)

Expand All @@ -22,7 +23,7 @@ The most simple way to use this SDK is to pull from the Predix Snapshot artifact
<dependency>
<groupId>com.ge.predix.eventhub</groupId>
<artifactId>predix-event-hub-sdk</artifactId>
<version>2.0.1</version>
<version>2.0.7</version>
</dependency>
...
</dependencies>
Expand Down
1 change: 1 addition & 0 deletions examples/event-hub-java-spark-sample-app/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.idea/
15 changes: 15 additions & 0 deletions examples/event-hub-java-spark-sample-app/MANIFEST.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
applications:
- name: event-hub-java-sample-spark-app
memory: 512M
instances: 1
timeout: 180
path: target/event-hub-java-spark-sample-app-1.0-SNAPSHOT.jar
buildpack: java-buildpack
env:
UAA_INSTANCE_NAME: <uaa>
EVENTHUB_INSTANCE_NAME: <event_hub>
CLIENT_ID: <client_id>
CLIENT_SECRET: <client_password>
EVENTHUB_ENABLE_DEBUG: true

14 changes: 14 additions & 0 deletions examples/event-hub-java-spark-sample-app/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# event-hub-java-spark-sample-app

## Background:
Built on top of EventHub Java Sample App (https://github.com/PredixDev//predix-event-hub-java-sdk/examples) but this app adds a simple Spark job.

## Summary:
Writes all received messages in the Subscriber Callback to a file.

A GET request to "/spark" will trigger a simple Spark job that counts how many times unique words appear in the written file of received messages.

The results of the Spark job are then written to "output/part-00000".


[![Analytics](https://ga-beacon.appspot.com/UA-82773213-1/predix-event-hub-sdk/readme?pixel)](https://github.com/PredixDev)
117 changes: 117 additions & 0 deletions examples/event-hub-java-spark-sample-app/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>event-hub-java-spark-sample-app</groupId>
<artifactId>event-hub-java-spark-sample-app</artifactId>
<version>1.0.0</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.0.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.ge.predix.eventhub.spark.sample.Application</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>install</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jackson.version>2.6.5</jackson.version>
</properties>

<dependencies>
<dependency>
<groupId>com.neovisionaries</groupId>
<artifactId>nv-websocket-client</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>com.ge.predix.eventhub</groupId>
<artifactId>predix-event-hub-sdk</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.ge.predix.eventhub.spark.sample;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@ComponentScan
@Configuration
@SpringBootConfiguration
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package com.ge.predix.eventhub.spark.sample;

import com.ge.predix.eventhub.*;
import com.ge.predix.eventhub.client.Client;
import com.ge.predix.eventhub.configuration.EventHubConfiguration;
import com.ge.predix.eventhub.configuration.PublishConfiguration;
import com.ge.predix.eventhub.configuration.SubscribeConfiguration;
import com.google.protobuf.ByteString;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.web.bind.annotation.*;
import scala.Tuple2;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

@RestController
@EnableAutoConfiguration
public class Controller {
private static final Logger logger = Logger.getLogger(Controller.class.getName());
private static Client eventHub;
private static EventHubConfiguration eventHubConfiguration;
private static List<Message> rxMessages = Collections.synchronizedList(new ArrayList<Message>());
private static List<String> rxErrors = Collections.synchronizedList(new ArrayList<String>());
private static AtomicInteger rxErrorCount = new AtomicInteger();

private static final String subscriberName = "java-sdk-spark-sample-app-subscriber";
private static final String subscriberInstance = "spark-sample-app-instance";
private static BufferedWriter writer = null;
private static SparkConf conf = null;
private static JavaSparkContext sc = null;


@PostConstruct
public void makeClient() throws Exception {
writer = new BufferedWriter(new FileWriter("messages.txt", true));
conf = new SparkConf().setMaster("local").setAppName("Work Count App");
sc = new JavaSparkContext(conf);
System.out.println("******************************* Using environment variables *******************************");
fromExplicit();

class SubCallback implements Client.SubscribeCallback {
@Override
public void onMessage(Message message) {
rxMessages.add(message);
try {
writer.write(message.toString());
writer.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void onFailure(Throwable throwable) {
rxErrors.add(throwable.getMessage());
rxErrorCount.incrementAndGet();
}
}
eventHub.subscribe(new SubCallback());
}

@PreDestroy
void closeWriter() throws IOException {
writer.close();
}

@RequestMapping(value = "/spark", method = RequestMethod.GET)
void spark() {
JavaRDD<String> lines = sc.textFile("messages.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2<>(s,1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
counts.saveAsTextFile("output");
sc.close();
}


private void fromExplicit() throws EventHubClientException {
try {
EventHubConfiguration configuration = new EventHubConfiguration.Builder()
.host(System.getenv("EVENTHUB_URI"))
.port(Integer.parseInt(System.getenv("EVENTHUB_PORT")))
.authURL(System.getenv("AUTH_URL"))
.clientID(System.getenv("CLIENT_ID"))
.clientSecret(System.getenv("CLIENT_SECRET"))
.zoneID(System.getenv("ZONE_ID"))
.publishConfiguration(new PublishConfiguration.Builder().publisherType(PublishConfiguration.PublisherType.SYNC).build())
.subscribeConfiguration(new SubscribeConfiguration.Builder().subscriberName(subscriberName).subscriberInstance(subscriberInstance).build())
.build();

eventHub = new Client(configuration);
logger.info("** logging Client credentials **");
logger.info(configuration.getAuthURL());
logger.info(configuration.getClientID());
logger.info(configuration.getHost());
logger.info(configuration.getZoneID());
logger.info(configuration.getPort() + "");
logger.info(configuration.isAutomaticTokenRenew() + "");
logger.info("** logging Client details done**");
eventHub.forceRenewToken();
eventHubConfiguration = configuration;

} catch(EventHubClientException.InvalidConfigurationException e) {
logger.info(e.getMessage());
System.out.println("Could not create client");
}
}

@RequestMapping(value = "/subscribe", method = RequestMethod.GET)
String subscribe() throws EventHubClientException, IOException {
JSONObject responses = new JSONObject();
JSONArray messages = new JSONArray();
JSONArray errors = new JSONArray();


while(rxMessages.size() != 0) {
Message message = rxMessages.remove(0);
messages.put(new JSONObject(String.format("{\"id\":\"%s\", \"body\":%s}", message.getId(), message.getBody().toStringUtf8())));
}


for (String error : rxErrors) {
errors.put(error);
}

responses.put("messages", messages);
responses.put("errors", errors);

return responses.toString();
}

@RequestMapping(value = "/publish", method = RequestMethod.POST)
String publish(@RequestBody String input, @RequestParam(value = "id") String id, @RequestParam(value = "count", required = false) Integer count ) throws EventHubClientException {
List<Ack> acks;
if(count != null && count >= 1){
Messages.Builder msgBuilder = Messages.newBuilder();
for(int i=0;i<count;i++){
Message msg = Message.newBuilder().setId(String.format("%s-%d", id, i))
.setZoneId(eventHubConfiguration.getZoneID())
.setBody(ByteString.copyFromUtf8(String.format("{message:\"%s\"}", input))).build();

msgBuilder.addMsg(msg);
}
acks = eventHub.addMessages(msgBuilder.build()).flush();
}
else{
count = 1;
acks = eventHub.addMessage(id, input, null).flush();
}

JSONArray array = new JSONArray();
for(Ack a : acks){
array.put(ackToJSON(a));
}
return array.toString();
}


@RequestMapping("/")
String home() {
return "Hello Event Hub!";
}

private JSONObject ackToJSON(Ack a) {
JSONObject j = new JSONObject();
if(!a.getId().equals(a.getDefaultInstanceForType().getId()))
j.put("id", a.getId());
j.put("status_code", a.getStatusCode());
if(a.getStatusCode() == AckStatus.ACCEPTED){
j.put("offset", a.getOffset());
j.put("partition", a.getPartition());
j.put("topic", a.getTopic());
}else{
j.put("desc", a.getDesc());
}
return j;
}
}
6 changes: 6 additions & 0 deletions examples/event-hub-java-spark-sample-app/start.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/bash

rm -r output
rm messages.txt
mvn clean install
java -jar target/event-hub-java-spark-sample-app-1.0-SNAPSHOT.jar --server.port=58621
Loading

0 comments on commit 85c8301

Please sign in to comment.