Skip to content

Commit

Permalink
[txns] fix UB in TxnSystemClient when adding max timeout to now
Browse files Browse the repository at this point in the history
This patch addresses the following UB found in a pre-commit:

/home/jenkins-slave/workspace/kudu-master/1/src/kudu/util/monotime.cc:220:10: runtime error: signed integer overflow: 271833850110 + 9223372036854775807 cannot be represented in type 'long'
    #0 0x7f225fca9b31 in kudu::MonoTime::AddDelta(kudu::MonoDelta const&) /home/jenkins-slave/workspace/kudu-master/1/src/kudu/util/monotime.cc:220:10
    #1 0x7f225fcaaafe in kudu::operator+(kudu::MonoTime const&, kudu::MonoDelta const&) /home/jenkins-slave/workspace/kudu-master/1/src/kudu/util/monotime.cc:335:7
    #2 0x7f226fa3d6ff in kudu::transactions::TxnSystemClient::CoordinateTransactionAsync(kudu::tserver::CoordinatorOpPB, kudu::MonoDelta, std::function<void (kudu::Status const&)> const&, kudu::tserver::CoordinatorOpResultPB*) /home/jenkins-slave/workspace/kudu-master/1/src/kudu/transactions/txn_system_client.cc:331:45
    #3 0x7f226fa3feca in kudu::transactions::TxnSystemClient::KeepTransactionAlive(long, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, kudu::MonoDelta) /home/jenkins-slave/workspace/kudu-master/1/src/kudu/transactions/txn_system_client.cc:320:3
    #4 0x7f2271211629 in kudu::transactions::TxnManager::KeepTransactionAlive(long, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, kudu::MonoTime const&) /home/jenkins-slave/workspace/kudu-master/1/src/kudu/master/txn_manager.cc:238:27
    #5 0x7f227121535f in kudu::transactions::TxnManagerServiceImpl::KeepTransactionAlive(kudu::transactions::KeepTransactionAliveRequestPB const*, kudu::transactions::KeepTransactionAliveResponsePB*, kudu::rpc::RpcContext*) /home/jenkins-slave/workspace/kudu-master/1/src/kudu/master/txn_manager_service.cc:159:42
    #6 0x7f2265d5749e in std::function<void (google::protobuf::Message const*, google::protobuf::Message*, kudu::rpc::RpcContext*)>::operator()(google::protobuf::Message const*, google::protobuf::Message*, kudu::rpc::RpcContext*) const ../../../include/c++/7.5.0/bits/std_function.h:706:14
    #7 0x7f2265d5648c in kudu::rpc::GeneratedServiceIf::Handle(kudu::rpc::InboundCall*) /home/jenkins-slave/workspace/kudu-master/1/src/kudu/rpc/service_if.cc:137:3
    #8 0x7f2265d5c42d in kudu::rpc::ServicePool::RunThread() /home/jenkins-slave/workspace/kudu-master/1/src/kudu/rpc/service_pool.cc:232:15
    #9 0x7f225fd596ba in kudu::Thread::SuperviseThread(void*) /home/jenkins-slave/workspace/kudu-master/1/src/kudu/util/thread.cc:674:3
    #10 0x7f22625026da in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x76da)
    #11 0x7f225bfea71e in clone (/lib/x86_64-linux-gnu/libc.so.6+0x12171e)

SUMMARY: UndefinedBehaviorSanitizer: undefined-behavior /home/jenkins-slave/workspace/kudu-master/1/src/kudu/util/monotime.cc:220:10 in

Previously, we converted an initial deadline to a timeout, potentially
rejiggering the value in case of the maximal timeout, and then
recomputed the deadline. This patch addresses the UB by addressing a
TODO to pass deadlines in the context of the TxnSystemClient instead of
timeouts.

Change-Id: I1e5d4d06e8c0801c7f6b2399f7622e6f039f988e
Reviewed-on: http://gerrit.cloudera.org:8080/17993
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
Reviewed-by: Abhishek Chennaka <[email protected]>
  • Loading branch information
andrwng committed Nov 3, 2021
1 parent 3e24e1b commit f334e6e
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 120 deletions.
2 changes: 1 addition & 1 deletion src/kudu/integration-tests/txn_commit-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ TEST_F(TxnCommitITest, TestCommitAfterParticipantAbort) {
op_pb.set_txn_id(0);
op_pb.set_type(ParticipantOpPB::ABORT_TXN);
ASSERT_OK(txn_client_->ParticipateInTransaction(
participant_ids_[0], op_pb, MonoDelta::FromSeconds(3)));
participant_ids_[0], op_pb, MonoTime::Now() + MonoDelta::FromSeconds(3)));

