Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala         1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala              74
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala    1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala                 8
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala 11
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala      12
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala           9
7 files changed, 107 insertions(+), 9 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index c326ac4cc1..8f98c8f447 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -96,6 +96,7 @@ private[hive] class HiveClientImpl(
case hive.v1_0 => new Shim_v1_0()
case hive.v1_1 => new Shim_v1_1()
case hive.v1_2 => new Shim_v1_2()
+ case hive.v2_0 => new Shim_v2_0()
}
// Create an internal session state for this HiveClientImpl.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 9fe1c76d33..7280748361 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -833,3 +833,77 @@ private[client] class Shim_v1_2 extends Shim_v1_1 {
}
}
+
+private[client] class Shim_v2_0 extends Shim_v1_2 {
+ private lazy val loadPartitionMethod =
+ findMethod(
+ classOf[Hive],
+ "loadPartition",
+ classOf[Path],
+ classOf[String],
+ classOf[JMap[String, String]],
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE)
+ private lazy val loadTableMethod =
+ findMethod(
+ classOf[Hive],
+ "loadTable",
+ classOf[Path],
+ classOf[String],
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE)
+ private lazy val loadDynamicPartitionsMethod =
+ findMethod(
+ classOf[Hive],
+ "loadDynamicPartitions",
+ classOf[Path],
+ classOf[String],
+ classOf[JMap[String, String]],
+ JBoolean.TYPE,
+ JInteger.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JLong.TYPE)
+
+ override def loadPartition(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ partSpec: JMap[String, String],
+ replace: Boolean,
+ inheritTableSpecs: Boolean,
+ isSkewedStoreAsSubdir: Boolean,
+ isSrcLocal: Boolean): Unit = {
+ loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
+ inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
+ isSrcLocal: JBoolean, JBoolean.FALSE)
+ }
+
+ override def loadTable(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ replace: Boolean,
+ isSrcLocal: Boolean): Unit = {
+ loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, isSrcLocal: JBoolean,
+ JBoolean.FALSE, JBoolean.FALSE)
+ }
+
+ override def loadDynamicPartitions(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ partSpec: JMap[String, String],
+ replace: Boolean,
+ numDP: Int,
+ listBucketingEnabled: Boolean): Unit = {
+ loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
+ numDP: JInteger, listBucketingEnabled: JBoolean, JBoolean.FALSE, 0L: JLong)
+ }
+
+}
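For readers unfamiliar with the shim mechanism: Shim_v2_0 resolves each version-specific Hive overload once via reflection, caches the java.lang.reflect.Method in a lazy val, and boxes primitives at the call site because Method.invoke only accepts object arguments. The trailing JBoolean.FALSE and 0L: JLong arguments pin the ACID-related parameters Hive 2.0 added to these methods to non-transactional defaults, since Spark does not write Hive ACID tables. A minimal self-contained sketch of the reflection pattern, using an illustrative Target class rather than Hive's real API:

    import java.lang.reflect.Method
    import java.lang.{Boolean => JBoolean}

    object ShimSketch {
      // Illustrative stand-in; Hive's loadTable/loadPartition play this role above.
      class Target {
        def load(path: String, overwrite: Boolean): Unit =
          println(s"loading $path, overwrite=$overwrite")
      }

      // Resolve a method by name and parameter types, as the shims' findMethod does.
      def findMethod(klass: Class[_], name: String, args: Class[_]*): Method =
        klass.getMethod(name, args: _*)

      // Cached lazily, so a missing overload only fails when first used,
      // not when the shim class is loaded.
      lazy val loadMethod: Method =
        findMethod(classOf[Target], "load", classOf[String], JBoolean.TYPE)

      def main(argv: Array[String]): Unit = {
        // Method.invoke takes boxed arguments, hence JBoolean.TRUE here
        // (the `replace: JBoolean` ascriptions in the shim do the same).
        loadMethod.invoke(new Target, "/tmp/data", JBoolean.TRUE)
      }
    }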
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index d2487a2c03..6f69a4adf2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -94,6 +94,7 @@ private[hive] object IsolatedClientLoader extends Logging {
case "1.0" | "1.0.0" => hive.v1_0
case "1.1" | "1.1.0" => hive.v1_1
case "1.2" | "1.2.0" | "1.2.1" => hive.v1_2
+ case "2.0" | "2.0.0" | "2.0.1" => hive.v2_0
}
private def downloadVersion(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index 4e2193b6ab..790ad74e66 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
/** Support for interacting with different versions of the HiveMetastoreClient */
package object client {
- private[hive] abstract class HiveVersion(
+ private[hive] sealed abstract class HiveVersion(
val fullVersion: String,
val extraDeps: Seq[String] = Nil,
val exclusions: Seq[String] = Nil)
@@ -62,6 +62,12 @@ package object client {
"org.pentaho:pentaho-aggdesigner-algorithm",
"net.hydromatic:linq4j",
"net.hydromatic:quidem"))
+
+ case object v2_0 extends HiveVersion("2.0.1",
+ exclusions = Seq("org.apache.curator:*",
+ "org.pentaho:pentaho-aggdesigner-algorithm"))
+
+ val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0)
}
// scalastyle:on
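Because HiveVersion is now sealed, all concrete versions must live in this file, which is what makes the new allSupportedHiveVersions set a credible single source of truth and gives pattern matches over versions compiler exhaustiveness checking. A minimal sketch of that benefit, with hypothetical types:

    // Hypothetical sketch: a sealed hierarchy lets the compiler warn about an
    // unhandled version, e.g. right after a new case object is introduced.
    sealed abstract class Version(val fullVersion: String)
    case object V1_2 extends Version("1.2.1")
    case object V2_0 extends Version("2.0.1")

    def shimFor(v: Version): String = v match {
      case V1_2 => "Shim_v1_2"
      case V2_0 => "Shim_v2_0"
      // Removing a case above turns this into a non-exhaustive match warning.
    }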
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index f107149ada..3c57ee4c8b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -148,9 +148,16 @@ case class InsertIntoHiveTable(
// We have to follow the Hive behavior here, to avoid trouble. For example, if we create the
// staging directory under the table directory for Hive prior to 1.1, the staging directory will
// be removed by Hive when Hive is trying to empty the table directory.
- if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
+ val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
+ val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0)
+
+ // Ensure all the supported versions are considered here.
+ assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
+ allSupportedHiveVersions)
+
+ if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
oldVersionExternalTempPath(path, hadoopConf, scratchDir)
- } else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
+ } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
newVersionExternalTempPath(path, hadoopConf, stagingDir)
} else {
throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
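Since the assert runs on every insert, regardless of which version is in use, adding a future version to allSupportedHiveVersions without placing it in one of the two sets fails immediately, rather than surfacing as the IllegalStateException only once that version is actually configured. A minimal sketch of the same partition check (illustrative strings in place of the real case objects):

    // Illustrative guard: the two strategy buckets must together cover every
    // supported version, or a newly added version was left unclassified.
    val allSupported = Set("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0")
    val oldTempPathVersions = Set("0.12", "0.13", "0.14", "1.0")
    val newTempPathVersions = Set("1.1", "1.2", "2.0")

    assert(oldTempPathVersions ++ newTempPathVersions == allSupported,
      "every supported Hive version needs a temp-path strategy")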
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
index 591a968c82..e85ea5a594 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
@@ -35,22 +35,26 @@ private[client] class HiveClientBuilder {
Some(new File(sys.props("java.io.tmpdir"), "hive-ivy-cache").getAbsolutePath))
}
- private def buildConf() = {
+ private def buildConf(extraConf: Map[String, String]) = {
lazy val warehousePath = Utils.createTempDir()
lazy val metastorePath = Utils.createTempDir()
metastorePath.delete()
- Map(
+ extraConf ++ Map(
"javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$metastorePath;create=true",
"hive.metastore.warehouse.dir" -> warehousePath.toString)
}
- def buildClient(version: String, hadoopConf: Configuration): HiveClient = {
+ // for testing only
+ def buildClient(
+ version: String,
+ hadoopConf: Configuration,
+ extraConf: Map[String, String] = Map.empty): HiveClient = {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = version,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
hadoopConf = hadoopConf,
- config = buildConf(),
+ config = buildConf(extraConf),
ivyPath = ivyPath).createClient()
}
}
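Note the operand order in extraConf ++ Map(...): on key conflicts the right-hand map wins, so the builder's Derby connection URL and warehouse path always take precedence, while any other extraConf keys pass through. A hypothetical test call (version and key shown for illustration):

    // Hypothetical usage: thread version-specific settings through extraConf.
    import org.apache.hadoop.conf.Configuration

    val client = new HiveClientBuilder().buildClient(
      version = "2.0",
      hadoopConf = new Configuration(),
      extraConf = Map("datanucleus.schema.autoCreateAll" -> "true"))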
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 6feb277ca8..d61d10bf86 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -88,7 +88,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
}
- private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2")
+ private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0")
private var client: HiveClient = null
@@ -98,7 +98,12 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
System.gc() // Hack to avoid SEGV on some JVM versions.
val hadoopConf = new Configuration()
hadoopConf.set("test", "success")
- client = buildClient(version, hadoopConf)
+ // Hive changed the default of datanucleus.schema.autoCreateAll from true to false in 2.0.
+ // For details, see HIVE-6113.
+ if (version == "2.0") {
+ hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
+ }
+ client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
}
def table(database: String, tableName: String): CatalogTable = {