Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-40557: [C++] Use PutObject request for S3 in OutputStream when only uploading small data #41564

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

OliLay
Copy link

@OliLay OliLay commented May 7, 2024

Rationale for this change

See #40557. The previous implementation would always issue multi part uploads which come with 3x RTT to S3 instead of just 1x RTT with a PutObject request.

What changes are included in this PR?

Implement logic in the S3 OutputStream to use a PutObject request if data is below a certain threshold (5 MB) and the output stream is closed. If more data is written, a multi part upload is triggered. Note: Previously, opening the output stream was already expensive because the CreateMultipartUpload request was triggered then. With this change opening the output stream becomes cheap, as we rather wait until some data is written to decide which upload method to use. This required some more state-keeping in the output stream class.

Are these changes tested?

No new tests were added, as there are already tests for very small writes and very large writes, which will trigger both ways of uploading. Everything should therefore be covered by existing tests.

Are there any user-facing changes?

  • Previously, we would fail when opening the output stream if the bucket doesn't exist. We inferred that by sending the CreateMultipartUpload request, which we now do not send anymore upon opening the stream. We now rather fail at closing, or at writing (when >5MB have accumulated). Replicating the old behavior is not possible without sending another request which defeats the purpose of this performance optimization. I hope this is fine.

Copy link

github-actions bot commented May 7, 2024

⚠️ GitHub issue #40557 has been automatically assigned in GitHub to PR creator.

@OliLay
Copy link
Author

OliLay commented May 13, 2024

From the log output, it seems like the failing CI jobs are not related to this change. Correct me if I am wrong though. Should I rebase (in case the flaky tests are already fixed on main)?

@orf
Copy link

orf commented May 13, 2024

I think it’s worth rebasing to see?

@OliLay
Copy link
Author

OliLay commented May 14, 2024

Rebased to current main, now waiting for the CI approval again :)

@mapleFU mapleFU requested review from pitrou and felipecrv May 14, 2024 12:16
cpp/src/arrow/filesystem/s3fs.cc Outdated Show resolved Hide resolved
cpp/src/arrow/filesystem/s3fs_test.cc Show resolved Hide resolved
@github-actions github-actions bot added awaiting review Awaiting review awaiting committer review Awaiting committer review and removed awaiting review Awaiting review labels May 14, 2024
@pitrou
Copy link
Member

pitrou commented May 14, 2024

Previously, we would fail when opening the output stream if the bucket doesn't exist. We inferred that by sending the CreateMultipartUpload request, which we now do not send anymore upon opening the stream. We now rather fail at closing, or at writing (when >5MB have accumulated).

Hmm, I'm not sure that is ok. Usually, when opening a file for writing, you expect the initial open to fail if the path cannot be written to. I have no idea how much code relies on that, but that's a common expectation due to how filesystems usually work (e.g. when accessing local storage).

@orf
Copy link

orf commented May 14, 2024

Previously, we would fail when opening the output stream if the bucket doesn't exist. We inferred that by sending the CreateMultipartUpload request, which we now do not send anymore upon opening the stream. We now rather fail at closing, or at writing (when >5MB have accumulated).

Hmm, I'm not sure that is ok. Usually, when opening a file for writing, you expect the initial open to fail if the path cannot be written to. I have no idea how much code relies on that, but that's a common expectation due to how filesystems usually work (e.g. when accessing local storage).

This isn’t guaranteed with the current implementation though? Putting a part, or completing a multipart upload, can fail in various ways? An obvious one would be a checksum failure.

@pitrou
Copy link
Member

pitrou commented May 14, 2024

My point is that if the path cannot be written to, the error happens when opening the file, not later on.

@OliLay
Copy link
Author

OliLay commented May 14, 2024

My point is that if the path cannot be written to, the error happens when opening the file, not later on.

