Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16702: Fix producer leaks in KafkaLog4jAppenderTest #15922

Merged
merged 3 commits into from May 14, 2024

Conversation

gharris1727
Copy link
Contributor

The tests testRealProducerConfigWithSyncSendShouldNotThrowException and testRealProducerConfigWithSyncSendAndNotIgnoringExceptionsShouldThrowException create real producer instances, which are leaked when the test exits.

Instead, each test should be followed by a cleanup operation where the registered appender is removed and closed.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Signed-off-by: Greg Harris <greg.harris@aiven.io>
@chia7712
Copy link
Contributor

Signed-off-by: Greg Harris <greg.harris@aiven.io>
Signed-off-by: Greg Harris <greg.harris@aiven.io>
@gharris1727
Copy link
Contributor Author

@chia7712 I added the null check. I did notice that the append(LoggingEvent) method could also run into an NPE when called before activateOptions() or a IllegalStateException if called after close()

The first state doesn't appear to be possible, because activateOptions is called during initialization, before any append call. Only an improperly-initialized appender could cause an NPE. I didn't add a null check because it would never trigger in a real environment, but let me know if you think we should add a null check, or catch NullPointerException.

The IllegalStateException seems possible though, because log4j is using a copy-on-write array internally. I think that means that there's a short window after an appender is removed that it can still be accessed by other threads. I changed the implementation to catch it so that if another thread is using log4j at the same time, it won't accidentally get an exception.

@@ -349,8 +349,14 @@ protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
protected void append(LoggingEvent event) {
String message = subAppend(event);
LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
Future<RecordMetadata> response = producer.send(
new ProducerRecord<>(topic, message.getBytes(StandardCharsets.UTF_8)));
Future<RecordMetadata> response;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice finding. one question: should this function log message instead of sending data by closed producer if closed is true? for example:

        if (closed) {
            LogLog.debug("sending to kafka by closed producer");
            return;
        }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. The internal state of the producer is the source of truth on whether this is closed, not the closed variable. A race condition could still cause an IllegalStateException to occur even if there's an if-condition first.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. The internal state of the producer is the source of truth on whether this is closed, not the closed variable.

That makes sense. one more question: Is IllegalStateException caused only by "closed" producer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in the javadoc for send:

     * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started, or
     *                               when send is invoked after producer has been closed.

Idempotence and transactions are not in use for this producer. I am not aware of another cause of IllegalStateException that isn't documented, but the documentation could be wrong or there could be a bug, I have no idea.

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@chia7712 chia7712 merged commit 8ac32d6 into apache:trunk May 14, 2024
1 check failed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
tests Test fixes (including flaky tests)
Projects
None yet
2 participants