// When we try to commit, we should end up not completing.
ASSERT_OK(txn->StartCommit());
Expand Down
99 changes: 58 additions & 41 deletions src/kudu/integration-tests/txn_participant-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -847,32 +847,39 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientGetMetadata) {
// Get commit-related metadata.
TxnMetadataPB meta_pb;
Status s = txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), kDefaultTimeout,
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA),
MonoTime::Now() + kDefaultTimeout,
/*begin_commit_timestamp*/nullptr, &meta_pb);
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();

ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout));
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN),
MonoTime::Now() + kDefaultTimeout));
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), kDefaultTimeout,
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA),
MonoTime::Now() + kDefaultTimeout,
/*begin_commit_timestamp*/nullptr, &meta_pb));
ASSERT_FALSE(meta_pb.has_aborted());
ASSERT_FALSE(meta_pb.has_commit_mvcc_op_timestamp());
ASSERT_FALSE(meta_pb.has_commit_timestamp());

ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_COMMIT), kDefaultTimeout));
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_COMMIT),
MonoTime::Now() + kDefaultTimeout));
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), kDefaultTimeout,
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA),
MonoTime::Now() + kDefaultTimeout,
/*begin_commit_timestamp*/nullptr, &meta_pb));
ASSERT_FALSE(meta_pb.has_aborted());
ASSERT_TRUE(meta_pb.has_commit_mvcc_op_timestamp());
ASSERT_FALSE(meta_pb.has_commit_timestamp());

ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT), kDefaultTimeout));
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT),
MonoTime::Now() + kDefaultTimeout));
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), kDefaultTimeout,
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA),
MonoTime::Now() + kDefaultTimeout,
/*begin_commit_timestamp*/nullptr, &meta_pb));
ASSERT_FALSE(meta_pb.has_aborted());
ASSERT_TRUE(meta_pb.has_commit_mvcc_op_timestamp());
Expand All @@ -881,11 +888,14 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientGetMetadata) {
// Get abort-related metadata.
constexpr const auto kAbortedTxnId = 1;
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kAbortedTxnId, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout));
tablet_id, MakeParticipantOp(kAbortedTxnId, ParticipantOpPB::BEGIN_TXN),
MonoTime::Now() + kDefaultTimeout));
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kAbortedTxnId, ParticipantOpPB::ABORT_TXN), kDefaultTimeout));
tablet_id, MakeParticipantOp(kAbortedTxnId, ParticipantOpPB::ABORT_TXN),
MonoTime::Now() + kDefaultTimeout));
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kAbortedTxnId, ParticipantOpPB::GET_METADATA), kDefaultTimeout,
tablet_id, MakeParticipantOp(kAbortedTxnId, ParticipantOpPB::GET_METADATA),
MonoTime::Now() + kDefaultTimeout,
/*begin_commit_timestamp*/nullptr, &meta_pb));
ASSERT_TRUE(meta_pb.has_aborted());
ASSERT_FALSE(meta_pb.has_commit_mvcc_op_timestamp());
Expand All @@ -909,13 +919,15 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientBeginTxnDoesntLock) {
cluster_->messenger()->sasl_proto_name(),
&txn_client));
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kFirstTxn, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout));
tablet_id, MakeParticipantOp(kFirstTxn, ParticipantOpPB::BEGIN_TXN),
MonoTime::Now() + kDefaultTimeout));
NO_FATALS(CheckReplicasMatchTxns(replicas, { { kFirstTxn, kOpen, -1 } }));

// Begin another transaction with a lower txn ID. This is allowed, since
// partition locks are only taken once we write.
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kSecondTxn, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout));
tablet_id, MakeParticipantOp(kSecondTxn, ParticipantOpPB::BEGIN_TXN),
MonoTime::Now() + kDefaultTimeout));
NO_FATALS(CheckReplicasMatchTxns(replicas,
{ { kFirstTxn, kOpen, -1 }, { kSecondTxn, kOpen, -1 } }));
}
Expand All @@ -935,30 +947,34 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientCommitSequence) {
cluster_->messenger()->sasl_proto_name(),
&txn_client));
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout));
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN),
MonoTime::Now() + kDefaultTimeout));
NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kOpen, -1 } }));

