Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: Fix the issue that CheckTxnStatus didn't make rollback on optimistic transaction's primary protected, which may break transaction atomicity (#16621) #16954

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 29 additions & 2 deletions components/resolved_ts/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ pub(crate) fn decode_write(key: &[u8], value: &[u8], is_apply: bool) -> Option<W
// gc_fence exists.
if is_apply && write.gc_fence.is_some() {
// `gc_fence` is set means the write record has been rewritten.
// Currently the only case is writing overlapped_rollback. And in this case
// Currently the only case is writing overlapped_rollback. And in this case we
// can safely ignore the writing operation.
assert!(write.has_overlapped_rollback);
assert_ne!(write.write_type, WriteType::Rollback);
return None;
Expand Down Expand Up @@ -334,9 +335,15 @@ mod tests {
must_prewrite_put(&mut engine, b"k1", &[b'v'; 512], b"k1", 4);
must_commit(&mut engine, b"k1", 4, 5);

must_prewrite_put(&mut engine, b"k1", b"v3", b"k1", 5);
must_prewrite_put(&mut engine, b"k1", b"v3", b"pk", 5);
must_rollback(&mut engine, b"k1", 5, false);

must_prewrite_put(&mut engine, b"k1", b"v4", b"k1", 6);
must_commit(&mut engine, b"k1", 6, 7);

must_prewrite_put(&mut engine, b"k1", b"v5", b"k1", 7);
must_rollback(&mut engine, b"k1", 7, true);

let k1 = Key::from_raw(b"k1");
let rows: Vec<_> = engine
.take_last_modifies()
Expand Down Expand Up @@ -398,6 +405,26 @@ mod tests {
commit_ts: None,
write_type: WriteType::Rollback,
},
ChangeRow::Prewrite {
key: k1.clone(),
start_ts: 6.into(),
value: Some(b"v4".to_vec()),
lock_type: LockType::Put,
},
ChangeRow::Commit {
key: k1.clone(),
start_ts: Some(6.into()),
commit_ts: Some(7.into()),
write_type: WriteType::Put,
},
ChangeRow::Prewrite {
key: k1.clone(),
start_ts: 7.into(),
value: Some(b"v5".to_vec()),
lock_type: LockType::Put,
},
// Rollback of the txn@start_ts=7 will be missing as overlapped rollback is not
// hanlded.
];
assert_eq!(rows, expected);

Expand Down
16 changes: 11 additions & 5 deletions src/storage/mvcc/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,10 @@ pub(crate) mod tests {

// Rollback lock
must_rollback(&mut engine, k, 15, false);
// Rollbacks of optimistic transactions needn't be protected
must_get_rollback_protected(&mut engine, k, 15, false);
// Rollbacks of optimistic transactions need to be protected
// TODO: Re-check how the test can be better written after refinement of
// `must_rollback`'s semantics.
must_get_rollback_protected(&mut engine, k, 15, true);
}

#[test]
Expand Down Expand Up @@ -896,16 +898,20 @@ pub(crate) mod tests {
#[test]
fn test_collapse_prev_rollback() {
let mut engine = TestEngineBuilder::new().build().unwrap();
let (key, value) = (b"key", b"value");
let (key, pk, value) = (b"key", b"pk", b"value");

// Worked around the problem that `must_rollback` always protects primary lock
// by setting different PK.
// TODO: Cover primary when working on https://github.com/tikv/tikv/issues/16625

// Add a Rollback whose start ts is 1.
must_prewrite_put(&mut engine, key, value, key, 1);
must_prewrite_put(&mut engine, key, value, pk, 1);
must_rollback(&mut engine, key, 1, false);
must_get_rollback_ts(&mut engine, key, 1);

// Add a Rollback whose start ts is 2, the previous Rollback whose
// start ts is 1 will be collapsed.
must_prewrite_put(&mut engine, key, value, key, 2);
must_prewrite_put(&mut engine, key, value, pk, 2);
must_rollback(&mut engine, key, 2, false);
must_get_none(&mut engine, key, 2);
must_get_rollback_ts(&mut engine, key, 2);
Expand Down
4 changes: 2 additions & 2 deletions src/storage/txn/actions/check_txn_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ pub fn rollback_lock(
txn.delete_value(key.clone(), lock.ts);
}

// Only the primary key of a pessimistic transaction needs to be protected.
let protected: bool = is_pessimistic_txn && key.is_encoded_from(&lock.primary);
// The primary key of a transaction needs to be protected.
let protected: bool = key.is_encoded_from(&lock.primary);
if let Some(write) = make_rollback(reader.start_ts, protected, overlapped_write) {
txn.put_write(key.clone(), reader.start_ts, write.as_ref().to_bytes());
}
Expand Down
5 changes: 3 additions & 2 deletions src/storage/txn/actions/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,9 @@ pub mod tests {
// TTL expired. The lock should be removed.
must_succeed(&mut engine, k, ts(10, 0), ts(120, 0));
must_unlocked(&mut engine, k);
// Rollbacks of optimistic transactions needn't be protected
must_get_rollback_protected(&mut engine, k, ts(10, 0), false);
// Rollbacks of optimistic transactions need to be protected
// See: https://github.com/tikv/tikv/issues/16620
must_get_rollback_protected(&mut engine, k, ts(10, 0), true);
must_get_rollback_ts(&mut engine, k, ts(10, 0));

// Rollbacks of primary keys in pessimistic transactions should be protected
Expand Down
73 changes: 69 additions & 4 deletions src/storage/txn/commands/check_txn_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for CheckTxnStatus {
#[cfg(test)]
pub mod tests {
use concurrency_manager::ConcurrencyManager;
use kvproto::kvrpcpb::{self, Context, LockInfo, PrewriteRequestPessimisticAction::*};
use kvproto::kvrpcpb::{
self, Context, LockInfo, PrewriteRequestPessimisticAction::*, WriteConflictReason,
};
use tikv_util::deadline::Deadline;
use txn_types::{Key, LastChange, WriteType};

Expand All @@ -168,7 +170,7 @@ pub mod tests {
kv::Engine,
lock_manager::MockLockManager,
mvcc,
mvcc::tests::*,
mvcc::{tests::*, ErrorInner},
txn::{
self,
actions::acquire_pessimistic_lock::tests::acquire_pessimistic_lock_allow_lock_with_conflict,
Expand Down Expand Up @@ -224,7 +226,12 @@ pub mod tests {
)
.unwrap();
if let ProcessResult::TxnStatus { txn_status } = result.pr {
assert!(status_pred(txn_status));
let formatted_txn_status = format!("{:?}", txn_status);
assert!(
status_pred(txn_status),
"txn_status returned by check_txn_status ({}) doesn't pass the check",
formatted_txn_status
);
} else {
unreachable!();
}
Expand Down Expand Up @@ -414,7 +421,7 @@ pub mod tests {
|s| s == TtlExpire,
);
must_unlocked(&mut engine, b"k1");
must_get_rollback_protected(&mut engine, b"k1", 1, false);
must_get_rollback_protected(&mut engine, b"k1", 1, true);

// case 2: primary is prewritten (pessimistic)
must_acquire_pessimistic_lock(&mut engine, b"k2", b"k2", 15, 15);
Expand Down Expand Up @@ -829,6 +836,7 @@ pub mod tests {
ts(20, 0),
WriteType::Rollback,
);
must_get_rollback_protected(&mut engine, k, ts(20, 0), true);

// Push the min_commit_ts of pessimistic locks.
must_acquire_pessimistic_lock_for_large_txn(&mut engine, k, k, ts(4, 0), ts(130, 0), 200);
Expand Down Expand Up @@ -1437,4 +1445,61 @@ pub mod tests {
)
.unwrap_err();
}

#[test]
fn test_check_txn_status_rollback_optimistic() {
let mut engine = TestEngineBuilder::new().build().unwrap();
let k = b"k1";
let (v1, v2) = (b"v1", b"v2");

let ts = TimeStamp::compose;

must_prewrite_put_async_commit(&mut engine, k, v1, k, &Some(vec![]), ts(1, 0), ts(1, 1));
must_commit(&mut engine, k, ts(1, 0), ts(2, 0));

must_prewrite_put(&mut engine, k, v2, k, ts(2, 0));
assert!(!must_have_write(&mut engine, k, ts(2, 0)).has_overlapped_rollback);

must_success(
&mut engine,
k,
ts(2, 0),
ts(3, 0),
ts(3, 0),
true,
false,
false,
|s| s == TtlExpire,
);
must_get_overlapped_rollback(
&mut engine,
k,
ts(2, 0),
ts(1, 0),
WriteType::Put,
Some(0.into()),
);

let e = must_prewrite_put_err(&mut engine, k, v2, k, ts(2, 0));
match &*e.0 {
ErrorInner::WriteConflict {
start_ts,
conflict_start_ts,
conflict_commit_ts,
key,
primary,
reason,
} => {
assert_eq!(*start_ts, ts(2, 0));
assert_eq!(*conflict_start_ts, ts(1, 0));
assert_eq!(*conflict_commit_ts, ts(2, 0));
assert_eq!(key.as_slice(), k);
assert_eq!(primary.as_slice(), k);
assert_eq!(*reason, WriteConflictReason::SelfRolledBack);
}
e => {
panic!("unexpected error: {:?}", e);
}
}
}
}