That is true. I guess the question is if arrow's OutputStream API makes an explicit guarantee that Open should throw if the target does not exist. My guess would be that you shouldn't built code upon this assumption if it isn't explicitly stated in arrow's API/docs (which it is not), but of course real-world usage deviates from that (Hyrum's Law).
But checking if the bucket exists would at least come with another 1x RTT to S3 and the goal of the PR was to reduce the amount of blocking calls to S3 to reduce overall latency. If we add another check here, we'll have a total 2x RTT to S3 for small uploads, which is better than the initial 3x RTT without this change, but still not optimal from a performance-view. (and we would probably have 4x RTT for multipart uploads)

@pitrou
Copy link
Member

pitrou commented May 14, 2024

That is true. I guess the question is if arrow's OutputStream API makes an explicit guarantee that Open should throw if the target does not exist. My guess would be that you shouldn't built code upon this assumption if it isn't explicitly stated in arrow's API/docs (which it is not), but of course real-world usage deviates from that (Hyrum's Law).

The API docs generally do not go into that level of detail. However, it is a general assumption that a filesystem "looks like" a local filesystem API-wise.

It is also much more convenient to get an error early, than after you have already "written" multiple megabytes of data to the file.

A compromise would be to add a dedicated option in S3Options, but of course the optimization would only benefit those users that enable the option.

@OliLay
Copy link
Author

OliLay commented May 14, 2024

A compromise would be to add a dedicated option in S3Options, but of course the optimization would only benefit those users that enable the option.

We can do that. I would propose that if the optimization is disabled, we directly use multi-part uploads (basically replicating the old behavior). I don't think it makes sense to explicitly issue a HeadBucket request because that will lead to minimum 4 requests with multi-part uploads then. (although we would only have 2 requests for small writes without the optimization compared to current main)
What do you think?

@pitrou
Copy link
Member

pitrou commented May 14, 2024

We can do that. I would propose that if the optimization is disabled, we directly use multi-part uploads (basically replicating the old behavior).

That sounds reasonable to me.

@github-actions github-actions bot added awaiting committer review Awaiting committer review and removed awaiting review Awaiting review awaiting committer review Awaiting committer review labels May 14, 2024
@orf
Copy link

orf commented May 14, 2024

Just to note, issuing HeadBucket doesn't guarantee that a write will succeed - there isn't really a good way to check without actually writing. A HeadObject on the key and failing on any 403 is probably ok though? However there are valid cases where you'd want to write to a key that your principal is not able to read from. HeadBucket also requires full s3:ListBucket permissions, policies that restrict listing to specific prefixes would need to be updated.

I think an optimization flag is appropriate as the behaviour is technically changing. Does it make sense to make the flag a somewhat generic one, rather than specific to this case?

There are a few other optimizations that might also fall into the "more performant, slightly different semantics" category. If I was to contribute a PR to improve one of the linked areas, would we want to add a new specific flag for this case or bundle it under a single "optimized" flag?

The upside would be that it becomes more configurable, whereas the downside is that the testing and support matrix explodes. Perhaps it's better to just have a single optimized=True flag, vs receiving bug reports when specifically optimize_put_object=True, optimize_delete_dir=False, optimize_move=True, optimize_delete=False, optimize_ensure_parents_exist=True, optimize_foobar=True are set?

Edit: i guess this is only relevant for higher-level Python bindings, we'd still want internal flags for individual features.

@OliLay
Copy link
Author

OliLay commented May 15, 2024

I added a sanitize_bucket_on_open_ flag to the S3Options, adjusted the logic and also instantiated tests with this flag enabled.
I guess the Python bindings can be tackled in a separate PR, right?

@OliLay OliLay requested a review from mapleFU May 15, 2024 07:38
Copy link
Member

@mapleFU mapleFU left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A minor question: would single-part only send during closing?

cpp/src/arrow/filesystem/s3fs.cc Outdated Show resolved Hide resolved
cpp/src/arrow/filesystem/s3fs.cc Outdated Show resolved Hide resolved
cpp/src/arrow/filesystem/s3fs.cc Show resolved Hide resolved
// Upload last part
RETURN_NOT_OK(CommitCurrentPart());
}
if (IsMultipartUpload()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this actually happens when part_size_ is large enough but not being uploaded?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate more? I think we're calling EnsureReadyToFlushFromClose just to flush the data if we close the stream, hence sometimes we still have data in-flight that has to be uploaded (the "last part" which we commit in this branch, or a dummy part if no data was written at all).

Copy link
Member

@mapleFU mapleFU May 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make it clear, since if part_size_ is larger than size-limit, it should be switch to multipart during "doWrite". During closing maybe it cannot limit the size-limit?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean if current_part_size_ > kPartUploadSize can be true at the time we close the stream?
I think that is not possible (and also wasn't possible with the old impl). The invariant when Close() is called is that current_part_size_ < kPartUploadSize holds, because if we write to the stream and current_part_size_ >= kPartUploadSize holds, we always directly upload a part in DoWrite.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, maybe simplify to has_multi_part_upload_ or other is ok, but this also looks ok to me

@OliLay
Copy link
Author

OliLay commented May 15, 2024

A minor question: would single-part only send during closing?

Yes, we only issue the PutObject request if we close the stream.

@OliLay OliLay requested a review from mapleFU May 15, 2024 10:48
Copy link
Member

@mapleFU mapleFU left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've take a round and this general looks ok to me, but maybe other reviewers would have comments on styles

Status UploadPart(std::shared_ptr<Buffer> buffer) {
return UploadPart(buffer->data(), buffer->size(), buffer);
Status UploadUsingSingleRequest() {
std::shared_ptr<Buffer> buf;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Besides, could we debug checks that multi-part will not called after single part is updated?)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would need another variable for that to save that we issued the single part upload. Considering this, not sure if is worth adding that, considering that we only issue a single part upload in Close() anyways. (+ tests would complain that the object already exists if we would trigger both requests, independent in which order I would assume)

return Status::OK();
};

auto async_result_callback = [](const Aws::S3::Model::PutObjectRequest& request,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So just to make it clear, the async_result_callback always get a UploadState and uses it. And the sync_result_callback never uses that?

Copy link
Author

@OliLay OliLay May 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, previously in the sync case we also directly called AddCompletedPart which just used the state to fiddle around with its completed_parts member. For a single upload request, I think we can omit this and hence do not need to modify the state at all.

// So we instead default to application/octet-stream which is less misleading
if (!req.ContentTypeHasBeenSet()) {
req.SetContentType("application/octet-stream");
if (metadata == nullptr ||
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (metadata == nullptr ||
if (!metadata ||

Comment on lines 1602 to 1610
if (metadata == nullptr ||
!metadata->Contains(ObjectMetadataSetter<ObjectRequest>::CONTENT_TYPE_KEY)) {
// If we do not set anything then the SDK will default to application/xml
// which confuses some tools (https://github.com/apache/arrow/issues/11934)
// So we instead default to application/octet-stream which is less misleading
request->SetContentType("application/octet-stream");
} else {
RETURN_NOT_OK(SetObjectMetadata(metadata, request));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about swapping these clauses for easy to read?

if (metadata && metadata->Contains(ObjectMetadataSetter<ObjectRequest>::CONTENT_TYPE_KEY)) {
  RETURN_NOT_OK(SetObjectMetadata(metadata, request));
} else {
  request->SetContentType("application/octet-stream");
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting committer review Awaiting committer review labels May 15, 2024
@github-actions github-actions bot added awaiting change review Awaiting change review and removed awaiting changes Awaiting changes labels May 16, 2024
/// for latency-sensitive applications, at the cost of the OutputStream may throwing an
/// exception at a later stage (i.e. at writing or closing) if e.g. the bucket does not
/// exist.
bool sanitize_bucket_on_open = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should make this more general, to open up other potential optimizations?

  /// Whether to allow file-open methods to return before the actual open
  ///
  /// Enabling this true may reduce the latency of `OpenInputStream`, `OpenOutpuStream`,
  /// and similar methods, by reducing the number of roundtrips necessary. It may also
  /// allow usage of more efficient S3 APIs for small files.
  /// The downside is that failure conditions such as attempting to open a file in a
  /// non-existing bucket will only be reported when actual I/O is done (at worse,
  /// when attempting to close the file).
  bool allow_delayed_open = false;

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I like this much more 👍

@@ -1197,6 +1199,19 @@ TEST_F(TestS3FS, OpenOutputStreamSyncWrites) {
TestOpenOutputStream();
}

TEST_F(TestS3FS, OpenOutputStreamNoBucketSanitizationSyncWrites) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding more test methods for every combinatorial expansion, perhaps we should instead use a for loop on the various tested parameter values?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a struct which abstracts the parameter combinations. 7748824

@@ -1293,12 +1295,14 @@ std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult& re

template <typename ObjectRequest>
struct ObjectMetadataSetter {
static constexpr std::string_view CONTENT_TYPE_KEY = "Content-Type";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: naming conventions for constants

Suggested change
static constexpr std::string_view CONTENT_TYPE_KEY = "Content-Type";
static constexpr std::string_view kContentTypeKey = "Content-Type";

}

return Status::OK();
}

Status CleanupAfterFlush() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be named CleanupAfterClose?

@@ -1734,7 +1812,7 @@ class ObjectOutputStream final : public io::OutputStream {
return Status::OK();
}

// Upload current buffer
// Upload current buffer if we are above threshold for multi-part upload
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment is misleading: this always uploads the current buffer, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, CommitCurrentPart starts by calling CreateMultipartUpload.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I just wanted to make the point that we won't call this when we haven't accumulated enough data. I've clarified the comment. 7748824

if (current_part_ == nullptr) {
// In case the stream is closed directly after it has been opened without writing
// anything, we'll have to create an empty buffer.
buf = Buffer::FromVector<uint8_t>({});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, but std::make_shared<Buffer>("", 0) looks simpler to me.

Comment on lines 1885 to 1887
template <typename RequestType, typename OutcomeType>
static Result<OutcomeType> TriggerUploadRequest(
const RequestType& request, const std::shared_ptr<S3ClientHolder>& holder);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this declaration is actually necessary? i.e. TriggerUploadRequest doesn't need to be a template method, it can be a regular overloaded method.

Comment on lines 1880 to 1883
template <typename RequestType, typename OutcomeType>
using UploadResultCallbackFunction =
std::function<Status(const RequestType& request, std::shared_ptr<UploadState>,
int32_t, OutcomeType outcome)>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this signature more informative

Suggested change
template <typename RequestType, typename OutcomeType>
using UploadResultCallbackFunction =
std::function<Status(const RequestType& request, std::shared_ptr<UploadState>,
int32_t, OutcomeType outcome)>;
template <typename RequestType, typename OutcomeType>
using UploadResultCallbackFunction =
std::function<Status(const RequestType& request, std::shared_ptr<UploadState>,
int32_t part_number, OutcomeType outcome)>;

Comment on lines 1990 to 1992
return Upload<Aws::S3::Model::PutObjectRequest, Aws::S3::Model::PutObjectOutcome>(
std::move(req), sync_result_callback, async_result_callback, data, nbytes,
owned_buffer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nits: move more arguments

Suggested change
return Upload<Aws::S3::Model::PutObjectRequest, Aws::S3::Model::PutObjectOutcome>(
std::move(req), sync_result_callback, async_result_callback, data, nbytes,
owned_buffer);
return Upload<Aws::S3::Model::PutObjectRequest, Aws::S3::Model::PutObjectOutcome>(
std::move(req), std::move(sync_result_callback), std::move(async_result_callback),
data, nbytes, std::move(owned_buffer));

Comment on lines 2038 to 2040
return Upload<Aws::S3::Model::UploadPartRequest, Aws::S3::Model::UploadPartOutcome>(
std::move(req), sync_result_callback, async_result_callback, data, nbytes,
owned_buffer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: more arguments can be moved.

@OliLay OliLay requested review from kou, pitrou and mapleFU May 17, 2024 07:39
@mapleFU
Copy link
Member

mapleFU commented May 24, 2024

Sorry for delaying review! Would merge after other committers approve

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants