Skip to content

Commit

Permalink
KUDU-3254 fix bug in meta-cache exposed by KUDU-1802
Browse files Browse the repository at this point in the history
This patch fixes an issue resulting in a SIGABRT crash in Kudu client
when working with stale scan tokens which contain information about
tablet locations for a table (see KUDU-1802) whose range partition
was dropped.  The patch also adds a test scenario reproducing the crash;
now it passes and can catch future regressions.

This patch is a follow-up to d23ee5d.

Prior the change in src/kudu/client/meta_cache.cc was back-ported from
Kudu 1.14 as part of this fix, the scenario crashed with SIGABRT when
running with the stack trace similar to the following (this one below
was captured on macOS):

  * frame #0: 0x00007fff7035833a libsystem_kernel.dylib`__pthread_kill + 10
    frame #1: 0x00007fff70414e60 libsystem_pthread.dylib`pthread_kill + 430
    frame #2: 0x00007fff702df808 libsystem_c.dylib`abort + 120
    frame #3: 0x000000010ca1a259 libglog.0.dylib`google::logging_fail() at logging.cc:1474:3
    frame #4: 0x000000010ca19121 libglog.0.dylib`google::LogMessage::SendToLog() [inlined] google::LogMessage::Fail() at logging.cc:1488:3
    frame #5: 0x000000010ca1911b libglog.0.dylib`google::LogMessage::SendToLog() at logging.cc:1442
    frame #6: 0x000000010ca19815 libglog.0.dylib`google::LogMessage::Flush() at logging.cc:1311:5
    frame #7: 0x000000010ca1d76f libglog.0.dylib`google::LogMessageFatal::~LogMessageFatal() at logging.cc:2023:5
    frame #8: 0x000000010ca1a5f9 libglog.0.dylib`google::LogMessageFatal::~LogMessageFatal() at logging.cc:2022:37
    frame #9: 0x0000000103e365e3 libkudu_client.dylib`std::__1::map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, kudu::client::internal::MetaCacheEntry, std::__1::less<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, kudu::client::internal::MetaCacheEntry> > >::mapped_type& FindOrDie<std::__1::map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, kudu::client::internal::MetaCacheEntry, std::__1::less<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, kudu::client::internal::MetaCacheEntry> > > >() at map-util.h:109:3
    frame #10: 0x0000000103e34cbb libkudu_client.dylib`kudu::client::internal::MetaCache::ProcessGetTableLocationsResponse() at meta_cache.cc:943:23
    frame #11: 0x0000000103e86166 libkudu_client.dylib`kudu::client::KuduScanToken::Data::PBIntoScanner() at scan_token-internal.cc:192:35
    frame #12: 0x0000000103e88051 libkudu_client.dylib`kudu::client::KuduScanToken::Data::DeserializeIntoScanner() at scan_token-internal.cc:111:10
    frame #13: 0x0000000103d55d3c libkudu_client.dylib`kudu::client::KuduScanToken::DeserializeIntoScanner() at client.cc:1879:10

Change-Id: I5b8370290c13b1e496f461ed5bc2e0193bdf4b19
Reviewed-on: http://gerrit.cloudera.org:8080/17152
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Andrew Wong <[email protected]>
(cherry picked from commit 7c8dca6)
  Conflicts:
    src/kudu/client/meta_cache.cc
    src/kudu/client/scan_token-test.cc
Reviewed-on: http://gerrit.cloudera.org:8080/17158
Tested-by: Kudu Jenkins
Reviewed-by: Grant Henke <[email protected]>
  • Loading branch information
alexeyserbin committed Mar 8, 2021
1 parent 4897758 commit 074e7e9
Show file tree
Hide file tree
Showing 2 changed files with 239 additions and 2 deletions.
7 changes: 5 additions & 2 deletions src/kudu/client/meta_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1121,9 +1121,12 @@ Status MetaCache::ProcessGetTableLocationsResponse(const KuduTable* table,
entry->refresh_expiration_time(expiration_time);
} else {
// A remote tablet exists, but isn't indexed for key-based lookups.
// Index it now.
// This might happen if the entry was removed after tablet range
// was dropped, but then a scan token with stale information on tablet
// locations was provided to start a scan. Let's index it now.
MetaCacheEntry entry(expiration_time, remote);
VLOG(3) << Substitute("Caching '$0' entry $1", table->name(), entry.DebugString(table));
VLOG(3) << Substitute("Caching '$0' entry $1",
table->name(), entry.DebugString(table));
EmplaceOrDie(&tablets_by_key, tablet_lower_bound, std::move(entry));
}
continue;
Expand Down
234 changes: 234 additions & 0 deletions src/kudu/client/scan_token-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.h"
#include "kudu/master/mini_master.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
Expand Down Expand Up @@ -126,6 +128,28 @@ class ScanTokenTest : public KuduTest {
return rows;
}

