aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorXiao Li <gatorsmile@gmail.com>2017-03-03 16:59:52 -0800
committerWenchen Fan <wenchen@databricks.com>2017-03-03 16:59:52 -0800
commitf5fdbe04369d2c04972b7d9686d0c6c046f3d3dc (patch)
treeab4c29ce6ed77e90310eff843da577b0224e67c1 /sql
parent44281ca81d4eda02b627ba21841108438b7d1c27 (diff)
downloadspark-f5fdbe04369d2c04972b7d9686d0c6c046f3d3dc.tar.gz
spark-f5fdbe04369d2c04972b7d9686d0c6c046f3d3dc.tar.bz2
spark-f5fdbe04369d2c04972b7d9686d0c6c046f3d3dc.zip
[SPARK-13446][SQL] Support reading data from Hive 2.0.1 metastore
### What changes were proposed in this pull request? This PR is to make Spark work with Hive 2.0's metastores. Compared with Hive 1.2, Hive 2.0's metastore has an API update due to removal of `HOLD_DDLTIME` in https://issues.apache.org/jira/browse/HIVE-12224. Based on the following Hive JIRA description, `HOLD_DDLTIME` should be removed from our internal API too. (https://github.com/apache/spark/pull/17063 was submitted for it): > This arcane feature was introduced long ago via HIVE-1394 It was broken as soon as it landed, HIVE-1442 and is thus useless. Fact that no one has fixed it since informs that its not really used by anyone. Better is to remove it so no one hits the bug of HIVE-1442 In the next PR, we will support 2.1.0 metastore, whose APIs were changed due to https://issues.apache.org/jira/browse/HIVE-12730. However, before that, we need a code cleanup for stats collection and setting. ### How was this patch tested? Added test cases to VersionsSuite.scala Author: Xiao Li <gatorsmile@gmail.com> Closes #17061 from gatorsmile/Hive2.
Diffstat (limited to 'sql')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala1
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala74
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala1
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala8
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala11
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala12
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala9
7 files changed, 107 insertions, 9 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index c326ac4cc1..8f98c8f447 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -96,6 +96,7 @@ private[hive] class HiveClientImpl(
case hive.v1_0 => new Shim_v1_0()
case hive.v1_1 => new Shim_v1_1()
case hive.v1_2 => new Shim_v1_2()
+ case hive.v2_0 => new Shim_v2_0()
}
// Create an internal session state for this HiveClientImpl.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 9fe1c76d33..7280748361 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -833,3 +833,77 @@ private[client] class Shim_v1_2 extends Shim_v1_1 {
}
}
+
+private[client] class Shim_v2_0 extends Shim_v1_2 {
+ private lazy val loadPartitionMethod =
+ findMethod(
+ classOf[Hive],
+ "loadPartition",
+ classOf[Path],
+ classOf[String],
+ classOf[JMap[String, String]],
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE)
+ private lazy val loadTableMethod =
+ findMethod(
+ classOf[Hive],
+ "loadTable",
+ classOf[Path],
+ classOf[String],
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE)
+ private lazy val loadDynamicPartitionsMethod =
+ findMethod(
+ classOf[Hive],
+ "loadDynamicPartitions",
+ classOf[Path],
+ classOf[String],
+ classOf[JMap[String, String]],
+ JBoolean.TYPE,
+ JInteger.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JLong.TYPE)
+
+ override def loadPartition(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ partSpec: JMap[String, String],
+ replace: Boolean,
+ inheritTableSpecs: Boolean,
+ isSkewedStoreAsSubdir: Boolean,
+ isSrcLocal: Boolean): Unit = {
+ loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
+ inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
+ isSrcLocal: JBoolean, JBoolean.FALSE)
+ }
+
+ override def loadTable(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ replace: Boolean,
+ isSrcLocal: Boolean): Unit = {
+ loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, isSrcLocal: JBoolean,
+ JBoolean.FALSE, JBoolean.FALSE)
+ }
+
+ override def loadDynamicPartitions(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ partSpec: JMap[String, String],
+ replace: Boolean,
+ numDP: Int,
+ listBucketingEnabled: Boolean): Unit = {
+ loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
+ numDP: JInteger, listBucketingEnabled: JBoolean, JBoolean.FALSE, 0L: JLong)
+ }
+
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index d2487a2c03..6f69a4adf2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -94,6 +94,7 @@ private[hive] object IsolatedClientLoader extends Logging {
case "1.0" | "1.0.0" => hive.v1_0
case "1.1" | "1.1.0" => hive.v1_1
case "1.2" | "1.2.0" | "1.2.1" => hive.v1_2
+ case "2.0" | "2.0.0" | "2.0.1" => hive.v2_0
}
private def downloadVersion(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index 4e2193b6ab..790ad74e66 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
/** Support for interacting with different versions of the HiveMetastoreClient */
package object client {
- private[hive] abstract class HiveVersion(
+ private[hive] sealed abstract class HiveVersion(
val fullVersion: String,
val extraDeps: Seq[String] = Nil,
val exclusions: Seq[String] = Nil)
@@ -62,6 +62,12 @@ package object client {
"org.pentaho:pentaho-aggdesigner-algorithm",
"net.hydromatic:linq4j",
"net.hydromatic:quidem"))
+
+ case object v2_0 extends HiveVersion("2.0.1",
+ exclusions = Seq("org.apache.curator:*",
+ "org.pentaho:pentaho-aggdesigner-algorithm"))
+
+ val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0)
}
// scalastyle:on
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index f107149ada..3c57ee4c8b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -148,9 +148,16 @@ case class InsertIntoHiveTable(
// We have to follow the Hive behavior here, to avoid troubles. For example, if we create
// staging directory under the table director for Hive prior to 1.1, the staging directory will
// be removed by Hive when Hive is trying to empty the table directory.
- if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
+ val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
+ val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0)
+
+ // Ensure all the supported versions are considered here.
+ assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
+ allSupportedHiveVersions)
+
+ if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
oldVersionExternalTempPath(path, hadoopConf, scratchDir)
- } else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
+ } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
newVersionExternalTempPath(path, hadoopConf, stagingDir)
} else {
throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
index 591a968c82..e85ea5a594 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
@@ -35,22 +35,26 @@ private[client] class HiveClientBuilder {
Some(new File(sys.props("java.io.tmpdir"), "hive-ivy-cache").getAbsolutePath))
}
- private def buildConf() = {
+ private def buildConf(extraConf: Map[String, String]) = {
lazy val warehousePath = Utils.createTempDir()
lazy val metastorePath = Utils.createTempDir()
metastorePath.delete()
- Map(
+ extraConf ++ Map(
"javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$metastorePath;create=true",
"hive.metastore.warehouse.dir" -> warehousePath.toString)
}
- def buildClient(version: String, hadoopConf: Configuration): HiveClient = {
+ // for testing only
+ def buildClient(
+ version: String,
+ hadoopConf: Configuration,
+ extraConf: Map[String, String] = Map.empty): HiveClient = {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = version,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
hadoopConf = hadoopConf,
- config = buildConf(),
+ config = buildConf(extraConf),
ivyPath = ivyPath).createClient()
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 6feb277ca8..d61d10bf86 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -88,7 +88,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
}
- private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2")
+ private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0")
private var client: HiveClient = null
@@ -98,7 +98,12 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
System.gc() // Hack to avoid SEGV on some JVM versions.
val hadoopConf = new Configuration()
hadoopConf.set("test", "success")
- client = buildClient(version, hadoopConf)
+ // Hive changed the default of datanucleus.schema.autoCreateAll from true to false since 2.0
+ // For details, see the JIRA HIVE-6113
+ if (version == "2.0") {
+ hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
+ }
+ client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
}
def table(database: String, tableName: String): CatalogTable = {