
[C++][Python] Default value for CompressedInputStream kChunkSize might be too small #41604

Open
wjzhou opened this issue May 9, 2024 · 14 comments

Comments

@wjzhou

wjzhou commented May 9, 2024

Describe the bug, including details regarding any error messages, version, and platform.

I'm using pyarrow.csv.open_csv to stream-read a 15 GB gzipped CSV file over S3. The speed is unusably slow.

file_src = pyarrow_s3fs.open_input_stream(path_src)

read_options = pyarrow.csv.ReadOptions(block_size=5_000_000, encoding="latin1")
csv = pyarrow.csv.open_csv(
    file_src,
    read_options=read_options,
    parse_options=parse_options,
    convert_options=convert_options,
)
for batch in csv:
    ...

Here, even though I set block_size=5_000_000, the reader issues 64 KiB ranged reads over S3.

This is bad for two reasons:

  1. S3 charges for each GET request ($0.0004 per 1,000 requests)
  2. computing the authentication headers etc. has a CPU cost

After digging into the code, I found that kChunkSize is hard-coded in CompressedInputStream:

static const int64_t kChunkSize = 64 * 1024;

Currently, my workaround is to use a buffered stream:
file_src = pyarrow_s3fs.open_input_stream(path_src, buffer_size=10_000_000)

But this is not obvious from the docs. Could we set this value higher, or at least add some documentation to clarify the usage?

Component(s)

C++, Python

@amoeba amoeba changed the title Default value for CompressedInputStream kChunkSize might be too small [C++][Python] Default value for CompressedInputStream kChunkSize might be too small May 11, 2024
@mapleFU
Member

mapleFU commented May 11, 2024

Maybe adding a buffer layer here is better: https://arrow.apache.org/docs/python/generated/pyarrow.input_stream.html#pyarrow.input_stream ?

CompressedInputStream just manages decompression, rather than buffering the physical I/O.

@wjzhou-ep

wjzhou-ep commented May 15, 2024

@mapleFU Thank you for providing the alternative.

The buffer_size parameter in pyarrow_s3fs.open_input_stream(path_src, buffer_size=10_000_000) does the same thing as the pyarrow.input_stream buffer layer you suggested.

My point is:

  1. It is not obvious from the docs that once we use a gz file instead of a plain CSV file, the read speed slows down by roughly 10x. For example, assuming 5 ms of latency per request and negligible transfer time, the 64 KiB reads max out at about 13 MB/s (a back-of-the-envelope check follows after this list). In my case, reading slowed down from around 100 MB/s to around 5 MB/s.
  2. The default 64 KiB might be a little too small for today's computers. For example, when not using a gz file, the default block size for CSV streaming is 1 MB:
     int32_t block_size = 1 << 20;  // 1 MB
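
A back-of-the-envelope check of the 13 MB/s figure (assuming, as above, roughly 5 ms of latency per ranged GET and negligible transfer time):

chunk_size = 64 * 1024                 # bytes per ranged read (kChunkSize)
latency = 0.005                        # assumed seconds per S3 GET request
print(chunk_size / latency / 1e6)      # ~13.1 MB/s upper bound on throughput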

@mapleFU
Member

mapleFU commented May 15, 2024

The buffer_size parameter in pyarrow_s3fs.open_input_stream(path_src, buffer_size=10_000_000) does the same thing as the pyarrow.input_stream buffer layer you suggested.

Hmmm, so the input buffer of s3fs is useless? It doesn't do underlying buffering?

The default 64 KiB might be a little too small for today's computers. For example, when not using a gz file, the default block size for CSV streaming is 1 MB

I agree fixed-size buffering is weird, but I think the CompressedInputStream's buffer size is just the "decompress-input-buffer-size" rather than the "s3-io-size".

This could be added in C++ first, making the chunk size an input argument, i.e. max(kChunkSize, input-chunk-size)? Or maybe try again with an input stream wrapped with a buffer_size: https://arrow.apache.org/docs/python/generated/pyarrow.input_stream.html#pyarrow-input-stream or f = pa.BufferedInputStream(raw, buffer_size=1024*1024)?

@wjzhou
Author

wjzhou commented May 15, 2024

Setting buffer_size=10_000_000 explicitly in pyarrow_s3fs.open_input_stream works; it's just that the default is None:

def _wrap_input_stream(self, stream, path, compression, buffer_size):
    if buffer_size is not None and buffer_size != 0:
        stream = BufferedInputStream(stream, buffer_size)

Actually, after thinking about it, I think the real problem is that when CompressedInputStream is in use, it converts an upper-layer Read(1_000_000, out) into multiple lower-layer calls of Read(64 * 1024, out), e.g.:

Result<int64_t> Read(int64_t nbytes, void* out) {
  ...
  while (nbytes - total_read > 0 && decompressor_has_data) {
    ...
    ARROW_ASSIGN_OR_RAISE(decompressor_has_data, RefillDecompressed());
    ...
  }
}

Result<bool> RefillDecompressed() {
  ...
  RETURN_NOT_OK(EnsureCompressedData());
  ...
}

Status EnsureCompressedData() {
  ...
  // kChunkSize is hard-coded to 64 KiB
  raw_->Read(kChunkSize,
             compressed_for_non_zero_copy_->mutable_data_as<void>());
  ...
}

https://github.com/apache/arrow/blob/657c4faf21700c0899703a4759bde76235c38199/cpp/src/arrow/io/compressed.cc#L388C2-L408C4
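
To make the effect easier to observe, here is a rough, purely illustrative sketch (the CountingReader wrapper and the local file name are hypothetical, not pyarrow API) that records the raw read sizes CompressedInputStream issues for a single large read:

import pyarrow as pa

class CountingReader:
    """Delegates to a real file object but records the size of each raw read request."""
    def __init__(self, f):
        self._f = f
        self.read_sizes = []

    def read(self, nbytes=None):
        self.read_sizes.append(nbytes)
        return self._f.read(nbytes)

    def __getattr__(self, name):
        return getattr(self._f, name)

raw = CountingReader(open("file.csv.gz", "rb"))  # hypothetical local gzipped file
stream = pa.CompressedInputStream(raw, "gzip")
stream.read(1_000_000)                           # one upper-layer 1 MB read
print(raw.read_sizes)                            # expected: a run of ~64 KiB requests (kChunkSize)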

Or maybe try again with an input stream wrapped with a buffer_size:
Once the reason for the problem is known, adding a BufferedInputStream is a good solution.

But for a new pyarrow user, it is hard to know that pa.BufferedInputStream is needed.

For example, the following code is totally fine if path_src is not a compressed file; it issues a single read to load the whole file:

path_src = "file.csv"
with pyarrow_s3fs.open_input_stream(path_src) as f:
    buff = f.read()

However, if path_src = "file.csv.gz", the user needs to add the BufferedInputStream wrapper to prevent the 64 KiB ranged reads, for example as sketched below.
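
A minimal sketch of that layering (assuming pyarrow_s3fs is an S3FileSystem instance and path_src points to a gzipped CSV): automatic decompression is disabled in open_input_stream so the buffer sits directly on top of the S3 reads, giving S3 <-- Buffered <-- Compressed <-- CSV reader:

import pyarrow as pa
import pyarrow.csv

# compression=None: don't let open_input_stream add its own decompression wrapper,
# so we control where the buffer goes in the stack.
raw = pyarrow_s3fs.open_input_stream(path_src, compression=None)
buffered = pa.BufferedInputStream(raw, buffer_size=10_000_000)  # buffers the S3 ranged reads
decompressed = pa.CompressedInputStream(buffered, "gzip")       # its 64 KiB reads now hit the buffer
reader = pyarrow.csv.open_csv(decompressed)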

@mapleFU
Member

mapleFU commented May 15, 2024

So we should enhance the "csv" module and adjust the I/O size before opening the CompressedInputStream. We should do a minor refactor here:

static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
    const FileSource& source, const CsvFileFormat& format,
    const std::shared_ptr<ScanOptions>& scan_options, Executor* cpu_executor) {
#ifdef ARROW_WITH_OPENTELEMETRY
  auto tracer = arrow::internal::tracing::GetTracer();
  auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
#endif
  ARROW_ASSIGN_OR_RAISE(
      auto fragment_scan_options,
      GetFragmentScanOptions<CsvFragmentScanOptions>(
          kCsvTypeName, scan_options.get(), format.default_fragment_scan_options));
  ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
  ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
  if (fragment_scan_options->stream_transform_func) {
    ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input));
  }
  const auto& path = source.path();

This also arises when it's a JSON format. @pitrou @lidavidm should we change OpenCompressed to WrapCompressed() to wrap the input, or change it to OpenBufferedAndCompressed(std::optional<Compression>, std::optional<int64_t> blockSize)?

@pitrou
Member

pitrou commented May 15, 2024

The bottom line is that CompressedInputStream::kChunkSize is inadequate for some input file backends (such as S3).

I think we should add a new InputStream method that advertises a preferred chunk size:

  /// \brief Return the preferred chunk size for reading at least `nbytes`
  ///
  /// Different file backends have different performance characteristics
  /// (especially on the latency / bandwidth spectrum).
  /// This method informs the caller on a well-performing read size
  /// for the given logical read size.
  ///
  /// Implementations of this method are free to ignore the input `nbytes`
  /// when computing the return value. The return value might be smaller,
  /// larger or equal to the input value.
  ///
  /// This method should be deterministic: multiple calls on the same object
  /// with the same input argument will return the same value. Therefore,
  /// calling it once on a given file should be sufficient.
  ///
  /// There are two ways for callers to use this method:
  /// 1) callers which support readahead into an internal buffer will
  ///    use the return value as a hint for their internal buffer's size;
  /// 2) callers which require exact read sizes will use the return value as
  ///    an advisory chunk size when reading.
  ///
  /// \param[in] nbytes the logical number of bytes desired by the caller
  /// \return an advisory physical chunk size, in bytes
  virtual int64_t preferred_read_size(int64_t nbytes) const;

Then the CompressedInputStream implementation can call preferred_read_size on its input to decide its compressed buffer size.

@felipecrv What do you think?

@mapleFU
Member

mapleFU commented May 15, 2024

@pitrou Note that:

  1. If compression is not enabled, the file_csv stack will be S3 <-- Buffered <-- Csv reader. Here the buffer provides a block_size I/O size for the CSV reader.
  2. Otherwise, if compression is enabled, the stack will be S3 <-- Compress <-- Buffered <-- Csv reader. I prefer changing it to S3 <-- Buffered <-- Compress <-- Csv reader.

@pitrou
Member

pitrou commented May 15, 2024

  • Otherwise, if compression is enabled, the stack will be S3 <-- Compress <-- Buffered <-- Csv reader. I prefer changing it to S3 <-- Buffered <-- Compress <-- Csv reader.

Or we could even remove the additional buffering and have S3 <-- Compress <-- CSV reader. I have forgotten why I added the buffered layer in this case...

@felipecrv
Contributor

Then the CompressedInputStream implementation can call preferred_read_size on its input to decide its compressed buffer size.

@felipecrv What do you think?

Could work well, but a more flexible alternative to this could be a Readable::Read version that can mutate the size requested by the caller to match what was actually read. I've seen this idea in the protobuf stream interfaces [1]. Or a min/max constraint to let the implementation decide the best-fit size [2]: size_t read(void* buffer, size_t minBytes, size_t maxBytes);

[1] https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/io/zero_copy_stream.h
[2] https://github.com/capnproto/capnproto/blob/d0c1ad5442831b1e441daa74622f3ea9f92a305c/c%2B%2B/src/kj/io.h#L41-L43

@pitrou
Member

pitrou commented May 15, 2024

Or a min/max constraint to let the implementation decide the best fit [2] size_t read(void* buffer, size_t minBytes, size_t maxBytes);

That's another possibility, but it forces the caller to allocate maxBytes even if the result could be as small as minBytes.

Note that those are not either/or. We could start with one and add the other if needed/desired.

@mapleFU
Member

mapleFU commented May 15, 2024

Anyway, a block_size is configured for the input. Without compressed input, that config is also used. Wouldn't we adapt based on that?

@felipecrv
Contributor

That's another possibility, but it forces the caller to allocate maxBytes even if the result could be as small as minBytes.

But the caller chooses maxBytes :) The existing Read(n, ptr) calls become Read(min_bytes, /*max_bytes=*/n, ptr) where min_bytes is the minimum amount of data that lets the caller make progress on whatever algorithm it's implementing.

The stream tries to fill the buffer as much as possible if that is desirable, as in the case of S3 calls; a rough sketch of this contract follows.
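
A purely illustrative sketch of that contract (read_between and the plain stream.read call are hypothetical, not an existing Arrow API): the callee must return at least min_bytes unless EOF is hit, and may return up to max_bytes when that is cheap, e.g. because a large S3 range was fetched anyway.

def read_between(stream, min_bytes, max_bytes):
    """Return at least min_bytes (unless EOF) and at most max_bytes."""
    assert 0 < min_bytes <= max_bytes
    buf = bytearray()
    while len(buf) < min_bytes:
        # The implementation is free to fetch anything up to max_bytes here,
        # e.g. one large ranged GET instead of many small ones.
        chunk = stream.read(max_bytes - len(buf))
        if not chunk:
            break  # EOF: fewer than min_bytes were available
        buf += chunk
    return bytes(buf)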

Note that those are not either/or. We could start with one and add the other if needed/desired.

Which one do you think is simpler?

@pitrou
Member

pitrou commented May 15, 2024

But the caller chooses maxBytes :)

Right, but they have no idea what value could be reasonable. The caller could think "ok, I know that some filesystems really like large chunk sizes, so I'm going to allocate 100 MiB", only to get 64 KiB as a result.

Or conversely, the caller could think "ok, let's be conservative so as not to waste memory unduly, so I'm going to allocate 512 kiB", and S3 will perform awfully.

The point here is to let the file implementation inform the caller before making any impacting decision (such as allocating a large memory area).

Which one do you think is simpler?

Both are simple conceptually and should be simple implementation-wise. They have different implications though.

@mapleFU
Member

mapleFU commented May 15, 2024

Anyway, shouldn't we do this:

  • Hint different io-sizes in the S3 fs and the local fs
  • Use [S3 <-- Buffered] <-- Compress <-- Csv reader for this
