New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-35240][Connectors][format]Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format #24730
base: master
Are you sure you want to change the base?
Conversation
…void flush per record for csv format
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with nits
} | ||
|
||
@Override | ||
public void flush() throws IOException { | ||
generator.flush(); | ||
stream.flush(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The generator.flush()
also flushes the stream, so the following stream.flush()
is redundant.
// Prevent Jackson's writeValue() method calls from closing the stream. | ||
mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); | ||
try { | ||
this.generator = csvWriter.createGenerator(stream, JsonEncoding.UTF8); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Generator manages some resources with their own buffers so I think we should close
the generator during finish()
. The underlying CsvEncoder
uses a char buffer that jackson can recycle using a threadlocal pool.
Disabling AUTO_CLOSE_TARGET
will still prevent the closing of the underlying stream.
@robobario Thanks for your kind suggestions. PR is updated, PTAL~ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes look good, thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution @GOODBOY008 . The overall approach looks good, but we are missing two things:
- we need to find a better way to release the generator resources without closing the underlying stream (see my comment in code)
- we need a test(s) to verify the desired behavior, i.e. to check that writing a single record does not cause the data written into the .part file and that it only happens when either flush() or finish() get called.
} | ||
|
||
@Override | ||
public void finish() throws IOException { | ||
generator.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I am not sure this is safe, see:
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java#L69-L70
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The CsvGenerator honours AUTO_CLOSE_TARGET so generator.close() will flush the underlying Writer
but not close it.
It would be great to have unit tests proving this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add unit test to verify the desired behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@GOODBOY008 properly testing this behavior with a unit test might be tricky. I wrote a quick sketch of an integration test that you could consider making use of:
https://github.com/afedulov/flink/blob/fix-csv-flush-test/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
I did not spend much time coming up with proper assertions, you can surely come up with something more elegant.
Changes:
FLUSH_AFTER_WRITE_VALUE
ObjectWriter#writeValue(org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator, java.lang.Object)
instead ofObjectWriter#writeValue(java.io.OutputStream, java.lang.Object)
JsonGenerator