GH-40557: [C++] Use PutObject request for S3 in OutputStream when only uploading small data #41564
base: main
Conversation
From the log output, it seems like the failing CI jobs are not related to this change. Correct me if I am wrong though. Should I rebase (in case the flaky tests are already fixed on main)?
I think it's worth rebasing to see?
Rebased to current main, now waiting for the CI approval again :)
Hmm, I'm not sure that is ok. Usually, when opening a file for writing, you expect the initial open to fail if the path cannot be written to. I have no idea how much code relies on that, but that's a common expectation due to how filesystems usually work (e.g. when accessing local storage).
This isn't guaranteed with the current implementation though? Putting a part, or completing a multipart upload, can fail in various ways? An obvious one would be a checksum failure.
My point is that if the path cannot be written to, the error happens when opening the file, not later on.
That is true. I guess the question is if …
The API docs generally do not go into that level of detail. However, it is a general assumption that a filesystem "looks like" a local filesystem API-wise. It is also much more convenient to get an error early than after you have already "written" multiple megabytes of data to the file. A compromise would be to add a dedicated option in …
We can do that. I would propose that if the optimization is disabled, we directly use multipart uploads (basically replicating the old behavior). I don't think it makes sense to explicitly issue a HeadBucket request, because that would lead to a minimum of 4 requests with multipart uploads. (Although we would only have 2 requests for small writes without the optimization, compared to the current …)
That sounds reasonable to me.
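For orientation, the upload strategy under discussion can be sketched in plain C++. This is a toy model with illustrative names only, not Arrow's actual implementation: below the threshold, closing the stream issues a single PutObject; once enough data is written, the stream switches to a multipart upload.

```cpp
#include <cstddef>
#include <string>

// Toy model of the upload decision (names are illustrative only).
constexpr std::size_t kThresholdBytes = 5 * 1024 * 1024;  // 5 MB

struct SketchStream {
  std::size_t bytes_written = 0;
  bool multipart_started = false;

  void Write(std::size_t nbytes) {
    bytes_written += nbytes;
    if (!multipart_started && bytes_written >= kThresholdBytes) {
      // The real code would issue CreateMultipartUpload at this point.
      multipart_started = true;
    }
  }

  // Which request closing the stream would issue.
  std::string Close() const {
    return multipart_started ? "CompleteMultipartUpload" : "PutObject";
  }
};
```

With this shape, a disabled optimization flag would simply force `multipart_started` to true at open time, replicating the old behavior.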
Just to note, issuing … I think an optimization flag is appropriate, as the behaviour is technically changing. Does it make sense to make the flag a somewhat generic one, rather than specific to this case? There are a few other optimizations that might also fall into the "more performant, slightly different semantics" category. If I were to contribute a PR to improve one of the linked areas, would we want to add a new specific flag for that case or bundle it under a single "optimized" flag? The upside would be that it becomes more configurable, whereas the downside is that the testing and support matrix explodes. Perhaps it's better to just have a single …
Edit: I guess this is only relevant for the higher-level Python bindings; we'd still want internal flags for individual features.
I added a …
A minor question: would the single-part upload only be sent during closing?
```cpp
  // Upload last part
  RETURN_NOT_OK(CommitCurrentPart());
}
if (IsMultipartUpload()) {
```
Can this actually happen when `part_size_` is large enough but has not been uploaded yet?
Can you elaborate? I think we're calling `EnsureReadyToFlushFromClose` just to flush the data when we close the stream; hence sometimes we still have data in flight that has to be uploaded (the "last part" which we commit in this branch, or a dummy part if no data was written at all).
Just to make it clear: if `part_size_` is larger than the size limit, it should switch to multipart during `DoWrite`. During closing, maybe it cannot enforce the size limit?
You mean `current_part_size_ > kPartUploadSize` can be true at the time we close the stream?
I think that is not possible (and also wasn't possible with the old impl). The invariant when `Close()` is called is that `current_part_size_ < kPartUploadSize` holds, because if we write to the stream and `current_part_size_ >= kPartUploadSize` holds, we always directly upload a part in `DoWrite`.
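The invariant described here can be illustrated with a small stand-alone sketch. This is a simplified model, not the actual `ObjectOutputStream` code, and the tiny part size is only for the demo:

```cpp
#include <cassert>
#include <cstddef>

// Parts are "uploaded" eagerly in DoWrite() whenever the buffered size reaches
// kPartUploadSize, so at Close() the pending buffer is always strictly smaller
// than one full part.
constexpr std::size_t kPartUploadSize = 5;  // tiny value for the demo

struct ToyStream {
  std::size_t current_part_size = 0;
  int parts_uploaded = 0;

  void DoWrite(std::size_t nbytes) {
    current_part_size += nbytes;
    while (current_part_size >= kPartUploadSize) {
      current_part_size -= kPartUploadSize;  // "upload" one full part
      ++parts_uploaded;
    }
  }

  void Close() {
    // The invariant from the discussion: never a full part left pending.
    assert(current_part_size < kPartUploadSize);
    if (current_part_size > 0) {
      ++parts_uploaded;  // flush the last, short part
      current_part_size = 0;
    }
  }
};
```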
Yes, maybe simplifying to `has_multi_part_upload_` or similar would be ok, but this also looks ok to me.
Yes, we only issue the …
I've taken a look and this generally looks ok to me, but maybe other reviewers will have comments on style.
```cpp
Status UploadPart(std::shared_ptr<Buffer> buffer) {
  return UploadPart(buffer->data(), buffer->size(), buffer);
}

Status UploadUsingSingleRequest() {
  std::shared_ptr<Buffer> buf;
```
(Besides, could we add debug checks that a multipart upload will not be issued after the single-part upload has been issued?)
We would need another variable to record that we issued the single-part upload. Not sure it's worth adding, considering that we only issue a single-part upload in `Close()` anyway. (Plus, tests would complain that the object already exists if we triggered both requests, in whichever order, I assume.)
```cpp
  return Status::OK();
};

auto async_result_callback = [](const Aws::S3::Model::PutObjectRequest& request,
```
So just to make it clear: the `async_result_callback` always gets an `UploadState` and uses it, and the `sync_result_callback` never uses it?
Yes, previously in the sync case we also directly called `AddCompletedPart`, which just used the `state` to fiddle around with its `completed_parts` member. For a single upload request, I think we can omit this and hence do not need to modify the state at all.
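A reduced illustration of the callback split discussed here, with stand-in types (the real callbacks receive AWS SDK request and outcome objects):

```cpp
#include <functional>
#include <memory>
#include <vector>

// Minimal stand-in for the stream's shared upload state.
struct UploadState {
  std::vector<int> completed_parts;
};

// Callback signature shared by both paths: the async multipart path records
// the finished part in the shared state, while the sync single-request path
// ignores the state entirely.
using ResultCallback =
    std::function<void(std::shared_ptr<UploadState>, int part_number)>;

inline ResultCallback MakeAsyncResultCallback() {
  return [](std::shared_ptr<UploadState> s, int part_number) {
    s->completed_parts.push_back(part_number);  // mutates shared state
  };
}

inline ResultCallback MakeSyncResultCallback() {
  return [](std::shared_ptr<UploadState>, int) { /* state untouched */ };
}
```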
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
// So we instead default to application/octet-stream which is less misleading
if (!req.ContentTypeHasBeenSet()) {
  req.SetContentType("application/octet-stream");
if (metadata == nullptr ||
```
```diff
- if (metadata == nullptr ||
+ if (!metadata ||
```
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
if (metadata == nullptr ||
    !metadata->Contains(ObjectMetadataSetter<ObjectRequest>::CONTENT_TYPE_KEY)) {
  // If we do not set anything then the SDK will default to application/xml
  // which confuses some tools (https://github.com/apache/arrow/issues/11934)
  // So we instead default to application/octet-stream which is less misleading
  request->SetContentType("application/octet-stream");
} else {
  RETURN_NOT_OK(SetObjectMetadata(metadata, request));
}
```
How about swapping these clauses for readability?

```cpp
if (metadata &&
    metadata->Contains(ObjectMetadataSetter<ObjectRequest>::CONTENT_TYPE_KEY)) {
  RETURN_NOT_OK(SetObjectMetadata(metadata, request));
} else {
  request->SetContentType("application/octet-stream");
}
```
cpp/src/arrow/filesystem/s3fs.h (Outdated)
```cpp
/// for latency-sensitive applications, at the cost of the OutputStream may throwing an
/// exception at a later stage (i.e. at writing or closing) if e.g. the bucket does not
/// exist.
bool sanitize_bucket_on_open = true;
```
Perhaps we should make this more general, to open up other potential optimizations?

```cpp
/// Whether to allow file-open methods to return before the actual open
///
/// Enabling this may reduce the latency of `OpenInputStream`, `OpenOutputStream`,
/// and similar methods, by reducing the number of roundtrips necessary. It may also
/// allow usage of more efficient S3 APIs for small files.
/// The downside is that failure conditions such as attempting to open a file in a
/// non-existing bucket will only be reported when actual I/O is done (at worst,
/// when attempting to close the file).
bool allow_delayed_open = false;
```
Yes, I like this much more 👍
```cpp
@@ -1197,6 +1199,19 @@ TEST_F(TestS3FS, OpenOutputStreamSyncWrites) {
  TestOpenOutputStream();
}

TEST_F(TestS3FS, OpenOutputStreamNoBucketSanitizationSyncWrites) {
```
Instead of adding more test methods for every combinatorial expansion, perhaps we should instead use a `for` loop over the various tested parameter values?
I added a struct which abstracts the parameter combinations. 7748824
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
@@ -1293,12 +1295,14 @@ std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult& re

template <typename ObjectRequest>
struct ObjectMetadataSetter {
  static constexpr std::string_view CONTENT_TYPE_KEY = "Content-Type";
```
Nit: naming conventions for constants

```diff
- static constexpr std::string_view CONTENT_TYPE_KEY = "Content-Type";
+ static constexpr std::string_view kContentTypeKey = "Content-Type";
```
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
}

  return Status::OK();
}

Status CleanupAfterFlush() {
```
Should this be named `CleanupAfterClose`?
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```diff
@@ -1734,7 +1812,7 @@ class ObjectOutputStream final : public io::OutputStream {
     return Status::OK();
   }

-  // Upload current buffer
+  // Upload current buffer if we are above threshold for multi-part upload
```
The comment is misleading: this always uploads the current buffer, right?
I mean, `CommitCurrentPart` starts by calling `CreateMultipartUpload`.
You're right, I just wanted to make the point that we won't call this when we haven't accumulated enough data. I've clarified the comment. 7748824
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
if (current_part_ == nullptr) {
  // In case the stream is closed directly after it has been opened without writing
  // anything, we'll have to create an empty buffer.
  buf = Buffer::FromVector<uint8_t>({});
```
Nit, but `std::make_shared<Buffer>("", 0)` looks simpler to me.
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
template <typename RequestType, typename OutcomeType>
static Result<OutcomeType> TriggerUploadRequest(
    const RequestType& request, const std::shared_ptr<S3ClientHolder>& holder);
```
I don't think this declaration is actually necessary? I.e., `TriggerUploadRequest` doesn't need to be a template method; it can be a regular overloaded method.
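A minimal illustration of the overload-instead-of-template suggestion, with stand-in request types (not the AWS SDK ones):

```cpp
#include <string>

// Stand-in request types for the two upload kinds.
struct PutObjectRequest {};
struct UploadPartRequest {};

// Plain overloads dispatch on the concrete request type at compile time,
// so no separate template declaration is needed.
inline std::string TriggerUploadRequest(const PutObjectRequest&) {
  return "PutObject";
}
inline std::string TriggerUploadRequest(const UploadPartRequest&) {
  return "UploadPart";
}
```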
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
template <typename RequestType, typename OutcomeType>
using UploadResultCallbackFunction =
    std::function<Status(const RequestType& request, std::shared_ptr<UploadState>,
                         int32_t, OutcomeType outcome)>;
```
Let's make this signature more informative:

```diff
 template <typename RequestType, typename OutcomeType>
 using UploadResultCallbackFunction =
     std::function<Status(const RequestType& request, std::shared_ptr<UploadState>,
-                         int32_t, OutcomeType outcome)>;
+                         int32_t part_number, OutcomeType outcome)>;
```
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
return Upload<Aws::S3::Model::PutObjectRequest, Aws::S3::Model::PutObjectOutcome>(
    std::move(req), sync_result_callback, async_result_callback, data, nbytes,
    owned_buffer);
```
Nits: move more arguments

```diff
-  return Upload<Aws::S3::Model::PutObjectRequest, Aws::S3::Model::PutObjectOutcome>(
-      std::move(req), sync_result_callback, async_result_callback, data, nbytes,
-      owned_buffer);
+  return Upload<Aws::S3::Model::PutObjectRequest, Aws::S3::Model::PutObjectOutcome>(
+      std::move(req), std::move(sync_result_callback), std::move(async_result_callback),
+      data, nbytes, std::move(owned_buffer));
```
cpp/src/arrow/filesystem/s3fs.cc (Outdated)
```cpp
return Upload<Aws::S3::Model::UploadPartRequest, Aws::S3::Model::UploadPartOutcome>(
    std::move(req), sync_result_callback, async_result_callback, data, nbytes,
    owned_buffer);
```
Same here: more arguments can be moved.
Sorry for the delayed review! Will merge after other committers approve.
Rationale for this change

See #40557. The previous implementation would always issue multipart uploads, which come with 3x RTT to S3, instead of just 1x RTT with a `PutObject` request.

What changes are included in this PR?

Implement logic in the S3 `OutputStream` to use a `PutObject` request if the data is below a certain threshold (5 MB) when the output stream is closed. If more data is written, a multipart upload is triggered. Note: previously, opening the output stream was already expensive because the `CreateMultipartUpload` request was triggered then. With this change, opening the output stream becomes cheap, as we wait until some data is written to decide which upload method to use. This required some more state-keeping in the output stream class.

Are these changes tested?

No new tests were added, as there are already tests for very small writes and very large writes, which trigger both ways of uploading. Everything should therefore be covered by existing tests.

Are there any user-facing changes?

Errors such as a non-existing bucket were previously reported via the `CreateMultipartUpload` request, which we now do not send anymore upon opening the stream. We now rather fail at closing, or at writing (when >5 MB have accumulated). Replicating the old behavior is not possible without sending another request, which defeats the purpose of this performance optimization. I hope this is fine.