// Try some illegal ops and ensure we get an error.
Status s = txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT), kDefaultTimeout);
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT),
MonoTime::Now() + kDefaultTimeout);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kOpen, -1 } }));

ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout));
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN),
MonoTime::Now() + kDefaultTimeout));
NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kOpen, -1 } }));

// Progress the transaction forward, and perform similar checks that we get
// errors when we attempt illegal ops.
Timestamp begin_commit_ts;
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_COMMIT),
kDefaultTimeout, &begin_commit_ts));
MonoTime::Now() + kDefaultTimeout, &begin_commit_ts));
ASSERT_NE(Timestamp::kInvalidTimestamp, begin_commit_ts);
NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kCommitInProgress, -1 } }));

s = txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout);
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN),
MonoTime::Now() + kDefaultTimeout);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kCommitInProgress, -1 } }));

Expand All @@ -967,25 +983,25 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientCommitSequence) {
Timestamp refetched_begin_commit_ts;
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_COMMIT),
kDefaultTimeout, &refetched_begin_commit_ts));
MonoTime::Now() + kDefaultTimeout, &refetched_begin_commit_ts));
ASSERT_EQ(refetched_begin_commit_ts, begin_commit_ts);
NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kCommitInProgress, -1 } }));

// Once we finish committing, we should be unable to begin or abort.
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, kDummyCommitTimestamp),
kDefaultTimeout));
MonoTime::Now() + kDefaultTimeout));
NO_FATALS(CheckReplicasMatchTxns(replicas, {{kTxnId, kCommitted, kDummyCommitTimestamp}}));
for (const auto type : { ParticipantOpPB::BEGIN_TXN, ParticipantOpPB::ABORT_TXN }) {
Status s = txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, type), kDefaultTimeout);
tablet_id, MakeParticipantOp(kTxnId, type), MonoTime::Now() + kDefaultTimeout);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
}
NO_FATALS(CheckReplicasMatchTxns(replicas, {{kTxnId, kCommitted, kDummyCommitTimestamp}}));
for (const auto type : { ParticipantOpPB::BEGIN_COMMIT, ParticipantOpPB::FINALIZE_COMMIT }) {
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, type, kDummyCommitTimestamp),
kDefaultTimeout));
MonoTime::Now() + kDefaultTimeout));
}
NO_FATALS(CheckReplicasMatchTxns(replicas, {{kTxnId, kCommitted, kDummyCommitTimestamp}}));
}
Expand All @@ -1007,45 +1023,45 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientAbortSequence) {
&txn_client));
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnOne, ParticipantOpPB::BEGIN_TXN),
kDefaultTimeout));
MonoTime::Now() + kDefaultTimeout));
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnTwo, ParticipantOpPB::BEGIN_TXN),
kDefaultTimeout));
MonoTime::Now() + kDefaultTimeout));
NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnOne, kOpen, -1 }, { kTxnTwo, kOpen, -1 } }));

