
KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… #15910

Open · wants to merge 15 commits into trunk
Conversation

@edoardocomar (Contributor)

KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently interrupt offset translation

MirrorCheckpointTask reloads the last checkpoint at start; OffsetSyncStore stores OffsetSyncs before reading to the log end.

Added a test case simulating a restarted task, where the store is reinitialized with later OffsetSyncs, checking that the emitted Checkpoints do not rewind.

Also addresses KAFKA-16622 (MirrorMaker2's first Checkpoint not emitted until the consumer group fully catches up once), because the OffsetSyncStore is populated before reading to the log end.

Co-Authored-By: Adrian Preston <prestona@uk.ibm.com>
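
A minimal sketch of the no-rewind rule described above (all names here are illustrative, not the PR's actual code): the task seeds per-group state from the checkpoints topic at start, and then only emits checkpoints whose downstream offset moves forward.

    import java.util.HashMap;
    import java.util.Map;

    class NoRewindSketch {
        record Checkpoint(String group, String topicPartition, long upstreamOffset, long downstreamOffset) { }

        // seeded at task start from the checkpoints topic
        private final Map<String, Checkpoint> lastEmitted = new HashMap<>();

        void seed(Checkpoint fromCheckpointsTopic) {
            lastEmitted.put(key(fromCheckpointsTopic), fromCheckpointsTopic);
        }

        // emit only if the downstream offset does not rewind past the last emitted checkpoint
        boolean shouldEmit(Checkpoint candidate) {
            Checkpoint prev = lastEmitted.get(key(candidate));
            return prev == null || candidate.downstreamOffset() >= prev.downstreamOffset();
        }

        private static String key(Checkpoint c) {
            return c.group() + "/" + c.topicPartition();
        }
    }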

@gharris1727 (Contributor)

Hey @edoardocomar and @prestona thanks for the PR!

One of the reasons I thought this might require a KIP is that it requires additional permissions that current MM2 doesn't need: if an operator has configured ACLs such that MM2 has write permission for the checkpoints topic but no read permission, it could be operating today and then fail after an upgrade with this change. I don't know if that is a common configuration or even a recommended one, but it does seem possible in the wild.

Perhaps this can be configuration-less and backwards-compatible if we fall back to the old behavior whenever reading the checkpoints fails, for any reason, including insufficient permissions.

Co-Authored-By: Adrian Preston <prestona@uk.ibm.com>
@edoardocomar (Contributor Author)

Hi @gharris1727, we're now handling errors in loading the Checkpoints topic.
(We still have to add unit tests.)

Specifically, we tested the not-authorized-to-read case, which the existing KafkaBasedLog was not handling well.
At this stage, the task start would fail, which to us seems an improvement, as it is detectable and actionable (we'd expect the change to be noted in the release notes).

This looks to us like better behavior than reverting to the old one in case of failure, as maintaining and testing two modes of operation seems too complex.

Do you still think we need a KIP to introduce yet another config to choose between the old behavior (default) and the new one (arguably better in the eyes of this PR's authors ...)?

@gharris1727 (Contributor)

@edoardocomar In general, connectors do have to add a configuration like this eventually, because users have different tolerances for errors. Some users want errors to cause the connector to become FAILED, so that they can see the exception in the REST API and retry it explicitly. Other users want the connector to retry internally indefinitely, and not fail for any reason.

MM2 has a lot of operations that can fail, and virtually none of them cause the connector to fail. The reason for this is that MM2 has a dedicated mode, where there isn't a REST API to surface errors or perform external retries, so external retries are very expensive. It is definitely something that could be fixed eventually with a "strict mode" configuration or similar. We've also considered ways to address this from the framework side, with retry policies and automatic restarts, but none of that has been fully designed or implemented yet.

I think we should not block this fix on solving that more general problem. If there is a permissions error loading the checkpoints, MM2 should log that, and then degrade gracefully to the current behavior. We can later have a KIP that adds a "strict mode" to surface this failure and make the new permission required. A sketch of the fallback follows this comment.

In practical terms, without a configuration and with the graceful-degradation implementation, we can get this into 3.8.
If you're interested in the configuration, that will delay this feature until 4.0. I'm fine with either, but I think the current behavior has caused such considerable friction in the community that we should prefer a 3.8 release.
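
A sketch of the graceful degradation suggested here (the helper names and wiring are assumptions, not the PR's code): if reading the checkpoints topic fails, log a warning and fall back to the previous "pessimistic" OffsetSyncStore load logic.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class CheckpointLoadFallbackSketch {
        private static final Logger log = LoggerFactory.getLogger(CheckpointLoadFallbackSketch.class);

        interface OffsetSyncStore {
            // true = the old load logic, which must read the offset-syncs topic to the end
            void start(boolean initializationMustReadToEnd);
        }

        void start(Runnable readCheckpointsTopic, OffsetSyncStore offsetSyncStore) {
            boolean checkpointsRead = true;
            try {
                readCheckpointsTopic.run();
            } catch (Exception e) {
                checkpointsRead = false;
                log.warn("Failed to read checkpoints topic; offset translation will be degraded", e);
            }
            // fall back to the previous behavior when the last checkpoints are unknown
            offsetSyncStore.start(!checkpointsRead);
        }
    }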

use previous OffsetSyncStore "pessimistic" load logic

Co-Authored-By: Adrian Preston <prestona@uk.ibm.com>
@edoardocomar (Contributor Author)

@gharris1727 thanks for your feedback. We've added another commit to allow for the old OffsetSyncs load behavior in case the task cannot read the checkpoints.

@edoardocomar (Contributor Author)

Testing results for the following scenario:

  • produce 10000 records
  • consume them 250 at a time
  • stop mm2
  • produce 10000 records
  • restart consuming
  • restart mm2

Emitted Checkpoints:

% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \
  --topic source.checkpoints.internal \
  --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \
   --from-beginning

NEW implementation, with checkpoints successfully read by the Checkpoint task

Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=500, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=6250, downstreamOffset=5820, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=7250, downstreamOffset=7228, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=8750, downstreamOffset=8658, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=9500, downstreamOffset=9362, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=9750, downstreamOffset=9714, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=10000, downstreamOffset=10000, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=12250, downstreamOffset=11519, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=14500, downstreamOffset=14335, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=17500, downstreamOffset=17195, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18750, downstreamOffset=18603, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19500, downstreamOffset=19307, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19750, downstreamOffset=19659, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=20000, downstreamOffset=20000, metadata=}

NEW implementation, with the Checkpoint task FAILING to read checkpoints

Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=10000, downstreamOffset=10000, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=11250, downstreamOffset=10000, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16000, downstreamOffset=15919, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=17500, downstreamOffset=17349, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19000, downstreamOffset=18757, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19500, downstreamOffset=19461, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=20000, downstreamOffset=20000, metadata=}

Original implementation (prior to this PR)

Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=10000, downstreamOffset=10000, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=11250, downstreamOffset=10000, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16250, downstreamOffset=16249, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18000, downstreamOffset=17987, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19000, downstreamOffset=18768, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19750, downstreamOffset=19747, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=20000, downstreamOffset=20000, metadata=}

@gharris1727 (Contributor) left a comment

Thanks @edoardocomar, I definitely think this can be included in 3.8.

@edoardocomar (Contributor Author)

edoardocomar commented May 15, 2024

Hi @gharris1727, commit e33edd2 hopefully addresses most of your comments. Thanks for the quick feedback.

We also noticed that the loading of the checkpoints must complete before the task's start method returns.
This avoids the checkpointsPerConsumerGroup map being accessed during the task's active polls before it is completely initialized, so we moved that load before scheduler.execute; see the sketch below.
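
A minimal sketch of that ordering (method names assumed for illustration, not the PR's actual code):

    import java.util.concurrent.Executor;

    class StartOrderingSketch {
        interface CheckpointStore {
            void readCheckpoints(); // blocking: loads the previously emitted checkpoints
        }

        void start(CheckpointStore checkpointStore, Executor scheduler, Runnable periodicWork) {
            // load synchronously BEFORE scheduling the periodic work, so the
            // checkpointsPerConsumerGroup map is never observed half-initialized
            checkpointStore.readCheckpoints();
            scheduler.execute(periodicWork);
        }
    }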

Use a wrapper to the checkpointsPerGroup map to ensure thread safety on restart.

The callback rethrows the error as a way to stop the KafkaBasedLog looping forever in readToEnd.
This occurs when unauthorized to READ from the topic, as KafkaBasedLog retries indefinitely.
Unauthorized to DESCRIBE instead fails immediately before fetching.

All such failures result in a warning log message about degraded offset translation.
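
A simplified sketch of that rethrow (types abridged, not the exact PR code), built on the org.apache.kafka.connect.util.Callback interface that KafkaBasedLog uses for consumed records:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.connect.util.Callback;

    class RethrowingCallbackSketch {
        // wrap the consume callback so read errors propagate instead of being
        // swallowed, making KafkaBasedLog.readToEnd() fail fast when READ on the
        // checkpoints topic is unauthorized
        static Callback<ConsumerRecord<byte[], byte[]>> rethrowing(Callback<ConsumerRecord<byte[], byte[]>> delegate) {
            return (error, record) -> {
                if (error != null) {
                    throw new RuntimeException("Failure loading the checkpoints topic", error);
                }
                delegate.onCompletion(null, record);
            };
        }
    }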
@edoardocomar (Contributor Author)

Hi @gharris1727, we worked out the asynchronous loading using a wrapper to the checkpointsPerGroup map.
However, when testing with different levels of authorization to see the fallback behaviour,
the simplest approach was to have the callback rethrow. It's all encapsulated, so it's not spoiling the task, IMHO!

@gharris1727 (Contributor) left a comment

Nice, this is getting really close! I think the error handling is where it needs to be.

@prestona

Hi @gharris1727, hopefully the latest commits address your review comments. Once again, really appreciate all your feedback and suggestions.

for consistency with OffsetSyncStore

CheckpointStore started debug/trace message
@edoardocomar (Contributor Author)

Hi @gharris1727, if you have the time, can you please have a look again? Thanks.

@@ -271,4 +284,102 @@ private Map<TopicPartition, Checkpoint> assertCheckpointForTopic(
        assertEquals(truth, checkpoints.containsKey(remoteTp), "should" + (truth ? "" : " not") + " emit offset sync");
        return checkpoints;
    }

    @Test
    public void testCheckpointsTaskRestartUsesExistingCheckpoints() {
Contributor

I think using "real checkpoints" generated by the first MirrorCheckpointTask to test the second MirrorCheckpointTask is not necessary, and you can use simulated checkpoints instead.

Reassigning variables and copy-pasting sections in tests is typo-prone and I think we can avoid it here.

Contributor Author

Thanks - we fixed the reassignments.
We already load the OffsetSyncStore with different OffsetSyncs, but we think the CheckpointStore at restart of the task should contain the exact last checkpoint emitted by the previous instance of the task.

Comment on lines 118 to 119

        store.sync(tp, offset, offset);
        assertSparseSyncInvariant(store, tp);
Contributor

Can you move this into backingStoreStart, so that it happens when initializationMustReadToEnd is properly initialized? Here it's relying on the default value set by construction, not the value passed into start.

Actually this seems to be the case in a lot of the tests here. Can you look through the tests, and whenever there are assertions or sync calls before start, apply the same fix?

Contributor Author

OK - we can assert that sync is only called after start.

@edoardocomar (Contributor Author)

@gharris1727 please review, thanks

@gharris1727 (Contributor) left a comment

Okay, I think this is the last round. I only had a nit for the tests. Thank you @edoardocomar and @prestona so much for your patience!

assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
FakeOffsetSyncStore store = new FakeOffsetSyncStore() {
Contributor

nit: My IDE gives me warnings about this being an AutoCloseable used without a try-with-resources, and in this particular case, it was already in a try-with-resources.

I know that the FakeOffsetSyncStore has nothing to leak if close is not called, but I'd like to prevent any future leaks, and prevent the IDE from nagging people about it in the future.

I would probably have made this change because the indenting was getting out of hand; maybe you get the same feeling? Would you consider having a FakeOffsetSyncStore constructor that takes a Consumer, and calls it during backingStoreStart?

Contributor Author

Hi @gharris1727,
I use IntelliJ too and saw the warning.
I could have used a @SuppressWarnings annotation, but I am very reluctant to make code less readable because of limited insight by linters. Similarly, I'm reluctant to make the fake store more complex.

Using try-with-resources with a local class results in horrible indentation, as you said.
I don't share a strong worry about future leaks in testing - that seems speculative to me.
In this instance, unless you have very strong feelings, I'd really leave the test as-is.

Contributor Author

On another note: if you approve, I'd also backport the fix to 3.7.

Contributor

I have somewhat strong feelings; I wouldn't call them very strong. If someone noticed this IDE warning and created a ticket and a PR to fix it, I would review that. Am I going to make the build enforce this warning? No, but I have seen other situations where the warning did point out real resource leaks...

I just wanted to save the effort required to go and rework this later, and prevent this PR from introducing an easily avoidable warning. I agree with you about suppressing warnings; I don't think that is a healthy practice.

I just tried making this a try-with-resources and the indenting turned out fine. The body of backingStoreStart is at the exact same indentation as it is currently.

        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore() {
            @Override
            void backingStoreStart() {
                // read a sync during startup
                sync(tp, 100, 200);
                assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 0));
                assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 100));
                assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 200));
            }
        }) {
            // no offsets exist and store is not started
            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));

            // After the store is started all offsets are visible
            store.start(true);

            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
        }

Here's the Consumer alternative I thought about, which uses one less indentation level at the cost of a variable, a field, and two constructors:

        Consumer<FakeOffsetSyncStore> init = store -> {
            // read a sync during startup
            store.sync(tp, 100, 200);
            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
        };
        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore(init)) {
            // no offsets exist and store is not started
            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));

            // After the store is started all offsets are visible
            store.start(true);

            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
        }

Either of these is preferable to having the warning or suppressing it.

Contributor

if you approve I'd also backport the fix to 3.7

I'm on the fence about that, leaning towards yes. I regret backporting KAFKA-12468 so far and introducing this issue, and I didn't communicate it properly to users. I think you can backport this once you have a full release note written that can be backported at the same time.

@edoardocomar (Contributor Author)

edoardocomar commented May 21, 2024

@gharris1727 I gave up and used the ugly try.
That warning is not occurring in every test... but I went all the way in OffsetSyncStoreTest, as I prefer consistency to beauty.
Removed a couple of warnings in MirrorCheckpointTaskTest too, in the second commit.

@gharris1727 (Contributor) left a comment

LGTM, Thanks @edoardocomar and @prestona!

I'll wait for the CI to stabilize on this before merging.

backingStore.start();
public void start(boolean initializationMustReadToEnd) {
this.initializationMustReadToEnd = initializationMustReadToEnd;
log.debug("OffsetSyncStore starting - must read to OffsetSync end = ", initializationMustReadToEnd);
Contributor

Noticed one typo here: the log message actually doesn't print the boolean.
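
Presumably the fix is just the missing SLF4J "{}" placeholder, so the boolean is actually printed:

    log.debug("OffsetSyncStore starting - must read to OffsetSync end = {}", initializationMustReadToEnd);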

@edoardocomar (Contributor Author)

Thanks @gharris1727

@edoardocomar (Contributor Author)

edoardocomar commented May 22, 2024

Hi @gharris1727 ... more about warnings: there are two Java 21 compiler warnings that result in a compile failure:

[2024-05-22T02:09:24.247Z] > Task :connect:mirror:compileJava
[2024-05-22T02:09:24.247Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15910@2/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:78: warning: [this-escape] possible 'this' escape before subclass is fully initialized
[2024-05-22T02:09:24.247Z]             store = createBackingStore(config, consumer, admin);
[2024-05-22T02:09:24.247Z]                                       ^
[2024-05-22T02:09:24.247Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15910@2/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:94: warning: [this-escape] previous possible 'this' escape happens here via invocation
[2024-05-22T02:09:24.247Z]                 (error, record) -> this.handleRecord(record),
[2024-05-22T02:09:24.247Z]                 ^
[2024-05-22T02:09:24.247Z] error: warnings found and -Werror specified
[2024-05-22T02:09:24.247Z] 1 error
[2024-05-22T02:09:24.247Z] 2 warnings
[2024-05-22T02:09:24.247Z] 
[2024-05-22T02:09:24.247Z] > Task :connect:mirror:compileJava FAILED

This failure is due to us having made the constructor of OffsetSyncStore public, for consistency with CheckpointStore.
So we either annotate it with
@SuppressWarnings("this-escape")
or we revert the constructor to package-private access.
(In either case I'd add a comment.)

Do you have preferences?
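
For illustration, the suppression option might look roughly like this (the constructor signature and body are abridged from the build log above and may not match the PR exactly; the comment is an example of the kind mentioned):

    // 'this' escapes only into the consume callback, which cannot run until the
    // backing store is started after construction completes
    @SuppressWarnings("this-escape")
    public OffsetSyncStore(MirrorCheckpointConfig config) {
        // ...
        // store = createBackingStore(config, consumer, admin);
    }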

@gharris1727 (Contributor)

@edoardocomar Since we're not substantially changing that class, I think it's acceptable to keep the old visibility or add the suppression, rather than fix the this-escape. It's up to you.
