diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc index f296b4cecd..7bb692b522 100644 --- a/src/kudu/client/meta_cache.cc +++ b/src/kudu/client/meta_cache.cc @@ -1121,9 +1121,12 @@ Status MetaCache::ProcessGetTableLocationsResponse(const KuduTable* table, entry->refresh_expiration_time(expiration_time); } else { // A remote tablet exists, but isn't indexed for key-based lookups. - // Index it now. + // This might happen if the entry was removed after tablet range + // was dropped, but then a scan token with stale information on tablet + // locations was provided to start a scan. Let's index it now. MetaCacheEntry entry(expiration_time, remote); - VLOG(3) << Substitute("Caching '$0' entry $1", table->name(), entry.DebugString(table)); + VLOG(3) << Substitute("Caching '$0' entry $1", + table->name(), entry.DebugString(table)); EmplaceOrDie(&tablets_by_key, tablet_lower_bound, std::move(entry)); } continue; diff --git a/src/kudu/client/scan_token-test.cc b/src/kudu/client/scan_token-test.cc index 54a0e02cdb..df49bf685f 100644 --- a/src/kudu/client/scan_token-test.cc +++ b/src/kudu/client/scan_token-test.cc @@ -43,8 +43,10 @@ #include "kudu/common/common.pb.h" #include "kudu/common/partial_row.h" #include "kudu/common/wire_protocol.pb.h" +#include "kudu/gutil/macros.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/stl_util.h" +#include "kudu/gutil/strings/substitute.h" #include "kudu/master/master.h" #include "kudu/master/mini_master.h" #include "kudu/mini-cluster/internal_mini_cluster.h" @@ -126,6 +128,28 @@ class ScanTokenTest : public KuduTest { return rows; } + // Similar to CountRows() above, but use the specified client handle + // and run all the scanners sequentially, one by one. + Status CountRowsSeq(KuduClient* client, + vector tokens, + int64_t* row_count) { + int64_t count = 0; + for (auto* t : tokens) { + unique_ptr token(t); + unique_ptr scanner; + RETURN_NOT_OK(IntoUniqueScanner(client, *token, &scanner)); + + RETURN_NOT_OK(scanner->Open()); + while (scanner->HasMoreRows()) { + KuduScanBatch batch; + RETURN_NOT_OK(scanner->NextBatch(&batch)); + count += batch.NumRows(); + } + } + *row_count = count; + return Status::OK(); + } + void VerifyTabletInfo(const vector& tokens) { unordered_set tablet_ids; for (auto t : tokens) { @@ -828,5 +852,215 @@ TEST_F(ScanTokenTest, TestMasterRequestsNoMetadata) { ASSERT_EQ(init_location_requests + 1, NumGetTableLocationsRequests()); } +enum FirstRangeChangeMode { + BEGIN = 0, + RANGE_DROPPED = 0, + RANGE_DROPPED_AND_PRECEDING_RANGE_ADDED = 1, + RANGE_DROPPED_AND_LARGER_ONE_ADDED = 2, + RANGE_DROPPED_AND_SMALLER_ONE_ADDED = 3, + RANGE_REPLACED_WITH_SAME = 4, + RANGE_REPLACED_WITH_SAME_AND_PRECEDING_RANGE_ADDED = 5, + RANGE_REPLACED_WITH_TWO_SMALLER_ONES = 6, + END = 7, +}; + +class StaleScanTokensParamTest : + public ScanTokenTest, + public ::testing::WithParamInterface { +}; + +// Create scan tokens for one state of the table and store it for future use. +// Use the tokens to scan the table. Alter the table dropping first range +// partition, optionally replacing it according with FirstRangeChangeMode +// enum. Open the altered table via the client handle which was used to run +// the token-based scan prior. Now, attempt to scan the table using stale +// tokens generated with the original state of the table. +TEST_P(StaleScanTokensParamTest, DroppingFirstRange) { + constexpr const char* const kTableName = "stale-scan-tokens-dfr"; + KuduSchema schema; + { + KuduSchemaBuilder builder; + builder.AddColumn("key")-> + NotNull()-> + Type(KuduColumnSchema::INT64)-> + PrimaryKey(); + ASSERT_OK(builder.Build(&schema)); + } + + shared_ptr table; + { + unique_ptr table_creator(client_->NewTableCreator()); + { + unique_ptr lower_bound(schema.NewRow()); + ASSERT_OK(lower_bound->SetInt64("key", -100)); + unique_ptr upper_bound(schema.NewRow()); + ASSERT_OK(upper_bound->SetInt64("key", 0)); + table_creator->add_range_partition( + lower_bound.release(), upper_bound.release()); + } + { + unique_ptr lower_bound(schema.NewRow()); + ASSERT_OK(lower_bound->SetInt64("key", 0)); + unique_ptr upper_bound(schema.NewRow()); + ASSERT_OK(upper_bound->SetInt64("key", 100)); + table_creator->add_range_partition( + lower_bound.release(), upper_bound.release()); + } + + ASSERT_OK(table_creator->table_name(kTableName) + .schema(&schema) + .set_range_partition_columns({ "key" }) + .num_replicas(1) + .Create()); + ASSERT_OK(client_->OpenTable(kTableName, &table)); + } + + // Populate the table with data. + { + shared_ptr session = client_->NewSession(); + session->SetTimeoutMillis(10000); + ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC)); + + for (int i = -50; i < 50; ++i) { + unique_ptr insert(table->NewInsert()); + ASSERT_OK(insert->mutable_row()->SetInt64("key", i)); + ASSERT_OK(session->Apply(insert.release())); + } + } + + // Prepare two sets of scan tokens. + vector tokens_a; + { + KuduScanTokenBuilder builder(table.get()); + ASSERT_OK(builder.IncludeTableMetadata(true)); + ASSERT_OK(builder.IncludeTabletMetadata(true)); + ASSERT_OK(builder.Build(&tokens_a)); + } + ASSERT_EQ(2, tokens_a.size()); + + vector tokens_b; + { + KuduScanTokenBuilder builder(table.get()); + ASSERT_OK(builder.IncludeTableMetadata(true)); + ASSERT_OK(builder.IncludeTabletMetadata(true)); + ASSERT_OK(builder.Build(&tokens_b)); + } + ASSERT_EQ(2, tokens_b.size()); + + // Drop the first range partition, running the operation via the 'client_' + // handle. + { + unique_ptr lower_bound(schema.NewRow()); + unique_ptr upper_bound(schema.NewRow()); + ASSERT_OK(lower_bound->SetInt64("key", -100)); + ASSERT_OK(upper_bound->SetInt64("key", 0)); + unique_ptr alterer(client_->NewTableAlterer(kTableName)); + ASSERT_OK(alterer->DropRangePartition( + lower_bound.release(), upper_bound.release())->Alter()); + } + + shared_ptr new_client; + ASSERT_OK(cluster_->CreateClient(nullptr, &new_client)); + + int64_t row_count_a = 0; + ASSERT_OK(CountRowsSeq(new_client.get(), std::move(tokens_a), &row_count_a)); + ASSERT_EQ(50, row_count_a); + + // Open the test table via 'new_client' handle to populate the metadata + // with actual table metadata, including non-covered ranges. This purges + // an entry for the [-100, 0) range from the 'tablet_by_key_' map, still + // keeping corresponding RemoteTable entry in the 'tablets_by_id_' map. + { + shared_ptr t; + ASSERT_OK(new_client->OpenTable(kTableName, &t)); + } + + const auto range_adder = [&schema]( + KuduClient* c, int64_t range_beg, int64_t range_end) { + unique_ptr lower_bound(schema.NewRow()); + unique_ptr upper_bound(schema.NewRow()); + RETURN_NOT_OK(lower_bound->SetInt64("key", range_beg)); + RETURN_NOT_OK(upper_bound->SetInt64("key", range_end)); + unique_ptr alterer(c->NewTableAlterer(kTableName)); + return alterer->AddRangePartition(lower_bound.release(), + upper_bound.release())->Alter(); + }; + + // The bifurcation point. + const auto mode = GetParam(); + switch (mode) { + case RANGE_DROPPED: + break; + case RANGE_DROPPED_AND_PRECEDING_RANGE_ADDED: + ASSERT_OK(range_adder(client_.get(), -200, -100)); + break; + case RANGE_DROPPED_AND_LARGER_ONE_ADDED: + ASSERT_OK(range_adder(client_.get(), -200, 0)); + break; + case RANGE_DROPPED_AND_SMALLER_ONE_ADDED: + ASSERT_OK(range_adder(client_.get(), -50, 0)); + break; + case RANGE_REPLACED_WITH_SAME: + ASSERT_OK(range_adder(client_.get(), -100, 0)); + break; + case RANGE_REPLACED_WITH_SAME_AND_PRECEDING_RANGE_ADDED: + ASSERT_OK(range_adder(client_.get(), -100, 0)); + ASSERT_OK(range_adder(client_.get(), -200, -100)); + break; + case RANGE_REPLACED_WITH_TWO_SMALLER_ONES: + ASSERT_OK(range_adder(client_.get(), -100, -50)); + ASSERT_OK(range_adder(client_.get(), -50, 0)); + break; + default: + FAIL() << strings::Substitute("$0: unsupported partition change mode", + static_cast(mode)); + } + + shared_ptr session = client_->NewSession(); + session->SetTimeoutMillis(10000); + ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC)); + + int64_t expected_row_count = 50; + switch (mode) { + case RANGE_DROPPED_AND_LARGER_ONE_ADDED: + expected_row_count += 100; + [[fallthrough]]; + case RANGE_REPLACED_WITH_SAME_AND_PRECEDING_RANGE_ADDED: + for (int i = -200; i < -100; ++i) { + unique_ptr insert(table->NewInsert()); + ASSERT_OK(insert->mutable_row()->SetInt64("key", i)); + ASSERT_OK(session->Apply(insert.release())); + } + // The rows in the preceeding range should not be read if using the + // token for the [-100, 0) original range. + [[fallthrough]]; + case RANGE_DROPPED_AND_SMALLER_ONE_ADDED: + case RANGE_REPLACED_WITH_SAME: + case RANGE_REPLACED_WITH_TWO_SMALLER_ONES: + for (int i = -25; i < 0; ++i) { + unique_ptr insert(table->NewInsert()); + ASSERT_OK(insert->mutable_row()->SetInt64("key", i)); + ASSERT_OK(session->Apply(insert.release())); + } + expected_row_count += 25; + break; + default: + break; + } + + // Start another tablet scan using the other identical set of scan tokens. + // The client metacache should not produce any errors: it should re-fetch + // the information about the current partitioning scheme and scan the table + // within the range of the new partitions which correspond to the originally + // supplied range. + int64_t row_count_b = -1; + ASSERT_OK(CountRowsSeq(new_client.get(), std::move(tokens_b), &row_count_b)); + ASSERT_EQ(expected_row_count, row_count_b); +} + +INSTANTIATE_TEST_CASE_P(FirstRangeDropped, StaleScanTokensParamTest, + testing::Range(FirstRangeChangeMode::BEGIN, + FirstRangeChangeMode::END)); + } // namespace client } // namespace kudu