
KAFKA-16667 Avoid stale read in KRaftMigrationDriver #15918

Open

mumrah wants to merge 3 commits into trunk from KAFKA-16667-consistent-migration-state
Conversation

@mumrah (Contributor) commented on May 10, 2024

When becoming the active KRaftMigrationDriver, there is another race condition similar to KAFKA-16171. This time, the race is due to a stale read from ZK. After writing to /controller and /controller_epoch, it is possible that a subsequent read of /migration is not linearizable with respect to the writes that were just made. In other words, we get a stale read of /migration. This leads to an inability to sync metadata to ZK, because the driver holds an incorrect zkVersion for the /migration ZNode.

The non-linearizability of reads is in fact documented behavior for ZK, so we need to handle it.

To fix the stale read, this patch adds a write to /migration after updating /controller and /controller_epoch. This allows us to learn the correct zkVersion for the migration ZNode before leaving the BECOME_CONTROLLER state.

This patch also adds a check on the current leader epoch when running certain events in KRaftMigrationDriver. Historically, we did not include this check because it is not necessary for correctness. Writes to ZK are gated on the /controller_epoch zkVersion, and RPCs sent to brokers are gated on the controller epoch. However, during a time of rapid failover, there is a lot of processing happening on the controller (i.e., full metadata sync to ZK and full UMRs sent to brokers), so it is best to avoid running events we know will fail.

There is also a small fix in here to improve the logging of ZK operations. The log messages are changed to past tense to reflect the fact that the operations have already happened by the time the message is logged.

* Fix stale read on /migration when becoming active by first writing to it
* Don't run old events in the migration driver when we've seen a new epoch
* Fix tense of log message for writing things to ZK (logging happens after the write)
* New integration test for epoch check
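
To make the ordering concrete, here is a minimal sketch using the raw ZooKeeper client. It is illustrative only: the actual change goes through KRaftMigrationDriver and KafkaZkClient, and the payloads, epoch gating, and error handling are simplified away.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Minimal sketch of the ordering, not the actual implementation.
public class BecomeControllerSketch {

    static int claimLeadershipAndRefreshMigrationVersion(ZooKeeper zk,
                                                         byte[] controllerInfo,
                                                         byte[] controllerEpoch,
                                                         byte[] migrationState)
            throws KeeperException, InterruptedException {
        // 1. Claim leadership: write /controller and /controller_epoch. These are quorum operations.
        zk.create("/controller", controllerInfo, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        zk.setData("/controller_epoch", controllerEpoch, -1);

        // 2. Do NOT trust a plain read of /migration at this point: ZooKeeper reads are served
        //    locally and are only sequentially consistent, so the returned zkVersion may be stale.
        //    Instead, write /migration unconditionally (expected version -1 matches any version);
        //    the Stat returned by this quorum write carries the authoritative zkVersion to use
        //    for later conditional updates.
        Stat migrationStat = zk.setData("/migration", migrationState, -1);
        return migrationStat.getVersion();
    }
}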
@mumrah force-pushed the KAFKA-16667-consistent-migration-state branch from 8879745 to 2b28443 on May 10, 2024 16:14
@soarez (Member) left a comment:

TIL

Read operations in ZooKeeper are not linearizable since they can return potentially stale data. This is because a read in ZooKeeper is not a quorum operation and a server will respond immediately to a client that is performing a read. ZooKeeper does this because it prioritizes performance over consistency for the read use case. However, reads in ZooKeeper are sequentially consistent, because read operations will appear to take effect in some sequential order that furthermore respects the order of each client's operations. A common pattern to work around this is to issue a sync before issuing a read. This too does not strictly guarantee up-to-date data because sync is not currently a quorum operation. To illustrate, consider a scenario where two servers simultaneously think they are the leader, something that could occur if the TCP connection timeout is smaller than syncLimit * tickTime. Note that this is unlikely to occur in practice, but should be kept in mind nevertheless when discussing strict theoretical guarantees. Under this scenario, it is possible that the sync is served by the “leader” with stale data, thereby allowing the following read to be stale as well. The stronger guarantee of linearizability is provided if an actual quorum operation (e.g., a write) is performed before a read.

https://zookeeper.apache.org/doc/current/zookeeperInternals.html
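
For reference, the "sync before read" workaround mentioned in that passage looks roughly like this with the raw client (illustrative only; as the docs note, sync is not a quorum operation, so this narrows the stale-read window without closing it, which is why the patch performs a write instead):

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Sketch of the "sync before read" pattern from the ZooKeeper docs quoted above.
public class SyncThenRead {
    static byte[] syncThenRead(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
        CountDownLatch synced = new CountDownLatch(1);
        // sync() asks the server we are connected to to catch up with the leader
        // before serving our subsequent reads on this session.
        zk.sync(path, (rc, p, ctx) -> synced.countDown(), null);
        synced.await();
        Stat stat = new Stat();
        return zk.getData(path, false, stat); // stat.getVersion() is the (usually fresh) zkVersion
    }
}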

Comment on lines +309 to +310
Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
zookeeper.shutdown()
soarez (Member):

I don't think this makes sense?

EmbeddedZookeeper's .close() delegates to .shutdown(), and Utils.closeQuietly() wraps that direct .close() call with a null check and a catch-all that logs any exception, so this closes the resource twice.
If we want the exception to be thrown, .shutdown() alone is enough and closeQuietly can be removed; if instead we want to catch and log a possible exception, then the extra .shutdown() call needs to be removed.
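
For what it's worth, here is the behavior described above restated in plain Java (illustrative only, not Kafka's actual implementation of Utils.closeQuietly):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative restatement: closeQuietly null-checks, calls close(), and swallows/logs any exception.
final class CloseQuietlySketch {
    private static final Logger log = LoggerFactory.getLogger(CloseQuietlySketch.class);

    static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable != null) {
            try {
                closeable.close(); // for EmbeddedZookeeper this delegates to shutdown()
            } catch (Throwable t) {
                log.warn("Failed to close {}", name, t);
            }
        }
    }
}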

Comment on lines +417 to +419
Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
zookeeper.shutdown()
if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
soarez (Member):

Same comment here about the double call to .shutdown().

Also, there is a gap between the try-catch and the try-finally blocks where a throwable would take execution out of this method without closing these resources.

Since the only other test in this class – testControllerFailoverZkRace – also makes use of EmbeddedZookeeper and KafkaZkClient, it might be worth moving this to a common teardown method.

}, "waiting for topics to be created in ZK.")

assertTrue(topicClient1.createdTopics.nonEmpty, "Expect first leader to write some topics")
assertTrue(topicClient1.createdTopics.nonEmpty, "Expect second leader to write some topics")
soarez (Member):

Suggested change
assertTrue(topicClient1.createdTopics.nonEmpty, "Expect second leader to write some topics")
assertTrue(topicClient2.createdTopics.nonEmpty, "Expect second leader to write some topics")

val delta = new MetadataDelta(image)
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.IBP_3_6_IV1.featureLevel))
soarez (Member):

Should we test against MetadataVersion.latestProduction() instead?

Comment on lines +247 to +249
if (afterState.loggableChangeSinceState(beforeState) || alwaysLog) {
log.info("{} in {} ns. Transitioned migration state from {} to {}",
name, durationNs, beforeState, afterState);
soarez (Member):

If alwaysLog == true, do we still want to log this if afterState.equals(beforeState)?

@@ -491,6 +536,13 @@ public void run() throws Exception {
return;
}

if (!curLeaderAndEpoch.equals(leaderAndEpoch)) {
soarez (Member):

Are any of the previous actions/checks in this method worth running if the epoch is stale? Why not also add MetadataChangeEvent to the set of events that should be skipped in this situation? It might need a different name than MigrationWriteEvent, but does the generalization make sense?

// between writes and reads on different ZNodes, we need to write something to the /migration ZNode to
// ensure we have the latest /migration zkVersion.
applyMigrationOperation("Updated migration state", state -> {
state = state.withMigrationZkVersion(-1); // Causes an unconditional update on /migration
soarez (Member):

A suggestion, since it's not obvious how this leads to the unconditional update:

Suggested change
state = state.withMigrationZkVersion(-1); // Causes an unconditional update on /migration
state = state.withMigrationZkVersion(-1); // Causes an unconditional update on /migration via KafkaZkClient#retryRequestsUntilConnected
