KAFKA-16667 Avoid stale read in KRaftMigrationDriver #15918
base: trunk
Conversation
* Fix stale read on /migration when becoming active by first writing to it
* Don't run old events in the migration driver when we've seen a new epoch
* Fix tense of log message for writing things to ZK (logging happens after the write)
* New integration test for epoch check
`8879745` to `2b28443`
TIL
> Read operations in ZooKeeper are not linearizable since they can return potentially stale data. This is because a read in ZooKeeper is not a quorum operation and a server will respond immediately to a client that is performing a read. ZooKeeper does this because it prioritizes performance over consistency for the read use case. However, reads in ZooKeeper are sequentially consistent, because read operations will appear to take effect in some sequential order that furthermore respects the order of each client's operations.
>
> A common pattern to work around this is to issue a sync before issuing a read. This too does not strictly guarantee up-to-date data because sync is not currently a quorum operation. To illustrate, consider a scenario where two servers simultaneously think they are the leader, something that could occur if the TCP connection timeout is smaller than syncLimit * tickTime. Note that this is unlikely to occur in practice, but should be kept in mind nevertheless when discussing strict theoretical guarantees. Under this scenario, it is possible that the sync is served by the "leader" with stale data, thereby allowing the following read to be stale as well. The stronger guarantee of linearizability is provided if an actual quorum operation (e.g., a write) is performed before a read.
https://zookeeper.apache.org/doc/current/zookeeperInternals.html
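The guarantee described above can be illustrated with a toy model (this is not ZooKeeper code; the class and its behavior are purely illustrative): writes go through the leader and a quorum, while a plain read is answered locally by whichever server the client happens to be connected to, so only a read preceded by a quorum operation is guaranteed to see the latest write.

```java
// Toy model of quorum writes vs. local reads. Not ZooKeeper code --
// just a sketch of why a plain read can be stale while a read that
// follows a quorum operation cannot.
public class ToyEnsemble {
    private final int[] values;   // last value applied on each server
    private int leaderValue;      // value as known by the current leader

    public ToyEnsemble(int servers) {
        this.values = new int[servers];
    }

    // Quorum write: the leader applies it and replicates to a majority.
    public void quorumWrite(int value) {
        leaderValue = value;
        int majority = values.length / 2 + 1;
        for (int i = 0; i < majority; i++) {
            values[i] = value;
        }
    }

    // Local read from an arbitrary server: may lag behind the leader.
    public int localRead(int serverId) {
        return values[serverId];
    }

    // A read preceded by a quorum operation reflects the leader's state.
    public int readAfterQuorumOp() {
        return leaderValue;
    }
}
```

With three servers, a write replicated to a majority (servers 0 and 1) leaves server 2 stale, so a local read from it returns old data, while a read after a quorum operation does not.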
```scala
Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
zookeeper.shutdown()
```
I don't think this makes sense. A call to `.close()` in `EmbeddedZookeeper` delegates to `.shutdown()`. On top of the direct call to `.close()`, `Utils.closeQuietly()` checks for null and logs any exception in a catch-all. If we want the exception to be thrown, `.shutdown()` is enough and `closeQuietly` can be removed; if instead we want to catch and log a possible exception, then `.shutdown()` needs to be removed.
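For reference, the semantics under discussion can be sketched as follows (a rough approximation of `Utils.closeQuietly`, not Kafka's actual source; stderr stands in for the real logger):

```java
// Approximation of Kafka's Utils.closeQuietly: null-check the resource,
// close it, and swallow-and-log any Throwable instead of propagating it.
public class CloseQuietlyExample {
    public static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Throwable t) {
                // Kafka logs via SLF4J; stderr stands in for a logger here.
                System.err.println("Failed to close " + name + ": " + t);
            }
        }
    }
}
```

This is why pairing it with a bare `.shutdown()` is redundant: the quiet close already invokes the same teardown path, just with the exception handling added.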
```scala
Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
zookeeper.shutdown()
if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
```
Same comment here about the double call to `.shutdown()`.

Also, there is a gap between the try-catch and try-finally blocks where a throwable would take execution out of this method without closing these. Since the only other test in this class, `testControllerFailoverZkRace`, also makes use of `EmbeddedZookeeper` and `KafkaZkClient`, it might be worth moving this to a common teardown method.
```scala
}, "waiting for topics to be created in ZK.")

assertTrue(topicClient1.createdTopics.nonEmpty, "Expect first leader to write some topics")
assertTrue(topicClient1.createdTopics.nonEmpty, "Expect second leader to write some topics")
```
```diff
- assertTrue(topicClient1.createdTopics.nonEmpty, "Expect second leader to write some topics")
+ assertTrue(topicClient2.createdTopics.nonEmpty, "Expect second leader to write some topics")
```
```scala
val delta = new MetadataDelta(image)
delta.replay(new FeatureLevelRecord()
  .setName(MetadataVersion.FEATURE_NAME)
  .setFeatureLevel(MetadataVersion.IBP_3_6_IV1.featureLevel))
```
Should we test against `MetadataVersion.latestProduction()` instead?
```java
if (afterState.loggableChangeSinceState(beforeState) || alwaysLog) {
    log.info("{} in {} ns. Transitioned migration state from {} to {}",
        name, durationNs, beforeState, afterState);
```
If `alwaysLog == true`, do we still want to log this if `afterState.equals(beforeState)`?
```diff
@@ -491,6 +536,13 @@ public void run() throws Exception {
         return;
     }

     if (!curLeaderAndEpoch.equals(leaderAndEpoch)) {
```
Are any of the previous actions/checks in this method worth running if the epoch is stale? Why not also add `MetadataChangeEvent` to the set of events that should be skipped in this situation? It might need a different name than `MigrationWriteEvent`, but does the generalization make sense?
```java
// between writes and reads on different ZNodes, we need to write something to the /migration ZNode to
// ensure we have the latest /migration zkVersion.
applyMigrationOperation("Updated migration state", state -> {
    state = state.withMigrationZkVersion(-1); // Causes an unconditional update on /migration
```
Suggestion, as it's not obvious how this leads to the unconditional update.

```diff
- state = state.withMigrationZkVersion(-1); // Causes an unconditional update on /migration
+ state = state.withMigrationZkVersion(-1); // Causes an unconditional update on /migration via KafkaZkClient#retryRequestsUntilConnected
```
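The reason `-1` works here is ZooKeeper's versioned `setData` contract: an expected version of `-1` matches any current version of the ZNode. A toy model of that check (illustrative only, not the `KafkaZkClient` code path):

```java
// Toy model of ZooKeeper's versioned setData check: an expected version
// of -1 matches any current version, which is why setting the migration
// state's zkVersion to -1 makes the retried write unconditional.
public class VersionedWrite {
    public static boolean writeAllowed(int expectedVersion, int currentVersion) {
        return expectedVersion == -1 || expectedVersion == currentVersion;
    }
}
```

After the unconditional write succeeds, the response carries the fresh `Stat`, which is how the driver learns the correct zkVersion for subsequent conditional writes.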
When becoming the active KRaftMigrationDriver, there is another race condition similar to KAFKA-16171. This time, the race is due to a stale read from ZK. After writing to `/controller` and `/controller_epoch`, it is possible that a read on `/migration` is not linearizable with the writes that were just made. In other words, we get a stale read on `/migration`. This leads to an inability to sync metadata to ZK due to an incorrect zkVersion on the migration ZNode. The non-linearizability of reads is in fact documented behavior for ZK, so we need to handle it.

To fix the stale read, this patch adds a write to `/migration` after updating `/controller` and `/controller_epoch`. This allows us to learn the correct zkVersion for the migration ZNode before leaving the BECOME_CONTROLLER state.

This patch also adds a check on the current leader epoch when running certain events in KRaftMigrationDriver. Historically, we did not include this check because it is not necessary for correctness: writes to ZK are gated on the `/controller_epoch` zkVersion, and RPCs sent to brokers are gated on the controller epoch. However, during a time of rapid failover there is a lot of processing happening on the controller (i.e., a full metadata sync to ZK and full UMRs sent to brokers), so it is best to avoid running events we know will fail.

There is also a small fix in here to improve the logging of ZK operations. The log messages are changed to past tense to reflect the fact that the operations have already happened by the time the log message is created.
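The epoch check can be sketched as follows (names here are hypothetical, not the actual KRaftMigrationDriver API): an event enqueued under an older leader epoch is skipped, since any ZK writes it attempted would fail the `/controller_epoch` zkVersion gate anyway.

```java
// Illustrative sketch of epoch-gating queued events. The types and method
// names are hypothetical; only the comparison logic mirrors the idea in
// the patch: skip work enqueued under a stale leader epoch.
public class EpochGate {
    public record LeaderAndEpoch(int leaderId, int epoch) {}

    // Run the event only if the epoch it was enqueued under is still current.
    public static boolean shouldRun(LeaderAndEpoch eventEpoch, LeaderAndEpoch current) {
        return eventEpoch.equals(current);
    }
}
```

Skipping early like this is purely an optimization: correctness is still guaranteed by the zkVersion and controller-epoch gates downstream.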