Skip to content

Commit

Permalink
ZOOKEEPER-4643: Committed txns may be improperly truncated if followe…
Browse files Browse the repository at this point in the history
…r crashes right after updating currentEpoch but before persisting txns to disk
  • Loading branch information
AlphaCanisMajoris committed Mar 28, 2024
1 parent d12aba5 commit dad7545
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,6 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
readPacket(qp);
Deque<Long> packetsCommitted = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
Deque<Request> requestsToAck = new ArrayDeque<>();

synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
Expand Down Expand Up @@ -756,26 +755,51 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startupWithoutServing();
if (zk instanceof FollowerZooKeeperServer) {
if (zk instanceof FollowerZooKeeperServer && !packetsCommitted.isEmpty()) {
long startTime = Time.currentElapsedTime();
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
final Request request = fzk.appendRequest(p.hdr, p.rec, p.digest);
requestsToAck.add(request);

/*
* @see https://github.com/apache/zookeeper/pull/1848
* Log and process the pending txns in "packetsNotCommitted"
* according to "packetsCommitted", which have been committed
* by the leader. For these committed proposals, there is no
* need to reply ack.
*
* @see https://issues.apache.org/jira/browse/ZOOKEEPER-4394
* Keep the outstanding proposals in "packetsNotCommitted" to
* avoid NullPointerException when receiving COMMIT packet(s)
* right after replying NEWLEADER ack.
*/
while (!packetsCommitted.isEmpty()) {
long zxid = packetsCommitted.removeFirst();
pif = packetsNotCommitted.peekFirst();
if (pif == null) {
LOG.warn("Committing 0x{}, but got no proposal", Long.toHexString(zxid));
continue;
} else if (pif.hdr.getZxid() != zxid) {
LOG.warn("Committing 0x{}, but next proposal is 0x{}",
Long.toHexString(zxid), Long.toHexString(pif.hdr.getZxid()));
continue;
}
packetsNotCommitted.removeFirst();
fzk.appendRequest(pif.hdr, pif.rec, pif.digest);
fzk.commit(zxid);
}

// persist the txns to disk
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646
// Make sure to persist the txns to disk before replying NEWLEADER ack.
fzk.getZKDatabase().commit();
LOG.info("{} txns have been persisted and it took {}ms",
packetsNotCommitted.size(), Time.currentElapsedTime() - startTime);
packetsNotCommitted.clear();
LOG.info("It took {}ms to log and commit pending txns in Leader's committed history. " +
"{} txns left in packetsNotCommitted",
Time.currentElapsedTime() - startTime, packetsNotCommitted.size());
}

// set the current epoch after all the tnxs are persisted
// ZOOKEEPER-4643 / ZOOKEEPER-4785: set the current epoch after txns are persisted
self.setCurrentEpoch(newEpoch);
LOG.info("Set the current epoch to {}", newEpoch);

// send NEWLEADER ack after all the tnxs are persisted
// send NEWLEADER ack after the txns are persisted
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
LOG.info("Sent NEWLEADER ack to leader with zxid {}", Long.toHexString(newLeaderZxid));
break;
Expand All @@ -796,15 +820,6 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {

// We need to log the stuff that came in between the snapshot and the uptodate
if (zk instanceof FollowerZooKeeperServer) {
// reply ACK of PROPOSAL after ACK of NEWLEADER to avoid leader shutdown due to timeout
// on waiting for a quorum of followers
for (final Request request : requestsToAck) {
final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK, request.getHdr().getZxid(), null, null);
writePacket(ackPacket, false);
}
writePacket(null, true);
requestsToAck.clear();

FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
fzk.logRequest(p.hdr, p.rec, p.digest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,8 +741,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f)

readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACKEPOCH, qp.getType());
assertEquals(0, qp.getZxid());
assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt());
assertEquals(ZxidUtils.makeZxid(0, 0), qp.getZxid());
assertEquals(0, ByteBuffer.wrap(qp.getData()).getInt());
assertEquals(1, f.self.getAcceptedEpoch());
assertEquals(0, f.self.getCurrentEpoch());

Expand All @@ -765,36 +765,23 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f)
qp.setZxid(0);
oa.writeRecord(qp, null);

// Read the uptodate ack
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());

// Get the ack of the new leader
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(1, f.self.getAcceptedEpoch());
assertEquals(1, f.self.getCurrentEpoch());

//Wait for the transactions to be written out. The thread that writes them out
// does not send anything back when it is done.
long start = System.currentTimeMillis();
while (createSessionZxid != f.fzk.getLastProcessedZxid()
&& (System.currentTimeMillis() - start) < 50) {
Thread.sleep(1);
}
// Read the uptodate ack
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());

assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid());

// Make sure the data was recorded in the filesystem ok
ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
start = System.currentTimeMillis();
zkDb2.loadDataBase();
while (zkDb2.getSessionWithTimeOuts().isEmpty() && (System.currentTimeMillis() - start) < 50) {
Thread.sleep(1);
zkDb2.loadDataBase();
}
LOG.info("zkdb2 sessions:{}", zkDb2.getSessions());
LOG.info("zkdb2 with timeouts:{}", zkDb2.getSessionWithTimeOuts());
assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));
Expand Down

0 comments on commit dad7545

Please sign in to comment.