// Similar to CountRows() above, but use the specified client handle
// and run all the scanners sequentially, one by one.
Status CountRowsSeq(KuduClient* client,
vector<KuduScanToken*> tokens,
int64_t* row_count) {
int64_t count = 0;
for (auto* t : tokens) {
unique_ptr<KuduScanToken> token(t);
unique_ptr<KuduScanner> scanner;
RETURN_NOT_OK(IntoUniqueScanner(client, *token, &scanner));

RETURN_NOT_OK(scanner->Open());
while (scanner->HasMoreRows()) {
KuduScanBatch batch;
RETURN_NOT_OK(scanner->NextBatch(&batch));
count += batch.NumRows();
}
}
*row_count = count;
return Status::OK();
}

void VerifyTabletInfo(const vector<KuduScanToken*>& tokens) {
unordered_set<string> tablet_ids;
for (auto t : tokens) {
Expand Down Expand Up @@ -828,5 +852,215 @@ TEST_F(ScanTokenTest, TestMasterRequestsNoMetadata) {
ASSERT_EQ(init_location_requests + 1, NumGetTableLocationsRequests());
}

enum FirstRangeChangeMode {
BEGIN = 0,
RANGE_DROPPED = 0,
RANGE_DROPPED_AND_PRECEDING_RANGE_ADDED = 1,
RANGE_DROPPED_AND_LARGER_ONE_ADDED = 2,
RANGE_DROPPED_AND_SMALLER_ONE_ADDED = 3,
RANGE_REPLACED_WITH_SAME = 4,
RANGE_REPLACED_WITH_SAME_AND_PRECEDING_RANGE_ADDED = 5,
RANGE_REPLACED_WITH_TWO_SMALLER_ONES = 6,
END = 7,
};

class StaleScanTokensParamTest :
public ScanTokenTest,
public ::testing::WithParamInterface<FirstRangeChangeMode> {
};

// Create scan tokens for one state of the table and store it for future use.
// Use the tokens to scan the table. Alter the table dropping first range
// partition, optionally replacing it according with FirstRangeChangeMode
// enum. Open the altered table via the client handle which was used to run
// the token-based scan prior. Now, attempt to scan the table using stale
// tokens generated with the original state of the table.
TEST_P(StaleScanTokensParamTest, DroppingFirstRange) {
constexpr const char* const kTableName = "stale-scan-tokens-dfr";
KuduSchema schema;
{
KuduSchemaBuilder builder;
builder.AddColumn("key")->
NotNull()->
Type(KuduColumnSchema::INT64)->
PrimaryKey();
ASSERT_OK(builder.Build(&schema));
}

shared_ptr<KuduTable> table;
{
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
{
unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
ASSERT_OK(lower_bound->SetInt64("key", -100));
unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
ASSERT_OK(upper_bound->SetInt64("key", 0));
table_creator->add_range_partition(
lower_bound.release(), upper_bound.release());
}
{
unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
ASSERT_OK(lower_bound->SetInt64("key", 0));
unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
ASSERT_OK(upper_bound->SetInt64("key", 100));
table_creator->add_range_partition(
lower_bound.release(), upper_bound.release());
}

ASSERT_OK(table_creator->table_name(kTableName)
.schema(&schema)
.set_range_partition_columns({ "key" })
.num_replicas(1)
.Create());
ASSERT_OK(client_->OpenTable(kTableName, &table));
}

// Populate the table with data.
{
shared_ptr<KuduSession> session = client_->NewSession();
session->SetTimeoutMillis(10000);
ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));

for (int i = -50; i < 50; ++i) {
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("key", i));
ASSERT_OK(session->Apply(insert.release()));
}
}

// Prepare two sets of scan tokens.
vector<KuduScanToken*> tokens_a;
{
KuduScanTokenBuilder builder(table.get());
ASSERT_OK(builder.IncludeTableMetadata(true));
ASSERT_OK(builder.IncludeTabletMetadata(true));
ASSERT_OK(builder.Build(&tokens_a));
}
ASSERT_EQ(2, tokens_a.size());

vector<KuduScanToken*> tokens_b;
{
KuduScanTokenBuilder builder(table.get());
ASSERT_OK(builder.IncludeTableMetadata(true));
ASSERT_OK(builder.IncludeTabletMetadata(true));
ASSERT_OK(builder.Build(&tokens_b));
}
ASSERT_EQ(2, tokens_b.size());