// Once we abort, we should be unable to do anything further.
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnOne, ParticipantOpPB::ABORT_TXN),
kDefaultTimeout));
MonoTime::Now() + kDefaultTimeout));
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnTwo, ParticipantOpPB::BEGIN_COMMIT),
kDefaultTimeout));
MonoTime::Now() + kDefaultTimeout));
NO_FATALS(CheckReplicasMatchTxns(replicas,
{ { kTxnOne, kAborted, -1 }, { kTxnTwo, kCommitInProgress, -1 } }));

ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnTwo, ParticipantOpPB::ABORT_TXN),
kDefaultTimeout));
MonoTime::Now() + kDefaultTimeout));
NO_FATALS(CheckReplicasMatchTxns(replicas,
{ { kTxnOne, kAborted, -1 }, { kTxnTwo, kAborted, -1 } }));
for (const auto type : { ParticipantOpPB::BEGIN_TXN, ParticipantOpPB::BEGIN_COMMIT,
ParticipantOpPB::FINALIZE_COMMIT }) {
Status s = txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnOne, type), kDefaultTimeout);
tablet_id, MakeParticipantOp(kTxnOne, type), MonoTime::Now() + kDefaultTimeout);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
s = txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnTwo, type), kDefaultTimeout);
tablet_id, MakeParticipantOp(kTxnTwo, type), MonoTime::Now() + kDefaultTimeout);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
}
NO_FATALS(CheckReplicasMatchTxns(replicas,
{ { kTxnOne, kAborted, -1 }, { kTxnTwo, kAborted, -1 } }));
// Repeated abort calls are idempotent.
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnOne, ParticipantOpPB::ABORT_TXN),
kDefaultTimeout));
MonoTime::Now() + kDefaultTimeout));
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnTwo, ParticipantOpPB::ABORT_TXN),
kDefaultTimeout));
MonoTime::Now() + kDefaultTimeout));
NO_FATALS(CheckReplicasMatchTxns(replicas,
{ { kTxnOne, kAborted, -1 }, { kTxnTwo, kAborted, -1 } }));
}
Expand All @@ -1065,13 +1081,14 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientErrorWhenNotBegun) {
for (auto type : { ParticipantOpPB::BEGIN_COMMIT,
ParticipantOpPB::FINALIZE_COMMIT }) {
Status s = txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(txn_id++, type), kDefaultTimeout);
tablet_id, MakeParticipantOp(txn_id++, type), MonoTime::Now() + kDefaultTimeout);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
NO_FATALS(CheckReplicasMatchTxns(replicas, {}));
}

ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(txn_id++, ParticipantOpPB::ABORT_TXN), kDefaultTimeout));
tablet_id, MakeParticipantOp(txn_id++, ParticipantOpPB::ABORT_TXN),
MonoTime::Now() + kDefaultTimeout));
NO_FATALS(CheckReplicasMatchTxns(replicas, { { 2, kAborted, -1 } }));
}

Expand All @@ -1092,18 +1109,18 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientRepeatCalls) {
for (const auto& type : kCommitSequence) {
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnOne, type, kDummyCommitTimestamp),
kDefaultTimeout));
MonoTime::Now() + kDefaultTimeout));
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnOne, type, kDummyCommitTimestamp),
kDefaultTimeout));
MonoTime::Now() + kDefaultTimeout));
}
for (const auto& type : kAbortSequence) {
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnTwo, type, kDummyCommitTimestamp),
kDefaultTimeout));
MonoTime::Now() + kDefaultTimeout));
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnTwo, type, kDummyCommitTimestamp),
kDefaultTimeout));
MonoTime::Now() + kDefaultTimeout));
}
NO_FATALS(CheckReplicasMatchTxns(
replicas, { { kTxnOne, kCommitted, kDummyCommitTimestamp }, { kTxnTwo, kAborted, -1 } }));
Expand All @@ -1127,7 +1144,7 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientTimeoutWhenNoMajority) {
&txn_client));
Status s = txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN),
MonoDelta::FromSeconds(1));
MonoTime::Now() + MonoDelta::FromSeconds(1));
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();

// We should have an initializing transaction until a majority is achieved,
Expand Down Expand Up @@ -1162,7 +1179,7 @@ Status SendParticipantOps(TxnSystemClient* txn_client, const string& tablet_id,
int txn_id = (*next_txn_id)++;
for (const auto& op : kCommitSequence) {
RETURN_NOT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(txn_id, op), kLongTimeout));
tablet_id, MakeParticipantOp(txn_id, op), MonoTime::Now() + kLongTimeout));
}
}
return Status::OK();
Expand Down Expand Up @@ -1457,17 +1474,17 @@ TEST_F(TxnParticipantElectionStormITest, TestTxnSystemClientRetriesThroughStorm)

