path: root/sql/hive
author     gatorsmile <gatorsmile@gmail.com>    2016-09-04 15:04:33 +0800
committer  Wenchen Fan <wenchen@databricks.com> 2016-09-04 15:04:33 +0800
commit     6b156e2fcf9c0c1ed0770a7ad9c54fa374760e17 (patch)
tree       593ff90402b847fe4ed225e961d4d34e506eb62b /sql/hive
parent     e9b58e9ef89a9118b6d5a466d10db8e30d61f850 (diff)
[SPARK-17324][SQL] Remove Direct Usage of HiveClient in InsertIntoHiveTable
### What changes were proposed in this pull request?

This is another step toward removing HiveClient from `HiveSessionState`: all metastore interactions should go through the `ExternalCatalog` interface. However, the existing implementation of `InsertIntoHiveTable` still requires a Hive client. This PR removes that dependency by moving the metastore interactions into `ExternalCatalog`.

### How was this patch tested?

Existing test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14888 from gatorsmile/removeClientFromInsertIntoHiveTable.
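At a high level, the patch swaps the operator's direct `HiveClient` calls for calls on the shared `ExternalCatalog`. A minimal, self-contained Scala sketch of the new calling pattern follows; the trait mirrors the `loadPartition` signature introduced in the diff below, while the operator and all surrounding Spark plumbing are reduced to hypothetical stand-ins.

```scala
// Sketch only: ExternalCatalogSketch and InsertIntoHiveTableSketch are
// stand-ins, not Spark's real classes. The parameter list mirrors the
// ExternalCatalog.loadPartition signature introduced by this patch.
trait ExternalCatalogSketch {
  def loadPartition(
      db: String,
      table: String,
      loadPath: String,
      partition: Map[String, String],
      isOverwrite: Boolean,
      holdDDLTime: Boolean,
      inheritTableSpecs: Boolean): Unit
}

// The operator now depends only on the catalog; no HiveClient in sight.
class InsertIntoHiveTableSketch(externalCatalog: ExternalCatalogSketch) {
  def commitStaticPartition(
      db: String,
      table: String,
      outputPath: String,
      partitionSpec: Map[String, String],
      overwrite: Boolean): Unit = {
    externalCatalog.loadPartition(
      db, table, outputPath, partitionSpec,
      isOverwrite = overwrite,
      holdDDLTime = false,       // the TOK_HOLD_DDLTIME hint is not honored
      inheritTableSpecs = true)  // false only for Hive IMPORT queries
  }
}
```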
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala           | 44
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala             |  8
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala         | 18
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 61
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala          | 12
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala        | 38
6 files changed, 123 insertions(+), 58 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 8541ae2322..1fe7f4d41d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -489,8 +489,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
partition: TablePartitionSpec,
isOverwrite: Boolean,
holdDDLTime: Boolean,
- inheritTableSpecs: Boolean,
- isSkewedStoreAsSubdir: Boolean): Unit = withClient {
+ inheritTableSpecs: Boolean): Unit = withClient {
requireTableExists(db, table)
val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
@@ -500,12 +499,37 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
client.loadPartition(
loadPath,
- s"$db.$table",
+ db,
+ table,
orderedPartitionSpec,
isOverwrite,
holdDDLTime,
- inheritTableSpecs,
- isSkewedStoreAsSubdir)
+ inheritTableSpecs)
+ }
+
+ override def loadDynamicPartitions(
+ db: String,
+ table: String,
+ loadPath: String,
+ partition: TablePartitionSpec,
+ replace: Boolean,
+ numDP: Int,
+ holdDDLTime: Boolean): Unit = withClient {
+ requireTableExists(db, table)
+
+ val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
+ getTable(db, table).partitionColumnNames.foreach { colName =>
+ orderedPartitionSpec.put(colName, partition(colName))
+ }
+
+ client.loadDynamicPartitions(
+ loadPath,
+ db,
+ table,
+ orderedPartitionSpec,
+ replace,
+ numDP,
+ holdDDLTime)
}
// --------------------------------------------------------------------------
@@ -554,6 +578,16 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
/**
+ * Returns the specified partition or None if it does not exist.
+ */
+ override def getPartitionOption(
+ db: String,
+ table: String,
+ spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
+ client.getPartitionOption(db, table, spec)
+ }
+
+ /**
* Returns the partition names from hive metastore for a given table in a database.
*/
override def listPartitions(
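Both of the catalog methods above rebuild the caller's partition spec as a `java.util.LinkedHashMap` keyed in partition-column order, because Hive relies on the map's iteration order. A runnable sketch of just that ordering step, with hypothetical column names and values:

```scala
// Runnable sketch of the orderedPartitionSpec construction above;
// the column names and spec values are hypothetical.
import java.util.{LinkedHashMap => JLinkedHashMap}

object OrderedSpecSketch extends App {
  val partitionColumnNames = Seq("part1", "part2")     // from table metadata
  val partition = Map("part2" -> "b", "part1" -> "a")  // caller-supplied spec

  val orderedPartitionSpec = new JLinkedHashMap[String, String]()
  partitionColumnNames.foreach { colName =>
    orderedPartitionSpec.put(colName, partition(colName))
  }
  println(orderedPartitionSpec)  // prints {part1=a, part2=b}, in column order
}
```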
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index dc74fa257a..984d23bb09 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -195,12 +195,12 @@ private[hive] trait HiveClient {
/** Loads a static partition into an existing table. */
def loadPartition(
loadPath: String,
+ dbName: String,
tableName: String,
partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
replace: Boolean,
holdDDLTime: Boolean,
- inheritTableSpecs: Boolean,
- isSkewedStoreAsSubdir: Boolean): Unit
+ inheritTableSpecs: Boolean): Unit
/** Loads data into an existing table. */
def loadTable(
@@ -212,12 +212,12 @@ private[hive] trait HiveClient {
/** Loads new dynamic partitions into an existing table. */
def loadDynamicPartitions(
loadPath: String,
+ dbName: String,
tableName: String,
partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
replace: Boolean,
numDP: Int,
- holdDDLTime: Boolean,
- listBucketingEnabled: Boolean): Unit
+ holdDDLTime: Boolean): Unit
/** Create a function in an existing database. */
def createFunction(db: String, func: CatalogFunction): Unit
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 54ec61abed..dd33d750a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -615,21 +615,22 @@ private[hive] class HiveClientImpl(
def loadPartition(
loadPath: String,
+ dbName: String,
tableName: String,
partSpec: java.util.LinkedHashMap[String, String],
replace: Boolean,
holdDDLTime: Boolean,
- inheritTableSpecs: Boolean,
- isSkewedStoreAsSubdir: Boolean): Unit = withHiveState {
+ inheritTableSpecs: Boolean): Unit = withHiveState {
+ val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
shim.loadPartition(
client,
new Path(loadPath), // TODO: Use URI
- tableName,
+ s"$dbName.$tableName",
partSpec,
replace,
holdDDLTime,
inheritTableSpecs,
- isSkewedStoreAsSubdir)
+ isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories)
}
def loadTable(
@@ -647,21 +648,22 @@ private[hive] class HiveClientImpl(
def loadDynamicPartitions(
loadPath: String,
+ dbName: String,
tableName: String,
partSpec: java.util.LinkedHashMap[String, String],
replace: Boolean,
numDP: Int,
- holdDDLTime: Boolean,
- listBucketingEnabled: Boolean): Unit = withHiveState {
+ holdDDLTime: Boolean): Unit = withHiveState {
+ val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
shim.loadDynamicPartitions(
client,
new Path(loadPath),
- tableName,
+ s"$dbName.$tableName",
partSpec,
replace,
numDP,
holdDDLTime,
- listBucketingEnabled)
+ listBucketingEnabled = hiveTable.isStoredAsSubDirectories)
}
override def createFunction(db: String, func: CatalogFunction): Unit = withHiveState {
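Rather than threading `isSkewedStoreAsSubdir` and `listBucketingEnabled` through every layer, `HiveClientImpl` now fetches the table from the metastore and derives the flag from `isStoredAsSubDirectories`. A tiny sketch under that assumption, with `HiveTableLike` and `getTable` standing in for Hive's metastore `Table` API:

```scala
// Sketch: HiveTableLike and getTable are stand-ins for Hive's metastore
// Table API; the point is that the flag now comes from table metadata.
object SkewFlagSketch {
  trait HiveTableLike { def isStoredAsSubDirectories: Boolean }

  def skewedStoreAsSubdir(
      getTable: (String, String) => HiveTableLike,
      dbName: String,
      tableName: String): Boolean =
    getTable(dbName, tableName).isStoredAsSubDirectories  // was a caller-supplied flag
}
```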
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index eb0c31ced6..7eec9c787c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -51,7 +51,7 @@ case class InsertIntoHiveTable(
ifNotExists: Boolean) extends UnaryExecNode {
@transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
- @transient private val client = sessionState.metadataHive
+ @transient private val externalCatalog = sqlContext.sharedState.externalCatalog
def output: Seq[Attribute] = Seq.empty
@@ -240,54 +240,45 @@ case class InsertIntoHiveTable(
// holdDDLTime will be true when the TOK_HOLD_DDLTIME hint is present in the query.
val holdDDLTime = false
if (partition.nonEmpty) {
-
- // loadPartition call orders directories created on the iteration order of the this map
- val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
- table.hiveQlTable.getPartCols.asScala.foreach { entry =>
- orderedPartitionSpec.put(entry.getName, partitionSpec.getOrElse(entry.getName, ""))
- }
-
- // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
- // which is currently considered as a Hive native command.
- val inheritTableSpecs = true
- // TODO: Correctly set isSkewedStoreAsSubdir.
- val isSkewedStoreAsSubdir = false
if (numDynamicPartitions > 0) {
- client.synchronized {
- client.loadDynamicPartitions(
- outputPath.toString,
- table.catalogTable.qualifiedName,
- orderedPartitionSpec,
- overwrite,
- numDynamicPartitions,
- holdDDLTime,
- isSkewedStoreAsSubdir)
- }
+ externalCatalog.loadDynamicPartitions(
+ db = table.catalogTable.database,
+ table = table.catalogTable.identifier.table,
+ outputPath.toString,
+ partitionSpec,
+ overwrite,
+ numDynamicPartitions,
+ holdDDLTime = holdDDLTime)
} else {
// scalastyle:off
// ifNotExists is only valid with static partition, refer to
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
// scalastyle:on
val oldPart =
- client.getPartitionOption(
- table.catalogTable,
+ externalCatalog.getPartitionOption(
+ table.catalogTable.database,
+ table.catalogTable.identifier.table,
partitionSpec)
if (oldPart.isEmpty || !ifNotExists) {
- client.loadPartition(
- outputPath.toString,
- table.catalogTable.qualifiedName,
- orderedPartitionSpec,
- overwrite,
- holdDDLTime,
- inheritTableSpecs,
- isSkewedStoreAsSubdir)
+ // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
+ // which is currently considered as a Hive native command.
+ val inheritTableSpecs = true
+ externalCatalog.loadPartition(
+ table.catalogTable.database,
+ table.catalogTable.identifier.table,
+ outputPath.toString,
+ partitionSpec,
+ isOverwrite = overwrite,
+ holdDDLTime = holdDDLTime,
+ inheritTableSpecs = inheritTableSpecs)
}
}
} else {
- client.loadTable(
+ externalCatalog.loadTable(
+ table.catalogTable.database,
+ table.catalogTable.identifier.table,
outputPath.toString, // TODO: URI
- table.catalogTable.qualifiedName,
overwrite,
holdDDLTime)
}
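The rewritten operator keeps the same three-way dispatch: dynamic partitions, a single static partition (respecting `IF NOT EXISTS`), or a non-partitioned load. A runnable sketch of just that branch structure, with the catalog calls reduced to labels:

```scala
// Sketch of the dispatch above; the catalog calls are replaced with
// labels so the branch logic can be exercised standalone.
object InsertDispatchSketch extends App {
  def dispatch(
      partitionSpec: Map[String, String],
      numDynamicPartitions: Int,
      ifNotExists: Boolean,
      oldPartExists: Boolean): String =
    if (partitionSpec.nonEmpty) {
      if (numDynamicPartitions > 0) "loadDynamicPartitions"
      else if (oldPartExists && ifNotExists) "no-op: IF NOT EXISTS and partition exists"
      else "loadPartition"
    } else "loadTable"

  println(dispatch(Map("part1" -> "a", "part2" -> ""), 1, ifNotExists = false, oldPartExists = false))
  println(dispatch(Map("part1" -> "a", "part2" -> "b"), 0, ifNotExists = true, oldPartExists = true))
  println(dispatch(Map.empty, 0, ifNotExists = false, oldPartExists = false))
}
```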
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 10b6cd1024..9a10957c8e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -337,12 +337,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
client.loadPartition(
emptyDir,
- "default.src_part",
+ "default",
+ "src_part",
partSpec,
replace = false,
holdDDLTime = false,
- inheritTableSpecs = false,
- isSkewedStoreAsSubdir = false)
+ inheritTableSpecs = false)
}
test(s"$version: loadDynamicPartitions") {
@@ -352,12 +352,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
client.loadDynamicPartitions(
emptyDir,
- "default.src_part",
+ "default",
+ "src_part",
partSpec,
replace = false,
numDP = 1,
- false,
- false)
+ holdDDLTime = false)
}
test(s"$version: renamePartitions") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 7f3d96de85..eff32805bf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -932,6 +932,44 @@ class HiveDDLSuite
}
}
+ test("insert skewed table") {
+ val tabName = "tab1"
+ withTable(tabName) {
+ // Spark SQL does not support creating a skewed table, so we have to use the Hive client.
+ val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+ client.runSqlHive(
+ s"""
+ |CREATE Table $tabName(col1 int, col2 int)
+ |PARTITIONED BY (part1 string, part2 string)
+ |SKEWED BY (col1) ON (3, 4) STORED AS DIRECTORIES
+ """.stripMargin)
+ val hiveTable =
+ spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName, Some("default")))
+
+ assert(hiveTable.unsupportedFeatures.contains("skewed columns"))
+
+ // Call loadDynamicPartitions against a skewed table with list bucketing enabled
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE $tabName
+ |PARTITION (part1='a', part2)
+ |SELECT 3, 4, 'b'
+ """.stripMargin)
+
+ // Call loadPartition against a skewed table with list bucketing enabled
+ sql(
+ s"""
+ |INSERT INTO TABLE $tabName
+ |PARTITION (part1='a', part2='b')
+ |SELECT 1, 2
+ """.stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT * from $tabName"),
+ Row(3, 4, "a", "b") :: Row(1, 2, "a", "b") :: Nil)
+ }
+ }
+
test("desc table for data source table - no user-defined schema") {
Seq("parquet", "json", "orc").foreach { fileFormat =>
withTable("t1") {