diff --git a/cmake/sdks.cmake b/cmake/sdks.cmake index 1c6dd8c8c98..dc601a49b81 100644 --- a/cmake/sdks.cmake +++ b/cmake/sdks.cmake @@ -2,7 +2,7 @@ include(sdksCommon) set(SDK_DEPENDENCY_BUILD_LIST "") -set(NON_GENERATED_CLIENT_LIST access-management text-to-speech core queues s3-encryption identity-management transfer) ## Manually generated code with a name mimicking client name +set(NON_GENERATED_CLIENT_LIST access-management text-to-speech core queues s3-encryption identity-management transfer transfer-crt) ## Manually generated code with a name mimicking client name if(REGENERATE_CLIENTS OR REGENERATE_DEFAULTS) message(STATUS "Checking for SDK generation requirements") diff --git a/cmake/sdksCommon.cmake b/cmake/sdksCommon.cmake index f622543374d..e127f73b7c8 100644 --- a/cmake/sdksCommon.cmake +++ b/cmake/sdksCommon.cmake @@ -171,6 +171,7 @@ list(APPEND SDK_TEST_PROJECT_LIST "s3-encryption:tests/aws-cpp-sdk-s3-encryption list(APPEND SDK_TEST_PROJECT_LIST "s3control:tests/aws-cpp-sdk-s3control-integration-tests") list(APPEND SDK_TEST_PROJECT_LIST "sqs:tests/aws-cpp-sdk-sqs-integration-tests") list(APPEND SDK_TEST_PROJECT_LIST "transfer:tests/aws-cpp-sdk-transfer-tests") +list(APPEND SDK_TEST_PROJECT_LIST "transfer-crt:tests/aws-cpp-sdk-transfer-crt-tests") list(APPEND SDK_TEST_PROJECT_LIST "text-to-speech:tests/aws-cpp-sdk-text-to-speech-tests,tests/aws-cpp-sdk-polly-sample") list(APPEND SDK_TEST_PROJECT_LIST "transcribestreaming:tests/aws-cpp-sdk-transcribestreaming-integration-tests") list(APPEND SDK_TEST_PROJECT_LIST "eventbridge:tests/aws-cpp-sdk-eventbridge-tests") diff --git a/src/aws-cpp-sdk-transfer-crt/CMakeLists.txt b/src/aws-cpp-sdk-transfer-crt/CMakeLists.txt new file mode 100644 index 00000000000..25fa9313aa1 --- /dev/null +++ b/src/aws-cpp-sdk-transfer-crt/CMakeLists.txt @@ -0,0 +1,53 @@ +add_project(aws-cpp-sdk-transfer-crt + "High-level C++ SDK for file transfer to/from AWS S3 (CRT variant)" + aws-cpp-sdk-s3-crt + aws-cpp-sdk-core) + +file( GLOB TRANSFER_HEADERS "include/aws/transfer-crt/*.h" ) + +file( GLOB TRANSFER_SOURCE "source/transfer-crt/*.cpp" ) + +if(MSVC) + source_group("Header Files\\aws\\transfer-crt" FILES ${TRANSFER_HEADERS}) + source_group("Source Files\\transfer-crt" FILES ${TRANSFER_SOURCE}) +endif() + +file(GLOB ALL_TRANSFER_HEADERS + ${TRANSFER_HEADERS} +) + +file(GLOB ALL_TRANSFER_SOURCE + ${TRANSFER_SOURCE} +) + +file(GLOB ALL_TRANSFER + ${ALL_TRANSFER_HEADERS} + ${ALL_TRANSFER_SOURCE} +) + +set(TRANSFER_INCLUDES + "${CMAKE_CURRENT_SOURCE_DIR}/include/" + ) + +include_directories(${TRANSFER_INCLUDES}) + +if(USE_WINDOWS_DLL_SEMANTICS AND BUILD_SHARED_LIBS) + add_definitions("-DAWS_TRANSFER_EXPORTS") +endif() + +add_library(${PROJECT_NAME} ${ALL_TRANSFER}) +add_library(AWS::${PROJECT_NAME} ALIAS ${PROJECT_NAME}) + +target_include_directories(${PROJECT_NAME} PUBLIC + $ + $) +target_link_libraries(${PROJECT_NAME} PRIVATE ${PLATFORM_DEP_LIBS} ${PROJECT_LIBS}) + +set_compiler_flags(${PROJECT_NAME}) +set_compiler_warnings(${PROJECT_NAME}) + +setup_install() + +install (FILES ${ALL_TRANSFER_HEADERS} DESTINATION ${INCLUDE_DIRECTORY}/aws/transfer-crt) + +do_packaging() diff --git a/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/DownloadStream.h b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/DownloadStream.h new file mode 100644 index 00000000000..f5fb696240f --- /dev/null +++ b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/DownloadStream.h @@ -0,0 +1,96 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Aws { +namespace TransferCrt { + +// Used by the classes below to notify the receiver of low-level file errors. +using ErrorCallback = std::function; + +// Default size for the put buffer (which is bypassed when xsputn is used). +// Measurements in "The Linux Programming Interface" show that a minimum 4096B is +// required when O_SYNC is enabled. Use a larger value to aggregate small writes. +constexpr size_t DEFAULT_BUFSIZE = 1 << 20; + +// Helper class for DownloadStream. +// +// This implements only what DownloadStream needs: a simple, file-descriptor based streambuf. +// Hence many std::streambuf operations, such as seekoff/pos, are not supported. +// The expected use-case is that mostly xsputn(const char *, size_t) will be called. +// +// The ErrorCallback that is passed into the constructor is invoked when encountering a +// low-level write error, receiving a string describing the error cause (based on errno). +class FileDescriptorBuf : public std::streambuf { + public: + // Class does not own the file descriptor @fd - caller is responsible for closing it. + FileDescriptorBuf(int fd, ErrorCallback errorCallback, size_t bufsize = DEFAULT_BUFSIZE) + : fd_{fd}, errorCallback_{errorCallback}, buffer_{Aws::MakeUniqueArray(bufsize, "FdBuf")} { + setp(buffer_.get(), buffer_.get() + bufsize); + } + + protected: + int sync() override; + int overflow(int_type c) override; + std::streamsize xsputn(const char *data, std::streamsize datalen) override; + + private: + int fd_; + ErrorCallback errorCallback_; + Aws::UniqueArrayPtr buffer_; +}; + +// Download output stream class for a given @dstPath. +// +// This takes an Error Callback which gets invoked with descriptive error message when a failure +// occurs in either this class, or the contained FileDescriptorBuf. +// +// The constructor does the following: +// 1. Create any missing directory components of @dstPath. +// 2. Generate a temporary .partial file to write to. This file will be renamed into @dstPath +// upon successful completion, or removed on failure. The implementation uses mkostemp(3), +// which is the reason we are using a file-descriptor based backend. +// 3. Open a file descriptor to the temporary file and advise the kernel about its use. +// 4. Complete the construction of the iostream, using a FileDescriptorBuf as rdbuf. +// +// The Error Callback @ec may be invoked already before the constructor call returns. +// It is also invoked by the contained FileDescriptorBuf, and during close(). +class DownloadStream final : public std::iostream { + public: + // Create a DownloadStream for @dstPath, calling @ec if any failure happens. + // Enabling O_SYNC via @sync_always is optional, as it degrades download performance. + DownloadStream(const Aws::String &dstPath, ErrorCallback ec, bool sync_always = false); + ~DownloadStream(); + + // Set eof, close the temporary file and atomically rename it into @dstPath. + void close() noexcept; + + private: + void _error(Aws::String msg) { + setstate(std::ios::badbit); + errorCallback_(std::move(msg)); + } + + private: + const Aws::String dstPath_; + Aws::String dstTempPath_; + ErrorCallback errorCallback_; + + int fd_ = -1; + Aws::UniquePtr buf_; + std::mutex close_mutex_; +}; + +} // namespace TransferCrt +} // namespace Aws diff --git a/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/Metadata.h b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/Metadata.h new file mode 100644 index 00000000000..c7821a75047 --- /dev/null +++ b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/Metadata.h @@ -0,0 +1,100 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#pragma once + +#include +#include +#include +#include + +namespace Aws { +namespace TransferCrt { + +// WriteMetadata specifies blob metadata information. +// @uri: destination URI of the blob, in :/// format. +// @content_type: MIME type of the @uri content. +// @content_encoding: content encoding that was applied. +// @metadata: metadata key/value pairs. +// @tags: S3 object storage tagging key/value pairs. +struct WriteMetadata { + // Constructor for the default case - just create a blob at @uri. + explicit WriteMetadata(const Aws::String &uri) : WriteMetadata(uri, "", "") {} + + WriteMetadata(const Aws::String &uri, + const Aws::String &content_type, + const Aws::String &content_encoding, + const Aws::Map &metadata = {}) + : uri{uri}, + content_type{content_type}, + content_encoding{content_encoding}, + metadata{metadata} {} + + // Destination URI of the blob, in :/// format. + Aws::String uri; + + // Content-Type (MIME type) of @uri. + Aws::String content_type; + + // Content-Encoding (if any) of @uri. + Aws::String content_encoding; + + // Metadata key/value pairs. + Aws::Map metadata; + + // S3 Object Tagging key/value pairs (S3 objects only). + // These require s3:PutObjectTagging permissions on @uri, otherwise requests fail with 403. + // The tags also have to satisfy the following syntax restrictions and limits: + // * https://docs.aws.amazon.com/AmazonS3/latest/userguide/tagging-managing.html + // * https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/Using_Tags.html + // * https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/allocation-tag-restrictions.html + Aws::Map tags; +}; + +// ReadMetadata encapsulates the metadata associated with a given blob. +struct ReadMetadata { + // URI of the blob. + Aws::String uri; + + // Size of @path in bytes. + size_t size = 0; + + // Date/time the blob was last modified. + std::chrono::system_clock::time_point last_modified; + + // MIME type of the blob. + Aws::String content_type; + + // Indicates whether the data at @path is stored in compressed format (RFC 7231, 3.1.2.2). + Aws::String content_encoding; + + // ETag value. + Aws::String etag; + + // Metadata key/value pairs. + Aws::Map metadata; +}; + +static inline std::ostream &operator<<(std::ostream &os, const ReadMetadata &md) { + os << "ReadMetadata(\"" << md.uri << "\", " << md.size; + + time_t lm = std::chrono::system_clock::to_time_t(md.last_modified); + if (lm) { // Format: "Wed Jun 30 21:49:08 1993\n" - truncate before " 1993\n": + os << ", " << Aws::String{ctime(&lm), 19}; + } + if (!md.etag.empty()) os << ", " << md.etag; + if (!md.content_type.empty()) { + os << ", " << md.content_type; + if (!md.content_encoding.empty()) os << " (" << md.content_encoding << ")"; + } + if (!md.metadata.empty()) { + os << ","; + for (const auto &e : md.metadata) os << " " << e.first << "=" << e.second; + } + return os << ")"; +} + +} // namespace TransferCrt +} // namespace Aws diff --git a/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferHandle.h b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferHandle.h new file mode 100644 index 00000000000..af3695e68c6 --- /dev/null +++ b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferHandle.h @@ -0,0 +1,201 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Aws { +namespace TransferCrt { + +enum class TransferStatus : uint8_t { + // + // TransferHandle states: + // + // The three terminal states are CANCELED, FAILED, and COMPLETED. + // + // NOT_STARTED indicates that no S3CrtClient API call has been made yet. This means that no + // status/progress callbacks will be invoked. Unlike IN_PROGRESS, we do not have to wait for + // these when transitioning to FAILED or CANCELED. + // + // IN_PROGRESS indicates that an API call was made, which means we must wait until its final + // .shutdown_callback (Handle{Put,Get}ObjectResponse()) is called. The AWS SDK and aws-c-s3 + // expect all request data structures to still be alive until then (in particular the response + // body, which may be updated several times in order to record errors). + // + // FAILING is an intermediate state that is entered from IN_PROGRESS to record that a failure + // (e.g. local write error) occurred while the transfer was progressing. It will be converted + // into FAILED once the .shutdown_callback is called. + // + NOT_STARTED, // No S3CrtClient API call has been made yet. + IN_PROGRESS, // An API call was made, and the transfer is running. + FAILING, // A failure occurred while the transfer was running. + CANCELED, // Transfer was canceled. + FAILED, // Transfer failed. + COMPLETED, // Transfer completed successfully. +}; +Aws::String StatusToString(TransferStatus status); +Aws::OStream &operator<<(Aws::OStream &s, TransferStatus status); + +// Interface for interacting with asynchronous UPLOAD/DOWNLOAD transfers. +class TransferHandle final { + public: + // Upload to the destination specified in @md. + TransferHandle(const WriteMetadata &md, + const std::shared_ptr &context); + + // Download from @srcUri. + TransferHandle(const Aws::String &srcUri, + const std::shared_ptr &context); + + /* + * Thread-safe Getter methods (values set only at initialization time). + */ + + // Bucket/key part of blob location in Amazon S3. + Aws::String GetBucket() const; + Aws::String GetKey() const; + + // S3 storage tags (key/value pairs). + const Aws::String &GetTagging() const { return tagging_; } + + // Get the user-provided context that was passed at construction time. + std::shared_ptr GetContext() const { return m_context; } + + /* + * Get/set methods called after initialization that are based on atomic variables. + * These methods synchronize-with each other via the affected atomic variable. + */ + // Get/Set the CANCEL flag, which cancels any further processing. + bool ShouldContinue() const { return !m_cancel.load(); } + void Cancel() { m_cancel.store(true); } + + // Get/update the cumulative byte count transferred since start of the transfer. + uint64_t GetBytesTransferred() const { return m_bytesTransferred.load(); } + void UpdateBytesTransferred(uint64_t amount) { m_bytesTransferred.fetch_add(amount); } + + /* + * Getters/setters called after initialization that synchronize via @m_getterSetterLock. + */ + + // Return the blob metadata information (populated for both download and upload). + // Pre-condition: function may only be called if BytesTotalSizeHasBeenSet() returns true. + const ReadMetadata &GetReadMetadata() const { + std::lock_guard guard{m_getterSetterLock}; + assert(total_size_has_been_set_); + return rmd_; + } + + // Set the total size of the object being transferred. May be called at most once. + void SetBytesTotalSize(uint64_t size) { + std::lock_guard guard{m_getterSetterLock}; + assert(!total_size_has_been_set_); + total_size_has_been_set_ = true; + rmd_.size = size; + } + // Check whether SetBytesTotalSize has been called (see TransferManager::GetObject for details). + bool BytesTotalSizeHasBeenSet() const { return total_size_has_been_set_; } + + // Set the LastModified time of the blob. + void SetLastModified(const Aws::Utils::DateTime &lastDateTime) { + std::lock_guard guard{m_getterSetterLock}; + rmd_.last_modified = lastDateTime.UnderlyingTimestamp(); + } + + // Set the ETag of the blob. + void SetETag(const Aws::String &etag) { + std::lock_guard guard{m_getterSetterLock}; + rmd_.etag = etag; + } + + // Set the Content-Type of the blob. + void SetContentType(const Aws::String &contentType) { + std::lock_guard guard{m_getterSetterLock}; + rmd_.content_type = contentType; + } + + // Set the Content-Encoding of the blob (e.g. "gzip" when compressing content). + void SetContentEncoding(const Aws::String &encoding) { + std::lock_guard guard{m_getterSetterLock}; + rmd_.content_encoding = encoding; + } + + // Set the metadata key/value pairs associated with the blob. + void SetMetadata(const Aws::Map &metadata) { + std::lock_guard guard{m_getterSetterLock}; + rmd_.metadata = metadata; + } + + // Get/set the last error that was encountered (if any). + // Check GetStatus() first, as the default value is S3Crt::S3CrtErrors::UNKNOWN. + const Aws::Client::AWSError GetLastError() const { + std::lock_guard guard{m_getterSetterLock}; + return m_lastError; + } + void SetError(const Aws::Client::AWSError &error) { + std::lock_guard guard{m_getterSetterLock}; + m_lastError = error; + } + + /* + * Modifiers that only synchronize via @m_statusLock. + */ + // Get current TransferStatus of this handle. + // Synchronizes-with UpdateStatus. + TransferStatus GetStatus() const; + + // Block on (internal) condition variable until handle reaches a 'finished' status. + // Synchronizes-with UpdateStatus. + void WaitUntilFinished() const; + + // Conditionally transition into @status. + void UpdateStatus(TransferStatus status); + + // Return true if @value equals one of the terminal states. + static bool IsFinishedStatus(TransferStatus value) { + return value == TransferStatus::COMPLETED || value == TransferStatus::FAILED || + value == TransferStatus::CANCELED; + } + + private: + std::atomic m_bytesTransferred{0}; + std::atomic m_cancel{false}; + const std::shared_ptr m_context{nullptr}; + // S3 storage tags (upload only). + const Aws::String tagging_; + + // Variables that are protected by @m_getterSetterLock: + mutable std::mutex m_getterSetterLock; + // Blob metadata (uri, size, ...) - used for both upload and download blobs. + ReadMetadata rmd_{}; + // Flag that indicates whether @rmd_.size has been initialized. + std::atomic total_size_has_been_set_{false}; + Aws::Client::AWSError m_lastError{S3Crt::S3CrtErrors::UNKNOWN, true}; + + // Variables that are protected by @m_statusLock: + mutable std::mutex m_statusLock; + mutable std::promise m_finishedSignal; + TransferStatus m_status = TransferStatus::NOT_STARTED; +}; + +} // namespace TransferCrt +} // namespace Aws diff --git a/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferManager.h b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferManager.h new file mode 100644 index 00000000000..520810d970d --- /dev/null +++ b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferManager.h @@ -0,0 +1,125 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Aws { +namespace TransferCrt { + +// Callback type used for updates. +using HandleUpdatedCallback = std::function &)>; + +// Helper structs to carry around a TransferHandle as part of the caller context. +struct UploadContext final : Aws::Client::AsyncCallerContext { + explicit UploadContext(std::shared_ptr th) : handle{std::move(th)} {} + std::shared_ptr handle; +}; + +struct DownloadContext final : Aws::Client::AsyncCallerContext { + explicit DownloadContext(std::shared_ptr th) : handle{std::move(th)} {} + + ~DownloadContext() { + if (!dstStreamOwnershipHasBeenTransferred) { + delete dstStream; + } + } + + std::shared_ptr handle; + bool dstStreamOwnershipHasBeenTransferred = false; + std::iostream *dstStream = nullptr; + std::streambuf *dstStreamBuf = nullptr; +}; + +// S3 TransferManager for the S3CrtClient. +// +// All public methods are non-blocking and return a pointer to an asynchronous TransferHandle. +class TransferManager final : public std::enable_shared_from_this { + public: + // Callbacks are invoked under the following conditions: + // - uploadProgressCallback: when the number of bytes-sent changes, + // - downloadProgressCallback: when the number of bytes-received changes, + // - statusChangedCallback: when the handle changes status. + // NOTE: code maintains the invariant that @statusChangedCallback + // is called at most once for a "finished" TransferStatus. + static std::shared_ptr Create( + std::shared_ptr s3Client, + const HandleUpdatedCallback &uploadProgressCallback, + const HandleUpdatedCallback &downloadProgressCallback, + const HandleUpdatedCallback &statusChangedCallback); + + // Upload from @srcPath or @srcStream to @md.uri via PutObjectAsync. + // If @srcStream is not set, open the input file at @srcPath. + std::shared_ptr UploadFile( + const Aws::String &srcPath, + const std::shared_ptr &srcStream, + const WriteMetadata &md, + const std::shared_ptr &context = nullptr); + + // Download from @srcUri to local @dstPath or @dstStreamBuf, via GetObjectAsync. + // If both @dstPath and @dstStreamBuf are specified, @dstStreamBuf is used. + std::shared_ptr DownloadFile( + const Aws::String &srcUri, + const Aws::String &dstPath, + std::streambuf *dstStreamBuf = nullptr, + const std::shared_ptr &context = nullptr); + + private: + // The constructor supports a "cancel all transfers when the first failure is encountered" + // optional policy, which is not exposed to the outside. It enforces the invariant that a + // bulk transfer succeeds only after all of its managed transfers have succeeded. + TransferManager(std::shared_ptr s3Client, + const HandleUpdatedCallback &uploadProgressCallback, + const HandleUpdatedCallback &downloadProgressCallback, + const HandleUpdatedCallback &statusChangedCallback, + bool cancel_on_first_failure = true); + + void PutObject(const std::shared_ptr &streamToPut, + const std::shared_ptr &handle); + + void GetObject(const std::shared_ptr &context, + const std::shared_ptr &handle); + + void HandlePutObjectResponse(const Aws::S3Crt::S3CrtClient *, + const Aws::S3Crt::Model::PutObjectRequest &, + const Aws::S3Crt::Model::PutObjectOutcome &, + const std::shared_ptr &); + + void HandleGetObjectResponse(const Aws::S3Crt::S3CrtClient *, + const Aws::S3Crt::Model::GetObjectRequest &, + Aws::S3Crt::Model::GetObjectOutcome, + const std::shared_ptr &); + + // Record that an error has occurred. + // Set FAILED state if no API call has been made yet, otherwise transition to FAILING state. + void Fail(std::shared_ptr handle, + std::string errorMsg, + std::string exceptionMsg = "FATAL ERROR"); + + void OnUploadProgress(const std::shared_ptr &handle); + void OnDownloadProgress(const std::shared_ptr &handle); + void OnStatusChanged(const std::shared_ptr &handle); + + private: + std::shared_ptr s3Client_; + // Cancel all new/pending transfers after the first failure (optional policy): + const bool cancel_on_first_failure_; + std::atomic failure_occurred_{false}; + + HandleUpdatedCallback uploadProgressCallback; + HandleUpdatedCallback downloadProgressCallback; + HandleUpdatedCallback statusChangedCallback; +}; + +} // namespace TransferCrt +} // namespace Aws diff --git a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp new file mode 100644 index 00000000000..cc066ee7008 --- /dev/null +++ b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp @@ -0,0 +1,153 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include + +#ifndef _GNU_SOURCE /* mkostemp() */ +#define _GNU_SOURCE +#endif +#include +#include +#include +#include +#include +#include + +#include + +namespace Aws { +namespace TransferCrt { + +namespace { +// Return the parent directory of @path, or an empty string if not possible. +Aws::String ParentPath(const Aws::String &path) { + const size_t n = path.find_last_of(Aws::FileSystem::PATH_DELIM); + return n == Aws::String::npos ? "" : path.substr(0, n); +} +} // namespace + +/* + * FileDescriptorBuf methods. + */ +int FileDescriptorBuf::overflow(int_type c) { + return sync() == EOF ? EOF : (c == EOF ? 0 : sputc(c)); +} + +int FileDescriptorBuf::sync() { + if (pbase() && pptr() > pbase()) { + std::streamsize n = xsputn(pbase(), pptr() - pbase()); + if (n == EOF) { + return EOF; + } + pbump(-n); + } + return 0; +} + +// On failure, invoke the error callback with the description of the errno value. +// It also throws an ios::failure in case exceptions() has been called on the stream. +std::streamsize FileDescriptorBuf::xsputn(const char *data, std::streamsize datalen) { + for (std::streamsize written = 0, n = 0; written < datalen; written += n) { + n = ::write(fd_, data + written, datalen - written); + if (n < 0 && errno != EINTR && errno != EAGAIN) { + Aws::StringStream ss; + ss << "write error: " << ::strerror(errno); + errorCallback_(ss.str()); + return EOF; + } + } + return datalen; +} + +/* + * DownloadStream methods. + */ +DownloadStream::DownloadStream(const Aws::String &dstPath, ErrorCallback ec, bool sync_always) + : std::iostream{nullptr}, + dstPath_{dstPath}, + dstTempPath_{dstPath + ".partial.XXXXXX"}, + errorCallback_{ec} { + const Aws::String parent_path = ParentPath(dstPath_); + Aws::StringStream ss; + + assert(!dstPath_.empty()); + assert(errorCallback_); + + // Generate any missing directory components. + if (!parent_path.empty() && !Aws::FileSystem::CreateDirectoryIfNotExists(parent_path.c_str(), true)) { + ss << "Failed to create " << dstPath << " parent directories"; + _error(ss.str()); + return; + } + + // Produce unique temporary-file suffix. Use O_SYNC to ensure data gets written out to disk. + fd_ = ::mkostemp(&dstTempPath_[0], sync_always ? O_SYNC : 0); + if (fd_ < 0) { + ss << "Failed to create " << dstTempPath_ << ": " << ::strerror(errno); + _error(ss.str()); + return; + } + + // Advise the kernel that the data used by @fd_ will not be accessed in the near time. + if (::posix_fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED) < 0) { + ss << "Failed to posix_fadvise('" << dstTempPath_ << "'): " << ::strerror(errno); + _error(ss.str()); + return; + } + + buf_ = Aws::MakeUnique("FdBuf", fd_, [this](Aws::String writeError) { + Aws::StringStream ss; + ss << "Failed to write " << dstTempPath_ << ": " << std::move(writeError); + _error(ss.str()); + return; + }); + rdbuf(buf_.get()); +} + +void DownloadStream::close() noexcept { + Aws::StringStream ss; + std::lock_guard closer(close_mutex_); + + if (eof()) { // Idempotent. + return; + } + + // Call rdbuf()->pubsync() one last time, to empty the put-buffer: + flush(); + + setstate(std::ios::eofbit); + + if (fd_ > 0 && ::close(fd_) < 0) { + ss << "Failed to close " << dstTempPath_ << ": " << ::strerror(errno); + setstate(std::ios::failbit); + _error(ss.str()); + } + fd_ = -1; + + if (bad()) { + _error("Stream is corrupt on close"); + } else if (!dstTempPath_.empty()) { + if (::rename(dstTempPath_.c_str(), dstPath_.c_str())) { + ss << "Failed to rename " << dstTempPath_ << ": " << ::strerror(errno); + _error(ss.str()); + } + dstTempPath_.clear(); + } +} + +DownloadStream::~DownloadStream() { + if (fd_ > 0) { + ::close(fd_); + } + + if (!dstTempPath_.empty()) { + ::unlink(dstTempPath_.c_str()); + } +} + +} // namespace TransferCrt +} // namespace Aws diff --git a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferHandle.cpp b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferHandle.cpp new file mode 100644 index 00000000000..d18d62e0328 --- /dev/null +++ b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferHandle.cpp @@ -0,0 +1,110 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include +#include + +namespace Aws { +namespace TransferCrt { + +namespace { +using KeyValue = std::pair; +// Encode @key_values as query-parameter string. +Aws::String encode_query_string(const Aws::Map &key_values) { + return std::accumulate(key_values.begin(), + key_values.end(), + Aws::String{}, + [](const Aws::String &prev, const KeyValue &cur) { + return (prev.empty() ? "" : prev + "&") + cur.first + + (cur.second.empty() ? "" : "=") + cur.second; + }); +} +} // namespace + +TransferHandle::TransferHandle(const WriteMetadata &md, + const std::shared_ptr &ctx) + : m_context{ctx}, tagging_{encode_query_string(md.tags)}, rmd_{} { + rmd_.uri = md.uri; + SetContentType(md.content_type); + SetContentEncoding(md.content_encoding); + SetMetadata(md.metadata); +} + +TransferHandle::TransferHandle(const Aws::String &srcUri, + const std::shared_ptr &ctx) + : m_context{ctx}, rmd_{} { + rmd_.uri = srcUri; +} + +Aws::String TransferHandle::GetBucket() const { + const size_t start = sizeof("s3://") - 1; + const size_t end = rmd_.uri.find('/', start); + return rmd_.uri.substr(start, end - start); +} + +Aws::String TransferHandle::GetKey() const { + const size_t bucket_start = sizeof("s3://") - 1; + const size_t bucket_end = rmd_.uri.find('/', bucket_start); + if (bucket_end == Aws::String::npos) { + return ""; + } + return rmd_.uri.substr(bucket_end + 1); +} + +void TransferHandle::UpdateStatus(TransferStatus value) { + // Release any pending per-chunk requests on failure. + if (value == TransferStatus::FAILING || value == TransferStatus::FAILED) { + Cancel(); + } + + std::unique_lock lock(m_statusLock); + assert(m_status != TransferStatus::FAILING || value == TransferStatus::FAILED); + + // The following ensures exactly one transition from "not finished" to "finished": + if (!IsFinishedStatus(m_status) && value != m_status) { + m_status = value; + if (IsFinishedStatus(value)) { + m_finishedSignal.set_value(); + } + } +} + +TransferStatus TransferHandle::GetStatus() const { + std::lock_guard lock(m_statusLock); + return m_status; +} + +void TransferHandle::WaitUntilFinished() const { + if (!IsFinishedStatus(GetStatus())) { + m_finishedSignal.get_future().wait(); + } +} + +Aws::String StatusToString(TransferStatus status) { + switch (status) { + case TransferStatus::NOT_STARTED: + return "NOT_STARTED"; + case TransferStatus::IN_PROGRESS: + return "IN_PROGRESS"; + case TransferStatus::FAILING: + return "FAILING"; + case TransferStatus::CANCELED: + return "CANCELED"; + case TransferStatus::FAILED: + return "FAILED"; + case TransferStatus::COMPLETED: + return "COMPLETED"; + } + return "UNKNOWN"; +} + +Aws::OStream &operator<<(Aws::OStream &s, TransferStatus status) { + return s << StatusToString(status); +} + +} // namespace TransferCrt +} // namespace Aws diff --git a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferManager.cpp b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferManager.cpp new file mode 100644 index 00000000000..db9141be9f6 --- /dev/null +++ b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferManager.cpp @@ -0,0 +1,392 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include +#include +#include +#include + +namespace Aws { +namespace TransferCrt { +constexpr char CLASS_TAG[] = "TransferManager"; + +std::shared_ptr TransferManager::Create( + std::shared_ptr s3Client, + const HandleUpdatedCallback &up, + const HandleUpdatedCallback &down, + const HandleUpdatedCallback &statusChanged) { + return std::shared_ptr(new TransferManager{s3Client, up, down, statusChanged}); +} + +TransferManager::TransferManager(std::shared_ptr s3Client, + const HandleUpdatedCallback &uploadProgressCallback, + const HandleUpdatedCallback &downloadProgressCallback, + const HandleUpdatedCallback &statusChangedCallback, + bool cancel_on_first_failure) + : s3Client_{s3Client}, + cancel_on_first_failure_{cancel_on_first_failure}, + uploadProgressCallback{uploadProgressCallback}, + downloadProgressCallback{downloadProgressCallback}, + statusChangedCallback{statusChangedCallback} {} + +std::shared_ptr TransferManager::UploadFile( + const Aws::String &srcPath, + const std::shared_ptr &inputStream, + const WriteMetadata &md, + const std::shared_ptr &ctx) { + auto handle = Aws::MakeShared(CLASS_TAG, md, ctx); + Aws::StringStream ss; + + if (cancel_on_first_failure_ && failure_occurred_) { + handle->UpdateStatus(TransferStatus::CANCELED); + OnStatusChanged(handle); + return handle; + } + + std::shared_ptr srcStream{inputStream}; + if (!srcStream) { + srcStream = + Aws::MakeShared(CLASS_TAG, + srcPath, std::ios_base::in | std::ios_base::binary); + } + + if (!srcStream->good()) { + ss << "Failed to open stream '" << srcPath << "': " + << (errno ? ::strerror(errno) : "not in a good state"); + Fail(handle, ss.str()); + return handle; + } + + // Determine length by seeking to the end. + std::streampos cur = srcStream->tellg(); + std::streampos end = srcStream->rdbuf()->pubseekoff(0, std::ios_base::end); + srcStream->seekg(cur, std::ios_base::beg); + if (!srcStream->good() || cur < 0 || end < 0) { + ss << "Failed to determine size of '" << srcPath << "': " + << (errno ? ::strerror(errno) : "stream error"); + Fail(handle, ss.str()); + return handle; + } + handle->SetBytesTotalSize(end - cur); + + PutObject(srcStream, handle); + return handle; +} + +void TransferManager::PutObject(const std::shared_ptr &streamToPut, + const std::shared_ptr &handle) { + const auto &blobMetadata = handle->GetReadMetadata(); + auto putObjectRequest = Aws::S3Crt::Model::PutObjectRequest() + .WithBucket(handle->GetBucket()) + .WithKey(handle->GetKey()) + .WithContentLength(blobMetadata.size); + + // Grant the bucket owner full control. + putObjectRequest.SetACL(Aws::S3Crt::Model::ObjectCannedACL::bucket_owner_full_control); + putObjectRequest.SetContentType(blobMetadata.content_type); + if (!blobMetadata.content_encoding.empty()) { + putObjectRequest.SetContentEncoding(blobMetadata.content_encoding); + } + if (!handle->GetTagging().empty()) { + putObjectRequest.SetTagging(handle->GetTagging()); + } + + putObjectRequest.SetBody(streamToPut); + + // AmazonWebServiceRequest methods: + putObjectRequest.SetContinueRequestHandler( + [handle](const Aws::Http::HttpRequest *) { return handle->ShouldContinue(); }); + + // Keep transfer manager alive until all callbacks are finished: + auto self = shared_from_this(); + + putObjectRequest.SetDataSentEventHandler( + [self, handle](const Aws::Http::HttpRequest *, long long /*NOLINT*/ progress) { + handle->UpdateBytesTransferred(progress); + self->OnUploadProgress(handle); + }); + + auto callback = Aws::S3Crt::PutObjectResponseReceivedHandler{ + [self](const Aws::S3Crt::S3CrtClient *client, + const Aws::S3Crt::Model::PutObjectRequest &request, + const Aws::S3Crt::Model::PutObjectOutcome &outcome, + const std::shared_ptr &context) { + self->HandlePutObjectResponse(client, request, outcome, context); + }}; + + auto asyncContext = Aws::MakeShared(CLASS_TAG, handle); + + // Transition to IN_PROGRESS right before the API call is made. + // This is necessary since the body of PutObjectAsync() may call HandlePutObjectResponse(). + handle->UpdateStatus(TransferStatus::IN_PROGRESS); + OnStatusChanged(handle); + + s3Client_->PutObjectAsync(putObjectRequest, callback, asyncContext); +} + +void TransferManager::HandlePutObjectResponse( + const Aws::S3Crt::S3CrtClient *, + const Aws::S3Crt::Model::PutObjectRequest &, + const Aws::S3Crt::Model::PutObjectOutcome &outcome, + const std::shared_ptr &context) { + const auto &handle = std::dynamic_pointer_cast(context)->handle; + + switch (handle->GetStatus()) { + case TransferStatus::IN_PROGRESS: + if (outcome.IsSuccess()) { + handle->SetETag(outcome.GetResult().GetETag()); + handle->UpdateStatus(TransferStatus::COMPLETED); + } else { + handle->UpdateStatus(handle->ShouldContinue() ? TransferStatus::FAILED + : TransferStatus::CANCELED); + handle->SetError(outcome.GetError()); + } + break; + case TransferStatus::FAILING: + break; + default: + Fail(handle, "Invalid pre-final state " + StatusToString(handle->GetStatus())); + } + + if (handle->GetStatus() == TransferStatus::FAILING) { + handle->UpdateStatus(TransferStatus::FAILED); + } + OnStatusChanged(handle); +} + +std::shared_ptr TransferManager::DownloadFile( + const Aws::String &srcUri, + const Aws::String &dstPath, + std::streambuf *dstStreamBuf, + const std::shared_ptr &ctx) { + auto handle = Aws::MakeShared(CLASS_TAG, srcUri, ctx); + auto context = Aws::MakeShared(CLASS_TAG, handle); + + if (cancel_on_first_failure_ && failure_occurred_) { + handle->UpdateStatus(TransferStatus::CANCELED); + OnStatusChanged(handle); + return handle; + } + + if (dstStreamBuf) { + context->dstStream = new std::iostream(dstStreamBuf); + context->dstStreamBuf = dstStreamBuf; + } else { + // Note that the OnStatusChanged callback may be used before this function returns: + context->dstStream = new DownloadStream(dstPath, [handle, this](std::string msg) { + Fail(handle, std::move(msg), "DownloadStream Failure"); + }); + } + + // Creation of the DownloadStream may have failed, transitioning from NOT_STARTED => FAILED. + if (handle->GetStatus() != TransferStatus::FAILED) { + GetObject(context, handle); + } + return handle; +} + +// Perform the actual download. +void TransferManager::GetObject(const std::shared_ptr &context, + const std::shared_ptr &handle) { + auto getRequest = Aws::S3Crt::Model::GetObjectRequest() + .WithBucket(handle->GetBucket()) + .WithKey(handle->GetKey()); + + // Invoking the factory function below passes the @dstStream to a StandardHttpResponse object, + // whose ResponseStream owns and manages the pointer once the factory function is called. + // + // Unlike the Aws::Transfer::TransferManager, we are not allocating a new object here; instead, + // a pointer to the already allocated object is passed. It is ok to do this, since the lambda + // is only invoked once during the lifetime of the request - in InitCommonCrtRequestOption, when + // populating the response. In contrast to the Aws::Transfer::TransferManager, the CRT code does + // not use the factory function for retries (these are internally handled by aws-c-s3). + getRequest.SetResponseStreamFactory(Aws::IOStreamFactory([context]() { + context->dstStreamOwnershipHasBeenTransferred = true; + return context->dstStream; + })); + + // Keep transfer manager alive until all callbacks are finished: + auto self = shared_from_this(); + + getRequest.SetContinueRequestHandler( + [handle](const Aws::Http::HttpRequest *) { return handle->ShouldContinue(); }); + + getRequest.SetDataReceivedEventHandler([self, context, handle](const Aws::Http::HttpRequest *, + Aws::Http::HttpResponse *res, + long long /*NOLINT*/ amount) { + // + // Set the total size after the first chunk has been processed. + // + // The aws-c-s3 code populates the Content-Length header of the HttpResponse after the + // first part of the ranged-Get has completed. Extract total-size information from this. + if (!handle->BytesTotalSizeHasBeenSet() && handle->ShouldContinue() && + handle->GetStatus() == TransferStatus::IN_PROGRESS) { + std::string length_str; + + for (const auto &hdr : res->GetHeaders()) { + if (Aws::Utils::StringUtils::ToLower(hdr.first.c_str()) == "content-length") { + length_str = hdr.second; + } + } + if (length_str.empty()) { + self->Fail(handle, "Response lacks Content-Length header"); + } else { + char *end = nullptr; + const uint64_t content_length = std::strtoull(length_str.c_str(), &end, 0); + + if (end != nullptr && *end != '\0') { + self->Fail(handle, "Invalid Content-Length: " + length_str); + } else { + handle->SetBytesTotalSize(content_length); + } + } + } + + handle->UpdateBytesTransferred(amount); + self->OnDownloadProgress(handle); + }); + + auto callback = Aws::S3Crt::GetObjectResponseReceivedHandler{ + [self](const Aws::S3Crt::S3CrtClient *client, + const Aws::S3Crt::Model::GetObjectRequest &request, + Aws::S3Crt::Model::GetObjectOutcome outcome, + const std::shared_ptr &context) { + self->HandleGetObjectResponse(client, request, std::move(outcome), context); + }}; + + // Transition to IN_PROGRESS right before the API call is made. + // This is necessary since the body of GetObjectAsync() may call HandleGetObjectResponse(). + handle->UpdateStatus(TransferStatus::IN_PROGRESS); + OnStatusChanged(handle); + + s3Client_->GetObjectAsync(getRequest, callback, context); +} + +void TransferManager::HandleGetObjectResponse( + const Aws::S3Crt::S3CrtClient *, + const Aws::S3Crt::Model::GetObjectRequest &, + Aws::S3Crt::Model::GetObjectOutcome outcome, + const std::shared_ptr &context) { + auto ctx = std::dynamic_pointer_cast(context); + const auto &handle = ctx->handle; + + switch (handle->GetStatus()) { + case TransferStatus::IN_PROGRESS: + if (outcome.IsSuccess()) { + // At this stage, the total size should have been filled in. This is done by the + // aws-c-s3 code, passing the Content-Length header via the headers_callback to + // the HttpResponse (see GetObject() above). + if (!handle->BytesTotalSizeHasBeenSet()) { + // In the special case of an empty file, the aws-c-s3 code does not use the + // body_callback and so the DataReceivedEventHandler is also not called. + if (outcome.GetResult().GetContentLength() == 0) { + handle->SetBytesTotalSize(0); + } else { + Fail(handle, "Total size has not been filled in during transfer"); + break; + } + } else if (handle->GetReadMetadata().size != + static_cast(outcome.GetResult().GetContentLength())) { + Fail(handle, "Total size differs from Content-Length", "DATA CORRUPTED"); + break; + } + + // If the user specified a DownloadStream, close and rename the partial file here. + // In the error case, the DownloadStream destructor will remove the partial file. + DownloadStream *d = dynamic_cast(ctx->dstStream); + if (d != nullptr) { + // On error, the call to Fail() within d invokes UpdateStatus(FAILING), which + // completes before d->close() returns. This causes a transition to FAILING. + d->close(); + } else { + // Explicitly close the file buffer here in order to catch local write errors. + std::filebuf *fb = dynamic_cast(ctx->dstStreamBuf); + if (fb != nullptr && fb->close() == nullptr) { + Fail(handle, "Failed to close download stream - output likely corrupt"); + } + } + + // State may have changed due to calling Fail(). + if (handle->GetStatus() != TransferStatus::FAILING) { + handle->UpdateStatus(TransferStatus::COMPLETED); + handle->SetLastModified(outcome.GetResult().GetLastModified()); + handle->SetContentType(outcome.GetResult().GetContentType()); + handle->SetContentEncoding(outcome.GetResult().GetContentEncoding()); + handle->SetETag(outcome.GetResult().GetETag()); + } + } else { + handle->UpdateStatus(handle->ShouldContinue() ? TransferStatus::FAILED + : TransferStatus::CANCELED); + handle->SetError(outcome.GetError()); + } + break; + case TransferStatus::FAILING: + break; + default: + Fail(handle, "Invalid pre-final state " + StatusToString(handle->GetStatus())); + } + + if (handle->GetStatus() == TransferStatus::FAILING) { + handle->UpdateStatus(TransferStatus::FAILED); + } + OnStatusChanged(handle); +} + +void TransferManager::Fail(std::shared_ptr handle, + std::string msg, + std::string exceptionMsg) { + switch (handle->GetStatus()) { + case TransferStatus::NOT_STARTED: + handle->UpdateStatus(TransferStatus::FAILED); + break; + case TransferStatus::IN_PROGRESS: + handle->UpdateStatus(TransferStatus::FAILING); + break; + case TransferStatus::FAILING: + msg = handle->GetLastError().GetMessage() + "\n" + msg; + break; + case TransferStatus::FAILED: + case TransferStatus::COMPLETED: + case TransferStatus::CANCELED: + return; + } + handle->SetError({S3Crt::S3CrtErrors::UNKNOWN, std::move(exceptionMsg), std::move(msg), false}); + OnStatusChanged(handle); +} + +void TransferManager::OnStatusChanged(const std::shared_ptr &handle) { + if (cancel_on_first_failure_ && (handle->GetStatus() == TransferStatus::FAILING || + handle->GetStatus() == TransferStatus::FAILED)) { + failure_occurred_ = true; + } + if (statusChangedCallback) { + statusChangedCallback(handle); + } +} + +void TransferManager::OnUploadProgress(const std::shared_ptr &handle) { + // The progress callbacks are only called while the transfer is in progress. + // If another transfer has failed in the meantime, cancel this and all other ones. + if (cancel_on_first_failure_ && failure_occurred_) { + handle->Cancel(); + } + if (uploadProgressCallback) { + uploadProgressCallback(handle); + } +} + +void TransferManager::OnDownloadProgress(const std::shared_ptr &handle) { + if (cancel_on_first_failure_ && failure_occurred_) { + handle->Cancel(); + } + if (downloadProgressCallback) { + downloadProgressCallback(handle); + } +} + +} // namespace TransferCrt +} // namespace Aws diff --git a/tests/aws-cpp-sdk-transfer-crt-tests/CMakeLists.txt b/tests/aws-cpp-sdk-transfer-crt-tests/CMakeLists.txt new file mode 100644 index 00000000000..403b6d6b865 --- /dev/null +++ b/tests/aws-cpp-sdk-transfer-crt-tests/CMakeLists.txt @@ -0,0 +1,30 @@ +add_project(aws-cpp-sdk-transfer-crt-tests + "Tests for the AWS TransferManager (CRT version) of the C++ SDK" + aws-cpp-sdk-transfer-crt + aws-cpp-sdk-s3-crt + testing-resources + aws-cpp-sdk-core) + +# Headers are included in the source so that they show up in Visual Studio. +# They are included elsewhere for consistency. + +file(GLOB TRANSFER_TEST_SRC + "${CMAKE_CURRENT_SOURCE_DIR}/*.cpp" +) + +if(MSVC AND BUILD_SHARED_LIBS) + add_definitions(-DGTEST_LINKED_AS_SHARED_LIBRARY=1) +endif() + +enable_testing() + +if(PLATFORM_ANDROID AND BUILD_SHARED_LIBS) + add_library(${PROJECT_NAME} ${TRANSFER_TEST_SRC}) +else() + add_executable(${PROJECT_NAME} ${TRANSFER_TEST_SRC}) +endif() + +set_compiler_flags(${PROJECT_NAME}) +set_compiler_warnings(${PROJECT_NAME}) + +target_link_libraries(${PROJECT_NAME} ${PROJECT_LIBS}) diff --git a/tests/aws-cpp-sdk-transfer-crt-tests/DownloadStreamTests.cpp b/tests/aws-cpp-sdk-transfer-crt-tests/DownloadStreamTests.cpp new file mode 100644 index 00000000000..3e0ac411e11 --- /dev/null +++ b/tests/aws-cpp-sdk-transfer-crt-tests/DownloadStreamTests.cpp @@ -0,0 +1,323 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace Aws { +namespace TransferCrt { +namespace { + +TEST(Construction, FileDescriptorBuf) { + static_assert(!std::is_default_constructible{}, "Should not permit default construction"); + static_assert(!std::is_trivially_destructible{}, "Non-trivial destructor"); + static_assert(std::is_nothrow_destructible{}, "Should not throw during destruction"); + static_assert(!std::is_copy_constructible{}, "Should not have copy constructor"); + static_assert(!std::is_copy_assignable{}, "Should not permit copy assignment"); + static_assert(std::is_move_constructible{}, "Should be move-constructible"); + static_assert(std::is_move_assignable{}, "Should support move assignment"); + static_assert(!std::is_trivially_move_constructible{}, "Non-trivial move constructor"); + static_assert(!std::is_trivially_move_assignable{}, "Non-trivial move assignment"); +} + +TEST(Construction, DownloadStream) { + static_assert(!std::is_default_constructible{}, "Should not permit default construction"); + static_assert(!std::is_trivially_destructible{}, "Non-trivial destructor"); + static_assert(std::is_nothrow_destructible{}, "Should not throw during destruction"); + static_assert(!std::is_copy_constructible{}, "Should not have copy constructor"); + static_assert(!std::is_copy_assignable{}, "Should not permit copy assignment"); + static_assert(!std::is_move_constructible{}, "Should not be move-constructible"); + static_assert(!std::is_move_assignable{}, "Should not support move assignment"); +} + +namespace { +// Return the parent directory of @path, or an empty string if not possible. +Aws::String ParentPath(const Aws::String &path) { + const size_t n = path.find_last_of(Aws::FileSystem::PATH_DELIM); + return n == Aws::String::npos ? "" : path.substr(0, n); +} + +// Stolen from endpoint/BuiltInParameters.cpp. +bool StringEndsWith(const Aws::String& str, const Aws::String& suffix) { + if (suffix.size() > str.size()) + return false; + return std::equal(suffix.rbegin(), suffix.rend(), str.rbegin()); +} +} // namespace + +// Test fixture to help set up / tear down DownloadStream test cases. +class DownloadStreamtest : public ::testing::Test { + public: + static void SetUpTestCase() { + Aws::FileSystem::DeepDeleteDirectory(GetTestFilesDirectory().c_str()); + } + static void TearDownTestCase() { + Aws::FileSystem::DeepDeleteDirectory(GetTestFilesDirectory().c_str()); + } + + DownloadStreamtest() : dst_{GetTestFilesDirectory() + "/test.file"} {} + ~DownloadStreamtest() { (void)UnlinkTestFile(); } + + // Open up a file descriptor to @dst_, creating any missing directory components. + int TestFile() { + const Aws::String parent_path = ParentPath(dst_); + Aws::FileSystem::CreateDirectoryIfNotExists(parent_path.c_str(), true); + return ::open(dst_.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0600); + } + + // Remove @dst_ from the filesystem. + int UnlinkTestFile() { return ::unlink(dst_.c_str()); } + + // Return the contents of @dst_. + Aws::String TestFileContents() { + Aws::StringStream ss; + std::ifstream is{dst_}; + + is >> std::noskipws >> ss.rdbuf(); + return ss.str(); + } + + static Aws::String GetTestFilesDirectory() { + Aws::String directory; +#ifdef __ANDROID__ + directory = Aws::FileSystem::Join(Aws::Platform::GetCacheDirectory(), "TransferCrtTests"); +#else + directory = "TransferCrtTests"; +#endif // __ANDROID__ + + Aws::FileSystem::CreateDirectoryIfNotExists(directory.c_str()); + return directory; + } + + protected: + Aws::String dst_; + Aws::String test_data_{"the quick brown fox jumps over the lazy lackadaisical lapdog"}; +}; + +TEST_F(DownloadStreamtest, FdEnsureCallbackAndExceptionWork) { + Aws::String errMsg; + FileDescriptorBuf fdb(-1, [&errMsg](Aws::String e) { errMsg = std::move(e); }); + // ACTION + ASSERT_EQ(fdb.sputn(test_data_.c_str(), test_data_.size()), EOF); + // VERIFICATION + EXPECT_EQ(errMsg, "write error: Bad file descriptor"); +} + +TEST_F(DownloadStreamtest, FdSupportedMethods) { + // Test assumptions as to which methods are supported. Mostly VERIFICATION in this test. + Aws::String errMsg; + int fd = TestFile(); + FileDescriptorBuf fdb(fd, [&errMsg](Aws::String e) { errMsg = std::move(e); }); + char alternateBuf[128] = {0}; + + ASSERT_GT(fd, 2); + ASSERT_EQ(fdb.pubsync(), 0); + ASSERT_EQ(fdb.sputc('a'), 'a'); + ASSERT_EQ(fdb.sputn(test_data_.c_str(), test_data_.size()), (std::streamsize)test_data_.size()); + ASSERT_EQ(errMsg, ""); + + // Show that pubsetbuf has no effect on this class. + ASSERT_EQ(::lseek(fd, 0, SEEK_SET), 0); + ASSERT_EQ(::ftruncate(fd, 0), 0); + + ASSERT_EQ(fdb.pubsetbuf(alternateBuf, sizeof(alternateBuf)), &fdb); + ASSERT_EQ(fdb.sputn(test_data_.c_str(), test_data_.size()), (std::streamsize)test_data_.size()); + ASSERT_EQ(::close(fd), 0); + ASSERT_STREQ(alternateBuf, ""); // Nothing got transferred. + ASSERT_EQ(TestFileContents(), test_data_); // Wrote to fd, as intended. + + ASSERT_EQ(errMsg, ""); +} + +TEST_F(DownloadStreamtest, FdUnsupportedMethods) { + // Document which methods are not supported. Mostly VERIFICATION in this test. + Aws::String errMsg; + int fd = TestFile(); + FileDescriptorBuf fdb(fd, [&errMsg](Aws::String e) { errMsg = std::move(e); }); + char buf[3] = {0}; + + ASSERT_GT(fd, 2); + + ASSERT_EQ(fdb.pubseekoff(0, std::ios_base::beg), -1); + ASSERT_EQ(fdb.pubseekoff(0, std::ios_base::cur), -1); + ASSERT_EQ(fdb.pubseekoff(0, std::ios_base::end), -1); + + ASSERT_EQ(fdb.sputn(test_data_.c_str(), test_data_.size()), (std::streamsize)test_data_.size()); + ASSERT_EQ(fdb.pubseekpos(0), -1); + ASSERT_EQ(fdb.pubseekpos(test_data_.size()), -1); + + ASSERT_EQ(fdb.in_avail(), 0); + ASSERT_EQ(fdb.snextc(), -1); + ASSERT_EQ(fdb.sbumpc(), -1); + ASSERT_EQ(fdb.sgetc(), -1); + ASSERT_EQ(fdb.sgetn(buf, sizeof(buf)), 0); + ASSERT_EQ(fdb.sputbackc('a'), -1); + ASSERT_EQ(fdb.sungetc(), -1); + + ASSERT_EQ(errMsg, ""); +} + +TEST_F(DownloadStreamtest, DownloadStreamHappyPath) { + // Document expected use cases in ACTION/VERIFICATION blocks. + Aws::String errMsg; + DownloadStream d{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }}; + + d << test_data_; + ASSERT_TRUE(d.good()); + ASSERT_FALSE(d.eof()); + + d.close(); + ASSERT_FALSE(d.bad()); + ASSERT_FALSE(d.fail()); + ASSERT_TRUE(d.eof()); + + ASSERT_EQ(TestFileContents(), test_data_); + ASSERT_EQ(errMsg, ""); +} + +TEST_F(DownloadStreamtest, SupportedMethodsShouldSucceed) { + // Document expected use-cases for supported methods in ACTION/VERIFICATION blocks. + Aws::String errMsg; + DownloadStream d{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }}; + + // These characters use the internal buffer (testing flush), for strings xsputn is called. + for (const char &c : test_data_) { + d.put(c); + } + ASSERT_TRUE(d.good()); + + d << test_data_; + ASSERT_TRUE(d.good()); + + d.write(test_data_.c_str(), test_data_.size()); + ASSERT_TRUE(d.good()); + + d.close(); + ASSERT_FALSE(d.fail()); + + // Call flush() after the file has been closed; to require that close() flushed the buffer. + d.flush(); + ASSERT_FALSE(d.fail()); + ASSERT_TRUE(d.eof()); + + ASSERT_EQ(TestFileContents(), test_data_ + test_data_ + test_data_); + ASSERT_EQ(errMsg, ""); +} + +TEST_F(DownloadStreamtest, DownloadStreamUnsupportedMethods) { + // Document unsupported methods via VERIFICATION (ASSERT) statements. + Aws::String errMsg; + DownloadStream d{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }}; + + d << test_data_; + ASSERT_TRUE(d.good()); + + ASSERT_EQ(d.get(), -1); + ASSERT_TRUE(d.fail() && !d.bad() && d.eof()); + + d.clear(); + ASSERT_EQ(d.peek(), -1); + ASSERT_FALSE(d.good()); + ASSERT_TRUE(d.eof()); + + d.clear(); + ASSERT_EQ(d.tellg(), -1); + ASSERT_TRUE(d.good()); + + d.seekg(0); + ASSERT_TRUE(d.fail() && !d.bad() && !d.eof()); + + d.clear(); + ASSERT_EQ(d.sync(), 0); + ASSERT_TRUE(d.good()); + ASSERT_TRUE(d.flush().good()); + + // Ensure the data is written out despite the failures in between: + d.close(); + ASSERT_EQ(TestFileContents(), test_data_); + + // It is now too late to flush any data: + ASSERT_TRUE(d.eof()); + ASSERT_EQ(d.sync(), -1); + ASSERT_TRUE(d.fail() && !d.bad() && d.eof()); + + // Flush proceeds without error, since the put buffer was empty: + d.clear(); + ASSERT_TRUE(d.flush().good()); + + ASSERT_EQ(errMsg, ""); + ASSERT_EQ(UnlinkTestFile(), 0); + + // Try flush again with non-empty put buffer: + DownloadStream d2{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }}; + for (const char &c : test_data_) { + d2.put(c); + } + ASSERT_TRUE(d2.good()); + + d2.close(); + ASSERT_TRUE(!d2.fail() && !d2.bad() && d2.eof()); + + // The following is only false at eof since we empty the put-buffer in close(): + ASSERT_FALSE(d2.flush().bad()); + + ASSERT_EQ(TestFileContents(), test_data_); + ASSERT_EQ(errMsg, ""); +} + +TEST_F(DownloadStreamtest, EnsureOutputDoesNotExistIfStreamIsCorrupted) { + // When a stream is corrupted (badbit set), ensure that no output file is generated. + Aws::String errMsg; + DownloadStream d{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }}; + + d << test_data_; + ASSERT_TRUE(d.good()); + ASSERT_FALSE(d.eof()); + + // ACTION + d.setstate(std::ios::badbit); + d.close(); + + // VERIFICATION + ASSERT_EQ(d.rdstate(), std::ios::badbit | std::ios::eofbit | std::ios::failbit); + ASSERT_EQ(errMsg, "Stream is corrupt on close"); + ASSERT_EQ(::access(dst_.c_str(), F_OK), -1); +} + +TEST_F(DownloadStreamtest, PermissionsError) { + Aws::String errMsg; + DownloadStream d{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }}; + + // ACTION + // Change the directory permissions so that renaming the file will fail: + ASSERT_EQ(::chmod(ParentPath(dst_).c_str(), 0), 0); + + d << test_data_; + ASSERT_TRUE(d.good()); + + d.close(); + ASSERT_TRUE(d.fail() && d.bad() && d.eof()); + + // VERIFICATION + EXPECT_TRUE(StringEndsWith(errMsg, "Permission denied")); + + // CLEAN-UP (need to restore directory permissions to enable deletion). + ASSERT_EQ(::chmod(ParentPath(dst_).c_str(), 755), 0); +} + +} // namespace +} // namespace TransferCrt +} // namespace Aws diff --git a/tests/aws-cpp-sdk-transfer-crt-tests/RunTests.cpp b/tests/aws-cpp-sdk-transfer-crt-tests/RunTests.cpp new file mode 100644 index 00000000000..d8c76cc8bcc --- /dev/null +++ b/tests/aws-cpp-sdk-transfer-crt-tests/RunTests.cpp @@ -0,0 +1,29 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include +#include +#include + +int main(int argc, char** argv) +{ + Aws::SDKOptions options; + options.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Trace; + + AWS_BEGIN_MEMORY_TEST_EX(options, 1024, 128); + Aws::Testing::InitPlatformTest(options); + Aws::Testing::ParseArgs(argc, argv); + + Aws::InitAPI(options); + ::testing::InitGoogleTest(&argc, argv); + int exitCode = RUN_ALL_TESTS(); + Aws::ShutdownAPI(options); + + AWS_END_MEMORY_TEST_EX; + Aws::Testing::ShutdownPlatformTest(options); + return exitCode; +} diff --git a/tests/aws-cpp-sdk-transfer-crt-tests/TransferHandleTests.cpp b/tests/aws-cpp-sdk-transfer-crt-tests/TransferHandleTests.cpp new file mode 100644 index 00000000000..fb268dfaf4b --- /dev/null +++ b/tests/aws-cpp-sdk-transfer-crt-tests/TransferHandleTests.cpp @@ -0,0 +1,295 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include +#include + +#include + +namespace Aws { +namespace TransferCrt { +namespace { +/* + * TransferHandle Tests. + */ +TEST(TransferHandleTest, DefaultValues) { + TransferHandle th{"s3://some.bucket/some.path", {}}; + + ASSERT_TRUE(th.ShouldContinue()); + ASSERT_FALSE(th.BytesTotalSizeHasBeenSet()); + ASSERT_EQ(th.GetBytesTransferred(), 0u); + ASSERT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED); + ASSERT_EQ(th.GetLastError().GetErrorType(), S3Crt::S3CrtErrors::UNKNOWN); + ASSERT_EQ(th.GetLastError().GetMessage(), ""); + ASSERT_EQ(th.GetLastError().GetExceptionName(), ""); + ASSERT_TRUE(th.GetLastError().ShouldRetry()); +} + +TEST(TransferHandleTest, ReadMetadata) { + TransferHandle th{"s3://some.bucket/some.path", {}}; + + th.SetBytesTotalSize(42); + + // VERIFICATION + EXPECT_EQ(th.GetReadMetadata().size, 42u); + + // Validate default values: + EXPECT_NE(th.GetReadMetadata().content_encoding, "gzip"); + EXPECT_EQ(th.GetReadMetadata().content_type, ""); + EXPECT_EQ(th.GetReadMetadata().metadata.size(), 0u); +} + +TEST(TransferHandleTest, StateMachine) { + TransferHandle th{"s3://some.bucket/some.path", {}}; + + // Initial state. + ASSERT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED); + + // NOT_STARTED => IN_PROGRESS + th.UpdateStatus(TransferStatus::IN_PROGRESS); + ASSERT_EQ(th.GetStatus(), TransferStatus::IN_PROGRESS); + + // IN_PROGRESS => NOT_STARTED + th.UpdateStatus(TransferStatus::NOT_STARTED); + ASSERT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED); + + // NOT_STARTED => COMPLETED + th.UpdateStatus(TransferStatus::COMPLETED); + ASSERT_EQ(th.GetStatus(), TransferStatus::COMPLETED); +} + +TEST(TransferHandleTest, FailingATransfer) { + // Mimic a TransferHandle on which FailWithError is called shortly after construction. + TransferHandle th{"s3://some.bucket/some.path", {}}; + + // PRECONDITION + ASSERT_TRUE(th.ShouldContinue()); + + // ACTION + th.UpdateStatus(TransferStatus::FAILED); + th.SetError({S3Crt::S3CrtErrors::UNKNOWN, "FATAL ERROR", "Something went wrong", false}); + + // VERIFICATION + EXPECT_FALSE(th.ShouldContinue()); + EXPECT_EQ(th.GetStatus(), TransferStatus::FAILED); + EXPECT_EQ(th.GetLastError().GetErrorType(), S3Crt::S3CrtErrors::UNKNOWN); + EXPECT_EQ(th.GetLastError().GetMessage(), "Something went wrong"); + EXPECT_EQ(th.GetLastError().GetExceptionName(), "FATAL ERROR"); +} + +TEST(TransferHandleTest, UpdateStatusIsIdempotent) { + // NOT_STARTED is the initial state. Can not set it again. + { + TransferHandle th{"s3://some.bucket/some.path", {}}; + + ASSERT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED); + th.UpdateStatus(TransferStatus::NOT_STARTED); + EXPECT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED); + } + + // Can update to IN_PROGRESS at most once. + { + TransferHandle th{"s3://some.bucket/some.path", {}}; + th.UpdateStatus(TransferStatus::NOT_STARTED); + th.UpdateStatus(TransferStatus::IN_PROGRESS); + th.UpdateStatus(TransferStatus::IN_PROGRESS); + EXPECT_EQ(th.GetStatus(), TransferStatus::IN_PROGRESS); + } + + // May transition from IN_PROGRESS back to NOT_STARTED (not currently used by the code). + { + TransferHandle th{"s3://some.bucket/some.path", {}}; + + ASSERT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED); + th.UpdateStatus(TransferStatus::IN_PROGRESS); + EXPECT_EQ(th.GetStatus(), TransferStatus::IN_PROGRESS); + + th.UpdateStatus(TransferStatus::NOT_STARTED); + th.UpdateStatus(TransferStatus::NOT_STARTED); + EXPECT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED); + + th.UpdateStatus(TransferStatus::IN_PROGRESS); + th.UpdateStatus(TransferStatus::IN_PROGRESS); + EXPECT_EQ(th.GetStatus(), TransferStatus::IN_PROGRESS); + } + + // Once a final state is reached, no more state transitions are possible. + { + for (auto &&finalState : { + TransferStatus::CANCELED, + TransferStatus::FAILED, + TransferStatus::COMPLETED, + }) { + TransferHandle th{"s3://some.bucket/some.path", {}}; + + th.UpdateStatus(finalState); + + for (auto &&testState : { + TransferStatus::NOT_STARTED, + TransferStatus::IN_PROGRESS, + TransferStatus::FAILING, + TransferStatus::CANCELED, + TransferStatus::FAILED, + TransferStatus::COMPLETED, + }) { + th.UpdateStatus(testState); + EXPECT_EQ(th.GetStatus(), finalState); + } + } + } +} + +TEST(TransferHandleTest, Upload) { + // Validate handling of upload parameters. + WriteMetadata wmd{"s3://some.bucket/some.path"}; + TransferHandle th{wmd, {}}; + + // Need to initialize the ReadMetadata (size must be set in order to call GetReadMetadata()). + th.SetBytesTotalSize(42); + + // VERIFICATION + EXPECT_EQ(th.GetBucket(), "some.bucket"); + EXPECT_EQ(th.GetKey(), "some.path"); + EXPECT_EQ(th.GetReadMetadata().content_type, ""); + EXPECT_EQ(th.GetReadMetadata().content_encoding, ""); +} + +TEST(TransferHandleTest, UploadMetadata) { + // Validate encoding of non-ASCII metadata. + WriteMetadata wmd{"s3://some.bucket/some.path"}; + wmd.metadata = {{"src", "Âûröræ"}, {"dst", "ÄMÄZÕÑ S3"}, {"purpose", "upload"}}; + TransferHandle th{wmd, {}}; + + // Need to initialize the ReadMetadata (see above). + th.SetBytesTotalSize(0); + + // VERIFICATION + ASSERT_EQ(th.GetReadMetadata().metadata.size(), wmd.metadata.size()); + EXPECT_EQ(th.GetReadMetadata().metadata, wmd.metadata); +} + +TEST(TransferHandleTest, UploadGzipped) { + // Ensure compressed (gzip) upload is handled as expected. + WriteMetadata wmd{"s3://", "text/plain", "gzip"}; + TransferHandle th{wmd, {}}; + + th.SetBytesTotalSize(0); // Needed to initialize ReadMetadata. + + ASSERT_EQ(th.GetReadMetadata().content_type, "text/plain"); + ASSERT_EQ(th.GetReadMetadata().content_encoding, "gzip"); +} + +/* + * Parameterized test to check combinations of (terminal) TransferStatus states. + */ +class DownloadHandleFixture : public ::testing::TestWithParam { + protected: + TransferHandle th{"s3://some.bucket/some.path", {}}; +}; + +TEST_P(DownloadHandleFixture, TerminalStatesMustNotBeChanged) { + const TransferStatus cs = GetParam(); + + th.UpdateStatus(cs); + ASSERT_EQ(th.GetStatus(), cs); + + for (const auto &ts : {TransferStatus::NOT_STARTED, + TransferStatus::IN_PROGRESS, + TransferStatus::CANCELED, + TransferStatus::FAILED, + TransferStatus::COMPLETED}) { + th.UpdateStatus(ts); + ASSERT_EQ(th.GetStatus(), cs); + if (ts != cs) { + ASSERT_NE(th.GetStatus(), ts); + } + } +} + +// Test the combinations of terminal states. +INSTANTIATE_TEST_SUITE_P(TerminalStatesTests, + DownloadHandleFixture, + ::testing::Values(TransferStatus::CANCELED, + TransferStatus::FAILED, + TransferStatus::COMPLETED)); + +/* + * Limited TransferManager tests (full tests require network). + */ +class TransferManagerTest : public ::testing::Test {}; + +TEST_F(TransferManagerTest, AsyncCallerContext) { + auto ctx = std::make_shared(); + // Pre-condition + EXPECT_EQ(ctx.use_count(), 1); + { + // Context is passed as const std:shared_ptr<>&, but copy-constructed internally. + // Verify that the use-count behaves as expected. + TransferHandle th{"s3://some.bucket/some.path", ctx}; + EXPECT_EQ(ctx.use_count(), 2); + EXPECT_EQ(th.GetContext().use_count(), 3); + EXPECT_EQ(ctx.use_count(), 2); + } + EXPECT_EQ(ctx.use_count(), 1); +} + +TEST_F(TransferManagerTest, DownloadPaths) { + // Ensure that an initialization failure is caught and handled. + auto mgr = TransferManager::Create(nullptr, {}, {}, {}); + + // Attempt to create a file below an invalid path. + { + auto th = mgr->DownloadFile("s3://", "/a.path **that does not exist**!"); + + EXPECT_EQ(th->GetStatus(), TransferStatus::FAILED); + EXPECT_FALSE(th->ShouldContinue()); + EXPECT_EQ(th->GetLastError().GetExceptionName(), "DownloadStream Failure"); + } +} + +TEST_F(TransferManagerTest, UploadPaths) { + WriteMetadata s3_uri{"s3://some.bucket/some.path"}; + + // Open a non-existing file. + { + auto mgr = TransferManager::Create(nullptr, {}, {}, {}); + auto th = mgr->UploadFile("/.no::such^path!", {}, s3_uri); + + EXPECT_EQ(th->GetStatus(), TransferStatus::FAILED); + EXPECT_FALSE(th->ShouldContinue()); + EXPECT_EQ(th->GetLastError().GetExceptionName(), "FATAL ERROR"); + } + + // Pass a bad (closed) input stream. + { + auto mgr = TransferManager::Create(nullptr, {}, {}, {}); + auto is = std::make_shared("/bin/ls"); + is->close(); + + auto th = mgr->UploadFile("", is, s3_uri); + EXPECT_EQ(th->GetStatus(), TransferStatus::FAILED); + EXPECT_FALSE(th->ShouldContinue()); + EXPECT_EQ(th->GetLastError().GetMessage(), "Failed to open stream '': Permission denied"); + EXPECT_EQ(th->GetLastError().GetExceptionName(), "FATAL ERROR"); + } + + // Pass a non-seekable input stream. + { + auto mgr = TransferManager::Create(nullptr, {}, {}, {}); + auto is = std::make_shared(std::cout.rdbuf()); + auto th = mgr->UploadFile("", is, s3_uri); + + EXPECT_EQ(th->GetStatus(), TransferStatus::FAILED); + EXPECT_FALSE(th->ShouldContinue()); + EXPECT_EQ(th->GetLastError().GetMessage(), "Failed to determine size of '': Illegal seek"); + EXPECT_EQ(th->GetLastError().GetExceptionName(), "FATAL ERROR"); + } +} + +} // namespace +} // namespace TransferCrt +} // namespace Aws