1+ /* *
2+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+ * SPDX-License-Identifier: Apache-2.0.
4+ */
5+ #pragma once
6+ #include < aws/core/http/HttpRequest.h>
7+ #include < aws/core/utils/Array.h>
8+ #include < aws/core/utils/StringUtils.h>
9+ #include < aws/core/utils/HashingUtils.h>
10+ #include < aws/core/utils/logging/LogMacros.h>
11+ #include < aws/core/utils/memory/stl/AWSStringStream.h>
12+ #include < aws/core/utils/memory/stl/AWSVector.h>
13+ #include < smithy/interceptor/Interceptor.h>
14+ #include < aws/core/client/ClientConfiguration.h>
15+ #include < aws/core/utils/Outcome.h>
16+ #include < aws/core/client/AWSError.h>
17+ #include < memory>
18+
19+ namespace smithy {
20+ namespace client {
21+ namespace features {
22+
23+ static const size_t AWS_DATA_BUFFER_SIZE = 65536 ;
24+ static const char * ALLOCATION_TAG = " ChunkingInterceptor" ;
25+ static const char * CHECKSUM_HEADER_PREFIX = " x-amz-checksum-" ;
26+
27+ template <size_t DataBufferSize = AWS_DATA_BUFFER_SIZE>
28+ class AwsChunkedStreamBuf : public std ::streambuf {
29+ public:
30+ AwsChunkedStreamBuf (Aws::Http::HttpRequest* request,
31+ const std::shared_ptr<Aws::IOStream>& stream,
32+ size_t bufferSize = DataBufferSize)
33+ : m_request(request),
34+ m_stream (stream),
35+ m_data(bufferSize)
36+ {
37+ assert (m_stream != nullptr );
38+ if (m_stream == nullptr ) {
39+ AWS_LOGSTREAM_ERROR (" AwsChunkedStream" , " stream is null" );
40+ }
41+ assert (m_request != nullptr );
42+ if (m_request == nullptr ) {
43+ AWS_LOGSTREAM_ERROR (" AwsChunkedStream" , " request is null" );
44+ }
45+ }
46+
47+ protected:
48+ int_type underflow () override {
49+ if (gptr () && gptr () < egptr ()) {
50+ return traits_type::to_int_type (*gptr ());
51+ }
52+
53+ // only read and write to chunked stream if the underlying stream
54+ // is still in a valid state and we have buffer space
55+ if (m_stream->good () && m_chunkingBufferPos >= m_chunkingBufferSize) {
56+ // Reset buffer for new data only when buffer is consumed
57+ m_chunkingBufferPos = 0 ;
58+ m_chunkingBufferSize = 0 ;
59+
60+ // Check if we have enough space for worst-case chunk (data + header + footer)
61+ size_t maxChunkSize = m_data.GetLength () + 20 ; // data + hex header + CRLF
62+ if (m_chunkingBufferSize + maxChunkSize <= m_chunkingBuffer.GetLength ()) {
63+ // Try to read in a 64K chunk, if we cant we know the stream is over
64+ m_stream->read (m_data.GetUnderlyingData (), m_data.GetLength ());
65+ size_t bytesRead = static_cast <size_t >(m_stream->gcount ());
66+ writeChunk (bytesRead);
67+
68+ // if we've read everything from the stream, we want to add the trailer
69+ // to the underlying stream
70+ if ((m_stream->peek () == EOF || m_stream->eof ()) && !m_stream->bad ()) {
71+ writeTrailerToUnderlyingStream ();
72+ }
73+ }
74+ }
75+
76+ // if the chunking buffer is empty there is nothing to read
77+ if (m_chunkingBufferPos >= m_chunkingBufferSize) {
78+ return traits_type::eof ();
79+ }
80+
81+ // Set up buffer pointers to read from chunking buffer
82+ size_t remainingBytes = m_chunkingBufferSize - m_chunkingBufferPos;
83+ size_t bytesToRead = std::min (remainingBytes, DataBufferSize);
84+
85+ setg (m_chunkingBuffer.GetUnderlyingData () + m_chunkingBufferPos,
86+ m_chunkingBuffer.GetUnderlyingData () + m_chunkingBufferPos,
87+ m_chunkingBuffer.GetUnderlyingData () + m_chunkingBufferPos + bytesToRead);
88+
89+ m_chunkingBufferPos += bytesToRead;
90+
91+ return traits_type::to_int_type (*gptr ());
92+ }
93+
94+ private:
95+ void writeTrailerToUnderlyingStream () {
96+ Aws::String trailer = " 0\r\n " ;
97+ if (m_request->GetRequestHash ().second != nullptr ) {
98+ trailer += " x-amz-checksum-" + m_request->GetRequestHash ().first + " :"
99+ + Aws::Utils::HashingUtils::Base64Encode (m_request->GetRequestHash ().second ->GetHash ().GetResult ()) + " \r\n " ;
100+ }
101+ trailer += " \r\n " ;
102+ if (m_chunkingBufferSize + trailer.length () <= m_chunkingBuffer.GetLength ()) {
103+ std::memcpy (m_chunkingBuffer.GetUnderlyingData () + m_chunkingBufferSize, trailer.c_str (), trailer.length ());
104+ m_chunkingBufferSize += trailer.length ();
105+ }
106+ }
107+
108+ void writeChunk (size_t bytesRead) {
109+ if (m_request->GetRequestHash ().second != nullptr ) {
110+ m_request->GetRequestHash ().second ->Update (reinterpret_cast <unsigned char *>(m_data.GetUnderlyingData ()), bytesRead);
111+ }
112+
113+ if (bytesRead > 0 ) {
114+ Aws::String chunkHeader = Aws::Utils::StringUtils::ToHexString (bytesRead) + " \r\n " ;
115+ size_t totalSize = chunkHeader.length () + bytesRead + 2 ;
116+ if (m_chunkingBufferSize + totalSize <= m_chunkingBuffer.GetLength ()) {
117+ std::memcpy (m_chunkingBuffer.GetUnderlyingData () + m_chunkingBufferSize, chunkHeader.c_str (), chunkHeader.length ());
118+ m_chunkingBufferSize += chunkHeader.length ();
119+ std::memcpy (m_chunkingBuffer.GetUnderlyingData () + m_chunkingBufferSize, m_data.GetUnderlyingData (), bytesRead);
120+ m_chunkingBufferSize += bytesRead;
121+ std::memcpy (m_chunkingBuffer.GetUnderlyingData () + m_chunkingBufferSize, " \r\n " , 2 );
122+ m_chunkingBufferSize += 2 ;
123+ }
124+ }
125+ }
126+
127+ // Buffer for chunked data plus overhead for HTTP chunked encoding headers, trailers, and safety margin
128+ Aws::Utils::Array<char > m_chunkingBuffer{DataBufferSize + 128 };
129+ size_t m_chunkingBufferSize{0 };
130+ size_t m_chunkingBufferPos{0 };
131+ Aws::Http::HttpRequest* m_request{nullptr };
132+ std::shared_ptr<Aws::IOStream> m_stream;
133+ Aws::Utils::Array<char > m_data;
134+ };
135+
136+ class AwsChunkedIOStream : public Aws ::IOStream {
137+ public:
138+ AwsChunkedIOStream (Aws::Http::HttpRequest* request,
139+ const std::shared_ptr<Aws::IOStream>& originalBody,
140+ size_t bufferSize = AWS_DATA_BUFFER_SIZE)
141+ : Aws::IOStream(&m_buf),
142+ m_buf (request, originalBody, bufferSize) {}
143+
144+ private:
145+ AwsChunkedStreamBuf<> m_buf;
146+ };
147+
148+ /* *
149+ * Interceptor that handles chunked encoding for streaming requests with checksums.
150+ * Wraps request body with chunked stream and sets appropriate headers.
151+ */
152+ class ChunkingInterceptor : public smithy ::interceptor::Interceptor {
153+ public:
154+ explicit ChunkingInterceptor (Aws::Client::HttpClientChunkedMode httpClientChunkedMode)
155+ : m_httpClientChunkedMode(httpClientChunkedMode) {}
156+ ~ChunkingInterceptor () override = default ;
157+
158+ ModifyRequestOutcome ModifyBeforeSigning (smithy::interceptor::InterceptorContext& context) override {
159+ auto request = context.GetTransmitRequest ();
160+
161+ if (!ShouldApplyChunking (request)) {
162+ return request;
163+ }
164+
165+ auto originalBody = request->GetContentBody ();
166+ if (!originalBody) {
167+ return request;
168+ }
169+
170+ // Set up chunked encoding headers for checksum calculation
171+ const auto & hashPair = request->GetRequestHash ();
172+ if (hashPair.second != nullptr ) {
173+ Aws::String checksumHeaderValue = Aws::String (CHECKSUM_HEADER_PREFIX) + hashPair.first ;
174+ request->DeleteHeader (checksumHeaderValue.c_str ());
175+ request->SetHeaderValue (Aws::Http::AWS_TRAILER_HEADER, checksumHeaderValue);
176+ request->SetTransferEncoding (Aws::Http::CHUNKED_VALUE);
177+
178+ if (!request->HasContentEncoding ()) {
179+ request->SetContentEncoding (Aws::Http::AWS_CHUNKED_VALUE);
180+ } else {
181+ Aws::String currentEncoding = request->GetContentEncoding ();
182+ if (currentEncoding.find (Aws::Http::AWS_CHUNKED_VALUE) == Aws::String::npos) {
183+ request->SetContentEncoding (Aws::String{Aws::Http::AWS_CHUNKED_VALUE} + " ," + currentEncoding);
184+ }
185+ }
186+
187+ if (request->HasHeader (Aws::Http::CONTENT_LENGTH_HEADER)) {
188+ request->SetHeaderValue (Aws::Http::DECODED_CONTENT_LENGTH_HEADER, request->GetHeaderValue (Aws::Http::CONTENT_LENGTH_HEADER));
189+ request->DeleteHeader (Aws::Http::CONTENT_LENGTH_HEADER);
190+ }
191+ }
192+
193+ auto chunkedBody = Aws::MakeShared<AwsChunkedIOStream>(
194+ ALLOCATION_TAG, request.get (), originalBody);
195+
196+ request->AddContentBody (chunkedBody);
197+ return request;
198+ }
199+
200+ ModifyResponseOutcome ModifyBeforeDeserialization (smithy::interceptor::InterceptorContext& context) override {
201+ return context.GetTransmitResponse ();
202+ }
203+
204+ private:
205+ bool ShouldApplyChunking (const std::shared_ptr<Aws::Http::HttpRequest>& request) const {
206+ // Use configuration setting to determine chunking behavior
207+ if (m_httpClientChunkedMode != Aws::Client::HttpClientChunkedMode::DEFAULT) {
208+ return false ;
209+ }
210+
211+ if (!request || !request->GetContentBody ()) {
212+ return false ;
213+ }
214+
215+ // Check if request has checksum requirements
216+ const auto & hashPair = request->GetRequestHash ();
217+ return hashPair.second != nullptr ;
218+ }
219+
220+ Aws::Client::HttpClientChunkedMode m_httpClientChunkedMode;
221+ };
222+
223+ } // namespace features
224+ } // namespace client
225+ } // namespace smithy
0 commit comments