-
Notifications
You must be signed in to change notification settings - Fork 7.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ZOOKEEPER-4643, ZOOKEEPER-4646 & ZOOKEEPER-4394: Committed txns lost / NullPointerException in Learner.syncWithLeader() #2152
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -556,7 +556,6 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { | |
readPacket(qp); | ||
Deque<Long> packetsCommitted = new ArrayDeque<>(); | ||
Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>(); | ||
Deque<Request> requestsToAck = new ArrayDeque<>(); | ||
|
||
synchronized (zk) { | ||
if (qp.getType() == Leader.DIFF) { | ||
|
@@ -753,29 +752,60 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { | |
isPreZAB1_0 = false; | ||
|
||
// ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER). | ||
sock.setSoTimeout(self.tickTime * self.syncLimit); | ||
self.setSyncMode(QuorumPeer.SyncMode.NONE); | ||
zk.startupWithoutServing(); | ||
if (zk instanceof FollowerZooKeeperServer) { | ||
if (zk instanceof FollowerZooKeeperServer && !packetsCommitted.isEmpty()) { | ||
long startTime = Time.currentElapsedTime(); | ||
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; | ||
for (PacketInFlight p : packetsNotCommitted) { | ||
final Request request = fzk.appendRequest(p.hdr, p.rec, p.digest); | ||
requestsToAck.add(request); | ||
|
||
/* | ||
* @see https://github.com/apache/zookeeper/pull/1848 | ||
* Persist and process the committed txns in "packetsNotCommitted" | ||
* according to "packetsCommitted", which have been committed by | ||
* the leader. For these committed proposals, there is no need to | ||
* reply ack. | ||
* | ||
* @see https://issues.apache.org/jira/browse/ZOOKEEPER-4394 | ||
* Keep the outstanding proposals in "packetsNotCommitted" to avoid | ||
* NullPointerException when the follower receives COMMIT packet(s) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given my comments above, then we should not clear |
||
* right after replying NEWLEADER ack. | ||
*/ | ||
while (!packetsCommitted.isEmpty()) { | ||
long zxid = packetsCommitted.removeFirst(); | ||
pif = packetsNotCommitted.peekFirst(); | ||
if (pif == null) { | ||
LOG.warn("Committing 0x{}, but got no proposal", Long.toHexString(zxid)); | ||
continue; | ||
} else if (pif.hdr.getZxid() != zxid) { | ||
LOG.warn("Committing 0x{}, but next proposal is 0x{}", | ||
Long.toHexString(zxid), Long.toHexString(pif.hdr.getZxid())); | ||
continue; | ||
} | ||
packetsNotCommitted.removeFirst(); | ||
fzk.appendRequest(pif.hdr, pif.rec, pif.digest); | ||
fzk.processTxn(pif.hdr, pif.rec); | ||
} | ||
|
||
// persist the txns to disk | ||
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646 | ||
// Make sure to persist the txns to disk before replying NEWLEADER ack. | ||
fzk.getZKDatabase().commit(); | ||
LOG.info("{} txns have been persisted and it took {}ms", | ||
packetsNotCommitted.size(), Time.currentElapsedTime() - startTime); | ||
packetsNotCommitted.clear(); | ||
LOG.info("It took {}ms to persist and commit txns in packetsCommitted. " | ||
+ "{} outstanding txns left in packetsNotCommitted", | ||
Time.currentElapsedTime() - startTime, packetsNotCommitted.size()); | ||
} | ||
|
||
// set the current epoch after all the tnxs are persisted | ||
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4643 | ||
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4785 | ||
// Update current epoch after the committed txns are persisted | ||
self.setCurrentEpoch(newEpoch); | ||
LOG.info("Set the current epoch to {}", newEpoch); | ||
|
||
// send NEWLEADER ack after all the tnxs are persisted | ||
// Now we almost complete the synchronization phase. Start RequestProcessors | ||
// to asynchronously process the pending txns in "packetsNotCommitted" and | ||
// "packetsCommitted" later. | ||
sock.setSoTimeout(self.tickTime * self.syncLimit); | ||
self.setSyncMode(QuorumPeer.SyncMode.NONE); | ||
zk.startupWithoutServing(); | ||
|
||
// send NEWLEADER ack after the committed txns are persisted | ||
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); | ||
LOG.info("Sent NEWLEADER ack to leader with zxid {}", Long.toHexString(newLeaderZxid)); | ||
break; | ||
|
@@ -796,15 +826,6 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { | |
|
||
// We need to log the stuff that came in between the snapshot and the uptodate | ||
if (zk instanceof FollowerZooKeeperServer) { | ||
// reply ACK of PROPOSAL after ACK of NEWLEADER to avoid leader shutdown due to timeout | ||
// on waiting for a quorum of followers | ||
for (final Request request : requestsToAck) { | ||
final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK, request.getHdr().getZxid(), null, null); | ||
writePacket(ackPacket, false); | ||
} | ||
writePacket(null, true); | ||
requestsToAck.clear(); | ||
|
||
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; | ||
for (PacketInFlight p : packetsNotCommitted) { | ||
fzk.logRequest(p.hdr, p.rec, p.digest); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -741,8 +741,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) | |
|
||
readPacketSkippingPing(ia, qp); | ||
assertEquals(Leader.ACKEPOCH, qp.getType()); | ||
assertEquals(0, qp.getZxid()); | ||
assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt()); | ||
assertEquals(ZxidUtils.makeZxid(0, 0), qp.getZxid()); | ||
assertEquals(0, ByteBuffer.wrap(qp.getData()).getInt()); | ||
assertEquals(1, f.self.getAcceptedEpoch()); | ||
assertEquals(0, f.self.getCurrentEpoch()); | ||
|
||
|
@@ -765,36 +765,22 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) | |
qp.setZxid(0); | ||
oa.writeRecord(qp, null); | ||
|
||
// Read the uptodate ack | ||
readPacketSkippingPing(ia, qp); | ||
assertEquals(Leader.ACK, qp.getType()); | ||
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); | ||
|
||
// Get the ack of the new leader | ||
readPacketSkippingPing(ia, qp); | ||
assertEquals(Leader.ACK, qp.getType()); | ||
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); | ||
assertEquals(1, f.self.getAcceptedEpoch()); | ||
assertEquals(1, f.self.getCurrentEpoch()); | ||
|
||
//Wait for the transactions to be written out. The thread that writes them out | ||
// does not send anything back when it is done. | ||
long start = System.currentTimeMillis(); | ||
while (createSessionZxid != f.fzk.getLastProcessedZxid() | ||
&& (System.currentTimeMillis() - start) < 50) { | ||
Thread.sleep(1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I did not see ZOOKEEPER-3023 after #2111. But if you are verifying this using TLA, this is doomed to failure. I am +1 to revert this to pre ZOOKEEPER-2678. |
||
} | ||
|
||
assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid()); | ||
|
||
// Read the uptodate ack | ||
readPacketSkippingPing(ia, qp); | ||
assertEquals(Leader.ACK, qp.getType()); | ||
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); | ||
|
||
// Make sure the data was recorded in the filesystem ok | ||
ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir)); | ||
start = System.currentTimeMillis(); | ||
zkDb2.loadDataBase(); | ||
while (zkDb2.getSessionWithTimeOuts().isEmpty() && (System.currentTimeMillis() - start) < 50) { | ||
Thread.sleep(1); | ||
zkDb2.loadDataBase(); | ||
} | ||
LOG.info("zkdb2 sessions:{}", zkDb2.getSessions()); | ||
LOG.info("zkdb2 with timeouts:{}", zkDb2.getSessionWithTimeOuts()); | ||
assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L)); | ||
|
@@ -820,6 +806,140 @@ private void proposeNewSession(QuorumPacket qp, long zxid, long sessionId) throw | |
}, testData); | ||
} | ||
|
||
@Test | ||
public void testNormalFollowerRun_ProcessCommitInSyncAfterAckNewLeader(@TempDir File testData) throws Exception { | ||
testFollowerConversation(new FollowerConversation() { | ||
@Override | ||
public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception { | ||
File tmpDir = File.createTempFile("test", "dir", testData); | ||
tmpDir.delete(); | ||
tmpDir.mkdir(); | ||
File logDir = f.fzk.getTxnLogFactory().getDataLogDir().getParentFile(); | ||
File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile(); | ||
//Spy on ZK so we can check if a snapshot happened or not. | ||
f.zk = spy(f.zk); | ||
try { | ||
assertEquals(0, f.self.getAcceptedEpoch()); | ||
assertEquals(0, f.self.getCurrentEpoch()); | ||
|
||
// Setup a database with a single /foo node | ||
ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir)); | ||
final long firstZxid = ZxidUtils.makeZxid(1, 1); | ||
zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null); | ||
Stat stat = new Stat(); | ||
assertEquals("data1", new String(zkDb.getData("/foo", stat, null))); | ||
|
||
QuorumPacket qp = new QuorumPacket(); | ||
readPacketSkippingPing(ia, qp); | ||
assertEquals(Leader.FOLLOWERINFO, qp.getType()); | ||
assertEquals(qp.getZxid(), 0); | ||
LearnerInfo learnInfo = new LearnerInfo(); | ||
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo); | ||
assertEquals(learnInfo.getProtocolVersion(), 0x10000); | ||
assertEquals(learnInfo.getServerid(), 0); | ||
|
||
// We are simulating an established leader, so the epoch is 1 | ||
qp.setType(Leader.LEADERINFO); | ||
qp.setZxid(ZxidUtils.makeZxid(1, 0)); | ||
byte[] protoBytes = new byte[4]; | ||
ByteBuffer.wrap(protoBytes).putInt(0x10000); | ||
qp.setData(protoBytes); | ||
oa.writeRecord(qp, null); | ||
|
||
readPacketSkippingPing(ia, qp); | ||
assertEquals(Leader.ACKEPOCH, qp.getType()); | ||
assertEquals(0, qp.getZxid()); | ||
assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt()); | ||
assertEquals(1, f.self.getAcceptedEpoch()); | ||
assertEquals(0, f.self.getCurrentEpoch()); | ||
|
||
// Send the snapshot we created earlier | ||
qp.setType(Leader.SNAP); | ||
qp.setData(new byte[0]); | ||
qp.setZxid(zkDb.getDataTreeLastProcessedZxid()); | ||
oa.writeRecord(qp, null); | ||
zkDb.serializeSnapshot(oa); | ||
oa.writeString("BenWasHere", null); | ||
Thread.sleep(10); //Give it some time to process the snap | ||
//No Snapshot taken yet, the SNAP was applied in memory | ||
verify(f.zk, never()).takeSnapshot(); | ||
|
||
// Leader sends an outstanding proposal | ||
long proposalZxid = ZxidUtils.makeZxid(1, 1001); | ||
proposeSetData(qp, proposalZxid, "data2", 2); | ||
oa.writeRecord(qp, null); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is what ZOOKEEPER-4394 tried to report. New proposals are issued before |
||
|
||
qp.setType(Leader.NEWLEADER); | ||
qp.setZxid(ZxidUtils.makeZxid(1, 0)); | ||
oa.writeRecord(qp, null); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
// Get the ack of the new leader | ||
readPacketSkippingPing(ia, qp); | ||
assertEquals(Leader.ACK, qp.getType()); | ||
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); | ||
assertEquals(1, f.self.getAcceptedEpoch()); | ||
assertEquals(1, f.self.getCurrentEpoch()); | ||
//Make sure that we did take the snapshot now | ||
verify(f.zk).takeSnapshot(true); | ||
assertEquals(firstZxid, f.fzk.getLastProcessedZxid()); | ||
|
||
// The outstanding proposal has not been persisted yet | ||
ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir)); | ||
long lastZxid = zkDb2.loadDataBase(); | ||
assertEquals("data1", new String(zkDb2.getData("/foo", stat, null))); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This seems to be an extra enforcement on top of the paper. But given that we are driving the test step by step and we are testing the implementation, I am +1 on this. By the paper, we should not have this problem, and the assertions should still hold anyway, as there are no new proposals from the new leader before |
||
assertEquals(firstZxid, lastZxid); | ||
|
||
TrackerWatcher watcher = new TrackerWatcher(); | ||
|
||
// The change should not have happened yet | ||
assertEquals("data1", new String(f.fzk.getZKDatabase().getData("/foo", stat, watcher))); | ||
|
||
// Leader commits proposalZxid right after it sends NEWLEADER to follower | ||
qp.setType(Leader.COMMIT); | ||
qp.setZxid(proposalZxid); | ||
oa.writeRecord(qp, null); | ||
|
||
qp.setType(Leader.UPTODATE); | ||
qp.setZxid(0); | ||
oa.writeRecord(qp, null); | ||
|
||
// Read the uptodate ack | ||
readPacketSkippingPing(ia, qp); | ||
assertEquals(Leader.ACK, qp.getType()); | ||
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); | ||
|
||
readPacketSkippingPing(ia, qp); | ||
assertEquals(Leader.ACK, qp.getType()); | ||
assertEquals(proposalZxid, qp.getZxid()); | ||
|
||
// The change should happen now | ||
watcher.waitForChange(); | ||
assertEquals("data2", new String(f.fzk.getZKDatabase().getData("/foo", stat, null))); | ||
|
||
// check and make sure the change is persisted | ||
zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir)); | ||
lastZxid = zkDb2.loadDataBase(); | ||
assertEquals("data2", new String(zkDb2.getData("/foo", stat, null))); | ||
assertEquals(proposalZxid, lastZxid); | ||
} finally { | ||
TestUtils.deleteFileRecursively(tmpDir); | ||
} | ||
} | ||
|
||
private void proposeSetData(QuorumPacket qp, long zxid, String data, int version) throws IOException { | ||
qp.setType(Leader.PROPOSAL); | ||
qp.setZxid(zxid); | ||
TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, ZooDefs.OpCode.setData); | ||
SetDataTxn sdt = new SetDataTxn("/foo", data.getBytes(), version); | ||
ByteArrayOutputStream baos = new ByteArrayOutputStream(); | ||
OutputArchive boa = BinaryOutputArchive.getArchive(baos); | ||
boa.writeRecord(hdr, null); | ||
boa.writeRecord(sdt, null); | ||
qp.setData(baos.toByteArray()); | ||
} | ||
}, testData); | ||
} | ||
|
||
@Test | ||
public void testNormalRun(@TempDir File testData) throws Exception { | ||
testLeaderConversation(new LeaderConversation() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment is somewhat misleading. The key is to log these committed txns; by the paper, they are considered committed before the election. The only reason we touch `packetsNotCommitted` here is to make sure these txns are not passed to `logRequest` again in the `broadcast` phase. I think it might be better to rename `packetsNotCommitted` to `packetsNotLogged`, as @jeffrey-xiao did in #1930. "Log" is a disk operation; "commit" is an agreement. What we want here should be "log committed txns agreed in election".

Coming to the implementation, new proposals could still be committed before `NEWLEADER`, since `LearnerHandler` does not issue `NEWLEADER` right after these committed txns. But that does not harm us here, as we would potentially persist more, not less, and the new leader expects no `ack` for committed ones.