Skip to content

Commit

Permalink
NIFI-14067 Replaced anonymous classes with lambdas (apache#9571)
Browse files Browse the repository at this point in the history
Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
dan-s1 authored Dec 10, 2024
1 parent 5c5b7ff commit a5086a9
Show file tree
Hide file tree
Showing 45 changed files with 1,159 additions and 1,755 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -204,16 +203,13 @@ private void computeLineage() {

Map<String, LineageNode> lastEventMap = new HashMap<>(); // maps FlowFile UUID to last event for that FlowFile
final List<ProvenanceEventRecord> sortedRecords = new ArrayList<>(relevantRecords);
sortedRecords.sort(new Comparator<ProvenanceEventRecord>() {
@Override
public int compare(final ProvenanceEventRecord o1, final ProvenanceEventRecord o2) {
// Sort on Event Time, then Event ID.
final int eventTimeComparison = Long.compare(o1.getEventTime(), o2.getEventTime());
if (eventTimeComparison == 0) {
return Long.compare(o1.getEventId(), o2.getEventId());
} else {
return eventTimeComparison;
}
sortedRecords.sort((o1, o2) -> {
// Sort on Event Time, then Event ID.
final int eventTimeComparison = Long.compare(o1.getEventTime(), o2.getEventTime());
if (eventTimeComparison == 0) {
return Long.compare(o1.getEventId(), o2.getEventId());
} else {
return eventTimeComparison;
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.function.Supplier;
Expand Down Expand Up @@ -121,12 +120,7 @@ private static List<IdentityMapping> getMappings(final NiFiProperties properties
}

// sort the list by the key so users can control the ordering in nifi.properties
Collections.sort(mappings, new Comparator<IdentityMapping>() {
@Override
public int compare(IdentityMapping m1, IdentityMapping m2) {
return m1.getKey().compareTo(m2.getKey());
}
});
mappings.sort(Comparator.comparing(IdentityMapping::getKey));

return mappings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,7 @@ public int removeSelectedElements(final Filter<T> filter) {
}

public List<T> asList() {
return getSelectedElements(new Filter<T>() {
@Override
public boolean select(final T value) {
return true;
}
});
return getSelectedElements(value -> true);
}

public T getOldestElement() {
Expand Down Expand Up @@ -252,7 +247,7 @@ public void forEach(final ForEachEvaluator<T> evaluator, final IterationDirectio
}
}

public static interface Filter<S> {
public interface Filter<S> {

boolean select(S value);
}
Expand All @@ -262,7 +257,7 @@ public static interface Filter<S> {
*
* @param <S> the type to evaluate
*/
public static interface ForEachEvaluator<S> {
public interface ForEachEvaluator<S> {

/**
* Evaluates the given element and returns {@code true} if the next element should be evaluated, {@code false} otherwise
Expand All @@ -273,9 +268,9 @@ public static interface ForEachEvaluator<S> {
boolean evaluate(S value);
}

public static enum IterationDirection {
public enum IterationDirection {

FORWARD,
BACKWARD;
BACKWARD
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -167,14 +166,11 @@ public synchronized Collection<T> recoverRecords() throws IOException {
}

final List<File> orderedJournalFiles = Arrays.asList(journalFiles);
Collections.sort(orderedJournalFiles, new Comparator<File>() {
@Override
public int compare(final File o1, final File o2) {
final long transactionId1 = getMinTransactionId(o1);
final long transactionId2 = getMinTransactionId(o2);
orderedJournalFiles.sort((o1, o2) -> {
final long transactionId1 = getMinTransactionId(o1);
final long transactionId2 = getMinTransactionId(o2);

return Long.compare(transactionId1, transactionId2);
}
return Long.compare(transactionId1, transactionId2);
});

final long snapshotTransactionId = snapshotRecovery.getMaxTransactionId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.mockito.stubbing.Answer;
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
Expand All @@ -46,7 +44,6 @@

import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -144,7 +141,7 @@ public void testMultipleChunks() throws Exception {
final List<BatchWriteItemRequest> results = captor.getAllValues();
Assertions.assertEquals(2, results.size());

final BatchWriteItemRequest result1 = results.get(0);
final BatchWriteItemRequest result1 = results.getFirst();
assertTrue(result1.hasRequestItems());
assertNotNull(result1.requestItems().get(TABLE_NAME));
assertItemsConvertedProperly(result1.requestItems().get(TABLE_NAME), 25);
Expand All @@ -155,7 +152,7 @@ public void testMultipleChunks() throws Exception {
assertItemsConvertedProperly(result2.requestItems().get(TABLE_NAME), 4);

runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_SUCCESS).get(0);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_SUCCESS).getFirst();
Assertions.assertEquals("2", flowFile.getAttribute(PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE));
}

Expand All @@ -168,7 +165,7 @@ public void testThroughputIssue() throws Exception {
runner.run();

runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_UNPROCESSED, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_UNPROCESSED).get(0);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_UNPROCESSED).getFirst();
Assertions.assertEquals("1", flowFile.getAttribute(PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE));
}

Expand All @@ -183,7 +180,7 @@ public void testRetryAfterUnprocessed() throws Exception {
Assertions.assertEquals(4, captor.getValue().requestItems().get(TABLE_NAME).size());

runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_SUCCESS).get(0);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_SUCCESS).getFirst();
Assertions.assertEquals("2", flowFile.getAttribute(PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE));
}

Expand All @@ -196,7 +193,7 @@ public void testErrorDuringInsertion() throws Exception {
runner.run();

runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_FAILURE, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_FAILURE).get(0);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_FAILURE).getFirst();
Assertions.assertEquals("0", flowFile.getAttribute(PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE));
}

Expand All @@ -212,7 +209,7 @@ public void testGeneratedPartitionKey() throws Exception {
final BatchWriteItemRequest result = captor.getValue();
Assertions.assertEquals(1, result.requestItems().get(TABLE_NAME).size());

final Map<String, AttributeValue> item = result.requestItems().get(TABLE_NAME).iterator().next().putRequest().item();
final Map<String, AttributeValue> item = result.requestItems().get(TABLE_NAME).getFirst().putRequest().item();
Assertions.assertEquals(4, item.size());
Assertions.assertEquals(string("P0"), item.get("partition"));
assertTrue(item.containsKey("generated"));
Expand Down Expand Up @@ -317,7 +314,7 @@ private void assertItemsConvertedProperly(final Collection<WriteRequest> writeRe
private void setInsertionError() {
final BatchWriteItemResponse outcome = mock(BatchWriteItemResponse.class);
final Map<String, List<WriteRequest>> unprocessedItems = new HashMap<>();
final List<WriteRequest> writeResults = Arrays.asList(mock(WriteRequest.class));
final List<WriteRequest> writeResults = Collections.singletonList(mock(WriteRequest.class));
unprocessedItems.put("test", writeResults);
when(outcome.unprocessedItems()).thenReturn(unprocessedItems);
when(outcome.hasUnprocessedItems()).thenReturn(true);
Expand All @@ -332,17 +329,14 @@ private void setServerError() {
private void setExceedThroughputAtGivenChunk(final int chunkToFail) {
final AtomicInteger numberOfCalls = new AtomicInteger(0);

when(client.batchWriteItem(captor.capture())).then(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
final int calls = numberOfCalls.incrementAndGet();

if (calls >= chunkToFail) {
throw ProvisionedThroughputExceededException.builder().message("Throughput exceeded")
.awsErrorDetails(AwsErrorDetails.builder().errorCode("error code").errorMessage("error message").build()).build();
} else {
return mock(BatchWriteItemResponse.class);
}
when(client.batchWriteItem(captor.capture())).then(invocationOnMock -> {
final int calls = numberOfCalls.incrementAndGet();

if (calls >= chunkToFail) {
throw ProvisionedThroughputExceededException.builder().message("Throughput exceeded")
.awsErrorDetails(AwsErrorDetails.builder().errorCode("error code").errorMessage("error message").build()).build();
} else {
return mock(BatchWriteItemResponse.class);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.nifi.util.db;

import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.ByteArrayInputStream;
Expand All @@ -38,12 +37,7 @@ static ResultSet resultSetReturningMetadata(ResultSetMetaData metadata) throws S
when(rs.getMetaData()).thenReturn(metadata);

final AtomicInteger counter = new AtomicInteger(1);
Mockito.doAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
return counter.getAndDecrement() > 0;
}
}).when(rs).next();
Mockito.doAnswer((Answer<Boolean>) invocation -> counter.getAndDecrement() > 0).when(rs).next();

return rs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,12 +379,7 @@ public synchronized void setNumMessages(final long msgCount) {
}

private void transferRanges(final List<Range> ranges, final Relationship relationship) {
Collections.sort(ranges, new Comparator<Range>() {
@Override
public int compare(final Range o1, final Range o2) {
return Long.compare(o1.getStart(), o2.getStart());
}
});
ranges.sort(Comparator.comparingLong(Range::getStart));

for (int i = 0; i < ranges.size(); i++) {
Range range = ranges.get(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,7 @@ private void checkHdfsUriForTimeout(final Configuration config) throws IOExcepti

private FileSystem getFileSystemAsUser(final Configuration config, final UserGroupInformation ugi) throws IOException {
try {
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws Exception {
return FileSystem.get(config);
}
});
return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(config));
} catch (final InterruptedException e) {
throw new IOException("Unable to create file system: " + e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,56 +91,50 @@ private MessageConsumer createMessageConsumer(final Session session, final Strin
*/
public void consumeMessageSet(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector,
final String charset, final int batchSize, final Consumer<List<JMSResponse>> messageSetConsumer) {
doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, new MessageReceiver() {
@Override
public void consume(Session session, MessageConsumer messageConsumer) throws JMSException {
final List<JMSResponse> jmsResponses = new ArrayList<>();
int batchCounter = 0;

JMSResponse response;
while (batchCounter < batchSize && (response = receiveMessage(session, messageConsumer, charset, errorQueueName)) != null) {
response.setBatchOrder(batchCounter);
jmsResponses.add(response);
batchCounter++;
}
doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, (session, messageConsumer) -> {
final List<JMSResponse> jmsResponses = new ArrayList<>();
int batchCounter = 0;

JMSResponse response;
while (batchCounter < batchSize && (response = receiveMessage(session, messageConsumer, charset, errorQueueName)) != null) {
response.setBatchOrder(batchCounter);
jmsResponses.add(response);
batchCounter++;
}

if (!jmsResponses.isEmpty()) {
// Provide the JMSResponse to the processor to handle. It is the responsibility of the
// processor to handle acknowledgment of the message (if Client Acknowledge), and it is
// the responsibility of the processor to handle closing the Message Consumer.
// Both of these actions can be handled by calling the acknowledge() or reject() methods of
// the JMSResponse.
messageSetConsumer.accept(jmsResponses);
}
if (!jmsResponses.isEmpty()) {
// Provide the JMSResponse to the processor to handle. It is the responsibility of the
// processor to handle acknowledgment of the message (if Client Acknowledge), and it is
// the responsibility of the processor to handle closing the Message Consumer.
// Both of these actions can be handled by calling the acknowledge() or reject() methods of
// the JMSResponse.
messageSetConsumer.accept(jmsResponses);
}
});
}

private void doWithJmsTemplate(String destinationName, boolean durable, boolean shared, String subscriptionName, String messageSelector, MessageReceiver messageReceiver) {
this.jmsTemplate.execute(new SessionCallback<Void>() {
@Override
public Void doInJms(final Session session) throws JMSException {
this.jmsTemplate.execute((SessionCallback<Void>) session -> {

final MessageConsumer messageConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriptionName, messageSelector);
final MessageConsumer messageConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriptionName, messageSelector);
try {
messageReceiver.consume(session, messageConsumer);
} catch (Exception e) {
// We need to call recover to ensure that in the event of
// abrupt end or exception the current session will stop message
// delivery and restart with the oldest unacknowledged message
try {
messageReceiver.consume(session, messageConsumer);
} catch (Exception e) {
// We need to call recover to ensure that in the event of
// abrupt end or exception the current session will stop message
// delivery and restart with the oldest unacknowledged message
try {
session.recover();
} catch (Exception e1) {
// likely the session is closed...need to catch this so that the root cause of failure is propagated
processLog.debug("Failed to recover JMS session while handling initial error. The recover error is: ", e1);
}

JmsUtils.closeMessageConsumer(messageConsumer);
throw e;
session.recover();
} catch (Exception e1) {
// likely the session is closed...need to catch this so that the root cause of failure is propagated
processLog.debug("Failed to recover JMS session while handling initial error. The recover error is: ", e1);
}

return null;
JmsUtils.closeMessageConsumer(messageConsumer);
throw e;
}

return null;
}, true);
}

Expand Down
Loading

0 comments on commit a5086a9

Please sign in to comment.