Skip to content

Commit

Permalink
Added support for Nifi 1.8
Browse files Browse the repository at this point in the history
  • Loading branch information
David Kjerrumgaard committed Mar 4, 2019
1 parent bfb5a6e commit 71ff580
Show file tree
Hide file tree
Showing 40 changed files with 5,885 additions and 0 deletions.
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.project
.settings/
*/.classpath
*/.gitignore
*/target/
/target/
.DS_Store
45 changes: 45 additions & 0 deletions nifi-pulsar-client-service-api/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-pulsar-bundle</artifactId>
<version>1.8.0</version>
</parent>

<artifactId>nifi-pulsar-client-service-api</artifactId>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.2</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.pulsar;

import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.controller.ControllerService;
import org.apache.pulsar.client.api.PulsarClient;

@Tags({"Pulsar", "client", "pool"})
@CapabilityDescription("Provides the ability to create Pulsar Producer / Consumer instances on demand, "
+ "based on the configuration properties defined.")
public interface PulsarClientService extends ControllerService {

public PulsarClient getPulsarClient();

public String getPulsarBrokerRootURL();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.pulsar.cache;

import java.io.Closeable;
import java.io.IOException;

import org.apache.commons.collections4.map.LRUMap;

public class PulsarClientLRUCache<K, V extends Closeable> extends LRUMap<K, V> {

private static final long serialVersionUID = 730163138087670453L;
private final static float LOAD_FACTOR = 0.75F;
private final static boolean SCAN_UNTIL_REMOVABLE = false;

public PulsarClientLRUCache(int maxSize) {
this(maxSize, LOAD_FACTOR, SCAN_UNTIL_REMOVABLE);
}

public PulsarClientLRUCache(int maxSize, float loadFactor, boolean scanUntilRemovable) {
super(maxSize, loadFactor, scanUntilRemovable);
}

@Override
public void clear() {
this.values().parallelStream().forEach(closable -> {
releaseResources(closable);
});
super.clear();
}

@Override
protected boolean removeLRU(LinkEntry<K, V> entry) {
releaseResources(entry.getValue()); // release resources held by entry
return true; // actually delete entry
}

private void releaseResources(V value) {
try {
value.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.pulsar.cache;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

@SuppressWarnings("rawtypes")
public class PulsarClientLRUCacheTest {

@Mock
private Producer mockedPulsarProducer;

@Before
public void setUp() throws InterruptedException {
mockedPulsarProducer = mock(Producer.class);
}

/**
* Make sure the LRUCache functions as a Map
*/
@Test
public void simpleTest() {
PulsarClientLRUCache<String, Producer> cache = new PulsarClientLRUCache<String, Producer>(10);

for (Character i='A'; i<='E'; i++){
cache.put(i.toString(), mockedPulsarProducer);
}

assertEquals(5, cache.size());

for (Character i='A'; i<='E'; i++){
assertNotNull( cache.get(i.toString()));
}
}

@Test
public void evictionTest() {

PulsarClientLRUCache<String, Producer> cache = new PulsarClientLRUCache<String, Producer>(5);

for (Character i='A'; i<='Z'; i++){
cache.put(i.toString(), mockedPulsarProducer);
}

// Make sure we only have 5 items in the cache
assertEquals(5, cache.size());

// Make sure we have the last 5 items added to the cache
for (Character i='V'; i<='Z'; i++){
assertNotNull(cache.get(i.toString()));
}
}

@Test
public void evictionLruTest() {

PulsarClientLRUCache<String, Producer> cache = new PulsarClientLRUCache<String, Producer>(5);

final Character A = 'A';

// Write 25 items to the cache, and the letter 'A' every other put.
for (Character i='B'; i<='Z'; i++){
cache.put(i.toString(), mockedPulsarProducer);
cache.put(A.toString(), mockedPulsarProducer);
}

// Make sure we only have 5 items in the cache
assertEquals(5, cache.size());

// Make sure that the letter 'A' is still in the cache due to frequent access
assertNotNull( cache.get(A.toString()) );

// Make sure we have the last 4 items added to the cache
for (Character i='W'; i<='Z'; i++){
assertNotNull( cache.get(i.toString()));
}
}

@Test
public void clearTest() throws PulsarClientException {
PulsarClientLRUCache<String, Producer> cache = new PulsarClientLRUCache<String, Producer>(26);

for (Character i='A'; i<='Z'; i++) {
cache.put(i.toString(), mockedPulsarProducer);
}

// Make sure we only have all the items in the cache
assertEquals(26, cache.size());
cache.clear();

verify(mockedPulsarProducer, times(26)).close();

// Make sure all the items were removed
assertEquals(0, cache.size());
}
}
41 changes: 41 additions & 0 deletions nifi-pulsar-client-service-nar/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-pulsar-bundle</artifactId>
<version>1.8.0</version>
</parent>

<artifactId>nifi-pulsar-client-service-nar</artifactId>
<packaging>nar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>1.8.0</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-pulsar-client-service-api</artifactId>
<version>1.8.0</version>
</dependency>
</dependencies>

</project>
Loading

0 comments on commit 71ff580

Please sign in to comment.