diff options
Diffstat (limited to 'sql')
7 files changed, 107 insertions, 9 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index c326ac4cc1..8f98c8f447 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -96,6 +96,7 @@ private[hive] class HiveClientImpl( case hive.v1_0 => new Shim_v1_0() case hive.v1_1 => new Shim_v1_1() case hive.v1_2 => new Shim_v1_2() + case hive.v2_0 => new Shim_v2_0() } // Create an internal session state for this HiveClientImpl. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 9fe1c76d33..7280748361 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -833,3 +833,77 @@ private[client] class Shim_v1_2 extends Shim_v1_1 { } } + +private[client] class Shim_v2_0 extends Shim_v1_2 { + private lazy val loadPartitionMethod = + findMethod( + classOf[Hive], + "loadPartition", + classOf[Path], + classOf[String], + classOf[JMap[String, String]], + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE) + private lazy val loadTableMethod = + findMethod( + classOf[Hive], + "loadTable", + classOf[Path], + classOf[String], + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE) + private lazy val loadDynamicPartitionsMethod = + findMethod( + classOf[Hive], + "loadDynamicPartitions", + classOf[Path], + classOf[String], + classOf[JMap[String, String]], + JBoolean.TYPE, + JInteger.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JLong.TYPE) + + override def loadPartition( + hive: Hive, + loadPath: Path, + tableName: String, + partSpec: JMap[String, String], + replace: Boolean, + inheritTableSpecs: Boolean, + isSkewedStoreAsSubdir: Boolean, + isSrcLocal: Boolean): Unit = { + loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean, + inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean, + isSrcLocal: JBoolean, JBoolean.FALSE) + } + + override def loadTable( + hive: Hive, + loadPath: Path, + tableName: String, + replace: Boolean, + isSrcLocal: Boolean): Unit = { + loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, isSrcLocal: JBoolean, + JBoolean.FALSE, JBoolean.FALSE) + } + + override def loadDynamicPartitions( + hive: Hive, + loadPath: Path, + tableName: String, + partSpec: JMap[String, String], + replace: Boolean, + numDP: Int, + listBucketingEnabled: Boolean): Unit = { + loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean, + numDP: JInteger, listBucketingEnabled: JBoolean, JBoolean.FALSE, 0L: JLong) + } + +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index d2487a2c03..6f69a4adf2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -94,6 +94,7 @@ private[hive] object IsolatedClientLoader extends Logging { case "1.0" | "1.0.0" => hive.v1_0 case "1.1" | "1.1.0" => hive.v1_1 case "1.2" | "1.2.0" | "1.2.1" => hive.v1_2 + case "2.0" | "2.0.0" | "2.0.1" => hive.v2_0 } private def downloadVersion( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala index 4e2193b6ab..790ad74e66 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive /** Support for interacting with different versions of the HiveMetastoreClient */ package object client { - private[hive] abstract class HiveVersion( + private[hive] sealed abstract class HiveVersion( val fullVersion: String, val extraDeps: Seq[String] = Nil, val exclusions: Seq[String] = Nil) @@ -62,6 +62,12 @@ package object client { "org.pentaho:pentaho-aggdesigner-algorithm", "net.hydromatic:linq4j", "net.hydromatic:quidem")) + + case object v2_0 extends HiveVersion("2.0.1", + exclusions = Seq("org.apache.curator:*", + "org.pentaho:pentaho-aggdesigner-algorithm")) + + val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0) } // scalastyle:on diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index f107149ada..3c57ee4c8b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -148,9 +148,16 @@ case class InsertIntoHiveTable( // We have to follow the Hive behavior here, to avoid troubles. For example, if we create // staging directory under the table director for Hive prior to 1.1, the staging directory will // be removed by Hive when Hive is trying to empty the table directory. - if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) { + val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0) + val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0) + + // Ensure all the supported versions are considered here. + assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath == + allSupportedHiveVersions) + + if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) { oldVersionExternalTempPath(path, hadoopConf, scratchDir) - } else if (hiveVersion == v1_1 || hiveVersion == v1_2) { + } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) { newVersionExternalTempPath(path, hadoopConf, stagingDir) } else { throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala index 591a968c82..e85ea5a594 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala @@ -35,22 +35,26 @@ private[client] class HiveClientBuilder { Some(new File(sys.props("java.io.tmpdir"), "hive-ivy-cache").getAbsolutePath)) } - private def buildConf() = { + private def buildConf(extraConf: Map[String, String]) = { lazy val warehousePath = Utils.createTempDir() lazy val metastorePath = Utils.createTempDir() metastorePath.delete() - Map( + extraConf ++ Map( "javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$metastorePath;create=true", "hive.metastore.warehouse.dir" -> warehousePath.toString) } - def buildClient(version: String, hadoopConf: Configuration): HiveClient = { + // for testing only + def buildClient( + version: String, + hadoopConf: Configuration, + extraConf: Map[String, String] = Map.empty): HiveClient = { IsolatedClientLoader.forVersion( hiveMetastoreVersion = version, hadoopVersion = VersionInfo.getVersion, sparkConf = sparkConf, hadoopConf = hadoopConf, - config = buildConf(), + config = buildConf(extraConf), ivyPath = ivyPath).createClient() } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 6feb277ca8..d61d10bf86 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -88,7 +88,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'") } - private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2") + private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0") private var client: HiveClient = null @@ -98,7 +98,12 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w System.gc() // Hack to avoid SEGV on some JVM versions. val hadoopConf = new Configuration() hadoopConf.set("test", "success") - client = buildClient(version, hadoopConf) + // Hive changed the default of datanucleus.schema.autoCreateAll from true to false since 2.0 + // For details, see the JIRA HIVE-6113 + if (version == "2.0") { + hadoopConf.set("datanucleus.schema.autoCreateAll", "true") + } + client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf)) } def table(database: String, tableName: String): CatalogTable = { |