author    Marcelo Vanzin <vanzin@cloudera.com>       2015-06-29 11:53:17 -0700
committer Michael Armbrust <michael@databricks.com>  2015-06-29 11:53:17 -0700
commit    3664ee25f0a67de5ba76e9487a55a55216ae589f (patch)
tree      cd5f635a0bba0cbf852ef5ad2d146d46015875df
parent    ed413bcc78d8d97a1a0cd0871d7a20f7170476d0 (diff)
[SPARK-8066, SPARK-8067] [hive] Add support for Hive 1.0, 1.1 and 1.2.
Allow HiveContext to connect to metastores of those versions; some new shims had to be added to account for changing internal APIs.

A new test was added to exercise the "reset()" path, which now also requires a shim, and the test code was changed to use a directory under the build's target to store Ivy dependencies. Without that, I consistently ran into issues with Ivy getting confused by my existing caches.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #7026 from vanzin/SPARK-8067 and squashes the following commits:

3e2e67b [Marcelo Vanzin] [SPARK-8066, SPARK-8067] [hive] Add support for Hive 1.0, 1.1 and 1.2.
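For orientation, here is a minimal usage sketch (not part of this patch) of pointing a HiveContext at one of the newly supported metastore versions. It assumes the spark.sql.hive.metastore.version and spark.sql.hive.metastore.jars settings are available and that "maven" jar resolution is acceptable; the exact keys and values are assumptions and may differ in your Spark build.

    // Sketch only: configure Spark to talk to a Hive 1.2.0 metastore, which maps to
    // the hive.v1_2 shim added below. The config keys are assumptions, not verified
    // against this exact revision.
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    val conf = new SparkConf()
      .setAppName("hive-1.2-metastore-sketch")
      .setMaster("local[2]")
      .set("spark.sql.hive.metastore.version", "1.2.0") // assumed config key
      .set("spark.sql.hive.metastore.jars", "maven")    // assumed config key
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("SHOW TABLES").collect().foreach(println)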
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala        |  5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala             | 70
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala | 13
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala              | 33
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala        | 25
5 files changed, 131 insertions(+), 15 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 2f771d7679..4c708cec57 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -97,6 +97,9 @@ private[hive] class ClientWrapper(
case hive.v12 => new Shim_v0_12()
case hive.v13 => new Shim_v0_13()
case hive.v14 => new Shim_v0_14()
+ case hive.v1_0 => new Shim_v1_0()
+ case hive.v1_1 => new Shim_v1_1()
+ case hive.v1_2 => new Shim_v1_2()
}
// Create an internal session state for this ClientWrapper.
@@ -456,7 +459,7 @@ private[hive] class ClientWrapper(
logDebug(s"Deleting table $t")
val table = client.getTable("default", t)
client.getIndexes("default", t, 255).foreach { index =>
- client.dropIndex("default", t, index.getIndexName, true)
+ shim.dropIndex(client, "default", t, index.getIndexName)
}
if (!table.isIndexTable) {
client.dropTable("default", t)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index e7c1779f80..1fa9d278e2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive.client
-import java.lang.{Boolean => JBoolean, Integer => JInteger}
+import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong}
import java.lang.reflect.{Method, Modifier}
import java.net.URI
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet}
@@ -94,6 +94,8 @@ private[client] sealed abstract class Shim {
holdDDLTime: Boolean,
listBucketingEnabled: Boolean): Unit
+ def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit
+
protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
val method = findMethod(klass, name, args: _*)
require(Modifier.isStatic(method.getModifiers()),
@@ -166,6 +168,14 @@ private[client] class Shim_v0_12 extends Shim {
JInteger.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
+ private lazy val dropIndexMethod =
+ findMethod(
+ classOf[Hive],
+ "dropIndex",
+ classOf[String],
+ classOf[String],
+ classOf[String],
+ JBoolean.TYPE)
override def setCurrentSessionState(state: SessionState): Unit = {
// Starting from Hive 0.13, setCurrentSessionState will internally override
@@ -234,6 +244,10 @@ private[client] class Shim_v0_12 extends Shim {
numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean)
}
+ override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
+ dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean)
+ }
+
}
private[client] class Shim_v0_13 extends Shim_v0_12 {
@@ -379,3 +393,57 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
TimeUnit.MILLISECONDS).asInstanceOf[Long]
}
}
+
+private[client] class Shim_v1_0 extends Shim_v0_14 {
+
+}
+
+private[client] class Shim_v1_1 extends Shim_v1_0 {
+
+ private lazy val dropIndexMethod =
+ findMethod(
+ classOf[Hive],
+ "dropIndex",
+ classOf[String],
+ classOf[String],
+ classOf[String],
+ JBoolean.TYPE,
+ JBoolean.TYPE)
+
+ override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
+ dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean, true: JBoolean)
+ }
+
+}
+
+private[client] class Shim_v1_2 extends Shim_v1_1 {
+
+ private lazy val loadDynamicPartitionsMethod =
+ findMethod(
+ classOf[Hive],
+ "loadDynamicPartitions",
+ classOf[Path],
+ classOf[String],
+ classOf[JMap[String, String]],
+ JBoolean.TYPE,
+ JInteger.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JLong.TYPE)
+
+ override def loadDynamicPartitions(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ partSpec: JMap[String, String],
+ replace: Boolean,
+ numDP: Int,
+ holdDDLTime: Boolean,
+ listBucketingEnabled: Boolean): Unit = {
+ loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
+ numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE,
+ 0: JLong)
+ }
+
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 0934ad5034..3d609a66f3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -41,9 +41,11 @@ private[hive] object IsolatedClientLoader {
*/
def forVersion(
version: String,
- config: Map[String, String] = Map.empty): IsolatedClientLoader = synchronized {
+ config: Map[String, String] = Map.empty,
+ ivyPath: Option[String] = None): IsolatedClientLoader = synchronized {
val resolvedVersion = hiveVersion(version)
- val files = resolvedVersions.getOrElseUpdate(resolvedVersion, downloadVersion(resolvedVersion))
+ val files = resolvedVersions.getOrElseUpdate(resolvedVersion,
+ downloadVersion(resolvedVersion, ivyPath))
new IsolatedClientLoader(hiveVersion(version), files, config)
}
@@ -51,9 +53,12 @@ private[hive] object IsolatedClientLoader {
case "12" | "0.12" | "0.12.0" => hive.v12
case "13" | "0.13" | "0.13.0" | "0.13.1" => hive.v13
case "14" | "0.14" | "0.14.0" => hive.v14
+ case "1.0" | "1.0.0" => hive.v1_0
+ case "1.1" | "1.1.0" => hive.v1_1
+ case "1.2" | "1.2.0" => hive.v1_2
}
- private def downloadVersion(version: HiveVersion): Seq[URL] = {
+ private def downloadVersion(version: HiveVersion, ivyPath: Option[String]): Seq[URL] = {
val hiveArtifacts = version.extraDeps ++
Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde")
.map(a => s"org.apache.hive:$a:${version.fullVersion}") ++
@@ -64,7 +69,7 @@ private[hive] object IsolatedClientLoader {
SparkSubmitUtils.resolveMavenCoordinates(
hiveArtifacts.mkString(","),
Some("http://www.datanucleus.org/downloads/maven2"),
- None,
+ ivyPath,
exclusions = version.exclusions)
}
val allFiles = classpath.split(",").map(new File(_)).toSet
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index 27a3d8f589..b48082fe4b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -32,13 +32,36 @@ package object client {
// Hive 0.14 depends on calcite 0.9.2-incubating-SNAPSHOT which does not exist in
// maven central anymore, so override those with a version that exists.
//
- // org.pentaho:pentaho-aggdesigner-algorithm is also nowhere to be found, so exclude
- // it explicitly. If it's needed by the metastore client, users will have to dig it
- // out of somewhere and use configuration to point Spark at the correct jars.
+ // The other excluded dependencies are also nowhere to be found, so exclude them explicitly. If
+ // they're needed by the metastore client, users will have to dig them out of somewhere and use
+ // configuration to point Spark at the correct jars.
case object v14 extends HiveVersion("0.14.0",
- Seq("org.apache.calcite:calcite-core:1.3.0-incubating",
+ extraDeps = Seq("org.apache.calcite:calcite-core:1.3.0-incubating",
"org.apache.calcite:calcite-avatica:1.3.0-incubating"),
- Seq("org.pentaho:pentaho-aggdesigner-algorithm"))
+ exclusions = Seq("org.pentaho:pentaho-aggdesigner-algorithm"))
+
+ case object v1_0 extends HiveVersion("1.0.0",
+ exclusions = Seq("eigenbase:eigenbase-properties",
+ "org.pentaho:pentaho-aggdesigner-algorithm",
+ "net.hydromatic:linq4j",
+ "net.hydromatic:quidem"))
+
+ // The curator dependency was added to the exclusions here because it seems to confuse the ivy
+ // library. org.apache.curator:curator is a pom dependency but ivy tries to find the jar for it,
+ // and fails.
+ case object v1_1 extends HiveVersion("1.1.0",
+ exclusions = Seq("eigenbase:eigenbase-properties",
+ "org.apache.curator:*",
+ "org.pentaho:pentaho-aggdesigner-algorithm",
+ "net.hydromatic:linq4j",
+ "net.hydromatic:quidem"))
+
+ case object v1_2 extends HiveVersion("1.2.0",
+ exclusions = Seq("eigenbase:eigenbase-properties",
+ "org.apache.curator:*",
+ "org.pentaho:pentaho-aggdesigner-algorithm",
+ "net.hydromatic:linq4j",
+ "net.hydromatic:quidem"))
}
// scalastyle:on
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 9a571650b6..d52e162acb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive.client
+import java.io.File
+
import org.apache.spark.{Logging, SparkFunSuite}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.util.Utils
@@ -28,6 +30,12 @@ import org.apache.spark.util.Utils
* is not fully tested.
*/
class VersionsSuite extends SparkFunSuite with Logging {
+
+ // Do not use a temp path here to speed up subsequent executions of the unit test during
+ // development.
+ private val ivyPath = Some(
+ new File(sys.props("java.io.tmpdir"), "hive-ivy-cache").getAbsolutePath())
+
private def buildConf() = {
lazy val warehousePath = Utils.createTempDir()
lazy val metastorePath = Utils.createTempDir()
@@ -38,7 +46,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
}
test("success sanity check") {
- val badClient = IsolatedClientLoader.forVersion("13", buildConf()).client
+ val badClient = IsolatedClientLoader.forVersion("13", buildConf(), ivyPath).client
val db = new HiveDatabase("default", "")
badClient.createDatabase(db)
}
@@ -67,19 +75,21 @@ class VersionsSuite extends SparkFunSuite with Logging {
// TODO: currently only works on mysql where we manually create the schema...
ignore("failure sanity check") {
val e = intercept[Throwable] {
- val badClient = quietly { IsolatedClientLoader.forVersion("13", buildConf()).client }
+ val badClient = quietly {
+ IsolatedClientLoader.forVersion("13", buildConf(), ivyPath).client
+ }
}
assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
}
- private val versions = Seq("12", "13", "14")
+ private val versions = Seq("12", "13", "14", "1.0.0", "1.1.0", "1.2.0")
private var client: ClientInterface = null
versions.foreach { version =>
test(s"$version: create client") {
client = null
- client = IsolatedClientLoader.forVersion(version, buildConf()).client
+ client = IsolatedClientLoader.forVersion(version, buildConf(), ivyPath).client
}
test(s"$version: createDatabase") {
@@ -170,5 +180,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
false,
false)
}
+
+ test(s"$version: create index and reset") {
+ client.runSqlHive("CREATE TABLE indexed_table (key INT)")
+ client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
+ "as 'COMPACT' WITH DEFERRED REBUILD")
+ client.reset()
+ }
}
}
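Illustrative footnote: the new Shim_v1_0/Shim_v1_1/Shim_v1_2 classes follow the same pattern as the existing shims, resolving version-specific Hive methods once via reflection and invoking them with version-appropriate defaults. Below is a self-contained sketch of that pattern, using a JDK method in place of a real Hive API so it compiles and runs without Hive jars on the classpath; it is not the Spark code itself.

    import java.lang.reflect.Method

    object ShimPatternSketch {
      // Resolve a method by name and parameter types up front; this fails fast if the
      // library version on the classpath does not expose the expected signature.
      def findMethod(klass: Class[_], name: String, args: Class[_]*): Method =
        klass.getMethod(name, args: _*)

      def main(args: Array[String]): Unit = {
        // String.indexOf(String, int) stands in for a version-specific Hive method.
        val m = findMethod(classOf[String], "indexOf", classOf[String], Integer.TYPE)
        // Invoke with a "default" extra argument, as the shims do for newer Hive APIs.
        val result = m.invoke("hello hive", "hive", Int.box(0))
        println(result) // prints 6
      }
    }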