diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java index d1d8c6390e4c..5d76857dbc90 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java @@ -211,7 +211,7 @@ public class RestLookupService extends AbstractControllerService implements Reco static final String MIME_TYPE_KEY = "mime.type"; static final String BODY_KEY = "request.body"; static final String METHOD_KEY = "request.method"; - + static final Integer INPUT_STREAM_BUFFER_SIZE = 8388608; // 8MB private static final List PROPERTY_DESCRIPTORS = List.of( URL, RECORD_READER, @@ -383,7 +383,7 @@ public Optional lookup(Map coordinates, Map { + final InputStream in = invocation.getArgument(1); + if (in.markSupported()) { + in.mark(1); // tiny read limit + final byte[] buffer = new byte[4096]; + long total = 0; + int read; + while ((read = in.read(buffer)) != -1) { + total += read; + if (total > 10_000) { // read beyond the default buffer 8192 + break; + } + } + // This reset would have failed before the fix when using BufferedInputStream over network stream + in.reset(); + } + return recordReader; + }); + + // Return a simple record at once + final RecordSchema schema = new SimpleRecordSchema(List.of(new RecordField("ok", RecordFieldType.BOOLEAN.getDataType()))); + final Record firstRecord = new MapRecord(schema, Map.of("ok", true)); + when(recordReader.nextRecord()).thenReturn(firstRecord, (Record) null); + + final Map coordinates = new LinkedHashMap<>(); + final Optional result = restLookupService.lookup(coordinates); + assertTrue(result.isPresent(), "Expected record to be present when reader performs mark/reset beyond buffer limit"); + } + + @Test + void testLookupThrowsWhenMarkResetExceedsBufferSize() throws Exception { + // Create a body significantly larger than the configured buffer size (8 MB) + final int size = RestLookupService.INPUT_STREAM_BUFFER_SIZE + 50_000; + final String body = '{' + "\"data\":\"" + "x".repeat(size) + "\"}"; + + mockWebServer.enqueue(new MockResponse.Builder() + .code(HTTP_OK) + .body(body) + .build()); + + // Simulate a RecordReader that misuses mark/reset: sets a tiny read limit, + // reads far beyond the BufferedInputStream capacity, then attempts reset. + when(recordReaderFactory.createRecordReader(any(), any(), anyLong(), any())).thenAnswer(invocation -> { + final InputStream in = invocation.getArgument(1); + if (in.markSupported()) { + in.mark(1); // Tiny read limit + final byte[] buffer = new byte[8192]; + long total = 0; + int read; + while ((read = in.read(buffer)) != -1) { + total += read; + if (total > RestLookupService.INPUT_STREAM_BUFFER_SIZE + 10_000) { + break; + } + } + // This reset should fail since we read beyond the mark's readlimit and the buffer size + in.reset(); + } + return recordReader; // Not expected to be used due to exception + }); + + // Verify that the lookup wraps the reset failure into LookupFailureException + assertThrows(LookupFailureException.class, () -> restLookupService.lookup(new LinkedHashMap<>())); + } +}