Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[metrics] Merge metrics by table #26

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions src/kudu/server/default_path_handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,17 +343,30 @@ static void WriteMetricsAsJson(const MetricRegistry* const metrics,
MetricJsonOptions opts;
opts.include_raw_histograms = ParseBool(req.parsed_args, "include_raw_histograms");
opts.include_schema_info = ParseBool(req.parsed_args, "include_schema");
opts.entity_types = ParseArray(req.parsed_args, "types");
opts.entity_ids = ParseArray(req.parsed_args, "ids");
opts.entity_attrs = ParseArray(req.parsed_args, "attributes");
opts.entity_metrics = ParseArray(req.parsed_args, "metrics");

MetricFilters& filters = opts.filters;
filters.entity_types = ParseArray(req.parsed_args, "types");
filters.entity_ids = ParseArray(req.parsed_args, "ids");
filters.entity_attrs = ParseArray(req.parsed_args, "attributes");
filters.entity_metrics = ParseArray(req.parsed_args, "metrics");
vector<string> merge_rules = ParseArray(req.parsed_args, "merge_rules");
for (const auto& merge_rule : merge_rules) {
vector<string> values;
SplitStringUsing(merge_rule, "|", &values);
if (values.size() == 3) {
// Index 0: entity type needed to be merged.
// Index 1: 'merge_to' field of MergeAttributes.
// Index 2: 'attribute_to_merge_by' field of MergeAttributes.
EmplaceIfNotPresent(&opts.merge_rules, values[0], MergeAttributes(values[1], values[2]));
}
}

JsonWriter::Mode json_mode = ParseBool(req.parsed_args, "compact") ?
JsonWriter::COMPACT : JsonWriter::PRETTY;

// The number of entity_attrs should always be even because
// each pair represents a key and a value.
if (opts.entity_attrs.size() % 2 != 0) {
if (filters.entity_attrs.size() % 2 != 0) {
resp->status_code = HttpStatusCode::BadRequest;
WARN_NOT_OK(Status::InvalidArgument(""), "The parameter of 'attributes' is wrong");
} else {
Expand Down
28 changes: 28 additions & 0 deletions src/kudu/util/hdr_histogram-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,32 @@ TEST_F(HdrHistogramTest, PercentileAndCopyTest) {
ASSERT_EQ(hist.TotalSum(), copy.TotalSum());
}

void PopulateHistogram(HdrHistogram* histogram, uint64_t low, uint64_t high) {
for (uint64_t i = low; i <= high; i++) {
histogram->Increment(i);
}
}

TEST_F(HdrHistogramTest, MergeTest) {
uint64_t highest_val = 10000LU;

HdrHistogram hist(highest_val, kSigDigits);
HdrHistogram other(highest_val, kSigDigits);

PopulateHistogram(&hist, 1, 100);
PopulateHistogram(&other, 101, 250);
HdrHistogram old(hist);
hist.MergeFrom(other);

ASSERT_EQ(hist.TotalCount(), old.TotalCount() + other.TotalCount());
ASSERT_EQ(hist.TotalSum(), old.TotalSum() + other.TotalSum());
ASSERT_EQ(hist.MinValue(), 1);
ASSERT_EQ(hist.MaxValue(), 250);
ASSERT_NEAR(hist.MeanValue(), (1 + 250) / 2.0, 1e3);
ASSERT_EQ(hist.ValueAtPercentile(100.0), 250);
ASSERT_NEAR(hist.ValueAtPercentile(99.0), 250 * 99.0 / 100, 1e3);
ASSERT_NEAR(hist.ValueAtPercentile(95.0), 250 * 95.0 / 100, 1e3);
ASSERT_NEAR(hist.ValueAtPercentile(50.0), 250 * 50.0 / 100, 1e3);
}

} // namespace kudu
61 changes: 44 additions & 17 deletions src/kudu/util/hdr_histogram.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,26 @@ void HdrHistogram::Increment(int64_t value) {
IncrementBy(value, 1);
}

void HdrHistogram::UpdateMinMax(int64_t min, int64_t max) {
// Update min, if needed.
{
Atomic64 min_val;
while (PREDICT_FALSE(min < (min_val = MinValue()))) {
Atomic64 old_val = NoBarrier_CompareAndSwap(&min_value_, min_val, min);
if (PREDICT_TRUE(old_val == min_val)) break; // CAS success.
}
}

// Update max, if needed.
{
Atomic64 max_val;
while (PREDICT_FALSE(max > (max_val = MaxValue()))) {
Atomic64 old_val = NoBarrier_CompareAndSwap(&max_value_, max_val, max);
if (PREDICT_TRUE(old_val == max_val)) break; // CAS success.
}
}
}

void HdrHistogram::IncrementBy(int64_t value, int64_t count) {
DCHECK_GE(value, 0);
DCHECK_GE(count, 0);
Expand All @@ -170,23 +190,7 @@ void HdrHistogram::IncrementBy(int64_t value, int64_t count) {
NoBarrier_AtomicIncrement(&total_count_, count);
NoBarrier_AtomicIncrement(&total_sum_, value * count);

// Update min, if needed.
{
Atomic64 min_val;
while (PREDICT_FALSE(value < (min_val = MinValue()))) {
Atomic64 old_val = NoBarrier_CompareAndSwap(&min_value_, min_val, value);
if (PREDICT_TRUE(old_val == min_val)) break; // CAS success.
}
}

// Update max, if needed.
{
Atomic64 max_val;
while (PREDICT_FALSE(value > (max_val = MaxValue()))) {
Atomic64 old_val = NoBarrier_CompareAndSwap(&max_value_, max_val, value);
if (PREDICT_TRUE(old_val == max_val)) break; // CAS success.
}
}
UpdateMinMax(value, value);
}

void HdrHistogram::IncrementWithExpectedInterval(int64_t value,
Expand Down Expand Up @@ -343,6 +347,29 @@ void HdrHistogram::DumpHumanReadable(std::ostream* out) const {
}
}

void HdrHistogram::MergeFrom(const HdrHistogram& other) {
DCHECK_EQ(highest_trackable_value_, other.highest_trackable_value());
DCHECK_EQ(num_significant_digits_, other.num_significant_digits());
DCHECK_EQ(counts_array_length_, other.counts_array_length_);
DCHECK_EQ(bucket_count_, other.bucket_count_);
DCHECK_EQ(sub_bucket_count_, other.sub_bucket_count_);
DCHECK_EQ(sub_bucket_half_count_magnitude_, other.sub_bucket_half_count_magnitude_);
DCHECK_EQ(sub_bucket_half_count_, other.sub_bucket_half_count_);
DCHECK_EQ(sub_bucket_mask_, other.sub_bucket_mask_);

NoBarrier_AtomicIncrement(&total_count_, other.total_count_);
NoBarrier_AtomicIncrement(&total_sum_, other.total_sum_);

UpdateMinMax(other.min_value_, other.max_value_);

for (int i = 0; i < counts_array_length_; i++) {
Atomic64 count = NoBarrier_Load(&other.counts_[i]);
if (count > 0) {
NoBarrier_AtomicIncrement(&counts_[i], count);
}
}
}

///////////////////////////////////////////////////////////////////////
// AbstractHistogramIterator
///////////////////////////////////////////////////////////////////////
Expand Down
7 changes: 7 additions & 0 deletions src/kudu/util/hdr_histogram.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ class HdrHistogram {

// Dump a formatted, multiline string describing this histogram to 'out'.
void DumpHumanReadable(std::ostream* out) const;

// Merges 'other' into this HdrHistogram. Values in each 'counts_' array
// bucket will be added up, and the related 'min_value_', 'max_value_',
// 'total_count_' and 'total_sum_' will be updated if needed.
void MergeFrom(const HdrHistogram& other);

private:
friend class AbstractHistogramIterator;

Expand All @@ -179,6 +185,7 @@ class HdrHistogram {

void Init();
int CountsArrayIndex(int bucket_index, int sub_bucket_index) const;
void UpdateMinMax(int64_t min, int64_t max);

uint64_t highest_trackable_value_;
int num_significant_digits_;
Expand Down
Loading