diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc index 35b87f70f1..78dec70302 100644 --- a/src/kudu/tools/table_scanner.cc +++ b/src/kudu/tools/table_scanner.cc @@ -61,6 +61,7 @@ #include "kudu/util/slice.h" #include "kudu/util/stopwatch.h" #include "kudu/util/string_case.h" +#include "kudu/util/threadpool.h" using kudu::client::KuduClient; using kudu::client::KuduColumnSchema; @@ -128,6 +129,10 @@ DEFINE_bool(report_scanner_stats, false, "Whether to report scanner statistics"); DEFINE_bool(show_values, false, "Whether to show values of scanned rows."); +DEFINE_bool(enable_diff_scan, false, + "Whether to enable diff scan."); +DEFINE_uint64(diff_scan_interval_ms, 10000, + "The interval of diff scan in milliseconds."); DEFINE_string(write_type, "insert", "Write operation type to use when populating the destination " "table with the rows from the source table. Choose from " @@ -146,6 +151,8 @@ DECLARE_int64(timeout_ms); DECLARE_string(columns); DECLARE_string(tablets); +METRIC_DECLARE_entity(server); + namespace { bool IsFlagValueAcceptable(const char* flag_name, @@ -563,16 +570,24 @@ TableScanner::TableScanner( std::string table_name, optional> dst_client, optional dst_table_name) - : total_count_(0), + : metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_, "table_scanner")), + clock_(metric_entity_), + last_scan_ts_(0), + total_count_(0), client_(std::move(client)), table_name_(std::move(table_name)), dst_client_(std::move(dst_client)), dst_table_name_(std::move(dst_table_name)), scan_batch_size_(-1), + num_threads_(FLAGS_num_threads), out_(nullptr) { CHECK_OK(SetReplicaSelection(FLAGS_replica_selection)); } +TableScanner::~TableScanner() { + ElementDeleter deleter(&all_tokens_); +} + Status TableScanner::ScanData(const vector& tokens, const function& cb) { for (const auto* token : tokens) { @@ -703,24 +718,43 @@ void TableScanner::SetScanBatchSize(int32_t scan_batch_size) { scan_batch_size_ = scan_batch_size; } -Status 
TableScanner::StartWork(WorkType work_type) { - client::sp::shared_ptr src_table; - RETURN_NOT_OK(client_->OpenTable(table_name_, &src_table)); +Status TableScanner::InitScanners(WorkType work_type, ScanMode scan_mode) { + RETURN_NOT_OK(client_->OpenTable(table_name_, &src_table_)); // Create destination table if needed. - if (work_type == WorkType::kCopy) { - RETURN_NOT_OK(CreateDstTableIfNeeded(src_table, *dst_client_, *dst_table_name_)); + if (work_type == WorkType::kCopy && scan_mode == ScanMode::kBaseScan) { + RETURN_NOT_OK(CreateDstTableIfNeeded(src_table_, *dst_client_, *dst_table_name_)); if (FLAGS_write_type.empty()) { // Create table only. return Status::OK(); } } - KuduScanTokenBuilder builder(src_table.get()); + KuduScanTokenBuilder builder(src_table_.get()); RETURN_NOT_OK(builder.SetCacheBlocks(FLAGS_fill_cache)); + + // Set read mode. if (mode_) { RETURN_NOT_OK(builder.SetReadMode(*mode_)); } + if (FLAGS_enable_diff_scan) { + if (mode_) { + CHECK_EQ(KuduScanner::READ_AT_SNAPSHOT, *mode_); + } else { + RETURN_NOT_OK(builder.SetReadMode(KuduScanner::READ_AT_SNAPSHOT)); + } + + uint64_t now = clock_.Now().ToUint64(); + if (scan_mode == ScanMode::kBaseScan) { + RETURN_NOT_OK(builder.SetSnapshotRaw(now)); + } else { + CHECK(ScanMode::kDiffScan == scan_mode); + CHECK_GT(last_scan_ts_, 0); + RETURN_NOT_OK(builder.SetDiffScan(last_scan_ts_, now)); + } + last_scan_ts_ = now; + } + if (scan_batch_size_ >= 0) { // Batch size of 0 is valid and has special semantics: the server sends // zero rows (i.e. 
no data) in the very first scan batch sent back to the @@ -733,7 +767,7 @@ Status TableScanner::StartWork(WorkType work_type) { // TODO(yingchun): push down this judgement to ScanConfiguration::SetFaultTolerant if (mode_ && *mode_ != KuduScanner::READ_AT_SNAPSHOT) { return Status::InvalidArgument(Substitute("--fault_tolerant conflicts with " - "the non-READ_AT_SNAPSHOT read mode")); + "the non-READ_AT_SNAPSHOT read mode")); } RETURN_NOT_OK(builder.SetFaultTolerant()); } @@ -772,36 +806,38 @@ Status TableScanner::StartWork(WorkType work_type) { } // Set predicates. - RETURN_NOT_OK(AddPredicates(src_table, &builder)); - - vector tokens; - ElementDeleter deleter(&tokens); - RETURN_NOT_OK(builder.Build(&tokens)); + RETURN_NOT_OK(AddPredicates(src_table_, &builder)); - const int num_threads = FLAGS_num_threads; + STLDeleteElements(&all_tokens_); + thread_tokens_.clear(); + RETURN_NOT_OK(builder.Build(&all_tokens_)); // Set tablet filter. const set& tablet_id_filters = Split(FLAGS_tablets, ",", strings::SkipWhitespace()); - map> thread_tokens; int i = 0; - for (auto* token : tokens) { + for (auto* token : all_tokens_) { if (tablet_id_filters.empty() || ContainsKey(tablet_id_filters, token->tablet().id())) { - thread_tokens[i++ % num_threads].emplace_back(token); + thread_tokens_[i++ % num_threads_].emplace_back(token); } } + return Status::OK(); +} + +Status TableScanner::StartWork(WorkType work_type) { RETURN_NOT_OK(ThreadPoolBuilder("table_scan_pool") - .set_max_threads(num_threads) - .set_idle_timeout(MonoDelta::FromMilliseconds(1)) - .Build(&thread_pool_)); + .set_max_threads(thread_tokens_.empty() ? 1 : thread_tokens_.size()) + .set_idle_timeout(MonoDelta::FromMilliseconds(1)) + .Build(&thread_pool_)); // Initialize statuses for each thread. 
- vector thread_statuses(num_threads); + vector thread_statuses(thread_tokens_.size()); Stopwatch sw(Stopwatch::THIS_THREAD); sw.start(); - for (i = 0; i < num_threads; ++i) { - auto* t_tokens = &thread_tokens[i]; + for (int i = 0; i < thread_tokens_.size(); ++i) { + auto* t_tokens = &thread_tokens_[i]; + CHECK(!t_tokens->empty()); auto* t_status = &thread_statuses[i]; if (work_type == WorkType::kScan) { RETURN_NOT_OK(thread_pool_->Submit([this, t_tokens, t_status]() @@ -826,13 +862,15 @@ Status TableScanner::StartWork(WorkType work_type) { const auto& operation = work_type == WorkType::kScan ? "Scanning" : "Copying"; Status result_status; for (const auto& s : thread_statuses) { - if (!s.ok()) { - if (out_) { - *out_ << operation << " failed: " << s.ToString() << endl; - } - if (result_status.ok()) { - result_status = s; - } + if (s.ok()) { + continue; + } + + if (out_) { + *out_ << operation << " failed: " << s.ToString() << endl; + } + if (result_status.ok()) { + result_status = s; } } @@ -840,13 +878,25 @@ Status TableScanner::StartWork(WorkType work_type) { } Status TableScanner::StartScan() { - return StartWork(WorkType::kScan); + RETURN_NOT_OK(clock_.Init()); + RETURN_NOT_OK(InitScanners(WorkType::kScan, ScanMode::kBaseScan)); + do { + RETURN_NOT_OK(StartWork(WorkType::kScan)); + if (!FLAGS_enable_diff_scan) { + break; + } + RETURN_NOT_OK(InitScanners(WorkType::kScan, ScanMode::kDiffScan)); + SleepFor(MonoDelta::FromMilliseconds(FLAGS_diff_scan_interval_ms)); + } while (true); + + return Status::OK(); } Status TableScanner::StartCopy() { CHECK(dst_client_); CHECK(dst_table_name_); + RETURN_NOT_OK(InitScanners(WorkType::kCopy, ScanMode::kBaseScan)); return StartWork(WorkType::kCopy); } diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h index 13dd39d397..fc8112c07d 100644 --- a/src/kudu/tools/table_scanner.h +++ b/src/kudu/tools/table_scanner.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -30,11 
+31,13 @@ #include "kudu/client/scan_batch.h" #include "kudu/client/shared_ptr.h" // IWYU pragma: keep #include "kudu/client/write_op.h" +#include "kudu/clock/hybrid_clock.h" #include "kudu/util/mutex.h" #include "kudu/util/status.h" -#include "kudu/util/threadpool.h" namespace kudu { +class ThreadPool; + namespace tools { // This class is not thread-safe. @@ -46,6 +49,8 @@ class TableScanner { std::nullopt, std::optional dst_table_name = std::nullopt); + virtual ~TableScanner(); + // Set output stream of this tool, or disable output if not set. // 'out' must remain valid for the lifetime of this class. void SetOutput(std::ostream* out); @@ -86,6 +91,11 @@ class TableScanner { client::KuduClient::ReplicaSelection* selection); Status StartWork(WorkType work_type); + enum class ScanMode { + kBaseScan, + kDiffScan + }; + Status InitScanners(WorkType work_type, ScanMode scan_mode); Status ScanData(const std::vector& tokens, const std::function& cb); void ScanTask(const std::vector& tokens, @@ -93,6 +103,11 @@ class TableScanner { void CopyTask(const std::vector& tokens, Status* thread_status); + MetricRegistry metric_registry_; + scoped_refptr metric_entity_; + + clock::HybridClock clock_; + uint64_t last_scan_ts_; std::atomic total_count_; std::optional mode_; client::sp::shared_ptr client_; @@ -101,6 +116,11 @@ class TableScanner { std::optional> dst_client_; std::optional dst_table_name_; int32_t scan_batch_size_; + + client::sp::shared_ptr src_table_; + std::vector all_tokens_; + const int num_threads_; + std::map> thread_tokens_; std::unique_ptr thread_pool_; // Protects output to 'out_' so that rows don't get interleaved. diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc index 499c9aaaad..a14a51adbc 100644 --- a/src/kudu/tools/tool_action_perf.cc +++ b/src/kudu/tools/tool_action_perf.cc @@ -386,6 +386,8 @@ DEFINE_bool(txn_rollback, false, "the inserted rows. 
Setting --txn_rollback=true implies setting " "--txn_start=true as well."); +DECLARE_uint64(diff_scan_interval_ms); +DECLARE_bool(enable_diff_scan); DECLARE_bool(show_values); DECLARE_int32(num_threads); DECLARE_int32(scan_batch_size); @@ -1079,6 +1081,8 @@ unique_ptr BuildPerfMode() { "or whether there is a long latency tail when scanning different tables.") .AddRequiredParameter({ kTableNameArg, "Name of the table to scan"}) .AddOptionalParameter("columns") + .AddOptionalParameter("diff_scan_interval_ms") + .AddOptionalParameter("enable_diff_scan") .AddOptionalParameter("row_count_only") .AddOptionalParameter("report_scanner_stats") .AddOptionalParameter("scan_batch_size") diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc index 218209f6ae..98aaf272ca 100644 --- a/src/kudu/tools/tool_action_table.cc +++ b/src/kudu/tools/tool_action_table.cc @@ -214,6 +214,8 @@ DECLARE_bool(fault_tolerant); DECLARE_int32(create_table_replication_factor); DECLARE_bool(row_count_only); DECLARE_bool(show_scanner_stats); +DECLARE_uint64(diff_scan_interval_ms); +DECLARE_bool(enable_diff_scan); DEFINE_string(encoding_type, "AUTO_ENCODING", "Type of encoding for the column including AUTO_ENCODING, PLAIN_ENCODING, " @@ -1933,6 +1935,8 @@ unique_ptr BuildTableMode() { "for the --predicates flag on how predicates can be specified.") .AddRequiredParameter({ kTableNameArg, "Name of the table to scan"}) .AddOptionalParameter("columns") + .AddOptionalParameter("diff_scan_interval_ms") + .AddOptionalParameter("enable_diff_scan") .AddOptionalParameter("row_count_only") .AddOptionalParameter("report_scanner_stats") .AddOptionalParameter("scan_batch_size")