From 6b156e2fcf9c0c1ed0770a7ad9c54fa374760e17 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 4 Sep 2016 15:04:33 +0800
Subject: [SPARK-17324][SQL] Remove Direct Usage of HiveClient in InsertIntoHiveTable

### What changes were proposed in this pull request?
This is another step toward removing HiveClient from `HiveSessionState`: all
metastore interactions should go through the `ExternalCatalog` interface.
However, the existing implementation of `InsertIntoHiveTable` still requires a
Hive client. This PR removes that direct dependency by moving the metastore
interactions into `ExternalCatalog`. (A condensed sketch of the resulting code
path appears after the diff.)

### How was this patch tested?
Existing test cases

Author: gatorsmile

Closes #14888 from gatorsmile/removeClientFromInsertIntoHiveTable.
---
 .../spark/sql/hive/HiveExternalCatalog.scala       | 44 ++++++++++++++--
 .../apache/spark/sql/hive/client/HiveClient.scala  |  8 +--
 .../spark/sql/hive/client/HiveClientImpl.scala     | 18 ++++---
 .../sql/hive/execution/InsertIntoHiveTable.scala   | 61 +++++++++------------
 .../spark/sql/hive/client/VersionsSuite.scala      | 12 ++---
 .../spark/sql/hive/execution/HiveDDLSuite.scala    | 38 ++++++++++++++
 6 files changed, 123 insertions(+), 58 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 8541ae2322..1fe7f4d41d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -489,8 +489,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit = withClient {
+      inheritTableSpecs: Boolean): Unit = withClient {
     requireTableExists(db, table)
 
     val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
@@ -500,12 +499,37 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     client.loadPartition(
       loadPath,
-      s"$db.$table",
+      db,
+      table,
       orderedPartitionSpec,
       isOverwrite,
       holdDDLTime,
-      inheritTableSpecs,
-      isSkewedStoreAsSubdir)
+      inheritTableSpecs)
+  }
+
+  override def loadDynamicPartitions(
+      db: String,
+      table: String,
+      loadPath: String,
+      partition: TablePartitionSpec,
+      replace: Boolean,
+      numDP: Int,
+      holdDDLTime: Boolean): Unit = withClient {
+    requireTableExists(db, table)
+
+    val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
+    getTable(db, table).partitionColumnNames.foreach { colName =>
+      orderedPartitionSpec.put(colName, partition(colName))
+    }
+
+    client.loadDynamicPartitions(
+      loadPath,
+      db,
+      table,
+      orderedPartitionSpec,
+      replace,
+      numDP,
+      holdDDLTime)
   }
 
   // --------------------------------------------------------------------------
@@ -553,6 +577,16 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     client.getPartition(db, table, spec)
   }
 
+  /**
+   * Returns the specified partition or None if it does not exist.
+   */
+  override def getPartitionOption(
+      db: String,
+      table: String,
+      spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
+    client.getPartitionOption(db, table, spec)
+  }
+
   /**
    * Returns the partition names from hive metastore for a given table in a database.
   */
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index dc74fa257a..984d23bb09 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -195,12 +195,12 @@ private[hive] trait HiveClient {
   /** Loads a static partition into an existing table. */
   def loadPartition(
       loadPath: String,
+      dbName: String,
       tableName: String,
       partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
       replace: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit
+      inheritTableSpecs: Boolean): Unit
 
   /** Loads data into an existing table. */
   def loadTable(
@@ -212,12 +212,12 @@ private[hive] trait HiveClient {
   /** Loads new dynamic partitions into an existing table. */
   def loadDynamicPartitions(
       loadPath: String,
+      dbName: String,
       tableName: String,
       partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
       replace: Boolean,
       numDP: Int,
-      holdDDLTime: Boolean,
-      listBucketingEnabled: Boolean): Unit
+      holdDDLTime: Boolean): Unit
 
   /** Create a function in an existing database. */
   def createFunction(db: String, func: CatalogFunction): Unit
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 54ec61abed..dd33d750a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -615,21 +615,22 @@ private[hive] class HiveClientImpl(
 
   def loadPartition(
       loadPath: String,
+      dbName: String,
       tableName: String,
       partSpec: java.util.LinkedHashMap[String, String],
       replace: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit = withHiveState {
+      inheritTableSpecs: Boolean): Unit = withHiveState {
+    val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
     shim.loadPartition(
       client,
       new Path(loadPath), // TODO: Use URI
-      tableName,
+      s"$dbName.$tableName",
       partSpec,
       replace,
       holdDDLTime,
       inheritTableSpecs,
-      isSkewedStoreAsSubdir)
+      isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories)
   }
 
   def loadTable(
@@ -647,21 +648,22 @@ private[hive] class HiveClientImpl(
 
   def loadDynamicPartitions(
       loadPath: String,
+      dbName: String,
       tableName: String,
       partSpec: java.util.LinkedHashMap[String, String],
       replace: Boolean,
       numDP: Int,
-      holdDDLTime: Boolean,
-      listBucketingEnabled: Boolean): Unit = withHiveState {
+      holdDDLTime: Boolean): Unit = withHiveState {
+    val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
     shim.loadDynamicPartitions(
       client,
       new Path(loadPath),
-      tableName,
+      s"$dbName.$tableName",
       partSpec,
       replace,
       numDP,
       holdDDLTime,
-      listBucketingEnabled)
+      listBucketingEnabled = hiveTable.isStoredAsSubDirectories)
   }
 
   override def createFunction(db: String, func: CatalogFunction): Unit = withHiveState {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index eb0c31ced6..7eec9c787c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -51,7 +51,7 @@ case class InsertIntoHiveTable(
     ifNotExists: Boolean) extends UnaryExecNode {
 
   @transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
-  @transient private val client = sessionState.metadataHive
+  @transient private val externalCatalog = sqlContext.sharedState.externalCatalog
 
   def output: Seq[Attribute] = Seq.empty
 
@@ -240,54 +240,45 @@ case class InsertIntoHiveTable(
     // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
     val holdDDLTime = false
     if (partition.nonEmpty) {
-
-      // loadPartition call orders directories created on the iteration order of the this map
-      val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
-      table.hiveQlTable.getPartCols.asScala.foreach { entry =>
-        orderedPartitionSpec.put(entry.getName, partitionSpec.getOrElse(entry.getName, ""))
-      }
-
-      // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
-      // which is currently considered as a Hive native command.
-      val inheritTableSpecs = true
-      // TODO: Correctly set isSkewedStoreAsSubdir.
-      val isSkewedStoreAsSubdir = false
       if (numDynamicPartitions > 0) {
-        client.synchronized {
-          client.loadDynamicPartitions(
-            outputPath.toString,
-            table.catalogTable.qualifiedName,
-            orderedPartitionSpec,
-            overwrite,
-            numDynamicPartitions,
-            holdDDLTime,
-            isSkewedStoreAsSubdir)
-        }
+        externalCatalog.loadDynamicPartitions(
+          db = table.catalogTable.database,
+          table = table.catalogTable.identifier.table,
+          outputPath.toString,
+          partitionSpec,
+          overwrite,
+          numDynamicPartitions,
+          holdDDLTime = holdDDLTime)
       } else {
         // scalastyle:off
         // ifNotExists is only valid with static partition, refer to
         // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
         // scalastyle:on
         val oldPart =
-          client.getPartitionOption(
-            table.catalogTable,
+          externalCatalog.getPartitionOption(
+            table.catalogTable.database,
+            table.catalogTable.identifier.table,
             partitionSpec)
 
         if (oldPart.isEmpty || !ifNotExists) {
-          client.loadPartition(
-            outputPath.toString,
-            table.catalogTable.qualifiedName,
-            orderedPartitionSpec,
-            overwrite,
-            holdDDLTime,
-            inheritTableSpecs,
-            isSkewedStoreAsSubdir)
+          // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
+          // which is currently considered as a Hive native command.
+          val inheritTableSpecs = true
+          externalCatalog.loadPartition(
+            table.catalogTable.database,
+            table.catalogTable.identifier.table,
+            outputPath.toString,
+            partitionSpec,
+            isOverwrite = overwrite,
+            holdDDLTime = holdDDLTime,
+            inheritTableSpecs = inheritTableSpecs)
         }
       }
     } else {
-      client.loadTable(
+      externalCatalog.loadTable(
+        table.catalogTable.database,
+        table.catalogTable.identifier.table,
         outputPath.toString, // TODO: URI
-        table.catalogTable.qualifiedName,
         overwrite,
         holdDDLTime)
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 10b6cd1024..9a10957c8e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -337,12 +337,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
 
       client.loadPartition(
         emptyDir,
-        "default.src_part",
+        "default",
+        "src_part",
         partSpec,
         replace = false,
         holdDDLTime = false,
-        inheritTableSpecs = false,
-        isSkewedStoreAsSubdir = false)
+        inheritTableSpecs = false)
     }
 
     test(s"$version: loadDynamicPartitions") {
@@ -352,12 +352,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
 
       client.loadDynamicPartitions(
         emptyDir,
-        "default.src_part",
+        "default",
+        "src_part",
         partSpec,
         replace = false,
         numDP = 1,
-        false,
-        false)
+        holdDDLTime = false)
     }
 
     test(s"$version: renamePartitions") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 7f3d96de85..eff32805bf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -932,6 +932,44 @@ class HiveDDLSuite
     }
   }
 
+  test("insert skewed table") {
+    val tabName = "tab1"
+    withTable(tabName) {
+      // Spark SQL does not support creating skewed table. Thus, we have to use Hive client.
+      val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+      client.runSqlHive(
+        s"""
+           |CREATE Table $tabName(col1 int, col2 int)
+           |PARTITIONED BY (part1 string, part2 string)
+           |SKEWED BY (col1) ON (3, 4) STORED AS DIRECTORIES
+         """.stripMargin)
+      val hiveTable =
+        spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName, Some("default")))
+
+      assert(hiveTable.unsupportedFeatures.contains("skewed columns"))
+
+      // Call loadDynamicPartitions against a skewed table with enabling list bucketing
+      sql(
+        s"""
+           |INSERT OVERWRITE TABLE $tabName
+           |PARTITION (part1='a', part2)
+           |SELECT 3, 4, 'b'
+         """.stripMargin)
+
+      // Call loadPartitions against a skewed table with enabling list bucketing
+      sql(
+        s"""
+           |INSERT INTO TABLE $tabName
+           |PARTITION (part1='a', part2='b')
+           |SELECT 1, 2
+         """.stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * from $tabName"),
+        Row(3, 4, "a", "b") :: Row(1, 2, "a", "b") :: Nil)
+    }
+  }
+
   test("desc table for data source table - no user-defined schema") {
     Seq("parquet", "json", "orc").foreach { fileFormat =>
       withTable("t1") {
--
cgit v1.2.3
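
Editor's note (placed below the signature line, which `git am` ignores): the patch also moves the skewed-table flags (`isSkewedStoreAsSubdir` / `listBucketingEnabled`) into `HiveClientImpl`, which now derives them from the table's own metadata instead of having every caller thread them through. The Scala sketch below condenses the new write path of `InsertIntoHiveTable` into one standalone function. The helper name `commitPartitionedInsert` and its parameters are illustrative stand-ins, not code from the patch; the `ExternalCatalog` signatures used are the post-patch ones shown in the diff above.

```scala
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog

// Sketch of InsertIntoHiveTable's metastore commit after this patch:
// every call goes through ExternalCatalog; no HiveClient is touched here.
def commitPartitionedInsert(
    catalog: ExternalCatalog,            // from sqlContext.sharedState.externalCatalog
    db: String,                          // table.catalogTable.database
    tbl: String,                         // table.catalogTable.identifier.table
    outputPath: String,                  // directory holding the written files
    partitionSpec: Map[String, String],  // TablePartitionSpec; empty for non-partitioned
    overwrite: Boolean,
    ifNotExists: Boolean,
    numDynamicPartitions: Int): Unit = {
  val holdDDLTime = false                // only true for the TOK_HOLD_DDLTIME hint
  if (partitionSpec.nonEmpty) {
    if (numDynamicPartitions > 0) {
      catalog.loadDynamicPartitions(db, tbl, outputPath, partitionSpec,
        overwrite, numDynamicPartitions, holdDDLTime = holdDDLTime)
    } else {
      // ifNotExists is only valid with a static partition spec.
      val oldPart = catalog.getPartitionOption(db, tbl, partitionSpec)
      if (oldPart.isEmpty || !ifNotExists) {
        // inheritTableSpecs would be false only for IMPORT queries.
        catalog.loadPartition(db, tbl, outputPath, partitionSpec,
          isOverwrite = overwrite, holdDDLTime = holdDDLTime,
          inheritTableSpecs = true)
      }
    }
  } else {
    catalog.loadTable(db, tbl, outputPath, overwrite, holdDDLTime)
  }
}
```

Compared with the pre-patch code, the caller no longer builds the Hive-specific `LinkedHashMap` partition ordering: `HiveExternalCatalog` now constructs it internally from `partitionColumnNames`, so the physical plan deals only in catalyst-level types.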