Add InputStreamSubscriber.transferTo(OutputStreamPublisher) optimization #4083

Closed
2 tasks
fanyang01 opened this issue Jun 9, 2023 · 4 comments
Labels
bug This issue is a bug. p2 This is a standard priority issue

Comments

@fanyang01

Describe the feature

TL;DR: Make InputStreamSubscriber override InputStream#transferTo(OutputStream out) so that, when out is an OutputStreamPublisher, it passes its ByteBuffers to it directly instead of copying them.
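A minimal sketch of the proposed override, using simplified stand-ins rather than SDK classes. `BufferQueueInputStream` plays the role of InputStreamSubscriber (it simply drains a queue of chunks that have already arrived, with no blocking or backpressure), `ByteBufferSink` is a hypothetical interface for an output stream that can take ownership of whole ByteBuffers, and `RecordingSink` stands in for OutputStreamPublisher. None of these names are SDK APIs.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;

/** Hypothetical: an output stream that can also take ownership of whole ByteBuffers. */
interface ByteBufferSink {
    void writeBuffer(ByteBuffer buffer);
}

/** Hypothetical sink that records what it receives (stands in for OutputStreamPublisher). */
final class RecordingSink extends OutputStream implements ByteBufferSink {
    final List<ByteBuffer> received = new ArrayList<>();

    @Override
    public void writeBuffer(ByteBuffer buffer) {
        received.add(buffer); // takes ownership; no copy needed
    }

    @Override
    public void write(int b) {
        received.add(ByteBuffer.wrap(new byte[] {(byte) b}));
    }
}

/** Hypothetical InputStream over already-delivered chunks (stands in for InputStreamSubscriber). */
final class BufferQueueInputStream extends InputStream {
    private final Queue<ByteBuffer> chunks;

    BufferQueueInputStream(Queue<ByteBuffer> chunks) {
        this.chunks = chunks;
    }

    @Override
    public int read() {
        ByteBuffer head = nextNonEmpty();
        return head == null ? -1 : head.get() & 0xFF;
    }

    @Override
    public int read(byte[] b, int off, int len) {
        Objects.checkFromIndexSize(off, len, b.length);
        if (len == 0) {
            return 0;
        }
        ByteBuffer head = nextNonEmpty();
        if (head == null) {
            return -1;
        }
        int n = Math.min(len, head.remaining());
        head.get(b, off, n);
        return n;
    }

    @Override
    public long transferTo(OutputStream out) throws IOException {
        if (!(out instanceof ByteBufferSink)) {
            return super.transferTo(out); // generic path: copies through one reused byte[]
        }
        ByteBufferSink sink = (ByteBufferSink) out;
        long transferred = 0;
        ByteBuffer head;
        while ((head = nextNonEmpty()) != null) {
            chunks.poll(); // remove it first: this stream never touches the chunk again
            transferred += head.remaining();
            sink.writeBuffer(head);
        }
        return transferred;
    }

    /** Drops exhausted chunks and returns the current one, or null at end of stream. */
    private ByteBuffer nextNonEmpty() {
        while (!chunks.isEmpty() && !chunks.peek().hasRemaining()) {
            chunks.poll();
        }
        return chunks.peek();
    }
}
```

When `out` is not a `ByteBufferSink`, the sketch falls back to the inherited copy loop, so ordinary synchronous streams behave as before; only the recognized sink gets the zero-copy hand-off.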

Currently, InputStreamSubscriber inherits transferTo(OutputStream out) from java.io.InputStream, which copies data through a single internal buffer that it reuses for every chunk. This works with ordinary synchronous output streams, such as System.out or ByteArrayOutputStream, but not with asynchronous ones such as OutputStreamPublisher, which is used by BlockingOutputStreamAsyncRequestBody. The reason is that OutputStreamPublisher#write(byte[]) does not copy the array; it only wraps it with ByteBuffer#wrap(byte[]) and passes it to the subscriber. The next InputStream#read(byte[] buffer) call can therefore overwrite the array before the subscriber has consumed it, corrupting the data that is sent.
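The failure mode can be reproduced without the SDK. In this sketch, the hypothetical `WrappingOutputStream` mimics the behavior described for OutputStreamPublisher#write(byte[]) (it wraps the caller's array instead of copying it, and a simulated subscriber reads the buffers later), while `copyWithReusedBuffer` has the same shape as the inherited transferTo loop. A 4-byte buffer keeps the overwrite easy to follow.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical async-style stream: queues ByteBuffers that wrap, not copy, the caller's array. */
final class WrappingOutputStream extends OutputStream {
    final List<ByteBuffer> pending = new ArrayList<>();

    @Override
    public void write(int b) {
        pending.add(ByteBuffer.wrap(new byte[] {(byte) b}));
    }

    @Override
    public void write(byte[] b, int off, int len) {
        pending.add(ByteBuffer.wrap(b, off, len)); // shares storage with the caller's array
    }

    /** Simulates a subscriber that consumes the queued buffers only after writing has finished. */
    String drain() {
        StringBuilder sb = new StringBuilder();
        for (ByteBuffer buf : pending) {
            sb.append(StandardCharsets.US_ASCII.decode(buf));
        }
        return sb.toString();
    }
}

final class AliasingDemo {
    /** Same shape as the inherited InputStream#transferTo loop: one array, reused for every chunk. */
    static void copyWithReusedBuffer(InputStream in, OutputStream out, int bufSize) throws IOException {
        byte[] buffer = new byte[bufSize];
        int read;
        while ((read = in.read(buffer, 0, bufSize)) >= 0) {
            out.write(buffer, 0, read);
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream("AAAABBBBCC".getBytes(StandardCharsets.US_ASCII));
        WrappingOutputStream out = new WrappingOutputStream();
        copyWithReusedBuffer(in, out, 4);
        System.out.println(out.drain()); // prints CCBBCCBBCC instead of AAAABBBBCC
    }
}
```

Every queued ByteBuffer points into the same array, so the consumer sees whatever that array held last: the final short read left "CC" in the first two bytes and stale "BB" in the other two.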

The simplest workaround is something like out.write(in.readAllBytes()), but that holds the entire stream in memory, which is costly for large inputs.
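A bounded-memory alternative to readAllBytes(), sketched under the assumption that the output stream only keeps a reference to each array it is given: the hypothetical helper `transferWithFreshBuffers` uses InputStream#readNBytes(int) (Java 11+), which returns a newly allocated array on every call, so no array is modified after it has been passed to write.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

final class SafeTransfer {
    /**
     * Hypothetical helper: copies in to out in chunks of at most chunkSize bytes,
     * giving every write its own array.
     */
    static long transferWithFreshBuffers(InputStream in, OutputStream out, int chunkSize) throws IOException {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        long transferred = 0;
        byte[] chunk;
        // readNBytes(int) allocates a new array per call and returns an empty one at end of stream.
        while ((chunk = in.readNBytes(chunkSize)).length > 0) {
            out.write(chunk); // this array is never touched again by this method
            transferred += chunk.length;
        }
        return transferred;
    }
}
```

The loop itself holds at most one chunk at a time; how many chunks stay queued downstream depends on the output stream's own buffering.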

Use Case

I want to open an S3 object as an InputStream in, read its header, modify it, and then write the modified header plus the remaining unchanged content to a new S3 object through putObject with a BlockingOutputStreamAsyncRequestBody. Copying the unchanged content with in.transferTo(out) is the obvious approach, but for correctness I currently have to use out.write(in.readAllBytes()) instead.
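The header-rewrite flow, sketched with plain java.io types so it runs without S3. `HeaderRewrite`, `headerLength`, and the upper-casing in `modifyHeader` are hypothetical; in the real scenario `in` would be the S3 object stream and `out` the output stream exposed by BlockingOutputStreamAsyncRequestBody.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

final class HeaderRewrite {
    /**
     * Reads a fixed-size header (headerLength is a hypothetical parameter), writes a modified
     * copy of it, then writes the rest of the input unchanged.
     */
    static void rewriteHeader(InputStream in, OutputStream out, int headerLength) throws IOException {
        byte[] header = in.readNBytes(headerLength); // fewer bytes if the input is shorter
        out.write(modifyHeader(header));
        // Workaround from the issue: buffers the whole remainder in memory instead of calling
        // in.transferTo(out), whose reused buffer is unsafe with a stream that does not copy writes.
        out.write(in.readAllBytes());
    }

    /** Hypothetical modification: upper-cases ASCII letters in a copy of the header. */
    static byte[] modifyHeader(byte[] header) {
        byte[] copy = header.clone();
        for (int i = 0; i < copy.length; i++) {
            if (copy[i] >= 'a' && copy[i] <= 'z') {
                copy[i] = (byte) (copy[i] - ('a' - 'A'));
            }
        }
        return copy;
    }
}
```

For an input of "head:body" with a 5-byte header, the output is "HEAD:body".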

In other words, BlockingOutputStreamAsyncRequestBody has to be used with great care. I encountered cases where the data written to S3 was malformed because of this issue, and unfortunately this behavior is not well documented.

Implementing this feature will make such use cases safer and more performant.

Proposed Solution

No response

Other Information

No response

Acknowledgements

  • I may be able to implement this feature request
  • This feature might incur a breaking change

AWS Java SDK version used

2.20.19

JDK version used

OpenJDK 64-Bit Server VM Corretto-17.0.3.6.1

Operating System and version

Darwin Kernel Version 22.5.0

fanyang01 added the feature-request and needs-triage labels on Jun 9, 2023
debora-ito added the needs-review label and removed the needs-triage label on Jun 22, 2023
@debora-ito
Member

debora-ito commented Jul 18, 2023

@fanyang01 thank you for the report. We are considering this a bug.

debora-ito added the bug and p2 labels and removed the feature-request and needs-review labels on Jul 18, 2023
@raelik

raelik commented Jan 25, 2024

@fanyang01 I recently ran into the corruption issue you mentioned while trying to use BlockingOutputStreamAsyncRequestBody (BOSARB from here on). I have code that reads data from a JDBC ResultSet and writes it directly to an OutputStream in various formats (CSV, JSON, etc.). Trying to do that with BOSARB was an exercise in frustration. I ultimately switched to BlockingInputStreamAsyncRequestBody (BISARB) with a PipedInputStream/PipedOutputStream pair and a separate thread writing to the output stream.
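A sketch of the piped-stream workaround: a separate thread renders rows into a PipedOutputStream while another thread reads the connected PipedInputStream. `PipedUpload`, `writeCsvAsync`, and the naive CSV format are hypothetical; in the scenario described, the PipedInputStream would be handed to BISARB rather than read directly.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

final class PipedUpload {
    /**
     * Writes rows as CSV (joined with commas, no quoting) into pipeOut on an executor thread.
     * The connected PipedInputStream must be read on a different thread, or the writer blocks
     * once the pipe's buffer fills. The pipe is closed even on failure, so the reader sees end
     * of stream either way; check the returned Future to tell success from a truncated write.
     */
    static Future<?> writeCsvAsync(List<String[]> rows, PipedOutputStream pipeOut, ExecutorService executor) {
        return executor.submit(() -> {
            try (Writer w = new OutputStreamWriter(pipeOut, StandardCharsets.UTF_8)) {
                for (String[] row : rows) {
                    w.write(String.join(",", row));
                    w.write('\n');
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }
}
```

A caller would create the pair with `new PipedOutputStream(pipedIn)`, start the writer, pass `pipedIn` to the upload, and call `get()` on the Future afterward so that a writer failure is not mistaken for a complete object.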

@agicquelamz

We have made changes in version 2.31.2 so that BlockingOutputStreamAsyncRequestBody copies the byte arrays passed to it, eliminating the potential for concurrent access.
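A sketch of the idea behind this fix: copy each array before queuing it. `CopyingOutputStream` is a hypothetical stand-in, not the SDK's implementation; it only illustrates why copying removes the aliasing, because the caller may reuse its array as soon as write returns.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in (not the SDK's code): queues a private copy of every write. */
final class CopyingOutputStream extends OutputStream {
    final List<ByteBuffer> pending = new ArrayList<>();

    @Override
    public void write(int b) {
        pending.add(ByteBuffer.wrap(new byte[] {(byte) b}));
    }

    @Override
    public void write(byte[] b, int off, int len) {
        Objects.checkFromIndexSize(off, len, b.length);
        // Copy first: the queued buffer no longer shares storage with the caller's array.
        pending.add(ByteBuffer.wrap(Arrays.copyOfRange(b, off, off + len)));
    }
}
```

The cost is one extra copy per write, which is the copy the originally proposed transferTo optimization aimed to avoid by handing ByteBuffers over directly.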


This issue is now closed. Comments on closed issues are hard for our team to see.
If you need more assistance, please open a new issue that references this one.


4 participants