From 97d84b2dcf87145336edb3affe8ca19f7d5f9aa9 Mon Sep 17 00:00:00 2001 From: ravinarayansingh Date: Sat, 27 Sep 2025 19:28:47 -0700 Subject: [PATCH 1/2] Enhance `RestLookupService` with `SpoolingMarkedInputStream` for robust mark/reset support, add related tests. --- .../apache/nifi/lookup/RestLookupService.java | 8 +- .../lookup/SpoolingMarkedInputStream.java | 183 ++++++++++++++++++ .../TestRestLookupServiceMarkReset.java | 136 +++++++++++++ 3 files changed, 323 insertions(+), 4 deletions(-) create mode 100644 nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SpoolingMarkedInputStream.java create mode 100644 nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestRestLookupServiceMarkReset.java diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java index d1d8c6390e4c..ba79a794649d 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java @@ -63,7 +63,6 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.X509TrustManager; -import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.net.Proxy; @@ -382,9 +381,10 @@ public Optional lookup(Map coordinates, Map 0) { + final int r = replayIn.read(b, off, availableForReplay); + if (r > 0) { + totalRead += r; + absoluteReadPos += r; + } + } + + if (absoluteReadPos >= absoluteSpoolPos) { + // Finished replaying current window; close replay stream so subsequent reads come from source + closeReplay(); + } + } + + // Then, if more bytes requested, continue pulling from source and spooling them + if (totalRead < len) { + final int r = in.read(b, off + totalRead, len - totalRead); + if (r > 0) { + // Write the newly read bytes into the spool file to extend the replay window + spoolOut.write(b, off + totalRead, r); + spoolOut.flush(); + absoluteSpoolPos += r; + absoluteReadPos += r; + totalRead += r; + } + } + + return totalRead == 0 ? -1 : totalRead; + } + + private void ensureReplayOpenAt(final long position) throws IOException { + if (replayIn != null) { + replayIn.close(); + } + replayIn = new FileInputStream(spoolFile); + long skipped = 0L; + while (skipped < position) { + final long s = replayIn.skip(position - skipped); + if (s <= 0) { + // In case skip cannot advance, read and discard + final long remaining = position - skipped; + final int toRead = (int) Math.min(remaining, DEFAULT_COPY_BUFFER); + final byte[] buf = new byte[toRead]; + final int r = replayIn.read(buf); + if (r < 0) { + break; + } + skipped += r; + } else { + skipped += s; + } + } + } + + private void closeReplay() throws IOException { + if (replayIn != null) { + replayIn.close(); + replayIn = null; + } + } + + @Override + public void close() throws IOException { + IOException first = null; + try { + super.close(); + } catch (final IOException e) { + first = e; + } + try { + closeReplay(); + spoolOut.close(); + } catch (final IOException e) { + if (first == null) first = e; + } finally { + // Attempt to delete spool file + Files.deleteIfExists(spoolFile.toPath()); + } + if (first != null) throw first; + } +} diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestRestLookupServiceMarkReset.java b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestRestLookupServiceMarkReset.java new file mode 100644 index 000000000000..c5cc825c50f6 --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestRestLookupServiceMarkReset.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.lookup; + +import mockwebserver3.MockResponse; +import mockwebserver3.MockWebServer; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; +import java.io.InputStream; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static java.net.HttpURLConnection.HTTP_OK; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.when; + +@Timeout(10) +@ExtendWith(MockitoExtension.class) +class TestRestLookupServiceMarkReset { + + private MockWebServer mockWebServer; + private RestLookupService restLookupService; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private RecordReaderFactory recordReaderFactory; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private RecordReader recordReader; + + private static final String SERVICE_ID = RestLookupService.class.getSimpleName() + "MarkReset"; + private static final String READER_ID = RecordReaderFactory.class.getSimpleName() + "MarkReset"; + + @BeforeEach + void setUp() throws IOException, InitializationException { + mockWebServer = new MockWebServer(); + mockWebServer.start(); + + TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class); + restLookupService = new RestLookupService(); + + when(recordReaderFactory.getIdentifier()).thenReturn(READER_ID); + runner.addControllerService(READER_ID, recordReaderFactory); + runner.addControllerService(SERVICE_ID, restLookupService); + + final String url = mockWebServer.url("/markreset").toString(); + runner.setProperty(restLookupService, RestLookupService.URL, url); + runner.setProperty(restLookupService, RestLookupService.RECORD_READER, READER_ID); + runner.enableControllerService(restLookupService); + } + + @AfterEach + void tearDown() throws IOException { + mockWebServer.close(); + } + + @Test + void testReaderUsesMarkResetBeyondBufferedStreamLimit() throws Exception { + // Large JSON body to exceed the default BufferedInputStream buffer (8192 bytes) + final int size = 20_000; + String sb = '{' + "\"data\":\"" + + "a".repeat(size) + + "\"}"; + + mockWebServer.enqueue(new MockResponse.Builder() + .code(HTTP_OK) + .body(sb) + .build()); + + // When the RecordReaderFactory is asked to create a reader, simulate mark/reset misuse + when(recordReaderFactory.createRecordReader(any(), any(), anyLong(), any())).thenAnswer(invocation -> { + final InputStream in = invocation.getArgument(1); + if (in.markSupported()) { + in.mark(1); // tiny read limit + final byte[] buffer = new byte[4096]; + long total = 0; + int read; + while ((read = in.read(buffer)) != -1) { + total += read; + if (total > 10_000) { // read beyond the default buffer 8192 + break; + } + } + // This reset would have failed before the fix when using BufferedInputStream over network stream + in.reset(); + } + return recordReader; + }); + + // Return a simple record at once + final RecordSchema schema = new SimpleRecordSchema(List.of(new RecordField("ok", RecordFieldType.BOOLEAN.getDataType()))); + final Record firstRecord = new MapRecord(schema, Map.of("ok", true)); + when(recordReader.nextRecord()).thenReturn(firstRecord, (Record) null); + + final Map coordinates = new LinkedHashMap<>(); + final Optional result = restLookupService.lookup(coordinates); + assertTrue(result.isPresent(), "Expected record to be present when reader performs mark/reset beyond buffer limit"); + } +} From 1bf5eeb3491bdb50ac9c591299affd8bf4a64fa1 Mon Sep 17 00:00:00 2001 From: ravinarayansingh Date: Wed, 1 Oct 2025 12:31:10 -0700 Subject: [PATCH 2/2] Replace `SpoolingMarkedInputStream` with `BufferedInputStream` in `RestLookupService`, update tests to validate behavior with buffer limits --- .../apache/nifi/lookup/RestLookupService.java | 10 +- .../lookup/SpoolingMarkedInputStream.java | 183 ------------------ .../TestRestLookupServiceMarkReset.java | 37 ++++ 3 files changed, 42 insertions(+), 188 deletions(-) delete mode 100644 nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SpoolingMarkedInputStream.java diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java index ba79a794649d..5d76857dbc90 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java @@ -63,6 +63,7 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.X509TrustManager; +import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.net.Proxy; @@ -210,7 +211,7 @@ public class RestLookupService extends AbstractControllerService implements Reco static final String MIME_TYPE_KEY = "mime.type"; static final String BODY_KEY = "request.body"; static final String METHOD_KEY = "request.method"; - + static final Integer INPUT_STREAM_BUFFER_SIZE = 8388608; // 8MB private static final List PROPERTY_DESCRIPTORS = List.of( URL, RECORD_READER, @@ -381,10 +382,9 @@ public Optional lookup(Map coordinates, Map 0) { - final int r = replayIn.read(b, off, availableForReplay); - if (r > 0) { - totalRead += r; - absoluteReadPos += r; - } - } - - if (absoluteReadPos >= absoluteSpoolPos) { - // Finished replaying current window; close replay stream so subsequent reads come from source - closeReplay(); - } - } - - // Then, if more bytes requested, continue pulling from source and spooling them - if (totalRead < len) { - final int r = in.read(b, off + totalRead, len - totalRead); - if (r > 0) { - // Write the newly read bytes into the spool file to extend the replay window - spoolOut.write(b, off + totalRead, r); - spoolOut.flush(); - absoluteSpoolPos += r; - absoluteReadPos += r; - totalRead += r; - } - } - - return totalRead == 0 ? -1 : totalRead; - } - - private void ensureReplayOpenAt(final long position) throws IOException { - if (replayIn != null) { - replayIn.close(); - } - replayIn = new FileInputStream(spoolFile); - long skipped = 0L; - while (skipped < position) { - final long s = replayIn.skip(position - skipped); - if (s <= 0) { - // In case skip cannot advance, read and discard - final long remaining = position - skipped; - final int toRead = (int) Math.min(remaining, DEFAULT_COPY_BUFFER); - final byte[] buf = new byte[toRead]; - final int r = replayIn.read(buf); - if (r < 0) { - break; - } - skipped += r; - } else { - skipped += s; - } - } - } - - private void closeReplay() throws IOException { - if (replayIn != null) { - replayIn.close(); - replayIn = null; - } - } - - @Override - public void close() throws IOException { - IOException first = null; - try { - super.close(); - } catch (final IOException e) { - first = e; - } - try { - closeReplay(); - spoolOut.close(); - } catch (final IOException e) { - if (first == null) first = e; - } finally { - // Attempt to delete spool file - Files.deleteIfExists(spoolFile.toPath()); - } - if (first != null) throw first; - } -} diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestRestLookupServiceMarkReset.java b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestRestLookupServiceMarkReset.java index c5cc825c50f6..b3d136021133 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestRestLookupServiceMarkReset.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestRestLookupServiceMarkReset.java @@ -48,6 +48,7 @@ import static java.net.HttpURLConnection.HTTP_OK; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.when; @@ -133,4 +134,40 @@ void testReaderUsesMarkResetBeyondBufferedStreamLimit() throws Exception { final Optional result = restLookupService.lookup(coordinates); assertTrue(result.isPresent(), "Expected record to be present when reader performs mark/reset beyond buffer limit"); } + + @Test + void testLookupThrowsWhenMarkResetExceedsBufferSize() throws Exception { + // Create a body significantly larger than the configured buffer size (8 MB) + final int size = RestLookupService.INPUT_STREAM_BUFFER_SIZE + 50_000; + final String body = '{' + "\"data\":\"" + "x".repeat(size) + "\"}"; + + mockWebServer.enqueue(new MockResponse.Builder() + .code(HTTP_OK) + .body(body) + .build()); + + // Simulate a RecordReader that misuses mark/reset: sets a tiny read limit, + // reads far beyond the BufferedInputStream capacity, then attempts reset. + when(recordReaderFactory.createRecordReader(any(), any(), anyLong(), any())).thenAnswer(invocation -> { + final InputStream in = invocation.getArgument(1); + if (in.markSupported()) { + in.mark(1); // Tiny read limit + final byte[] buffer = new byte[8192]; + long total = 0; + int read; + while ((read = in.read(buffer)) != -1) { + total += read; + if (total > RestLookupService.INPUT_STREAM_BUFFER_SIZE + 10_000) { + break; + } + } + // This reset should fail since we read beyond the mark's readlimit and the buffer size + in.reset(); + } + return recordReader; // Not expected to be used due to exception + }); + + // Verify that the lookup wraps the reset failure into LookupFailureException + assertThrows(LookupFailureException.class, () -> restLookupService.lookup(new LinkedHashMap<>())); + } }