
[FLINK-35240][Connectors][format] Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format #24730

Open
wants to merge 2 commits into base: master

Conversation

GOODBOY008
Member

Changes:

  • Disable FLUSH_AFTER_WRITE_VALUE
  • Use ObjectWriter#writeValue(org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator, java.lang.Object) instead of ObjectWriter#writeValue(java.io.OutputStream, java.lang.Object)
  • Manually flush the JsonGenerator (a sketch of the resulting write path follows below)
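
For illustration, here is a minimal sketch of the resulting write path. Class and field names are hypothetical, and the unshaded Jackson packages are used in place of Flink's org.apache.flink.shaded.jackson2 ones; the point is that records are serialized into a long-lived JsonGenerator and only reach the underlying stream on an explicit flush.

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

import java.io.IOException;
import java.io.OutputStream;

class BufferedCsvWriter<T> {

    private final ObjectWriter csvWriter;
    private final JsonGenerator generator;

    BufferedCsvWriter(CsvMapper mapper, CsvSchema schema, OutputStream stream) throws IOException {
        // Without this, ObjectWriter#writeValue(JsonGenerator, Object) flushes the generator after every record.
        mapper.disable(SerializationFeature.FLUSH_AFTER_WRITE_VALUE);
        // Prevent Jackson from closing the stream when the generator is closed.
        mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
        this.csvWriter = mapper.writer(schema);
        this.generator = csvWriter.createGenerator(stream, JsonEncoding.UTF8);
    }

    void addElement(T element) throws IOException {
        // Serializes into the generator's internal buffer; no per-record flush.
        csvWriter.writeValue(generator, element);
    }

    void flush() throws IOException {
        // Pushes the buffered records down to the underlying stream.
        generator.flush();
    }
}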

@flinkbot
Collaborator

flinkbot commented Apr 26, 2024

CI report:

Bot commands: The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@robobario left a comment

LGTM with nits

    }

    @Override
    public void flush() throws IOException {
        generator.flush();
        stream.flush();


The generator.flush() also flushes the stream, so the following stream.flush() is redundant.
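
That is, the override could shrink to the following sketch (same field name as the quoted diff). JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM is enabled by default, which is why flushing the generator already flushes the wrapped stream:

    @Override
    public void flush() throws IOException {
        generator.flush(); // also flushes the underlying stream
    }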

        // Prevent Jackson's writeValue() method calls from closing the stream.
        mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
        try {
            this.generator = csvWriter.createGenerator(stream, JsonEncoding.UTF8);


The Generator manages some resources with their own buffers so I think we should close the generator during finish(). The underlying CsvEncoder uses a char buffer that jackson can recycle using a threadlocal pool.

Disabling AUTO_CLOSE_TARGET will still prevent the closing of the underlying stream.
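
In other words, the suggested finish() would look roughly like this sketch (hypothetical field name; the updated PR adopts the idea, as quoted further down in the conversation):

    @Override
    public void finish() throws IOException {
        // Flushes any remaining buffered output and lets Jackson recycle the
        // CsvEncoder's char buffer; with AUTO_CLOSE_TARGET disabled, the
        // underlying stream itself stays open.
        generator.close();
    }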

@GOODBOY008
Member Author

@robobario Thanks for your kind suggestions. PR is updated, PTAL~

@robobario left a comment

Changes look good, thanks

Contributor
@afedulov left a comment

Thanks for the contribution @GOODBOY008. The overall approach looks good, but we are missing two things:

  • we need to find a better way to release the generator resources without closing the underlying stream (see my comment in code)
  • we need a test (or tests) to verify the desired behavior, i.e. to check that writing a single record does not cause the data to be written into the .part file and that it only happens when either flush() or finish() gets called (a rough sketch of such a check, at the Jackson level, follows below)
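
For the second point, a check of the described behavior at the Jackson level (not a substitute for the Flink integration test itself) could look roughly like the following hedged sketch; the test class, schema, and Pojo record are made up for illustration, and the unshaded Jackson packages stand in for Flink's shaded ones:

import static org.assertj.core.api.Assertions.assertThat;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;

class CsvFlushBehaviorTest {

    static class Pojo {
        public String f0 = "value";
    }

    @Test
    void singleRecordIsNotWrittenUntilFlush() throws Exception {
        CsvMapper mapper = new CsvMapper();
        // Same configuration as the PR: no flush per record, no auto-close of the target.
        mapper.disable(SerializationFeature.FLUSH_AFTER_WRITE_VALUE);
        mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
        ObjectWriter writer = mapper.writer(CsvSchema.builder().addColumn("f0").build());

        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        JsonGenerator generator = writer.createGenerator(stream, JsonEncoding.UTF8);

        writer.writeValue(generator, new Pojo());
        // The record is still sitting in the generator's internal buffer.
        assertThat(stream.size()).isZero();

        generator.flush();
        // Only an explicit flush pushes the bytes down to the stream.
        assertThat(stream.toString("UTF-8")).isEqualTo("value\n");
    }
}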

    }

    @Override
    public void finish() throws IOException {
        generator.close();
Contributor

The CsvGenerator honours AUTO_CLOSE_TARGET so generator.close() will flush the underlying Writer but not close it.

It would be great to have unit tests proving this.
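
Building on the hypothetical CsvFlushBehaviorTest sketched earlier in this conversation, a second case could pin that behavior down (reusing the Pojo from that sketch):

    static class CloseTrackingStream extends ByteArrayOutputStream {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    @Test
    void closingGeneratorFlushesButDoesNotCloseStream() throws Exception {
        CsvMapper mapper = new CsvMapper();
        mapper.disable(SerializationFeature.FLUSH_AFTER_WRITE_VALUE);
        mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
        ObjectWriter writer = mapper.writer(CsvSchema.builder().addColumn("f0").build());

        CloseTrackingStream stream = new CloseTrackingStream();
        JsonGenerator generator = writer.createGenerator(stream, JsonEncoding.UTF8);
        writer.writeValue(generator, new Pojo());

        generator.close();

        // close() flushed the buffered record to the stream...
        assertThat(stream.toString("UTF-8")).isEqualTo("value\n");
        // ...but, with AUTO_CLOSE_TARGET disabled, did not close the stream itself.
        assertThat(stream.closed).isFalse();
    }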

Member Author

I will add a unit test to verify the desired behavior.

Contributor
@afedulov May 23, 2024

@GOODBOY008 properly testing this behavior with a unit test might be tricky. I wrote a quick sketch of an integration test that you could consider making use of:
https://github.com/afedulov/flink/blob/fix-csv-flush-test/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
I did not spend much time coming up with proper assertions; you can surely come up with something more elegant.