// Drop the first range partition, running the operation via the 'client_'
// handle.
{
unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
ASSERT_OK(lower_bound->SetInt64("key", -100));
ASSERT_OK(upper_bound->SetInt64("key", 0));
unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
ASSERT_OK(alterer->DropRangePartition(
lower_bound.release(), upper_bound.release())->Alter());
}

shared_ptr<KuduClient> new_client;
ASSERT_OK(cluster_->CreateClient(nullptr, &new_client));

int64_t row_count_a = 0;
ASSERT_OK(CountRowsSeq(new_client.get(), std::move(tokens_a), &row_count_a));
ASSERT_EQ(50, row_count_a);

// Open the test table via 'new_client' handle to populate the metadata
// with actual table metadata, including non-covered ranges. This purges
// an entry for the [-100, 0) range from the 'tablet_by_key_' map, still
// keeping corresponding RemoteTable entry in the 'tablets_by_id_' map.
{
shared_ptr<KuduTable> t;
ASSERT_OK(new_client->OpenTable(kTableName, &t));
}

const auto range_adder = [&schema](
KuduClient* c, int64_t range_beg, int64_t range_end) {
unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
RETURN_NOT_OK(lower_bound->SetInt64("key", range_beg));
RETURN_NOT_OK(upper_bound->SetInt64("key", range_end));
unique_ptr<KuduTableAlterer> alterer(c->NewTableAlterer(kTableName));
return alterer->AddRangePartition(lower_bound.release(),
upper_bound.release())->Alter();
};

// The bifurcation point.
const auto mode = GetParam();
switch (mode) {
case RANGE_DROPPED:
break;
case RANGE_DROPPED_AND_PRECEDING_RANGE_ADDED:
ASSERT_OK(range_adder(client_.get(), -200, -100));
break;
case RANGE_DROPPED_AND_LARGER_ONE_ADDED:
ASSERT_OK(range_adder(client_.get(), -200, 0));
break;
case RANGE_DROPPED_AND_SMALLER_ONE_ADDED:
ASSERT_OK(range_adder(client_.get(), -50, 0));
break;
case RANGE_REPLACED_WITH_SAME:
ASSERT_OK(range_adder(client_.get(), -100, 0));
break;
case RANGE_REPLACED_WITH_SAME_AND_PRECEDING_RANGE_ADDED:
ASSERT_OK(range_adder(client_.get(), -100, 0));
ASSERT_OK(range_adder(client_.get(), -200, -100));
break;
case RANGE_REPLACED_WITH_TWO_SMALLER_ONES:
ASSERT_OK(range_adder(client_.get(), -100, -50));
ASSERT_OK(range_adder(client_.get(), -50, 0));
break;
default:
FAIL() << strings::Substitute("$0: unsupported partition change mode",
static_cast<uint16_t>(mode));
}

shared_ptr<KuduSession> session = client_->NewSession();
session->SetTimeoutMillis(10000);
ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));

int64_t expected_row_count = 50;
switch (mode) {
case RANGE_DROPPED_AND_LARGER_ONE_ADDED:
expected_row_count += 100;
[[fallthrough]];
case RANGE_REPLACED_WITH_SAME_AND_PRECEDING_RANGE_ADDED:
for (int i = -200; i < -100; ++i) {
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("key", i));
ASSERT_OK(session->Apply(insert.release()));
}
// The rows in the preceeding range should not be read if using the
// token for the [-100, 0) original range.
[[fallthrough]];
case RANGE_DROPPED_AND_SMALLER_ONE_ADDED:
case RANGE_REPLACED_WITH_SAME:
case RANGE_REPLACED_WITH_TWO_SMALLER_ONES:
for (int i = -25; i < 0; ++i) {
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("key", i));
ASSERT_OK(session->Apply(insert.release()));
}
expected_row_count += 25;
break;
default:
break;
}

// Start another tablet scan using the other identical set of scan tokens.
// The client metacache should not produce any errors: it should re-fetch
// the information about the current partitioning scheme and scan the table
// within the range of the new partitions which correspond to the originally
// supplied range.
int64_t row_count_b = -1;
ASSERT_OK(CountRowsSeq(new_client.get(), std::move(tokens_b), &row_count_b));
ASSERT_EQ(expected_row_count, row_count_b);
}

INSTANTIATE_TEST_CASE_P(FirstRangeDropped, StaleScanTokensParamTest,
testing::Range(FirstRangeChangeMode::BEGIN,
FirstRangeChangeMode::END));

} // namespace client
} // namespace kudu

0 comments on commit 074e7e9

Please sign in to comment.