diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6c08e4c2dd..f0548096d8 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1222,6 +1222,13 @@ else()
set(KRB5_REALM_OVERRIDE -Wl,-U,krb5_realm_override_loaded krb5_realm_override)
endif()
+## yaml
+find_package(Yaml REQUIRED)
+include_directories(SYSTEM ${YAML_INCLUDE_DIR})
+ADD_THIRDPARTY_LIB(yaml
+ STATIC_LIB "${YAML_STATIC_LIB}"
+ SHARED_LIB "${YAML_SHARED_LIB}")
+
## Boost
# We use a custom cmake module and not cmake's FindBoost.
diff --git a/cmake_modules/FindYaml.cmake b/cmake_modules/FindYaml.cmake
new file mode 100644
index 0000000000..cc73805ced
--- /dev/null
+++ b/cmake_modules/FindYaml.cmake
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+find_path(YAML_INCLUDE_DIR yaml-cpp/yaml.h
+ # make sure we don't accidentally pick up a different version
+ NO_CMAKE_SYSTEM_PATH
+ NO_SYSTEM_ENVIRONMENT_PATH)
+find_library(YAML_STATIC_LIB libyaml-cpp.a
+ NO_CMAKE_SYSTEM_PATH
+ NO_SYSTEM_ENVIRONMENT_PATH)
+find_library(YAML_SHARED_LIB libyaml-cpp.so
+ NO_CMAKE_SYSTEM_PATH
+ NO_SYSTEM_ENVIRONMENT_PATH)
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(Yaml REQUIRED_VARS
+  YAML_STATIC_LIB YAML_SHARED_LIB YAML_INCLUDE_DIR)
diff --git a/java/kudu_style.xml b/java/config/checkstyle/checkstyle.xml
similarity index 72%
rename from java/kudu_style.xml
rename to java/config/checkstyle/checkstyle.xml
index d0b8d6c2e6..562955d052 100644
--- a/java/kudu_style.xml
+++ b/java/config/checkstyle/checkstyle.xml
@@ -19,10 +19,10 @@
// under the License.
-->
+ "-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
+ "https://checkstyle.org/dtds/configuration_1_3.dtd">
-
+
+
+
+
+
-
+
-
+
-
-
-
+
+
@@ -76,19 +82,27 @@
-
+
-
-
+
+
+
+
-
+
-
+
+
@@ -108,13 +122,32 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
-
+
+
+
+
+
-
+
-
+
@@ -183,28 +221,32 @@
-
-
+
-
+
-
+
+
+
@@ -213,7 +255,8 @@
-
+
diff --git a/java/checkstyle_suppressions.xml b/java/config/checkstyle/suppressions.xml
similarity index 100%
rename from java/checkstyle_suppressions.xml
rename to java/config/checkstyle/suppressions.xml
diff --git a/java/gradle/quality.gradle b/java/gradle/quality.gradle
index c92d0b230a..8bdc21aa3f 100644
--- a/java/gradle/quality.gradle
+++ b/java/gradle/quality.gradle
@@ -25,12 +25,8 @@ apply plugin: "ru.vyarus.animalsniffer" // Ensures Java code uses APIs from a pa
apply plugin: "scalafmt" // Automatically formats Scala code on each build.
apply plugin: "net.ltgt.errorprone" // Performs static code analysis to look for bugs in Java code.
-
checkstyle {
- configFile = file("$rootDir/kudu_style.xml")
- configProperties = [
- "checkstyle.suppressions.file" : "$rootDir/checkstyle_suppressions.xml"
- ]
+ configDir = file("$rootProject.projectDir/config/checkstyle")
ignoreFailures = true
showViolations = true
}
diff --git a/java/gradle/shadow.gradle b/java/gradle/shadow.gradle
index f89b98d56d..63e7c1152a 100644
--- a/java/gradle/shadow.gradle
+++ b/java/gradle/shadow.gradle
@@ -41,12 +41,20 @@ configurations.archives.artifacts.removeAll {
it instanceof ArchivePublishArtifact && it.archiveTask == jar
}
-shadowJar {
- dependencies {
- // Our shaded jars always try to pull in the slf4j api from
- // kudu-client, though we never want it included. Excluding it
- // here prevents the need to always list it.
- exclude dependency(libs.slf4jApi)
+// Define an overridable property to indicate tool jars that should
+// include all of their dependencies.
+// We use this below to ensure slf4j is included.
+shadow.ext {
+ isToolJar = false
+}
+if (!shadow.isToolJar) {
+ shadowJar {
+ dependencies {
+ // Our shaded library jars always try to pull in the slf4j api from
+ // kudu-client, though we never want it included. Excluding it
+ // here prevents the need to always list it.
+ exclude dependency(libs.slf4jApi)
+ }
}
}
diff --git a/java/kudu-backup-tools/build.gradle b/java/kudu-backup-tools/build.gradle
new file mode 100644
index 0000000000..850060c52d
--- /dev/null
+++ b/java/kudu-backup-tools/build.gradle
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+apply plugin: "scala"
+apply from: "$rootDir/gradle/protobuf.gradle"
+apply from: "$rootDir/gradle/shadow.gradle"
+
+// Mark this as a tool jar so shadow doesn't exclude any dependencies.
+shadow {
+ isToolJar = true
+}
+
+dependencies {
+ compile project(path: ":kudu-client", configuration: "shadow")
+ compile libs.protobufJava
+ compile (libs.protobufJavaUtil) {
+ // Make sure wrong Guava version is not pulled in.
+ exclude group: "com.google.guava", module: "guava"
+ }
+ compile libs.hadoopCommon
+ compile (libs.scopt) {
+ // Make sure wrong Scala version is not pulled in.
+ exclude group: "org.scala-lang", module: "scala-library"
+ }
+ compile libs.scalaLibrary
+ compile libs.slf4jApi
+
+ optional libs.yetusAnnotations
+
+ testCompile project(path: ":kudu-test-utils", configuration: "shadow")
+ testCompile libs.junit
+ testCompile libs.log4j
+ testCompile libs.scalatest
+ testCompile libs.slf4jLog4j12
+}
+
+// Add protobuf files to the proto source set.
+sourceSets {
+ main {
+ proto {
+ srcDir "src/main/protobuf"
+ }
+ }
+}
+
+// kudu-backup-tools has no public Javadoc.
+javadoc {
+ enabled = false
+}
+
+// Skip publishing kudu-backup-tools until it's ready to be supported long-term.
+uploadArchives.enabled = false
diff --git a/java/kudu-backup/src/main/protobuf/backup.proto b/java/kudu-backup-tools/src/main/protobuf/backup.proto
similarity index 100%
rename from java/kudu-backup/src/main/protobuf/backup.proto
rename to java/kudu-backup-tools/src/main/protobuf/backup.proto
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala b/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/BackupGraph.scala
similarity index 100%
rename from java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala
rename to java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/BackupGraph.scala
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala b/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/BackupIO.scala
similarity index 84%
rename from java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
rename to java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/BackupIO.scala
index 82578ad82d..43a359d34a 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
+++ b/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/BackupIO.scala
@@ -26,15 +26,9 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.LocatedFileStatus
import org.apache.hadoop.fs.Path
-import org.apache.kudu.Schema
import org.apache.kudu.backup.Backup.TableMetadataPB
-import org.apache.kudu.backup.SessionIO._
+import org.apache.kudu.backup.BackupIO._
import org.apache.kudu.client.KuduTable
-import org.apache.kudu.spark.kudu.SparkUtil
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.types.ByteType
-import org.apache.spark.sql.types.StructField
-import org.apache.spark.sql.types.StructType
import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
import org.slf4j.Logger
@@ -65,46 +59,19 @@ import scala.collection.mutable
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-class SessionIO(val session: SparkSession, options: CommonOptions) {
+class BackupIO(val conf: Configuration, rootPathStr: String) {
val log: Logger = LoggerFactory.getLogger(getClass)
- val conf: Configuration = session.sparkContext.hadoopConfiguration
- val rootPath: Path = new Path(options.rootPath)
+ val rootPath: Path = new Path(rootPathStr)
val fs: FileSystem = rootPath.getFileSystem(conf)
- /**
- * Returns the Spark schema for backup data based on the Kudu Schema.
- * Additionally handles adding the RowAction column for incremental backup/restore.
- */
- def dataSchema(schema: Schema, includeRowAction: Boolean = true): StructType = {
- var fields = SparkUtil.sparkSchema(schema).fields
- if (includeRowAction) {
- val changeTypeField = generateRowActionColumn(schema)
- fields = fields ++ Seq(changeTypeField)
- }
- StructType(fields)
- }
-
- /**
- * Generates a RowAction column and handles column name collisions.
- * The column name can vary because it's accessed positionally.
- */
- private def generateRowActionColumn(schema: Schema): StructField = {
- var columnName = "backup_row_action"
- // If the column already exists and we need to pick an alternate column name.
- while (schema.hasColumn(columnName)) {
- columnName += "_"
- }
- StructField(columnName, ByteType)
- }
-
/**
* Return the path to the table directory.
*/
def tablePath(table: KuduTable): Path = {
val tableName = URLEncoder.encode(table.getName, "UTF-8")
val dirName = s"${table.getTableId}-$tableName"
- new Path(options.rootPath, dirName)
+ new Path(rootPath, dirName)
}
/**
@@ -269,7 +236,7 @@ class SessionIO(val session: SparkSession, options: CommonOptions) {
}
}
-object SessionIO {
+object BackupIO {
// The name of the metadata file within a backup directory.
val MetadataFileName = ".kudu-metadata.json"
}
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala b/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
similarity index 98%
rename from java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
rename to java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
index cfd3933c7c..6fc49d3e27 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
+++ b/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
@@ -49,7 +49,11 @@ object TableMetadata {
val MetadataFileName = ".kudu-metadata.json"
val MetadataVersion = 1
- def getTableMetadata(table: KuduTable, options: BackupOptions): TableMetadataPB = {
+ def getTableMetadata(
+ table: KuduTable,
+ fromMs: Long,
+ toMs: Long,
+ format: String): TableMetadataPB = {
val columnIds = new util.HashMap[String, Integer]()
val columns = table.getSchema.getColumns.asScala.map { col =>
columnIds.put(col.getName, table.getSchema.getColumnId(col.getName))
@@ -87,9 +91,9 @@ object TableMetadata {
TableMetadataPB
.newBuilder()
.setVersion(MetadataVersion)
- .setFromMs(options.fromMs)
- .setToMs(options.toMs)
- .setDataFormat(options.format)
+ .setFromMs(fromMs)
+ .setToMs(toMs)
+ .setDataFormat(format)
.setTableName(table.getName)
.setTableId(table.getTableId)
.addAllColumns(columns.asJava)
diff --git a/java/kudu-backup-tools/src/test/resources/log4j.properties b/java/kudu-backup-tools/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..129752c9a3
--- /dev/null
+++ b/java/kudu-backup-tools/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger = INFO, out
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d{HH:mm:ss.SSS} [%p - %t] (%F:%L) %m%n
+
+log4j.logger.org.apache.kudu = DEBUG
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala b/java/kudu-backup-tools/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala
similarity index 82%
rename from java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala
rename to java/kudu-backup-tools/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala
index 480d2ebf90..314a06324b 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala
+++ b/java/kudu-backup-tools/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala
@@ -16,19 +16,40 @@
// under the License.
package org.apache.kudu.backup
+import com.google.common.collect.ImmutableList
+import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.client.KuduTable
-import org.apache.kudu.spark.kudu._
+import org.apache.kudu.test.ClientTestUtil.getBasicSchema
+import org.apache.kudu.test.KuduTestHarness
import org.junit.Assert._
+import org.junit.Before
+import org.junit.Rule
import org.junit.Test
import org.slf4j.Logger
import org.slf4j.LoggerFactory
-class TestBackupGraph extends KuduTestSuite {
+import scala.annotation.meta.getter
+
+class TestBackupGraph {
val log: Logger = LoggerFactory.getLogger(getClass)
+ var tableName: String = "TestBackupGraph"
+ var table: KuduTable = _
+
+ @(Rule @getter)
+ val harness = new KuduTestHarness
+
+ @Before
+ def setUp(): Unit = {
+ // Create the test table.
+ val builder = new CreateTableOptions().setNumReplicas(3)
+ builder.setRangePartitionColumns(ImmutableList.of("key"))
+ table = harness.getClient.createTable(tableName, getBasicSchema, builder)
+ }
+
@Test
def testSimpleBackupGraph() {
- val graph = new BackupGraph(table.getName)
+ val graph = new BackupGraph(table.getTableId)
val full = createBackupVertex(table, 0, 1)
graph.addBackup(full)
@@ -52,7 +73,7 @@ class TestBackupGraph extends KuduTestSuite {
@Test
def testForkingBackupGraph() {
- val graph = new BackupGraph(table.getName)
+ val graph = new BackupGraph(table.getTableId)
val full = createBackupVertex(table, 0, 1)
graph.addBackup(full)
// Duplicate fromMs of 1 creates a fork in the graph.
@@ -81,7 +102,7 @@ class TestBackupGraph extends KuduTestSuite {
@Test
def testMultiFullBackupGraph() {
- val graph = new BackupGraph(table.getName)
+ val graph = new BackupGraph(table.getTableId)
val full1 = createBackupVertex(table, 0, 1)
graph.addBackup(full1)
val inc1 = createBackupVertex(table, 1, 2)
@@ -131,14 +152,7 @@ class TestBackupGraph extends KuduTestSuite {
}
private def createBackupVertex(table: KuduTable, fromMs: Long, toMs: Long): BackupNode = {
- val options = new BackupOptions(
- tables = Seq(table.getName),
- rootPath = "foo/path",
- "fooAddresses",
- fromMs = fromMs,
- toMs = toMs
- )
- val metadata = TableMetadata.getTableMetadata(table, options)
+ val metadata = TableMetadata.getTableMetadata(table, fromMs, toMs, "parquet")
BackupNode(null, metadata)
}
}
diff --git a/java/kudu-backup/build.gradle b/java/kudu-backup/build.gradle
index 53b59af1b4..514c29d3f1 100644
--- a/java/kudu-backup/build.gradle
+++ b/java/kudu-backup/build.gradle
@@ -16,17 +16,16 @@
// under the License.
apply plugin: "scala"
-apply from: "$rootDir/gradle/protobuf.gradle"
apply from: "$rootDir/gradle/shadow.gradle"
dependencies {
+ // Note: We don't use the shaded version, so we can control the dependencies.
+ compile(project(path: ":kudu-backup-tools")) {
+ // Ensure we use the hadoop-client provided by Spark to avoid any compatibility issues.
+ exclude group: "org.apache.hadoop", module: "hadoop-common"
+ }
compile project(path: ":kudu-client", configuration: "shadow")
compile project(path: ":kudu-spark", configuration: "shadow")
- compile libs.protobufJava
- compile (libs.protobufJavaUtil) {
- // Make sure wrong Guava version is not pulled in.
- exclude group: "com.google.guava", module: "guava"
- }
compile (libs.scopt) {
// Make sure wrong Scala version is not pulled in.
exclude group: "org.scala-lang", module: "scala-library"
@@ -48,15 +47,6 @@ dependencies {
testCompile libs.slf4jLog4j12
}
-// Add protobuf files to the proto source set.
-sourceSets {
- main {
- proto {
- srcDir "src/main/protobuf"
- }
- }
-}
-
// Adjust the artifact name to match the maven build.
archivesBaseName = "kudu-backup${versions.sparkBase}_${versions.scalaBase}"
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupUtils.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupUtils.scala
new file mode 100644
index 0000000000..4b1def6e6d
--- /dev/null
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupUtils.scala
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.backup
+
+import org.apache.kudu.Schema
+import org.apache.kudu.spark.kudu.SparkUtil
+import org.apache.spark.sql.types.ByteType
+import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.types.StructType
+
+object BackupUtils {
+
+ /**
+ * Returns the Spark schema for backup data based on the Kudu Schema.
+ * Additionally handles adding the RowAction column for incremental backup/restore.
+ */
+ def dataSchema(schema: Schema, includeRowAction: Boolean = true): StructType = {
+ var fields = SparkUtil.sparkSchema(schema).fields
+ if (includeRowAction) {
+ val changeTypeField = generateRowActionColumn(schema)
+ fields = fields ++ Seq(changeTypeField)
+ }
+ StructType(fields)
+ }
+
+ /**
+ * Generates a RowAction column and handles column name collisions.
+ * The column name can vary because it's accessed positionally.
+ */
+ private def generateRowActionColumn(schema: Schema): StructField = {
+ var columnName = "backup_row_action"
+ // If the column already exists and we need to pick an alternate column name.
+ while (schema.hasColumn(columnName)) {
+ columnName += "_"
+ }
+ StructField(columnName, ByteType)
+ }
+
+}
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
index 2b0d84baf2..180e7c82a2 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
@@ -41,7 +41,7 @@ object KuduBackup {
options.kuduMasterAddresses,
session.sparkContext
)
- val io = new SessionIO(session, options)
+ val io = new BackupIO(session.sparkContext.hadoopConfiguration, options.rootPath)
// Read the required backup metadata.
val backupGraphs =
@@ -97,7 +97,7 @@ object KuduBackup {
val rdd = new KuduBackupRDD(table, tableOptions, incremental, context, session.sparkContext)
val df =
session.sqlContext
- .createDataFrame(rdd, io.dataSchema(table.getSchema, incremental))
+ .createDataFrame(rdd, BackupUtils.dataSchema(table.getSchema, incremental))
// Write the data to the backup path.
// The backup path contains the timestampMs and should not already exist.
@@ -108,7 +108,8 @@ object KuduBackup {
// Generate and output the new metadata for this table.
// The existence of metadata indicates this backup was successful.
- val tableMetadata = TableMetadata.getTableMetadata(table, tableOptions)
+ val tableMetadata = TableMetadata
+ .getTableMetadata(table, tableOptions.fromMs, tableOptions.toMs, tableOptions.format)
io.writeTableMetadata(tableMetadata, metadataPath)
}
}
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index 7f1e515d84..1a5886f7da 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -47,7 +47,7 @@ object KuduRestore {
options.kuduMasterAddresses,
session.sparkContext
)
- val io = new SessionIO(session, options)
+ val io = new BackupIO(session.sparkContext.hadoopConfiguration, options.rootPath)
// Read the required backup metadata.
val backupGraphs = io.readBackupGraphsByTableName(options.tables, options.timestampMs)
@@ -80,7 +80,7 @@ object KuduRestore {
createTableRangePartitionByRangePartition(restoreName, lastMetadata, context)
}
}
- val backupSchema = io.dataSchema(TableMetadata.getKuduSchema(metadata))
+ val backupSchema = BackupUtils.dataSchema(TableMetadata.getKuduSchema(metadata))
val rowActionCol = backupSchema.fields.last.name
val table = context.syncClient.openTable(restoreName)
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
index df9eaeebc6..8bcbef4732 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
@@ -23,14 +23,6 @@ import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
import scopt.OptionParser
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-trait CommonOptions {
- val tables: Seq[String]
- val rootPath: String
- val kuduMasterAddresses: String
-}
-
@InterfaceAudience.Private
@InterfaceStability.Unstable
case class BackupOptions(
@@ -46,7 +38,6 @@ case class BackupOptions(
scanLeaderOnly: Boolean = BackupOptions.DefaultScanLeaderOnly,
scanPrefetching: Boolean = BackupOptions.DefaultScanPrefetching,
keepAlivePeriodMs: Long = BackupOptions.DefaultKeepAlivePeriodMs)
- extends CommonOptions
object BackupOptions {
val DefaultForceFull: Boolean = false
@@ -166,8 +157,7 @@ case class RestoreOptions(
kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName,
tableSuffix: String = "",
createTables: Boolean = RestoreOptions.DefaultCreateTables,
- timestampMs: Long = System.currentTimeMillis()
-) extends CommonOptions
+ timestampMs: Long = System.currentTimeMillis())
object RestoreOptions {
val DefaultCreateTables: Boolean = true
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index 5c769ce469..8a066ac04a 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -572,7 +572,7 @@ class TestKuduBackup extends KuduTestSuite {
options: BackupOptions,
expectedRowCount: Long,
expectIncremental: Boolean): Unit = {
- val io = new SessionIO(ss, options)
+ val io = new BackupIO(ss.sparkContext.hadoopConfiguration, options.rootPath)
val tableName = options.tables.head
val table = harness.getClient.openTable(tableName)
val backupPath = io.backupPath(table, options.toMs)
@@ -587,7 +587,7 @@ class TestKuduBackup extends KuduTestSuite {
}
// Verify the output data.
- val schema = io.dataSchema(table.getSchema, expectIncremental)
+ val schema = BackupUtils.dataSchema(table.getSchema, expectIncremental)
val df = ss.sqlContext.read
.format(metadata.getDataFormat)
.schema(schema)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index afc54d1189..668f2387d1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -35,6 +35,7 @@
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.function.Consumer;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
@@ -75,6 +76,7 @@
import org.apache.kudu.master.Master;
import org.apache.kudu.master.Master.GetTableLocationsResponsePB;
import org.apache.kudu.master.Master.TableIdentifierPB;
+import org.apache.kudu.master.Master.TSInfoPB;
import org.apache.kudu.util.AsyncUtil;
import org.apache.kudu.util.NetUtil;
import org.apache.kudu.util.Pair;
@@ -2112,6 +2114,7 @@ public Object call(final GetTableLocationsResponsePB response) {
partitionKey,
requestedBatchSize,
response.getTabletLocationsList(),
+ response.getTsInfosList(),
response.getTtlMillis());
} catch (KuduException e) {
return e;
@@ -2146,12 +2149,13 @@ private void releaseMasterLookupPermit() {
}
/**
- * Makes discovered tablet locations visible in the clients caches.
+ * Makes discovered tablet locations visible in the client's caches.
* @param table the table which the locations belong to
* @param requestPartitionKey the partition key of the table locations request
* @param requestedBatchSize the number of tablet locations requested from the master in the
* original request
* @param locations the discovered locations
 * @param tsInfosList a list of TSInfoPB entries that the replicas in 'locations' reference by index
* @param ttl the ttl of the locations
*/
@InterfaceAudience.LimitedPrivate("Test")
@@ -2159,8 +2163,8 @@ void discoverTablets(KuduTable table,
byte[] requestPartitionKey,
int requestedBatchSize,
List locations,
+ List tsInfosList,
long ttl) throws KuduException {
- // TODO(todd): handle "interned" response here
String tableId = table.getTableId();
String tableName = table.getName();
@@ -2179,20 +2183,48 @@ void discoverTablets(KuduTable table,
// Build the list of discovered remote tablet instances. If we have
// already discovered the tablet, its locations are refreshed.
+ int numTsInfos = tsInfosList.size();
List tablets = new ArrayList<>(locations.size());
for (Master.TabletLocationsPB tabletPb : locations) {
- List lookupExceptions = new ArrayList<>(tabletPb.getReplicasCount());
+ List lookupExceptions = new ArrayList<>(tabletPb.getReplicasCount());
List servers = new ArrayList<>(tabletPb.getReplicasCount());
- for (Master.TabletLocationsPB.ReplicaPB replica : tabletPb.getReplicasList()) {
+
+ // Lambda that does the common handling of a ts info.
+ Consumer updateServersAndCollectExceptions = tsInfo -> {
try {
- ServerInfo serverInfo = resolveTS(replica.getTsInfo());
+ ServerInfo serverInfo = resolveTS(tsInfo);
if (serverInfo != null) {
servers.add(serverInfo);
}
} catch (UnknownHostException ex) {
lookupExceptions.add(ex);
}
+ };
+
+ // Handle "old-style" non-interned replicas.
+ for (Master.TabletLocationsPB.ReplicaPB replica : tabletPb.getReplicasList()) {
+ updateServersAndCollectExceptions.accept(replica.getTsInfo());
+ }
+
+ // Handle interned replicas. As a shim, we also need to create a list of "old-style" ReplicaPBs
+ // to be stored inside the RemoteTablet.
+ // TODO(wdberkeley): Change this so ReplicaPBs aren't used by the client at all anymore.
+ List replicas = new ArrayList<>();
+ for (Master.TabletLocationsPB.InternedReplicaPB replica : tabletPb.getInternedReplicasList()) {
+ int tsInfoIdx = replica.getTsInfoIdx();
+ if (tsInfoIdx >= numTsInfos) {
+ lookupExceptions.add(new NonRecoverableException(Status.Corruption(
+ String.format("invalid response from master: referenced tablet idx %d but only %d present",
+ tsInfoIdx, numTsInfos))));
+ continue;
+ }
+ TSInfoPB tsInfo = tsInfosList.get(tsInfoIdx);
+ updateServersAndCollectExceptions.accept(tsInfo);
+ Master.TabletLocationsPB.ReplicaPB.Builder builder = Master.TabletLocationsPB.ReplicaPB.newBuilder();
+ builder.setRole(replica.getRole());
+ builder.setTsInfo(tsInfo);
+ replicas.add(builder.build());
}
if (!lookupExceptions.isEmpty() &&
@@ -2202,7 +2234,11 @@ void discoverTablets(KuduTable table,
throw new NonRecoverableException(statusIOE);
}
- RemoteTablet rt = new RemoteTablet(tableId, tabletPb, servers);
+ RemoteTablet rt = new RemoteTablet(tableId,
+ tabletPb.getTabletId().toStringUtf8(),
+ ProtobufHelper.pbToPartition(tabletPb.getPartition()),
+ replicas.isEmpty() ? tabletPb.getReplicasList() : replicas,
+ servers);
LOG.debug("Learned about tablet {} for table '{}' with partition {}",
rt.getTabletId(), tableName, rt.getPartition());
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
index a7fe825228..3be2770532 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
@@ -91,6 +91,7 @@ Message createRequestPB() {
builder.setPartitionKeyEnd(UnsafeByteOperations.unsafeWrap(endKey));
}
builder.setMaxReturnedLocations(maxReturnedLocations);
+ builder.setInternTsInfosInResponse(true);
return builder.build();
}
}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/LocatedTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/LocatedTablet.java
index 655f800692..ae319034b3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/LocatedTablet.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/LocatedTablet.java
@@ -107,6 +107,8 @@ public String toString() {
@InterfaceAudience.Public
@InterfaceStability.Evolving
public static class Replica {
+ // TODO(wdberkeley): The ReplicaPB is deprecated server-side, so we ought to redo how this
+ // class stores its information.
private final ReplicaPB pb;
Replica(ReplicaPB pb) {
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
index fd0eb3a758..2241f586cb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
@@ -69,11 +69,13 @@ public class RemoteTablet implements Comparable {
private String leaderUuid;
RemoteTablet(String tableId,
- Master.TabletLocationsPB tabletLocations,
+ String tabletId,
+ Partition partition,
+ List replicas,
List serverInfos) {
- this.tabletId = tabletLocations.getTabletId().toStringUtf8();
+ this.tabletId = tabletId;
this.tableId = tableId;
- this.partition = ProtobufHelper.pbToPartition(tabletLocations.getPartition());
+ this.partition = partition;
this.tabletServers = new HashMap<>(serverInfos.size());
for (ServerInfo serverInfo : serverInfos) {
@@ -81,18 +83,18 @@ public class RemoteTablet implements Comparable {
}
ImmutableList.Builder replicasBuilder = new ImmutableList.Builder<>();
- for (Master.TabletLocationsPB.ReplicaPB replica : tabletLocations.getReplicasList()) {
+ for (Master.TabletLocationsPB.ReplicaPB replica : replicas) {
String uuid = replica.getTsInfo().getPermanentUuid().toStringUtf8();
replicasBuilder.add(new LocatedTablet.Replica(replica));
if (replica.getRole().equals(Metadata.RaftPeerPB.Role.LEADER)) {
- leaderUuid = uuid;
+ this.leaderUuid = uuid;
}
}
if (leaderUuid == null) {
LOG.warn("No leader provided for tablet {}", getTabletId());
}
- replicas.set(replicasBuilder.build());
+ this.replicas.set(replicasBuilder.build());
}
@Override
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
index d79d114fb7..bef643d5b7 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
@@ -173,7 +173,8 @@ public void testBadHostnames() throws Exception {
try {
KuduTable badTable = new KuduTable(asyncClient, "Invalid table name",
"Invalid table ID", null, null, 3);
- asyncClient.discoverTablets(badTable, null, requestBatchSize, tabletLocations, 1000);
+ asyncClient.discoverTablets(badTable, null, requestBatchSize,
+ tabletLocations, new ArrayList<>(), 1000);
fail("This should have failed quickly");
} catch (NonRecoverableException ex) {
assertTrue(ex.getMessage().contains(badHostname));
@@ -205,7 +206,8 @@ public void testNoLeader() throws Exception {
"master", leader.getRpcHost(), leader.getRpcPort(), Metadata.RaftPeerPB.Role.FOLLOWER));
tabletLocations.add(tabletPb.build());
try {
- asyncClient.discoverTablets(table, new byte[0], requestBatchSize, tabletLocations, 1000);
+ asyncClient.discoverTablets(table, new byte[0], requestBatchSize,
+ tabletLocations, new ArrayList<>(), 1000);
fail("discoverTablets should throw an exception if there's no leader");
} catch (NoLeaderFoundException ex) {
// Expected.
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
index 43ca37ac38..c4372c1b47 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
@@ -223,10 +223,8 @@ private RemoteTablet getTablet(int leaderIndex) {
static RemoteTablet getTablet(int leaderIndex,
int localReplicaIndex,
int sameLocationReplicaIndex) {
- Master.TabletLocationsPB.Builder tabletPb = Master.TabletLocationsPB.newBuilder();
-
- tabletPb.setPartition(ProtobufUtils.getFakePartitionPB());
- tabletPb.setTabletId(ByteString.copyFromUtf8("fake tablet"));
+ Partition partition = ProtobufHelper.pbToPartition(ProtobufUtils.getFakePartitionPB().build());
+ List replicas = new ArrayList<>();
List servers = new ArrayList<>();
for (int i = 0; i < 3; i++) {
InetAddress addr;
@@ -246,11 +244,11 @@ static RemoteTablet getTablet(int leaderIndex,
new HostAndPort("host", 1000 + i),
addr,
location));
- tabletPb.addReplicas(ProtobufUtils.getFakeTabletReplicaPB(
- uuid, "host", i,
- leaderIndex == i ? Metadata.RaftPeerPB.Role.LEADER : Metadata.RaftPeerPB.Role.FOLLOWER));
+ Metadata.RaftPeerPB.Role role = leaderIndex == i ? Metadata.RaftPeerPB.Role.LEADER :
+ Metadata.RaftPeerPB.Role.FOLLOWER;
+ replicas.add(ProtobufUtils.getFakeTabletReplicaPB(uuid, "host", i, role).build());
}
- return new RemoteTablet("fake table", tabletPb.build(), servers);
+ return new RemoteTablet("fake table", "fake tablet", partition, replicas, servers);
}
}
diff --git a/java/kudu-hive/build.gradle b/java/kudu-hive/build.gradle
index 3d4bb2bf8f..31ce5775f3 100644
--- a/java/kudu-hive/build.gradle
+++ b/java/kudu-hive/build.gradle
@@ -36,7 +36,4 @@ dependencies {
// kudu-hive has no public Javadoc.
javadoc {
enabled = false
-}
-
-// Skip publishing kudu-hive until it's ready to be supported long-term.
-uploadArchives.enabled = false
\ No newline at end of file
+}
\ No newline at end of file
diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala
index 37e26248e4..902ababc6b 100644
--- a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala
+++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala
@@ -22,9 +22,8 @@ import org.apache.kudu.spark.kudu.KuduTestSuite
import org.apache.kudu.test.RandomUtils
import org.apache.kudu.util.DecimalUtil
import org.apache.kudu.util.SchemaGenerator
+import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter
import org.apache.spark.rdd.RDD
-import org.apache.spark.scheduler.SparkListener
-import org.apache.spark.scheduler.SparkListenerTaskEnd
import org.apache.spark.sql.Row
import org.junit.Test
import org.junit.Assert.assertEquals
@@ -88,15 +87,6 @@ class DistributedDataGeneratorTest extends KuduTestSuite {
@Test
def testNumTasks() {
- // Add a SparkListener to count the number of tasks that end.
- var actualNumTasks = 0
- val listener = new SparkListener {
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
- actualNumTasks += 1
- }
- }
- ss.sparkContext.addSparkListener(listener)
-
val numTasks = 8
val numRows = 100
val args = Array(
@@ -104,22 +94,16 @@ class DistributedDataGeneratorTest extends KuduTestSuite {
s"--num-tasks=$numTasks",
randomTableName,
harness.getMasterAddressesAsString)
- runGeneratorTest(args)
+ // count the number of tasks that end.
+ val actualNumTasks = withJobTaskCounter(ss.sparkContext) { _ =>
+ runGeneratorTest(args)
+ }
assertEquals(numTasks, actualNumTasks)
}
@Test
def testNumTasksRepartition(): Unit = {
- // Add a SparkListener to count the number of tasks that end.
- var actualNumTasks = 0
- val listener = new SparkListener {
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
- actualNumTasks += 1
- }
- }
- ss.sparkContext.addSparkListener(listener)
-
val numTasks = 8
val numRows = 100
val args = Array(
@@ -128,7 +112,11 @@ class DistributedDataGeneratorTest extends KuduTestSuite {
"--repartition=true",
randomTableName,
harness.getMasterAddressesAsString)
- runGeneratorTest(args)
+
+ // count the number of tasks that end.
+ val actualNumTasks = withJobTaskCounter(ss.sparkContext) { _ =>
+ runGeneratorTest(args)
+ }
val table = kuduContext.syncClient.openTable(randomTableName)
val numPartitions = new KuduPartitioner.KuduPartitionerBuilder(table).build().numPartitions()
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 6afd32cb3d..101546f171 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -33,8 +33,7 @@ import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.Schema
import org.apache.kudu.Type
import org.apache.kudu.test.RandomUtils
-import org.apache.spark.scheduler.SparkListener
-import org.apache.spark.scheduler.SparkListenerTaskEnd
+import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.junit.Before
import org.junit.Test
@@ -428,15 +427,6 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
options.addSplitRow(split)
val table = kuduClient.createTable(tableName, simpleSchema, options)
- // Add a SparkListener to count the number of tasks that end.
- var actualNumTasks = 0
- val listener = new SparkListener {
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
- actualNumTasks += 1
- }
- }
- ss.sparkContext.addSparkListener(listener)
-
val random = Random.javaRandomToRandom(RandomUtils.getRandom)
val data = random.shuffle(
Seq(
@@ -459,10 +449,14 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
// Capture the rows so we can validate the insert order.
kuduContext.captureRows = true
- kuduContext.insertRows(
- dataDF,
- tableName,
- new KuduWriteOptions(repartition = true, repartitionSort = repartitionSort))
+ // Count the number of tasks that end.
+ val actualNumTasks = withJobTaskCounter(ss.sparkContext) { _ =>
+ kuduContext.insertRows(
+ dataDF,
+ tableName,
+ new KuduWriteOptions(repartition = true, repartitionSort = repartitionSort))
+ }
+
// 2 tasks from the parallelize call, and 2 from the repartitioning.
assertEquals(4, actualNumTasks)
val rows = kuduContext.rowsAccumulator.value.asScala
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkListenerUtil.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkListenerUtil.scala
new file mode 100644
index 0000000000..b9bb8857bf
--- /dev/null
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkListenerUtil.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kudu.spark.kudu
+
+import org.apache.kudu.test.junit.AssertHelpers
+import org.apache.kudu.test.junit.AssertHelpers.BooleanExpression
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.scheduler.SparkListenerJobEnd
+import org.apache.spark.scheduler.SparkListenerTaskEnd
+
+object SparkListenerUtil {
+
+ // TODO: Use org.apache.spark.TestUtils.withListener if it becomes public test API
+ def withJobTaskCounter(sc: SparkContext)(body: Any => Unit): Int = {
+ // Add a SparkListener to count the number of tasks that end.
+ var numTasks = 0
+ var jobDone = false
+ val listener: SparkListener = new SparkListener {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+ numTasks += 1
+ }
+ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+ jobDone = true
+ }
+ }
+ sc.addSparkListener(listener)
+ try {
+ body()
+ } finally {
+ // Because the SparkListener events are processed on an async queue which is behind
+ // private API, we use the jobEnd event to know that all of the taskEnd events
+ // must have been processed.
+ AssertHelpers.assertEventuallyTrue("Spark job did not complete", new BooleanExpression {
+ override def get(): Boolean = jobDone
+ }, 5000)
+ sc.removeSparkListener(listener)
+ }
+ numTasks
+ }
+}
diff --git a/java/settings.gradle b/java/settings.gradle
index 96ca03fcdf..145c3c8f0c 100644
--- a/java/settings.gradle
+++ b/java/settings.gradle
@@ -20,6 +20,7 @@
rootProject.name = "kudu-parent"
include "kudu-backup"
+include "kudu-backup-tools"
include "kudu-client"
include "kudu-client-tools"
include "kudu-flume-sink"
diff --git a/src/kudu/client/master_proxy_rpc.cc b/src/kudu/client/master_proxy_rpc.cc
index 326df12117..b873f1dc1c 100644
--- a/src/kudu/client/master_proxy_rpc.cc
+++ b/src/kudu/client/master_proxy_rpc.cc
@@ -202,8 +202,8 @@ bool AsyncLeaderMasterRpc::RetryOrReconnectIfNecessary(
// negotiation.
// Authorization errors during negotiation generally indicate failure to
- // authenticate. If that failure was due to an invalid token, try to get a
- // new one by reconnecting with the master.
+ // authenticate. If that failure was due to an invalid authn token,
+ // try to get a new one by establishing a new connection to the master.
const ErrorStatusPB* err = retrier().controller().error_response();
if (s.IsNotAuthorized()) {
if (err && err->has_code() &&
diff --git a/src/kudu/clock/system_ntp.cc b/src/kudu/clock/system_ntp.cc
index 108056a437..9993838c25 100644
--- a/src/kudu/clock/system_ntp.cc
+++ b/src/kudu/clock/system_ntp.cc
@@ -35,6 +35,7 @@
#include "kudu/util/errno.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
+#include "kudu/util/monotime.h"
#include "kudu/util/path_util.h"
#include "kudu/util/status.h"
#include "kudu/util/subprocess.h"
@@ -124,36 +125,26 @@ Status WaitForNtp() {
}
LOG(INFO) << Substitute("Waiting up to --ntp_initial_sync_wait_secs=$0 "
"seconds for the clock to synchronize", wait_secs);
- vector cmd;
- string exe;
- Status s = FindExecutable("ntp-wait", {"/sbin", "/usr/sbin"}, &exe);
- if (s.ok()) {
- // -s is the number of seconds to sleep between retries.
- // -n is the number of tries before giving up.
- cmd = {exe, "-s", "1", "-n", std::to_string(wait_secs)};
- } else {
- LOG(WARNING) << "Could not find ntp-wait; trying chrony waitsync instead: "
- << s.ToString();
- s = FindExecutable("chronyc", {"/sbin", "/usr/sbin"}, &exe);
- if (!s.ok()) {
- LOG(WARNING) << "Could not find chronyc: " << s.ToString();
- return Status::NotFound("failed to find ntp-wait or chronyc");
+
+ // We previously relied on ntpd/chrony support tools to wait, but that
+ // approach doesn't work in environments where ntpd is unreachable but the
+ // clock is still synchronized (i.e. running inside a Linux container).
+ //
+ // Now we just interrogate the kernel directly.
+ Status s;
+ for (int i = 0; i < wait_secs; i++) {
+ timex timex;
+ s = CallAdjTime(&timex);
+ if (s.ok() || !s.IsServiceUnavailable()) {
+ return s;
}
- // Usage: waitsync max-tries max-correction max-skew interval.
- // max-correction and max-skew parameters as 0 means no checks.
- // The interval is measured in seconds.
- cmd = {exe, "waitsync", std::to_string(wait_secs), "0", "0", "1"};
- }
- // Unfortunately, neither ntp-wait nor chronyc waitsync print useful messages.
- // Instead, rely on DumpDiagnostics.
- s = Subprocess::Call(cmd);
- if (!s.ok()) {
- return s.CloneAndPrepend(
- Substitute("failed to wait for clock sync using command '$0'",
- JoinStrings(cmd, " ")));
+ SleepFor(MonoDelta::FromSeconds(1));
}
- return Status::OK();
+
+ // Return the last failure.
+ return s.CloneAndPrepend("Timed out waiting for clock sync");
}
+
} // anonymous namespace
void SystemNtp::DumpDiagnostics(vector* log) const {
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 3435d5d63a..a7b2057225 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -1178,8 +1178,12 @@ Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequ
ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH,
Status::IllegalState(error_msg));
- LOG_WITH_PREFIX_UNLOCKED(INFO) << "Refusing update from remote peer "
- << req.leader_uuid << ": " << error_msg;
+ // Adding a check to eliminate an unnecessary log message in the
+ // scenario where this is the first message from the Leader of a new tablet.
+ if (!OpIdEquals(MakeOpId(1,1), *req.preceding_opid)) {
+ LOG_WITH_PREFIX_UNLOCKED(INFO) << "Refusing update from remote peer "
+ << req.leader_uuid << ": " << error_msg;
+ }
// If the terms mismatch we abort down to the index before the leader's preceding,
// since we know that is the last opid that has a chance of not being overwritten.
diff --git a/src/kudu/hms/mini_hms.cc b/src/kudu/hms/mini_hms.cc
index e730f80653..7ccb4a9cb8 100644
--- a/src/kudu/hms/mini_hms.cc
+++ b/src/kudu/hms/mini_hms.cc
@@ -71,12 +71,16 @@ void MiniHms::EnableKerberos(string krb5_conf,
}
void MiniHms::EnableSentry(const HostPort& sentry_address,
- string sentry_service_principal) {
+ string sentry_service_principal,
+ int sentry_client_rpc_retry_num,
+ int sentry_client_rpc_retry_interval_ms) {
CHECK(!hms_process_);
DCHECK(!sentry_service_principal.empty());
VLOG(1) << Substitute("Enabling Sentry, at $0, for HMS", sentry_address.ToString());
sentry_address_ = sentry_address.ToString();
sentry_service_principal_ = std::move(sentry_service_principal);
+ sentry_client_rpc_retry_num_ = sentry_client_rpc_retry_num;
+ sentry_client_rpc_retry_interval_ms_ = sentry_client_rpc_retry_interval_ms;
}
void MiniHms::SetDataRoot(string data_root) {
@@ -366,6 +370,14 @@ Status MiniHms::CreateHiveSite() const {
// - sentry.metastore.service.users
// Set of service users whose access will be excluded from
// Sentry authorization checks.
+ //
+ // - sentry.service.client.rpc.retry-total
+ // Maximum number of attempts that Sentry RPC client does while
+ // re-trying a remote call to Sentry.
+ //
+ // - sentry.service.client.rpc.retry.interval.msec
+ // Time interval between attempts of Sentry's client to retry a remote
+ // call to Sentry.
static const string kSentryFileTemplate = R"(
@@ -387,12 +399,25 @@ Status MiniHms::CreateHiveSite() const {
sentry.metastore.service.users
kudu
+
+
+ sentry.service.client.rpc.retry-total
+ $3
+
+
+
+ sentry.service.client.rpc.retry.interval.msec
+ $4
+
)";
- string sentry_file_contents = Substitute(kSentryFileTemplate,
- sentry_address_,
- sentry_service_principal_,
- "server1");
+ auto sentry_file_contents = Substitute(
+ kSentryFileTemplate,
+ sentry_address_,
+ sentry_service_principal_,
+ "server1",
+ sentry_client_rpc_retry_num_,
+ sentry_client_rpc_retry_interval_ms_);
RETURN_NOT_OK(WriteStringToFile(Env::Default(),
sentry_file_contents,
JoinPathSegments(data_root_, "hive-sentry-site.xml")));
diff --git a/src/kudu/hms/mini_hms.h b/src/kudu/hms/mini_hms.h
index 0adcb21042..7fb1c22089 100644
--- a/src/kudu/hms/mini_hms.h
+++ b/src/kudu/hms/mini_hms.h
@@ -48,8 +48,16 @@ class MiniHms {
// Configures the mini HMS to enable the Sentry plugin, passing the
// Sentry service's principal to be used in Kerberos environment.
+ //
+ // Parameters 'sentry_client_rpc_retry_num' and
+ // 'sentry_client_rpc_retry_interval_ms' are used to override default settings
+ // of the Sentry client used by HMS plugins. The default values for these two
+ // parameters are set to allow for shorter HMS --> Sentry RPC timeout
+ // (i.e. shorter than with the default Sentry v2.{0,1} client's settings).
void EnableSentry(const HostPort& sentry_address,
- std::string sentry_service_principal);
+ std::string sentry_service_principal,
+ int sentry_client_rpc_retry_num = 3,
+ int sentry_client_rpc_retry_interval_ms = 500);
// Configures the mini HMS to store its data in the provided path. If not set,
// it uses a test-only temporary directory.
@@ -114,6 +122,8 @@ class MiniHms {
// Sentry configuration
std::string sentry_address_;
std::string sentry_service_principal_;
+ int sentry_client_rpc_retry_num_;
+ int sentry_client_rpc_retry_interval_ms_;
};
} // namespace hms
diff --git a/src/kudu/integration-tests/hms_itest-base.cc b/src/kudu/integration-tests/hms_itest-base.cc
index 03697921f0..64a635b40b 100644
--- a/src/kudu/integration-tests/hms_itest-base.cc
+++ b/src/kudu/integration-tests/hms_itest-base.cc
@@ -35,6 +35,7 @@
#include "kudu/hms/mini_hms.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/util/decimal_util.h"
+#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
@@ -73,9 +74,9 @@ Status HmsITestBase::CreateDatabase(const string& database_name) {
}
Status HmsITestBase::CreateKuduTable(const string& database_name,
- const string& table_name) {
+ const string& table_name,
+ MonoDelta timeout) {
// Get coverage of all column types.
- KuduSchema schema;
KuduSchemaBuilder b;
b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
b.AddColumn("int8_val")->Type(KuduColumnSchema::INT8);
@@ -94,14 +95,19 @@ Status HmsITestBase::CreateKuduTable(const string& database_name,
->Precision(kMaxDecimal64Precision);
b.AddColumn("decimal128_val")->Type(KuduColumnSchema::DECIMAL)
->Precision(kMaxDecimal128Precision);
-
+ KuduSchema schema;
RETURN_NOT_OK(b.Build(&schema));
unique_ptr table_creator(client_->NewTableCreator());
- return table_creator->table_name(Substitute("$0.$1", database_name, table_name))
- .schema(&schema)
- .num_replicas(1)
- .set_range_partition_columns({ "key" })
- .Create();
+ if (timeout.Initialized()) {
+ // If specified, set the timeout for the operation.
+ table_creator->timeout(timeout);
+ }
+ return table_creator->table_name(Substitute("$0.$1",
+ database_name, table_name))
+ .schema(&schema)
+ .num_replicas(1)
+ .set_range_partition_columns({ "key" })
+ .Create();
}
Status HmsITestBase::RenameHmsTable(const string& database_name,
diff --git a/src/kudu/integration-tests/hms_itest-base.h b/src/kudu/integration-tests/hms_itest-base.h
index b348031684..50c814c4e1 100644
--- a/src/kudu/integration-tests/hms_itest-base.h
+++ b/src/kudu/integration-tests/hms_itest-base.h
@@ -22,12 +22,15 @@
#include
+#include "kudu/gutil/port.h"
#include "kudu/hms/hms_client.h"
#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
#include "kudu/util/status.h"
namespace kudu {
+class MonoDelta;
+
class HmsITestBase : public ExternalMiniClusterITestBase {
public:
Status StartHms() WARN_UNUSED_RESULT;
@@ -38,7 +41,8 @@ class HmsITestBase : public ExternalMiniClusterITestBase {
// Creates a table in Kudu.
Status CreateKuduTable(const std::string& database_name,
- const std::string& table_name) WARN_UNUSED_RESULT;
+ const std::string& table_name,
+ MonoDelta timeout = {}) WARN_UNUSED_RESULT;
// Renames a table entry in the HMS catalog.
Status RenameHmsTable(const std::string& database_name,
diff --git a/src/kudu/integration-tests/master_sentry-itest.cc b/src/kudu/integration-tests/master_sentry-itest.cc
index 02ea86a119..ddaf02d808 100644
--- a/src/kudu/integration-tests/master_sentry-itest.cc
+++ b/src/kudu/integration-tests/master_sentry-itest.cc
@@ -873,6 +873,119 @@ TEST_F(SentryAuthzProviderCacheITest, ResetAuthzCacheConcurrentAlterTable) {
}
}
+// This test scenario documents an artifact of the Kudu+HMS+Sentry integration
+// when authz cache is enabled (the cache is enabled by default). In essence,
+// information on the ownership of a table created during a short period of
+// Sentry's unavailability will not ever appear in Sentry. That might be
+// misleading because CreateTable() reports a success to the client. The created
+// table indeed exists and is fully functional otherwise, but the corresponding
+// owner privilege record is absent in Sentry.
+//
+// TODO(aserbin): clarify why it works with HEAD of the master branches
+// of Sentry/Hive but fails with Sentry 2.1.0 and Hive 2.1.1.
+TEST_F(SentryAuthzProviderCacheITest, DISABLED_CreateTables) {
+ constexpr const char* const kGhostTables[] = { "t10", "t11" };
+
+ // Grant CREATE TABLE and METADATA privileges on the database.
+ ASSERT_OK(GrantCreateTablePrivilege({ kDatabaseName }));
+ ASSERT_OK(AlterRoleGrantPrivilege(
+ sentry_client_.get(), kDevRole,
+ GetDatabasePrivilege(kDatabaseName, "METADATA")));
+
+ // Make sure it's possible to create a table in the database. This also
+ // populates the privileges cache with information on the privileges
+ // granted on the database.
+ ASSERT_OK(CreateKuduTable(kDatabaseName, "t0"));
+
+ // An attempt to open a not-yet-existing table will fetch the information
+ // on the granted privileges on the table into the privileges cache.
+ for (const auto& t : kGhostTables) {
+ shared_ptr kudu_table;
+ const auto s = client_->OpenTable(Substitute("$0.$1", kDatabaseName, t),
+ &kudu_table);
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ }
+
+ ASSERT_OK(StopSentry());
+
+ // CreateTable() with operation timeout longer than HMS --> Sentry
+ // communication timeout successfully completes. After failing to push
+ // the information on the newly created table to Sentry due to the logic
+ // implemented in the SentrySyncHMSNotificationsPostEventListener plugin,
+ // HMS sends success response to Kudu master and Kudu successfully completes
+ // the rest of the steps.
+ ASSERT_OK(CreateKuduTable(kDatabaseName, kGhostTables[0]));
+
+ // In this case, the timeout for the CreateTable RPC is set to be lower than
+ // the HMS --> Sentry communication timeout (see corresponding parameters
+ // of the MiniHms::EnableSentry() method). CreateTable() successfully passes
+ // the authz phase since the information on privileges is cached and no
+ // Sentry RPC calls are attempted. However, since Sentry is down,
+ // CreateTable() takes a long time on the HMS's side and the client's
+ // request times out, while the creation of the table continues in the
+ // background.
+ {
+ const auto s = CreateKuduTable(kDatabaseName, kGhostTables[1],
+ MonoDelta::FromSeconds(1));
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+ }
+
+ // Before starting Sentry, make sure the abandoned request to create the
+ // latter table succeeded even if CreateKuduTable() reported timeout.
+ // This is to make sure HMS is no longer trying to push the notification
+ // on table creation to Sentry via the metastore plugin
+ // SentrySyncHMSNotificationsPostEventListener.
+ ASSERT_EVENTUALLY([&]{
+ bool exists;
+ ASSERT_OK(client_->TableExists(
+ Substitute("$0.$1", kDatabaseName, kGhostTables[1]), &exists));
+ ASSERT_TRUE(exists);
+ });
+
+ ASSERT_OK(ResetCache());
+
+ // After resetting the cache, it should not be possible to create another
+ // table: authz provider needs to fetch information on privileges directly
+ // from Sentry, but it's still down.
+ {
+ const auto s = CreateKuduTable(kDatabaseName, "t2");
+ ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+ }
+
+ ASSERT_OK(StartSentry());
+
+ // Try to create the table after starting Sentry back: it should be a success.
+ ASSERT_OK(CreateKuduTable(kDatabaseName, "t2"));
+
+ // Once table has been created, it should be possible to perform DDL operation
+ // on it since the user is the owner of the table.
+ {
+ unique_ptr table_alterer(
+ client_->NewTableAlterer(Substitute("$0.$1", kDatabaseName, "t2")));
+ table_alterer->AddColumn("new_int8_columun")->Type(KuduColumnSchema::INT8);
+ ASSERT_OK(table_alterer->Alter());
+ }
+
+ // Try to run DDL against the tables created during Sentry's downtime. These
+ // should not be authorized since Sentry didn't receive information on the
+ // ownership of those tables from HMS during their creation, and there is no
+ // catch-up for those events after Sentry starts back up.
+ for (const auto& t : kGhostTables) {
+ unique_ptr table_alterer(
+ client_->NewTableAlterer(Substitute("$0.$1", kDatabaseName, t)));
+ table_alterer->AddColumn("new_int8_columun")->Type(KuduColumnSchema::INT8);
+ auto s = table_alterer->Alter();
+ ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+
+ // After granting the ALTER TABLE privilege the alteration should be
+ // successful. One caveat: it's necessary to reset the cache since the cache
+ // will not have the new record until the current entry expires.
+ ASSERT_OK(GrantAlterTablePrivilege({ kDatabaseName, t }));
+ ASSERT_OK(ResetCache());
+ ASSERT_OK(table_alterer->Alter());
+ }
+}
+
// Basic test to verify access control and functionality of
// the ResetAuthzCache(); integration with Sentry is not enabled.
class AuthzCacheControlTest : public ExternalMiniClusterITestBase {
diff --git a/src/kudu/master/hms_notification_log_listener.cc b/src/kudu/master/hms_notification_log_listener.cc
index b34d2338b1..2237a4b47a 100644
--- a/src/kudu/master/hms_notification_log_listener.cc
+++ b/src/kudu/master/hms_notification_log_listener.cc
@@ -118,7 +118,7 @@ Status HmsNotificationLogListenerTask::WaitForCatchUp(const MonoTime& deadline)
wake_up_cv_.Signal();
}
- RETURN_NOT_OK_PREPEND(synchronizer.WaitFor(deadline - MonoTime::Now()),
+ RETURN_NOT_OK_PREPEND(synchronizer.WaitUntil(deadline),
"failed to wait for Hive Metastore notification log listener to catch up");
return Status::OK();
}
diff --git a/src/kudu/tablet/rowset_info.h b/src/kudu/tablet/rowset_info.h
index af2d07db3e..f133e3e1ee 100644
--- a/src/kudu/tablet/rowset_info.h
+++ b/src/kudu/tablet/rowset_info.h
@@ -51,6 +51,8 @@ class RowSetInfo {
// rowset tree is set into 'average_height', if it is not nullptr.
// If one of 'info_by_min_key' and 'info_by_max_key' is nullptr, the other
// must be.
+ // Requires holding the compact_select_lock_ for the tablet that the
// rowsets in 'tree' reference.
static void ComputeCdfAndCollectOrdered(const RowSetTree& tree,
double* average_height,
std::vector* info_by_min_key,
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 7627ed43d2..daee9a5b18 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -134,11 +134,13 @@ DEFINE_double(tablet_throttler_burst_factor, 1.0f,
"base rate.");
TAG_FLAG(tablet_throttler_burst_factor, experimental);
-DEFINE_int32(tablet_history_max_age_sec, 15 * 60,
- "Number of seconds to retain tablet history. Reads initiated at a "
- "snapshot that is older than this age will be rejected. "
- "To disable history removal, set to -1.");
+DEFINE_int32(tablet_history_max_age_sec, 60 * 60 * 24 * 7,
+ "Number of seconds to retain tablet history, including history "
+ "required to perform diff scans and incremental backups. Reads "
+ "initiated at a snapshot that is older than this age will be "
+ "rejected. To disable history removal, set to -1.");
TAG_FLAG(tablet_history_max_age_sec, advanced);
+TAG_FLAG(tablet_history_max_age_sec, stable);
DEFINE_int32(max_cell_size_bytes, 64 * 1024,
"The maximum size of any individual cell in a table. Attempting to store "
@@ -264,7 +266,6 @@ Status Tablet::Open() {
TRACE_EVENT0("tablet", "Tablet::Open");
RETURN_IF_STOPPED_OR_CHECK_STATE(kInitialized);
- std::lock_guard lock(component_lock_);
CHECK(schema()->has_column_ids());
next_mrs_id_ = metadata_->last_durable_mrs_id() + 1;
@@ -291,15 +292,6 @@ Status Tablet::Open() {
shared_ptr new_rowset_tree(new RowSetTree());
CHECK_OK(new_rowset_tree->Reset(rowsets_opened));
- if (metrics_) {
- // Compute the initial average height of the rowset tree.
- double avg_height;
- RowSetInfo::ComputeCdfAndCollectOrdered(*new_rowset_tree,
- &avg_height,
- nullptr,
- nullptr);
- metrics_->average_diskrowset_height->set_value(avg_height);
- }
// Now that the current state is loaded, create the new MemRowSet with the next id.
shared_ptr new_mrs;
@@ -307,7 +299,13 @@ Status Tablet::Open() {
log_anchor_registry_.get(),
mem_trackers_.tablet_tracker,
&new_mrs));
- components_ = new TabletComponents(new_mrs, new_rowset_tree);
+ {
+ std::lock_guard lock(component_lock_);
+ components_ = new TabletComponents(new_mrs, new_rowset_tree);
+ }
+
+ // Compute the initial average rowset height.
+ UpdateAverageRowsetHeight();
{
std::lock_guard l(state_lock_);
@@ -1053,10 +1051,10 @@ void Tablet::ModifyRowSetTree(const RowSetTree& old_tree,
CHECK_OK(new_tree->Reset(post_swap));
}
-void Tablet::AtomicSwapRowSets(const RowSetVector &old_rowsets,
- const RowSetVector &new_rowsets) {
+void Tablet::AtomicSwapRowSets(const RowSetVector &to_remove,
+ const RowSetVector &to_add) {
std::lock_guard lock(component_lock_);
- AtomicSwapRowSetsUnlocked(old_rowsets, new_rowsets);
+ AtomicSwapRowSetsUnlocked(to_remove, to_add);
}
void Tablet::AtomicSwapRowSetsUnlocked(const RowSetVector &to_remove,
@@ -1068,19 +1066,6 @@ void Tablet::AtomicSwapRowSetsUnlocked(const RowSetVector &to_remove,
to_remove, to_add, new_tree.get());
components_ = new TabletComponents(components_->memrowset, new_tree);
-
- // Recompute the average rowset height.
- // TODO(wdberkeley): We should be able to cache the computation of the CDF
- // and average height and efficiently recompute it instead of doing it from
- // scratch.
- if (metrics_) {
- double avg_height;
- RowSetInfo::ComputeCdfAndCollectOrdered(*new_tree,
- &avg_height,
- nullptr,
- nullptr);
- metrics_->average_diskrowset_height->set_value(avg_height);
- }
}
Status Tablet::DoMajorDeltaCompaction(const vector& col_ids,
@@ -1723,6 +1708,7 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
// Replace the compacted rowsets with the new on-disk rowsets, making them visible now that
// their metadata was written to disk.
AtomicSwapRowSets({ inprogress_rowset }, new_disk_rowsets);
+ UpdateAverageRowsetHeight();
const auto rows_written = drsw.rows_written_count();
const auto drs_written = drsw.drs_written_count();
@@ -1753,9 +1739,28 @@ Status Tablet::HandleEmptyCompactionOrFlush(const RowSetVector& rowsets,
"Failed to flush new tablet metadata");
AtomicSwapRowSets(rowsets, RowSetVector());
+ UpdateAverageRowsetHeight();
return Status::OK();
}
+void Tablet::UpdateAverageRowsetHeight() {
+ if (!metrics_) {
+ return;
+ }
+ // TODO(wdberkeley): We should be able to cache the computation of the CDF
+ // and average height and efficiently recompute it instead of doing it from
+ // scratch.
+ scoped_refptr comps;
+ GetComponents(&comps);
+ std::lock_guard l(compact_select_lock_);
+ double avg_height;
+ RowSetInfo::ComputeCdfAndCollectOrdered(*comps->rowsets,
+ &avg_height,
+ nullptr,
+ nullptr);
+ metrics_->average_diskrowset_height->set_value(avg_height);
+}
+
Status Tablet::Compact(CompactFlags flags) {
RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 87448f5da3..c4a5691046 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -347,7 +347,6 @@ class Tablet {
// memrowset in the current implementation.
Status CountRows(uint64_t *count) const;
-
// Verbosely dump this entire tablet to the logs. This is only
// really useful when debugging unit tests failures where the tablet
// has a very small number of rows.
@@ -418,6 +417,7 @@ class Tablet {
// This method is thread-safe.
void CancelMaintenanceOps();
+ const std::string& table_id() const { return metadata_->table_id(); }
const std::string& tablet_id() const { return metadata_->tablet_id(); }
// Return the metrics for this tablet.
@@ -599,6 +599,10 @@ class Tablet {
Status HandleEmptyCompactionOrFlush(const RowSetVector& rowsets,
int mrs_being_flushed);
+ // Updates the average rowset height metric. Acquires the tablet's
+ // compact_select_lock_.
+ void UpdateAverageRowsetHeight();
+
Status FlushMetadata(const RowSetVector& to_remove,
const RowSetMetadataVector& to_add,
int64_t mrs_being_flushed);
diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h
index 62e754532e..adf0d4cbb7 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -119,7 +119,7 @@ class TabletMetadata : public RefCountedThreadSafe {
return partition_;
}
- std::string table_id() const {
+ const std::string& table_id() const {
DCHECK_NE(state_, kNotLoadedYet);
return table_id_;
}
diff --git a/src/kudu/tablet/tablet_mm_ops-test.cc b/src/kudu/tablet/tablet_mm_ops-test.cc
index a2f55728cd..3a13386a9d 100644
--- a/src/kudu/tablet/tablet_mm_ops-test.cc
+++ b/src/kudu/tablet/tablet_mm_ops-test.cc
@@ -62,7 +62,7 @@ class KuduTabletMmOpsTest : public TabletTestBase> {
void StatsShouldChange(MaintenanceOp* op) {
SleepFor(MonoDelta::FromMilliseconds(1));
op->UpdateStats(&stats_);
- ASSERT_TRUE(next_time_ < stats_.last_modified());
+ ASSERT_LT(next_time_, stats_.last_modified());
next_time_ = stats_.last_modified();
}
@@ -70,7 +70,6 @@ class KuduTabletMmOpsTest : public TabletTestBase> {
SleepFor(MonoDelta::FromMilliseconds(1));
op->UpdateStats(&stats_);
ASSERT_EQ(next_time_, stats_.last_modified());
- next_time_ = stats_.last_modified();
}
void TestFirstCall(MaintenanceOp* op) {
diff --git a/src/kudu/tablet/tablet_mm_ops.cc b/src/kudu/tablet/tablet_mm_ops.cc
index c71bc6e527..c54c241966 100644
--- a/src/kudu/tablet/tablet_mm_ops.cc
+++ b/src/kudu/tablet/tablet_mm_ops.cc
@@ -87,6 +87,10 @@ string TabletOpBase::LogPrefix() const {
return tablet_->LogPrefix();
}
+const std::string& TabletOpBase::table_id() const {
+ return tablet_->table_id();
+}
+
////////////////////////////////////////////////////////////
// CompactRowSetsOp
////////////////////////////////////////////////////////////
@@ -262,12 +266,12 @@ void MajorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
// cached stats.
TabletMetrics* metrics = tablet_->metrics();
if (metrics) {
- int64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
- int64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
- int64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
- int64_t new_num_rs_minor_delta_compacted =
+ uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
+ uint64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
+ uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
+ uint64_t new_num_rs_minor_delta_compacted =
metrics->delta_minor_compact_rs_duration->TotalCount();
- int64_t new_num_rs_major_delta_compacted =
+ uint64_t new_num_rs_major_delta_compacted =
metrics->delta_major_compact_rs_duration->TotalCount();
if (prev_stats_.valid() &&
new_num_mrs_flushed == last_num_mrs_flushed_ &&
diff --git a/src/kudu/tablet/tablet_mm_ops.h b/src/kudu/tablet/tablet_mm_ops.h
index 8fc865e0f1..1e117f1bd1 100644
--- a/src/kudu/tablet/tablet_mm_ops.h
+++ b/src/kudu/tablet/tablet_mm_ops.h
@@ -43,6 +43,8 @@ class TabletOpBase : public MaintenanceOp {
std::string LogPrefix() const;
protected:
+ const std::string& table_id() const OVERRIDE;
+
Tablet* const tablet_;
};
@@ -57,15 +59,15 @@ class CompactRowSetsOp : public TabletOpBase {
public:
explicit CompactRowSetsOp(Tablet* tablet);
- virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+ void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
- virtual bool Prepare() OVERRIDE;
+ bool Prepare() OVERRIDE;
- virtual void Perform() OVERRIDE;
+ void Perform() OVERRIDE;
- virtual scoped_refptr DurationHistogram() const OVERRIDE;
+ scoped_refptr DurationHistogram() const OVERRIDE;
- virtual scoped_refptr > RunningGauge() const OVERRIDE;
+ scoped_refptr > RunningGauge() const OVERRIDE;
private:
mutable simple_spinlock lock_;
@@ -83,15 +85,15 @@ class MinorDeltaCompactionOp : public TabletOpBase {
public:
explicit MinorDeltaCompactionOp(Tablet* tablet);
- virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+ void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
- virtual bool Prepare() OVERRIDE;
+ bool Prepare() OVERRIDE;
- virtual void Perform() OVERRIDE;
+ void Perform() OVERRIDE;
- virtual scoped_refptr DurationHistogram() const OVERRIDE;
+ scoped_refptr DurationHistogram() const OVERRIDE;
- virtual scoped_refptr > RunningGauge() const OVERRIDE;
+ scoped_refptr > RunningGauge() const OVERRIDE;
private:
mutable simple_spinlock lock_;
@@ -109,15 +111,15 @@ class MajorDeltaCompactionOp : public TabletOpBase {
public:
explicit MajorDeltaCompactionOp(Tablet* tablet);
- virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+ void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
- virtual bool Prepare() OVERRIDE;
+ bool Prepare() OVERRIDE;
- virtual void Perform() OVERRIDE;
+ void Perform() OVERRIDE;
- virtual scoped_refptr DurationHistogram() const OVERRIDE;
+ scoped_refptr DurationHistogram() const OVERRIDE;
- virtual scoped_refptr > RunningGauge() const OVERRIDE;
+ scoped_refptr > RunningGauge() const OVERRIDE;
private:
mutable simple_spinlock lock_;
@@ -138,19 +140,19 @@ class UndoDeltaBlockGCOp : public TabletOpBase {
// Estimates the number of bytes that may potentially be in ancient delta
// undo blocks. Over time, as Perform() is invoked, this estimate gets more
// accurate.
- void UpdateStats(MaintenanceOpStats* stats) override;
+ void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
- bool Prepare() override;
+ bool Prepare() OVERRIDE;
// Deletes ancient history data from disk. This also initializes undo delta
// blocks greedily (in a budgeted manner controlled by the
// --undo_delta_block_gc_init_budget_millis gflag) that makes the estimate
// performed in UpdateStats() more accurate.
- void Perform() override;
+ void Perform() OVERRIDE;
- scoped_refptr DurationHistogram() const override;
+ scoped_refptr DurationHistogram() const OVERRIDE;
- scoped_refptr > RunningGauge() const override;
+ scoped_refptr > RunningGauge() const OVERRIDE;
private:
std::string LogPrefix() const;
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index c7b231bc94..a284215bc8 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -38,6 +38,7 @@
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/raft_consensus.h"
+#include "kudu/fs/data_dirs.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/bind_helpers.h"
@@ -124,7 +125,6 @@ TabletReplica::TabletReplica(
Callback mark_dirty_clbk)
: meta_(DCHECK_NOTNULL(std::move(meta))),
cmeta_manager_(DCHECK_NOTNULL(std::move(cmeta_manager))),
- tablet_id_(meta_->tablet_id()),
local_peer_pb_(std::move(local_peer_pb)),
log_anchor_registry_(new LogAnchorRegistry()),
apply_pool_(apply_pool),
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index bf9d63dd0e..50fab2dae8 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -254,10 +254,8 @@ class TabletReplica : public RefCountedThreadSafe,
return log_anchor_registry_;
}
- // Returns the tablet_id of the tablet managed by this TabletReplica.
- // Returns the correct tablet_id even if the underlying tablet is not available
- // yet.
- const std::string& tablet_id() const { return tablet_id_; }
+ const std::string& table_id() const { return meta_->table_id(); }
+ const std::string& tablet_id() const { return meta_->tablet_id(); }
// Convenience method to return the permanent_uuid of this peer.
std::string permanent_uuid() const { return tablet_->metadata()->fs_manager()->uuid(); }
@@ -322,7 +320,6 @@ class TabletReplica : public RefCountedThreadSafe,
const scoped_refptr meta_;
const scoped_refptr cmeta_manager_;
- const std::string tablet_id_;
const consensus::RaftPeerPB local_peer_pb_;
scoped_refptr log_anchor_registry_; // Assigned in tablet_replica-test
diff --git a/src/kudu/tablet/tablet_replica_mm_ops.cc b/src/kudu/tablet/tablet_replica_mm_ops.cc
index a7267debc2..a848bcb838 100644
--- a/src/kudu/tablet/tablet_replica_mm_ops.cc
+++ b/src/kudu/tablet/tablet_replica_mm_ops.cc
@@ -21,6 +21,7 @@
#include
#include
#include
+#include
#include
#include
@@ -121,6 +122,20 @@ void FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(MaintenanceOpStats
}
}
+//
+// TabletReplicaOpBase.
+//
+TabletReplicaOpBase::TabletReplicaOpBase(std::string name,
+ IOUsage io_usage,
+ TabletReplica* tablet_replica)
+ : MaintenanceOp(std::move(name), io_usage),
+ tablet_replica_(tablet_replica) {
+}
+
+const std::string& TabletReplicaOpBase::table_id() const {
+ return tablet_replica_->table_id();
+}
+
//
// FlushMRSOp.
//
@@ -260,9 +275,10 @@ scoped_refptr > FlushDeltaMemStoresOp::RunningGauge() cons
//
LogGCOp::LogGCOp(TabletReplica* tablet_replica)
- : MaintenanceOp(StringPrintf("LogGCOp(%s)", tablet_replica->tablet()->tablet_id().c_str()),
- MaintenanceOp::LOW_IO_USAGE),
- tablet_replica_(tablet_replica),
+ : TabletReplicaOpBase(StringPrintf("LogGCOp(%s)",
+ tablet_replica->tablet()->tablet_id().c_str()),
+ MaintenanceOp::LOW_IO_USAGE,
+ tablet_replica),
log_gc_duration_(METRIC_log_gc_duration.Instantiate(
tablet_replica->tablet()->GetMetricEntity())),
log_gc_running_(METRIC_log_gc_running.Instantiate(
diff --git a/src/kudu/tablet/tablet_replica_mm_ops.h b/src/kudu/tablet/tablet_replica_mm_ops.h
index 2404b489bf..72fdb39442 100644
--- a/src/kudu/tablet/tablet_replica_mm_ops.h
+++ b/src/kudu/tablet/tablet_replica_mm_ops.h
@@ -47,86 +47,92 @@ class FlushOpPerfImprovementPolicy {
FlushOpPerfImprovementPolicy() {}
};
+class TabletReplicaOpBase : public MaintenanceOp {
+ public:
+ explicit TabletReplicaOpBase(std::string name, IOUsage io_usage, TabletReplica* tablet_replica);
+
+ protected:
+ const std::string& table_id() const OVERRIDE;
+
+ TabletReplica *const tablet_replica_;
+};
+
// Maintenance op for MRS flush. Only one can happen at a time.
-class FlushMRSOp : public MaintenanceOp {
+class FlushMRSOp : public TabletReplicaOpBase {
public:
explicit FlushMRSOp(TabletReplica* tablet_replica)
- : MaintenanceOp(StringPrintf("FlushMRSOp(%s)", tablet_replica->tablet()->tablet_id().c_str()),
- MaintenanceOp::HIGH_IO_USAGE),
- tablet_replica_(tablet_replica) {
+ : TabletReplicaOpBase(StringPrintf("FlushMRSOp(%s)",
+ tablet_replica->tablet()->tablet_id().c_str()),
+ MaintenanceOp::HIGH_IO_USAGE,
+ tablet_replica) {
time_since_flush_.start();
}
- virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+ void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
- virtual bool Prepare() OVERRIDE;
+ bool Prepare() OVERRIDE;
- virtual void Perform() OVERRIDE;
+ void Perform() OVERRIDE;
- virtual scoped_refptr DurationHistogram() const OVERRIDE;
+ scoped_refptr DurationHistogram() const OVERRIDE;
- virtual scoped_refptr > RunningGauge() const OVERRIDE;
+ scoped_refptr > RunningGauge() const OVERRIDE;
private:
// Lock protecting time_since_flush_.
mutable simple_spinlock lock_;
Stopwatch time_since_flush_;
-
- TabletReplica *const tablet_replica_;
};
// Maintenance op for DMS flush.
// Reports stats for all the DMS this tablet contains but only flushes one in Perform().
-class FlushDeltaMemStoresOp : public MaintenanceOp {
+class FlushDeltaMemStoresOp : public TabletReplicaOpBase {
public:
explicit FlushDeltaMemStoresOp(TabletReplica* tablet_replica)
- : MaintenanceOp(StringPrintf("FlushDeltaMemStoresOp(%s)",
- tablet_replica->tablet()->tablet_id().c_str()),
- MaintenanceOp::HIGH_IO_USAGE),
- tablet_replica_(tablet_replica) {
+ : TabletReplicaOpBase(StringPrintf("FlushDeltaMemStoresOp(%s)",
+ tablet_replica->tablet()->tablet_id().c_str()),
+ MaintenanceOp::HIGH_IO_USAGE,
+ tablet_replica) {
time_since_flush_.start();
}
- virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+ void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
- virtual bool Prepare() OVERRIDE {
+ bool Prepare() OVERRIDE {
return true;
}
- virtual void Perform() OVERRIDE;
+ void Perform() OVERRIDE;
- virtual scoped_refptr DurationHistogram() const OVERRIDE;
+ scoped_refptr DurationHistogram() const OVERRIDE;
- virtual scoped_refptr > RunningGauge() const OVERRIDE;
+ scoped_refptr > RunningGauge() const OVERRIDE;
private:
// Lock protecting time_since_flush_
mutable simple_spinlock lock_;
Stopwatch time_since_flush_;
-
- TabletReplica *const tablet_replica_;
};
// Maintenance task that runs log GC. Reports log retention that represents the amount of data
// that can be GC'd.
//
// Only one LogGC op can run at a time.
-class LogGCOp : public MaintenanceOp {
+class LogGCOp : public TabletReplicaOpBase {
public:
explicit LogGCOp(TabletReplica* tablet_replica);
- virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+ void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
- virtual bool Prepare() OVERRIDE;
+ bool Prepare() OVERRIDE;
- virtual void Perform() OVERRIDE;
+ void Perform() OVERRIDE;
- virtual scoped_refptr DurationHistogram() const OVERRIDE;
+ scoped_refptr DurationHistogram() const OVERRIDE;
- virtual scoped_refptr > RunningGauge() const OVERRIDE;
+ scoped_refptr > RunningGauge() const OVERRIDE;
private:
- TabletReplica *const tablet_replica_;
scoped_refptr log_gc_duration_;
scoped_refptr > log_gc_running_;
mutable Semaphore sem_;
diff --git a/src/kudu/tserver/tablet_copy-test-base.h b/src/kudu/tserver/tablet_copy-test-base.h
index 99624b5943..b4db99c80d 100644
--- a/src/kudu/tserver/tablet_copy-test-base.h
+++ b/src/kudu/tserver/tablet_copy-test-base.h
@@ -57,7 +57,9 @@ class TabletCopyTest : public TabletServerTestBase {
}
virtual void TearDown() OVERRIDE {
- ASSERT_OK(tablet_replica_->log_anchor_registry()->Unregister(&anchor_));
+ if (tablet_replica_) {
+ ASSERT_OK(tablet_replica_->log_anchor_registry()->Unregister(&anchor_));
+ }
NO_FATALS(TabletServerTestBase::TearDown());
}
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index f12d181e8e..47334c63b2 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -230,6 +230,7 @@ set(UTIL_SRCS
version_info.cc
version_util.cc
website_util.cc
+ yamlreader.cc
zlib.cc
)
@@ -257,6 +258,7 @@ set(UTIL_LIBS
pb_util_proto
protobuf
version_info_proto
+ yaml
zlib)
if(NOT APPLE)
@@ -293,7 +295,6 @@ set(UTIL_COMPRESSION_SRCS
set(UTIL_COMPRESSION_LIBS
kudu_util
util_compression_proto
-
glog
gutil
lz4
@@ -467,6 +468,7 @@ ADD_KUDU_TEST(ttl_cache-test)
ADD_KUDU_TEST(url-coding-test)
ADD_KUDU_TEST(user-test)
ADD_KUDU_TEST(version_util-test)
+ADD_KUDU_TEST(yamlreader-test)
if (NOT APPLE)
ADD_KUDU_TEST(minidump-test)
diff --git a/src/kudu/util/async_util-test.cc b/src/kudu/util/async_util-test.cc
index 5cb7a6372b..91f2baa11d 100644
--- a/src/kudu/util/async_util-test.cc
+++ b/src/kudu/util/async_util-test.cc
@@ -28,6 +28,7 @@
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/callback.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
@@ -43,7 +44,7 @@ class AsyncUtilTest : public KuduTest {
// Set up an alarm to fail the test in case of deadlock.
alarm(30);
}
- ~AsyncUtilTest() {
+ virtual ~AsyncUtilTest() {
// Disable the alarm on test exit.
alarm(0);
}
@@ -99,31 +100,72 @@ TEST_F(AsyncUtilTest, TestSynchronizerMultiWait) {
}
}
-TEST_F(AsyncUtilTest, TestSynchronizerTimedWait) {
- thread waiter;
- {
- Synchronizer sync;
- auto cb = sync.AsStatusCallback();
- waiter = thread([cb] {
- SleepFor(MonoDelta::FromMilliseconds(5));
- cb.Run(Status::OK());
- });
- ASSERT_OK(sync.WaitFor(MonoDelta::FromMilliseconds(1000)));
- }
- waiter.join();
+// Flavors of wait that Synchronizer is capable of: WaitFor() or WaitUntil().
+enum class TimedWaitFlavor {
+ WaitFor,
+ WaitUntil,
+};
- {
- Synchronizer sync;
- auto cb = sync.AsStatusCallback();
- waiter = thread([cb] {
- SleepFor(MonoDelta::FromMilliseconds(1000));
- cb.Run(Status::OK());
- });
- ASSERT_TRUE(sync.WaitFor(MonoDelta::FromMilliseconds(5)).IsTimedOut());
+class AsyncUtilTimedWaitTest:
+ public AsyncUtilTest,
+ public ::testing::WithParamInterface {
+};
+
+TEST_P(AsyncUtilTimedWaitTest, SynchronizerTimedWaitSuccess) {
+ const auto kWaitInterval = MonoDelta::FromMilliseconds(1000);
+
+ Synchronizer sync;
+ auto cb = sync.AsStatusCallback();
+ auto waiter = thread([cb] {
+ SleepFor(MonoDelta::FromMilliseconds(5));
+ cb.Run(Status::OK());
+ });
+ SCOPED_CLEANUP({
+ waiter.join();
+ });
+ const auto mode = GetParam();
+ switch (mode) {
+ case TimedWaitFlavor::WaitFor:
+ ASSERT_OK(sync.WaitFor(kWaitInterval));
+ break;
+ case TimedWaitFlavor::WaitUntil:
+ ASSERT_OK(sync.WaitUntil(MonoTime::Now() + kWaitInterval));
+ break;
+ default:
+ FAIL() << "unsupported wait mode " << static_cast(mode);
+ break;
}
+}
+
+TEST_P(AsyncUtilTimedWaitTest, SynchronizerTimedWaitTimeout) {
+ const auto kWaitInterval = MonoDelta::FromMilliseconds(5);
- // Waiting on the thread gives TSAN to check that no thread safety issues
- // occurred.
- waiter.join();
+ Synchronizer sync;
+ auto cb = sync.AsStatusCallback();
+ auto waiter = thread([cb] {
+ SleepFor(MonoDelta::FromMilliseconds(1000));
+ cb.Run(Status::OK());
+ });
+ SCOPED_CLEANUP({
+ waiter.join();
+ });
+ const auto mode = GetParam();
+ switch (mode) {
+ case TimedWaitFlavor::WaitFor:
+ ASSERT_TRUE(sync.WaitFor(kWaitInterval).IsTimedOut());
+ break;
+ case TimedWaitFlavor::WaitUntil:
+ ASSERT_TRUE(sync.WaitUntil(MonoTime::Now() + kWaitInterval).IsTimedOut());
+ break;
+ default:
+ FAIL() << "unsupported wait mode " << static_cast(mode);
+ break;
+ }
}
+
+INSTANTIATE_TEST_CASE_P(WaitFlavors,
+ AsyncUtilTimedWaitTest,
+ ::testing::Values(TimedWaitFlavor::WaitFor,
+ TimedWaitFlavor::WaitUntil));
+
} // namespace kudu
diff --git a/src/kudu/util/async_util.h b/src/kudu/util/async_util.h
index 338c6c2973..61621d6b4f 100644
--- a/src/kudu/util/async_util.h
+++ b/src/kudu/util/async_util.h
@@ -44,7 +44,7 @@ namespace kudu {
class Synchronizer {
public:
Synchronizer()
- : data_(std::make_shared()) {
+ : data_(std::make_shared()) {
}
void StatusCB(const Status& status) {
@@ -71,6 +71,13 @@ class Synchronizer {
return data_->status;
}
+ Status WaitUntil(const MonoTime& deadline) const {
+ if (PREDICT_FALSE(!data_->latch.WaitUntil(deadline))) {
+ return Status::TimedOut("timed out while waiting for the callback to be called");
+ }
+ return data_->status;
+ }
+
void Reset() {
data_->latch.Reset(1);
}
diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
index 6777e06b81..8c88dd74a9 100644
--- a/src/kudu/util/maintenance_manager-test.cc
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -15,16 +15,22 @@
// specific language governing permissions and limitations
// under the License.
+#include "kudu/util/maintenance_manager.h"
+
+#include
+
+#include
#include
#include
+#include
#include
#include
#include
#include
#include
-#include
#include // IWYU pragma: keep
+#include
#include
#include
#include
@@ -32,7 +38,6 @@
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/maintenance_manager.h"
#include "kudu/util/maintenance_manager.pb.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
@@ -42,9 +47,9 @@
#include "kudu/util/test_util.h"
#include "kudu/util/thread.h"
+using std::list;
using std::shared_ptr;
using std::string;
-using std::vector;
using strings::Substitute;
METRIC_DEFINE_entity(test);
@@ -57,17 +62,29 @@ METRIC_DEFINE_histogram(test, maintenance_op_duration,
kudu::MetricUnit::kSeconds, "", 60000000LU, 2);
DECLARE_int64(log_target_replay_size_mb);
+DECLARE_string(maintenance_manager_table_priorities);
+DECLARE_double(maintenance_op_multiplier);
+DECLARE_int32(max_priority_range);
+
namespace kudu {
-static const int kHistorySize = 4;
+static const int kHistorySize = 6;
static const char kFakeUuid[] = "12345";
class MaintenanceManagerTest : public KuduTest {
public:
- void SetUp() override {
+ void SetUp() OVERRIDE {
+ StartManager(2);
+ }
+
+ void TearDown() OVERRIDE {
+ StopManager();
+ }
+
+ void StartManager(int32_t num_threads) {
MaintenanceManager::Options options;
- options.num_threads = 2;
+ options.num_threads = num_threads;
options.polling_interval_ms = 1;
options.history_size = kHistorySize;
manager_.reset(new MaintenanceManager(options, kFakeUuid));
@@ -78,7 +95,7 @@ class MaintenanceManagerTest : public KuduTest {
ASSERT_OK(manager_->Start());
}
- void TearDown() override {
+ void StopManager() {
manager_->Shutdown();
}
@@ -95,7 +112,8 @@ TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) {
class TestMaintenanceOp : public MaintenanceOp {
public:
TestMaintenanceOp(const std::string& name,
- IOUsage io_usage)
+ IOUsage io_usage,
+ std::string table_id = "fake.table_id")
: MaintenanceOp(name, io_usage),
ram_anchored_(500),
logs_retained_bytes_(0),
@@ -105,12 +123,13 @@ class TestMaintenanceOp : public MaintenanceOp {
maintenance_ops_running_(METRIC_maintenance_ops_running.Instantiate(metric_entity_, 0)),
remaining_runs_(1),
prepared_runs_(0),
- sleep_time_(MonoDelta::FromSeconds(0)) {
+ sleep_time_(MonoDelta::FromSeconds(0)),
+ table_id_(std::move(table_id)) {
}
- virtual ~TestMaintenanceOp() {}
+ ~TestMaintenanceOp() OVERRIDE = default;
- virtual bool Prepare() OVERRIDE {
+ bool Prepare() OVERRIDE {
std::lock_guard guard(lock_);
if (remaining_runs_ == 0) {
return false;
@@ -121,7 +140,7 @@ class TestMaintenanceOp : public MaintenanceOp {
return true;
}
- virtual void Perform() OVERRIDE {
+ void Perform() OVERRIDE {
{
std::lock_guard guard(lock_);
DLOG(INFO) << "Performing op " << name();
@@ -135,7 +154,7 @@ class TestMaintenanceOp : public MaintenanceOp {
SleepFor(sleep_time_);
}
- virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE {
+ void UpdateStats(MaintenanceOpStats* stats) OVERRIDE {
std::lock_guard guard(lock_);
stats->set_runnable(remaining_runs_ > 0);
stats->set_ram_anchored(ram_anchored_);
@@ -168,14 +187,18 @@ class TestMaintenanceOp : public MaintenanceOp {
perf_improvement_ = perf_improvement;
}
- virtual scoped_refptr DurationHistogram() const OVERRIDE {
+ scoped_refptr DurationHistogram() const OVERRIDE {
return maintenance_op_duration_;
}
- virtual scoped_refptr > RunningGauge() const OVERRIDE {
+ scoped_refptr > RunningGauge() const OVERRIDE {
return maintenance_ops_running_;
}
+ const std::string& table_id() const OVERRIDE {
+ return table_id_;
+ }
+
private:
Mutex lock_;
@@ -195,6 +218,7 @@ class TestMaintenanceOp : public MaintenanceOp {
// The amount of time each op invocation will sleep.
MonoDelta sleep_time_;
+ std::string table_id_;
};
// Create an op and wait for it to start running. Unregister it while it is
@@ -226,7 +250,7 @@ TEST_F(MaintenanceManagerTest, TestNewOpsDontGetScheduledDuringUnregister) {
// Set the op to run up to 10 times, and each time should sleep for a second.
op1.set_remaining_runs(10);
- op1.set_sleep_time(MonoDelta::FromSeconds(1));
+ op1.set_sleep_time(MonoDelta::FromMilliseconds(10));
manager_->RegisterOp(&op1);
// Wait until two instances of the ops start running, since we have two MM threads.
@@ -266,7 +290,7 @@ TEST_F(MaintenanceManagerTest, TestMemoryPressure) {
TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
const int64_t kMB = 1024 * 1024;
- manager_->Shutdown();
+ StopManager();
TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE);
op1.set_ram_anchored(0);
@@ -318,7 +342,7 @@ TEST_F(MaintenanceManagerTest, TestRunningInstances) {
TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE);
op.set_perf_improvement(10);
op.set_remaining_runs(2);
- op.set_sleep_time(MonoDelta::FromSeconds(1));
+ op.set_sleep_time(MonoDelta::FromMilliseconds(10));
manager_->RegisterOp(&op);
// Check that running instances are added to the maintenance manager's collection,
@@ -341,10 +365,11 @@ TEST_F(MaintenanceManagerTest, TestRunningInstances) {
manager_->GetMaintenanceManagerStatusDump(&status_pb);
ASSERT_EQ(status_pb.running_operations_size(), 0);
}
+
// Test adding operations and make sure that the history of recently completed operations
// is correct in that it wraps around and doesn't grow.
TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < kHistorySize + 1; i++) {
string name = Substitute("op$0", i);
TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE);
op.set_perf_improvement(1);
@@ -358,12 +383,120 @@ TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
MaintenanceManagerStatusPB status_pb;
manager_->GetMaintenanceManagerStatusDump(&status_pb);
- // The size should be at most the history_size.
- ASSERT_GE(kHistorySize, status_pb.completed_operations_size());
+ // The size should equal the number of ops completed so far,
+ // and should be at most kHistorySize.
+ ASSERT_EQ(std::min(kHistorySize, i + 1), status_pb.completed_operations_size());
// The most recently completed op should always be first, even if we wrap
// around.
ASSERT_EQ(name, status_pb.completed_operations(0).name());
}
}
+// Test maintenance OP factors.
+// The OPs on different priority levels have different OP score multipliers.
+TEST_F(MaintenanceManagerTest, TestOpFactors) {
+ ASSERT_GE(FLAGS_max_priority_range, 1);
+ FLAGS_maintenance_manager_table_priorities
+ = Substitute("table_id_1:$0;table_id_2:$1;table_id_3:$2;table_id_4:$3;table_id_5:$4",
+ -FLAGS_max_priority_range - 1, -1, 0, 1, FLAGS_max_priority_range + 1);
+ TestMaintenanceOp op1("op1", MaintenanceOp::HIGH_IO_USAGE, "table_id_1");
+ TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, "table_id_2");
+ TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, "table_id_3");
+ TestMaintenanceOp op4("op4", MaintenanceOp::HIGH_IO_USAGE, "table_id_4");
+ TestMaintenanceOp op5("op5", MaintenanceOp::HIGH_IO_USAGE, "table_id_5");
+ TestMaintenanceOp op6("op6", MaintenanceOp::HIGH_IO_USAGE, "table_id_6");
+
+ manager_->UpdateTablePriorities();
+
+ ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier, -FLAGS_max_priority_range),
+ manager_->PerfImprovement(1, op1.table_id()));
+ ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier, -1),
+ manager_->PerfImprovement(1, op2.table_id()));
+ ASSERT_DOUBLE_EQ(1, manager_->PerfImprovement(1, op3.table_id()));
+ ASSERT_DOUBLE_EQ(FLAGS_maintenance_op_multiplier, manager_->PerfImprovement(1, op4.table_id()));
+ ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier, FLAGS_max_priority_range),
+ manager_->PerfImprovement(1, op5.table_id()));
+ ASSERT_DOUBLE_EQ(1, manager_->PerfImprovement(1, op6.table_id()));
+}
+
+// Test priority OP launching.
+TEST_F(MaintenanceManagerTest, TestPriorityOpLaunch) {
+ StopManager();
+ StartManager(1);
+
+ ASSERT_NE("", gflags::SetCommandLineOption(
+ "maintenance_manager_table_priorities",
+ Substitute("table_id_1:$0;table_id_2:$1;table_id_3:$2;table_id_4:$3;table_id_5:$4",
+ -FLAGS_max_priority_range - 1, -1, 0, 1, FLAGS_max_priority_range + 1).c_str()));
+
+ TestMaintenanceOp op1("op1", MaintenanceOp::HIGH_IO_USAGE, "table_id_1");
+ op1.set_perf_improvement(10);
+ op1.set_remaining_runs(1);
+ op1.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+ TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, "table_id_2");
+ op2.set_perf_improvement(10);
+ op2.set_remaining_runs(1);
+ op2.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+ TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, "table_id_3");
+ op3.set_perf_improvement(10);
+ op3.set_remaining_runs(1);
+ op3.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+ TestMaintenanceOp op4("op4", MaintenanceOp::HIGH_IO_USAGE, "table_id_4");
+ op4.set_perf_improvement(10);
+ op4.set_remaining_runs(1);
+ op4.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+ TestMaintenanceOp op5("op5", MaintenanceOp::HIGH_IO_USAGE, "table_id_5");
+ op5.set_perf_improvement(10);
+ op5.set_remaining_runs(1);
+ op5.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+ TestMaintenanceOp op6("op6", MaintenanceOp::HIGH_IO_USAGE, "table_id_6");
+ op6.set_perf_improvement(12);
+ op6.set_remaining_runs(1);
+ op6.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+ manager_->RegisterOp(&op1);
+ manager_->RegisterOp(&op2);
+ manager_->RegisterOp(&op3);
+ manager_->RegisterOp(&op4);
+ manager_->RegisterOp(&op5);
+ manager_->RegisterOp(&op6);
+
+ ASSERT_EVENTUALLY([&]() {
+ MaintenanceManagerStatusPB status_pb;
+ manager_->GetMaintenanceManagerStatusDump(&status_pb);
+ ASSERT_EQ(status_pb.completed_operations_size(), 6);
+ });
+
+ // Wait for instances to complete.
+ manager_->UnregisterOp(&op1);
+ manager_->UnregisterOp(&op2);
+ manager_->UnregisterOp(&op3);
+ manager_->UnregisterOp(&op4);
+ manager_->UnregisterOp(&op5);
+ manager_->UnregisterOp(&op6);
+
+ // Check that running instances are removed from collection after completion.
+ MaintenanceManagerStatusPB status_pb;
+ manager_->GetMaintenanceManagerStatusDump(&status_pb);
+ ASSERT_EQ(status_pb.running_operations_size(), 0);
+ ASSERT_EQ(status_pb.completed_operations_size(), 6);
+ // Ops complete in ascending order of adjusted perf_improvement score, and more recently completed ops are listed first.
+ list ordered_ops({"op1",
+ "op2",
+ "op3",
+ "op4",
+ "op6",
+ "op5"});
+ ASSERT_EQ(ordered_ops.size(), status_pb.completed_operations().size());
+ for (const auto& instance : status_pb.completed_operations()) {
+ ASSERT_EQ(ordered_ops.front(), instance.name());
+ ordered_ops.pop_front();
+ }
+}
+
} // namespace kudu
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index ea607aa000..3231b31b6e 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -17,8 +17,9 @@
#include "kudu/util/maintenance_manager.h"
+#include
#include
-#include
+#include
#include
#include
#include
@@ -26,6 +27,7 @@
#include
#include
#include
+#include
#include
#include
@@ -33,6 +35,8 @@
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/debug/trace_logging.h"
@@ -50,6 +54,8 @@
using std::pair;
using std::string;
+using std::vector;
+using strings::Split;
using strings::Substitute;
DEFINE_int32(maintenance_manager_num_threads, 1,
@@ -91,6 +97,29 @@ DEFINE_double(data_gc_prioritization_prob, 0.5,
"such as delta compaction.");
TAG_FLAG(data_gc_prioritization_prob, experimental);
+DEFINE_string(maintenance_manager_table_priorities, "",
+ "Priorities of tables, semicolon-separated list of table-priority pairs, and each "
+ "table-priority pair is combined by table id, colon and priority level. Priority "
+ "level is ranged in [-FLAGS_max_priority_range, FLAGS_max_priority_range]");
+TAG_FLAG(maintenance_manager_table_priorities, advanced);
+TAG_FLAG(maintenance_manager_table_priorities, experimental);
+TAG_FLAG(maintenance_manager_table_priorities, runtime);
+
+DEFINE_double(maintenance_op_multiplier, 1.1,
+ "Multiplier applied on different priority levels, table maintenance OPs on level N "
+ "has multiplier of FLAGS_maintenance_op_multiplier^N, the last score will be "
+ "multiplied by this multiplier. Note: this multiplier is only take effect on "
+ "compaction OPs");
+TAG_FLAG(maintenance_op_multiplier, advanced);
+TAG_FLAG(maintenance_op_multiplier, experimental);
+TAG_FLAG(maintenance_op_multiplier, runtime);
+
+DEFINE_int32(max_priority_range, 5,
+ "Maximal priority range of OPs.");
+TAG_FLAG(max_priority_range, advanced);
+TAG_FLAG(max_priority_range, experimental);
+TAG_FLAG(max_priority_range, runtime);
+
namespace kudu {
MaintenanceOpStats::MaintenanceOpStats() {
@@ -107,7 +136,7 @@ void MaintenanceOpStats::Clear() {
last_modified_ = MonoTime();
}
-MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage)
+MaintenanceOp::MaintenanceOp(string name, IOUsage io_usage)
: name_(std::move(name)),
running_(0),
cancel_(false),
@@ -129,10 +158,10 @@ MaintenanceManagerStatusPB_OpInstancePB OpInstance::DumpToPB() const {
pb.set_thread_id(thread_id);
pb.set_name(name);
if (duration.Initialized()) {
- pb.set_duration_millis(duration.ToMilliseconds());
+ pb.set_duration_millis(static_cast(duration.ToMilliseconds()));
}
MonoDelta delta(MonoTime::Now() - start_mono_time);
- pb.set_millis_since_start(delta.ToMilliseconds());
+ pb.set_millis_since_start(static_cast(delta.ToMilliseconds()));
return pb;
}
@@ -143,7 +172,7 @@ const MaintenanceManager::Options MaintenanceManager::kDefaultOptions = {
};
MaintenanceManager::MaintenanceManager(const Options& options,
- std::string server_uuid)
+ string server_uuid)
: server_uuid_(std::move(server_uuid)),
num_threads_(options.num_threads <= 0 ?
FLAGS_maintenance_manager_num_threads : options.num_threads),
@@ -264,8 +293,7 @@ void MaintenanceManager::RunSchedulerThread() {
// 1) there are no free threads available to perform a maintenance op.
// or 2) we just tried to schedule an op but found nothing to run.
// However, if it's time to shut down, we want to do so immediately.
- while ((running_ops_ >= num_threads_ || prev_iter_found_no_work || disabled_for_tests()) &&
- !shutdown_) {
+ while (CouldNotLaunchNewOp(prev_iter_found_no_work)) {
cond_.WaitFor(polling_interval);
prev_iter_found_no_work = false;
}
@@ -274,42 +302,47 @@ void MaintenanceManager::RunSchedulerThread() {
return;
}
- // Find the best op.
- auto best_op_and_why = FindBestOp();
- auto* op = best_op_and_why.first;
- const auto& note = best_op_and_why.second;
+ // TODO(yingchun): move it to SetFlag, callback once as a gflags setter handler.
+ UpdateTablePriorities();
// If we found no work to do, then we should sleep before trying again to schedule.
// Otherwise, we can go right into trying to find the next op.
- prev_iter_found_no_work = (op == nullptr);
- if (!op) {
- VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2)
- << "No maintenance operations look worth doing.";
- continue;
- }
+ prev_iter_found_no_work = !FindAndLaunchOp(&guard);
+ }
+}
- // Prepare the maintenance operation.
- op->running_++;
- running_ops_++;
- guard.unlock();
- bool ready = op->Prepare();
- guard.lock();
- if (!ready) {
- LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name()
- << ". Re-running scheduler.";
- op->running_--;
- running_ops_--;
- op->cond_->Signal();
- continue;
- }
+bool MaintenanceManager::FindAndLaunchOp(std::unique_lock* guard) {
+ // Find the best op.
+ auto best_op_and_why = FindBestOp();
+ auto* op = best_op_and_why.first;
+ const auto& note = best_op_and_why.second;
+
+ if (!op) {
+ VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2)
+ << "No maintenance operations look worth doing.";
+ return false;
+ }
- LOG_AND_TRACE_WITH_PREFIX("maintenance", INFO)
- << Substitute("Scheduling $0: $1", op->name(), note);
- // Run the maintenance operation.
- Status s = thread_pool_->SubmitFunc(boost::bind(
- &MaintenanceManager::LaunchOp, this, op));
- CHECK(s.ok());
+ // Prepare the maintenance operation.
+ IncreaseOpCount(op);
+ guard->unlock();
+ bool ready = op->Prepare();
+ guard->lock();
+ if (!ready) {
+ LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name()
+ << ". Re-running scheduler.";
+ DecreaseOpCount(op);
+ op->cond_->Signal();
+ return true;
}
+
+ LOG_AND_TRACE_WITH_PREFIX("maintenance", INFO)
+ << Substitute("Scheduling $0: $1", op->name(), note);
+ // Run the maintenance operation.
+ Status s = thread_pool_->SubmitFunc(boost::bind(&MaintenanceManager::LaunchOp, this, op));
+ CHECK(s.ok());
+
+ return true;
}
// Finding the best operation goes through four filters:
@@ -335,8 +368,7 @@ void MaintenanceManager::RunSchedulerThread() {
pair MaintenanceManager::FindBestOp() {
TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp");
- size_t free_threads = num_threads_ - running_ops_;
- if (free_threads == 0) {
+ if (!HasFreeThreads()) {
return {nullptr, "no free threads"};
}
@@ -367,8 +399,8 @@ pair MaintenanceManager::FindBestOp() {
}
const auto logs_retained_bytes = stats.logs_retained_bytes();
- if (logs_retained_bytes > low_io_most_logs_retained_bytes &&
- op->io_usage() == MaintenanceOp::LOW_IO_USAGE) {
+ if (op->io_usage() == MaintenanceOp::LOW_IO_USAGE &&
+ logs_retained_bytes > low_io_most_logs_retained_bytes) {
low_io_most_logs_retained_bytes_op = op;
low_io_most_logs_retained_bytes = logs_retained_bytes;
VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2)
@@ -401,7 +433,7 @@ pair MaintenanceManager::FindBestOp() {
op->name(), data_retained_bytes);
}
- const auto perf_improvement = stats.perf_improvement();
+ const auto perf_improvement = PerfImprovement(stats.perf_improvement(), op->table_id());
if ((!best_perf_improvement_op) ||
(perf_improvement > best_perf_improvement)) {
best_perf_improvement_op = op;
@@ -420,7 +452,7 @@ pair MaintenanceManager::FindBestOp() {
double capacity_pct;
if (memory_pressure_func_(&capacity_pct)) {
if (!most_ram_anchored_op) {
- std::string msg = StringPrintf("System under memory pressure "
+ string msg = StringPrintf("System under memory pressure "
"(%.2f%% of limit used). However, there are no ops currently "
"runnable which would free memory.", capacity_pct);
KLOG_EVERY_N_SECS(WARNING, 5) << msg;
@@ -456,6 +488,16 @@ pair MaintenanceManager::FindBestOp() {
return {nullptr, "no ops with positive improvement"};
}
+double MaintenanceManager::PerfImprovement(double perf_improvement,
+ const string& table_id) const {
+ int32_t priority = 0;
+ if (!FindCopy(table_priorities_, table_id, &priority)) {
+ return perf_improvement;
+ }
+
+ return perf_improvement * std::pow(FLAGS_maintenance_op_multiplier, priority);
+}
+
void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
int64_t thread_id = Thread::CurrentThreadId();
OpInstance op_instance;
@@ -481,9 +523,7 @@ void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
completed_ops_count_++;
op->DurationHistogram()->Increment(op_instance.duration.ToMilliseconds());
-
- running_ops_--;
- op->running_--;
+ DecreaseOpCount(op);
op->cond_->Signal();
cond_.Signal(); // Wake up scheduler.
});
@@ -507,9 +547,6 @@ void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb) {
DCHECK(out_pb != nullptr);
std::lock_guard guard(lock_);
- auto best_op_and_why = FindBestOp();
- auto* best_op = best_op_and_why.first;
-
for (const auto& val : ops_) {
MaintenanceManagerStatusPB_MaintenanceOpPB* op_pb = out_pb->add_registered_operations();
MaintenanceOp* op(val.first);
@@ -527,10 +564,6 @@ void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatu
op_pb->set_logs_retained_bytes(0);
op_pb->set_perf_improvement(0.0);
}
-
- if (best_op == op) {
- out_pb->mutable_best_op()->CopyFrom(*op_pb);
- }
}
{
@@ -540,8 +573,9 @@ void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatu
}
}
+ // The most recently completed op is dumped first.
for (int n = 1; n <= completed_ops_.size(); n++) {
- int i = completed_ops_count_ - n;
+ int64_t i = completed_ops_count_ - n;
if (i < 0) break;
const auto& completed_op = completed_ops_[i % completed_ops_.size()];
@@ -551,8 +585,50 @@ void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatu
}
}
-std::string MaintenanceManager::LogPrefix() const {
+string MaintenanceManager::LogPrefix() const {
return Substitute("P $0: ", server_uuid_);
}
+bool MaintenanceManager::HasFreeThreads() {
+ return num_threads_ - running_ops_ > 0;
+}
+
+bool MaintenanceManager::CouldNotLaunchNewOp(bool prev_iter_found_no_work) {
+ return (!HasFreeThreads() || prev_iter_found_no_work || disabled_for_tests()) && !shutdown_;
+}
+
+void MaintenanceManager::UpdateTablePriorities() {
+ string table_priorities_str;
+ if (!google::GetCommandLineOption("maintenance_manager_table_priorities",
+ &table_priorities_str)) {
+ return;
+ }
+
+ TablePriorities table_priorities;
+ int32_t value;
+ for (auto table_priority_str : Split(table_priorities_str, ";", strings::SkipEmpty())) {
+ vector table_priority = Split(table_priority_str, ":", strings::SkipEmpty());
+ if (safe_strto32_base(table_priority[1].c_str(), &value, 10)) {
+ value = std::max(value, -FLAGS_max_priority_range);
+ value = std::min(value, FLAGS_max_priority_range);
+ table_priorities[table_priority[0]] = value;
+ } else {
+ LOG(WARNING) << "Some error occured when parse flag maintenance_manager_table_priorities: "
+ << table_priorities_str;
+ return;
+ }
+ }
+ table_priorities_.swap(table_priorities);
+}
+
+void MaintenanceManager::IncreaseOpCount(MaintenanceOp *op) {
+ op->running_++;
+ running_ops_++;
+}
+
+void MaintenanceManager::DecreaseOpCount(MaintenanceOp *op) {
+ op->running_--;
+ running_ops_--;
+}
+
} // namespace kudu
diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h
index c5dd8ac83c..e3452a2f46 100644
--- a/src/kudu/util/maintenance_manager.h
+++ b/src/kudu/util/maintenance_manager.h
@@ -230,6 +230,8 @@ class MaintenanceOp {
}
private:
+ virtual const std::string& table_id() const = 0;
+
DISALLOW_COPY_AND_ASSIGN(MaintenanceOp);
// The name of the operation. Op names must be unique.
@@ -302,8 +304,11 @@ class MaintenanceManager : public std::enable_shared_from_this OpMapTy;
+ typedef std::unordered_map TablePriorities;
// Return true if tests have currently disabled the maintenance
// manager by way of changing the gflags at runtime.
@@ -311,17 +316,32 @@ class MaintenanceManager : public std::enable_shared_from_this* guard);
+
// Find the best op, or null if there is nothing we want to run.
//
// Returns the op, as well as a string explanation of why that op was chosen,
// suitable for logging.
std::pair FindBestOp();
+ double PerfImprovement(double perf_improvement,
+ const std::string& table_id) const;
+
void LaunchOp(MaintenanceOp* op);
std::string LogPrefix() const;
+ bool HasFreeThreads();
+
+ bool CouldNotLaunchNewOp(bool prev_iter_found_no_work);
+
+ void UpdateTablePriorities();
+
+ void IncreaseOpCount(MaintenanceOp *op);
+ void DecreaseOpCount(MaintenanceOp *op);
+
const std::string server_uuid_;
+ TablePriorities table_priorities_;
const int32_t num_threads_;
OpMapTy ops_; // Registered operations.
Mutex lock_;
@@ -330,7 +350,7 @@ class MaintenanceManager : public std::enable_shared_from_this completed_ops_;
diff --git a/src/kudu/util/maintenance_manager.proto b/src/kudu/util/maintenance_manager.proto
index b6b1203908..77c8cf8fd1 100644
--- a/src/kudu/util/maintenance_manager.proto
+++ b/src/kudu/util/maintenance_manager.proto
@@ -26,7 +26,7 @@ message MaintenanceManagerStatusPB {
// Number of times this operation is currently running.
required uint32 running = 2;
required bool runnable = 3;
- required uint64 ram_anchored_bytes = 4;
+ required int64 ram_anchored_bytes = 4;
required int64 logs_retained_bytes = 5;
required double perf_improvement = 6;
}
@@ -40,15 +40,12 @@ message MaintenanceManagerStatusPB {
required int32 millis_since_start = 4;
}
- // The next operation that would run.
- optional MaintenanceOpPB best_op = 1;
-
// List of all the operations.
- repeated MaintenanceOpPB registered_operations = 2;
+ repeated MaintenanceOpPB registered_operations = 1;
// This list isn't in order of anything. Can contain the same operation multiple times.
- repeated OpInstancePB running_operations = 3;
+ repeated OpInstancePB running_operations = 2;
// This list isn't in order of anything. Can contain the same operation multiple times.
- repeated OpInstancePB completed_operations = 4;
+ repeated OpInstancePB completed_operations = 3;
}
diff --git a/src/kudu/util/yamlreader-test.cc b/src/kudu/util/yamlreader-test.cc
new file mode 100644
index 0000000000..8394cf4440
--- /dev/null
+++ b/src/kudu/util/yamlreader-test.cc
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/yamlreader.h"
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include "kudu/util/env.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+namespace kudu {
+
+class YamlReaderTest : public KuduTest {
+public:
+ Status GenerateYamlReader(const string& content, unique_ptr* result) {
+ string fname = GetTestPath("YamlReaderTest.json");
+ unique_ptr writable_file;
+ RETURN_NOT_OK(env_->NewWritableFile(fname, &writable_file));
+ RETURN_NOT_OK(writable_file->Append(Slice(content)));
+ RETURN_NOT_OK(writable_file->Close());
+ result->reset(new YamlReader(fname));
+ return Status::OK();
+ }
+};
+
+TEST_F(YamlReaderTest, FileNotExist) {
+ YamlReader r("YamlReaderTest.NotExist");
+ Status s = r.Init();
+ ASSERT_TRUE(s.IsCorruption());
+ ASSERT_STR_CONTAINS(
+ s.ToString(), "YAML::LoadFile error");
+}
+
+TEST_F(YamlReaderTest, Corruption) {
+ unique_ptr r;
+ ASSERT_OK(GenerateYamlReader("foo", &r));
+ ASSERT_OK(r->Init());
+ int val = 0;
+ ASSERT_TRUE(YamlReader::ExtractScalar(r->node(), "foo", &val).IsCorruption());
+}
+
+TEST_F(YamlReaderTest, EmptyFile) {
+ unique_ptr r;
+ ASSERT_OK(GenerateYamlReader("", &r));
+ ASSERT_OK(r->Init());
+
+ int val = 0;
+ ASSERT_TRUE(YamlReader::ExtractScalar(r->node(), "foo", &val).IsCorruption());
+}
+
+TEST_F(YamlReaderTest, KeyNotExist) {
+ unique_ptr r;
+ ASSERT_OK(GenerateYamlReader("foo: 1", &r));
+ ASSERT_OK(r->Init());
+
+ int val = 0;
+ ASSERT_TRUE(YamlReader::ExtractScalar(r->node(), "bar", &val).IsNotFound());
+}
+
+TEST_F(YamlReaderTest, Scalar) {
+ {
+ unique_ptr r;
+ ASSERT_OK(GenerateYamlReader("bool_val: false", &r));
+ ASSERT_OK(r->Init());
+ bool val = true;
+ ASSERT_OK(YamlReader::ExtractScalar(r->node(), "bool_val", &val));
+ ASSERT_EQ(val, false);
+ }
+
+ {
+ unique_ptr r;
+ ASSERT_OK(GenerateYamlReader("int_val: 123", &r));
+ ASSERT_OK(r->Init());
+ int val = 0;
+ ASSERT_OK(YamlReader::ExtractScalar(r->node(), "int_val", &val));
+ ASSERT_EQ(val, 123);
+ }
+
+ {
+ unique_ptr r;
+ ASSERT_OK(GenerateYamlReader("double_val: 123.456", &r));
+ ASSERT_OK(r->Init());
+ double val = 0.0;
+ ASSERT_OK(YamlReader::ExtractScalar(r->node(), "double_val", &val));
+ ASSERT_EQ(val, 123.456);
+ }
+
+ {
+ unique_ptr r;
+ ASSERT_OK(GenerateYamlReader("string_val: hello yaml", &r));
+ ASSERT_OK(r->Init());
+ string val;
+ ASSERT_OK(YamlReader::ExtractScalar(r->node(), "string_val", &val));
+ ASSERT_EQ(val, "hello yaml");
+ }
+}
+
+TEST_F(YamlReaderTest, Map) {
+ unique_ptr r;
+ ASSERT_OK(GenerateYamlReader(
+ "map_val: { key1: hello yaml, key2: 123.456 , key3: 123, key4: false}", &r));
+ ASSERT_OK(r->Init());
+
+ YAML::Node node;
+ ASSERT_OK(YamlReader::ExtractMap(r->node(), "map_val", &node));
+
+ {
+ string val;
+ ASSERT_OK(YamlReader::ExtractScalar(&node, "key1", &val));
+ ASSERT_EQ(val, "hello yaml");
+ }
+
+ {
+ double val = 0.0;
+ ASSERT_OK(YamlReader::ExtractScalar(&node, "key2", &val));
+ ASSERT_EQ(val, 123.456);
+ }
+
+ {
+ int val = 0;
+ ASSERT_OK(YamlReader::ExtractScalar(&node, "key3", &val));
+ ASSERT_EQ(val, 123);
+ }
+
+ {
+ bool val = true;
+ ASSERT_OK(YamlReader::ExtractScalar(&node, "key4", &val));
+ ASSERT_EQ(val, false);
+ }
+
+ // Nonexistent key.
+ {
+ int val = 0;
+ ASSERT_TRUE(YamlReader::ExtractScalar(&node, "key5", &val).IsNotFound());
+ }
+}
+
+TEST_F(YamlReaderTest, Array) {
+ unique_ptr r;
+ ASSERT_OK(GenerateYamlReader("list_val: [1, 3, 5, 7, 9]", &r));
+ ASSERT_OK(r->Init());
+ const std::vector& expect_vals = { 1, 3, 5, 7, 9 };
+
+ std::vector vals;
+ ASSERT_OK(YamlReader::ExtractArray(r->node(), "list_val", &vals));
+ ASSERT_EQ(vals, expect_vals);
+}
+
+} // namespace kudu
diff --git a/src/kudu/util/yamlreader.cc b/src/kudu/util/yamlreader.cc
new file mode 100644
index 0000000000..a3ee8e0d88
--- /dev/null
+++ b/src/kudu/util/yamlreader.cc
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/yamlreader.h"
+
+#include
+#include
+
+#include
+// IWYU pragma: no_include
+// IWYU pragma: no_include
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+
+using std::string;
+using strings::Substitute;
+using YAML::Node;
+using YAML::NodeType;
+
+namespace kudu {
+
+YamlReader::YamlReader(string filename) : filename_(std::move(filename)) {}
+
+Status YamlReader::Init() {
+ try {
+ node_ = YAML::LoadFile(filename_);
+ } catch (std::exception& e) {
+ return Status::Corruption(Substitute("YAML::LoadFile error: $0", e.what()));
+ }
+
+ return Status::OK();
+}
+
+Status YamlReader::ExtractMap(const Node* node,
+ const string& field,
+ Node* result) {
+ CHECK(result);
+ Node val;
+ RETURN_NOT_OK(ExtractField(node, field, &val));
+ if (PREDICT_FALSE(!val.IsMap())) {
+ return Status::Corruption(Substitute(
+ "wrong type during field extraction: expected map but got $0",
+ TypeToString(val.Type())));
+ }
+ *result = val;
+ return Status::OK();
+}
+
+Status YamlReader::ExtractField(const Node* node,
+ const string& field,
+ Node* result) {
+ if (PREDICT_FALSE(!node->IsDefined() || !node->IsMap())) {
+ return Status::Corruption("node is not map type");
+ }
+ try {
+ *result = (*node)[field];
+ } catch (std::exception& e) {
+ return Status::NotFound(Substitute("parse field $0 error: $1", field, e.what()));
+ }
+ if (PREDICT_FALSE(!result->IsDefined())) {
+ return Status::Corruption("Missing field", field);
+ }
+
+ return Status::OK();
+}
+
+const char* YamlReader::TypeToString(NodeType::value t) {
+ switch (t) {
+ case NodeType::Undefined:
+ return "undefined";
+ case NodeType::Null:
+ return "null";
+ case NodeType::Scalar:
+ return "scalar";
+ case NodeType::Sequence:
+ return "sequence";
+ case NodeType::Map:
+ return "map";
+ default:
+ LOG(FATAL) << "unexpected type: " << t;
+ }
+ return "";
+}
+
+} // namespace kudu
diff --git a/src/kudu/util/yamlreader.h b/src/kudu/util/yamlreader.h
new file mode 100644
index 0000000000..2888f9fb3f
--- /dev/null
+++ b/src/kudu/util/yamlreader.h
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include
+#include
+#include
+
+#include
+// IWYU pragma: no_include
+// IWYU pragma: no_include
+// IWYU pragma: no_include
+// IWYU pragma: no_include
+// IWYU pragma: no_include
+// IWYU pragma: no_include
+#include // IWYU pragma: keep
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Wraps the YAML parsing functionality of YAML::Node.
+//
+// This class can read yaml content from a file, extract scalar, map and array type of values.
+class YamlReader {
+ public:
+ explicit YamlReader(std::string filename);
+ ~YamlReader() = default;
+
+ Status Init();
+
+ // Extractor methods.
+ //
+ // Look for a field with the name of 'field' in the given node.
+ // Return Status::OK if it can be found and extracted as the specified type,
+ // or return Status::NotFound if it cannot be found, or Status::Corruption if
+ // the node is not extractable or the field can not be extracted as the type.
+
+ template
+ static Status ExtractScalar(const YAML::Node* node,
+ const std::string& field,
+ T* result);
+
+ static Status ExtractMap(const YAML::Node* node,
+ const std::string& field,
+ YAML::Node* result);
+
+ template
+ static Status ExtractArray(const YAML::Node* node,
+ const std::string& field,
+ std::vector* result);
+
+ const YAML::Node* node() const { return &node_; }
+
+ private:
+ static const char* TypeToString(YAML::NodeType::value t);
+
+ static Status ExtractField(const YAML::Node* node,
+ const std::string& field,
+ YAML::Node* result);
+
+ std::string filename_;
+ YAML::Node node_;
+
+ DISALLOW_COPY_AND_ASSIGN(YamlReader);
+};
+
+template
+Status YamlReader::ExtractScalar(const YAML::Node* node,
+ const std::string& field,
+ T* result) {
+ CHECK(result);
+ YAML::Node val;
+ RETURN_NOT_OK(ExtractField(node, field, &val));
+ if (PREDICT_FALSE(!val.IsScalar())) {
+ return Status::Corruption(strings::Substitute(
+ "wrong type during field extraction: expected scalar but got $0",
+ TypeToString(val.Type())));
+ }
+ *result = val.as();
+ return Status::OK();
+}
+
+template
+Status YamlReader::ExtractArray(const YAML::Node* node,
+ const std::string& field,
+ std::vector* result) {
+ CHECK(result);
+ YAML::Node val;
+ RETURN_NOT_OK(ExtractField(node, field, &val));
+ if (PREDICT_FALSE(!val.IsSequence())) {
+ return Status::Corruption(strings::Substitute(
+ "wrong type during field extraction: expected sequence but got $0",
+ TypeToString(val.Type())));
+ }
+ result->reserve(val.size());
+ for (YAML::const_iterator iter = val.begin(); iter != val.end(); ++iter) {
+ try {
+ if (PREDICT_FALSE(!iter->IsScalar())) {
+ return Status::Corruption(strings::Substitute(
+ "wrong type during field extraction: expected scalar but got $0",
+ TypeToString(iter->Type())));
+ }
+ result->push_back(iter->as());
+ } catch (std::exception& e) {
+ return Status::Corruption(strings::Substitute("parse list element error: $0", e.what()));
+ }
+ }
+ return Status::OK();
+}
+
+} // namespace kudu
diff --git a/thirdparty/build-definitions.sh b/thirdparty/build-definitions.sh
index 2ae2f59ae5..609b7b4e77 100644
--- a/thirdparty/build-definitions.sh
+++ b/thirdparty/build-definitions.sh
@@ -919,3 +919,32 @@ build_bison() {
make -j$PARALLEL $EXTRA_MAKEFLAGS install
popd
}
+
+build_yaml() {
+ YAML_SHARED_BDIR=$TP_BUILD_DIR/$YAML_NAME.shared$MODE_SUFFIX
+ YAML_STATIC_BDIR=$TP_BUILD_DIR/$YAML_NAME.static$MODE_SUFFIX
+ for SHARED in ON OFF; do
+ if [ $SHARED = "ON" ]; then
+ YAML_BDIR=$YAML_SHARED_BDIR
+ else
+ YAML_BDIR=$YAML_STATIC_BDIR
+ fi
+ mkdir -p $YAML_BDIR
+ pushd $YAML_BDIR
+ rm -rf CMakeCache.txt CMakeFiles/
+ CFLAGS="$EXTRA_CFLAGS -fPIC" \
+ CXXFLAGS="$EXTRA_CXXFLAGS -fPIC" \
+ LDFLAGS="$EXTRA_LDFLAGS" \
+ LIBS="$EXTRA_LIBS" \
+ cmake \
+ -DCMAKE_BUILD_TYPE=Release \
+ -DYAML_CPP_BUILD_TESTS=OFF \
+ -DYAML_CPP_BUILD_TOOLS=OFF \
+ -DBUILD_SHARED_LIBS=$SHARED \
+ -DCMAKE_INSTALL_PREFIX=$PREFIX \
+ $EXTRA_CMAKE_FLAGS \
+ $YAML_SOURCE
+ ${NINJA:-make} -j$PARALLEL $EXTRA_MAKEFLAGS install
+ popd
+ done
+}
diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh
index 0fe31ca991..0c2d451896 100755
--- a/thirdparty/build-thirdparty.sh
+++ b/thirdparty/build-thirdparty.sh
@@ -100,6 +100,7 @@ else
"hadoop") F_HADOOP=1 ;;
"hive") F_HIVE=1 ;;
"sentry") F_SENTRY=1 ;;
+ "yaml") F_YAML=1 ;;
*) echo "Unknown module: $arg"; exit 1 ;;
esac
done
@@ -371,6 +372,10 @@ if [ -n "$F_UNINSTRUMENTED" -o -n "$F_THRIFT" ]; then
build_thrift
fi
+if [ -n "$F_UNINSTRUMENTED" -o -n "$F_YAML" ]; then
+ build_yaml
+fi
+
restore_env
# If we're on macOS best to exit here, otherwise single dependency builds will try to
@@ -551,6 +556,10 @@ if [ -n "$F_TSAN" -o -n "$F_THRIFT" ]; then
build_thrift
fi
+if [ -n "$F_TSAN" -o -n "$F_YAML" ]; then
+ build_yaml
+fi
+
restore_env
finish
diff --git a/thirdparty/download-thirdparty.sh b/thirdparty/download-thirdparty.sh
index a6fa2b948e..7eeccdfd04 100755
--- a/thirdparty/download-thirdparty.sh
+++ b/thirdparty/download-thirdparty.sh
@@ -430,5 +430,11 @@ fetch_and_patch \
$SENTRY_SOURCE \
$SENTRY_PATCHLEVEL
+YAML_PATCHLEVEL=0
+fetch_and_patch \
+ $YAML_NAME.tar.gz \
+ $YAML_SOURCE \
+ $YAML_PATCHLEVEL
+
echo "---------------"
echo "Thirdparty dependencies downloaded successfully"
diff --git a/thirdparty/vars.sh b/thirdparty/vars.sh
index 01eea406dd..df92495aa3 100644
--- a/thirdparty/vars.sh
+++ b/thirdparty/vars.sh
@@ -234,3 +234,8 @@ HADOOP_SOURCE=$TP_SOURCE_DIR/$HADOOP_NAME
SENTRY_VERSION=505b42e81a9d85c4ebe8db3f48ad7a6e824a5db5
SENTRY_NAME=sentry-$SENTRY_VERSION
SENTRY_SOURCE=$TP_SOURCE_DIR/$SENTRY_NAME
+
+YAML_VERSION=0.6.2
+YAML_NAME=yaml-cpp-yaml-cpp-$YAML_VERSION
+YAML_SOURCE=$TP_SOURCE_DIR/$YAML_NAME
+