diff --git a/CMakeLists.txt b/CMakeLists.txt index 6c08e4c2dd..f0548096d8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1222,6 +1222,13 @@ else() set(KRB5_REALM_OVERRIDE -Wl,-U,krb5_realm_override_loaded krb5_realm_override) endif() +## yaml +find_package(Yaml REQUIRED) +include_directories(SYSTEM ${YAML_INCLUDE_DIR}) +ADD_THIRDPARTY_LIB(yaml + STATIC_LIB "${YAML_STATIC_LIB}" + SHARED_LIB "${YAML_SHARED_LIB}") + ## Boost # We use a custom cmake module and not cmake's FindBoost. diff --git a/cmake_modules/FindYaml.cmake b/cmake_modules/FindYaml.cmake new file mode 100644 index 0000000000..cc73805ced --- /dev/null +++ b/cmake_modules/FindYaml.cmake @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +find_path(YAML_INCLUDE_DIR yaml-cpp/yaml.h + # make sure we don't accidentally pick up a different version + NO_CMAKE_SYSTEM_PATH + NO_SYSTEM_ENVIRONMENT_PATH) +find_library(YAML_STATIC_LIB libyaml-cpp.a + NO_CMAKE_SYSTEM_PATH + NO_SYSTEM_ENVIRONMENT_PATH) +find_library(YAML_SHARED_LIB libyaml-cpp.so + NO_CMAKE_SYSTEM_PATH + NO_SYSTEM_ENVIRONMENT_PATH) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(YAML REQUIRED_VARS + YAML_STATIC_LIB YAML_SHARED_LIB YAML_INCLUDE_DIR) diff --git a/java/kudu_style.xml b/java/config/checkstyle/checkstyle.xml similarity index 72% rename from java/kudu_style.xml rename to java/config/checkstyle/checkstyle.xml index d0b8d6c2e6..562955d052 100644 --- a/java/kudu_style.xml +++ b/java/config/checkstyle/checkstyle.xml @@ -19,10 +19,10 @@ // under the License. --> -<!DOCTYPE module PUBLIC - "-//Puppy Crawl//DTD Check Configuration 1.3//EN" - "http://www.puppycrawl.com/dtds/configuration_1_3.dtd"> +<!DOCTYPE module PUBLIC + "-//Checkstyle//DTD Checkstyle Configuration 1.3//EN" + "https://checkstyle.org/dtds/configuration_1_3.dtd"> diff --git a/java/checkstyle_suppressions.xml b/java/config/checkstyle/suppressions.xml similarity index 100% rename from java/checkstyle_suppressions.xml rename to java/config/checkstyle/suppressions.xml diff --git a/java/gradle/quality.gradle b/java/gradle/quality.gradle index c92d0b230a..8bdc21aa3f 100644 --- a/java/gradle/quality.gradle +++ b/java/gradle/quality.gradle @@ -25,12 +25,8 @@ apply plugin: "ru.vyarus.animalsniffer" // Ensures Java code uses APIs from a pa apply plugin: "scalafmt" // Automatically formats Scala code on each build. apply plugin: "net.ltgt.errorprone" // Performs static code analysis to look for bugs in Java code.
- checkstyle { - configFile = file("$rootDir/kudu_style.xml") - configProperties = [ - "checkstyle.suppressions.file" : "$rootDir/checkstyle_suppressions.xml" - ] + configDir = file("$rootProject.projectDir/config/checkstyle") ignoreFailures = true showViolations = true } diff --git a/java/gradle/shadow.gradle b/java/gradle/shadow.gradle index f89b98d56d..63e7c1152a 100644 --- a/java/gradle/shadow.gradle +++ b/java/gradle/shadow.gradle @@ -41,12 +41,20 @@ configurations.archives.artifacts.removeAll { it instanceof ArchivePublishArtifact && it.archiveTask == jar } -shadowJar { - dependencies { - // Our shaded jars always try to pull in the slf4j api from - // kudu-client, though we never want it included. Excluding it - // here prevents the need to always list it. - exclude dependency(libs.slf4jApi) +// Define an overridable property to indicate tool jars that should +// include all of their dependencies. +// We use this below to ensure slf4j is included. +shadow.ext { + isToolJar = false +} +if (!shadow.isToolJar) { + shadowJar { + dependencies { + // Our shaded library jars always try to pull in the slf4j api from + // kudu-client, though we never want it included. Excluding it + // here prevents the need to always list it. + exclude dependency(libs.slf4jApi) + } } } diff --git a/java/kudu-backup-tools/build.gradle b/java/kudu-backup-tools/build.gradle new file mode 100644 index 0000000000..850060c52d --- /dev/null +++ b/java/kudu-backup-tools/build.gradle @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +apply plugin: "scala" +apply from: "$rootDir/gradle/protobuf.gradle" +apply from: "$rootDir/gradle/shadow.gradle" + +// Mark this as a tool jar so shadow doesn't exclude any dependencies. +shadow { + isToolJar = true +} + +dependencies { + compile project(path: ":kudu-client", configuration: "shadow") + compile libs.protobufJava + compile (libs.protobufJavaUtil) { + // Make sure wrong Guava version is not pulled in. + exclude group: "com.google.guava", module: "guava" + } + compile libs.hadoopCommon + compile (libs.scopt) { + // Make sure wrong Scala version is not pulled in. + exclude group: "org.scala-lang", module: "scala-library" + } + compile libs.scalaLibrary + compile libs.slf4jApi + + optional libs.yetusAnnotations + + testCompile project(path: ":kudu-test-utils", configuration: "shadow") + testCompile libs.junit + testCompile libs.log4j + testCompile libs.scalatest + testCompile libs.slf4jLog4j12 +} + +// Add protobuf files to the proto source set. +sourceSets { + main { + proto { + srcDir "src/main/protobuf" + } + } +} + +// kudu-backup-tools has no public Javadoc. +javadoc { + enabled = false +} + +// Skip publishing kudu-backup-tools until it's ready to be supported long-term. 
+uploadArchives.enabled = false \ No newline at end of file diff --git a/java/kudu-backup/src/main/protobuf/backup.proto b/java/kudu-backup-tools/src/main/protobuf/backup.proto similarity index 100% rename from java/kudu-backup/src/main/protobuf/backup.proto rename to java/kudu-backup-tools/src/main/protobuf/backup.proto diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala b/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/BackupGraph.scala similarity index 100% rename from java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala rename to java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/BackupGraph.scala diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala b/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/BackupIO.scala similarity index 84% rename from java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala rename to java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/BackupIO.scala index 82578ad82d..43a359d34a 100644 --- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala +++ b/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/BackupIO.scala @@ -26,15 +26,9 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.LocatedFileStatus import org.apache.hadoop.fs.Path -import org.apache.kudu.Schema import org.apache.kudu.backup.Backup.TableMetadataPB -import org.apache.kudu.backup.SessionIO._ +import org.apache.kudu.backup.BackupIO._ import org.apache.kudu.client.KuduTable -import org.apache.kudu.spark.kudu.SparkUtil -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.types.ByteType -import org.apache.spark.sql.types.StructField -import org.apache.spark.sql.types.StructType import org.apache.yetus.audience.InterfaceAudience import org.apache.yetus.audience.InterfaceStability import org.slf4j.Logger @@ -65,46 +59,19 @@ import scala.collection.mutable */ @InterfaceAudience.Private @InterfaceStability.Unstable -class SessionIO(val session: SparkSession, options: CommonOptions) { +class BackupIO(val conf: Configuration, rootPathStr: String) { val log: Logger = LoggerFactory.getLogger(getClass) - val conf: Configuration = session.sparkContext.hadoopConfiguration - val rootPath: Path = new Path(options.rootPath) + val rootPath: Path = new Path(rootPathStr) val fs: FileSystem = rootPath.getFileSystem(conf) - /** - * Returns the Spark schema for backup data based on the Kudu Schema. - * Additionally handles adding the RowAction column for incremental backup/restore. - */ - def dataSchema(schema: Schema, includeRowAction: Boolean = true): StructType = { - var fields = SparkUtil.sparkSchema(schema).fields - if (includeRowAction) { - val changeTypeField = generateRowActionColumn(schema) - fields = fields ++ Seq(changeTypeField) - } - StructType(fields) - } - - /** - * Generates a RowAction column and handles column name collisions. - * The column name can vary because it's accessed positionally. - */ - private def generateRowActionColumn(schema: Schema): StructField = { - var columnName = "backup_row_action" - // If the column already exists and we need to pick an alternate column name. - while (schema.hasColumn(columnName)) { - columnName += "_" - } - StructField(columnName, ByteType) - } - /** * Return the path to the table directory. 
*/ def tablePath(table: KuduTable): Path = { val tableName = URLEncoder.encode(table.getName, "UTF-8") val dirName = s"${table.getTableId}-$tableName" - new Path(options.rootPath, dirName) + new Path(rootPath, dirName) } /** @@ -269,7 +236,7 @@ class SessionIO(val session: SparkSession, options: CommonOptions) { } } -object SessionIO { +object BackupIO { // The name of the metadata file within a backup directory. val MetadataFileName = ".kudu-metadata.json" } diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala b/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/TableMetadata.scala similarity index 98% rename from java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala rename to java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/TableMetadata.scala index cfd3933c7c..6fc49d3e27 100644 --- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala +++ b/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/TableMetadata.scala @@ -49,7 +49,11 @@ object TableMetadata { val MetadataFileName = ".kudu-metadata.json" val MetadataVersion = 1 - def getTableMetadata(table: KuduTable, options: BackupOptions): TableMetadataPB = { + def getTableMetadata( + table: KuduTable, + fromMs: Long, + toMs: Long, + format: String): TableMetadataPB = { val columnIds = new util.HashMap[String, Integer]() val columns = table.getSchema.getColumns.asScala.map { col => columnIds.put(col.getName, table.getSchema.getColumnId(col.getName)) @@ -87,9 +91,9 @@ object TableMetadata { TableMetadataPB .newBuilder() .setVersion(MetadataVersion) - .setFromMs(options.fromMs) - .setToMs(options.toMs) - .setDataFormat(options.format) + .setFromMs(fromMs) + .setToMs(toMs) + .setDataFormat(format) .setTableName(table.getName) .setTableId(table.getTableId) .addAllColumns(columns.asJava) diff --git a/java/kudu-backup-tools/src/test/resources/log4j.properties b/java/kudu-backup-tools/src/test/resources/log4j.properties new file mode 100644 index 0000000000..129752c9a3 --- /dev/null +++ b/java/kudu-backup-tools/src/test/resources/log4j.properties @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +log4j.rootLogger = INFO, out +log4j.appender.out = org.apache.log4j.ConsoleAppender +log4j.appender.out.layout = org.apache.log4j.PatternLayout +log4j.appender.out.layout.ConversionPattern = %d{HH:mm:ss.SSS} [%p - %t] (%F:%L) %m%n + +log4j.logger.org.apache.kudu = DEBUG diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala b/java/kudu-backup-tools/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala similarity index 82% rename from java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala rename to java/kudu-backup-tools/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala index 480d2ebf90..314a06324b 100644 --- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala +++ b/java/kudu-backup-tools/src/test/scala/org/apache/kudu/backup/TestBackupGraph.scala @@ -16,19 +16,40 @@ // under the License. package org.apache.kudu.backup +import com.google.common.collect.ImmutableList +import org.apache.kudu.client.CreateTableOptions import org.apache.kudu.client.KuduTable -import org.apache.kudu.spark.kudu._ +import org.apache.kudu.test.ClientTestUtil.getBasicSchema +import org.apache.kudu.test.KuduTestHarness import org.junit.Assert._ +import org.junit.Before +import org.junit.Rule import org.junit.Test import org.slf4j.Logger import org.slf4j.LoggerFactory -class TestBackupGraph extends KuduTestSuite { +import scala.annotation.meta.getter + +class TestBackupGraph { val log: Logger = LoggerFactory.getLogger(getClass) + var tableName: String = "TestBackupGraph" + var table: KuduTable = _ + + @(Rule @getter) + val harness = new KuduTestHarness + + @Before + def setUp(): Unit = { + // Create the test table. + val builder = new CreateTableOptions().setNumReplicas(3) + builder.setRangePartitionColumns(ImmutableList.of("key")) + table = harness.getClient.createTable(tableName, getBasicSchema, builder) + } + @Test def testSimpleBackupGraph() { - val graph = new BackupGraph(table.getName) + val graph = new BackupGraph(table.getTableId) val full = createBackupVertex(table, 0, 1) graph.addBackup(full) @@ -52,7 +73,7 @@ class TestBackupGraph extends KuduTestSuite { @Test def testForkingBackupGraph() { - val graph = new BackupGraph(table.getName) + val graph = new BackupGraph(table.getTableId) val full = createBackupVertex(table, 0, 1) graph.addBackup(full) // Duplicate fromMs of 1 creates a fork in the graph. @@ -81,7 +102,7 @@ class TestBackupGraph extends KuduTestSuite { @Test def testMultiFullBackupGraph() { - val graph = new BackupGraph(table.getName) + val graph = new BackupGraph(table.getTableId) val full1 = createBackupVertex(table, 0, 1) graph.addBackup(full1) val inc1 = createBackupVertex(table, 1, 2) @@ -131,14 +152,7 @@ class TestBackupGraph extends KuduTestSuite { } private def createBackupVertex(table: KuduTable, fromMs: Long, toMs: Long): BackupNode = { - val options = new BackupOptions( - tables = Seq(table.getName), - rootPath = "foo/path", - "fooAddresses", - fromMs = fromMs, - toMs = toMs - ) - val metadata = TableMetadata.getTableMetadata(table, options) + val metadata = TableMetadata.getTableMetadata(table, fromMs, toMs, "parquet") BackupNode(null, metadata) } } diff --git a/java/kudu-backup/build.gradle b/java/kudu-backup/build.gradle index 53b59af1b4..514c29d3f1 100644 --- a/java/kudu-backup/build.gradle +++ b/java/kudu-backup/build.gradle @@ -16,17 +16,16 @@ // under the License. 
apply plugin: "scala" -apply from: "$rootDir/gradle/protobuf.gradle" apply from: "$rootDir/gradle/shadow.gradle" dependencies { + // Note: We don't use the shaded version, so we can control the dependencies. + compile(project(path: ":kudu-backup-tools")) { + // Ensure we use the hadoop-client provided by Spark to avoid any compatibility issues. + exclude group: "org.apache.hadoop", module: "hadoop-common" + } compile project(path: ":kudu-client", configuration: "shadow") compile project(path: ":kudu-spark", configuration: "shadow") - compile libs.protobufJava - compile (libs.protobufJavaUtil) { - // Make sure wrong Guava version is not pulled in. - exclude group: "com.google.guava", module: "guava" - } compile (libs.scopt) { // Make sure wrong Scala version is not pulled in. exclude group: "org.scala-lang", module: "scala-library" @@ -48,15 +47,6 @@ dependencies { testCompile libs.slf4jLog4j12 } -// Add protobuf files to the proto source set. -sourceSets { - main { - proto { - srcDir "src/main/protobuf" - } - } -} - // Adjust the artifact name to match the maven build. archivesBaseName = "kudu-backup${versions.sparkBase}_${versions.scalaBase}" diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupUtils.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupUtils.scala new file mode 100644 index 0000000000..4b1def6e6d --- /dev/null +++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupUtils.scala @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.kudu.backup + +import org.apache.kudu.Schema +import org.apache.kudu.spark.kudu.SparkUtil +import org.apache.spark.sql.types.ByteType +import org.apache.spark.sql.types.StructField +import org.apache.spark.sql.types.StructType + +object BackupUtils { + + /** + * Returns the Spark schema for backup data based on the Kudu Schema. + * Additionally handles adding the RowAction column for incremental backup/restore. + */ + def dataSchema(schema: Schema, includeRowAction: Boolean = true): StructType = { + var fields = SparkUtil.sparkSchema(schema).fields + if (includeRowAction) { + val changeTypeField = generateRowActionColumn(schema) + fields = fields ++ Seq(changeTypeField) + } + StructType(fields) + } + + /** + * Generates a RowAction column and handles column name collisions. + * The column name can vary because it's accessed positionally. + */ + private def generateRowActionColumn(schema: Schema): StructField = { + var columnName = "backup_row_action" + // If the column already exists and we need to pick an alternate column name. 
+ while (schema.hasColumn(columnName)) { + columnName += "_" + } + StructField(columnName, ByteType) + } + +} diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala index 2b0d84baf2..180e7c82a2 100644 --- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala +++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala @@ -41,7 +41,7 @@ object KuduBackup { options.kuduMasterAddresses, session.sparkContext ) - val io = new SessionIO(session, options) + val io = new BackupIO(session.sparkContext.hadoopConfiguration, options.rootPath) // Read the required backup metadata. val backupGraphs = @@ -97,7 +97,7 @@ object KuduBackup { val rdd = new KuduBackupRDD(table, tableOptions, incremental, context, session.sparkContext) val df = session.sqlContext - .createDataFrame(rdd, io.dataSchema(table.getSchema, incremental)) + .createDataFrame(rdd, BackupUtils.dataSchema(table.getSchema, incremental)) // Write the data to the backup path. // The backup path contains the timestampMs and should not already exist. @@ -108,7 +108,8 @@ object KuduBackup { // Generate and output the new metadata for this table. // The existence of metadata indicates this backup was successful. - val tableMetadata = TableMetadata.getTableMetadata(table, tableOptions) + val tableMetadata = TableMetadata + .getTableMetadata(table, tableOptions.fromMs, tableOptions.toMs, tableOptions.format) io.writeTableMetadata(tableMetadata, metadataPath) } } diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala index 7f1e515d84..1a5886f7da 100644 --- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala +++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala @@ -47,7 +47,7 @@ object KuduRestore { options.kuduMasterAddresses, session.sparkContext ) - val io = new SessionIO(session, options) + val io = new BackupIO(session.sparkContext.hadoopConfiguration, options.rootPath) // Read the required backup metadata. 
val backupGraphs = io.readBackupGraphsByTableName(options.tables, options.timestampMs) @@ -80,7 +80,7 @@ object KuduRestore { createTableRangePartitionByRangePartition(restoreName, lastMetadata, context) } } - val backupSchema = io.dataSchema(TableMetadata.getKuduSchema(metadata)) + val backupSchema = BackupUtils.dataSchema(TableMetadata.getKuduSchema(metadata)) val rowActionCol = backupSchema.fields.last.name val table = context.syncClient.openTable(restoreName) diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala index df9eaeebc6..8bcbef4732 100644 --- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala +++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala @@ -23,14 +23,6 @@ import org.apache.yetus.audience.InterfaceAudience import org.apache.yetus.audience.InterfaceStability import scopt.OptionParser -@InterfaceAudience.Private -@InterfaceStability.Unstable -trait CommonOptions { - val tables: Seq[String] - val rootPath: String - val kuduMasterAddresses: String -} - @InterfaceAudience.Private @InterfaceStability.Unstable case class BackupOptions( @@ -46,7 +38,6 @@ case class BackupOptions( scanLeaderOnly: Boolean = BackupOptions.DefaultScanLeaderOnly, scanPrefetching: Boolean = BackupOptions.DefaultScanPrefetching, keepAlivePeriodMs: Long = BackupOptions.DefaultKeepAlivePeriodMs) - extends CommonOptions object BackupOptions { val DefaultForceFull: Boolean = false @@ -166,8 +157,7 @@ case class RestoreOptions( kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName, tableSuffix: String = "", createTables: Boolean = RestoreOptions.DefaultCreateTables, - timestampMs: Long = System.currentTimeMillis() -) extends CommonOptions + timestampMs: Long = System.currentTimeMillis()) object RestoreOptions { val DefaultCreateTables: Boolean = true diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala index 5c769ce469..8a066ac04a 100644 --- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala +++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala @@ -572,7 +572,7 @@ class TestKuduBackup extends KuduTestSuite { options: BackupOptions, expectedRowCount: Long, expectIncremental: Boolean): Unit = { - val io = new SessionIO(ss, options) + val io = new BackupIO(ss.sparkContext.hadoopConfiguration, options.rootPath) val tableName = options.tables.head val table = harness.getClient.openTable(tableName) val backupPath = io.backupPath(table, options.toMs) @@ -587,7 +587,7 @@ class TestKuduBackup extends KuduTestSuite { } // Verify the output data. 
- val schema = io.dataSchema(table.getSchema, expectIncremental) + val schema = BackupUtils.dataSchema(table.getSchema, expectIncremental) val df = ss.sqlContext.read .format(metadata.getDataFormat) .schema(schema) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index afc54d1189..668f2387d1 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -35,6 +35,7 @@ import java.security.cert.CertificateException; import java.util.ArrayList; import java.util.Arrays; +import java.util.function.Consumer; import java.util.HashSet; import java.util.List; import java.util.Random; @@ -75,6 +76,7 @@ import org.apache.kudu.master.Master; import org.apache.kudu.master.Master.GetTableLocationsResponsePB; import org.apache.kudu.master.Master.TableIdentifierPB; +import org.apache.kudu.master.Master.TSInfoPB; import org.apache.kudu.util.AsyncUtil; import org.apache.kudu.util.NetUtil; import org.apache.kudu.util.Pair; @@ -2112,6 +2114,7 @@ public Object call(final GetTableLocationsResponsePB response) { partitionKey, requestedBatchSize, response.getTabletLocationsList(), + response.getTsInfosList(), response.getTtlMillis()); } catch (KuduException e) { return e; @@ -2146,12 +2149,13 @@ private void releaseMasterLookupPermit() { } /** - * Makes discovered tablet locations visible in the clients caches. + * Makes discovered tablet locations visible in the client's caches. * @param table the table which the locations belong to * @param requestPartitionKey the partition key of the table locations request * @param requestedBatchSize the number of tablet locations requested from the master in the * original request * @param locations the discovered locations + * @param tsInfosList a list of ts info that the replicas in 'locations' reference by index. * @param ttl the ttl of the locations */ @InterfaceAudience.LimitedPrivate("Test") @@ -2159,8 +2163,8 @@ void discoverTablets(KuduTable table, byte[] requestPartitionKey, int requestedBatchSize, List<Master.TabletLocationsPB> locations, + List<TSInfoPB> tsInfosList, long ttl) throws KuduException { - // TODO(todd): handle "interned" response here String tableId = table.getTableId(); String tableName = table.getName(); @@ -2179,20 +2183,48 @@ void discoverTablets(KuduTable table, // Build the list of discovered remote tablet instances. If we have // already discovered the tablet, its locations are refreshed. + int numTsInfos = tsInfosList.size(); List<RemoteTablet> tablets = new ArrayList<>(locations.size()); for (Master.TabletLocationsPB tabletPb : locations) { - List<UnknownHostException> lookupExceptions = new ArrayList<>(tabletPb.getReplicasCount()); + List<Exception> lookupExceptions = new ArrayList<>(tabletPb.getReplicasCount()); List<ServerInfo> servers = new ArrayList<>(tabletPb.getReplicasCount()); - for (Master.TabletLocationsPB.ReplicaPB replica : tabletPb.getReplicasList()) { + + // Lambda that does the common handling of a ts info. + Consumer<TSInfoPB> updateServersAndCollectExceptions = tsInfo -> { try { - ServerInfo serverInfo = resolveTS(replica.getTsInfo()); + ServerInfo serverInfo = resolveTS(tsInfo); if (serverInfo != null) { servers.add(serverInfo); } } catch (UnknownHostException ex) { lookupExceptions.add(ex); } + }; + + // Handle "old-style" non-interned replicas.
+ for (Master.TabletLocationsPB.ReplicaPB replica : tabletPb.getReplicasList()) { + updateServersAndCollectExceptions.accept(replica.getTsInfo()); + } + + // Handle interned replicas. As a shim, we also need to create a list of "old-style" ReplicaPBs + // to be stored inside the RemoteTablet. + // TODO(wdberkeley): Change this so ReplicaPBs aren't used by the client at all anymore. + List<Master.TabletLocationsPB.ReplicaPB> replicas = new ArrayList<>(); + for (Master.TabletLocationsPB.InternedReplicaPB replica : tabletPb.getInternedReplicasList()) { + int tsInfoIdx = replica.getTsInfoIdx(); + if (tsInfoIdx >= numTsInfos) { + lookupExceptions.add(new NonRecoverableException(Status.Corruption( + String.format("invalid response from master: referenced tsinfo idx %d but only %d present", + tsInfoIdx, numTsInfos)))); + continue; + } + TSInfoPB tsInfo = tsInfosList.get(tsInfoIdx); + updateServersAndCollectExceptions.accept(tsInfo); + Master.TabletLocationsPB.ReplicaPB.Builder builder = Master.TabletLocationsPB.ReplicaPB.newBuilder(); + builder.setRole(replica.getRole()); + builder.setTsInfo(tsInfo); + replicas.add(builder.build()); } if (!lookupExceptions.isEmpty() && @@ -2202,7 +2234,11 @@ void discoverTablets(KuduTable table, throw new NonRecoverableException(statusIOE); } - RemoteTablet rt = new RemoteTablet(tableId, tabletPb, servers); + RemoteTablet rt = new RemoteTablet(tableId, + tabletPb.getTabletId().toStringUtf8(), + ProtobufHelper.pbToPartition(tabletPb.getPartition()), + replicas.isEmpty() ? tabletPb.getReplicasList() : replicas, + servers); LOG.debug("Learned about tablet {} for table '{}' with partition {}", rt.getTabletId(), tableName, rt.getPartition()); diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java index a7fe825228..3be2770532 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java @@ -91,6 +91,7 @@ Message createRequestPB() { builder.setPartitionKeyEnd(UnsafeByteOperations.unsafeWrap(endKey)); } builder.setMaxReturnedLocations(maxReturnedLocations); + builder.setInternTsInfosInResponse(true); return builder.build(); } } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/LocatedTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/LocatedTablet.java index 655f800692..ae319034b3 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/LocatedTablet.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/LocatedTablet.java @@ -107,6 +107,8 @@ public String toString() { @InterfaceAudience.Public @InterfaceStability.Evolving public static class Replica { + // TODO(wdberkeley): The ReplicaPB is deprecated server-side, so we ought to redo how this + // class stores its information.
private final ReplicaPB pb; Replica(ReplicaPB pb) { diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java index fd0eb3a758..2241f586cb 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java @@ -69,11 +69,13 @@ public class RemoteTablet implements Comparable<RemoteTablet> { private String leaderUuid; RemoteTablet(String tableId, - Master.TabletLocationsPB tabletLocations, + String tabletId, + Partition partition, + List<Master.TabletLocationsPB.ReplicaPB> replicas, List<ServerInfo> serverInfos) { - this.tabletId = tabletLocations.getTabletId().toStringUtf8(); + this.tabletId = tabletId; this.tableId = tableId; - this.partition = ProtobufHelper.pbToPartition(tabletLocations.getPartition()); + this.partition = partition; this.tabletServers = new HashMap<>(serverInfos.size()); for (ServerInfo serverInfo : serverInfos) { @@ -81,18 +83,18 @@ public class RemoteTablet implements Comparable<RemoteTablet> { } ImmutableList.Builder<LocatedTablet.Replica> replicasBuilder = new ImmutableList.Builder<>(); - for (Master.TabletLocationsPB.ReplicaPB replica : tabletLocations.getReplicasList()) { + for (Master.TabletLocationsPB.ReplicaPB replica : replicas) { String uuid = replica.getTsInfo().getPermanentUuid().toStringUtf8(); replicasBuilder.add(new LocatedTablet.Replica(replica)); if (replica.getRole().equals(Metadata.RaftPeerPB.Role.LEADER)) { - leaderUuid = uuid; + this.leaderUuid = uuid; } } if (leaderUuid == null) { LOG.warn("No leader provided for tablet {}", getTabletId()); } - replicas.set(replicasBuilder.build()); + this.replicas.set(replicasBuilder.build()); } @Override diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java index d79d114fb7..bef643d5b7 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java @@ -173,7 +173,8 @@ public void testBadHostnames() throws Exception { try { KuduTable badTable = new KuduTable(asyncClient, "Invalid table name", "Invalid table ID", null, null, 3); - asyncClient.discoverTablets(badTable, null, requestBatchSize, tabletLocations, 1000); + asyncClient.discoverTablets(badTable, null, requestBatchSize, + tabletLocations, new ArrayList<>(), 1000); fail("This should have failed quickly"); } catch (NonRecoverableException ex) { assertTrue(ex.getMessage().contains(badHostname)); @@ -205,7 +206,8 @@ public void testNoLeader() throws Exception { "master", leader.getRpcHost(), leader.getRpcPort(), Metadata.RaftPeerPB.Role.FOLLOWER)); tabletLocations.add(tabletPb.build()); try { - asyncClient.discoverTablets(table, new byte[0], requestBatchSize, tabletLocations, 1000); + asyncClient.discoverTablets(table, new byte[0], requestBatchSize, + tabletLocations, new ArrayList<>(), 1000); fail("discoverTablets should throw an exception if there's no leader"); } catch (NoLeaderFoundException ex) { // Expected.
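Reviewer aside: the essence of the interned-locations scheme in the client changes above is that the master sends each TSInfoPB once and each replica refers to it by index into that shared list. Below is a minimal, self-contained Java sketch of just that resolution step, using hypothetical stand-in types (TsInfo, InternedReplica) rather than the protobuf-generated classes, and assuming out-of-range indexes should be collected as errors rather than aborting the whole response:

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the protobuf-generated types.
class TsInfo {
  final String uuid;
  TsInfo(String uuid) { this.uuid = uuid; }
}

class InternedReplica {
  final int tsInfoIdx;
  InternedReplica(int tsInfoIdx) { this.tsInfoIdx = tsInfoIdx; }
}

class InternedReplicaResolver {
  // Resolve interned replicas against the shared TSInfo list. Out-of-range
  // indexes are reported as errors instead of failing the whole response.
  static List<TsInfo> resolve(List<InternedReplica> replicas,
                              List<TsInfo> tsInfos,
                              List<String> errors) {
    List<TsInfo> resolved = new ArrayList<>();
    for (InternedReplica replica : replicas) {
      int idx = replica.tsInfoIdx;
      if (idx < 0 || idx >= tsInfos.size()) {
        errors.add("invalid response: referenced tsinfo idx " + idx
            + " but only " + tsInfos.size() + " present");
        continue;
      }
      resolved.add(tsInfos.get(idx));
    }
    return resolved;
  }
}

The shim in discoverTablets() follows the same shape, except that it re-wraps each resolved TSInfoPB in an old-style ReplicaPB so that RemoteTablet and LocatedTablet.Replica can remain unchanged for now.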
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java index 43ca37ac38..c4372c1b47 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java @@ -223,10 +223,8 @@ private RemoteTablet getTablet(int leaderIndex) { static RemoteTablet getTablet(int leaderIndex, int localReplicaIndex, int sameLocationReplicaIndex) { - Master.TabletLocationsPB.Builder tabletPb = Master.TabletLocationsPB.newBuilder(); - - tabletPb.setPartition(ProtobufUtils.getFakePartitionPB()); - tabletPb.setTabletId(ByteString.copyFromUtf8("fake tablet")); + Partition partition = ProtobufHelper.pbToPartition(ProtobufUtils.getFakePartitionPB().build()); + List<Master.TabletLocationsPB.ReplicaPB> replicas = new ArrayList<>(); List<ServerInfo> servers = new ArrayList<>(); for (int i = 0; i < 3; i++) { InetAddress addr; @@ -246,11 +244,11 @@ static RemoteTablet getTablet(int leaderIndex, new HostAndPort("host", 1000 + i), addr, location)); - tabletPb.addReplicas(ProtobufUtils.getFakeTabletReplicaPB( - uuid, "host", i, - leaderIndex == i ? Metadata.RaftPeerPB.Role.LEADER : Metadata.RaftPeerPB.Role.FOLLOWER)); + Metadata.RaftPeerPB.Role role = leaderIndex == i ? Metadata.RaftPeerPB.Role.LEADER : + Metadata.RaftPeerPB.Role.FOLLOWER; + replicas.add(ProtobufUtils.getFakeTabletReplicaPB(uuid, "host", i, role).build()); } - return new RemoteTablet("fake table", tabletPb.build(), servers); + return new RemoteTablet("fake table", "fake tablet", partition, replicas, servers); } } diff --git a/java/kudu-hive/build.gradle b/java/kudu-hive/build.gradle index 3d4bb2bf8f..31ce5775f3 100644 --- a/java/kudu-hive/build.gradle +++ b/java/kudu-hive/build.gradle @@ -36,7 +36,4 @@ dependencies { // kudu-hive has no public Javadoc. javadoc { enabled = false -} - -// Skip publishing kudu-hive until it's ready to be supported long-term. -uploadArchives.enabled = false \ No newline at end of file +} \ No newline at end of file diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala index 37e26248e4..902ababc6b 100644 --- a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala +++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/DistributedDataGeneratorTest.scala @@ -22,9 +22,8 @@ import org.apache.kudu.spark.kudu.KuduTestSuite import org.apache.kudu.test.RandomUtils import org.apache.kudu.util.DecimalUtil import org.apache.kudu.util.SchemaGenerator +import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter import org.apache.spark.rdd.RDD -import org.apache.spark.scheduler.SparkListener -import org.apache.spark.scheduler.SparkListenerTaskEnd import org.apache.spark.sql.Row import org.junit.Test import org.junit.Assert.assertEquals @@ -88,15 +87,6 @@ class DistributedDataGeneratorTest extends KuduTestSuite { @Test def testNumTasks() { - // Add a SparkListener to count the number of tasks that end.
- var actualNumTasks = 0 - val listener = new SparkListener { - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { - actualNumTasks += 1 - } - } - ss.sparkContext.addSparkListener(listener) - val numTasks = 8 val numRows = 100 val args = Array( @@ -104,22 +94,16 @@ class DistributedDataGeneratorTest extends KuduTestSuite { s"--num-tasks=$numTasks", randomTableName, harness.getMasterAddressesAsString) - runGeneratorTest(args) + // count the number of tasks that end. + val actualNumTasks = withJobTaskCounter(ss.sparkContext) { _ => + runGeneratorTest(args) + } assertEquals(numTasks, actualNumTasks) } @Test def testNumTasksRepartition(): Unit = { - // Add a SparkListener to count the number of tasks that end. - var actualNumTasks = 0 - val listener = new SparkListener { - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { - actualNumTasks += 1 - } - } - ss.sparkContext.addSparkListener(listener) - val numTasks = 8 val numRows = 100 val args = Array( @@ -128,7 +112,11 @@ class DistributedDataGeneratorTest extends KuduTestSuite { "--repartition=true", randomTableName, harness.getMasterAddressesAsString) - runGeneratorTest(args) + + // count the number of tasks that end. + val actualNumTasks = withJobTaskCounter(ss.sparkContext) { _ => + runGeneratorTest(args) + } val table = kuduContext.syncClient.openTable(randomTableName) val numPartitions = new KuduPartitioner.KuduPartitionerBuilder(table).build().numPartitions() diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala index 6afd32cb3d..101546f171 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala @@ -33,8 +33,7 @@ import org.apache.kudu.client.CreateTableOptions import org.apache.kudu.Schema import org.apache.kudu.Type import org.apache.kudu.test.RandomUtils -import org.apache.spark.scheduler.SparkListener -import org.apache.spark.scheduler.SparkListenerTaskEnd +import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter import org.apache.spark.sql.execution.datasources.LogicalRelation import org.junit.Before import org.junit.Test @@ -428,15 +427,6 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { options.addSplitRow(split) val table = kuduClient.createTable(tableName, simpleSchema, options) - // Add a SparkListener to count the number of tasks that end. - var actualNumTasks = 0 - val listener = new SparkListener { - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { - actualNumTasks += 1 - } - } - ss.sparkContext.addSparkListener(listener) - val random = Random.javaRandomToRandom(RandomUtils.getRandom) val data = random.shuffle( Seq( @@ -459,10 +449,14 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { // Capture the rows so we can validate the insert order. kuduContext.captureRows = true - kuduContext.insertRows( - dataDF, - tableName, - new KuduWriteOptions(repartition = true, repartitionSort = repartitionSort)) + // Count the number of tasks that end. + val actualNumTasks = withJobTaskCounter(ss.sparkContext) { _ => + kuduContext.insertRows( + dataDF, + tableName, + new KuduWriteOptions(repartition = true, repartitionSort = repartitionSort)) + } + // 2 tasks from the parallelize call, and 2 from the repartitioning. 
assertEquals(4, actualNumTasks) val rows = kuduContext.rowsAccumulator.value.asScala diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkListenerUtil.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkListenerUtil.scala new file mode 100644 index 0000000000..b9bb8857bf --- /dev/null +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/SparkListenerUtil.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kudu.spark.kudu + +import org.apache.kudu.test.junit.AssertHelpers +import org.apache.kudu.test.junit.AssertHelpers.BooleanExpression +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.SparkListener +import org.apache.spark.scheduler.SparkListenerJobEnd +import org.apache.spark.scheduler.SparkListenerTaskEnd + +object SparkListenerUtil { + + // TODO: Use org.apache.spark.TestUtils.withListener if it becomes public test API + def withJobTaskCounter(sc: SparkContext)(body: Any => Unit): Int = { + // Add a SparkListener to count the number of tasks that end. + var numTasks = 0 + var jobDone = false + val listener: SparkListener = new SparkListener { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + numTasks += 1 + } + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + jobDone = true + } + } + sc.addSparkListener(listener) + try { + body() + } finally { + // Because the SparkListener events are processed on an async queue which is behind + // private API, we use the jobEnd event to know that all of the taskEnd events + // must have been processed. + AssertHelpers.assertEventuallyTrue("Spark job did not complete", new BooleanExpression { + override def get(): Boolean = jobDone + }, 5000) + sc.removeSparkListener(listener) + } + numTasks + } +} diff --git a/java/settings.gradle b/java/settings.gradle index 96ca03fcdf..145c3c8f0c 100644 --- a/java/settings.gradle +++ b/java/settings.gradle @@ -20,6 +20,7 @@ rootProject.name = "kudu-parent" include "kudu-backup" +include "kudu-backup-tools" include "kudu-client" include "kudu-client-tools" include "kudu-flume-sink" diff --git a/src/kudu/client/master_proxy_rpc.cc b/src/kudu/client/master_proxy_rpc.cc index 326df12117..b873f1dc1c 100644 --- a/src/kudu/client/master_proxy_rpc.cc +++ b/src/kudu/client/master_proxy_rpc.cc @@ -202,8 +202,8 @@ bool AsyncLeaderMasterRpc<ReqClass, RespClass>::RetryOrReconnectIfNecessary( // negotiation. // Authorization errors during negotiation generally indicate failure to - // authenticate. If that failure was due to an invalid token, try to get a - // new one by reconnecting with the master. + // authenticate. If that failure was due to an invalid authn token, + // try to get a new one by establishing a new connection to the master.
const ErrorStatusPB* err = retrier().controller().error_response(); if (s.IsNotAuthorized()) { if (err && err->has_code() && diff --git a/src/kudu/clock/system_ntp.cc b/src/kudu/clock/system_ntp.cc index 108056a437..9993838c25 100644 --- a/src/kudu/clock/system_ntp.cc +++ b/src/kudu/clock/system_ntp.cc @@ -35,6 +35,7 @@ #include "kudu/util/errno.h" #include "kudu/util/flag_tags.h" #include "kudu/util/logging.h" +#include "kudu/util/monotime.h" #include "kudu/util/path_util.h" #include "kudu/util/status.h" #include "kudu/util/subprocess.h" @@ -124,36 +125,26 @@ Status WaitForNtp() { } LOG(INFO) << Substitute("Waiting up to --ntp_initial_sync_wait_secs=$0 " "seconds for the clock to synchronize", wait_secs); - vector<string> cmd; - string exe; - Status s = FindExecutable("ntp-wait", {"/sbin", "/usr/sbin"}, &exe); - if (s.ok()) { - // -s is the number of seconds to sleep between retries. - // -n is the number of tries before giving up. - cmd = {exe, "-s", "1", "-n", std::to_string(wait_secs)}; - } else { - LOG(WARNING) << "Could not find ntp-wait; trying chrony waitsync instead: " - << s.ToString(); - s = FindExecutable("chronyc", {"/sbin", "/usr/sbin"}, &exe); - if (!s.ok()) { - LOG(WARNING) << "Could not find chronyc: " << s.ToString(); - return Status::NotFound("failed to find ntp-wait or chronyc"); + + // We previously relied on ntpd/chrony support tools to wait, but that + // approach doesn't work in environments where ntpd is unreachable but the + // clock is still synchronized (e.g. running inside a Linux container). + // + // Now we just interrogate the kernel directly. + Status s; + for (int i = 0; i < wait_secs; i++) { + timex timex; + s = CallAdjTime(&timex); + if (s.ok() || !s.IsServiceUnavailable()) { + return s; } - // Usage: waitsync max-tries max-correction max-skew interval. - // max-correction and max-skew parameters as 0 means no checks. - // The interval is measured in seconds. - cmd = {exe, "waitsync", std::to_string(wait_secs), "0", "0", "1"}; - } - // Unfortunately, neither ntp-wait nor chronyc waitsync print useful messages. - // Instead, rely on DumpDiagnostics. - s = Subprocess::Call(cmd); - if (!s.ok()) { - return s.CloneAndPrepend( - Substitute("failed to wait for clock sync using command '$0'", - JoinStrings(cmd, " "))); + SleepFor(MonoDelta::FromSeconds(1)); } - return Status::OK(); + + // Return the last failure. + return s.CloneAndPrepend("Timed out waiting for clock sync"); } + } // anonymous namespace void SystemNtp::DumpDiagnostics(vector<string>* log) const { diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc index 3435d5d63a..a7b2057225 100644 --- a/src/kudu/consensus/raft_consensus.cc +++ b/src/kudu/consensus/raft_consensus.cc @@ -1178,8 +1178,12 @@ Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequ ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH, Status::IllegalState(error_msg)); - LOG_WITH_PREFIX_UNLOCKED(INFO) << "Refusing update from remote peer " - << req.leader_uuid << ": " << error_msg; + // Don't log the rejection if this is the first message from the leader of a + // new tablet, since in that scenario the mismatch is expected and the log + // message would be unnecessary noise. + if (!OpIdEquals(MakeOpId(1,1), *req.preceding_opid)) { + LOG_WITH_PREFIX_UNLOCKED(INFO) << "Refusing update from remote peer " + << req.leader_uuid << ": " << error_msg; + } // If the terms mismatch we abort down to the index before the leader's preceding, // since we know that is the last opid that has a chance of not being overwritten.
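Reviewer aside: the rewritten WaitForNtp() above replaces shelling out to ntp-wait/chronyc with a simple poll of the kernel once a second, returning immediately on success or on any non-transient error, and reporting the last failure on timeout. A rough Java sketch of that loop shape, under the assumption of a hypothetical ClockProbe interface standing in for the adjtimex interrogation (this illustrates the control flow, not Kudu code):

class ClockSyncWaiter {
  enum ProbeResult { SYNCHRONIZED, NOT_YET_SYNCHRONIZED, HARD_ERROR }

  // Hypothetical probe of the kernel's clock-sync state.
  interface ClockProbe { ProbeResult check(); }

  // Poll once a second for up to waitSecs seconds. Success and hard errors
  // return immediately; only the transient "not yet synchronized" case is
  // retried, and the last observed result is reported if the wait times out.
  static ProbeResult waitForSync(ClockProbe probe, int waitSecs) throws InterruptedException {
    ProbeResult last = ProbeResult.NOT_YET_SYNCHRONIZED;
    for (int i = 0; i < waitSecs; i++) {
      last = probe.check();
      if (last != ProbeResult.NOT_YET_SYNCHRONIZED) {
        return last;
      }
      Thread.sleep(1000);
    }
    return last;
  }
}

The design point motivating the change is that the old subprocess approach coupled "is the clock synchronized?" to "can we reach ntpd/chronyd?", which fails inside containers; asking the kernel directly decouples the two.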
diff --git a/src/kudu/hms/mini_hms.cc b/src/kudu/hms/mini_hms.cc index e730f80653..7ccb4a9cb8 100644 --- a/src/kudu/hms/mini_hms.cc +++ b/src/kudu/hms/mini_hms.cc @@ -71,12 +71,16 @@ void MiniHms::EnableKerberos(string krb5_conf, } void MiniHms::EnableSentry(const HostPort& sentry_address, - string sentry_service_principal) { + string sentry_service_principal, + int sentry_client_rpc_retry_num, + int sentry_client_rpc_retry_interval_ms) { CHECK(!hms_process_); DCHECK(!sentry_service_principal.empty()); VLOG(1) << Substitute("Enabling Sentry, at $0, for HMS", sentry_address.ToString()); sentry_address_ = sentry_address.ToString(); sentry_service_principal_ = std::move(sentry_service_principal); + sentry_client_rpc_retry_num_ = sentry_client_rpc_retry_num; + sentry_client_rpc_retry_interval_ms_ = sentry_client_rpc_retry_interval_ms; } void MiniHms::SetDataRoot(string data_root) { @@ -366,6 +370,14 @@ Status MiniHms::CreateHiveSite() const { // - sentry.metastore.service.users // Set of service users whose access will be excluded from // Sentry authorization checks. + // + // - sentry.service.client.rpc.retry-total + // Maximum number of attempts that the Sentry RPC client makes while + // retrying a remote call to Sentry. + // + // - sentry.service.client.rpc.retry.interval.msec + // Time interval between the Sentry client's attempts to retry a remote + // call to Sentry. static const string kSentryFileTemplate = R"( @@ -387,12 +399,25 @@ Status MiniHms::CreateHiveSite() const { <property> <name>sentry.metastore.service.users</name> <value>kudu</value> </property> + + <property> + <name>sentry.service.client.rpc.retry-total</name> + <value>$3</value> + </property> + + <property> + <name>sentry.service.client.rpc.retry.interval.msec</name> + <value>$4</value> + </property> )"; - string sentry_file_contents = Substitute(kSentryFileTemplate, - sentry_address_, - sentry_service_principal_, - "server1"); + auto sentry_file_contents = Substitute( + kSentryFileTemplate, + sentry_address_, + sentry_service_principal_, + "server1", + sentry_client_rpc_retry_num_, + sentry_client_rpc_retry_interval_ms_); RETURN_NOT_OK(WriteStringToFile(Env::Default(), sentry_file_contents, JoinPathSegments(data_root_, "hive-sentry-site.xml"))); diff --git a/src/kudu/hms/mini_hms.h b/src/kudu/hms/mini_hms.h index 0adcb21042..7fb1c22089 100644 --- a/src/kudu/hms/mini_hms.h +++ b/src/kudu/hms/mini_hms.h @@ -48,8 +48,16 @@ class MiniHms { // Configures the mini HMS to enable the Sentry plugin, passing the // Sentry service's principal to be used in a Kerberos environment. + // + // Parameters 'sentry_client_rpc_retry_num' and + // 'sentry_client_rpc_retry_interval_ms' are used to override default settings + // of the Sentry client used by HMS plugins. The default values for these two + // parameters are set to allow for a shorter HMS --> Sentry RPC timeout + // (i.e. shorter than with the default Sentry v2.{0,1} client's settings). void EnableSentry(const HostPort& sentry_address, - std::string sentry_service_principal); + std::string sentry_service_principal, + int sentry_client_rpc_retry_num = 3, + int sentry_client_rpc_retry_interval_ms = 500); // Configures the mini HMS to store its data in the provided path. If not set, // it uses a test-only temporary directory.
@@ -114,6 +122,8 @@ class MiniHms { // Sentry configuration std::string sentry_address_; std::string sentry_service_principal_; + int sentry_client_rpc_retry_num_; + int sentry_client_rpc_retry_interval_ms_; }; } // namespace hms diff --git a/src/kudu/integration-tests/hms_itest-base.cc b/src/kudu/integration-tests/hms_itest-base.cc index 03697921f0..64a635b40b 100644 --- a/src/kudu/integration-tests/hms_itest-base.cc +++ b/src/kudu/integration-tests/hms_itest-base.cc @@ -35,6 +35,7 @@ #include "kudu/hms/mini_hms.h" #include "kudu/mini-cluster/external_mini_cluster.h" #include "kudu/util/decimal_util.h" +#include "kudu/util/monotime.h" #include "kudu/util/net/net_util.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" @@ -73,9 +74,9 @@ Status HmsITestBase::CreateDatabase(const string& database_name) { } Status HmsITestBase::CreateKuduTable(const string& database_name, - const string& table_name) { + const string& table_name, + MonoDelta timeout) { // Get coverage of all column types. - KuduSchema schema; KuduSchemaBuilder b; b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey(); b.AddColumn("int8_val")->Type(KuduColumnSchema::INT8); @@ -94,14 +95,19 @@ Status HmsITestBase::CreateKuduTable(const string& database_name, ->Precision(kMaxDecimal64Precision); b.AddColumn("decimal128_val")->Type(KuduColumnSchema::DECIMAL) ->Precision(kMaxDecimal128Precision); - + KuduSchema schema; RETURN_NOT_OK(b.Build(&schema)); unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); - return table_creator->table_name(Substitute("$0.$1", database_name, table_name)) - .schema(&schema) - .num_replicas(1) - .set_range_partition_columns({ "key" }) - .Create(); + if (timeout.Initialized()) { + // If specified, set the timeout for the operation. + table_creator->timeout(timeout); + } + return table_creator->table_name(Substitute("$0.$1", + database_name, table_name)) + .schema(&schema) + .num_replicas(1) + .set_range_partition_columns({ "key" }) + .Create(); } Status HmsITestBase::RenameHmsTable(const string& database_name, diff --git a/src/kudu/integration-tests/hms_itest-base.h b/src/kudu/integration-tests/hms_itest-base.h index b348031684..50c814c4e1 100644 --- a/src/kudu/integration-tests/hms_itest-base.h +++ b/src/kudu/integration-tests/hms_itest-base.h @@ -22,12 +22,15 @@ #include <string> +#include "kudu/gutil/port.h" #include "kudu/hms/hms_client.h" #include "kudu/integration-tests/external_mini_cluster-itest-base.h" #include "kudu/util/status.h" namespace kudu { +class MonoDelta; + class HmsITestBase : public ExternalMiniClusterITestBase { public: Status StartHms() WARN_UNUSED_RESULT; @@ -38,7 +41,8 @@ class HmsITestBase : public ExternalMiniClusterITestBase { // Creates a table in Kudu. Status CreateKuduTable(const std::string& database_name, - const std::string& table_name) WARN_UNUSED_RESULT; + const std::string& table_name, + MonoDelta timeout = {}) WARN_UNUSED_RESULT; // Renames a table entry in the HMS catalog. Status RenameHmsTable(const std::string& database_name, diff --git a/src/kudu/integration-tests/master_sentry-itest.cc b/src/kudu/integration-tests/master_sentry-itest.cc index 02ea86a119..ddaf02d808 100644 --- a/src/kudu/integration-tests/master_sentry-itest.cc +++ b/src/kudu/integration-tests/master_sentry-itest.cc @@ -873,6 +873,119 @@ TEST_F(SentryAuthzProviderCacheITest, ResetAuthzCacheConcurrentAlterTable) { } } +// This test scenario documents an artifact of the Kudu+HMS+Sentry integration +// when the authz cache is enabled (the cache is enabled by default).
In essence, +// information on the ownership of a table created during a short period of +// Sentry's unavailability will never appear in Sentry. That might be +// misleading because CreateTable() reports success to the client. The created +// table indeed exists and is fully functional otherwise, but the corresponding +// owner privilege record is absent in Sentry. +// +// TODO(aserbin): clarify why it works with HEAD of the master branches +// of Sentry/Hive but fails with Sentry 2.1.0 and Hive 2.1.1. +TEST_F(SentryAuthzProviderCacheITest, DISABLED_CreateTables) { + constexpr const char* const kGhostTables[] = { "t10", "t11" }; + + // Grant CREATE TABLE and METADATA privileges on the database. + ASSERT_OK(GrantCreateTablePrivilege({ kDatabaseName })); + ASSERT_OK(AlterRoleGrantPrivilege( + sentry_client_.get(), kDevRole, + GetDatabasePrivilege(kDatabaseName, "METADATA"))); + + // Make sure it's possible to create a table in the database. This also + // populates the privileges cache with information on the privileges + // granted on the database. + ASSERT_OK(CreateKuduTable(kDatabaseName, "t0")); + + // An attempt to open a not-yet-existing table will fetch the information + // on the granted privileges on the table into the privileges cache. + for (const auto& t : kGhostTables) { + shared_ptr<KuduTable> kudu_table; + const auto s = client_->OpenTable(Substitute("$0.$1", kDatabaseName, t), + &kudu_table); + ASSERT_TRUE(s.IsNotFound()) << s.ToString(); + } + + ASSERT_OK(StopSentry()); + + // A CreateTable() call whose operation timeout is longer than the + // HMS --> Sentry communication timeout completes successfully. After failing + // to push the information on the newly created table to Sentry due to the + // logic implemented in the SentrySyncHMSNotificationsPostEventListener plugin, + // HMS sends a success response to the Kudu master, and Kudu successfully + // completes the rest of the steps. + ASSERT_OK(CreateKuduTable(kDatabaseName, kGhostTables[0])); + + // In this case, the timeout for the CreateTable RPC is set to be lower than + // the HMS --> Sentry communication timeout (see corresponding parameters + // of the MiniHms::EnableSentry() method). CreateTable() successfully passes + // the authz phase since the information on privileges is cached and no + // Sentry RPC calls are attempted. However, since Sentry is down, + // CreateTable() takes a long time on the HMS's side and the client's + // request times out, while the creation of the table continues in the + // background. + { + const auto s = CreateKuduTable(kDatabaseName, kGhostTables[1], + MonoDelta::FromSeconds(1)); + ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); + } + + // Before starting Sentry, make sure the abandoned request to create the + // latter table succeeded even if CreateKuduTable() reported a timeout. + // This is to make sure HMS is no longer trying to push the table-creation + // notification to Sentry via the metastore plugin + // SentrySyncHMSNotificationsPostEventListener. + ASSERT_EVENTUALLY([&]{ + bool exists; + ASSERT_OK(client_->TableExists( + Substitute("$0.$1", kDatabaseName, kGhostTables[1]), &exists)); + ASSERT_TRUE(exists); + }); + + ASSERT_OK(ResetCache()); + + // After resetting the cache, it should not be possible to create another + // table: the authz provider needs to fetch information on privileges directly + // from Sentry, but it's still down.
+  {
+    const auto s = CreateKuduTable(kDatabaseName, "t2");
+    ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+  }
+
+  ASSERT_OK(StartSentry());
+
+  // Try to create the table after starting Sentry back: it should succeed.
+  ASSERT_OK(CreateKuduTable(kDatabaseName, "t2"));
+
+  // Once the table has been created, it should be possible to perform DDL
+  // operations on it since the user is the owner of the table.
+  {
+    unique_ptr<KuduTableAlterer> table_alterer(
+        client_->NewTableAlterer(Substitute("$0.$1", kDatabaseName, "t2")));
+    table_alterer->AddColumn("new_int8_column")->Type(KuduColumnSchema::INT8);
+    ASSERT_OK(table_alterer->Alter());
+  }
+
+  // Try to run DDL against the tables created during Sentry's downtime. These
+  // should not be authorized since Sentry didn't receive information on the
+  // ownership of those tables from HMS during their creation, and there is no
+  // catch-up for those events after Sentry starts.
+  for (const auto& t : kGhostTables) {
+    unique_ptr<KuduTableAlterer> table_alterer(
+        client_->NewTableAlterer(Substitute("$0.$1", kDatabaseName, t)));
+    table_alterer->AddColumn("new_int8_column")->Type(KuduColumnSchema::INT8);
+    auto s = table_alterer->Alter();
+    ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+
+    // After granting the ALTER TABLE privilege the alteration should be
+    // successful. One caveat: it's necessary to reset the cache since the
+    // cache will not have the new record until the current entry expires.
+    ASSERT_OK(GrantAlterTablePrivilege({ kDatabaseName, t }));
+    ASSERT_OK(ResetCache());
+    ASSERT_OK(table_alterer->Alter());
+  }
+}
+
 // Basic test to verify access control and functionality of
 // the ResetAuthzCache(); integration with Sentry is not enabled.
 class AuthzCacheControlTest : public ExternalMiniClusterITestBase {
diff --git a/src/kudu/integration-tests/hms_itest-base.cc b/src/kudu/integration-tests/hms_itest-base.cc
diff --git a/src/kudu/master/hms_notification_log_listener.cc b/src/kudu/master/hms_notification_log_listener.cc
index b34d2338b1..2237a4b47a 100644
--- a/src/kudu/master/hms_notification_log_listener.cc
+++ b/src/kudu/master/hms_notification_log_listener.cc
@@ -118,7 +118,7 @@ Status HmsNotificationLogListenerTask::WaitForCatchUp(const MonoTime& deadline)
     wake_up_cv_.Signal();
   }

-  RETURN_NOT_OK_PREPEND(synchronizer.WaitFor(deadline - MonoTime::Now()),
+  RETURN_NOT_OK_PREPEND(synchronizer.WaitUntil(deadline),
                         "failed to wait for Hive Metastore notification log listener to catch up");
   return Status::OK();
 }
diff --git a/src/kudu/tablet/rowset_info.h b/src/kudu/tablet/rowset_info.h
index af2d07db3e..f133e3e1ee 100644
--- a/src/kudu/tablet/rowset_info.h
+++ b/src/kudu/tablet/rowset_info.h
@@ -51,6 +51,8 @@ class RowSetInfo {
   // rowset tree is set into 'average_height', if it is not nullptr.
   // If one of 'info_by_min_key' and 'info_by_max_key' is nullptr, the other
   // must be.
+  // Requires holding the compact_select_lock_ for the tablet that the
+  // rowsets in 'tree' reference.
   static void ComputeCdfAndCollectOrdered(const RowSetTree& tree,
                                           double* average_height,
                                           std::vector<RowSetInfo>* info_by_min_key,
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 7627ed43d2..daee9a5b18 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -134,11 +134,13 @@ DEFINE_double(tablet_throttler_burst_factor, 1.0f,
              "base rate.");
 TAG_FLAG(tablet_throttler_burst_factor, experimental);

-DEFINE_int32(tablet_history_max_age_sec, 15 * 60,
-             "Number of seconds to retain tablet history. Reads initiated at a "
-             "snapshot that is older than this age will be rejected.
" - "To disable history removal, set to -1."); +DEFINE_int32(tablet_history_max_age_sec, 60 * 60 * 24 * 7, + "Number of seconds to retain tablet history, including history " + "required to perform diff scans and incremental backups. Reads " + "initiated at a snapshot that is older than this age will be " + "rejected. To disable history removal, set to -1."); TAG_FLAG(tablet_history_max_age_sec, advanced); +TAG_FLAG(tablet_history_max_age_sec, stable); DEFINE_int32(max_cell_size_bytes, 64 * 1024, "The maximum size of any individual cell in a table. Attempting to store " @@ -264,7 +266,6 @@ Status Tablet::Open() { TRACE_EVENT0("tablet", "Tablet::Open"); RETURN_IF_STOPPED_OR_CHECK_STATE(kInitialized); - std::lock_guard lock(component_lock_); CHECK(schema()->has_column_ids()); next_mrs_id_ = metadata_->last_durable_mrs_id() + 1; @@ -291,15 +292,6 @@ Status Tablet::Open() { shared_ptr new_rowset_tree(new RowSetTree()); CHECK_OK(new_rowset_tree->Reset(rowsets_opened)); - if (metrics_) { - // Compute the initial average height of the rowset tree. - double avg_height; - RowSetInfo::ComputeCdfAndCollectOrdered(*new_rowset_tree, - &avg_height, - nullptr, - nullptr); - metrics_->average_diskrowset_height->set_value(avg_height); - } // Now that the current state is loaded, create the new MemRowSet with the next id. shared_ptr new_mrs; @@ -307,7 +299,13 @@ Status Tablet::Open() { log_anchor_registry_.get(), mem_trackers_.tablet_tracker, &new_mrs)); - components_ = new TabletComponents(new_mrs, new_rowset_tree); + { + std::lock_guard lock(component_lock_); + components_ = new TabletComponents(new_mrs, new_rowset_tree); + } + + // Compute the initial average rowset height. + UpdateAverageRowsetHeight(); { std::lock_guard l(state_lock_); @@ -1053,10 +1051,10 @@ void Tablet::ModifyRowSetTree(const RowSetTree& old_tree, CHECK_OK(new_tree->Reset(post_swap)); } -void Tablet::AtomicSwapRowSets(const RowSetVector &old_rowsets, - const RowSetVector &new_rowsets) { +void Tablet::AtomicSwapRowSets(const RowSetVector &to_remove, + const RowSetVector &to_add) { std::lock_guard lock(component_lock_); - AtomicSwapRowSetsUnlocked(old_rowsets, new_rowsets); + AtomicSwapRowSetsUnlocked(to_remove, to_add); } void Tablet::AtomicSwapRowSetsUnlocked(const RowSetVector &to_remove, @@ -1068,19 +1066,6 @@ void Tablet::AtomicSwapRowSetsUnlocked(const RowSetVector &to_remove, to_remove, to_add, new_tree.get()); components_ = new TabletComponents(components_->memrowset, new_tree); - - // Recompute the average rowset height. - // TODO(wdberkeley): We should be able to cache the computation of the CDF - // and average height and efficiently recompute it instead of doing it from - // scratch. - if (metrics_) { - double avg_height; - RowSetInfo::ComputeCdfAndCollectOrdered(*new_tree, - &avg_height, - nullptr, - nullptr); - metrics_->average_diskrowset_height->set_value(avg_height); - } } Status Tablet::DoMajorDeltaCompaction(const vector& col_ids, @@ -1723,6 +1708,7 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input, // Replace the compacted rowsets with the new on-disk rowsets, making them visible now that // their metadata was written to disk. 
AtomicSwapRowSets({ inprogress_rowset }, new_disk_rowsets); + UpdateAverageRowsetHeight(); const auto rows_written = drsw.rows_written_count(); const auto drs_written = drsw.drs_written_count(); @@ -1753,9 +1739,28 @@ Status Tablet::HandleEmptyCompactionOrFlush(const RowSetVector& rowsets, "Failed to flush new tablet metadata"); AtomicSwapRowSets(rowsets, RowSetVector()); + UpdateAverageRowsetHeight(); return Status::OK(); } +void Tablet::UpdateAverageRowsetHeight() { + if (!metrics_) { + return; + } + // TODO(wdberkeley): We should be able to cache the computation of the CDF + // and average height and efficiently recompute it instead of doing it from + // scratch. + scoped_refptr comps; + GetComponents(&comps); + std::lock_guard l(compact_select_lock_); + double avg_height; + RowSetInfo::ComputeCdfAndCollectOrdered(*comps->rowsets, + &avg_height, + nullptr, + nullptr); + metrics_->average_diskrowset_height->set_value(avg_height); +} + Status Tablet::Compact(CompactFlags flags) { RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen); diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h index 87448f5da3..c4a5691046 100644 --- a/src/kudu/tablet/tablet.h +++ b/src/kudu/tablet/tablet.h @@ -347,7 +347,6 @@ class Tablet { // memrowset in the current implementation. Status CountRows(uint64_t *count) const; - // Verbosely dump this entire tablet to the logs. This is only // really useful when debugging unit tests failures where the tablet // has a very small number of rows. @@ -418,6 +417,7 @@ class Tablet { // This method is thread-safe. void CancelMaintenanceOps(); + const std::string& table_id() const { return metadata_->table_id(); } const std::string& tablet_id() const { return metadata_->tablet_id(); } // Return the metrics for this tablet. @@ -599,6 +599,10 @@ class Tablet { Status HandleEmptyCompactionOrFlush(const RowSetVector& rowsets, int mrs_being_flushed); + // Updates the average rowset height metric. Acquires the tablet's + // compact_select_lock_. 
+ void UpdateAverageRowsetHeight(); + Status FlushMetadata(const RowSetVector& to_remove, const RowSetMetadataVector& to_add, int64_t mrs_being_flushed); diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h index 62e754532e..adf0d4cbb7 100644 --- a/src/kudu/tablet/tablet_metadata.h +++ b/src/kudu/tablet/tablet_metadata.h @@ -119,7 +119,7 @@ class TabletMetadata : public RefCountedThreadSafe { return partition_; } - std::string table_id() const { + const std::string& table_id() const { DCHECK_NE(state_, kNotLoadedYet); return table_id_; } diff --git a/src/kudu/tablet/tablet_mm_ops-test.cc b/src/kudu/tablet/tablet_mm_ops-test.cc index a2f55728cd..3a13386a9d 100644 --- a/src/kudu/tablet/tablet_mm_ops-test.cc +++ b/src/kudu/tablet/tablet_mm_ops-test.cc @@ -62,7 +62,7 @@ class KuduTabletMmOpsTest : public TabletTestBase> { void StatsShouldChange(MaintenanceOp* op) { SleepFor(MonoDelta::FromMilliseconds(1)); op->UpdateStats(&stats_); - ASSERT_TRUE(next_time_ < stats_.last_modified()); + ASSERT_LT(next_time_, stats_.last_modified()); next_time_ = stats_.last_modified(); } @@ -70,7 +70,6 @@ class KuduTabletMmOpsTest : public TabletTestBase> { SleepFor(MonoDelta::FromMilliseconds(1)); op->UpdateStats(&stats_); ASSERT_EQ(next_time_, stats_.last_modified()); - next_time_ = stats_.last_modified(); } void TestFirstCall(MaintenanceOp* op) { diff --git a/src/kudu/tablet/tablet_mm_ops.cc b/src/kudu/tablet/tablet_mm_ops.cc index c71bc6e527..c54c241966 100644 --- a/src/kudu/tablet/tablet_mm_ops.cc +++ b/src/kudu/tablet/tablet_mm_ops.cc @@ -87,6 +87,10 @@ string TabletOpBase::LogPrefix() const { return tablet_->LogPrefix(); } +const std::string& TabletOpBase::table_id() const { + return tablet_->table_id(); +} + //////////////////////////////////////////////////////////// // CompactRowSetsOp //////////////////////////////////////////////////////////// @@ -262,12 +266,12 @@ void MajorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) { // cached stats. 
TabletMetrics* metrics = tablet_->metrics(); if (metrics) { - int64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount(); - int64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount(); - int64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount(); - int64_t new_num_rs_minor_delta_compacted = + uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount(); + uint64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount(); + uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount(); + uint64_t new_num_rs_minor_delta_compacted = metrics->delta_minor_compact_rs_duration->TotalCount(); - int64_t new_num_rs_major_delta_compacted = + uint64_t new_num_rs_major_delta_compacted = metrics->delta_major_compact_rs_duration->TotalCount(); if (prev_stats_.valid() && new_num_mrs_flushed == last_num_mrs_flushed_ && diff --git a/src/kudu/tablet/tablet_mm_ops.h b/src/kudu/tablet/tablet_mm_ops.h index 8fc865e0f1..1e117f1bd1 100644 --- a/src/kudu/tablet/tablet_mm_ops.h +++ b/src/kudu/tablet/tablet_mm_ops.h @@ -43,6 +43,8 @@ class TabletOpBase : public MaintenanceOp { std::string LogPrefix() const; protected: + const std::string& table_id() const OVERRIDE; + Tablet* const tablet_; }; @@ -57,15 +59,15 @@ class CompactRowSetsOp : public TabletOpBase { public: explicit CompactRowSetsOp(Tablet* tablet); - virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; + void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; - virtual bool Prepare() OVERRIDE; + bool Prepare() OVERRIDE; - virtual void Perform() OVERRIDE; + void Perform() OVERRIDE; - virtual scoped_refptr DurationHistogram() const OVERRIDE; + scoped_refptr DurationHistogram() const OVERRIDE; - virtual scoped_refptr > RunningGauge() const OVERRIDE; + scoped_refptr > RunningGauge() const OVERRIDE; private: mutable simple_spinlock lock_; @@ -83,15 +85,15 @@ class MinorDeltaCompactionOp : public TabletOpBase { public: explicit MinorDeltaCompactionOp(Tablet* tablet); - virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; + void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; - virtual bool Prepare() OVERRIDE; + bool Prepare() OVERRIDE; - virtual void Perform() OVERRIDE; + void Perform() OVERRIDE; - virtual scoped_refptr DurationHistogram() const OVERRIDE; + scoped_refptr DurationHistogram() const OVERRIDE; - virtual scoped_refptr > RunningGauge() const OVERRIDE; + scoped_refptr > RunningGauge() const OVERRIDE; private: mutable simple_spinlock lock_; @@ -109,15 +111,15 @@ class MajorDeltaCompactionOp : public TabletOpBase { public: explicit MajorDeltaCompactionOp(Tablet* tablet); - virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; + void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; - virtual bool Prepare() OVERRIDE; + bool Prepare() OVERRIDE; - virtual void Perform() OVERRIDE; + void Perform() OVERRIDE; - virtual scoped_refptr DurationHistogram() const OVERRIDE; + scoped_refptr DurationHistogram() const OVERRIDE; - virtual scoped_refptr > RunningGauge() const OVERRIDE; + scoped_refptr > RunningGauge() const OVERRIDE; private: mutable simple_spinlock lock_; @@ -138,19 +140,19 @@ class UndoDeltaBlockGCOp : public TabletOpBase { // Estimates the number of bytes that may potentially be in ancient delta // undo blocks. Over time, as Perform() is invoked, this estimate gets more // accurate. 
- void UpdateStats(MaintenanceOpStats* stats) override; + void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; - bool Prepare() override; + bool Prepare() OVERRIDE; // Deletes ancient history data from disk. This also initializes undo delta // blocks greedily (in a budgeted manner controlled by the // --undo_delta_block_gc_init_budget_millis gflag) that makes the estimate // performed in UpdateStats() more accurate. - void Perform() override; + void Perform() OVERRIDE; - scoped_refptr DurationHistogram() const override; + scoped_refptr DurationHistogram() const OVERRIDE; - scoped_refptr > RunningGauge() const override; + scoped_refptr > RunningGauge() const OVERRIDE; private: std::string LogPrefix() const; diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc index c7b231bc94..a284215bc8 100644 --- a/src/kudu/tablet/tablet_replica.cc +++ b/src/kudu/tablet/tablet_replica.cc @@ -38,6 +38,7 @@ #include "kudu/consensus/log_anchor_registry.h" #include "kudu/consensus/opid.pb.h" #include "kudu/consensus/raft_consensus.h" +#include "kudu/fs/data_dirs.h" #include "kudu/gutil/basictypes.h" #include "kudu/gutil/bind.h" #include "kudu/gutil/bind_helpers.h" @@ -124,7 +125,6 @@ TabletReplica::TabletReplica( Callback mark_dirty_clbk) : meta_(DCHECK_NOTNULL(std::move(meta))), cmeta_manager_(DCHECK_NOTNULL(std::move(cmeta_manager))), - tablet_id_(meta_->tablet_id()), local_peer_pb_(std::move(local_peer_pb)), log_anchor_registry_(new LogAnchorRegistry()), apply_pool_(apply_pool), diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h index bf9d63dd0e..50fab2dae8 100644 --- a/src/kudu/tablet/tablet_replica.h +++ b/src/kudu/tablet/tablet_replica.h @@ -254,10 +254,8 @@ class TabletReplica : public RefCountedThreadSafe, return log_anchor_registry_; } - // Returns the tablet_id of the tablet managed by this TabletReplica. - // Returns the correct tablet_id even if the underlying tablet is not available - // yet. - const std::string& tablet_id() const { return tablet_id_; } + const std::string& table_id() const { return meta_->table_id(); } + const std::string& tablet_id() const { return meta_->tablet_id(); } // Convenience method to return the permanent_uuid of this peer. std::string permanent_uuid() const { return tablet_->metadata()->fs_manager()->uuid(); } @@ -322,7 +320,6 @@ class TabletReplica : public RefCountedThreadSafe, const scoped_refptr meta_; const scoped_refptr cmeta_manager_; - const std::string tablet_id_; const consensus::RaftPeerPB local_peer_pb_; scoped_refptr log_anchor_registry_; // Assigned in tablet_replica-test diff --git a/src/kudu/tablet/tablet_replica_mm_ops.cc b/src/kudu/tablet/tablet_replica_mm_ops.cc index a7267debc2..a848bcb838 100644 --- a/src/kudu/tablet/tablet_replica_mm_ops.cc +++ b/src/kudu/tablet/tablet_replica_mm_ops.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -121,6 +122,20 @@ void FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(MaintenanceOpStats } } +// +// TabletReplicaOpBase. +// +TabletReplicaOpBase::TabletReplicaOpBase(std::string name, + IOUsage io_usage, + TabletReplica* tablet_replica) + : MaintenanceOp(std::move(name), io_usage), + tablet_replica_(tablet_replica) { +} + +const std::string& TabletReplicaOpBase::table_id() const { + return tablet_replica_->table_id(); +} + // // FlushMRSOp. 
// @@ -260,9 +275,10 @@ scoped_refptr > FlushDeltaMemStoresOp::RunningGauge() cons // LogGCOp::LogGCOp(TabletReplica* tablet_replica) - : MaintenanceOp(StringPrintf("LogGCOp(%s)", tablet_replica->tablet()->tablet_id().c_str()), - MaintenanceOp::LOW_IO_USAGE), - tablet_replica_(tablet_replica), + : TabletReplicaOpBase(StringPrintf("LogGCOp(%s)", + tablet_replica->tablet()->tablet_id().c_str()), + MaintenanceOp::LOW_IO_USAGE, + tablet_replica), log_gc_duration_(METRIC_log_gc_duration.Instantiate( tablet_replica->tablet()->GetMetricEntity())), log_gc_running_(METRIC_log_gc_running.Instantiate( diff --git a/src/kudu/tablet/tablet_replica_mm_ops.h b/src/kudu/tablet/tablet_replica_mm_ops.h index 2404b489bf..72fdb39442 100644 --- a/src/kudu/tablet/tablet_replica_mm_ops.h +++ b/src/kudu/tablet/tablet_replica_mm_ops.h @@ -47,86 +47,92 @@ class FlushOpPerfImprovementPolicy { FlushOpPerfImprovementPolicy() {} }; +class TabletReplicaOpBase : public MaintenanceOp { + public: + explicit TabletReplicaOpBase(std::string name, IOUsage io_usage, TabletReplica* tablet_replica); + + protected: + const std::string& table_id() const OVERRIDE; + + TabletReplica *const tablet_replica_; +}; + // Maintenance op for MRS flush. Only one can happen at a time. -class FlushMRSOp : public MaintenanceOp { +class FlushMRSOp : public TabletReplicaOpBase { public: explicit FlushMRSOp(TabletReplica* tablet_replica) - : MaintenanceOp(StringPrintf("FlushMRSOp(%s)", tablet_replica->tablet()->tablet_id().c_str()), - MaintenanceOp::HIGH_IO_USAGE), - tablet_replica_(tablet_replica) { + : TabletReplicaOpBase(StringPrintf("FlushMRSOp(%s)", + tablet_replica->tablet()->tablet_id().c_str()), + MaintenanceOp::HIGH_IO_USAGE, + tablet_replica) { time_since_flush_.start(); } - virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; + void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; - virtual bool Prepare() OVERRIDE; + bool Prepare() OVERRIDE; - virtual void Perform() OVERRIDE; + void Perform() OVERRIDE; - virtual scoped_refptr DurationHistogram() const OVERRIDE; + scoped_refptr DurationHistogram() const OVERRIDE; - virtual scoped_refptr > RunningGauge() const OVERRIDE; + scoped_refptr > RunningGauge() const OVERRIDE; private: // Lock protecting time_since_flush_. mutable simple_spinlock lock_; Stopwatch time_since_flush_; - - TabletReplica *const tablet_replica_; }; // Maintenance op for DMS flush. // Reports stats for all the DMS this tablet contains but only flushes one in Perform(). 
-class FlushDeltaMemStoresOp : public MaintenanceOp { +class FlushDeltaMemStoresOp : public TabletReplicaOpBase { public: explicit FlushDeltaMemStoresOp(TabletReplica* tablet_replica) - : MaintenanceOp(StringPrintf("FlushDeltaMemStoresOp(%s)", - tablet_replica->tablet()->tablet_id().c_str()), - MaintenanceOp::HIGH_IO_USAGE), - tablet_replica_(tablet_replica) { + : TabletReplicaOpBase(StringPrintf("FlushDeltaMemStoresOp(%s)", + tablet_replica->tablet()->tablet_id().c_str()), + MaintenanceOp::HIGH_IO_USAGE, + tablet_replica) { time_since_flush_.start(); } - virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; + void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; - virtual bool Prepare() OVERRIDE { + bool Prepare() OVERRIDE { return true; } - virtual void Perform() OVERRIDE; + void Perform() OVERRIDE; - virtual scoped_refptr DurationHistogram() const OVERRIDE; + scoped_refptr DurationHistogram() const OVERRIDE; - virtual scoped_refptr > RunningGauge() const OVERRIDE; + scoped_refptr > RunningGauge() const OVERRIDE; private: // Lock protecting time_since_flush_ mutable simple_spinlock lock_; Stopwatch time_since_flush_; - - TabletReplica *const tablet_replica_; }; // Maintenance task that runs log GC. Reports log retention that represents the amount of data // that can be GC'd. // // Only one LogGC op can run at a time. -class LogGCOp : public MaintenanceOp { +class LogGCOp : public TabletReplicaOpBase { public: explicit LogGCOp(TabletReplica* tablet_replica); - virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; + void UpdateStats(MaintenanceOpStats* stats) OVERRIDE; - virtual bool Prepare() OVERRIDE; + bool Prepare() OVERRIDE; - virtual void Perform() OVERRIDE; + void Perform() OVERRIDE; - virtual scoped_refptr DurationHistogram() const OVERRIDE; + scoped_refptr DurationHistogram() const OVERRIDE; - virtual scoped_refptr > RunningGauge() const OVERRIDE; + scoped_refptr > RunningGauge() const OVERRIDE; private: - TabletReplica *const tablet_replica_; scoped_refptr log_gc_duration_; scoped_refptr > log_gc_running_; mutable Semaphore sem_; diff --git a/src/kudu/tserver/tablet_copy-test-base.h b/src/kudu/tserver/tablet_copy-test-base.h index 99624b5943..b4db99c80d 100644 --- a/src/kudu/tserver/tablet_copy-test-base.h +++ b/src/kudu/tserver/tablet_copy-test-base.h @@ -57,7 +57,9 @@ class TabletCopyTest : public TabletServerTestBase { } virtual void TearDown() OVERRIDE { - ASSERT_OK(tablet_replica_->log_anchor_registry()->Unregister(&anchor_)); + if (tablet_replica_) { + ASSERT_OK(tablet_replica_->log_anchor_registry()->Unregister(&anchor_)); + } NO_FATALS(TabletServerTestBase::TearDown()); } diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt index f12d181e8e..47334c63b2 100644 --- a/src/kudu/util/CMakeLists.txt +++ b/src/kudu/util/CMakeLists.txt @@ -230,6 +230,7 @@ set(UTIL_SRCS version_info.cc version_util.cc website_util.cc + yamlreader.cc zlib.cc ) @@ -257,6 +258,7 @@ set(UTIL_LIBS pb_util_proto protobuf version_info_proto + yaml zlib) if(NOT APPLE) @@ -293,7 +295,6 @@ set(UTIL_COMPRESSION_SRCS set(UTIL_COMPRESSION_LIBS kudu_util util_compression_proto - glog gutil lz4 @@ -467,6 +468,7 @@ ADD_KUDU_TEST(ttl_cache-test) ADD_KUDU_TEST(url-coding-test) ADD_KUDU_TEST(user-test) ADD_KUDU_TEST(version_util-test) +ADD_KUDU_TEST(yamlreader-test) if (NOT APPLE) ADD_KUDU_TEST(minidump-test) diff --git a/src/kudu/util/async_util-test.cc b/src/kudu/util/async_util-test.cc index 5cb7a6372b..91f2baa11d 100644 --- a/src/kudu/util/async_util-test.cc +++ 
b/src/kudu/util/async_util-test.cc @@ -28,6 +28,7 @@ #include "kudu/gutil/basictypes.h" #include "kudu/gutil/callback.h" #include "kudu/util/monotime.h" +#include "kudu/util/scoped_cleanup.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" @@ -43,7 +44,7 @@ class AsyncUtilTest : public KuduTest { // Set up an alarm to fail the test in case of deadlock. alarm(30); } - ~AsyncUtilTest() { + virtual ~AsyncUtilTest() { // Disable the alarm on test exit. alarm(0); } @@ -99,31 +100,72 @@ TEST_F(AsyncUtilTest, TestSynchronizerMultiWait) { } } -TEST_F(AsyncUtilTest, TestSynchronizerTimedWait) { - thread waiter; - { - Synchronizer sync; - auto cb = sync.AsStatusCallback(); - waiter = thread([cb] { - SleepFor(MonoDelta::FromMilliseconds(5)); - cb.Run(Status::OK()); - }); - ASSERT_OK(sync.WaitFor(MonoDelta::FromMilliseconds(1000))); - } - waiter.join(); +// Flavors of wait that Synchronizer is capable of: WaitFor() or WaitUntil(). +enum class TimedWaitFlavor { + WaitFor, + WaitUntil, +}; - { - Synchronizer sync; - auto cb = sync.AsStatusCallback(); - waiter = thread([cb] { - SleepFor(MonoDelta::FromMilliseconds(1000)); - cb.Run(Status::OK()); - }); - ASSERT_TRUE(sync.WaitFor(MonoDelta::FromMilliseconds(5)).IsTimedOut()); +class AsyncUtilTimedWaitTest: + public AsyncUtilTest, + public ::testing::WithParamInterface { +}; + +TEST_P(AsyncUtilTimedWaitTest, SynchronizerTimedWaitSuccess) { + const auto kWaitInterval = MonoDelta::FromMilliseconds(1000); + + Synchronizer sync; + auto cb = sync.AsStatusCallback(); + auto waiter = thread([cb] { + SleepFor(MonoDelta::FromMilliseconds(5)); + cb.Run(Status::OK()); + }); + SCOPED_CLEANUP({ + waiter.join(); + }); + const auto mode = GetParam(); + switch (mode) { + case TimedWaitFlavor::WaitFor: + ASSERT_OK(sync.WaitFor(kWaitInterval)); + break; + case TimedWaitFlavor::WaitUntil: + ASSERT_OK(sync.WaitUntil(MonoTime::Now() + kWaitInterval)); + break; + default: + FAIL() << "unsupported wait mode " << static_cast(mode); + break; } +} + +TEST_P(AsyncUtilTimedWaitTest, SynchronizerTimedWaitTimeout) { + const auto kWaitInterval = MonoDelta::FromMilliseconds(5); - // Waiting on the thread gives TSAN to check that no thread safety issues - // occurred. 
- waiter.join(); + Synchronizer sync; + auto cb = sync.AsStatusCallback(); + auto waiter = thread([cb] { + SleepFor(MonoDelta::FromMilliseconds(1000)); + cb.Run(Status::OK()); + }); + SCOPED_CLEANUP({ + waiter.join(); + }); + const auto mode = GetParam(); + switch (mode) { + case TimedWaitFlavor::WaitFor: + ASSERT_TRUE(sync.WaitFor(kWaitInterval).IsTimedOut()); + break; + case TimedWaitFlavor::WaitUntil: + ASSERT_TRUE(sync.WaitUntil(MonoTime::Now() + kWaitInterval).IsTimedOut()); + break; + default: + FAIL() << "unsupported wait mode " << static_cast(mode); + break; + } } + +INSTANTIATE_TEST_CASE_P(WaitFlavors, + AsyncUtilTimedWaitTest, + ::testing::Values(TimedWaitFlavor::WaitFor, + TimedWaitFlavor::WaitUntil)); + } // namespace kudu diff --git a/src/kudu/util/async_util.h b/src/kudu/util/async_util.h index 338c6c2973..61621d6b4f 100644 --- a/src/kudu/util/async_util.h +++ b/src/kudu/util/async_util.h @@ -44,7 +44,7 @@ namespace kudu { class Synchronizer { public: Synchronizer() - : data_(std::make_shared()) { + : data_(std::make_shared()) { } void StatusCB(const Status& status) { @@ -71,6 +71,13 @@ class Synchronizer { return data_->status; } + Status WaitUntil(const MonoTime& deadline) const { + if (PREDICT_FALSE(!data_->latch.WaitUntil(deadline))) { + return Status::TimedOut("timed out while waiting for the callback to be called"); + } + return data_->status; + } + void Reset() { data_->latch.Reset(1); } diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc index 6777e06b81..8c88dd74a9 100644 --- a/src/kudu/util/maintenance_manager-test.cc +++ b/src/kudu/util/maintenance_manager-test.cc @@ -15,16 +15,22 @@ // specific language governing permissions and limitations // under the License. +#include "kudu/util/maintenance_manager.h" + +#include + +#include #include #include +#include #include #include #include #include #include -#include #include // IWYU pragma: keep +#include #include #include #include @@ -32,7 +38,6 @@ #include "kudu/gutil/port.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/substitute.h" -#include "kudu/util/maintenance_manager.h" #include "kudu/util/maintenance_manager.pb.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" @@ -42,9 +47,9 @@ #include "kudu/util/test_util.h" #include "kudu/util/thread.h" +using std::list; using std::shared_ptr; using std::string; -using std::vector; using strings::Substitute; METRIC_DEFINE_entity(test); @@ -57,17 +62,29 @@ METRIC_DEFINE_histogram(test, maintenance_op_duration, kudu::MetricUnit::kSeconds, "", 60000000LU, 2); DECLARE_int64(log_target_replay_size_mb); +DECLARE_string(maintenance_manager_table_priorities); +DECLARE_double(maintenance_op_multiplier); +DECLARE_int32(max_priority_range); + namespace kudu { -static const int kHistorySize = 4; +static const int kHistorySize = 6; static const char kFakeUuid[] = "12345"; class MaintenanceManagerTest : public KuduTest { public: - void SetUp() override { + void SetUp() OVERRIDE { + StartManager(2); + } + + void TearDown() OVERRIDE { + StopManager(); + } + + void StartManager(int32_t num_threads) { MaintenanceManager::Options options; - options.num_threads = 2; + options.num_threads = num_threads; options.polling_interval_ms = 1; options.history_size = kHistorySize; manager_.reset(new MaintenanceManager(options, kFakeUuid)); @@ -78,7 +95,7 @@ class MaintenanceManagerTest : public KuduTest { ASSERT_OK(manager_->Start()); } - void TearDown() override { + void StopManager() { manager_->Shutdown(); } @@ -95,7 
+112,8 @@ TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) { class TestMaintenanceOp : public MaintenanceOp { public: TestMaintenanceOp(const std::string& name, - IOUsage io_usage) + IOUsage io_usage, + std::string table_id = "fake.table_id") : MaintenanceOp(name, io_usage), ram_anchored_(500), logs_retained_bytes_(0), @@ -105,12 +123,13 @@ class TestMaintenanceOp : public MaintenanceOp { maintenance_ops_running_(METRIC_maintenance_ops_running.Instantiate(metric_entity_, 0)), remaining_runs_(1), prepared_runs_(0), - sleep_time_(MonoDelta::FromSeconds(0)) { + sleep_time_(MonoDelta::FromSeconds(0)), + table_id_(std::move(table_id)) { } - virtual ~TestMaintenanceOp() {} + ~TestMaintenanceOp() OVERRIDE = default; - virtual bool Prepare() OVERRIDE { + bool Prepare() OVERRIDE { std::lock_guard guard(lock_); if (remaining_runs_ == 0) { return false; @@ -121,7 +140,7 @@ class TestMaintenanceOp : public MaintenanceOp { return true; } - virtual void Perform() OVERRIDE { + void Perform() OVERRIDE { { std::lock_guard guard(lock_); DLOG(INFO) << "Performing op " << name(); @@ -135,7 +154,7 @@ class TestMaintenanceOp : public MaintenanceOp { SleepFor(sleep_time_); } - virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE { + void UpdateStats(MaintenanceOpStats* stats) OVERRIDE { std::lock_guard guard(lock_); stats->set_runnable(remaining_runs_ > 0); stats->set_ram_anchored(ram_anchored_); @@ -168,14 +187,18 @@ class TestMaintenanceOp : public MaintenanceOp { perf_improvement_ = perf_improvement; } - virtual scoped_refptr DurationHistogram() const OVERRIDE { + scoped_refptr DurationHistogram() const OVERRIDE { return maintenance_op_duration_; } - virtual scoped_refptr > RunningGauge() const OVERRIDE { + scoped_refptr > RunningGauge() const OVERRIDE { return maintenance_ops_running_; } + const std::string& table_id() const OVERRIDE { + return table_id_; + } + private: Mutex lock_; @@ -195,6 +218,7 @@ class TestMaintenanceOp : public MaintenanceOp { // The amount of time each op invocation will sleep. MonoDelta sleep_time_; + std::string table_id_; }; // Create an op and wait for it to start running. Unregister it while it is @@ -226,7 +250,7 @@ TEST_F(MaintenanceManagerTest, TestNewOpsDontGetScheduledDuringUnregister) { // Set the op to run up to 10 times, and each time should sleep for a second. op1.set_remaining_runs(10); - op1.set_sleep_time(MonoDelta::FromSeconds(1)); + op1.set_sleep_time(MonoDelta::FromMilliseconds(10)); manager_->RegisterOp(&op1); // Wait until two instances of the ops start running, since we have two MM threads. 
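The table-priority mechanism exercised by the new tests below boils down to an
exponential weighting of each op's raw perf_improvement score. The following
standalone sketch restates that arithmetic for reference; PerfImprovement() in
maintenance_manager.cc, later in this patch, is the authoritative
implementation, and the helper name here is illustrative only.

    #include <cmath>
    #include <cstdint>
    #include <string>
    #include <unordered_map>

    // Scale a raw perf_improvement score by multiplier^priority for tables
    // with a configured priority level; other tables keep their raw score.
    double AdjustedScore(double raw_score,
                         const std::string& table_id,
                         const std::unordered_map<std::string, int32_t>& priorities,
                         double multiplier) {  // --maintenance_op_multiplier, 1.1 by default
      const auto it = priorities.find(table_id);
      return it == priorities.end()
          ? raw_score
          : raw_score * std::pow(multiplier, it->second);
    }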
@@ -266,7 +290,7 @@ TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
   const int64_t kMB = 1024 * 1024;

-  manager_->Shutdown();
+  StopManager();

   TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE);
   op1.set_ram_anchored(0);
@@ -318,7 +342,7 @@ TEST_F(MaintenanceManagerTest, TestRunningInstances) {
   TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE);
   op.set_perf_improvement(10);
   op.set_remaining_runs(2);
-  op.set_sleep_time(MonoDelta::FromSeconds(1));
+  op.set_sleep_time(MonoDelta::FromMilliseconds(10));
   manager_->RegisterOp(&op);

   // Check that running instances are added to the maintenance manager's collection,
@@ -341,10 +365,11 @@ TEST_F(MaintenanceManagerTest, TestRunningInstances) {
     manager_->GetMaintenanceManagerStatusDump(&status_pb);
     ASSERT_EQ(status_pb.running_operations_size(), 0);
   }
+
 // Test adding operations and make sure that the history of recently completed operations
 // is correct in that it wraps around and doesn't grow.
 TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
-  for (int i = 0; i < 5; i++) {
+  for (int i = 0; i < kHistorySize + 1; i++) {
     string name = Substitute("op$0", i);
     TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE);
     op.set_perf_improvement(1);
@@ -358,12 +383,120 @@ TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
     MaintenanceManagerStatusPB status_pb;
     manager_->GetMaintenanceManagerStatusDump(&status_pb);
-    // The size should be at most the history_size.
-    ASSERT_GE(kHistorySize, status_pb.completed_operations_size());
+    // The size should equal the number of ops completed so far,
+    // and should be at most kHistorySize.
+    ASSERT_EQ(std::min(kHistorySize, i + 1), status_pb.completed_operations_size());
     // The most recently completed op should always be first, even if we wrap
     // around.
     ASSERT_EQ(name, status_pb.completed_operations(0).name());
   }
 }

+// Test maintenance op score factors: ops on different priority levels get
+// different score multipliers.
+TEST_F(MaintenanceManagerTest, TestOpFactors) {
+  ASSERT_GE(FLAGS_max_priority_range, 1);
+  FLAGS_maintenance_manager_table_priorities
+      = Substitute("table_id_1:$0;table_id_2:$1;table_id_3:$2;table_id_4:$3;table_id_5:$4",
+                   -FLAGS_max_priority_range - 1, -1, 0, 1, FLAGS_max_priority_range + 1);
+  TestMaintenanceOp op1("op1", MaintenanceOp::HIGH_IO_USAGE, "table_id_1");
+  TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, "table_id_2");
+  TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, "table_id_3");
+  TestMaintenanceOp op4("op4", MaintenanceOp::HIGH_IO_USAGE, "table_id_4");
+  TestMaintenanceOp op5("op5", MaintenanceOp::HIGH_IO_USAGE, "table_id_5");
+  TestMaintenanceOp op6("op6", MaintenanceOp::HIGH_IO_USAGE, "table_id_6");
+
+  manager_->UpdateTablePriorities();
+
+  ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier, -FLAGS_max_priority_range),
+                   manager_->PerfImprovement(1, op1.table_id()));
+  ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier, -1),
+                   manager_->PerfImprovement(1, op2.table_id()));
+  ASSERT_DOUBLE_EQ(1, manager_->PerfImprovement(1, op3.table_id()));
+  ASSERT_DOUBLE_EQ(FLAGS_maintenance_op_multiplier, manager_->PerfImprovement(1, op4.table_id()));
+  ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier, FLAGS_max_priority_range),
+                   manager_->PerfImprovement(1, op5.table_id()));
+  ASSERT_DOUBLE_EQ(1, manager_->PerfImprovement(1, op6.table_id()));
+}
+
+// Test priority op launching.
+TEST_F(MaintenanceManagerTest, TestPriorityOpLaunch) {
+  StopManager();
+  StartManager(1);
+
+  ASSERT_NE("", gflags::SetCommandLineOption(
+      "maintenance_manager_table_priorities",
+      Substitute("table_id_1:$0;table_id_2:$1;table_id_3:$2;table_id_4:$3;table_id_5:$4",
+                 -FLAGS_max_priority_range - 1, -1, 0, 1, FLAGS_max_priority_range + 1).c_str()));
+
+  TestMaintenanceOp op1("op1", MaintenanceOp::HIGH_IO_USAGE, "table_id_1");
+  op1.set_perf_improvement(10);
+  op1.set_remaining_runs(1);
+  op1.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+  TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, "table_id_2");
+  op2.set_perf_improvement(10);
+  op2.set_remaining_runs(1);
+  op2.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+  TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, "table_id_3");
+  op3.set_perf_improvement(10);
+  op3.set_remaining_runs(1);
+  op3.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+  TestMaintenanceOp op4("op4", MaintenanceOp::HIGH_IO_USAGE, "table_id_4");
+  op4.set_perf_improvement(10);
+  op4.set_remaining_runs(1);
+  op4.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+  TestMaintenanceOp op5("op5", MaintenanceOp::HIGH_IO_USAGE, "table_id_5");
+  op5.set_perf_improvement(10);
+  op5.set_remaining_runs(1);
+  op5.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+  TestMaintenanceOp op6("op6", MaintenanceOp::HIGH_IO_USAGE, "table_id_6");
+  op6.set_perf_improvement(12);
+  op6.set_remaining_runs(1);
+  op6.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+  manager_->RegisterOp(&op1);
+  manager_->RegisterOp(&op2);
+  manager_->RegisterOp(&op3);
+  manager_->RegisterOp(&op4);
+  manager_->RegisterOp(&op5);
+  manager_->RegisterOp(&op6);
+
+  ASSERT_EVENTUALLY([&]() {
+    MaintenanceManagerStatusPB status_pb;
+    manager_->GetMaintenanceManagerStatusDump(&status_pb);
+    ASSERT_EQ(status_pb.completed_operations_size(), 6);
+  });
+
+  // Wait for instances to complete.
+  manager_->UnregisterOp(&op1);
+  manager_->UnregisterOp(&op2);
+  manager_->UnregisterOp(&op3);
+  manager_->UnregisterOp(&op4);
+  manager_->UnregisterOp(&op5);
+  manager_->UnregisterOp(&op6);
+
+  // Check that running instances are removed from the collection after completion.
+  MaintenanceManagerStatusPB status_pb;
+  manager_->GetMaintenanceManagerStatusDump(&status_pb);
+  ASSERT_EQ(status_pb.running_operations_size(), 0);
+  ASSERT_EQ(status_pb.completed_operations_size(), 6);
+  // Ops are launched in descending order of their priority-adjusted
+  // perf_improvement scores, and the dump lists the most recently completed
+  // op first, so the completed ops appear here in ascending score order.
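+  // With the default --maintenance_op_multiplier=1.1 and the priorities above
+  // clamped to [-5, 5], the adjusted scores are roughly op5 = 10 * 1.1^5 ~= 16.1,
+  // op6 = 12 (no configured priority), op4 = 11, op3 = 10, op2 ~= 9.1, and
+  // op1 = 10 / 1.1^5 ~= 6.2, which produces the completion order checked below.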
+ list ordered_ops({"op1", + "op2", + "op3", + "op4", + "op6", + "op5"}); + ASSERT_EQ(ordered_ops.size(), status_pb.completed_operations().size()); + for (const auto& instance : status_pb.completed_operations()) { + ASSERT_EQ(ordered_ops.front(), instance.name()); + ordered_ops.pop_front(); + } +} + } // namespace kudu diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc index ea607aa000..3231b31b6e 100644 --- a/src/kudu/util/maintenance_manager.cc +++ b/src/kudu/util/maintenance_manager.cc @@ -17,8 +17,9 @@ #include "kudu/util/maintenance_manager.h" +#include #include -#include +#include #include #include #include @@ -26,6 +27,7 @@ #include #include #include +#include #include #include @@ -33,6 +35,8 @@ #include "kudu/gutil/dynamic_annotations.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/stringprintf.h" +#include "kudu/gutil/strings/numbers.h" +#include "kudu/gutil/strings/split.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/util/debug/trace_event.h" #include "kudu/util/debug/trace_logging.h" @@ -50,6 +54,8 @@ using std::pair; using std::string; +using std::vector; +using strings::Split; using strings::Substitute; DEFINE_int32(maintenance_manager_num_threads, 1, @@ -91,6 +97,29 @@ DEFINE_double(data_gc_prioritization_prob, 0.5, "such as delta compaction."); TAG_FLAG(data_gc_prioritization_prob, experimental); +DEFINE_string(maintenance_manager_table_priorities, "", + "Priorities of tables, semicolon-separated list of table-priority pairs, and each " + "table-priority pair is combined by table id, colon and priority level. Priority " + "level is ranged in [-FLAGS_max_priority_range, FLAGS_max_priority_range]"); +TAG_FLAG(maintenance_manager_table_priorities, advanced); +TAG_FLAG(maintenance_manager_table_priorities, experimental); +TAG_FLAG(maintenance_manager_table_priorities, runtime); + +DEFINE_double(maintenance_op_multiplier, 1.1, + "Multiplier applied on different priority levels, table maintenance OPs on level N " + "has multiplier of FLAGS_maintenance_op_multiplier^N, the last score will be " + "multiplied by this multiplier. 
Note: this multiplier is only take effect on " + "compaction OPs"); +TAG_FLAG(maintenance_op_multiplier, advanced); +TAG_FLAG(maintenance_op_multiplier, experimental); +TAG_FLAG(maintenance_op_multiplier, runtime); + +DEFINE_int32(max_priority_range, 5, + "Maximal priority range of OPs."); +TAG_FLAG(max_priority_range, advanced); +TAG_FLAG(max_priority_range, experimental); +TAG_FLAG(max_priority_range, runtime); + namespace kudu { MaintenanceOpStats::MaintenanceOpStats() { @@ -107,7 +136,7 @@ void MaintenanceOpStats::Clear() { last_modified_ = MonoTime(); } -MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage) +MaintenanceOp::MaintenanceOp(string name, IOUsage io_usage) : name_(std::move(name)), running_(0), cancel_(false), @@ -129,10 +158,10 @@ MaintenanceManagerStatusPB_OpInstancePB OpInstance::DumpToPB() const { pb.set_thread_id(thread_id); pb.set_name(name); if (duration.Initialized()) { - pb.set_duration_millis(duration.ToMilliseconds()); + pb.set_duration_millis(static_cast(duration.ToMilliseconds())); } MonoDelta delta(MonoTime::Now() - start_mono_time); - pb.set_millis_since_start(delta.ToMilliseconds()); + pb.set_millis_since_start(static_cast(delta.ToMilliseconds())); return pb; } @@ -143,7 +172,7 @@ const MaintenanceManager::Options MaintenanceManager::kDefaultOptions = { }; MaintenanceManager::MaintenanceManager(const Options& options, - std::string server_uuid) + string server_uuid) : server_uuid_(std::move(server_uuid)), num_threads_(options.num_threads <= 0 ? FLAGS_maintenance_manager_num_threads : options.num_threads), @@ -264,8 +293,7 @@ void MaintenanceManager::RunSchedulerThread() { // 1) there are no free threads available to perform a maintenance op. // or 2) we just tried to schedule an op but found nothing to run. // However, if it's time to shut down, we want to do so immediately. - while ((running_ops_ >= num_threads_ || prev_iter_found_no_work || disabled_for_tests()) && - !shutdown_) { + while (CouldNotLaunchNewOp(prev_iter_found_no_work)) { cond_.WaitFor(polling_interval); prev_iter_found_no_work = false; } @@ -274,42 +302,47 @@ void MaintenanceManager::RunSchedulerThread() { return; } - // Find the best op. - auto best_op_and_why = FindBestOp(); - auto* op = best_op_and_why.first; - const auto& note = best_op_and_why.second; + // TODO(yingchun): move it to SetFlag, callback once as a gflags setter handler. + UpdateTablePriorities(); // If we found no work to do, then we should sleep before trying again to schedule. // Otherwise, we can go right into trying to find the next op. - prev_iter_found_no_work = (op == nullptr); - if (!op) { - VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2) - << "No maintenance operations look worth doing."; - continue; - } + prev_iter_found_no_work = !FindAndLaunchOp(&guard); + } +} - // Prepare the maintenance operation. - op->running_++; - running_ops_++; - guard.unlock(); - bool ready = op->Prepare(); - guard.lock(); - if (!ready) { - LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name() - << ". Re-running scheduler."; - op->running_--; - running_ops_--; - op->cond_->Signal(); - continue; - } +bool MaintenanceManager::FindAndLaunchOp(std::unique_lock* guard) { + // Find the best op. 
+ auto best_op_and_why = FindBestOp(); + auto* op = best_op_and_why.first; + const auto& note = best_op_and_why.second; + + if (!op) { + VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2) + << "No maintenance operations look worth doing."; + return false; + } - LOG_AND_TRACE_WITH_PREFIX("maintenance", INFO) - << Substitute("Scheduling $0: $1", op->name(), note); - // Run the maintenance operation. - Status s = thread_pool_->SubmitFunc(boost::bind( - &MaintenanceManager::LaunchOp, this, op)); - CHECK(s.ok()); + // Prepare the maintenance operation. + IncreaseOpCount(op); + guard->unlock(); + bool ready = op->Prepare(); + guard->lock(); + if (!ready) { + LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name() + << ". Re-running scheduler."; + DecreaseOpCount(op); + op->cond_->Signal(); + return true; } + + LOG_AND_TRACE_WITH_PREFIX("maintenance", INFO) + << Substitute("Scheduling $0: $1", op->name(), note); + // Run the maintenance operation. + Status s = thread_pool_->SubmitFunc(boost::bind(&MaintenanceManager::LaunchOp, this, op)); + CHECK(s.ok()); + + return true; } // Finding the best operation goes through four filters: @@ -335,8 +368,7 @@ void MaintenanceManager::RunSchedulerThread() { pair MaintenanceManager::FindBestOp() { TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp"); - size_t free_threads = num_threads_ - running_ops_; - if (free_threads == 0) { + if (!HasFreeThreads()) { return {nullptr, "no free threads"}; } @@ -367,8 +399,8 @@ pair MaintenanceManager::FindBestOp() { } const auto logs_retained_bytes = stats.logs_retained_bytes(); - if (logs_retained_bytes > low_io_most_logs_retained_bytes && - op->io_usage() == MaintenanceOp::LOW_IO_USAGE) { + if (op->io_usage() == MaintenanceOp::LOW_IO_USAGE && + logs_retained_bytes > low_io_most_logs_retained_bytes) { low_io_most_logs_retained_bytes_op = op; low_io_most_logs_retained_bytes = logs_retained_bytes; VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2) @@ -401,7 +433,7 @@ pair MaintenanceManager::FindBestOp() { op->name(), data_retained_bytes); } - const auto perf_improvement = stats.perf_improvement(); + const auto perf_improvement = PerfImprovement(stats.perf_improvement(), op->table_id()); if ((!best_perf_improvement_op) || (perf_improvement > best_perf_improvement)) { best_perf_improvement_op = op; @@ -420,7 +452,7 @@ pair MaintenanceManager::FindBestOp() { double capacity_pct; if (memory_pressure_func_(&capacity_pct)) { if (!most_ram_anchored_op) { - std::string msg = StringPrintf("System under memory pressure " + string msg = StringPrintf("System under memory pressure " "(%.2f%% of limit used). 
However, there are no ops currently "
                                "runnable which would free memory.", capacity_pct);
      KLOG_EVERY_N_SECS(WARNING, 5) << msg;
@@ -456,6 +488,16 @@ pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
   return {nullptr, "no ops with positive improvement"};
 }

+double MaintenanceManager::PerfImprovement(double perf_improvement,
+                                           const string& table_id) const {
+  int32_t priority = 0;
+  if (!FindCopy(table_priorities_, table_id, &priority)) {
+    return perf_improvement;
+  }
+
+  return perf_improvement * std::pow(FLAGS_maintenance_op_multiplier, priority);
+}
+
 void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
   int64_t thread_id = Thread::CurrentThreadId();
   OpInstance op_instance;
@@ -481,9 +523,7 @@ void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
     completed_ops_count_++;

     op->DurationHistogram()->Increment(op_instance.duration.ToMilliseconds());
-
-    running_ops_--;
-    op->running_--;
+    DecreaseOpCount(op);
     op->cond_->Signal();
     cond_.Signal(); // Wake up scheduler.
   });
@@ -507,9 +547,6 @@ void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb) {
   DCHECK(out_pb != nullptr);
   std::lock_guard<Mutex> guard(lock_);
-  auto best_op_and_why = FindBestOp();
-  auto* best_op = best_op_and_why.first;
-
   for (const auto& val : ops_) {
     MaintenanceManagerStatusPB_MaintenanceOpPB* op_pb = out_pb->add_registered_operations();
     MaintenanceOp* op(val.first);
@@ -527,10 +564,6 @@ void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatu
       op_pb->set_logs_retained_bytes(0);
       op_pb->set_perf_improvement(0.0);
     }
-
-    if (best_op == op) {
-      out_pb->mutable_best_op()->CopyFrom(*op_pb);
-    }
   }

   {
@@ -540,8 +573,9 @@
     }
   }

+  // The most recently completed op is dumped first.
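+  // For example, with history_size = 4 and completed_ops_count_ = 6, the loop
+  // below visits logical indices 5, 4, 3, 2 (ring-buffer slots 1, 0, 3, 2),
+  // newest first.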
   for (int n = 1; n <= completed_ops_.size(); n++) {
-    int i = completed_ops_count_ - n;
+    int64_t i = completed_ops_count_ - n;
     if (i < 0) break;
     const auto& completed_op = completed_ops_[i % completed_ops_.size()];
@@ -551,8 +585,50 @@
   }
 }

-std::string MaintenanceManager::LogPrefix() const {
+string MaintenanceManager::LogPrefix() const {
   return Substitute("P $0: ", server_uuid_);
 }

+bool MaintenanceManager::HasFreeThreads() {
+  return num_threads_ - running_ops_ > 0;
+}
+
+bool MaintenanceManager::CouldNotLaunchNewOp(bool prev_iter_found_no_work) {
+  return (!HasFreeThreads() || prev_iter_found_no_work || disabled_for_tests()) && !shutdown_;
+}
+
+void MaintenanceManager::UpdateTablePriorities() {
+  string table_priorities_str;
+  if (!google::GetCommandLineOption("maintenance_manager_table_priorities",
+                                    &table_priorities_str)) {
+    return;
+  }
+
+  TablePriorities table_priorities;
+  int32_t value;
+  for (auto table_priority_str : Split(table_priorities_str, ";", strings::SkipEmpty())) {
+    vector<string> table_priority = Split(table_priority_str, ":", strings::SkipEmpty());
+    // Each pair must consist of a table id and a numeric priority level.
+    if (table_priority.size() != 2 ||
+        !safe_strto32_base(table_priority[1].c_str(), &value, 10)) {
+      LOG(WARNING) << "Error parsing flag maintenance_manager_table_priorities: "
+                   << table_priorities_str;
+      return;
+    }
+    value = std::max(value, -FLAGS_max_priority_range);
+    value = std::min(value, FLAGS_max_priority_range);
+    table_priorities[table_priority[0]] = value;
+  }
+  table_priorities_.swap(table_priorities);
+}
+
+void MaintenanceManager::IncreaseOpCount(MaintenanceOp *op) {
+  op->running_++;
+  running_ops_++;
+}
+
+void MaintenanceManager::DecreaseOpCount(MaintenanceOp *op) {
+  op->running_--;
+  running_ops_--;
+}
+
 } // namespace kudu
diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h
index c5dd8ac83c..e3452a2f46 100644
--- a/src/kudu/util/maintenance_manager.h
+++ b/src/kudu/util/maintenance_manager.h
@@ -230,6 +232,8 @@ class MaintenanceOp {
   }

  private:
+  virtual const std::string& table_id() const = 0;
+
   DISALLOW_COPY_AND_ASSIGN(MaintenanceOp);

   // The name of the operation. Op names must be unique.
@@ -302,8 +304,11 @@ class MaintenanceManager : public std::enable_shared_from_this
   OpMapTy;
+  typedef std::unordered_map<std::string, int32_t> TablePriorities;

   // Return true if tests have currently disabled the maintenance
   // manager by way of changing the gflags at runtime.
@@ -311,17 +316,32 @@
+  bool FindAndLaunchOp(std::unique_lock<Mutex>* guard);
+
   // Find the best op, or null if there is nothing we want to run.
   //
   // Returns the op, as well as a string explanation of why that op was chosen,
   // suitable for logging.
   std::pair<MaintenanceOp*, std::string> FindBestOp();

+  double PerfImprovement(double perf_improvement,
+                         const std::string& table_id) const;
+
   void LaunchOp(MaintenanceOp* op);

   std::string LogPrefix() const;

+  bool HasFreeThreads();
+
+  bool CouldNotLaunchNewOp(bool prev_iter_found_no_work);
+
+  void UpdateTablePriorities();
+
+  void IncreaseOpCount(MaintenanceOp *op);
+  void DecreaseOpCount(MaintenanceOp *op);
+
   const std::string server_uuid_;
+  TablePriorities table_priorities_;
   const int32_t num_threads_;
   OpMapTy ops_; // Registered operations.
Mutex lock_; @@ -330,7 +350,7 @@ class MaintenanceManager : public std::enable_shared_from_this completed_ops_; diff --git a/src/kudu/util/maintenance_manager.proto b/src/kudu/util/maintenance_manager.proto index b6b1203908..77c8cf8fd1 100644 --- a/src/kudu/util/maintenance_manager.proto +++ b/src/kudu/util/maintenance_manager.proto @@ -26,7 +26,7 @@ message MaintenanceManagerStatusPB { // Number of times this operation is currently running. required uint32 running = 2; required bool runnable = 3; - required uint64 ram_anchored_bytes = 4; + required int64 ram_anchored_bytes = 4; required int64 logs_retained_bytes = 5; required double perf_improvement = 6; } @@ -40,15 +40,12 @@ message MaintenanceManagerStatusPB { required int32 millis_since_start = 4; } - // The next operation that would run. - optional MaintenanceOpPB best_op = 1; - // List of all the operations. - repeated MaintenanceOpPB registered_operations = 2; + repeated MaintenanceOpPB registered_operations = 1; // This list isn't in order of anything. Can contain the same operation multiple times. - repeated OpInstancePB running_operations = 3; + repeated OpInstancePB running_operations = 2; // This list isn't in order of anything. Can contain the same operation multiple times. - repeated OpInstancePB completed_operations = 4; + repeated OpInstancePB completed_operations = 3; } diff --git a/src/kudu/util/yamlreader-test.cc b/src/kudu/util/yamlreader-test.cc new file mode 100644 index 0000000000..8394cf4440 --- /dev/null +++ b/src/kudu/util/yamlreader-test.cc @@ -0,0 +1,176 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
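Before the tests, a quick orientation on how the new API fits together. This is
a minimal usage sketch, not part of the patch; the file name config.yaml and
the field name port are illustrative assumptions, and the authoritative
behavior is defined by yamlreader.h and yamlreader.cc below.

    #include <string>

    #include "kudu/util/status.h"
    #include "kudu/util/yamlreader.h"

    // Read a top-level scalar field from a YAML file with the new YamlReader.
    kudu::Status ReadPort(int* port) {
      kudu::YamlReader reader("config.yaml");  // hypothetical file name
      RETURN_NOT_OK(reader.Init());  // parses the file; Corruption on failure
      // NotFound if the key is absent; Corruption if it is not a scalar.
      return kudu::YamlReader::ExtractScalar(reader.node(), "port", port);
    }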
+
+#include "kudu/util/yamlreader.h"
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include "kudu/util/env.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+namespace kudu {
+
+class YamlReaderTest : public KuduTest {
+public:
+  Status GenerateYamlReader(const string& content, unique_ptr<YamlReader>* result) {
+    string fname = GetTestPath("YamlReaderTest.json");
+    unique_ptr<WritableFile> writable_file;
+    RETURN_NOT_OK(env_->NewWritableFile(fname, &writable_file));
+    RETURN_NOT_OK(writable_file->Append(Slice(content)));
+    RETURN_NOT_OK(writable_file->Close());
+    result->reset(new YamlReader(fname));
+    return Status::OK();
+  }
+};
+
+TEST_F(YamlReaderTest, FileNotExist) {
+  YamlReader r("YamlReaderTest.NotExist");
+  Status s = r.Init();
+  ASSERT_TRUE(s.IsCorruption());
+  ASSERT_STR_CONTAINS(s.ToString(), "YAML::LoadFile error");
+}
+
+TEST_F(YamlReaderTest, Corruption) {
+  unique_ptr<YamlReader> r;
+  ASSERT_OK(GenerateYamlReader("foo", &r));
+  ASSERT_OK(r->Init());
+  int val = 0;
+  ASSERT_TRUE(YamlReader::ExtractScalar(r->node(), "foo", &val).IsCorruption());
+}
+
+TEST_F(YamlReaderTest, EmptyFile) {
+  unique_ptr<YamlReader> r;
+  ASSERT_OK(GenerateYamlReader("", &r));
+  ASSERT_OK(r->Init());
+
+  int val = 0;
+  ASSERT_TRUE(YamlReader::ExtractScalar(r->node(), "foo", &val).IsCorruption());
+}
+
+TEST_F(YamlReaderTest, KeyNotExist) {
+  unique_ptr<YamlReader> r;
+  ASSERT_OK(GenerateYamlReader("foo: 1", &r));
+  ASSERT_OK(r->Init());
+
+  int val = 0;
+  ASSERT_TRUE(YamlReader::ExtractScalar(r->node(), "bar", &val).IsNotFound());
+}
+
+TEST_F(YamlReaderTest, Scalar) {
+  {
+    unique_ptr<YamlReader> r;
+    ASSERT_OK(GenerateYamlReader("bool_val: false", &r));
+    ASSERT_OK(r->Init());
+    bool val = true;
+    ASSERT_OK(YamlReader::ExtractScalar(r->node(), "bool_val", &val));
+    ASSERT_EQ(val, false);
+  }
+
+  {
+    unique_ptr<YamlReader> r;
+    ASSERT_OK(GenerateYamlReader("int_val: 123", &r));
+    ASSERT_OK(r->Init());
+    int val = 0;
+    ASSERT_OK(YamlReader::ExtractScalar(r->node(), "int_val", &val));
+    ASSERT_EQ(val, 123);
+  }
+
+  {
+    unique_ptr<YamlReader> r;
+    ASSERT_OK(GenerateYamlReader("double_val: 123.456", &r));
+    ASSERT_OK(r->Init());
+    double val = 0.0;
+    ASSERT_OK(YamlReader::ExtractScalar(r->node(), "double_val", &val));
+    ASSERT_EQ(val, 123.456);
+  }
+
+  {
+    unique_ptr<YamlReader> r;
+    ASSERT_OK(GenerateYamlReader("string_val: hello yaml", &r));
+    ASSERT_OK(r->Init());
+    string val;
+    ASSERT_OK(YamlReader::ExtractScalar(r->node(), "string_val", &val));
+    ASSERT_EQ(val, "hello yaml");
+  }
+}
+
+TEST_F(YamlReaderTest, Map) {
+  unique_ptr<YamlReader> r;
+  ASSERT_OK(GenerateYamlReader(
+      "map_val: { key1: hello yaml, key2: 123.456 , key3: 123, key4: false}", &r));
+  ASSERT_OK(r->Init());
+
+  YAML::Node node;
+  ASSERT_OK(YamlReader::ExtractMap(r->node(), "map_val", &node));
+
+  {
+    string val;
+    ASSERT_OK(YamlReader::ExtractScalar(&node, "key1", &val));
+    ASSERT_EQ(val, "hello yaml");
+  }
+
+  {
+    double val = 0.0;
+    ASSERT_OK(YamlReader::ExtractScalar(&node, "key2", &val));
+    ASSERT_EQ(val, 123.456);
+  }
+
+  {
+    int val = 0;
+    ASSERT_OK(YamlReader::ExtractScalar(&node, "key3", &val));
+    ASSERT_EQ(val, 123);
+  }
+
+  {
+    bool val = true;
+    ASSERT_OK(YamlReader::ExtractScalar(&node, "key4", &val));
+    ASSERT_EQ(val, false);
+  }
+
+  // Nonexistent key.
+ { + int val = 0; + ASSERT_TRUE(YamlReader::ExtractScalar(&node, "key5", &val).IsNotFound()); + } +} + +TEST_F(YamlReaderTest, Array) { + unique_ptr r; + ASSERT_OK(GenerateYamlReader("list_val: [1, 3, 5, 7, 9]", &r)); + ASSERT_OK(r->Init()); + const std::vector& expect_vals = { 1, 3, 5, 7, 9 }; + + std::vector vals; + ASSERT_OK(YamlReader::ExtractArray(r->node(), "list_val", &vals)); + ASSERT_EQ(vals, expect_vals); +} + +} // namespace kudu diff --git a/src/kudu/util/yamlreader.cc b/src/kudu/util/yamlreader.cc new file mode 100644 index 0000000000..a3ee8e0d88 --- /dev/null +++ b/src/kudu/util/yamlreader.cc @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/yamlreader.h" + +#include +#include + +#include +// IWYU pragma: no_include +// IWYU pragma: no_include + +#include "kudu/gutil/port.h" +#include "kudu/gutil/strings/substitute.h" + +using std::string; +using strings::Substitute; +using YAML::Node; +using YAML::NodeType; + +namespace kudu { + +YamlReader::YamlReader(string filename) : filename_(std::move(filename)) {} + +Status YamlReader::Init() { + try { + node_ = YAML::LoadFile(filename_); + } catch (std::exception& e) { + return Status::Corruption(Substitute("YAML::LoadFile error: $0", e.what())); + } + + return Status::OK(); +} + +Status YamlReader::ExtractMap(const Node* node, + const string& field, + Node* result) { + CHECK(result); + Node val; + RETURN_NOT_OK(ExtractField(node, field, &val)); + if (PREDICT_FALSE(!val.IsMap())) { + return Status::Corruption(Substitute( + "wrong type during field extraction: expected map but got $0", + TypeToString(val.Type()))); + } + *result = val; + return Status::OK(); +} + +Status YamlReader::ExtractField(const Node* node, + const string& field, + Node* result) { + if (PREDICT_FALSE(!node->IsDefined() || !node->IsMap())) { + return Status::Corruption("node is not map type"); + } + try { + *result = (*node)[field]; + } catch (std::exception& e) { + return Status::NotFound(Substitute("parse field $0 error: $1", field, e.what())); + } + if (PREDICT_FALSE(!result->IsDefined())) { + return Status::Corruption("Missing field", field); + } + + return Status::OK(); +} + +const char* YamlReader::TypeToString(NodeType::value t) { + switch (t) { + case NodeType::Undefined: + return "undefined"; + case NodeType::Null: + return "null"; + case NodeType::Scalar: + return "scalar"; + case NodeType::Sequence: + return "sequence"; + case NodeType::Map: + return "map"; + default: + LOG(FATAL) << "unexpected type: " << t; + } + return ""; +} + +} // namespace kudu diff --git a/src/kudu/util/yamlreader.h b/src/kudu/util/yamlreader.h new file mode 100644 index 0000000000..2888f9fb3f --- /dev/null +++ b/src/kudu/util/yamlreader.h @@ -0,0 +1,129 @@ +// Licensed to the Apache 
+} // namespace kudu
diff --git a/src/kudu/util/yamlreader.h b/src/kudu/util/yamlreader.h
new file mode 100644
index 0000000000..2888f9fb3f
--- /dev/null
+++ b/src/kudu/util/yamlreader.h
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <exception>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+// IWYU pragma: no_include
+// IWYU pragma: no_include
+// IWYU pragma: no_include
+// IWYU pragma: no_include
+// IWYU pragma: no_include
+// IWYU pragma: no_include
+#include <yaml-cpp/yaml.h> // IWYU pragma: keep
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Wraps the YAML parsing functionality of YAML::Node.
+//
+// This class can read YAML content from a file and extract scalar, map, and
+// array values.
+class YamlReader {
+ public:
+  explicit YamlReader(std::string filename);
+  ~YamlReader() = default;
+
+  Status Init();
+
+  // Extractor methods.
+  //
+  // Look up the entry named 'field' in the given node. Return Status::OK()
+  // if the field exists and can be extracted as the requested type,
+  // Status::NotFound() if the field does not exist, or Status::Corruption()
+  // if the node is not a map or the field cannot be extracted as the
+  // requested type.
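+  //
+  // For example (illustrative only): with a document "port: 7051",
+  //   int port;
+  //   YamlReader::ExtractScalar(reader.node(), "port", &port);
+  // yields port == 7051 and returns Status::OK().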
+
+  template <typename T>
+  static Status ExtractScalar(const YAML::Node* node,
+                              const std::string& field,
+                              T* result);
+
+  static Status ExtractMap(const YAML::Node* node,
+                           const std::string& field,
+                           YAML::Node* result);
+
+  template <typename T>
+  static Status ExtractArray(const YAML::Node* node,
+                             const std::string& field,
+                             std::vector<T>* result);
+
+  const YAML::Node* node() const { return &node_; }
+
+ private:
+  static const char* TypeToString(YAML::NodeType::value t);
+
+  static Status ExtractField(const YAML::Node* node,
+                             const std::string& field,
+                             YAML::Node* result);
+
+  std::string filename_;
+  YAML::Node node_;
+
+  DISALLOW_COPY_AND_ASSIGN(YamlReader);
+};
+
+template <typename T>
+Status YamlReader::ExtractScalar(const YAML::Node* node,
+                                 const std::string& field,
+                                 T* result) {
+  CHECK(result);
+  YAML::Node val;
+  RETURN_NOT_OK(ExtractField(node, field, &val));
+  if (PREDICT_FALSE(!val.IsScalar())) {
+    return Status::Corruption(strings::Substitute(
+        "wrong type during field extraction: expected scalar but got $0",
+        TypeToString(val.Type())));
+  }
+  *result = val.as<T>();
+  return Status::OK();
+}
+
+template <typename T>
+Status YamlReader::ExtractArray(const YAML::Node* node,
+                                const std::string& field,
+                                std::vector<T>* result) {
+  CHECK(result);
+  YAML::Node val;
+  RETURN_NOT_OK(ExtractField(node, field, &val));
+  if (PREDICT_FALSE(!val.IsSequence())) {
+    return Status::Corruption(strings::Substitute(
+        "wrong type during field extraction: expected sequence but got $0",
+        TypeToString(val.Type())));
+  }
+  result->reserve(val.size());
+  for (YAML::const_iterator iter = val.begin(); iter != val.end(); ++iter) {
+    try {
+      if (PREDICT_FALSE(!iter->IsScalar())) {
+        return Status::Corruption(strings::Substitute(
+            "wrong type during field extraction: expected scalar but got $0",
+            TypeToString(iter->Type())));
+      }
+      result->push_back(iter->as<T>());
+    } catch (std::exception& e) {
+      return Status::Corruption(strings::Substitute("parse list element error: $0", e.what()));
+    }
+  }
+  return Status::OK();
+}
+
+} // namespace kudu
diff --git a/thirdparty/build-definitions.sh b/thirdparty/build-definitions.sh
index 2ae2f59ae5..609b7b4e77 100644
--- a/thirdparty/build-definitions.sh
+++ b/thirdparty/build-definitions.sh
@@ -919,3 +919,32 @@ build_bison() {
   make -j$PARALLEL $EXTRA_MAKEFLAGS install
   popd
 }
+
+build_yaml() {
+  YAML_SHARED_BDIR=$TP_BUILD_DIR/$YAML_NAME.shared$MODE_SUFFIX
+  YAML_STATIC_BDIR=$TP_BUILD_DIR/$YAML_NAME.static$MODE_SUFFIX
+  for SHARED in ON OFF; do
+    if [ $SHARED = "ON" ]; then
+      YAML_BDIR=$YAML_SHARED_BDIR
+    else
+      YAML_BDIR=$YAML_STATIC_BDIR
+    fi
+    mkdir -p $YAML_BDIR
+    pushd $YAML_BDIR
+    rm -rf CMakeCache.txt CMakeFiles/
+    CFLAGS="$EXTRA_CFLAGS -fPIC" \
+      CXXFLAGS="$EXTRA_CXXFLAGS -fPIC" \
+      LDFLAGS="$EXTRA_LDFLAGS" \
+      LIBS="$EXTRA_LIBS" \
+      cmake \
+      -DCMAKE_BUILD_TYPE=Release \
+      -DYAML_CPP_BUILD_TESTS=OFF \
+      -DYAML_CPP_BUILD_TOOLS=OFF \
+      -DBUILD_SHARED_LIBS=$SHARED \
+      -DCMAKE_INSTALL_PREFIX=$PREFIX \
+      $EXTRA_CMAKE_FLAGS \
+      $YAML_SOURCE
+    ${NINJA:-make} -j$PARALLEL $EXTRA_MAKEFLAGS install
+    popd
+  done
+}
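+
+# Illustrative note, not part of the original change: build-thirdparty.sh
+# (below) maps the module name "yaml" to this function, so the dependency can
+# be rebuilt on its own with:
+#
+#   ./thirdparty/build-thirdparty.sh yaml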
diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh
index 0fe31ca991..0c2d451896 100755
--- a/thirdparty/build-thirdparty.sh
+++ b/thirdparty/build-thirdparty.sh
@@ -100,6 +100,7 @@ else
     "hadoop") F_HADOOP=1 ;;
     "hive") F_HIVE=1 ;;
     "sentry") F_SENTRY=1 ;;
+    "yaml") F_YAML=1 ;;
     *) echo "Unknown module: $arg"; exit 1 ;;
   esac
 done
@@ -371,6 +372,10 @@ if [ -n "$F_UNINSTRUMENTED" -o -n "$F_THRIFT" ]; then
   build_thrift
 fi
 
+if [ -n "$F_UNINSTRUMENTED" -o -n "$F_YAML" ]; then
+  build_yaml
+fi
+
 restore_env
 
 # If we're on macOS best to exit here, otherwise single dependency builds will try to
@@ -551,6 +556,10 @@ if [ -n "$F_TSAN" -o -n "$F_THRIFT" ]; then
   build_thrift
 fi
 
+if [ -n "$F_TSAN" -o -n "$F_YAML" ]; then
+  build_yaml
+fi
+
 restore_env
 
 finish
diff --git a/thirdparty/download-thirdparty.sh b/thirdparty/download-thirdparty.sh
index a6fa2b948e..7eeccdfd04 100755
--- a/thirdparty/download-thirdparty.sh
+++ b/thirdparty/download-thirdparty.sh
@@ -430,5 +430,11 @@ fetch_and_patch \
   $SENTRY_SOURCE \
   $SENTRY_PATCHLEVEL
 
+YAML_PATCHLEVEL=0
+fetch_and_patch \
+  $YAML_NAME.tar.gz \
+  $YAML_SOURCE \
+  $YAML_PATCHLEVEL
+
 echo "---------------"
 echo "Thirdparty dependencies downloaded successfully"
diff --git a/thirdparty/vars.sh b/thirdparty/vars.sh
index 01eea406dd..df92495aa3 100644
--- a/thirdparty/vars.sh
+++ b/thirdparty/vars.sh
@@ -234,3 +234,8 @@ HADOOP_SOURCE=$TP_SOURCE_DIR/$HADOOP_NAME
 SENTRY_VERSION=505b42e81a9d85c4ebe8db3f48ad7a6e824a5db5
 SENTRY_NAME=sentry-$SENTRY_VERSION
 SENTRY_SOURCE=$TP_SOURCE_DIR/$SENTRY_NAME
+
+YAML_VERSION=0.6.2
+YAML_NAME=yaml-cpp-yaml-cpp-$YAML_VERSION
+YAML_SOURCE=$TP_SOURCE_DIR/$YAML_NAME
+
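Reviewer note: a minimal end-to-end sketch of how a caller might consume the
new utility once this patch lands. Illustrative only; the LoadExampleConfig
helper, the config path, and the "port"/"hosts" field names are hypothetical
and not part of this change.

    #include <string>
    #include <vector>

    #include "kudu/util/status.h"
    #include "kudu/util/yamlreader.h"

    // Hypothetical caller: reads an int "port" and a string list "hosts"
    // from a made-up config file path.
    kudu::Status LoadExampleConfig(int* port, std::vector<std::string>* hosts) {
      kudu::YamlReader reader("/tmp/example.yaml");
      RETURN_NOT_OK(reader.Init());
      RETURN_NOT_OK(kudu::YamlReader::ExtractScalar(reader.node(), "port", port));
      RETURN_NOT_OK(kudu::YamlReader::ExtractArray(reader.node(), "hosts", hosts));
      return kudu::Status::OK();
    }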