Skip to content

Commit

Permalink
NIFI-12288 Improved Long and Integer handling in Utilities
Browse files Browse the repository at this point in the history
- Added explicit round in FormatUtils.makeWholeNumberTime()
- Removed unnecessary boxing in component descriptors
- Maintained long number tracking for releasable counts in Wait and Notify Processors
  • Loading branch information
exceptionfactory committed Oct 27, 2023
1 parent 5ecc12f commit ea5f9e4
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public static double getPreciseTimeDuration(final String value, final TimeUnit d
durationLong = Math.round(durationVal);
} else {
// Try reducing the size of the units to make the input a long
List wholeResults = makeWholeNumberTime(durationVal, specifiedTimeUnit);
List<?> wholeResults = makeWholeNumberTime(durationVal, specifiedTimeUnit);
durationLong = (long) wholeResults.get(0);
specifiedTimeUnit = (TimeUnit) wholeResults.get(1);
}
Expand Down Expand Up @@ -247,7 +247,8 @@ public static double getPreciseTimeDuration(final String value, final TimeUnit d
protected static List<Object> makeWholeNumberTime(double decimal, TimeUnit timeUnit) {
// If the value is already a whole number, return it and the current time unit
if (decimal == Math.rint(decimal)) {
return Arrays.asList(new Object[]{(long) decimal, timeUnit});
final long rounded = Math.round(decimal);
return Arrays.asList(new Object[]{rounded, timeUnit});
} else if (TimeUnit.NANOSECONDS == timeUnit) {
// The time unit is as small as possible
if (decimal < 1.0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public enum ProcessorStatusDescriptor {
"FlowFiles In (5 mins)",
"The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes",
Formatter.COUNT,
s -> Long.valueOf(s.getInputCount())),
s -> (long) s.getInputCount()),

OUTPUT_BYTES(
"outputBytes",
Expand All @@ -71,14 +71,14 @@ public enum ProcessorStatusDescriptor {
"FlowFiles Out (5 mins)",
"The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes",
Formatter.COUNT,
s -> Long.valueOf(s.getOutputCount())),
s -> (long) s.getOutputCount()),

TASK_COUNT(
"taskCount",
"Tasks (5 mins)",
"The number of tasks that this Processor has completed in the past 5 minutes",
Formatter.COUNT,
s -> Long.valueOf(s.getInvocations())),
s -> (long) s.getInvocations()),

TASK_MILLIS(
"taskMillis",
Expand All @@ -100,35 +100,35 @@ public enum ProcessorStatusDescriptor {
"FlowFiles Removed (5 mins)",
"The total number of FlowFiles removed by this Processor in the last 5 minutes",
Formatter.COUNT,
s -> Long.valueOf(s.getFlowFilesRemoved())),
s -> (long) s.getFlowFilesRemoved()),

AVERAGE_LINEAGE_DURATION(
"averageLineageDuration",
"Average Lineage Duration (5 mins)",
"The average amount of time that a FlowFile took to process (from receipt until this Processor finished processing it) in the past 5 minutes.",
Formatter.DURATION,
s -> s.getAverageLineageDuration(TimeUnit.MILLISECONDS),
new ValueReducer<StatusSnapshot, Long>() {
@Override
public Long reduce(final List<StatusSnapshot> values) {
long millis = 0L;
int count = 0;

for (final StatusSnapshot snapshot : values) {
final long removed = snapshot.getStatusMetric(FLOWFILES_REMOVED.getDescriptor()).longValue();
final long outputCount = snapshot.getStatusMetric(OUTPUT_COUNT.getDescriptor()).longValue();
final long processed = removed + outputCount;

count += processed;

final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
final long totalMillis = avgMillis * processed;
millis += totalMillis;
}
new ValueReducer<>() {
@Override
public Long reduce(final List<StatusSnapshot> values) {
long millis = 0L;
long count = 0;

for (final StatusSnapshot snapshot : values) {
final long removed = snapshot.getStatusMetric(FLOWFILES_REMOVED.getDescriptor());
final long outputCount = snapshot.getStatusMetric(OUTPUT_COUNT.getDescriptor());
final long processed = removed + outputCount;

count += processed;

final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor());
final long totalMillis = avgMillis * processed;
millis += totalMillis;
}

return count == 0 ? 0 : millis / count;
}
},
return count == 0 ? 0 : millis / count;
}
},
true
),

Expand All @@ -138,31 +138,31 @@ public Long reduce(final List<StatusSnapshot> values) {
"The average number of nanoseconds it took this Processor to complete a task, over the past 5 minutes",
Formatter.COUNT,
s -> s.getInvocations() == 0 ? 0 : s.getProcessingNanos() / s.getInvocations(),
new ValueReducer<StatusSnapshot, Long>() {
@Override
public Long reduce(final List<StatusSnapshot> values) {
long procNanos = 0L;
int invocations = 0;

for (final StatusSnapshot snapshot : values) {
final Long taskNanos = snapshot.getStatusMetric(TASK_NANOS.getDescriptor());
if (taskNanos != null) {
procNanos += taskNanos.longValue();
new ValueReducer<>() {
@Override
public Long reduce(final List<StatusSnapshot> values) {
long procNanos = 0L;
int invocations = 0;

for (final StatusSnapshot snapshot : values) {
final Long taskNanos = snapshot.getStatusMetric(TASK_NANOS.getDescriptor());
if (taskNanos != null) {
procNanos += taskNanos;
}

final Long taskInvocations = snapshot.getStatusMetric(TASK_COUNT.getDescriptor());
if (taskInvocations != null) {
invocations += taskInvocations.intValue();
}
}

final Long taskInvocations = snapshot.getStatusMetric(TASK_COUNT.getDescriptor());
if (taskInvocations != null) {
invocations += taskInvocations.intValue();
if (invocations == 0) {
return 0L;
}
}

if (invocations == 0) {
return 0L;
return procNanos / invocations;
}

return procNanos / invocations;
}
},
},
true
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,50 +57,45 @@ public enum RemoteProcessGroupStatusDescriptor {
"Received Bytes Per Second",
"The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second",
Formatter.DATA_SIZE,
s -> s.getReceivedContentSize().longValue() / 300L),
s -> s.getReceivedContentSize() / 300L),

SENT_BYTES_PER_SECOND(
"sentBytesPerSecond",
"Sent Bytes Per Second",
"The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second",
Formatter.DATA_SIZE,
s -> s.getSentContentSize().longValue() / 300L),
s -> s.getSentContentSize() / 300L),

TOTAL_BYTES_PER_SECOND("totalBytesPerSecond",
"Total Bytes Per Second",
"The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second",
Formatter.DATA_SIZE,
new ValueMapper<RemoteProcessGroupStatus>() {
@Override
public Long getValue(final RemoteProcessGroupStatus status) {
return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L);
}
}),
status -> (status.getReceivedContentSize() + status.getSentContentSize()) / 300L),

AVERAGE_LINEAGE_DURATION(
"averageLineageDuration",
"Average Lineage Duration (5 mins)",
"The average amount of time that a FlowFile took to process from receipt to drop in the past 5 minutes. For Processors that do not terminate FlowFiles, this value will be 0.",
Formatter.DURATION,
s -> s.getAverageLineageDuration(TimeUnit.MILLISECONDS),
new ValueReducer<StatusSnapshot, Long>() {
@Override
public Long reduce(final List<StatusSnapshot> values) {
long millis = 0L;
int count = 0;

for (final StatusSnapshot snapshot : values) {
final long sent = snapshot.getStatusMetric(SENT_COUNT.getDescriptor()).longValue();
count += sent;

final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
final long totalMillis = avgMillis * sent;
millis += totalMillis;
new ValueReducer<>() {
@Override
public Long reduce(final List<StatusSnapshot> values) {
long millis = 0L;
long count = 0;

for (final StatusSnapshot snapshot : values) {
final long sent = snapshot.getStatusMetric(SENT_COUNT.getDescriptor());
count += sent;

final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor());
final long totalMillis = avgMillis * sent;
millis += totalMillis;
}

return count == 0 ? 0 : millis / count;
}

return count == 0 ? 0 : millis / count;
}
});
});


private final MetricDescriptor<RemoteProcessGroupStatus> descriptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static class Signal {
transient private AtomicCacheEntry<String, String, Object> cachedEntry;
private Map<String, Long> counts = new HashMap<>();
private Map<String, String> attributes = new HashMap<>();
private int releasableCount = 0;
private long releasableCount = 0;

public Map<String, Long> getCounts() {
return counts;
Expand Down Expand Up @@ -110,11 +110,11 @@ public long getCount(final String counterName) {
return count != null ? count : 0;
}

public int getReleasableCount() {
public long getReleasableCount() {
return releasableCount;
}

public void setReleasableCount(int releasableCount) {
public void setReleasableCount(long releasableCount) {
this.releasableCount = releasableCount;
}

Expand Down Expand Up @@ -155,7 +155,8 @@ public <E> void releaseCandidates(final String counterName, final long requiredC
}
}

int releaseCount = Math.min(releasableCount, candidateSize);
// Convert to integer for list index sizing
final int releaseCount = Math.toIntExact(Math.min(releasableCount, candidateSize));
released.accept(candidates.subList(0, releaseCount));
waiting.accept(candidates.subList(releaseCount, candidateSize));

Expand Down
Loading

0 comments on commit ea5f9e4

Please sign in to comment.