From fa1e67f19484a94f9e5f266cf5d622c90b6069b6 Mon Sep 17 00:00:00 2001 From: Will Berkeley Date: Tue, 7 May 2019 10:01:50 -0700 Subject: [PATCH 01/15] KUDU-2677 Raise default value for --tablet_history_max_age_sec This raises the default --tablet_history_max_age_sec to 1 week, to support incremental backup and the diff scans it relies on. Raising the default will cause clients to be able to scan further into the past, possibly into a region where the data has been GC'd. This would result in no rows returned, which might be surprising to some users. This condition will be temporary, and will end when the new default period has passed. We don't expect this to be a problem because we don't think many users are doing scans like this, and if they are they probably adjusted --tablet_history_max_age_sec and so won't be affected by the change in default. If they want to raise their own setting for the flag to accommodate incremental backups then they will be able to take steps to mitigate the above issue. Change-Id: I6398b57ec1abcd12c59a3588dd1a61900c0ccdeb Reviewed-on: http://gerrit.cloudera.org:8080/13265 Reviewed-by: Adar Dembo Tested-by: Kudu Jenkins --- src/kudu/tablet/tablet.cc | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc index 7627ed43d2..7cb1053d37 100644 --- a/src/kudu/tablet/tablet.cc +++ b/src/kudu/tablet/tablet.cc @@ -134,11 +134,13 @@ DEFINE_double(tablet_throttler_burst_factor, 1.0f, "base rate."); TAG_FLAG(tablet_throttler_burst_factor, experimental); -DEFINE_int32(tablet_history_max_age_sec, 15 * 60, - "Number of seconds to retain tablet history. Reads initiated at a " - "snapshot that is older than this age will be rejected. " - "To disable history removal, set to -1."); +DEFINE_int32(tablet_history_max_age_sec, 60 * 60 * 24 * 7, + "Number of seconds to retain tablet history, including history " + "required to perform diff scans and incremental backups. 
Reads " + "initiated at a snapshot that is older than this age will be " + "rejected. To disable history removal, set to -1."); TAG_FLAG(tablet_history_max_age_sec, advanced); +TAG_FLAG(tablet_history_max_age_sec, stable); DEFINE_int32(max_cell_size_bytes, 64 * 1024, "The maximum size of any individual cell in a table. Attempting to store " From 252b6965dd816587ddccdffa5fe5f83e28504bac Mon Sep 17 00:00:00 2001 From: Grant Henke Date: Tue, 7 May 2019 09:29:55 -0500 Subject: [PATCH 02/15] KUDU-2775: Deflake DefaultSourceTest repartition tests This deflakes testRepartition and testRepartitionAndSort by ensuring the Spark job is complete giving the Spark listener a chance to report all of the tasks. Change-Id: I2302170df3bf3ebac6cc06381d764419c2d48303 Reviewed-on: http://gerrit.cloudera.org:8080/13263 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo --- .../tools/DistributedDataGeneratorTest.scala | 32 ++++------- .../kudu/spark/kudu/DefaultSourceTest.scala | 24 +++----- .../kudu/spark/kudu/SparkListenerUtil.scala | 55 +++++++++++++++++++ 3 files changed, 74 insertions(+), 37 deletions(-) create mode 100644 java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkListenerUtil.scala diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala index 37e26248e4..902ababc6b 100644 --- a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala +++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala @@ -22,9 +22,8 @@ import org.apache.kudu.spark.kudu.KuduTestSuite import org.apache.kudu.test.RandomUtils import org.apache.kudu.util.DecimalUtil import org.apache.kudu.util.SchemaGenerator +import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter import org.apache.spark.rdd.RDD -import 
org.apache.spark.scheduler.SparkListener -import org.apache.spark.scheduler.SparkListenerTaskEnd import org.apache.spark.sql.Row import org.junit.Test import org.junit.Assert.assertEquals @@ -88,15 +87,6 @@ class DistributedDataGeneratorTest extends KuduTestSuite { @Test def testNumTasks() { - // Add a SparkListener to count the number of tasks that end. - var actualNumTasks = 0 - val listener = new SparkListener { - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { - actualNumTasks += 1 - } - } - ss.sparkContext.addSparkListener(listener) - val numTasks = 8 val numRows = 100 val args = Array( @@ -104,22 +94,16 @@ class DistributedDataGeneratorTest extends KuduTestSuite { s"--num-tasks=$numTasks", randomTableName, harness.getMasterAddressesAsString) - runGeneratorTest(args) + // count the number of tasks that end. + val actualNumTasks = withJobTaskCounter(ss.sparkContext) { _ => + runGeneratorTest(args) + } assertEquals(numTasks, actualNumTasks) } @Test def testNumTasksRepartition(): Unit = { - // Add a SparkListener to count the number of tasks that end. - var actualNumTasks = 0 - val listener = new SparkListener { - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { - actualNumTasks += 1 - } - } - ss.sparkContext.addSparkListener(listener) - val numTasks = 8 val numRows = 100 val args = Array( @@ -128,7 +112,11 @@ class DistributedDataGeneratorTest extends KuduTestSuite { "--repartition=true", randomTableName, harness.getMasterAddressesAsString) - runGeneratorTest(args) + + // count the number of tasks that end. 
+ val actualNumTasks = withJobTaskCounter(ss.sparkContext) { _ => + runGeneratorTest(args) + } val table = kuduContext.syncClient.openTable(randomTableName) val numPartitions = new KuduPartitioner.KuduPartitionerBuilder(table).build().numPartitions() diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala index 6afd32cb3d..101546f171 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala @@ -33,8 +33,7 @@ import org.apache.kudu.client.CreateTableOptions import org.apache.kudu.Schema import org.apache.kudu.Type import org.apache.kudu.test.RandomUtils -import org.apache.spark.scheduler.SparkListener -import org.apache.spark.scheduler.SparkListenerTaskEnd +import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter import org.apache.spark.sql.execution.datasources.LogicalRelation import org.junit.Before import org.junit.Test @@ -428,15 +427,6 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { options.addSplitRow(split) val table = kuduClient.createTable(tableName, simpleSchema, options) - // Add a SparkListener to count the number of tasks that end. - var actualNumTasks = 0 - val listener = new SparkListener { - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { - actualNumTasks += 1 - } - } - ss.sparkContext.addSparkListener(listener) - val random = Random.javaRandomToRandom(RandomUtils.getRandom) val data = random.shuffle( Seq( @@ -459,10 +449,14 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { // Capture the rows so we can validate the insert order. kuduContext.captureRows = true - kuduContext.insertRows( - dataDF, - tableName, - new KuduWriteOptions(repartition = true, repartitionSort = repartitionSort)) + // Count the number of tasks that end. 
+ val actualNumTasks = withJobTaskCounter(ss.sparkContext) { _ => + kuduContext.insertRows( + dataDF, + tableName, + new KuduWriteOptions(repartition = true, repartitionSort = repartitionSort)) + } + // 2 tasks from the parallelize call, and 2 from the repartitioning. assertEquals(4, actualNumTasks) val rows = kuduContext.rowsAccumulator.value.asScala diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkListenerUtil.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkListenerUtil.scala new file mode 100644 index 0000000000..b9bb8857bf --- /dev/null +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkListenerUtil.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kudu.spark.kudu + +import org.apache.kudu.test.junit.AssertHelpers +import org.apache.kudu.test.junit.AssertHelpers.BooleanExpression +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.SparkListener +import org.apache.spark.scheduler.SparkListenerJobEnd +import org.apache.spark.scheduler.SparkListenerTaskEnd + +object SparkListenerUtil { + + // TODO: Use org.apache.spark.TestUtils.withListener if it becomes public test API + def withJobTaskCounter(sc: SparkContext)(body: Any => Unit): Int = { + // Add a SparkListener to count the number of tasks that end. + var numTasks = 0 + var jobDone = false + val listener: SparkListener = new SparkListener { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + numTasks += 1 + } + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + jobDone = true + } + } + sc.addSparkListener(listener) + try { + body() + } finally { + // Because the SparkListener events are processed on an async queue which is behind + // private API, we use the jobEnd event to know that all of the taskEnd events + // must have been processed. + AssertHelpers.assertEventuallyTrue("Spark job did not complete", new BooleanExpression { + override def get(): Boolean = jobDone + }, 5000) + sc.removeSparkListener(listener) + } + numTasks + } +} From 58f189dac6aa691bd7b8e5ebc1e89756385147a8 Mon Sep 17 00:00:00 2001 From: Will Berkeley Date: Tue, 7 May 2019 10:55:51 -0700 Subject: [PATCH 03/15] KUDU-2711 pt 4. Java support for GetTableLocations optimizations This adds support for the optimized GetTableLocations response format added in 586e957f76a547340f2ab93a7eebc3f116ff825e. There's no new tests, but as this changes the way GetTableLocations works, it's tested by all existing tests that do any writes or scans, so it's well-tested by existing tests. 
Additionally, to test backwards compatibility, I ran the Java client test suite while using a set of binaries compiled without support for the GetTableLocations optimization. Change-Id: I5af146fd1984ce683f056877129506cd2068e0e8 Reviewed-on: http://gerrit.cloudera.org:8080/13287 Reviewed-by: Adar Dembo Tested-by: Will Berkeley --- .../apache/kudu/client/AsyncKuduClient.java | 48 ++++++++++++++++--- .../kudu/client/GetTableLocationsRequest.java | 1 + .../org/apache/kudu/client/LocatedTablet.java | 2 + .../org/apache/kudu/client/RemoteTablet.java | 14 +++--- .../kudu/client/TestAsyncKuduClient.java | 6 ++- .../apache/kudu/client/TestRemoteTablet.java | 14 +++--- 6 files changed, 63 insertions(+), 22 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index afc54d1189..668f2387d1 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -35,6 +35,7 @@ import java.security.cert.CertificateException; import java.util.ArrayList; import java.util.Arrays; +import java.util.function.Consumer; import java.util.HashSet; import java.util.List; import java.util.Random; @@ -75,6 +76,7 @@ import org.apache.kudu.master.Master; import org.apache.kudu.master.Master.GetTableLocationsResponsePB; import org.apache.kudu.master.Master.TableIdentifierPB; +import org.apache.kudu.master.Master.TSInfoPB; import org.apache.kudu.util.AsyncUtil; import org.apache.kudu.util.NetUtil; import org.apache.kudu.util.Pair; @@ -2112,6 +2114,7 @@ public Object call(final GetTableLocationsResponsePB response) { partitionKey, requestedBatchSize, response.getTabletLocationsList(), + response.getTsInfosList(), response.getTtlMillis()); } catch (KuduException e) { return e; @@ -2146,12 +2149,13 @@ private void releaseMasterLookupPermit() { } /** - * Makes discovered tablet 
locations visible in the clients caches. + * Makes discovered tablet locations visible in the client's caches. * @param table the table which the locations belong to * @param requestPartitionKey the partition key of the table locations request * @param requestedBatchSize the number of tablet locations requested from the master in the * original request * @param locations the discovered locations + * @param tsInfosList a list of ts info that the replicas in 'locations' references by index. * @param ttl the ttl of the locations */ @InterfaceAudience.LimitedPrivate("Test") @@ -2159,8 +2163,8 @@ void discoverTablets(KuduTable table, byte[] requestPartitionKey, int requestedBatchSize, List locations, + List tsInfosList, long ttl) throws KuduException { - // TODO(todd): handle "interned" response here String tableId = table.getTableId(); String tableName = table.getName(); @@ -2179,20 +2183,48 @@ void discoverTablets(KuduTable table, // Build the list of discovered remote tablet instances. If we have // already discovered the tablet, its locations are refreshed. + int numTsInfos = tsInfosList.size(); List tablets = new ArrayList<>(locations.size()); for (Master.TabletLocationsPB tabletPb : locations) { - List lookupExceptions = new ArrayList<>(tabletPb.getReplicasCount()); + List lookupExceptions = new ArrayList<>(tabletPb.getReplicasCount()); List servers = new ArrayList<>(tabletPb.getReplicasCount()); - for (Master.TabletLocationsPB.ReplicaPB replica : tabletPb.getReplicasList()) { + + // Lambda that does the common handling of a ts info. + Consumer updateServersAndCollectExceptions = tsInfo -> { try { - ServerInfo serverInfo = resolveTS(replica.getTsInfo()); + ServerInfo serverInfo = resolveTS(tsInfo); if (serverInfo != null) { servers.add(serverInfo); } } catch (UnknownHostException ex) { lookupExceptions.add(ex); } + }; + + // Handle "old-style" non-interned replicas. 
+ for (Master.TabletLocationsPB.ReplicaPB replica : tabletPb.getReplicasList()) { + updateServersAndCollectExceptions.accept(replica.getTsInfo()); + } + + // Handle interned replicas. As a shim, we also need to create a list of "old-style" ReplicaPBs + // to be stored inside the RemoteTablet. + // TODO(wdberkeley): Change this so ReplicaPBs aren't used by the client at all anymore. + List replicas = new ArrayList<>(); + for (Master.TabletLocationsPB.InternedReplicaPB replica : tabletPb.getInternedReplicasList()) { + int tsInfoIdx = replica.getTsInfoIdx(); + if (tsInfoIdx >= numTsInfos) { + lookupExceptions.add(new NonRecoverableException(Status.Corruption( + String.format("invalid response from master: referenced tablet idx %d but only %d present", + tsInfoIdx, numTsInfos)))); + continue; + } + TSInfoPB tsInfo = tsInfosList.get(tsInfoIdx); + updateServersAndCollectExceptions.accept(tsInfo); + Master.TabletLocationsPB.ReplicaPB.Builder builder = Master.TabletLocationsPB.ReplicaPB.newBuilder(); + builder.setRole(replica.getRole()); + builder.setTsInfo(tsInfo); + replicas.add(builder.build()); } if (!lookupExceptions.isEmpty() && @@ -2202,7 +2234,11 @@ void discoverTablets(KuduTable table, throw new NonRecoverableException(statusIOE); } - RemoteTablet rt = new RemoteTablet(tableId, tabletPb, servers); + RemoteTablet rt = new RemoteTablet(tableId, + tabletPb.getTabletId().toStringUtf8(), + ProtobufHelper.pbToPartition(tabletPb.getPartition()), + replicas.isEmpty() ? 
tabletPb.getReplicasList() : replicas, + servers); LOG.debug("Learned about tablet {} for table '{}' with partition {}", rt.getTabletId(), tableName, rt.getPartition()); diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java index a7fe825228..3be2770532 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java @@ -91,6 +91,7 @@ Message createRequestPB() { builder.setPartitionKeyEnd(UnsafeByteOperations.unsafeWrap(endKey)); } builder.setMaxReturnedLocations(maxReturnedLocations); + builder.setInternTsInfosInResponse(true); return builder.build(); } } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/LocatedTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/LocatedTablet.java index 655f800692..ae319034b3 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/LocatedTablet.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/LocatedTablet.java @@ -107,6 +107,8 @@ public String toString() { @InterfaceAudience.Public @InterfaceStability.Evolving public static class Replica { + // TODO(wdberkeley): The ReplicaPB is deprecated server-side, so we ought to redo how this + // class stores its information. 
private final ReplicaPB pb; Replica(ReplicaPB pb) { diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java index fd0eb3a758..2241f586cb 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java @@ -69,11 +69,13 @@ public class RemoteTablet implements Comparable { private String leaderUuid; RemoteTablet(String tableId, - Master.TabletLocationsPB tabletLocations, + String tabletId, + Partition partition, + List replicas, List serverInfos) { - this.tabletId = tabletLocations.getTabletId().toStringUtf8(); + this.tabletId = tabletId; this.tableId = tableId; - this.partition = ProtobufHelper.pbToPartition(tabletLocations.getPartition()); + this.partition = partition; this.tabletServers = new HashMap<>(serverInfos.size()); for (ServerInfo serverInfo : serverInfos) { @@ -81,18 +83,18 @@ public class RemoteTablet implements Comparable { } ImmutableList.Builder replicasBuilder = new ImmutableList.Builder<>(); - for (Master.TabletLocationsPB.ReplicaPB replica : tabletLocations.getReplicasList()) { + for (Master.TabletLocationsPB.ReplicaPB replica : replicas) { String uuid = replica.getTsInfo().getPermanentUuid().toStringUtf8(); replicasBuilder.add(new LocatedTablet.Replica(replica)); if (replica.getRole().equals(Metadata.RaftPeerPB.Role.LEADER)) { - leaderUuid = uuid; + this.leaderUuid = uuid; } } if (leaderUuid == null) { LOG.warn("No leader provided for tablet {}", getTabletId()); } - replicas.set(replicasBuilder.build()); + this.replicas.set(replicasBuilder.build()); } @Override diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java index d79d114fb7..bef643d5b7 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java 
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java @@ -173,7 +173,8 @@ public void testBadHostnames() throws Exception { try { KuduTable badTable = new KuduTable(asyncClient, "Invalid table name", "Invalid table ID", null, null, 3); - asyncClient.discoverTablets(badTable, null, requestBatchSize, tabletLocations, 1000); + asyncClient.discoverTablets(badTable, null, requestBatchSize, + tabletLocations, new ArrayList<>(), 1000); fail("This should have failed quickly"); } catch (NonRecoverableException ex) { assertTrue(ex.getMessage().contains(badHostname)); @@ -205,7 +206,8 @@ public void testNoLeader() throws Exception { "master", leader.getRpcHost(), leader.getRpcPort(), Metadata.RaftPeerPB.Role.FOLLOWER)); tabletLocations.add(tabletPb.build()); try { - asyncClient.discoverTablets(table, new byte[0], requestBatchSize, tabletLocations, 1000); + asyncClient.discoverTablets(table, new byte[0], requestBatchSize, + tabletLocations, new ArrayList<>(), 1000); fail("discoverTablets should throw an exception if there's no leader"); } catch (NoLeaderFoundException ex) { // Expected. 
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java index 43ca37ac38..c4372c1b47 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java @@ -223,10 +223,8 @@ private RemoteTablet getTablet(int leaderIndex) { static RemoteTablet getTablet(int leaderIndex, int localReplicaIndex, int sameLocationReplicaIndex) { - Master.TabletLocationsPB.Builder tabletPb = Master.TabletLocationsPB.newBuilder(); - - tabletPb.setPartition(ProtobufUtils.getFakePartitionPB()); - tabletPb.setTabletId(ByteString.copyFromUtf8("fake tablet")); + Partition partition = ProtobufHelper.pbToPartition(ProtobufUtils.getFakePartitionPB().build()); + List replicas = new ArrayList<>(); List servers = new ArrayList<>(); for (int i = 0; i < 3; i++) { InetAddress addr; @@ -246,11 +244,11 @@ static RemoteTablet getTablet(int leaderIndex, new HostAndPort("host", 1000 + i), addr, location)); - tabletPb.addReplicas(ProtobufUtils.getFakeTabletReplicaPB( - uuid, "host", i, - leaderIndex == i ? Metadata.RaftPeerPB.Role.LEADER : Metadata.RaftPeerPB.Role.FOLLOWER)); + Metadata.RaftPeerPB.Role role = leaderIndex == i ? Metadata.RaftPeerPB.Role.LEADER : + Metadata.RaftPeerPB.Role.FOLLOWER; + replicas.add(ProtobufUtils.getFakeTabletReplicaPB(uuid, "host", i, role).build()); } - return new RemoteTablet("fake table", tabletPb.build(), servers); + return new RemoteTablet("fake table", "fake tablet", partition, replicas, servers); } } From 2acfd25b849a40e30654178b312d5d0bf720ffca Mon Sep 17 00:00:00 2001 From: Grant Henke Date: Thu, 9 May 2019 14:15:19 -0500 Subject: [PATCH 04/15] [build] Fix Gradle Checkstyle tasks Checkstyle was broken in a recent upgrade due to backwards incompatible changes to Checkstyle rules. 
This patch adjusts the Checkstyle task to be a bit more Gradle idiomatic and also updates the checkstyle rules to be more current. Change-Id: I8f4d3aac746240949a32c798e27cd708a77966a4 Reviewed-on: http://gerrit.cloudera.org:8080/13297 Reviewed-by: Adar Dembo Tested-by: Kudu Jenkins --- .../checkstyle/checkstyle.xml} | 91 ++++++++++++++----- .../checkstyle/suppressions.xml} | 0 java/gradle/quality.gradle | 6 +- 3 files changed, 68 insertions(+), 29 deletions(-) rename java/{kudu_style.xml => config/checkstyle/checkstyle.xml} (72%) rename java/{checkstyle_suppressions.xml => config/checkstyle/suppressions.xml} (100%) diff --git a/java/kudu_style.xml b/java/config/checkstyle/checkstyle.xml similarity index 72% rename from java/kudu_style.xml rename to java/config/checkstyle/checkstyle.xml index d0b8d6c2e6..562955d052 100644 --- a/java/kudu_style.xml +++ b/java/config/checkstyle/checkstyle.xml @@ -19,10 +19,10 @@ // under the License. --> + "-//Checkstyle//DTD Checkstyle Configuration 1.3//EN" + "https://checkstyle.org/dtds/configuration_1_3.dtd"> - + + + + + - + - + - - - + + @@ -76,19 +82,27 @@ - + - - + + + + - + - + + @@ -108,13 +122,32 @@ + + + + + + + + + + + + + + + + + + + - + - + + + + + - + - + @@ -183,28 +221,32 @@ - - + - + - + + + @@ -213,7 +255,8 @@ - + diff --git a/java/checkstyle_suppressions.xml b/java/config/checkstyle/suppressions.xml similarity index 100% rename from java/checkstyle_suppressions.xml rename to java/config/checkstyle/suppressions.xml diff --git a/java/gradle/quality.gradle b/java/gradle/quality.gradle index c92d0b230a..8bdc21aa3f 100644 --- a/java/gradle/quality.gradle +++ b/java/gradle/quality.gradle @@ -25,12 +25,8 @@ apply plugin: "ru.vyarus.animalsniffer" // Ensures Java code uses APIs from a pa apply plugin: "scalafmt" // Automatically formats Scala code on each build. apply plugin: "net.ltgt.errorprone" // Performs static code analysis to look for bugs in Java code. 
- checkstyle { - configFile = file("$rootDir/kudu_style.xml") - configProperties = [ - "checkstyle.suppressions.file" : "$rootDir/checkstyle_suppressions.xml" - ] + configDir = file("$rootProject.projectDir/config/checkstyle") ignoreFailures = true showViolations = true } From 762f0fcc30803c20158cc1a75f6eac2fa530a44f Mon Sep 17 00:00:00 2001 From: Alexey Serbin Date: Mon, 6 May 2019 19:10:39 -0700 Subject: [PATCH 05/15] [util] introduce Synchronizer::WaitUntil() Added Synchronizer::WaitUntil() method to avoid converting MonoTime into MonoDelta when the deadline is specified as MonoTime. Updated corresponding tests for Synchronizer as well. Change-Id: I00586ac1ba49494ff08abae0d452ab9286a3e56f Reviewed-on: http://gerrit.cloudera.org:8080/13255 Reviewed-by: Andrew Wong Tested-by: Alexey Serbin --- .../master/hms_notification_log_listener.cc | 2 +- src/kudu/util/async_util-test.cc | 90 ++++++++++++++----- src/kudu/util/async_util.h | 9 +- 3 files changed, 75 insertions(+), 26 deletions(-) diff --git a/src/kudu/master/hms_notification_log_listener.cc b/src/kudu/master/hms_notification_log_listener.cc index b34d2338b1..2237a4b47a 100644 --- a/src/kudu/master/hms_notification_log_listener.cc +++ b/src/kudu/master/hms_notification_log_listener.cc @@ -118,7 +118,7 @@ Status HmsNotificationLogListenerTask::WaitForCatchUp(const MonoTime& deadline) wake_up_cv_.Signal(); } - RETURN_NOT_OK_PREPEND(synchronizer.WaitFor(deadline - MonoTime::Now()), + RETURN_NOT_OK_PREPEND(synchronizer.WaitUntil(deadline), "failed to wait for Hive Metastore notification log listener to catch up"); return Status::OK(); } diff --git a/src/kudu/util/async_util-test.cc b/src/kudu/util/async_util-test.cc index 5cb7a6372b..91f2baa11d 100644 --- a/src/kudu/util/async_util-test.cc +++ b/src/kudu/util/async_util-test.cc @@ -28,6 +28,7 @@ #include "kudu/gutil/basictypes.h" #include "kudu/gutil/callback.h" #include "kudu/util/monotime.h" +#include "kudu/util/scoped_cleanup.h" #include "kudu/util/status.h" 
#include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" @@ -43,7 +44,7 @@ class AsyncUtilTest : public KuduTest { // Set up an alarm to fail the test in case of deadlock. alarm(30); } - ~AsyncUtilTest() { + virtual ~AsyncUtilTest() { // Disable the alarm on test exit. alarm(0); } @@ -99,31 +100,72 @@ TEST_F(AsyncUtilTest, TestSynchronizerMultiWait) { } } -TEST_F(AsyncUtilTest, TestSynchronizerTimedWait) { - thread waiter; - { - Synchronizer sync; - auto cb = sync.AsStatusCallback(); - waiter = thread([cb] { - SleepFor(MonoDelta::FromMilliseconds(5)); - cb.Run(Status::OK()); - }); - ASSERT_OK(sync.WaitFor(MonoDelta::FromMilliseconds(1000))); - } - waiter.join(); +// Flavors of wait that Synchronizer is capable of: WaitFor() or WaitUntil(). +enum class TimedWaitFlavor { + WaitFor, + WaitUntil, +}; - { - Synchronizer sync; - auto cb = sync.AsStatusCallback(); - waiter = thread([cb] { - SleepFor(MonoDelta::FromMilliseconds(1000)); - cb.Run(Status::OK()); - }); - ASSERT_TRUE(sync.WaitFor(MonoDelta::FromMilliseconds(5)).IsTimedOut()); +class AsyncUtilTimedWaitTest: + public AsyncUtilTest, + public ::testing::WithParamInterface { +}; + +TEST_P(AsyncUtilTimedWaitTest, SynchronizerTimedWaitSuccess) { + const auto kWaitInterval = MonoDelta::FromMilliseconds(1000); + + Synchronizer sync; + auto cb = sync.AsStatusCallback(); + auto waiter = thread([cb] { + SleepFor(MonoDelta::FromMilliseconds(5)); + cb.Run(Status::OK()); + }); + SCOPED_CLEANUP({ + waiter.join(); + }); + const auto mode = GetParam(); + switch (mode) { + case TimedWaitFlavor::WaitFor: + ASSERT_OK(sync.WaitFor(kWaitInterval)); + break; + case TimedWaitFlavor::WaitUntil: + ASSERT_OK(sync.WaitUntil(MonoTime::Now() + kWaitInterval)); + break; + default: + FAIL() << "unsupported wait mode " << static_cast(mode); + break; } +} + +TEST_P(AsyncUtilTimedWaitTest, SynchronizerTimedWaitTimeout) { + const auto kWaitInterval = MonoDelta::FromMilliseconds(5); - // Waiting on the thread gives TSAN to check that no 
thread safety issues - // occurred. - waiter.join(); + Synchronizer sync; + auto cb = sync.AsStatusCallback(); + auto waiter = thread([cb] { + SleepFor(MonoDelta::FromMilliseconds(1000)); + cb.Run(Status::OK()); + }); + SCOPED_CLEANUP({ + waiter.join(); + }); + const auto mode = GetParam(); + switch (mode) { + case TimedWaitFlavor::WaitFor: + ASSERT_TRUE(sync.WaitFor(kWaitInterval).IsTimedOut()); + break; + case TimedWaitFlavor::WaitUntil: + ASSERT_TRUE(sync.WaitUntil(MonoTime::Now() + kWaitInterval).IsTimedOut()); + break; + default: + FAIL() << "unsupported wait mode " << static_cast(mode); + break; + } } + +INSTANTIATE_TEST_CASE_P(WaitFlavors, + AsyncUtilTimedWaitTest, + ::testing::Values(TimedWaitFlavor::WaitFor, + TimedWaitFlavor::WaitUntil)); + } // namespace kudu diff --git a/src/kudu/util/async_util.h b/src/kudu/util/async_util.h index 338c6c2973..61621d6b4f 100644 --- a/src/kudu/util/async_util.h +++ b/src/kudu/util/async_util.h @@ -44,7 +44,7 @@ namespace kudu { class Synchronizer { public: Synchronizer() - : data_(std::make_shared()) { + : data_(std::make_shared()) { } void StatusCB(const Status& status) { @@ -71,6 +71,13 @@ class Synchronizer { return data_->status; } + Status WaitUntil(const MonoTime& deadline) const { + if (PREDICT_FALSE(!data_->latch.WaitUntil(deadline))) { + return Status::TimedOut("timed out while waiting for the callback to be called"); + } + return data_->status; + } + void Reset() { data_->latch.Reset(1); } From f6f8bbf35aa33e668f9cc5ce9b1e80d202a7f736 Mon Sep 17 00:00:00 2001 From: Will Berkeley Date: Tue, 7 May 2019 09:31:49 -0700 Subject: [PATCH 06/15] KUDU-2807 Crash when flush or compaction overlaps with another compaction Commit d3684a7b2add8f06b7189adb9ce9222b8ae1eff5 introduced a metric for average rowset height. Computing this requires examining the rowsets in the rowset tree and briefly taking each one's `compact_flush_lock_`. 
However, any time a thread takes the `compact_flush_lock_` of a rowset, it must hold the `compact_select_lock_` of the tablet that rowset belongs to. This was not happening in two of the three places where the average height is computed: 1. When opening the tablet. 2. When updating the rowset tree during a flush or compaction. The first case is benign (as far as I know). The second case could cause a crash like F0429 07:26:56.918041 34043 tablet.cc:2268] Check failed: lock.owns_lock() RowSet(24130) unable to lock compact_flush_lock MM ops enforced the invariant above by try-locking the `compact_flush_lock_` and checking that they obtained the lock, while holding the `compact_select_lock_`. So, if an MM op try-locked a rowset at the same time as another MM op was holding its `compact_flush_lock_`, the above crash would result. This patch fixes the crash by ensuring that the `compact_select_lock_` is held whenever `ComputeCdfAndCollectOrdered`, which computes the average rowset height, is called. I also made a small modification to the scope of a `component_lock_` to avoid having to define a lock order for `component_lock_` and `compact_select_lock_`. Change-Id: Ic255f0466aa2c158fa32e8e38428eddfcf901b99 Reviewed-on: http://gerrit.cloudera.org:8080/13264 Reviewed-by: Adar Dembo Tested-by: Will Berkeley --- src/kudu/tablet/rowset_info.h | 2 ++ src/kudu/tablet/tablet.cc | 57 ++++++++++++++++++----------------- src/kudu/tablet/tablet.h | 5 ++- 3 files changed, 36 insertions(+), 28 deletions(-) diff --git a/src/kudu/tablet/rowset_info.h b/src/kudu/tablet/rowset_info.h index af2d07db3e..f133e3e1ee 100644 --- a/src/kudu/tablet/rowset_info.h +++ b/src/kudu/tablet/rowset_info.h @@ -51,6 +51,8 @@ class RowSetInfo { // rowset tree is set into 'average_height', if it is not nullptr. // If one of 'info_by_min_key' and 'info_by_max_key' is nullptr, the other // must be. + // Requires holding the compact_select_lock_ for the tablet that the + // rowsets in 'tree' references. 
static void ComputeCdfAndCollectOrdered(const RowSetTree& tree, double* average_height, std::vector* info_by_min_key, diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc index 7cb1053d37..daee9a5b18 100644 --- a/src/kudu/tablet/tablet.cc +++ b/src/kudu/tablet/tablet.cc @@ -266,7 +266,6 @@ Status Tablet::Open() { TRACE_EVENT0("tablet", "Tablet::Open"); RETURN_IF_STOPPED_OR_CHECK_STATE(kInitialized); - std::lock_guard lock(component_lock_); CHECK(schema()->has_column_ids()); next_mrs_id_ = metadata_->last_durable_mrs_id() + 1; @@ -293,15 +292,6 @@ Status Tablet::Open() { shared_ptr new_rowset_tree(new RowSetTree()); CHECK_OK(new_rowset_tree->Reset(rowsets_opened)); - if (metrics_) { - // Compute the initial average height of the rowset tree. - double avg_height; - RowSetInfo::ComputeCdfAndCollectOrdered(*new_rowset_tree, - &avg_height, - nullptr, - nullptr); - metrics_->average_diskrowset_height->set_value(avg_height); - } // Now that the current state is loaded, create the new MemRowSet with the next id. shared_ptr new_mrs; @@ -309,7 +299,13 @@ Status Tablet::Open() { log_anchor_registry_.get(), mem_trackers_.tablet_tracker, &new_mrs)); - components_ = new TabletComponents(new_mrs, new_rowset_tree); + { + std::lock_guard lock(component_lock_); + components_ = new TabletComponents(new_mrs, new_rowset_tree); + } + + // Compute the initial average rowset height. 
+ UpdateAverageRowsetHeight(); { std::lock_guard l(state_lock_); @@ -1055,10 +1051,10 @@ void Tablet::ModifyRowSetTree(const RowSetTree& old_tree, CHECK_OK(new_tree->Reset(post_swap)); } -void Tablet::AtomicSwapRowSets(const RowSetVector &old_rowsets, - const RowSetVector &new_rowsets) { +void Tablet::AtomicSwapRowSets(const RowSetVector &to_remove, + const RowSetVector &to_add) { std::lock_guard lock(component_lock_); - AtomicSwapRowSetsUnlocked(old_rowsets, new_rowsets); + AtomicSwapRowSetsUnlocked(to_remove, to_add); } void Tablet::AtomicSwapRowSetsUnlocked(const RowSetVector &to_remove, @@ -1070,19 +1066,6 @@ void Tablet::AtomicSwapRowSetsUnlocked(const RowSetVector &to_remove, to_remove, to_add, new_tree.get()); components_ = new TabletComponents(components_->memrowset, new_tree); - - // Recompute the average rowset height. - // TODO(wdberkeley): We should be able to cache the computation of the CDF - // and average height and efficiently recompute it instead of doing it from - // scratch. - if (metrics_) { - double avg_height; - RowSetInfo::ComputeCdfAndCollectOrdered(*new_tree, - &avg_height, - nullptr, - nullptr); - metrics_->average_diskrowset_height->set_value(avg_height); - } } Status Tablet::DoMajorDeltaCompaction(const vector& col_ids, @@ -1725,6 +1708,7 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input, // Replace the compacted rowsets with the new on-disk rowsets, making them visible now that // their metadata was written to disk. 
AtomicSwapRowSets({ inprogress_rowset }, new_disk_rowsets); + UpdateAverageRowsetHeight(); const auto rows_written = drsw.rows_written_count(); const auto drs_written = drsw.drs_written_count(); @@ -1755,9 +1739,28 @@ Status Tablet::HandleEmptyCompactionOrFlush(const RowSetVector& rowsets, "Failed to flush new tablet metadata"); AtomicSwapRowSets(rowsets, RowSetVector()); + UpdateAverageRowsetHeight(); return Status::OK(); } +void Tablet::UpdateAverageRowsetHeight() { + if (!metrics_) { + return; + } + // TODO(wdberkeley): We should be able to cache the computation of the CDF + // and average height and efficiently recompute it instead of doing it from + // scratch. + scoped_refptr comps; + GetComponents(&comps); + std::lock_guard l(compact_select_lock_); + double avg_height; + RowSetInfo::ComputeCdfAndCollectOrdered(*comps->rowsets, + &avg_height, + nullptr, + nullptr); + metrics_->average_diskrowset_height->set_value(avg_height); +} + Status Tablet::Compact(CompactFlags flags) { RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen); diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h index 87448f5da3..05c7c9b9b4 100644 --- a/src/kudu/tablet/tablet.h +++ b/src/kudu/tablet/tablet.h @@ -347,7 +347,6 @@ class Tablet { // memrowset in the current implementation. Status CountRows(uint64_t *count) const; - // Verbosely dump this entire tablet to the logs. This is only // really useful when debugging unit tests failures where the tablet // has a very small number of rows. @@ -599,6 +598,10 @@ class Tablet { Status HandleEmptyCompactionOrFlush(const RowSetVector& rowsets, int mrs_being_flushed); + // Updates the average rowset height metric. Acquires the tablet's + // compact_select_lock_. 
+ void UpdateAverageRowsetHeight(); + Status FlushMetadata(const RowSetVector& to_remove, const RowSetMetadataVector& to_add, int64_t mrs_being_flushed); From 9379be841b42414c59e26d5479d810bad7d65a04 Mon Sep 17 00:00:00 2001 From: Hao Hao Date: Sun, 12 May 2019 19:50:45 -0700 Subject: [PATCH 07/15] [gradle] publish kudu-hive artifact This patch allows publishing the kudu-hive artifact, since it is ready to be supported long term, and there are consumers such as Apache Impala which depend on it for e2e testing. Change-Id: I43c7e9ded568e3903558156851fb24bb0a0432b6 Reviewed-on: http://gerrit.cloudera.org:8080/13316 Tested-by: Kudu Jenkins Reviewed-by: Grant Henke --- java/kudu-hive/build.gradle | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/java/kudu-hive/build.gradle b/java/kudu-hive/build.gradle index 3d4bb2bf8f..31ce5775f3 100644 --- a/java/kudu-hive/build.gradle +++ b/java/kudu-hive/build.gradle @@ -36,7 +36,4 @@ dependencies { // kudu-hive has no public Javadoc. javadoc { enabled = false -} - -// Skip publishing kudu-hive until it's ready to be supported long-term. -uploadArchives.enabled = false \ No newline at end of file +} \ No newline at end of file From 88db1982ab0790363ad2e016a6fc83d67e6b088f Mon Sep 17 00:00:00 2001 From: Alexey Serbin Date: Fri, 26 Apr 2019 11:06:09 -0700 Subject: [PATCH 08/15] [master_sentry-itest] one more scenario for authz cache This patch adds an extra scenario to verify the functionality of the Kudu authz system in case of HMS+Sentry integration. The newly added scenario creates tables in Kudu when Sentry is temporarily shut down and the master authz cache is enabled.
Change-Id: I3d3c04e4137afff407e4db8ee39a4495d9add3dc Reviewed-on: http://gerrit.cloudera.org:8080/13291 Tested-by: Alexey Serbin Reviewed-by: Hao Hao --- src/kudu/client/master_proxy_rpc.cc | 4 +- src/kudu/hms/mini_hms.cc | 35 +++++- src/kudu/hms/mini_hms.h | 12 +- src/kudu/integration-tests/hms_itest-base.cc | 22 ++-- src/kudu/integration-tests/hms_itest-base.h | 6 +- .../integration-tests/master_sentry-itest.cc | 110 ++++++++++++++++++ 6 files changed, 172 insertions(+), 17 deletions(-) diff --git a/src/kudu/client/master_proxy_rpc.cc b/src/kudu/client/master_proxy_rpc.cc index 326df12117..b873f1dc1c 100644 --- a/src/kudu/client/master_proxy_rpc.cc +++ b/src/kudu/client/master_proxy_rpc.cc @@ -202,8 +202,8 @@ bool AsyncLeaderMasterRpc::RetryOrReconnectIfNecessary( // negotiation. // Authorization errors during negotiation generally indicate failure to - // authenticate. If that failure was due to an invalid token, try to get a - // new one by reconnecting with the master. + // authenticate. If that failure was due to an invalid authn token, + // try to get a new one by establishing a new connection to the master.
const ErrorStatusPB* err = retrier().controller().error_response(); if (s.IsNotAuthorized()) { if (err && err->has_code() && diff --git a/src/kudu/hms/mini_hms.cc b/src/kudu/hms/mini_hms.cc index e730f80653..7ccb4a9cb8 100644 --- a/src/kudu/hms/mini_hms.cc +++ b/src/kudu/hms/mini_hms.cc @@ -71,12 +71,16 @@ void MiniHms::EnableKerberos(string krb5_conf, } void MiniHms::EnableSentry(const HostPort& sentry_address, - string sentry_service_principal) { + string sentry_service_principal, + int sentry_client_rpc_retry_num, + int sentry_client_rpc_retry_interval_ms) { CHECK(!hms_process_); DCHECK(!sentry_service_principal.empty()); VLOG(1) << Substitute("Enabling Sentry, at $0, for HMS", sentry_address.ToString()); sentry_address_ = sentry_address.ToString(); sentry_service_principal_ = std::move(sentry_service_principal); + sentry_client_rpc_retry_num_ = sentry_client_rpc_retry_num; + sentry_client_rpc_retry_interval_ms_ = sentry_client_rpc_retry_interval_ms; } void MiniHms::SetDataRoot(string data_root) { @@ -366,6 +370,14 @@ Status MiniHms::CreateHiveSite() const { // - sentry.metastore.service.users // Set of service users whose access will be excluded from // Sentry authorization checks. + // + // - sentry.service.client.rpc.retry-total + // Maximum number of attempts that Sentry RPC client does while + // re-trying a remote call to Sentry. + // + // - sentry.service.client.rpc.retry.interval.msec + // Time interval between attempts of Sentry's client to retry a remote + // call to Sentry. 
static const string kSentryFileTemplate = R"( @@ -387,12 +399,25 @@ Status MiniHms::CreateHiveSite() const { sentry.metastore.service.users kudu + + + sentry.service.client.rpc.retry-total + $3 + + + + sentry.service.client.rpc.retry.interval.msec + $4 + )"; - string sentry_file_contents = Substitute(kSentryFileTemplate, - sentry_address_, - sentry_service_principal_, - "server1"); + auto sentry_file_contents = Substitute( + kSentryFileTemplate, + sentry_address_, + sentry_service_principal_, + "server1", + sentry_client_rpc_retry_num_, + sentry_client_rpc_retry_interval_ms_); RETURN_NOT_OK(WriteStringToFile(Env::Default(), sentry_file_contents, JoinPathSegments(data_root_, "hive-sentry-site.xml"))); diff --git a/src/kudu/hms/mini_hms.h b/src/kudu/hms/mini_hms.h index 0adcb21042..7fb1c22089 100644 --- a/src/kudu/hms/mini_hms.h +++ b/src/kudu/hms/mini_hms.h @@ -48,8 +48,16 @@ class MiniHms { // Configures the mini HMS to enable the Sentry plugin, passing the // Sentry service's principal to be used in Kerberos environment. + // + // Parameters 'sentry_client_rpc_retry_num' and + // 'sentry_client_rpc_retry_interval_ms' are used to override default settings + // of the Sentry client used by HMS plugins. The default values for these two + // parameters are set to allow for shorter HMS --> Sentry RPC timeout + // (i.e. shorter than with the default Sentry v2.{0,1} client's settings). void EnableSentry(const HostPort& sentry_address, - std::string sentry_service_principal); + std::string sentry_service_principal, + int sentry_client_rpc_retry_num = 3, + int sentry_client_rpc_retry_interval_ms = 500); // Configures the mini HMS to store its data in the provided path. If not set, // it uses a test-only temporary directory. 
@@ -114,6 +122,8 @@ class MiniHms { // Sentry configuration std::string sentry_address_; std::string sentry_service_principal_; + int sentry_client_rpc_retry_num_; + int sentry_client_rpc_retry_interval_ms_; }; } // namespace hms diff --git a/src/kudu/integration-tests/hms_itest-base.cc b/src/kudu/integration-tests/hms_itest-base.cc index 03697921f0..64a635b40b 100644 --- a/src/kudu/integration-tests/hms_itest-base.cc +++ b/src/kudu/integration-tests/hms_itest-base.cc @@ -35,6 +35,7 @@ #include "kudu/hms/mini_hms.h" #include "kudu/mini-cluster/external_mini_cluster.h" #include "kudu/util/decimal_util.h" +#include "kudu/util/monotime.h" #include "kudu/util/net/net_util.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" @@ -73,9 +74,9 @@ Status HmsITestBase::CreateDatabase(const string& database_name) { } Status HmsITestBase::CreateKuduTable(const string& database_name, - const string& table_name) { + const string& table_name, + MonoDelta timeout) { // Get coverage of all column types. - KuduSchema schema; KuduSchemaBuilder b; b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey(); b.AddColumn("int8_val")->Type(KuduColumnSchema::INT8); @@ -94,14 +95,19 @@ Status HmsITestBase::CreateKuduTable(const string& database_name, ->Precision(kMaxDecimal64Precision); b.AddColumn("decimal128_val")->Type(KuduColumnSchema::DECIMAL) ->Precision(kMaxDecimal128Precision); - + KuduSchema schema; RETURN_NOT_OK(b.Build(&schema)); unique_ptr table_creator(client_->NewTableCreator()); - return table_creator->table_name(Substitute("$0.$1", database_name, table_name)) - .schema(&schema) - .num_replicas(1) - .set_range_partition_columns({ "key" }) - .Create(); + if (timeout.Initialized()) { + // If specified, set the timeout for the operation. 
+ table_creator->timeout(timeout); + } + return table_creator->table_name(Substitute("$0.$1", + database_name, table_name)) + .schema(&schema) + .num_replicas(1) + .set_range_partition_columns({ "key" }) + .Create(); } Status HmsITestBase::RenameHmsTable(const string& database_name, diff --git a/src/kudu/integration-tests/hms_itest-base.h b/src/kudu/integration-tests/hms_itest-base.h index b348031684..50c814c4e1 100644 --- a/src/kudu/integration-tests/hms_itest-base.h +++ b/src/kudu/integration-tests/hms_itest-base.h @@ -22,12 +22,15 @@ #include +#include "kudu/gutil/port.h" #include "kudu/hms/hms_client.h" #include "kudu/integration-tests/external_mini_cluster-itest-base.h" #include "kudu/util/status.h" namespace kudu { +class MonoDelta; + class HmsITestBase : public ExternalMiniClusterITestBase { public: Status StartHms() WARN_UNUSED_RESULT; @@ -38,7 +41,8 @@ class HmsITestBase : public ExternalMiniClusterITestBase { // Creates a table in Kudu. Status CreateKuduTable(const std::string& database_name, - const std::string& table_name) WARN_UNUSED_RESULT; + const std::string& table_name, + MonoDelta timeout = {}) WARN_UNUSED_RESULT; // Renames a table entry in the HMS catalog. Status RenameHmsTable(const std::string& database_name, diff --git a/src/kudu/integration-tests/master_sentry-itest.cc b/src/kudu/integration-tests/master_sentry-itest.cc index 02ea86a119..c42e91a71d 100644 --- a/src/kudu/integration-tests/master_sentry-itest.cc +++ b/src/kudu/integration-tests/master_sentry-itest.cc @@ -873,6 +873,116 @@ TEST_F(SentryAuthzProviderCacheITest, ResetAuthzCacheConcurrentAlterTable) { } } +// This test scenario documents an artifact of the Kudu+HMS+Sentry integration +// when authz cache is enabled (the cache is enabled by default). In essence, +// information on the ownership of a table created during a short period of +// Sentry's unavailability will not ever appear in Sentry. That might be +// misleading because CreateTable() reports a success to the client. 
The created +// table indeed exists and is fully functional otherwise, but the corresponding +// owner privilege record is absent in Sentry. +TEST_F(SentryAuthzProviderCacheITest, CreateTables) { + constexpr const char* const kGhostTables[] = { "t10", "t11" }; + + // Grant CREATE TABLE and METADATA privileges on the database. + ASSERT_OK(GrantCreateTablePrivilege({ kDatabaseName })); + ASSERT_OK(AlterRoleGrantPrivilege( + sentry_client_.get(), kDevRole, + GetDatabasePrivilege(kDatabaseName, "METADATA"))); + + // Make sure it's possible to create a table in the database. This also + // populates the privileges cache with information on the privileges + // granted on the database. + ASSERT_OK(CreateKuduTable(kDatabaseName, "t0")); + + // An attempt to open a not-yet-existing table will fetch the information + // on the granted privileges on the table into the privileges cache. + for (const auto& t : kGhostTables) { + shared_ptr kudu_table; + const auto s = client_->OpenTable(Substitute("$0.$1", kDatabaseName, t), + &kudu_table); + ASSERT_TRUE(s.IsNotFound()) << s.ToString(); + } + + ASSERT_OK(StopSentry()); + + // CreateTable() with operation timeout longer than HMS --> Sentry + // communication timeout successfully completes. After failing to push + // the information on the newly created table to Sentry due to the logic + // implemented in the SentrySyncHMSNotificationsPostEventListener plugin, + // HMS sends success response to Kudu master and Kudu successfully completes + // the rest of the steps. + ASSERT_OK(CreateKuduTable(kDatabaseName, kGhostTables[0])); + + // In this case, the timeout for the CreateTable RPC is set to be lower than + // the HMS --> Sentry communication timeout (see corresponding parameters + // of the MiniHms::EnableSentry() method). CreateTable() successfully passes + // the authz phase since the information on privileges is cached and no + // Sentry RPC calls are attempted. 
However, since Sentry is down, + // CreateTable() takes a long time on the HMS's side and the client's + // request times out, while the creation of the table continues in the + // background. + { + const auto s = CreateKuduTable(kDatabaseName, kGhostTables[1], + MonoDelta::FromSeconds(1)); + ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); + } + + // Before starting Sentry, make sure the abandoned request to create the + // latter table succeeded even if CreateKuduTable() reported timeout. + // This is to make sure HMS stopped trying to push notification + // on table creation to Sentry anymore via the metastore plugin + // SentrySyncHMSNotificationsPostEventListener. + ASSERT_EVENTUALLY([&]{ + bool exists; + ASSERT_OK(client_->TableExists( + Substitute("$0.$1", kDatabaseName, kGhostTables[1]), &exists)); + ASSERT_TRUE(exists); + }); + + ASSERT_OK(ResetCache()); + + // After resetting the cache, it should not be possible to create another + // table: authz provider needs to fetch information on privileges directly + // from Sentry, but it's still down. + { + const auto s = CreateKuduTable(kDatabaseName, "t2"); + ASSERT_TRUE(s.IsNetworkError()) << s.ToString(); + } + + ASSERT_OK(StartSentry()); + + // Try to create the table after starting Sentry back: it should be a success. + ASSERT_OK(CreateKuduTable(kDatabaseName, "t2")); + + // Once table has been created, it should be possible to perform DDL operation + // on it since the user is the owner of the table. + { + unique_ptr table_alterer( + client_->NewTableAlterer(Substitute("$0.$1", kDatabaseName, "t2"))); + table_alterer->AddColumn("new_int8_columun")->Type(KuduColumnSchema::INT8); + ASSERT_OK(table_alterer->Alter()); + } + + // Try to run DDL against the tables created during Sentry's downtime. 
These + // should not be authorized since Sentry didn't receive information on the + // ownership of those tables from HMS during their creation and there isn't + // any catch up for those events made after Sentry started. + for (const auto& t : kGhostTables) { + unique_ptr table_alterer( + client_->NewTableAlterer(Substitute("$0.$1", kDatabaseName, t))); + table_alterer->AddColumn("new_int8_columun")->Type(KuduColumnSchema::INT8); + auto s = table_alterer->Alter(); + ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString(); + + // After granting the ALTER TABLE privilege the alteration should be + // successful. One caveat: it's necessary to reset the cache since the cache + // will not have the new record until the current entry has expired. + ASSERT_OK(GrantAlterTablePrivilege({ kDatabaseName, t })); + ASSERT_OK(ResetCache()); + ASSERT_OK(table_alterer->Alter()); + } +} + // Basic test to verify access control and functionality of // the ResetAuthzCache(); integration with Sentry is not enabled. class AuthzCacheControlTest : public ExternalMiniClusterITestBase { From 7a6df6aa18ff31b7947890cbc44b49189a23a5bf Mon Sep 17 00:00:00 2001 From: Adar Dembo Date: Mon, 13 May 2019 14:39:52 -0700 Subject: [PATCH 09/15] system_ntp: wait for sync via kernel We've been having more and more trouble with dist-test slave clocks becoming desynchronized. Because slaves run inside docker containers, we can't use ntpd/chrony-based tools such as ntp-wait to wait for synchronization. Moreover, this highlights a general issue with the existing wait-for-sync approach: we can't differentiate between environments where ntpd isn't running but should be, and environments where it's not but synchronization is still expected. This patch switches from ntp-wait and friends to ntp_adjtime-based waiting.
Originally it also cut the wait time from 60s to 30s (as an overture for users running in the first environment who would prefer not to wait a full minute to know that their ntpd isn't running), but this proved to be insufficient to ride out dist-test clock desynchronization. I also looked at the source code for ntp-wait and chronyc, but didn't see them doing anything fundamentally more interesting than what we're now doing with ntp_adjtime. Change-Id: Ic133c4f9b5fd933216fb27f9f01396a5fee8276b Reviewed-on: http://gerrit.cloudera.org:8080/13323 Tested-by: Kudu Jenkins Reviewed-by: Will Berkeley --- src/kudu/clock/system_ntp.cc | 45 +++++++++++++++--------------------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/src/kudu/clock/system_ntp.cc b/src/kudu/clock/system_ntp.cc index 108056a437..9993838c25 100644 --- a/src/kudu/clock/system_ntp.cc +++ b/src/kudu/clock/system_ntp.cc @@ -35,6 +35,7 @@ #include "kudu/util/errno.h" #include "kudu/util/flag_tags.h" #include "kudu/util/logging.h" +#include "kudu/util/monotime.h" #include "kudu/util/path_util.h" #include "kudu/util/status.h" #include "kudu/util/subprocess.h" @@ -124,36 +125,26 @@ Status WaitForNtp() { } LOG(INFO) << Substitute("Waiting up to --ntp_initial_sync_wait_secs=$0 " "seconds for the clock to synchronize", wait_secs); - vector cmd; - string exe; - Status s = FindExecutable("ntp-wait", {"/sbin", "/usr/sbin"}, &exe); - if (s.ok()) { - // -s is the number of seconds to sleep between retries. - // -n is the number of tries before giving up. 
- cmd = {exe, "-s", "1", "-n", std::to_string(wait_secs)}; - } else { - LOG(WARNING) << "Could not find ntp-wait; trying chrony waitsync instead: " - << s.ToString(); - s = FindExecutable("chronyc", {"/sbin", "/usr/sbin"}, &exe); - if (!s.ok()) { - LOG(WARNING) << "Could not find chronyc: " << s.ToString(); - return Status::NotFound("failed to find ntp-wait or chronyc"); + + // We previously relied on ntpd/chrony support tools to wait, but that + // approach doesn't work in environments where ntpd is unreachable but the + // clock is still synchronized (i.e. running inside a Linux container). + // + // Now we just interrogate the kernel directly. + Status s; + for (int i = 0; i < wait_secs; i++) { + timex timex; + s = CallAdjTime(&timex); + if (s.ok() || !s.IsServiceUnavailable()) { + return s; } - // Usage: waitsync max-tries max-correction max-skew interval. - // max-correction and max-skew parameters as 0 means no checks. - // The interval is measured in seconds. - cmd = {exe, "waitsync", std::to_string(wait_secs), "0", "0", "1"}; - } - // Unfortunately, neither ntp-wait nor chronyc waitsync print useful messages. - // Instead, rely on DumpDiagnostics. - s = Subprocess::Call(cmd); - if (!s.ok()) { - return s.CloneAndPrepend( - Substitute("failed to wait for clock sync using command '$0'", - JoinStrings(cmd, " "))); + SleepFor(MonoDelta::FromSeconds(1)); } - return Status::OK(); + + // Return the last failure. + return s.CloneAndPrepend("Timed out waiting for clock sync"); } + } // anonymous namespace void SystemNtp::DumpDiagnostics(vector* log) const { From b9a590a34ac219f1d5d1fe34176aece8d9f79d30 Mon Sep 17 00:00:00 2001 From: Adar Dembo Date: Tue, 14 May 2019 13:34:47 -0700 Subject: [PATCH 10/15] tablet_copy-test-base: fix a null ptr deref in TearDown If the mini tablet server fails to start (i.e. because of clock desynchronization), the test crashes in TearDown. 
tablet_copy_client-test: /data0/jenkins/workspace/kudu-pre-commit-unittest-TSAN/src/kudu/gutil/ref_counted.h:284: T *scoped_refptr::operator->() const [T = kudu::tablet::TabletReplica]: Assertion `ptr_ != __null' failed. *** Aborted at 1557801930 (unix time) try "date -d @1557801930" if you are using GNU date *** PC: @ 0x7fb2ab088c37 gsignal *** SIGABRT (@0x3e800001be3) received by PID 7139 (TID 0x7fb2b84f0440) from PID 7139; stack trace: *** @ 0x46d9bd __tsan::CallUserSignalHandler() at /data0/jenkins/workspace/kudu-pre-commit-unittest-TSAN/thirdparty/src/llvm-6.0.0.src/projects/compiler-rt/lib/tsan/rtl/tsan_interceptors.cc:1908 @ 0x46e9ab rtl_sigaction() at /data0/jenkins/workspace/kudu-pre-commit-unittest-TSAN/thirdparty/src/llvm-6.0.0.src/projects/compiler-rt/lib/tsan/rtl/tsan_interceptors.cc:1997 @ 0x7fb2b2ee2330 (unknown) at ??:0 @ 0x7fb2ab088c37 gsignal at ??:0 @ 0x7fb2ab08c028 abort at ??:0 @ 0x7fb2ab081bf6 (unknown) at ??:0 @ 0x7fb2ab081ca2 __assert_fail at ??:0 @ 0x4eb7f0 scoped_refptr<>::operator->() at /data0/jenkins/workspace/kudu-pre-commit-unittest-TSAN/src/kudu/tablet/tablet_replica.h:245 @ 0x4ed463 kudu::tserver::TabletCopyTest::TearDown() at /data0/jenkins/workspace/kudu-pre-commit-unittest-TSAN/src/kudu/tserver/tablet_copy-test-base.h:60 @ 0x7fb2b75e8300 testing::internal::HandleExceptionsInMethodIfSupported<>() at ??:0 @ 0x7fb2b75c7531 testing::Test::Run() at ??:0 @ 0x7fb2b75c875d testing::TestInfo::Run() at ??:0 @ 0x7fb2b75c9237 testing::TestCase::Run() at ??:0 @ 0x7fb2b75d560b testing::internal::UnitTestImpl::RunAllTests() at ??:0 @ 0x7fb2b75e9270 testing::internal::HandleExceptionsInMethodIfSupported<>() at ??:0 @ 0x7fb2b75d4ef3 testing::UnitTest::Run() at ??:0 @ 0x7fb2b7a3b4fc RUN_ALL_TESTS() at ??:0 @ 0x7fb2b7a3aca7 main at ??:0 @ 0x7fb2ab073f45 __libc_start_main at ??:0 @ 0x44473b (unknown) at ??:? 
Change-Id: I9257d20cf9728dfa3a58710e2dfc75f628c2daa5 Reviewed-on: http://gerrit.cloudera.org:8080/13332 Reviewed-by: Alexey Serbin Tested-by: Adar Dembo --- src/kudu/tserver/tablet_copy-test-base.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/kudu/tserver/tablet_copy-test-base.h b/src/kudu/tserver/tablet_copy-test-base.h index 99624b5943..b4db99c80d 100644 --- a/src/kudu/tserver/tablet_copy-test-base.h +++ b/src/kudu/tserver/tablet_copy-test-base.h @@ -57,7 +57,9 @@ class TabletCopyTest : public TabletServerTestBase { } virtual void TearDown() OVERRIDE { - ASSERT_OK(tablet_replica_->log_anchor_registry()->Unregister(&anchor_)); + if (tablet_replica_) { + ASSERT_OK(tablet_replica_->log_anchor_registry()->Unregister(&anchor_)); + } NO_FATALS(TabletServerTestBase::TearDown()); } From 39a598741f008fbc3cba830e2644160ce799ac7e Mon Sep 17 00:00:00 2001 From: Mitch Barnett Date: Mon, 13 May 2019 18:03:46 -0500 Subject: [PATCH 11/15] KUDU-2763: Eliminate confusing log message Whenever a new tablet is created, the first leader will always send its first message with "preceding opid" set to (1,1). This generates a "log matching property violated" message that can confuse operators, when the behavior seen is actually what we expect. I wrapped the log output in a conditional statement so that we don't output the "log matching property violated" INFO message when the preceding_opid from the leader is equal to (1,1) which indicates a new tablet replica, thus the message doesn't need to be shown for this particular scenario. 
Change-Id: I95dd73cb2876dc3def218d84316ca015ddc9f166 Reviewed-on: http://gerrit.cloudera.org:8080/13325 Tested-by: Kudu Jenkins Reviewed-by: Andrew Wong --- src/kudu/consensus/raft_consensus.cc | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc index 3435d5d63a..a7b2057225 100644 --- a/src/kudu/consensus/raft_consensus.cc +++ b/src/kudu/consensus/raft_consensus.cc @@ -1178,8 +1178,12 @@ Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequ ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH, Status::IllegalState(error_msg)); - LOG_WITH_PREFIX_UNLOCKED(INFO) << "Refusing update from remote peer " - << req.leader_uuid << ": " << error_msg; + // Adding a check to eliminate an unnecessary log message in the + // scenario where this is the first message from the Leader of a new tablet. + if (!OpIdEquals(MakeOpId(1,1), *req.preceding_opid)) { + LOG_WITH_PREFIX_UNLOCKED(INFO) << "Refusing update from remote peer " + << req.leader_uuid << ": " << error_msg; + } // If the terms mismatch we abort down to the index before the leader's preceding, // since we know that is the last opid that has a chance of not being overwritten. From 4ac5e5074112574bb980a5644c711daa5ab615fa Mon Sep 17 00:00:00 2001 From: Yingchun Lai <405403881@qq.com> Date: Thu, 9 May 2019 03:22:44 -0400 Subject: [PATCH 12/15] [util] Introduce yaml-cpp to read config files KUDU-1948 notes that yaml is better than json when used as a config file format. yaml-cpp [https://github.com/jbeder/yaml-cpp] is the most popular C++ yaml library I can find on GitHub. This patch introduces yaml-cpp into Kudu, adds a simple wrapper around it, and adds some unit tests.
Change-Id: I8ef58befaffbcc880e13fa6fec61b8e94a189b5a Reviewed-on: http://gerrit.cloudera.org:8080/13294 Reviewed-by: Adar Dembo Tested-by: Kudu Jenkins --- CMakeLists.txt | 7 ++ cmake_modules/FindYaml.cmake | 31 ++++++ src/kudu/util/CMakeLists.txt | 4 +- src/kudu/util/yamlreader-test.cc | 176 ++++++++++++++++++++++++++++++ src/kudu/util/yamlreader.cc | 100 +++++++++++++++++ src/kudu/util/yamlreader.h | 129 ++++++++++++++++++++++ thirdparty/build-definitions.sh | 29 +++++ thirdparty/build-thirdparty.sh | 9 ++ thirdparty/download-thirdparty.sh | 6 + thirdparty/vars.sh | 5 + 10 files changed, 495 insertions(+), 1 deletion(-) create mode 100644 cmake_modules/FindYaml.cmake create mode 100644 src/kudu/util/yamlreader-test.cc create mode 100644 src/kudu/util/yamlreader.cc create mode 100644 src/kudu/util/yamlreader.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 6c08e4c2dd..f0548096d8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1222,6 +1222,13 @@ else() set(KRB5_REALM_OVERRIDE -Wl,-U,krb5_realm_override_loaded krb5_realm_override) endif() +## yaml +find_package(Yaml REQUIRED) +include_directories(SYSTEM ${YAML_INCLUDE_DIR}) +ADD_THIRDPARTY_LIB(yaml + STATIC_LIB "${YAML_STATIC_LIB}" + SHARED_LIB "${YAML_SHARED_LIB}") + ## Boost # We use a custom cmake module and not cmake's FindBoost. diff --git a/cmake_modules/FindYaml.cmake b/cmake_modules/FindYaml.cmake new file mode 100644 index 0000000000..cc73805ced --- /dev/null +++ b/cmake_modules/FindYaml.cmake @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +find_path(YAML_INCLUDE_DIR yaml-cpp/yaml.h + # make sure we don't accidentally pick up a different version + NO_CMAKE_SYSTEM_PATH + NO_SYSTEM_ENVIRONMENT_PATH) +find_library(YAML_STATIC_LIB libyaml-cpp.a + NO_CMAKE_SYSTEM_PATH + NO_SYSTEM_ENVIRONMENT_PATH) +find_library(YAML_SHARED_LIB libyaml-cpp.so + NO_CMAKE_SYSTEM_PATH + NO_SYSTEM_ENVIRONMENT_PATH) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(YAML REQUIRED_VARS + YAML_STATIC_LIB YAML_SHARED_LIB YAML_INCLUDE_DIR) diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt index f12d181e8e..47334c63b2 100644 --- a/src/kudu/util/CMakeLists.txt +++ b/src/kudu/util/CMakeLists.txt @@ -230,6 +230,7 @@ set(UTIL_SRCS version_info.cc version_util.cc website_util.cc + yamlreader.cc zlib.cc ) @@ -257,6 +258,7 @@ set(UTIL_LIBS pb_util_proto protobuf version_info_proto + yaml zlib) if(NOT APPLE) @@ -293,7 +295,6 @@ set(UTIL_COMPRESSION_SRCS set(UTIL_COMPRESSION_LIBS kudu_util util_compression_proto - glog gutil lz4 @@ -467,6 +468,7 @@ ADD_KUDU_TEST(ttl_cache-test) ADD_KUDU_TEST(url-coding-test) ADD_KUDU_TEST(user-test) ADD_KUDU_TEST(version_util-test) +ADD_KUDU_TEST(yamlreader-test) if (NOT APPLE) ADD_KUDU_TEST(minidump-test) diff --git a/src/kudu/util/yamlreader-test.cc b/src/kudu/util/yamlreader-test.cc new file mode 100644 index 0000000000..8394cf4440 --- /dev/null +++ b/src/kudu/util/yamlreader-test.cc @@ -0,0 +1,176 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/yamlreader.h" + +#include +#include +#include + +#include +#include +#include + +#include "kudu/util/env.h" +#include "kudu/util/slice.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" + +using std::string; +using std::unique_ptr; +using std::vector; + +namespace kudu { + +class YamlReaderTest : public KuduTest { +public: + Status GenerateYamlReader(const string& content, unique_ptr* result) { + string fname = GetTestPath("YamlReaderTest.json"); + unique_ptr writable_file; + RETURN_NOT_OK(env_->NewWritableFile(fname, &writable_file)); + RETURN_NOT_OK(writable_file->Append(Slice(content))); + RETURN_NOT_OK(writable_file->Close()); + result->reset(new YamlReader(fname)); + return Status::OK(); + } +}; + +TEST_F(YamlReaderTest, FileNotExist) { + YamlReader r("YamlReaderTest.NotExist"); + Status s = r.Init(); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_STR_CONTAINS( + s.ToString(), "YAML::LoadFile error"); +} + +TEST_F(YamlReaderTest, Corruption) { + unique_ptr r; + ASSERT_OK(GenerateYamlReader("foo", &r)); + ASSERT_OK(r->Init()); + int val = 0; + ASSERT_TRUE(YamlReader::ExtractScalar(r->node(), "foo", &val).IsCorruption()); +} + +TEST_F(YamlReaderTest, EmptyFile) { + unique_ptr r; + 
ASSERT_OK(GenerateYamlReader("", &r)); + ASSERT_OK(r->Init()); + + int val = 0; + ASSERT_TRUE(YamlReader::ExtractScalar(r->node(), "foo", &val).IsCorruption()); +} + +TEST_F(YamlReaderTest, KeyNotExist) { + unique_ptr r; + ASSERT_OK(GenerateYamlReader("foo: 1", &r)); + ASSERT_OK(r->Init()); + + int val = 0; + ASSERT_TRUE(YamlReader::ExtractScalar(r->node(), "bar", &val).IsNotFound()); +} + +TEST_F(YamlReaderTest, Scalar) { + { + unique_ptr r; + ASSERT_OK(GenerateYamlReader("bool_val: false", &r)); + ASSERT_OK(r->Init()); + bool val = true; + ASSERT_OK(YamlReader::ExtractScalar(r->node(), "bool_val", &val)); + ASSERT_EQ(val, false); + } + + { + unique_ptr r; + ASSERT_OK(GenerateYamlReader("int_val: 123", &r)); + ASSERT_OK(r->Init()); + int val = 0; + ASSERT_OK(YamlReader::ExtractScalar(r->node(), "int_val", &val)); + ASSERT_EQ(val, 123); + } + + { + unique_ptr r; + ASSERT_OK(GenerateYamlReader("double_val: 123.456", &r)); + ASSERT_OK(r->Init()); + double val = 0.0; + ASSERT_OK(YamlReader::ExtractScalar(r->node(), "double_val", &val)); + ASSERT_EQ(val, 123.456); + } + + { + unique_ptr r; + ASSERT_OK(GenerateYamlReader("string_val: hello yaml", &r)); + ASSERT_OK(r->Init()); + string val; + ASSERT_OK(YamlReader::ExtractScalar(r->node(), "string_val", &val)); + ASSERT_EQ(val, "hello yaml"); + } +} + +TEST_F(YamlReaderTest, Map) { + unique_ptr r; + ASSERT_OK(GenerateYamlReader( + "map_val: { key1: hello yaml, key2: 123.456 , key3: 123, key4: false}", &r)); + ASSERT_OK(r->Init()); + + YAML::Node node; + ASSERT_OK(YamlReader::ExtractMap(r->node(), "map_val", &node)); + + { + string val; + ASSERT_OK(YamlReader::ExtractScalar(&node, "key1", &val)); + ASSERT_EQ(val, "hello yaml"); + } + + { + double val = 0.0; + ASSERT_OK(YamlReader::ExtractScalar(&node, "key2", &val)); + ASSERT_EQ(val, 123.456); + } + + { + int val = 0; + ASSERT_OK(YamlReader::ExtractScalar(&node, "key3", &val)); + ASSERT_EQ(val, 123); + } + + { + bool val = true; + ASSERT_OK(YamlReader::ExtractScalar(&node, 
"key4", &val)); + ASSERT_EQ(val, false); + } + + // Not exist key. + { + int val = 0; + ASSERT_TRUE(YamlReader::ExtractScalar(&node, "key5", &val).IsNotFound()); + } +} + +TEST_F(YamlReaderTest, Array) { + unique_ptr r; + ASSERT_OK(GenerateYamlReader("list_val: [1, 3, 5, 7, 9]", &r)); + ASSERT_OK(r->Init()); + const std::vector& expect_vals = { 1, 3, 5, 7, 9 }; + + std::vector vals; + ASSERT_OK(YamlReader::ExtractArray(r->node(), "list_val", &vals)); + ASSERT_EQ(vals, expect_vals); +} + +} // namespace kudu diff --git a/src/kudu/util/yamlreader.cc b/src/kudu/util/yamlreader.cc new file mode 100644 index 0000000000..a3ee8e0d88 --- /dev/null +++ b/src/kudu/util/yamlreader.cc @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include "kudu/util/yamlreader.h"
+
+#include <exception>
+#include <utility>
+
+#include <glog/logging.h>
+// IWYU pragma: no_include
+// IWYU pragma: no_include
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+
+using std::string;
+using strings::Substitute;
+using YAML::Node;
+using YAML::NodeType;
+
+namespace kudu {
+
+YamlReader::YamlReader(string filename) : filename_(std::move(filename)) {}
+
+// Loads and parses 'filename_'; any std::exception thrown while doing so becomes Corruption.
+Status YamlReader::Init() {
+  try {
+    node_ = YAML::LoadFile(filename_);
+  } catch (const std::exception& e) {
+    return Status::Corruption(Substitute("YAML::LoadFile error: $0", e.what()));
+  }
+  return Status::OK();
+}
+
+// Extracts 'field' from 'node' and requires its value to be a YAML map; any
+// other node type is reported as Corruption.
+Status YamlReader::ExtractMap(const Node* node, const string& field, Node* result) {
+  CHECK(result);
+  Node val;
+  RETURN_NOT_OK(ExtractField(node, field, &val));
+  if (PREDICT_FALSE(!val.IsMap())) {
+    return Status::Corruption(Substitute(
+        "wrong type during field extraction: expected map but got $0",
+        TypeToString(val.Type())));
+  }
+  *result = val;
+  return Status::OK();
+}
+
+// Looks up 'field' in the map 'node'. An absent key is reported as NotFound,
+// whether the lookup throws or produces an undefined node.
+Status YamlReader::ExtractField(const Node* node, const string& field, Node* result) {
+  if (PREDICT_FALSE(!node->IsDefined() || !node->IsMap())) {
+    return Status::Corruption("node is not map type");
+  }
+  try {
+    *result = (*node)[field];
+  } catch (const std::exception& e) {
+    return Status::NotFound(Substitute("parse field $0 error: $1", field, e.what()));
+  }
+  if (PREDICT_FALSE(!result->IsDefined())) {
+    return Status::NotFound("Missing field", field);
+  }
+
+  return Status::OK();
+}
+
+const char* YamlReader::TypeToString(NodeType::value t) {
+  switch (t) {
+    case NodeType::Undefined:
+      return "undefined";
+    case NodeType::Null:
+      return "null";
+    case NodeType::Scalar:
+      return "scalar";
+    case NodeType::Sequence:
+      return "sequence";
+    case NodeType::Map:
+      return "map";
+    default:
+      LOG(FATAL) << "unexpected type: " << t;
+  }
+  return "";  // Not reached unless LOG(FATAL) returns.
+}
+
+} // namespace kudu
diff --git a/src/kudu/util/yamlreader.h b/src/kudu/util/yamlreader.h
new file mode 100644
index
0000000000..2888f9fb3f
--- /dev/null
+++ b/src/kudu/util/yamlreader.h
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <exception>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+// IWYU pragma: no_include
+// IWYU pragma: no_include
+// IWYU pragma: no_include
+// IWYU pragma: no_include
+// IWYU pragma: no_include
+// IWYU pragma: no_include
+#include <yaml-cpp/yaml.h>  // IWYU pragma: keep
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Wraps the YAML parsing functionality of YAML::Node.
+//
+// This class can read YAML content from a file and extract scalar, map and
+// array values from it.
+class YamlReader {
+ public:
+  explicit YamlReader(std::string filename);
+  ~YamlReader() = default;
+
+  // Loads and parses the file named at construction. Returns
+  // Status::Corruption if loading or parsing fails.
+  Status Init();
+
+  // Extractor methods.
+  //
+  // Look for a field with the name of 'field' in the given node.
+  // Return Status::OK if it can be found and extracted as the specified type,
+  // or return Status::NotFound if it cannot be found, or Status::Corruption if
+  // the node is not extractable or the field can not be extracted as the type.
+  //
+  // ExtractArray() appends the converted elements to 'result'; if it fails
+  // part-way through, the elements converted so far remain in 'result'.
+
+  template <typename T>
+  static Status ExtractScalar(const YAML::Node* node,
+                              const std::string& field,
+                              T* result);
+
+  static Status ExtractMap(const YAML::Node* node,
+                           const std::string& field,
+                           YAML::Node* result);
+
+  template <typename T>
+  static Status ExtractArray(const YAML::Node* node,
+                             const std::string& field,
+                             std::vector<T>* result);
+
+  // Returns the root node of the document parsed by Init().
+  const YAML::Node* node() const { return &node_; }
+
+ private:
+  // Returns a printable name for a YAML node type, used in error messages.
+  static const char* TypeToString(YAML::NodeType::value t);
+
+  // Fetches 'field' from the map 'node' without checking the value's type.
+  static Status ExtractField(const YAML::Node* node,
+                             const std::string& field,
+                             YAML::Node* result);
+
+  std::string filename_;
+  YAML::Node node_;
+
+  DISALLOW_COPY_AND_ASSIGN(YamlReader);
+};
+
+template <typename T>
+Status YamlReader::ExtractScalar(const YAML::Node* node,
+                                 const std::string& field,
+                                 T* result) {
+  CHECK(result);
+  YAML::Node val;
+  RETURN_NOT_OK(ExtractField(node, field, &val));
+  if (PREDICT_FALSE(!val.IsScalar())) {
+    return Status::Corruption(strings::Substitute(
+        "wrong type during field extraction: expected scalar but got $0",
+        TypeToString(val.Type())));
+  }
+  // as<T>() throws if the scalar cannot be converted to T; report that as
+  // Corruption, per the extractor contract, rather than letting it escape.
+  try {
+    *result = val.as<T>();
+  } catch (const std::exception& e) {
+    return Status::Corruption(strings::Substitute("parse field $0 error: $1", field, e.what()));
+  }
+  return Status::OK();
+}
+
+template <typename T>
+Status YamlReader::ExtractArray(const YAML::Node* node,
+                                const std::string& field,
+                                std::vector<T>* result) {
+  CHECK(result);
+  YAML::Node val;
+  RETURN_NOT_OK(ExtractField(node, field, &val));
+  if (PREDICT_FALSE(!val.IsSequence())) {
+    return Status::Corruption(strings::Substitute(
+        "wrong type during field extraction: expected sequence but got $0",
+        TypeToString(val.Type())));
+  }
+  // Elements are appended, so leave room for what 'result' already holds.
+  result->reserve(result->size() + val.size());
+  for (YAML::const_iterator iter = val.begin(); iter != val.end(); ++iter) {
+    try {
+      if (PREDICT_FALSE(!iter->IsScalar())) {
+        return Status::Corruption(strings::Substitute(
+            "wrong type during field extraction: expected scalar but got $0",
+            TypeToString(iter->Type())));
+      }
+      result->push_back(iter->as<T>());
+    } catch (const std::exception& e) {
+      return Status::Corruption(strings::Substitute("parse list element error: $0", e.what()));
+    }
+  }
+  return Status::OK();
+}
+
+} // namespace kudu diff --git a/thirdparty/build-definitions.sh b/thirdparty/build-definitions.sh index 2ae2f59ae5..609b7b4e77 100644 --- a/thirdparty/build-definitions.sh +++ b/thirdparty/build-definitions.sh @@ -919,3 +919,32 @@ build_bison() { make -j$PARALLEL $EXTRA_MAKEFLAGS install popd } + +build_yaml() { + YAML_SHARED_BDIR=$TP_BUILD_DIR/$YAML_NAME.shared$MODE_SUFFIX + YAML_STATIC_BDIR=$TP_BUILD_DIR/$YAML_NAME.static$MODE_SUFFIX + for SHARED in ON OFF; do + if [ $SHARED = "ON" ]; then + YAML_BDIR=$YAML_SHARED_BDIR + else + YAML_BDIR=$YAML_STATIC_BDIR + fi + mkdir -p $YAML_BDIR + pushd $YAML_BDIR + rm -rf CMakeCache.txt CMakeFiles/ + CFLAGS="$EXTRA_CFLAGS -fPIC" \ + CXXFLAGS="$EXTRA_CXXFLAGS -fPIC" \ + LDFLAGS="$EXTRA_LDFLAGS" \ + LIBS="$EXTRA_LIBS" \ + cmake \ + -DCMAKE_BUILD_TYPE=Release \ + -DYAML_CPP_BUILD_TESTS=OFF \ + -DYAML_CPP_BUILD_TOOLS=OFF \ + -DBUILD_SHARED_LIBS=$SHARED \ + -DCMAKE_INSTALL_PREFIX=$PREFIX \ + $EXTRA_CMAKE_FLAGS \ + $YAML_SOURCE + ${NINJA:-make} -j$PARALLEL $EXTRA_MAKEFLAGS install + popd + done +} diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh index 0fe31ca991..0c2d451896 100755 --- a/thirdparty/build-thirdparty.sh +++ b/thirdparty/build-thirdparty.sh @@ -100,6 +100,7 @@ else "hadoop") F_HADOOP=1 ;; "hive") F_HIVE=1 ;; "sentry") F_SENTRY=1 ;; + "yaml") F_YAML=1 ;; *) echo "Unknown module: $arg"; exit 1 ;; esac done @@ -371,6 +372,10 @@ if [ -n "$F_UNINSTRUMENTED" -o -n "$F_THRIFT" ]; then build_thrift fi +if [ -n "$F_UNINSTRUMENTED" -o -n "$F_YAML" ]; then + build_yaml +fi + restore_env # If we're on macOS best to exit here, otherwise single dependency builds will try to @@ -551,6 +556,10 @@ if [ -n "$F_TSAN" -o -n "$F_THRIFT" ]; then build_thrift fi +if [ -n "$F_TSAN" -o -n "$F_YAML" ]; then + build_yaml +fi + restore_env finish diff --git a/thirdparty/download-thirdparty.sh b/thirdparty/download-thirdparty.sh index a6fa2b948e..7eeccdfd04 100755 --- a/thirdparty/download-thirdparty.sh +++ 
b/thirdparty/download-thirdparty.sh @@ -430,5 +430,11 @@ fetch_and_patch \ $SENTRY_SOURCE \ $SENTRY_PATCHLEVEL +YAML_PATCHLEVEL=0 +fetch_and_patch \ + $YAML_NAME.tar.gz \ + $YAML_SOURCE \ + $YAML_PATCHLEVEL + echo "---------------" echo "Thirdparty dependencies downloaded successfully" diff --git a/thirdparty/vars.sh b/thirdparty/vars.sh index 01eea406dd..df92495aa3 100644 --- a/thirdparty/vars.sh +++ b/thirdparty/vars.sh @@ -234,3 +234,8 @@ HADOOP_SOURCE=$TP_SOURCE_DIR/$HADOOP_NAME SENTRY_VERSION=505b42e81a9d85c4ebe8db3f48ad7a6e824a5db5 SENTRY_NAME=sentry-$SENTRY_VERSION SENTRY_SOURCE=$TP_SOURCE_DIR/$SENTRY_NAME + +YAML_VERSION=0.6.2 +YAML_NAME=yaml-cpp-yaml-cpp-$YAML_VERSION +YAML_SOURCE=$TP_SOURCE_DIR/$YAML_NAME + From 96d6cc6333083fdff0760a1c1c8571ab9b27ce78 Mon Sep 17 00:00:00 2001 From: Alexey Serbin Date: Tue, 14 May 2019 21:13:54 -0700 Subject: [PATCH 13/15] [master_sentry-itest] disable one test scenario This is to fix a flakiness in SentryAuthzProviderCacheITest.CreateTables scenario: it 100% passes with the HEAD versions of Hive and Sentry but 100% fails with Hive 2.1.1 and Sentry 2.1.0. The scenario doesn't exhibit any other signs of flakiness. Until the root cause is clear, let's keep this test scenario disabled. Change-Id: Ic397990e2fcda37ee6224a71a2ffe31aaa6414c6 Reviewed-on: http://gerrit.cloudera.org:8080/13340 Reviewed-by: Adar Dembo Reviewed-by: Andrew Wong Tested-by: Kudu Jenkins --- src/kudu/integration-tests/master_sentry-itest.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/kudu/integration-tests/master_sentry-itest.cc b/src/kudu/integration-tests/master_sentry-itest.cc index c42e91a71d..ddaf02d808 100644 --- a/src/kudu/integration-tests/master_sentry-itest.cc +++ b/src/kudu/integration-tests/master_sentry-itest.cc @@ -880,7 +880,10 @@ TEST_F(SentryAuthzProviderCacheITest, ResetAuthzCacheConcurrentAlterTable) { // misleading because CreateTable() reports a success to the client. 
The created // table indeed exists and is fully functional otherwise, but the corresponding // owner privilege record is absent in Sentry. -TEST_F(SentryAuthzProviderCacheITest, CreateTables) { +// +// TODO(aserbin): clarify why it works with HEAD of the master branches +// of Sentry/Hive but fails with Sentry 2.1.0 and Hive 2.1.1. +TEST_F(SentryAuthzProviderCacheITest, DISABLED_CreateTables) { constexpr const char* const kGhostTables[] = { "t10", "t11" }; // Grant CREATE TABLE and METADATA privileges on the database. From 64365d3fe116b8217f93731b1443cb85d67c8918 Mon Sep 17 00:00:00 2001 From: Grant Henke Date: Mon, 13 May 2019 09:40:25 -0500 Subject: [PATCH 14/15] [backup] Factor out kudu-backup-tools module This patch refactors the kudu-backup module to make a kudu-backup-tools module. This module will be used in a future patch to create a more lightweight CLI tool that shades in all of its dependencies. While refactoring, I also broke out SessionIO into BackupIO and BackupUtils. Change-Id: I6ef2c21fbc31b11b20f0588b6de3cd4998b67443 Reviewed-on: http://gerrit.cloudera.org:8080/13320 Tested-by: Kudu Jenkins Reviewed-by: Will Berkeley Reviewed-by: Mike Percy --- java/gradle/shadow.gradle | 20 ++++-- java/kudu-backup-tools/build.gradle | 66 +++++++++++++++++++ .../src/main/protobuf/backup.proto | 0 .../org/apache/kudu/backup/BackupGraph.scala | 0 .../org/apache/kudu/backup/BackupIO.scala} | 43 ++---------- .../apache/kudu/backup/TableMetadata.scala | 12 ++-- .../src/test/resources/log4j.properties | 23 +++++++ .../apache/kudu/backup/TestBackupGraph.scala | 40 +++++++---- java/kudu-backup/build.gradle | 20 ++---- .../org/apache/kudu/backup/BackupUtils.scala | 53 +++++++++++++++ .../org/apache/kudu/backup/KuduBackup.scala | 7 +- .../org/apache/kudu/backup/KuduRestore.scala | 4 +- .../org/apache/kudu/backup/Options.scala | 12 +--- .../apache/kudu/backup/TestKuduBackup.scala | 4 +- java/settings.gradle | 1 + 15 files changed, 211 insertions(+), 94 deletions(-) create 
mode 100644 java/kudu-backup-tools/build.gradle rename java/{kudu-backup => kudu-backup-tools}/src/main/protobuf/backup.proto (100%) rename java/{kudu-backup => kudu-backup-tools}/src/main/scala/org/apache/kudu/backup/BackupGraph.scala (100%) rename java/{kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala => kudu-backup-tools/src/main/scala/org/apache/kudu/backup/BackupIO.scala} (84%) rename java/{kudu-backup => kudu-backup-tools}/src/main/scala/org/apache/kudu/backup/TableMetadata.scala (98%) create mode 100644 java/kudu-backup-tools/src/test/resources/log4j.properties rename java/{kudu-backup => kudu-backup-tools}/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala (82%) create mode 100644 java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupUtils.scala diff --git a/java/gradle/shadow.gradle b/java/gradle/shadow.gradle index f89b98d56d..63e7c1152a 100644 --- a/java/gradle/shadow.gradle +++ b/java/gradle/shadow.gradle @@ -41,12 +41,20 @@ configurations.archives.artifacts.removeAll { it instanceof ArchivePublishArtifact && it.archiveTask == jar } -shadowJar { - dependencies { - // Our shaded jars always try to pull in the slf4j api from - // kudu-client, though we never want it included. Excluding it - // here prevents the need to always list it. - exclude dependency(libs.slf4jApi) +// Define an overridable property to indicate tool jars that should +// include all of their dependencies. +// We use this below to ensure slf4j is included. +shadow.ext { + isToolJar = false +} +if (!shadow.isToolJar) { + shadowJar { + dependencies { + // Our shaded library jars always try to pull in the slf4j api from + // kudu-client, though we never want it included. Excluding it + // here prevents the need to always list it. 
+ exclude dependency(libs.slf4jApi) + } } } diff --git a/java/kudu-backup-tools/build.gradle b/java/kudu-backup-tools/build.gradle new file mode 100644 index 0000000000..850060c52d --- /dev/null +++ b/java/kudu-backup-tools/build.gradle @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +apply plugin: "scala" +apply from: "$rootDir/gradle/protobuf.gradle" +apply from: "$rootDir/gradle/shadow.gradle" + +// Mark this as a tool jar so shadow doesn't exclude any dependencies. +shadow { + isToolJar = true +} + +dependencies { + compile project(path: ":kudu-client", configuration: "shadow") + compile libs.protobufJava + compile (libs.protobufJavaUtil) { + // Make sure wrong Guava version is not pulled in. + exclude group: "com.google.guava", module: "guava" + } + compile libs.hadoopCommon + compile (libs.scopt) { + // Make sure wrong Scala version is not pulled in. 
+ exclude group: "org.scala-lang", module: "scala-library" + } + compile libs.scalaLibrary + compile libs.slf4jApi + + optional libs.yetusAnnotations + + testCompile project(path: ":kudu-test-utils", configuration: "shadow") + testCompile libs.junit + testCompile libs.log4j + testCompile libs.scalatest + testCompile libs.slf4jLog4j12 +} + +// Add protobuf files to the proto source set. +sourceSets { + main { + proto { + srcDir "src/main/protobuf" + } + } +} + +// kudu-backup-tools has no public Javadoc. +javadoc { + enabled = false +} + +// Skip publishing kudu-backup-tools until it's ready to be supported long-term. +uploadArchives.enabled = false \ No newline at end of file diff --git a/java/kudu-backup/src/main/protobuf/backup.proto b/java/kudu-backup-tools/src/main/protobuf/backup.proto similarity index 100% rename from java/kudu-backup/src/main/protobuf/backup.proto rename to java/kudu-backup-tools/src/main/protobuf/backup.proto diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala b/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/BackupGraph.scala similarity index 100% rename from java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala rename to java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/BackupGraph.scala diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala b/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/BackupIO.scala similarity index 84% rename from java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala rename to java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/BackupIO.scala index 82578ad82d..43a359d34a 100644 --- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala +++ b/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/BackupIO.scala @@ -26,15 +26,9 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import 
org.apache.hadoop.fs.LocatedFileStatus import org.apache.hadoop.fs.Path -import org.apache.kudu.Schema import org.apache.kudu.backup.Backup.TableMetadataPB -import org.apache.kudu.backup.SessionIO._ +import org.apache.kudu.backup.BackupIO._ import org.apache.kudu.client.KuduTable -import org.apache.kudu.spark.kudu.SparkUtil -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.types.ByteType -import org.apache.spark.sql.types.StructField -import org.apache.spark.sql.types.StructType import org.apache.yetus.audience.InterfaceAudience import org.apache.yetus.audience.InterfaceStability import org.slf4j.Logger @@ -65,46 +59,19 @@ import scala.collection.mutable */ @InterfaceAudience.Private @InterfaceStability.Unstable -class SessionIO(val session: SparkSession, options: CommonOptions) { +class BackupIO(val conf: Configuration, rootPathStr: String) { val log: Logger = LoggerFactory.getLogger(getClass) - val conf: Configuration = session.sparkContext.hadoopConfiguration - val rootPath: Path = new Path(options.rootPath) + val rootPath: Path = new Path(rootPathStr) val fs: FileSystem = rootPath.getFileSystem(conf) - /** - * Returns the Spark schema for backup data based on the Kudu Schema. - * Additionally handles adding the RowAction column for incremental backup/restore. - */ - def dataSchema(schema: Schema, includeRowAction: Boolean = true): StructType = { - var fields = SparkUtil.sparkSchema(schema).fields - if (includeRowAction) { - val changeTypeField = generateRowActionColumn(schema) - fields = fields ++ Seq(changeTypeField) - } - StructType(fields) - } - - /** - * Generates a RowAction column and handles column name collisions. - * The column name can vary because it's accessed positionally. - */ - private def generateRowActionColumn(schema: Schema): StructField = { - var columnName = "backup_row_action" - // If the column already exists and we need to pick an alternate column name. 
- while (schema.hasColumn(columnName)) { - columnName += "_" - } - StructField(columnName, ByteType) - } - /** * Return the path to the table directory. */ def tablePath(table: KuduTable): Path = { val tableName = URLEncoder.encode(table.getName, "UTF-8") val dirName = s"${table.getTableId}-$tableName" - new Path(options.rootPath, dirName) + new Path(rootPath, dirName) } /** @@ -269,7 +236,7 @@ class SessionIO(val session: SparkSession, options: CommonOptions) { } } -object SessionIO { +object BackupIO { // The name of the metadata file within a backup directory. val MetadataFileName = ".kudu-metadata.json" } diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala b/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/TableMetadata.scala similarity index 98% rename from java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala rename to java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/TableMetadata.scala index cfd3933c7c..6fc49d3e27 100644 --- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala +++ b/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/TableMetadata.scala @@ -49,7 +49,11 @@ object TableMetadata { val MetadataFileName = ".kudu-metadata.json" val MetadataVersion = 1 - def getTableMetadata(table: KuduTable, options: BackupOptions): TableMetadataPB = { + def getTableMetadata( + table: KuduTable, + fromMs: Long, + toMs: Long, + format: String): TableMetadataPB = { val columnIds = new util.HashMap[String, Integer]() val columns = table.getSchema.getColumns.asScala.map { col => columnIds.put(col.getName, table.getSchema.getColumnId(col.getName)) @@ -87,9 +91,9 @@ object TableMetadata { TableMetadataPB .newBuilder() .setVersion(MetadataVersion) - .setFromMs(options.fromMs) - .setToMs(options.toMs) - .setDataFormat(options.format) + .setFromMs(fromMs) + .setToMs(toMs) + .setDataFormat(format) .setTableName(table.getName) .setTableId(table.getTableId) 
.addAllColumns(columns.asJava) diff --git a/java/kudu-backup-tools/src/test/resources/log4j.properties b/java/kudu-backup-tools/src/test/resources/log4j.properties new file mode 100644 index 0000000000..129752c9a3 --- /dev/null +++ b/java/kudu-backup-tools/src/test/resources/log4j.properties @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +log4j.rootLogger = INFO, out +log4j.appender.out = org.apache.log4j.ConsoleAppender +log4j.appender.out.layout = org.apache.log4j.PatternLayout +log4j.appender.out.layout.ConversionPattern = %d{HH:mm:ss.SSS} [%p - %t] (%F:%L) %m%n + +log4j.logger.org.apache.kudu = DEBUG diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala b/java/kudu-backup-tools/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala similarity index 82% rename from java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala rename to java/kudu-backup-tools/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala index 480d2ebf90..314a06324b 100644 --- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala +++ b/java/kudu-backup-tools/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala @@ -16,19 +16,40 @@ // under the License. package org.apache.kudu.backup +import com.google.common.collect.ImmutableList +import org.apache.kudu.client.CreateTableOptions import org.apache.kudu.client.KuduTable -import org.apache.kudu.spark.kudu._ +import org.apache.kudu.test.ClientTestUtil.getBasicSchema +import org.apache.kudu.test.KuduTestHarness import org.junit.Assert._ +import org.junit.Before +import org.junit.Rule import org.junit.Test import org.slf4j.Logger import org.slf4j.LoggerFactory -class TestBackupGraph extends KuduTestSuite { +import scala.annotation.meta.getter + +class TestBackupGraph { val log: Logger = LoggerFactory.getLogger(getClass) + var tableName: String = "TestBackupGraph" + var table: KuduTable = _ + + @(Rule @getter) + val harness = new KuduTestHarness + + @Before + def setUp(): Unit = { + // Create the test table. 
+ val builder = new CreateTableOptions().setNumReplicas(3) + builder.setRangePartitionColumns(ImmutableList.of("key")) + table = harness.getClient.createTable(tableName, getBasicSchema, builder) + } + @Test def testSimpleBackupGraph() { - val graph = new BackupGraph(table.getName) + val graph = new BackupGraph(table.getTableId) val full = createBackupVertex(table, 0, 1) graph.addBackup(full) @@ -52,7 +73,7 @@ class TestBackupGraph extends KuduTestSuite { @Test def testForkingBackupGraph() { - val graph = new BackupGraph(table.getName) + val graph = new BackupGraph(table.getTableId) val full = createBackupVertex(table, 0, 1) graph.addBackup(full) // Duplicate fromMs of 1 creates a fork in the graph. @@ -81,7 +102,7 @@ class TestBackupGraph extends KuduTestSuite { @Test def testMultiFullBackupGraph() { - val graph = new BackupGraph(table.getName) + val graph = new BackupGraph(table.getTableId) val full1 = createBackupVertex(table, 0, 1) graph.addBackup(full1) val inc1 = createBackupVertex(table, 1, 2) @@ -131,14 +152,7 @@ class TestBackupGraph extends KuduTestSuite { } private def createBackupVertex(table: KuduTable, fromMs: Long, toMs: Long): BackupNode = { - val options = new BackupOptions( - tables = Seq(table.getName), - rootPath = "foo/path", - "fooAddresses", - fromMs = fromMs, - toMs = toMs - ) - val metadata = TableMetadata.getTableMetadata(table, options) + val metadata = TableMetadata.getTableMetadata(table, fromMs, toMs, "parquet") BackupNode(null, metadata) } } diff --git a/java/kudu-backup/build.gradle b/java/kudu-backup/build.gradle index 53b59af1b4..514c29d3f1 100644 --- a/java/kudu-backup/build.gradle +++ b/java/kudu-backup/build.gradle @@ -16,17 +16,16 @@ // under the License. apply plugin: "scala" -apply from: "$rootDir/gradle/protobuf.gradle" apply from: "$rootDir/gradle/shadow.gradle" dependencies { + // Note: We don't use the shaded version, so we can control the dependencies. 
+ compile(project(path: ":kudu-backup-tools")) { + // Ensure we use the hadoop-client provided by Spark to avoid any compatibility issues. + exclude group: "org.apache.hadoop", module: "hadoop-common" + } compile project(path: ":kudu-client", configuration: "shadow") compile project(path: ":kudu-spark", configuration: "shadow") - compile libs.protobufJava - compile (libs.protobufJavaUtil) { - // Make sure wrong Guava version is not pulled in. - exclude group: "com.google.guava", module: "guava" - } compile (libs.scopt) { // Make sure wrong Scala version is not pulled in. exclude group: "org.scala-lang", module: "scala-library" @@ -48,15 +47,6 @@ dependencies { testCompile libs.slf4jLog4j12 } -// Add protobuf files to the proto source set. -sourceSets { - main { - proto { - srcDir "src/main/protobuf" - } - } -} - // Adjust the artifact name to match the maven build. archivesBaseName = "kudu-backup${versions.sparkBase}_${versions.scalaBase}" diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupUtils.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupUtils.scala new file mode 100644 index 0000000000..4b1def6e6d --- /dev/null +++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupUtils.scala @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.kudu.backup + +import org.apache.kudu.Schema +import org.apache.kudu.spark.kudu.SparkUtil +import org.apache.spark.sql.types.ByteType +import org.apache.spark.sql.types.StructField +import org.apache.spark.sql.types.StructType + +object BackupUtils { + + /** + * Returns the Spark schema for backup data based on the Kudu Schema. + * Additionally handles adding the RowAction column for incremental backup/restore. + */ + def dataSchema(schema: Schema, includeRowAction: Boolean = true): StructType = { + var fields = SparkUtil.sparkSchema(schema).fields + if (includeRowAction) { + val changeTypeField = generateRowActionColumn(schema) + fields = fields ++ Seq(changeTypeField) + } + StructType(fields) + } + + /** + * Generates a RowAction column and handles column name collisions. + * The column name can vary because it's accessed positionally. + */ + private def generateRowActionColumn(schema: Schema): StructField = { + var columnName = "backup_row_action" + // If the column already exists and we need to pick an alternate column name. + while (schema.hasColumn(columnName)) { + columnName += "_" + } + StructField(columnName, ByteType) + } + +} diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala index 2b0d84baf2..180e7c82a2 100644 --- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala +++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala @@ -41,7 +41,7 @@ object KuduBackup { options.kuduMasterAddresses, session.sparkContext ) - val io = new SessionIO(session, options) + val io = new BackupIO(session.sparkContext.hadoopConfiguration, options.rootPath) // Read the required backup metadata. 
val backupGraphs = @@ -97,7 +97,7 @@ object KuduBackup { val rdd = new KuduBackupRDD(table, tableOptions, incremental, context, session.sparkContext) val df = session.sqlContext - .createDataFrame(rdd, io.dataSchema(table.getSchema, incremental)) + .createDataFrame(rdd, BackupUtils.dataSchema(table.getSchema, incremental)) // Write the data to the backup path. // The backup path contains the timestampMs and should not already exist. @@ -108,7 +108,8 @@ object KuduBackup { // Generate and output the new metadata for this table. // The existence of metadata indicates this backup was successful. - val tableMetadata = TableMetadata.getTableMetadata(table, tableOptions) + val tableMetadata = TableMetadata + .getTableMetadata(table, tableOptions.fromMs, tableOptions.toMs, tableOptions.format) io.writeTableMetadata(tableMetadata, metadataPath) } } diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala index 7f1e515d84..1a5886f7da 100644 --- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala +++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala @@ -47,7 +47,7 @@ object KuduRestore { options.kuduMasterAddresses, session.sparkContext ) - val io = new SessionIO(session, options) + val io = new BackupIO(session.sparkContext.hadoopConfiguration, options.rootPath) // Read the required backup metadata. 
val backupGraphs = io.readBackupGraphsByTableName(options.tables, options.timestampMs) @@ -80,7 +80,7 @@ object KuduRestore { createTableRangePartitionByRangePartition(restoreName, lastMetadata, context) } } - val backupSchema = io.dataSchema(TableMetadata.getKuduSchema(metadata)) + val backupSchema = BackupUtils.dataSchema(TableMetadata.getKuduSchema(metadata)) val rowActionCol = backupSchema.fields.last.name val table = context.syncClient.openTable(restoreName) diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala index df9eaeebc6..8bcbef4732 100644 --- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala +++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala @@ -23,14 +23,6 @@ import org.apache.yetus.audience.InterfaceAudience import org.apache.yetus.audience.InterfaceStability import scopt.OptionParser -@InterfaceAudience.Private -@InterfaceStability.Unstable -trait CommonOptions { - val tables: Seq[String] - val rootPath: String - val kuduMasterAddresses: String -} - @InterfaceAudience.Private @InterfaceStability.Unstable case class BackupOptions( @@ -46,7 +38,6 @@ case class BackupOptions( scanLeaderOnly: Boolean = BackupOptions.DefaultScanLeaderOnly, scanPrefetching: Boolean = BackupOptions.DefaultScanPrefetching, keepAlivePeriodMs: Long = BackupOptions.DefaultKeepAlivePeriodMs) - extends CommonOptions object BackupOptions { val DefaultForceFull: Boolean = false @@ -166,8 +157,7 @@ case class RestoreOptions( kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName, tableSuffix: String = "", createTables: Boolean = RestoreOptions.DefaultCreateTables, - timestampMs: Long = System.currentTimeMillis() -) extends CommonOptions + timestampMs: Long = System.currentTimeMillis()) object RestoreOptions { val DefaultCreateTables: Boolean = true diff --git 
a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala index 5c769ce469..8a066ac04a 100644 --- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala +++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala @@ -572,7 +572,7 @@ class TestKuduBackup extends KuduTestSuite { options: BackupOptions, expectedRowCount: Long, expectIncremental: Boolean): Unit = { - val io = new SessionIO(ss, options) + val io = new BackupIO(ss.sparkContext.hadoopConfiguration, options.rootPath) val tableName = options.tables.head val table = harness.getClient.openTable(tableName) val backupPath = io.backupPath(table, options.toMs) @@ -587,7 +587,7 @@ class TestKuduBackup extends KuduTestSuite { } // Verify the output data. - val schema = io.dataSchema(table.getSchema, expectIncremental) + val schema = BackupUtils.dataSchema(table.getSchema, expectIncremental) val df = ss.sqlContext.read .format(metadata.getDataFormat) .schema(schema) diff --git a/java/settings.gradle b/java/settings.gradle index 96ca03fcdf..145c3c8f0c 100644 --- a/java/settings.gradle +++ b/java/settings.gradle @@ -20,6 +20,7 @@ rootProject.name = "kudu-parent" include "kudu-backup" +include "kudu-backup-tools" include "kudu-client" include "kudu-client-tools" include "kudu-flume-sink" From 2d16b50995d869a90f077c8450f6cd7a07647bbd Mon Sep 17 00:00:00 2001 From: Yingchun Lai <405403881@qq.com> Date: Mon, 25 Mar 2019 23:35:01 -0400 Subject: [PATCH 15/15] [maintenance] Support priorities for tables in MM compaction This commit adds a feature to specify different priorities for table compaction. In a Kudu cluster with thousands of tables, it's hard for a specific tablet's maintenance OPs to be launched when their scores are not the highest, even if the table the tablet belongs to is high priority for Kudu users.
This patch allows administrators to specify different priorities for tables via gflags, so that the maintenance OPs of high-priority tables have a greater chance of being launched. Change-Id: I3ea3b73505157678a8fb551656123b64e6bfb304 --- src/kudu/tablet/tablet.h | 1 + src/kudu/tablet/tablet_metadata.h | 2 +- src/kudu/tablet/tablet_mm_ops-test.cc | 3 +- src/kudu/tablet/tablet_mm_ops.cc | 14 +- src/kudu/tablet/tablet_mm_ops.h | 42 ++--- src/kudu/tablet/tablet_replica.cc | 2 +- src/kudu/tablet/tablet_replica.h | 7 +- src/kudu/tablet/tablet_replica_mm_ops.cc | 22 ++- src/kudu/tablet/tablet_replica_mm_ops.h | 66 ++++---- src/kudu/util/maintenance_manager-test.cc | 175 +++++++++++++++++--- src/kudu/util/maintenance_manager.cc | 186 +++++++++++++++------- src/kudu/util/maintenance_manager.h | 22 ++- src/kudu/util/maintenance_manager.proto | 11 +- 13 files changed, 402 insertions(+), 151 deletions(-) diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h index 05c7c9b9b4..c4a5691046 100644 --- a/src/kudu/tablet/tablet.h +++ b/src/kudu/tablet/tablet.h @@ -417,6 +417,7 @@ class Tablet { // This method is thread-safe. void CancelMaintenanceOps(); + const std::string& table_id() const { return metadata_->table_id(); } const std::string& tablet_id() const { return metadata_->tablet_id(); } // Return the metrics for this tablet.
diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h index 62e754532e..adf0d4cbb7 100644 --- a/src/kudu/tablet/tablet_metadata.h +++ b/src/kudu/tablet/tablet_metadata.h @@ -119,7 +119,7 @@ class TabletMetadata : public RefCountedThreadSafe { return partition_; } - std::string table_id() const { + const std::string& table_id() const { DCHECK_NE(state_, kNotLoadedYet); return table_id_; } diff --git a/src/kudu/tablet/tablet_mm_ops-test.cc b/src/kudu/tablet/tablet_mm_ops-test.cc index a2f55728cd..3a13386a9d 100644 --- a/src/kudu/tablet/tablet_mm_ops-test.cc +++ b/src/kudu/tablet/tablet_mm_ops-test.cc @@ -62,7 +62,7 @@ class KuduTabletMmOpsTest : public TabletTestBase> { void StatsShouldChange(MaintenanceOp* op) { SleepFor(MonoDelta::FromMilliseconds(1)); op->UpdateStats(&stats_); - ASSERT_TRUE(next_time_ < stats_.last_modified()); + ASSERT_LT(next_time_, stats_.last_modified()); next_time_ = stats_.last_modified(); } @@ -70,7 +70,6 @@ class KuduTabletMmOpsTest : public TabletTestBase> { SleepFor(MonoDelta::FromMilliseconds(1)); op->UpdateStats(&stats_); ASSERT_EQ(next_time_, stats_.last_modified()); - next_time_ = stats_.last_modified(); } void TestFirstCall(MaintenanceOp* op) { diff --git a/src/kudu/tablet/tablet_mm_ops.cc b/src/kudu/tablet/tablet_mm_ops.cc index c71bc6e527..c54c241966 100644 --- a/src/kudu/tablet/tablet_mm_ops.cc +++ b/src/kudu/tablet/tablet_mm_ops.cc @@ -87,6 +87,10 @@ string TabletOpBase::LogPrefix() const { return tablet_->LogPrefix(); } +const std::string& TabletOpBase::table_id() const { + return tablet_->table_id(); +} + //////////////////////////////////////////////////////////// // CompactRowSetsOp //////////////////////////////////////////////////////////// @@ -262,12 +266,12 @@ void MajorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) { // cached stats. 
TabletMetrics* metrics = tablet_->metrics(); if (metrics) { - int64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount(); - int64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount(); - int64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount(); - int64_t new_num_rs_minor_delta_compacted = + uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount(); + uint64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount(); + uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount(); + uint64_t new_num_rs_minor_delta_compacted = metrics->delta_minor_compact_rs_duration->TotalCount(); - int64_t new_num_rs_major_delta_compacted = + uint64_t new_num_rs_major_delta_compacted = metrics->delta_major_compact_rs_duration->TotalCount(); if (prev_stats_.valid() && new_num_mrs_flushed == last_num_mrs_flushed_ && diff --git a/src/kudu/tablet/tablet_mm_ops.h b/src/kudu/tablet/tablet_mm_ops.h index 8fc865e0f1..1e117f1bd1 100644 --- a/src/kudu/tablet/tablet_mm_ops.h +++ b/src/kudu/tablet/tablet_mm_ops.h @@ -43,6 +43,8 @@ class TabletOpBase : public MaintenanceOp { std::string LogPrefix() const; protected: + const std::string& table_id() const OVERRIDE; + Tablet* const tablet_; }; @@ -57,15 +59,15 @@ class CompactRowSetsOp : public TabletOpBase { public: explicit CompactRowSetsOp(Tablet* tablet); - virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; + void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; - virtual bool Prepare() OVERRIDE; + bool Prepare() OVERRIDE; - virtual void Perform() OVERRIDE; + void Perform() OVERRIDE; - virtual scoped_refptr DurationHistogram() const OVERRIDE; + scoped_refptr DurationHistogram() const OVERRIDE; - virtual scoped_refptr > RunningGauge() const OVERRIDE; + scoped_refptr > RunningGauge() const OVERRIDE; private: mutable simple_spinlock lock_; @@ -83,15 +85,15 @@ class MinorDeltaCompactionOp : public TabletOpBase { public: explicit MinorDeltaCompactionOp(Tablet* 
tablet); - virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; + void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; - virtual bool Prepare() OVERRIDE; + bool Prepare() OVERRIDE; - virtual void Perform() OVERRIDE; + void Perform() OVERRIDE; - virtual scoped_refptr DurationHistogram() const OVERRIDE; + scoped_refptr DurationHistogram() const OVERRIDE; - virtual scoped_refptr > RunningGauge() const OVERRIDE; + scoped_refptr > RunningGauge() const OVERRIDE; private: mutable simple_spinlock lock_; @@ -109,15 +111,15 @@ class MajorDeltaCompactionOp : public TabletOpBase { public: explicit MajorDeltaCompactionOp(Tablet* tablet); - virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; + void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; - virtual bool Prepare() OVERRIDE; + bool Prepare() OVERRIDE; - virtual void Perform() OVERRIDE; + void Perform() OVERRIDE; - virtual scoped_refptr DurationHistogram() const OVERRIDE; + scoped_refptr DurationHistogram() const OVERRIDE; - virtual scoped_refptr > RunningGauge() const OVERRIDE; + scoped_refptr > RunningGauge() const OVERRIDE; private: mutable simple_spinlock lock_; @@ -138,19 +140,19 @@ class UndoDeltaBlockGCOp : public TabletOpBase { // Estimates the number of bytes that may potentially be in ancient delta // undo blocks. Over time, as Perform() is invoked, this estimate gets more // accurate. - void UpdateStats(MaintenanceOpStats* stats) override; + void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; - bool Prepare() override; + bool Prepare() OVERRIDE; // Deletes ancient history data from disk. This also initializes undo delta // blocks greedily (in a budgeted manner controlled by the // --undo_delta_block_gc_init_budget_millis gflag) that makes the estimate // performed in UpdateStats() more accurate. 
- void Perform() override; + void Perform() OVERRIDE; - scoped_refptr DurationHistogram() const override; + scoped_refptr DurationHistogram() const OVERRIDE; - scoped_refptr > RunningGauge() const override; + scoped_refptr > RunningGauge() const OVERRIDE; private: std::string LogPrefix() const; diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc index c7b231bc94..a284215bc8 100644 --- a/src/kudu/tablet/tablet_replica.cc +++ b/src/kudu/tablet/tablet_replica.cc @@ -38,6 +38,7 @@ #include "kudu/consensus/log_anchor_registry.h" #include "kudu/consensus/opid.pb.h" #include "kudu/consensus/raft_consensus.h" +#include "kudu/fs/data_dirs.h" #include "kudu/gutil/basictypes.h" #include "kudu/gutil/bind.h" #include "kudu/gutil/bind_helpers.h" @@ -124,7 +125,6 @@ TabletReplica::TabletReplica( Callback mark_dirty_clbk) : meta_(DCHECK_NOTNULL(std::move(meta))), cmeta_manager_(DCHECK_NOTNULL(std::move(cmeta_manager))), - tablet_id_(meta_->tablet_id()), local_peer_pb_(std::move(local_peer_pb)), log_anchor_registry_(new LogAnchorRegistry()), apply_pool_(apply_pool), diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h index bf9d63dd0e..50fab2dae8 100644 --- a/src/kudu/tablet/tablet_replica.h +++ b/src/kudu/tablet/tablet_replica.h @@ -254,10 +254,8 @@ class TabletReplica : public RefCountedThreadSafe, return log_anchor_registry_; } - // Returns the tablet_id of the tablet managed by this TabletReplica. - // Returns the correct tablet_id even if the underlying tablet is not available - // yet. - const std::string& tablet_id() const { return tablet_id_; } + const std::string& table_id() const { return meta_->table_id(); } + const std::string& tablet_id() const { return meta_->tablet_id(); } // Convenience method to return the permanent_uuid of this peer. 
std::string permanent_uuid() const { return tablet_->metadata()->fs_manager()->uuid(); } @@ -322,7 +320,6 @@ class TabletReplica : public RefCountedThreadSafe, const scoped_refptr meta_; const scoped_refptr cmeta_manager_; - const std::string tablet_id_; const consensus::RaftPeerPB local_peer_pb_; scoped_refptr log_anchor_registry_; // Assigned in tablet_replica-test diff --git a/src/kudu/tablet/tablet_replica_mm_ops.cc b/src/kudu/tablet/tablet_replica_mm_ops.cc index a7267debc2..a848bcb838 100644 --- a/src/kudu/tablet/tablet_replica_mm_ops.cc +++ b/src/kudu/tablet/tablet_replica_mm_ops.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -121,6 +122,20 @@ void FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(MaintenanceOpStats } } +// +// TabletReplicaOpBase. +// +TabletReplicaOpBase::TabletReplicaOpBase(std::string name, + IOUsage io_usage, + TabletReplica* tablet_replica) + : MaintenanceOp(std::move(name), io_usage), + tablet_replica_(tablet_replica) { +} + +const std::string& TabletReplicaOpBase::table_id() const { + return tablet_replica_->table_id(); +} + // // FlushMRSOp. 
// @@ -260,9 +275,10 @@ scoped_refptr > FlushDeltaMemStoresOp::RunningGauge() cons // LogGCOp::LogGCOp(TabletReplica* tablet_replica) - : MaintenanceOp(StringPrintf("LogGCOp(%s)", tablet_replica->tablet()->tablet_id().c_str()), - MaintenanceOp::LOW_IO_USAGE), - tablet_replica_(tablet_replica), + : TabletReplicaOpBase(StringPrintf("LogGCOp(%s)", + tablet_replica->tablet()->tablet_id().c_str()), + MaintenanceOp::LOW_IO_USAGE, + tablet_replica), log_gc_duration_(METRIC_log_gc_duration.Instantiate( tablet_replica->tablet()->GetMetricEntity())), log_gc_running_(METRIC_log_gc_running.Instantiate( diff --git a/src/kudu/tablet/tablet_replica_mm_ops.h b/src/kudu/tablet/tablet_replica_mm_ops.h index 2404b489bf..72fdb39442 100644 --- a/src/kudu/tablet/tablet_replica_mm_ops.h +++ b/src/kudu/tablet/tablet_replica_mm_ops.h @@ -47,86 +47,92 @@ class FlushOpPerfImprovementPolicy { FlushOpPerfImprovementPolicy() {} }; +class TabletReplicaOpBase : public MaintenanceOp { + public: + explicit TabletReplicaOpBase(std::string name, IOUsage io_usage, TabletReplica* tablet_replica); + + protected: + const std::string& table_id() const OVERRIDE; + + TabletReplica *const tablet_replica_; +}; + // Maintenance op for MRS flush. Only one can happen at a time. 
-class FlushMRSOp : public MaintenanceOp { +class FlushMRSOp : public TabletReplicaOpBase { public: explicit FlushMRSOp(TabletReplica* tablet_replica) - : MaintenanceOp(StringPrintf("FlushMRSOp(%s)", tablet_replica->tablet()->tablet_id().c_str()), - MaintenanceOp::HIGH_IO_USAGE), - tablet_replica_(tablet_replica) { + : TabletReplicaOpBase(StringPrintf("FlushMRSOp(%s)", + tablet_replica->tablet()->tablet_id().c_str()), + MaintenanceOp::HIGH_IO_USAGE, + tablet_replica) { time_since_flush_.start(); } - virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; + void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; - virtual bool Prepare() OVERRIDE; + bool Prepare() OVERRIDE; - virtual void Perform() OVERRIDE; + void Perform() OVERRIDE; - virtual scoped_refptr DurationHistogram() const OVERRIDE; + scoped_refptr DurationHistogram() const OVERRIDE; - virtual scoped_refptr > RunningGauge() const OVERRIDE; + scoped_refptr > RunningGauge() const OVERRIDE; private: // Lock protecting time_since_flush_. mutable simple_spinlock lock_; Stopwatch time_since_flush_; - - TabletReplica *const tablet_replica_; }; // Maintenance op for DMS flush. // Reports stats for all the DMS this tablet contains but only flushes one in Perform(). 
-class FlushDeltaMemStoresOp : public MaintenanceOp { +class FlushDeltaMemStoresOp : public TabletReplicaOpBase { public: explicit FlushDeltaMemStoresOp(TabletReplica* tablet_replica) - : MaintenanceOp(StringPrintf("FlushDeltaMemStoresOp(%s)", - tablet_replica->tablet()->tablet_id().c_str()), - MaintenanceOp::HIGH_IO_USAGE), - tablet_replica_(tablet_replica) { + : TabletReplicaOpBase(StringPrintf("FlushDeltaMemStoresOp(%s)", + tablet_replica->tablet()->tablet_id().c_str()), + MaintenanceOp::HIGH_IO_USAGE, + tablet_replica) { time_since_flush_.start(); } - virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; + void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; - virtual bool Prepare() OVERRIDE { + bool Prepare() OVERRIDE { return true; } - virtual void Perform() OVERRIDE; + void Perform() OVERRIDE; - virtual scoped_refptr DurationHistogram() const OVERRIDE; + scoped_refptr DurationHistogram() const OVERRIDE; - virtual scoped_refptr > RunningGauge() const OVERRIDE; + scoped_refptr > RunningGauge() const OVERRIDE; private: // Lock protecting time_since_flush_ mutable simple_spinlock lock_; Stopwatch time_since_flush_; - - TabletReplica *const tablet_replica_; }; // Maintenance task that runs log GC. Reports log retention that represents the amount of data // that can be GC'd. // // Only one LogGC op can run at a time. 
-class LogGCOp : public MaintenanceOp { +class LogGCOp : public TabletReplicaOpBase { public: explicit LogGCOp(TabletReplica* tablet_replica); - virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; + void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; - virtual bool Prepare() OVERRIDE; + bool Prepare() OVERRIDE; - virtual void Perform() OVERRIDE; + void Perform() OVERRIDE; - virtual scoped_refptr DurationHistogram() const OVERRIDE; + scoped_refptr DurationHistogram() const OVERRIDE; - virtual scoped_refptr > RunningGauge() const OVERRIDE; + scoped_refptr > RunningGauge() const OVERRIDE; private: - TabletReplica *const tablet_replica_; scoped_refptr log_gc_duration_; scoped_refptr > log_gc_running_; mutable Semaphore sem_; diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc index 6777e06b81..8c88dd74a9 100644 --- a/src/kudu/util/maintenance_manager-test.cc +++ b/src/kudu/util/maintenance_manager-test.cc @@ -15,16 +15,22 @@ // specific language governing permissions and limitations // under the License. 
+#include "kudu/util/maintenance_manager.h" + +#include + +#include #include #include +#include #include #include #include #include #include -#include #include // IWYU pragma: keep +#include #include #include #include @@ -32,7 +38,6 @@ #include "kudu/gutil/port.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/substitute.h" -#include "kudu/util/maintenance_manager.h" #include "kudu/util/maintenance_manager.pb.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" @@ -42,9 +47,9 @@ #include "kudu/util/test_util.h" #include "kudu/util/thread.h" +using std::list; using std::shared_ptr; using std::string; -using std::vector; using strings::Substitute; METRIC_DEFINE_entity(test); @@ -57,17 +62,29 @@ METRIC_DEFINE_histogram(test, maintenance_op_duration, kudu::MetricUnit::kSeconds, "", 60000000LU, 2); DECLARE_int64(log_target_replay_size_mb); +DECLARE_string(maintenance_manager_table_priorities); +DECLARE_double(maintenance_op_multiplier); +DECLARE_int32(max_priority_range); + namespace kudu { -static const int kHistorySize = 4; +static const int kHistorySize = 6; static const char kFakeUuid[] = "12345"; class MaintenanceManagerTest : public KuduTest { public: - void SetUp() override { + void SetUp() OVERRIDE { + StartManager(2); + } + + void TearDown() OVERRIDE { + StopManager(); + } + + void StartManager(int32_t num_threads) { MaintenanceManager::Options options; - options.num_threads = 2; + options.num_threads = num_threads; options.polling_interval_ms = 1; options.history_size = kHistorySize; manager_.reset(new MaintenanceManager(options, kFakeUuid)); @@ -78,7 +95,7 @@ class MaintenanceManagerTest : public KuduTest { ASSERT_OK(manager_->Start()); } - void TearDown() override { + void StopManager() { manager_->Shutdown(); } @@ -95,7 +112,8 @@ TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) { class TestMaintenanceOp : public MaintenanceOp { public: TestMaintenanceOp(const std::string& name, - IOUsage io_usage) + IOUsage io_usage, + 
std::string table_id = "fake.table_id") : MaintenanceOp(name, io_usage), ram_anchored_(500), logs_retained_bytes_(0), @@ -105,12 +123,13 @@ class TestMaintenanceOp : public MaintenanceOp { maintenance_ops_running_(METRIC_maintenance_ops_running.Instantiate(metric_entity_, 0)), remaining_runs_(1), prepared_runs_(0), - sleep_time_(MonoDelta::FromSeconds(0)) { + sleep_time_(MonoDelta::FromSeconds(0)), + table_id_(std::move(table_id)) { } - virtual ~TestMaintenanceOp() {} + ~TestMaintenanceOp() OVERRIDE = default; - virtual bool Prepare() OVERRIDE { + bool Prepare() OVERRIDE { std::lock_guard guard(lock_); if (remaining_runs_ == 0) { return false; @@ -121,7 +140,7 @@ class TestMaintenanceOp : public MaintenanceOp { return true; } - virtual void Perform() OVERRIDE { + void Perform() OVERRIDE { { std::lock_guard guard(lock_); DLOG(INFO) << "Performing op " << name(); @@ -135,7 +154,7 @@ class TestMaintenanceOp : public MaintenanceOp { SleepFor(sleep_time_); } - virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE { + void UpdateStats(MaintenanceOpStats* stats) OVERRIDE { std::lock_guard guard(lock_); stats->set_runnable(remaining_runs_ > 0); stats->set_ram_anchored(ram_anchored_); @@ -168,14 +187,18 @@ class TestMaintenanceOp : public MaintenanceOp { perf_improvement_ = perf_improvement; } - virtual scoped_refptr DurationHistogram() const OVERRIDE { + scoped_refptr DurationHistogram() const OVERRIDE { return maintenance_op_duration_; } - virtual scoped_refptr > RunningGauge() const OVERRIDE { + scoped_refptr > RunningGauge() const OVERRIDE { return maintenance_ops_running_; } + const std::string& table_id() const OVERRIDE { + return table_id_; + } + private: Mutex lock_; @@ -195,6 +218,7 @@ class TestMaintenanceOp : public MaintenanceOp { // The amount of time each op invocation will sleep. MonoDelta sleep_time_; + std::string table_id_; }; // Create an op and wait for it to start running. 
Unregister it while it is @@ -226,7 +250,7 @@ TEST_F(MaintenanceManagerTest, TestNewOpsDontGetScheduledDuringUnregister) { // Set the op to run up to 10 times, and each time should sleep for a second. op1.set_remaining_runs(10); - op1.set_sleep_time(MonoDelta::FromSeconds(1)); + op1.set_sleep_time(MonoDelta::FromMilliseconds(10)); manager_->RegisterOp(&op1); // Wait until two instances of the ops start running, since we have two MM threads. @@ -266,7 +290,7 @@ TEST_F(MaintenanceManagerTest, TestMemoryPressure) { TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) { const int64_t kMB = 1024 * 1024; - manager_->Shutdown(); + StopManager(); TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE); op1.set_ram_anchored(0); @@ -318,7 +342,7 @@ TEST_F(MaintenanceManagerTest, TestRunningInstances) { TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE); op.set_perf_improvement(10); op.set_remaining_runs(2); - op.set_sleep_time(MonoDelta::FromSeconds(1)); + op.set_sleep_time(MonoDelta::FromMilliseconds(10)); manager_->RegisterOp(&op); // Check that running instances are added to the maintenance manager's collection, @@ -341,10 +365,11 @@ TEST_F(MaintenanceManagerTest, TestRunningInstances) { manager_->GetMaintenanceManagerStatusDump(&status_pb); ASSERT_EQ(status_pb.running_operations_size(), 0); } + // Test adding operations and make sure that the history of recently completed operations // is correct in that it wraps around and doesn't grow. TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) { - for (int i = 0; i < 5; i++) { + for (int i = 0; i < kHistorySize + 1; i++) { string name = Substitute("op$0", i); TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE); op.set_perf_improvement(1); @@ -358,12 +383,120 @@ TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) { MaintenanceManagerStatusPB status_pb; manager_->GetMaintenanceManagerStatusDump(&status_pb); - // The size should be at most the history_size. 
- ASSERT_GE(kHistorySize, status_pb.completed_operations_size()); + // The size should equal to the current completed OP size, + // and should be at most the kHistorySize. + ASSERT_EQ(std::min(kHistorySize, i + 1), status_pb.completed_operations_size()); // The most recently completed op should always be first, even if we wrap // around. ASSERT_EQ(name, status_pb.completed_operations(0).name()); } } +// Test maintenance OP factors. +// The OPs on different priority levels have different OP score multipliers. +TEST_F(MaintenanceManagerTest, TestOpFactors) { + ASSERT_GE(FLAGS_max_priority_range, 1); + FLAGS_maintenance_manager_table_priorities + = Substitute("table_id_1:$0;table_id_2:$1;table_id_3:$2;table_id_4:$3;table_id_5:$4", + -FLAGS_max_priority_range - 1, -1, 0, 1, FLAGS_max_priority_range + 1); + TestMaintenanceOp op1("op1", MaintenanceOp::HIGH_IO_USAGE, "table_id_1"); + TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, "table_id_2"); + TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, "table_id_3"); + TestMaintenanceOp op4("op4", MaintenanceOp::HIGH_IO_USAGE, "table_id_4"); + TestMaintenanceOp op5("op5", MaintenanceOp::HIGH_IO_USAGE, "table_id_5"); + TestMaintenanceOp op6("op6", MaintenanceOp::HIGH_IO_USAGE, "table_id_6"); + + manager_->UpdateTablePriorities(); + + ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier, -FLAGS_max_priority_range), + manager_->PerfImprovement(1, op1.table_id())); + ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier, -1), + manager_->PerfImprovement(1, op2.table_id())); + ASSERT_DOUBLE_EQ(1, manager_->PerfImprovement(1, op3.table_id())); + ASSERT_DOUBLE_EQ(FLAGS_maintenance_op_multiplier, manager_->PerfImprovement(1, op4.table_id())); + ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier, FLAGS_max_priority_range), + manager_->PerfImprovement(1, op5.table_id())); + ASSERT_DOUBLE_EQ(1, manager_->PerfImprovement(1, op6.table_id())); +} + +// Test priority OP launching. 
+TEST_F(MaintenanceManagerTest, TestPriorityOpLaunch) { + StopManager(); + StartManager(1); + + ASSERT_NE("", gflags::SetCommandLineOption( + "maintenance_manager_table_priorities", + Substitute("table_id_1:$0;table_id_2:$1;table_id_3:$2;table_id_4:$3;table_id_5:$4", + -FLAGS_max_priority_range - 1, -1, 0, 1, FLAGS_max_priority_range + 1).c_str())); + + TestMaintenanceOp op1("op1", MaintenanceOp::HIGH_IO_USAGE, "table_id_1"); + op1.set_perf_improvement(10); + op1.set_remaining_runs(1); + op1.set_sleep_time(MonoDelta::FromMilliseconds(1)); + + TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, "table_id_2"); + op2.set_perf_improvement(10); + op2.set_remaining_runs(1); + op2.set_sleep_time(MonoDelta::FromMilliseconds(1)); + + TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, "table_id_3"); + op3.set_perf_improvement(10); + op3.set_remaining_runs(1); + op3.set_sleep_time(MonoDelta::FromMilliseconds(1)); + + TestMaintenanceOp op4("op4", MaintenanceOp::HIGH_IO_USAGE, "table_id_4"); + op4.set_perf_improvement(10); + op4.set_remaining_runs(1); + op4.set_sleep_time(MonoDelta::FromMilliseconds(1)); + + TestMaintenanceOp op5("op5", MaintenanceOp::HIGH_IO_USAGE, "table_id_5"); + op5.set_perf_improvement(10); + op5.set_remaining_runs(1); + op5.set_sleep_time(MonoDelta::FromMilliseconds(1)); + + TestMaintenanceOp op6("op6", MaintenanceOp::HIGH_IO_USAGE, "table_id_6"); + op6.set_perf_improvement(12); + op6.set_remaining_runs(1); + op6.set_sleep_time(MonoDelta::FromMilliseconds(1)); + + manager_->RegisterOp(&op1); + manager_->RegisterOp(&op2); + manager_->RegisterOp(&op3); + manager_->RegisterOp(&op4); + manager_->RegisterOp(&op5); + manager_->RegisterOp(&op6); + + ASSERT_EVENTUALLY([&]() { + MaintenanceManagerStatusPB status_pb; + manager_->GetMaintenanceManagerStatusDump(&status_pb); + ASSERT_EQ(status_pb.completed_operations_size(), 6); + }); + + // Wait for instances to complete. 
+ manager_->UnregisterOp(&op1); + manager_->UnregisterOp(&op2); + manager_->UnregisterOp(&op3); + manager_->UnregisterOp(&op4); + manager_->UnregisterOp(&op5); + manager_->UnregisterOp(&op6); + + // Check that running instances are removed from collection after completion. + MaintenanceManagerStatusPB status_pb; + manager_->GetMaintenanceManagerStatusDump(&status_pb); + ASSERT_EQ(status_pb.running_operations_size(), 0); + ASSERT_EQ(status_pb.completed_operations_size(), 6); + // In perf_improvement score ascending order, the latter completed OP will list former. + list ordered_ops({"op1", + "op2", + "op3", + "op4", + "op6", + "op5"}); + ASSERT_EQ(ordered_ops.size(), status_pb.completed_operations().size()); + for (const auto& instance : status_pb.completed_operations()) { + ASSERT_EQ(ordered_ops.front(), instance.name()); + ordered_ops.pop_front(); + } +} + } // namespace kudu diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc index ea607aa000..3231b31b6e 100644 --- a/src/kudu/util/maintenance_manager.cc +++ b/src/kudu/util/maintenance_manager.cc @@ -17,8 +17,9 @@ #include "kudu/util/maintenance_manager.h" +#include #include -#include +#include #include #include #include @@ -26,6 +27,7 @@ #include #include #include +#include #include #include @@ -33,6 +35,8 @@ #include "kudu/gutil/dynamic_annotations.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/stringprintf.h" +#include "kudu/gutil/strings/numbers.h" +#include "kudu/gutil/strings/split.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/util/debug/trace_event.h" #include "kudu/util/debug/trace_logging.h" @@ -50,6 +54,8 @@ using std::pair; using std::string; +using std::vector; +using strings::Split; using strings::Substitute; DEFINE_int32(maintenance_manager_num_threads, 1, @@ -91,6 +97,29 @@ DEFINE_double(data_gc_prioritization_prob, 0.5, "such as delta compaction."); TAG_FLAG(data_gc_prioritization_prob, experimental); 
+DEFINE_string(maintenance_manager_table_priorities, "", + "Priorities of tables, semicolon-separated list of table-priority pairs, and each " + "table-priority pair is combined by table id, colon and priority level. Priority " + "level is ranged in [-FLAGS_max_priority_range, FLAGS_max_priority_range]"); +TAG_FLAG(maintenance_manager_table_priorities, advanced); +TAG_FLAG(maintenance_manager_table_priorities, experimental); +TAG_FLAG(maintenance_manager_table_priorities, runtime); + +DEFINE_double(maintenance_op_multiplier, 1.1, + "Multiplier applied on different priority levels, table maintenance OPs on level N " + "has multiplier of FLAGS_maintenance_op_multiplier^N, the last score will be " + "multiplied by this multiplier. Note: this multiplier is only take effect on " + "compaction OPs"); +TAG_FLAG(maintenance_op_multiplier, advanced); +TAG_FLAG(maintenance_op_multiplier, experimental); +TAG_FLAG(maintenance_op_multiplier, runtime); + +DEFINE_int32(max_priority_range, 5, + "Maximal priority range of OPs."); +TAG_FLAG(max_priority_range, advanced); +TAG_FLAG(max_priority_range, experimental); +TAG_FLAG(max_priority_range, runtime); + namespace kudu { MaintenanceOpStats::MaintenanceOpStats() { @@ -107,7 +136,7 @@ void MaintenanceOpStats::Clear() { last_modified_ = MonoTime(); } -MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage) +MaintenanceOp::MaintenanceOp(string name, IOUsage io_usage) : name_(std::move(name)), running_(0), cancel_(false), @@ -129,10 +158,10 @@ MaintenanceManagerStatusPB_OpInstancePB OpInstance::DumpToPB() const { pb.set_thread_id(thread_id); pb.set_name(name); if (duration.Initialized()) { - pb.set_duration_millis(duration.ToMilliseconds()); + pb.set_duration_millis(static_cast(duration.ToMilliseconds())); } MonoDelta delta(MonoTime::Now() - start_mono_time); - pb.set_millis_since_start(delta.ToMilliseconds()); + pb.set_millis_since_start(static_cast(delta.ToMilliseconds())); return pb; } @@ -143,7 +172,7 @@ const 
MaintenanceManager::Options MaintenanceManager::kDefaultOptions = { }; MaintenanceManager::MaintenanceManager(const Options& options, - std::string server_uuid) + string server_uuid) : server_uuid_(std::move(server_uuid)), num_threads_(options.num_threads <= 0 ? FLAGS_maintenance_manager_num_threads : options.num_threads), @@ -264,8 +293,7 @@ void MaintenanceManager::RunSchedulerThread() { // 1) there are no free threads available to perform a maintenance op. // or 2) we just tried to schedule an op but found nothing to run. // However, if it's time to shut down, we want to do so immediately. - while ((running_ops_ >= num_threads_ || prev_iter_found_no_work || disabled_for_tests()) && - !shutdown_) { + while (CouldNotLaunchNewOp(prev_iter_found_no_work)) { cond_.WaitFor(polling_interval); prev_iter_found_no_work = false; } @@ -274,42 +302,47 @@ void MaintenanceManager::RunSchedulerThread() { return; } - // Find the best op. - auto best_op_and_why = FindBestOp(); - auto* op = best_op_and_why.first; - const auto& note = best_op_and_why.second; + // TODO(yingchun): move it to SetFlag, callback once as a gflags setter handler. + UpdateTablePriorities(); // If we found no work to do, then we should sleep before trying again to schedule. // Otherwise, we can go right into trying to find the next op. - prev_iter_found_no_work = (op == nullptr); - if (!op) { - VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2) - << "No maintenance operations look worth doing."; - continue; - } + prev_iter_found_no_work = !FindAndLaunchOp(&guard); + } +} - // Prepare the maintenance operation. - op->running_++; - running_ops_++; - guard.unlock(); - bool ready = op->Prepare(); - guard.lock(); - if (!ready) { - LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name() - << ". Re-running scheduler."; - op->running_--; - running_ops_--; - op->cond_->Signal(); - continue; - } +bool MaintenanceManager::FindAndLaunchOp(std::unique_lock* guard) { + // Find the best op. 
+ auto best_op_and_why = FindBestOp(); + auto* op = best_op_and_why.first; + const auto& note = best_op_and_why.second; + + if (!op) { + VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2) + << "No maintenance operations look worth doing."; + return false; + } - LOG_AND_TRACE_WITH_PREFIX("maintenance", INFO) - << Substitute("Scheduling $0: $1", op->name(), note); - // Run the maintenance operation. - Status s = thread_pool_->SubmitFunc(boost::bind( - &MaintenanceManager::LaunchOp, this, op)); - CHECK(s.ok()); + // Prepare the maintenance operation. + IncreaseOpCount(op); + guard->unlock(); + bool ready = op->Prepare(); + guard->lock(); + if (!ready) { + LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name() + << ". Re-running scheduler."; + DecreaseOpCount(op); + op->cond_->Signal(); + return true; } + + LOG_AND_TRACE_WITH_PREFIX("maintenance", INFO) + << Substitute("Scheduling $0: $1", op->name(), note); + // Run the maintenance operation. + Status s = thread_pool_->SubmitFunc(boost::bind(&MaintenanceManager::LaunchOp, this, op)); + CHECK(s.ok()); + + return true; } // Finding the best operation goes through four filters: @@ -335,8 +368,7 @@ void MaintenanceManager::RunSchedulerThread() { pair MaintenanceManager::FindBestOp() { TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp"); - size_t free_threads = num_threads_ - running_ops_; - if (free_threads == 0) { + if (!HasFreeThreads()) { return {nullptr, "no free threads"}; } @@ -367,8 +399,8 @@ pair MaintenanceManager::FindBestOp() { } const auto logs_retained_bytes = stats.logs_retained_bytes(); - if (logs_retained_bytes > low_io_most_logs_retained_bytes && - op->io_usage() == MaintenanceOp::LOW_IO_USAGE) { + if (op->io_usage() == MaintenanceOp::LOW_IO_USAGE && + logs_retained_bytes > low_io_most_logs_retained_bytes) { low_io_most_logs_retained_bytes_op = op; low_io_most_logs_retained_bytes = logs_retained_bytes; VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2) @@ -401,7 +433,7 @@ pair 
MaintenanceManager::FindBestOp() { op->name(), data_retained_bytes); } - const auto perf_improvement = stats.perf_improvement(); + const auto perf_improvement = PerfImprovement(stats.perf_improvement(), op->table_id()); if ((!best_perf_improvement_op) || (perf_improvement > best_perf_improvement)) { best_perf_improvement_op = op; @@ -420,7 +452,7 @@ pair MaintenanceManager::FindBestOp() { double capacity_pct; if (memory_pressure_func_(&capacity_pct)) { if (!most_ram_anchored_op) { - std::string msg = StringPrintf("System under memory pressure " + string msg = StringPrintf("System under memory pressure " "(%.2f%% of limit used). However, there are no ops currently " "runnable which would free memory.", capacity_pct); KLOG_EVERY_N_SECS(WARNING, 5) << msg; @@ -456,6 +488,16 @@ pair MaintenanceManager::FindBestOp() { return {nullptr, "no ops with positive improvement"}; } +double MaintenanceManager::PerfImprovement(double perf_improvement, + const string& table_id) const { + int32_t priority = 0; + if (!FindCopy(table_priorities_, table_id, &priority)) { + return perf_improvement; + } + + return perf_improvement * std::pow(FLAGS_maintenance_op_multiplier, priority); +} + void MaintenanceManager::LaunchOp(MaintenanceOp* op) { int64_t thread_id = Thread::CurrentThreadId(); OpInstance op_instance; @@ -481,9 +523,7 @@ void MaintenanceManager::LaunchOp(MaintenanceOp* op) { completed_ops_count_++; op->DurationHistogram()->Increment(op_instance.duration.ToMilliseconds()); - - running_ops_--; - op->running_--; + DecreaseOpCount(op); op->cond_->Signal(); cond_.Signal(); // Wake up scheduler. 
}); @@ -507,9 +547,6 @@ void MaintenanceManager::LaunchOp(MaintenanceOp* op) { void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb) { DCHECK(out_pb != nullptr); std::lock_guard guard(lock_); - auto best_op_and_why = FindBestOp(); - auto* best_op = best_op_and_why.first; - for (const auto& val : ops_) { MaintenanceManagerStatusPB_MaintenanceOpPB* op_pb = out_pb->add_registered_operations(); MaintenanceOp* op(val.first); @@ -527,10 +564,6 @@ void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatu op_pb->set_logs_retained_bytes(0); op_pb->set_perf_improvement(0.0); } - - if (best_op == op) { - out_pb->mutable_best_op()->CopyFrom(*op_pb); - } } { @@ -540,8 +573,9 @@ void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatu } } + // The latest completed op will be dumped at first. for (int n = 1; n <= completed_ops_.size(); n++) { - int i = completed_ops_count_ - n; + int64_t i = completed_ops_count_ - n; if (i < 0) break; const auto& completed_op = completed_ops_[i % completed_ops_.size()]; @@ -551,8 +585,50 @@ void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatu } } -std::string MaintenanceManager::LogPrefix() const { +string MaintenanceManager::LogPrefix() const { return Substitute("P $0: ", server_uuid_); } +bool MaintenanceManager::HasFreeThreads() { + return num_threads_ - running_ops_ > 0; +} + +bool MaintenanceManager::CouldNotLaunchNewOp(bool prev_iter_found_no_work) { + return (!HasFreeThreads() || prev_iter_found_no_work || disabled_for_tests()) && !shutdown_; +} + +void MaintenanceManager::UpdateTablePriorities() { + string table_priorities_str; + if (!google::GetCommandLineOption("maintenance_manager_table_priorities", + &table_priorities_str)) { + return; + } + + TablePriorities table_priorities; + int32_t value; + for (auto table_priority_str : Split(table_priorities_str, ";", strings::SkipEmpty())) { + vector table_priority = 
Split(table_priority_str, ":", strings::SkipEmpty()); + if (safe_strto32_base(table_priority[1].c_str(), &value, 10)) { + value = std::max(value, -FLAGS_max_priority_range); + value = std::min(value, FLAGS_max_priority_range); + table_priorities[table_priority[0]] = value; + } else { + LOG(WARNING) << "Some error occured when parse flag maintenance_manager_table_priorities: " + << table_priorities_str; + return; + } + } + table_priorities_.swap(table_priorities); +} + +void MaintenanceManager::IncreaseOpCount(MaintenanceOp *op) { + op->running_++; + running_ops_++; +} + +void MaintenanceManager::DecreaseOpCount(MaintenanceOp *op) { + op->running_--; + running_ops_--; +} + } // namespace kudu diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h index c5dd8ac83c..e3452a2f46 100644 --- a/src/kudu/util/maintenance_manager.h +++ b/src/kudu/util/maintenance_manager.h @@ -230,6 +230,8 @@ class MaintenanceOp { } private: + virtual const std::string& table_id() const = 0; + DISALLOW_COPY_AND_ASSIGN(MaintenanceOp); // The name of the operation. Op names must be unique. @@ -302,8 +304,11 @@ class MaintenanceManager : public std::enable_shared_from_this OpMapTy; + typedef std::unordered_map TablePriorities; // Return true if tests have currently disabled the maintenance // manager by way of changing the gflags at runtime. @@ -311,17 +316,32 @@ class MaintenanceManager : public std::enable_shared_from_this* guard); + // Find the best op, or null if there is nothing we want to run. // // Returns the op, as well as a string explanation of why that op was chosen, // suitable for logging. 
std::pair FindBestOp(); + double PerfImprovement(double perf_improvement, + const std::string& table_id) const; + void LaunchOp(MaintenanceOp* op); std::string LogPrefix() const; + bool HasFreeThreads(); + + bool CouldNotLaunchNewOp(bool prev_iter_found_no_work); + + void UpdateTablePriorities(); + + void IncreaseOpCount(MaintenanceOp *op); + void DecreaseOpCount(MaintenanceOp *op); + const std::string server_uuid_; + TablePriorities table_priorities_; const int32_t num_threads_; OpMapTy ops_; // Registered operations. Mutex lock_; @@ -330,7 +350,7 @@ class MaintenanceManager : public std::enable_shared_from_this completed_ops_; diff --git a/src/kudu/util/maintenance_manager.proto b/src/kudu/util/maintenance_manager.proto index b6b1203908..77c8cf8fd1 100644 --- a/src/kudu/util/maintenance_manager.proto +++ b/src/kudu/util/maintenance_manager.proto @@ -26,7 +26,7 @@ message MaintenanceManagerStatusPB { // Number of times this operation is currently running. required uint32 running = 2; required bool runnable = 3; - required uint64 ram_anchored_bytes = 4; + required int64 ram_anchored_bytes = 4; required int64 logs_retained_bytes = 5; required double perf_improvement = 6; } @@ -40,15 +40,12 @@ message MaintenanceManagerStatusPB { required int32 millis_since_start = 4; } - // The next operation that would run. - optional MaintenanceOpPB best_op = 1; - // List of all the operations. - repeated MaintenanceOpPB registered_operations = 2; + repeated MaintenanceOpPB registered_operations = 1; // This list isn't in order of anything. Can contain the same operation multiple times. - repeated OpInstancePB running_operations = 3; + repeated OpInstancePB running_operations = 2; // This list isn't in order of anything. Can contain the same operation multiple times. - repeated OpInstancePB completed_operations = 4; + repeated OpInstancePB completed_operations = 3; }