Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tool_diff_scan #104

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 81 additions & 31 deletions src/kudu/tools/table_scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
#include "kudu/util/slice.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/string_case.h"
#include "kudu/util/threadpool.h"

using kudu::client::KuduClient;
using kudu::client::KuduColumnSchema;
Expand Down Expand Up @@ -128,6 +129,10 @@ DEFINE_bool(report_scanner_stats, false,
"Whether to report scanner statistics");
DEFINE_bool(show_values, false,
"Whether to show values of scanned rows.");
DEFINE_bool(enable_diff_scan, false,
"Whether to enable diff scan.");
DEFINE_uint64(diff_scan_interval_ms, 10000,
"The interval of diff scan in milliseconds.");
DEFINE_string(write_type, "insert",
"Write operation type to use when populating the destination "
"table with the rows from the source table. Choose from "
Expand All @@ -146,6 +151,8 @@ DECLARE_int64(timeout_ms);
DECLARE_string(columns);
DECLARE_string(tablets);

METRIC_DECLARE_entity(server);

namespace {

bool IsFlagValueAcceptable(const char* flag_name,
Expand Down Expand Up @@ -563,16 +570,24 @@ TableScanner::TableScanner(
std::string table_name,
optional<client::sp::shared_ptr<client::KuduClient>> dst_client,
optional<std::string> dst_table_name)
: total_count_(0),
: metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_, "table_scanner")),
clock_(metric_entity_),
last_scan_ts_(0),
total_count_(0),
client_(std::move(client)),
table_name_(std::move(table_name)),
dst_client_(std::move(dst_client)),
dst_table_name_(std::move(dst_table_name)),
scan_batch_size_(-1),
num_threads_(FLAGS_num_threads),
out_(nullptr) {
CHECK_OK(SetReplicaSelection(FLAGS_replica_selection));
}

TableScanner::~TableScanner() {
ElementDeleter deleter(&all_tokens_);
}

Status TableScanner::ScanData(const vector<KuduScanToken*>& tokens,
const function<Status(const KuduScanBatch& batch)>& cb) {
for (const auto* token : tokens) {
Expand Down Expand Up @@ -703,24 +718,43 @@ void TableScanner::SetScanBatchSize(int32_t scan_batch_size) {
scan_batch_size_ = scan_batch_size;
}

Status TableScanner::StartWork(WorkType work_type) {
client::sp::shared_ptr<KuduTable> src_table;
RETURN_NOT_OK(client_->OpenTable(table_name_, &src_table));
Status TableScanner::InitScanners(WorkType work_type, ScanMode scan_mode) {
RETURN_NOT_OK(client_->OpenTable(table_name_, &src_table_));

// Create destination table if needed.
if (work_type == WorkType::kCopy) {
RETURN_NOT_OK(CreateDstTableIfNeeded(src_table, *dst_client_, *dst_table_name_));
if (work_type == WorkType::kCopy && scan_mode == ScanMode::kBaseScan) {
RETURN_NOT_OK(CreateDstTableIfNeeded(src_table_, *dst_client_, *dst_table_name_));
if (FLAGS_write_type.empty()) {
// Create table only.
return Status::OK();
}
}

KuduScanTokenBuilder builder(src_table.get());
KuduScanTokenBuilder builder(src_table_.get());
RETURN_NOT_OK(builder.SetCacheBlocks(FLAGS_fill_cache));

// Set read mode.
if (mode_) {
RETURN_NOT_OK(builder.SetReadMode(*mode_));
}
if (FLAGS_enable_diff_scan) {
if (mode_) {
CHECK_EQ(KuduScanner::READ_AT_SNAPSHOT, *mode_);
} else {
RETURN_NOT_OK(builder.SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
}

uint64_t now = clock_.Now().ToUint64();
if (scan_mode == ScanMode::kBaseScan) {
RETURN_NOT_OK(builder.SetSnapshotRaw(now));
} else {
CHECK(ScanMode::kDiffScan == scan_mode);
CHECK_GT(last_scan_ts_, 0);
RETURN_NOT_OK(builder.SetDiffScan(last_scan_ts_, now));
}
last_scan_ts_ = now;
}

if (scan_batch_size_ >= 0) {
// Batch size of 0 is valid and has special semantics: the server sends
// zero rows (i.e. no data) in the very first scan batch sent back to the
Expand All @@ -733,7 +767,7 @@ Status TableScanner::StartWork(WorkType work_type) {
// TODO(yingchun): push down this judgement to ScanConfiguration::SetFaultTolerant
if (mode_ && *mode_ != KuduScanner::READ_AT_SNAPSHOT) {
return Status::InvalidArgument(Substitute("--fault_tolerant conflicts with "
"the non-READ_AT_SNAPSHOT read mode"));
"the non-READ_AT_SNAPSHOT read mode"));
}
RETURN_NOT_OK(builder.SetFaultTolerant());
}
Expand Down Expand Up @@ -772,36 +806,38 @@ Status TableScanner::StartWork(WorkType work_type) {
}

// Set predicates.
RETURN_NOT_OK(AddPredicates(src_table, &builder));

vector<KuduScanToken*> tokens;
ElementDeleter deleter(&tokens);
RETURN_NOT_OK(builder.Build(&tokens));
RETURN_NOT_OK(AddPredicates(src_table_, &builder));

const int num_threads = FLAGS_num_threads;
all_tokens_.clear();
thread_tokens_.clear();
RETURN_NOT_OK(builder.Build(&all_tokens_));

// Set tablet filter.
const set<string>& tablet_id_filters = Split(FLAGS_tablets, ",", strings::SkipWhitespace());
map<int, vector<KuduScanToken*>> thread_tokens;
int i = 0;
for (auto* token : tokens) {
for (auto* token : all_tokens_) {
if (tablet_id_filters.empty() || ContainsKey(tablet_id_filters, token->tablet().id())) {
thread_tokens[i++ % num_threads].emplace_back(token);
thread_tokens_[i++ % num_threads_].emplace_back(token);
}
}

return Status::OK();
}

Status TableScanner::StartWork(WorkType work_type) {
RETURN_NOT_OK(ThreadPoolBuilder("table_scan_pool")
.set_max_threads(num_threads)
.set_idle_timeout(MonoDelta::FromMilliseconds(1))
.Build(&thread_pool_));
.set_max_threads(thread_tokens_.size())
.set_idle_timeout(MonoDelta::FromMilliseconds(1))
.Build(&thread_pool_));

// Initialize statuses for each thread.
vector<Status> thread_statuses(num_threads);
vector<Status> thread_statuses(thread_tokens_.size());

Stopwatch sw(Stopwatch::THIS_THREAD);
sw.start();
for (i = 0; i < num_threads; ++i) {
auto* t_tokens = &thread_tokens[i];
for (int i = 0; i < thread_tokens_.size(); ++i) {
auto* t_tokens = &thread_tokens_[i];
CHECK(!t_tokens->empty());
auto* t_status = &thread_statuses[i];
if (work_type == WorkType::kScan) {
RETURN_NOT_OK(thread_pool_->Submit([this, t_tokens, t_status]()
Expand All @@ -826,27 +862,41 @@ Status TableScanner::StartWork(WorkType work_type) {
const auto& operation = work_type == WorkType::kScan ? "Scanning" : "Copying";
Status result_status;
for (const auto& s : thread_statuses) {
if (!s.ok()) {
if (out_) {
*out_ << operation << " failed: " << s.ToString() << endl;
}
if (result_status.ok()) {
result_status = s;
}
if (s.ok()) {
continue;
}

if (out_) {
*out_ << operation << " failed: " << s.ToString() << endl;
}
if (result_status.ok()) {
result_status = s;
}
}

return result_status;
}

Status TableScanner::StartScan() {
return StartWork(WorkType::kScan);
RETURN_NOT_OK(clock_.Init());
RETURN_NOT_OK(InitScanners(WorkType::kScan, ScanMode::kBaseScan));
do {
RETURN_NOT_OK(StartWork(WorkType::kScan));
if (!FLAGS_enable_diff_scan) {
break;
}
RETURN_NOT_OK(InitScanners(WorkType::kScan, ScanMode::kDiffScan));
SleepFor(MonoDelta::FromMilliseconds(FLAGS_diff_scan_interval_ms));
} while (true);

return Status::OK();
}

Status TableScanner::StartCopy() {
CHECK(dst_client_);
CHECK(dst_table_name_);

RETURN_NOT_OK(InitScanners(WorkType::kCopy, ScanMode::kBaseScan));
return StartWork(WorkType::kCopy);
}

Expand Down
22 changes: 21 additions & 1 deletion src/kudu/tools/table_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstdint>
#include <functional>
#include <iosfwd>
#include <map>
#include <memory>
#include <optional>
#include <string>
Expand All @@ -30,11 +31,13 @@
#include "kudu/client/scan_batch.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/write_op.h"
#include "kudu/clock/hybrid_clock.h"
#include "kudu/util/mutex.h"
#include "kudu/util/status.h"
#include "kudu/util/threadpool.h"

namespace kudu {
class ThreadPool;

namespace tools {

// This class is not thread-safe.
Expand All @@ -46,6 +49,8 @@ class TableScanner {
std::nullopt,
std::optional<std::string> dst_table_name = std::nullopt);

virtual ~TableScanner();

// Set output stream of this tool, or disable output if not set.
// 'out' must remain valid for the lifetime of this class.
void SetOutput(std::ostream* out);
Expand Down Expand Up @@ -86,13 +91,23 @@ class TableScanner {
client::KuduClient::ReplicaSelection* selection);

Status StartWork(WorkType work_type);
enum class ScanMode {
kBaseScan,
kDiffScan
};
Status InitScanners(WorkType work_type, ScanMode scan_mode);
Status ScanData(const std::vector<client::KuduScanToken*>& tokens,
const std::function<Status(const client::KuduScanBatch& batch)>& cb);
void ScanTask(const std::vector<client::KuduScanToken*>& tokens,
Status* thread_status);
void CopyTask(const std::vector<client::KuduScanToken*>& tokens,
Status* thread_status);

MetricRegistry metric_registry_;
scoped_refptr<MetricEntity> metric_entity_;

clock::HybridClock clock_;
uint64_t last_scan_ts_;
std::atomic<uint64_t> total_count_;
std::optional<client::KuduScanner::ReadMode> mode_;
client::sp::shared_ptr<client::KuduClient> client_;
Expand All @@ -101,6 +116,11 @@ class TableScanner {
std::optional<client::sp::shared_ptr<client::KuduClient>> dst_client_;
std::optional<std::string> dst_table_name_;
int32_t scan_batch_size_;

client::sp::shared_ptr<client::KuduTable> src_table_;
std::vector<client::KuduScanToken*> all_tokens_;
const int num_threads_;
std::map<int, std::vector<client::KuduScanToken*>> thread_tokens_;
std::unique_ptr<ThreadPool> thread_pool_;

// Protects output to 'out_' so that rows don't get interleaved.
Expand Down
4 changes: 4 additions & 0 deletions src/kudu/tools/tool_action_perf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,8 @@ DEFINE_bool(txn_rollback, false,
"the inserted rows. Setting --txn_rollback=true implies setting "
"--txn_start=true as well.");

DECLARE_bool(diff_scan_interval_ms);
DECLARE_bool(enable_diff_scan);
DECLARE_bool(show_values);
DECLARE_int32(num_threads);
DECLARE_int32(scan_batch_size);
Expand Down Expand Up @@ -1079,6 +1081,8 @@ unique_ptr<Mode> BuildPerfMode() {
"or whether there is a long latency tail when scanning different tables.")
.AddRequiredParameter({ kTableNameArg, "Name of the table to scan"})
.AddOptionalParameter("columns")
.AddOptionalParameter("diff_scan_interval_ms")
.AddOptionalParameter("enable_diff_scan")
.AddOptionalParameter("row_count_only")
.AddOptionalParameter("report_scanner_stats")
.AddOptionalParameter("scan_batch_size")
Expand Down
4 changes: 4 additions & 0 deletions src/kudu/tools/tool_action_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ DECLARE_bool(fault_tolerant);
DECLARE_int32(create_table_replication_factor);
DECLARE_bool(row_count_only);
DECLARE_bool(show_scanner_stats);
DECLARE_bool(diff_scan_interval_ms);
DECLARE_bool(enable_diff_scan);

DEFINE_string(encoding_type, "AUTO_ENCODING",
"Type of encoding for the column including AUTO_ENCODING, PLAIN_ENCODING, "
Expand Down Expand Up @@ -1933,6 +1935,8 @@ unique_ptr<Mode> BuildTableMode() {
"for the --predicates flag on how predicates can be specified.")
.AddRequiredParameter({ kTableNameArg, "Name of the table to scan"})
.AddOptionalParameter("columns")
.AddOptionalParameter("diff_scan_interval_ms")
.AddOptionalParameter("enable_diff_scan")
.AddOptionalParameter("row_count_only")
.AddOptionalParameter("report_scanner_stats")
.AddOptionalParameter("scan_batch_size")
Expand Down