author     Marcelo Vanzin <vanzin@cloudera.com>        2015-06-14 11:49:16 -0700
committer  Michael Armbrust <michael@databricks.com>   2015-06-14 11:49:22 -0700
commit     4eb48ed1dadee80d78ada5d15884dd348c46ad27 (patch)
tree       a690f00a830f1cd1dcade8ba2bedb068d664ae94 /core
parent     f3f2a4397da164f0ddfa5d60bf441099296c4346 (diff)
[SPARK-8065] [SQL] Add support for Hive 0.14 metastores
This change has two parts.

The first one gets rid of "ReflectionMagic". That worked well for the differences between 0.12 and 0.13, but breaks in 0.14, since some of the APIs that need to be used have primitive types. I could not figure out a way to make that class work with primitive types. So instead I wrote some shims (I can already hear the collective sigh) that find the appropriate methods via reflection. This should be faster since the method instances are cached, and the code is not much uglier than before, with the advantage that all the ugliness is local to one file (instead of multiple switch statements on the version being used scattered in ClientWrapper).

The second part is simple: add code to handle Hive 0.14. A few new methods had to be added to the new shims.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #6627 from vanzin/SPARK-8065 and squashes the following commits:

3fa4270 [Marcelo Vanzin] Indentation style.
4b8a3d4 [Marcelo Vanzin] Fix dep exclusion.
be3d0cc [Marcelo Vanzin] Merge branch 'master' into SPARK-8065
ca3fb1e [Marcelo Vanzin] Merge branch 'master' into SPARK-8065
b43f13e [Marcelo Vanzin] Since exclusions seem to work, clean up some of the code.
73bd161 [Marcelo Vanzin] Botched merge.
d2ddf01 [Marcelo Vanzin] Comment about excluded dep.
0c929d1 [Marcelo Vanzin] Merge branch 'master' into SPARK-8065
2c3c02e [Marcelo Vanzin] Try to fix tests by adding support for exclusions.
0a03470 [Marcelo Vanzin] Try to fix tests by upgrading calcite dependency.
13b2dfa [Marcelo Vanzin] Fix NPE.
6439d88 [Marcelo Vanzin] Minor style thing.
69b017b [Marcelo Vanzin] Style.
a21cad8 [Marcelo Vanzin] Part II: Add shims / version for Hive 0.14.
ae98c87 [Marcelo Vanzin] PART I: Get rid of reflection magic.
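A minimal sketch of the shim approach described above, in the spirit of the sql/hive changes that accompany this commit (those files are outside the 'core' diff shown below). The class and method choices here are illustrative assumptions, not the actual HiveShim sources: the point is that each version-dependent Hive method is resolved once via reflection, cached as a java.lang.reflect.Method, and can be declared with primitive parameter types (java.lang.Boolean.TYPE), which the old ReflectionMagic style could not express.

import java.lang.reflect.Method

// Hypothetical shim sketch (illustrative names, not the real sql/hive code).
abstract class Shim {
  // Resolve a method by name and exact parameter types, failing fast if the
  // loaded Hive version does not expose it.
  protected def findMethod(klass: Class[_], name: String, args: Class[_]*): Method =
    klass.getMethod(name, args: _*)
}

class Shim_v0_14 extends Shim {
  // Looked up lazily and cached; later calls reuse the same Method instance.
  private lazy val dropIndexMethod = findMethod(
    Class.forName("org.apache.hadoop.hive.ql.metadata.Hive"),
    "dropIndex",
    classOf[String],         // dbName
    classOf[String],         // tableName
    classOf[String],         // indexName
    java.lang.Boolean.TYPE,  // throwException -- primitive boolean
    java.lang.Boolean.TYPE)  // deleteData -- primitive boolean

  def dropIndex(hive: AnyRef, db: String, table: String, index: String): Unit = {
    dropIndexMethod.invoke(hive, db, table, index,
      java.lang.Boolean.TRUE, java.lang.Boolean.TRUE)
  }
}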
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala            33
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala  16
2 files changed, 30 insertions, 19 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index b8978e25a0..cfcc6d3558 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -832,11 +832,7 @@ private[spark] object SparkSubmitUtils {
ivyConfName: String,
md: DefaultModuleDescriptor): Unit = {
// Add scala exclusion rule
- val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
- val scalaDependencyExcludeRule =
- new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
- scalaDependencyExcludeRule.addConfiguration(ivyConfName)
- md.addExcludeRule(scalaDependencyExcludeRule)
+ md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName))
// We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
// other spark-streaming utility components. Underscore is there to differentiate between
@@ -845,13 +841,8 @@ private[spark] object SparkSubmitUtils {
"sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
components.foreach { comp =>
- val sparkArtifacts =
- new ArtifactId(new ModuleId("org.apache.spark", s"spark-$comp*"), "*", "*", "*")
- val sparkDependencyExcludeRule =
- new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
- sparkDependencyExcludeRule.addConfiguration(ivyConfName)
-
- md.addExcludeRule(sparkDependencyExcludeRule)
+ md.addExcludeRule(createExclusion(s"org.apache.spark:spark-$comp*:*", ivySettings,
+ ivyConfName))
}
}
@@ -864,6 +855,7 @@ private[spark] object SparkSubmitUtils {
* @param coordinates Comma-delimited string of maven coordinates
* @param remoteRepos Comma-delimited string of remote repositories other than maven central
* @param ivyPath The path to the local ivy repository
+ * @param exclusions Exclusions to apply when resolving transitive dependencies
* @return The comma-delimited path to the jars of the given maven artifacts including their
* transitive dependencies
*/
@@ -871,6 +863,7 @@ private[spark] object SparkSubmitUtils {
coordinates: String,
remoteRepos: Option[String],
ivyPath: Option[String],
+ exclusions: Seq[String] = Nil,
isTest: Boolean = false): String = {
if (coordinates == null || coordinates.trim.isEmpty) {
""
@@ -928,6 +921,10 @@ private[spark] object SparkSubmitUtils {
// add all supplied maven artifacts as dependencies
addDependenciesToIvy(md, artifacts, ivyConfName)
+ exclusions.foreach { e =>
+ md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName))
+ }
+
// resolve dependencies
val rr: ResolveReport = ivy.resolve(md, resolveOptions)
if (rr.hasError) {
@@ -944,6 +941,18 @@ private[spark] object SparkSubmitUtils {
}
}
}
+
+ private def createExclusion(
+ coords: String,
+ ivySettings: IvySettings,
+ ivyConfName: String): ExcludeRule = {
+ val c = extractMavenCoordinates(coords)(0)
+ val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", "*")
+ val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null)
+ rule.addConfiguration(ivyConfName)
+ rule
+ }
+
}
/**
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index 07d261cc42..3a8da9fb9e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -106,7 +106,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
IvyTestUtils.withRepository(main, None, None) { repo =>
// end to end
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, Option(repo),
- Option(tempIvyPath), true)
+ Option(tempIvyPath), isTest = true)
assert(jarPath.indexOf(tempIvyPath) >= 0, "should use non-default ivy path")
}
}
@@ -115,21 +115,23 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val main = new MavenCoordinate("my.awesome.lib", "mylib", "0.1")
// Local M2 repository
IvyTestUtils.withRepository(main, None, Some(SparkSubmitUtils.m2Path)) { repo =>
- val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None, true)
+ val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
+ isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
}
// Local Ivy Repository
val settings = new IvySettings
val ivyLocal = new File(settings.getDefaultIvyUserDir, "local" + File.separator)
IvyTestUtils.withRepository(main, None, Some(ivyLocal), true) { repo =>
- val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None, true)
+ val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
+ isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
}
// Local ivy repository with modified home
val dummyIvyLocal = new File(tempIvyPath, "local" + File.separator)
IvyTestUtils.withRepository(main, None, Some(dummyIvyLocal), true) { repo =>
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None,
- Some(tempIvyPath), true)
+ Some(tempIvyPath), isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path")
}
@@ -137,7 +139,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
test("dependency not found throws RuntimeException") {
intercept[RuntimeException] {
- SparkSubmitUtils.resolveMavenCoordinates("a:b:c", None, None, true)
+ SparkSubmitUtils.resolveMavenCoordinates("a:b:c", None, None, isTest = true)
}
}
@@ -149,12 +151,12 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
",org.apache.spark:spark-core_fake:1.2.0"
- val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, true)
+ val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, isTest = true)
assert(path === "", "should return empty path")
val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0")
IvyTestUtils.withRepository(main, None, None) { repo =>
val files = SparkSubmitUtils.resolveMavenCoordinates(coordinates + "," + main.toString,
- Some(repo), None, true)
+ Some(repo), None, isTest = true)
assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
}
}
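For reference, a hedged example of how the new exclusions parameter introduced above might be used by a caller; the coordinate and exclusion strings are made up for illustration, and the actual callers live outside this core diff. Exclusions are passed as "groupId:artifactId" and are expanded internally via createExclusion with a "*" version glob.

// Illustrative caller of the updated API; the values are hypothetical.
val jars = SparkSubmitUtils.resolveMavenCoordinates(
  coordinates = "org.apache.hive:hive-metastore:0.14.0",
  remoteRepos = None,   // fall back to the default repositories
  ivyPath = None,       // use the default local ivy cache
  exclusions = Seq("org.apache.calcite:calcite-avatica"),  // "groupId:artifactId"
  isTest = false)
// `jars` is the comma-delimited path to the resolved artifacts and their
// transitive dependencies, minus anything matched by the exclusion rules.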