// Start injecting latency to Raft-related traffic to spur elections.
FLAGS_raft_enable_pre_election = false;
FLAGS_consensus_inject_latency_ms_in_notifications = 1.5 * FLAGS_raft_heartbeat_interval_ms;;
FLAGS_consensus_inject_latency_ms_in_notifications = 1.5 * FLAGS_raft_heartbeat_interval_ms;
SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 2));
constexpr const int64_t kCommittedTxnId = 0;
constexpr const int64_t kAbortedTxnId = 1;
for (const auto& op : kCommitSequence) {
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kCommittedTxnId, op), kTimeout));
tablet_id, MakeParticipantOp(kCommittedTxnId, op), MonoTime::Now() + kTimeout));
}
for (const auto& op : kAbortSequence) {
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kAbortedTxnId, op), kTimeout));
tablet_id, MakeParticipantOp(kAbortedTxnId, op), MonoTime::Now() + kTimeout));
}
const vector<TxnParticipant::TxnEntry> expected_txns = {
{ kCommittedTxnId, kCommitted, kDummyCommitTimestamp },
Expand Down
12 changes: 6 additions & 6 deletions src/kudu/integration-tests/txn_status_table-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ TEST_F(TxnStatusTableITest, TestSystemClientMasterDown) {
int64_t highest_seen_txn_id = -1;
auto s = txn_sys_client_->BeginTransaction(
1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id,
MonoDelta::FromMilliseconds(100));
MonoTime::Now() + MonoDelta::FromMilliseconds(100));
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
// The 'highest_seen_txn_id' should be left untouched.
ASSERT_EQ(-1, highest_seen_txn_id);
Expand All @@ -503,7 +503,7 @@ TEST_F(TxnStatusTableITest, TestSystemClientMasterDown) {
int64_t highest_seen_txn_id = -1;
ASSERT_OK(txn_sys_client_->BeginTransaction(
1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id,
MonoDelta::FromSeconds(10)));
MonoTime::Now() + MonoDelta::FromSeconds(10)));
// Make sure the highest txn ID we've seen matches the one we just started.
ASSERT_EQ(1, highest_seen_txn_id);
}
Expand All @@ -520,7 +520,7 @@ TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) {
int64_t highest_seen_txn_id = -1;
auto s = txn_sys_client_->BeginTransaction(
1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id,
MonoDelta::FromMilliseconds(100));
MonoTime::Now() + MonoDelta::FromMilliseconds(100));
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
// The 'highest_seen_txn_id' should be left untouched.
ASSERT_EQ(-1, highest_seen_txn_id);
Expand All @@ -541,7 +541,7 @@ TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) {
int64_t highest_seen_txn_id = -1;
ASSERT_OK(txn_sys_client_->BeginTransaction(
1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id,
MonoDelta::FromSeconds(10)));
MonoTime::Now() + MonoDelta::FromSeconds(10)));
// Make sure the highest txn ID we've seen matches the one we just started.
ASSERT_EQ(1, highest_seen_txn_id);
}
Expand Down Expand Up @@ -823,7 +823,7 @@ TEST_F(TxnStatusTableITest, CheckOpenTxnStatusTable) {
//
// TODO(aserbin): change this to expected Status::OK() after implementing that
auto s = txn_sys_client_->RegisterParticipant(
kNewTxnId, "txn_participant", kUser, MonoDelta::FromSeconds(10));
kNewTxnId, "txn_participant", kUser, MonoTime::Now() + MonoDelta::FromSeconds(10));
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(),
"No tablet covering the requested range partition");
Expand All @@ -850,7 +850,7 @@ TEST_F(TxnStatusTableITest, CheckOpenTxnStatusTableConcurrent) {
++success_count_begin;
}
s = txn_sys_client_->RegisterParticipant(
0, kParticipant, kUser, MonoDelta::FromSeconds(10));
0, kParticipant, kUser, MonoTime::Now() + MonoDelta::FromSeconds(10));
if (s.ok()) {
++success_count_register;
}
Expand Down
6 changes: 3 additions & 3 deletions src/kudu/master/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,9 @@ Status Master::InitTxnManager() {
return Status::OK();
}

Status Master::WaitForTxnManagerInit(const MonoDelta& timeout) const {
if (timeout.Initialized()) {
const Status* s = txn_manager_init_status_.WaitFor(timeout);
Status Master::WaitForTxnManagerInit(MonoTime deadline) const {
if (deadline.Initialized()) {
const Status* s = txn_manager_init_status_.WaitFor(deadline - MonoTime::Now());
if (!s) {
return Status::TimedOut("timed out waiting for TxnManager to initialize");
}
Expand Down
3 changes: 2 additions & 1 deletion src/kudu/master/master.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ namespace kudu {
class HostPort;
class MaintenanceManager;
class MonoDelta;
class MonoTime;
class ThreadPool;

namespace rpc {
Expand Down Expand Up @@ -76,7 +77,7 @@ class Master : public kserver::KuduServer {

Status StartAsync();
Status WaitForCatalogManagerInit() const;
Status WaitForTxnManagerInit(const MonoDelta& timeout = {}) const;
Status WaitForTxnManagerInit(MonoTime deadline = {}) const;

// Wait until this Master's catalog manager instance is the leader and is ready.
// This method is intended for use by unit tests.
Expand Down
Loading

0 comments on commit f334e6e

Please sign in to comment.