aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2015-02-27 22:59:35 -0800
committerPatrick Wendell <patrick@databricks.com>2015-02-27 22:59:35 -0800
commit6d8e5fbc0d83411174ffa59ff6a761a862eca32c (patch)
tree515cc1dbfe76dc9066034d1061f12d1707b5a864
parentdba08d1fc3bdb9245aefe695970354df088a93b6 (diff)
downloadspark-6d8e5fbc0d83411174ffa59ff6a761a862eca32c.tar.gz
spark-6d8e5fbc0d83411174ffa59ff6a761a862eca32c.tar.bz2
spark-6d8e5fbc0d83411174ffa59ff6a761a862eca32c.zip
[SPARK-5979][SPARK-6032] Smaller safer --packages fix
pwendell tdas This is the safer parts of PR #4754: - SPARK-5979: All dependencies with the groupId `org.apache.spark` passed through `--packages`, were being excluded from the dependency tree on the assumption that they would be in the assembly jar. This is not the case, therefore the exclusion rules had to be defined more explicitly. - SPARK-6032: Ivy prints a whole lot of logs while retrieving dependencies. These were printed to `System.out`. Moved the logging to `System.err`. Author: Burak Yavuz <brkyvz@gmail.com> Closes #4802 from brkyvz/simple-streaming-fix and squashes the following commits: e0f38cb [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into simple-streaming-fix bad921c [Burak Yavuz] [SPARK-5979][SPARK-6032] Smaller safer fix
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala53
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala16
2 files changed, 51 insertions, 18 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4c4110812e..4a74641f4e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -655,8 +655,7 @@ private[spark] object SparkSubmitUtils {
/**
* Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
- * in the format `groupId:artifactId:version` or `groupId/artifactId:version`. The latter provides
- * simplicity for Spark Package users.
+ * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
* @param coordinates Comma-delimited string of maven coordinates
* @return Sequence of Maven coordinates
*/
@@ -747,6 +746,35 @@ private[spark] object SparkSubmitUtils {
md.addDependency(dd)
}
}
+
+ /** Add exclusion rules for dependencies already included in the spark-assembly */
+ private[spark] def addExclusionRules(
+ ivySettings: IvySettings,
+ ivyConfName: String,
+ md: DefaultModuleDescriptor): Unit = {
+ // Add scala exclusion rule
+ val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
+ val scalaDependencyExcludeRule =
+ new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
+ scalaDependencyExcludeRule.addConfiguration(ivyConfName)
+ md.addExcludeRule(scalaDependencyExcludeRule)
+
+ // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
+ // other spark-streaming utility components. Underscore is there to differentiate between
+ // spark-streaming_2.1x and spark-streaming-kafka-assembly_2.1x
+ val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+ "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+ components.foreach { comp =>
+ val sparkArtifacts =
+ new ArtifactId(new ModuleId("org.apache.spark", s"spark-$comp*"), "*", "*", "*")
+ val sparkDependencyExcludeRule =
+ new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
+ sparkDependencyExcludeRule.addConfiguration(ivyConfName)
+
+ md.addExcludeRule(sparkDependencyExcludeRule)
+ }
+ }
/** A nice function to use in tests as well. Values are dummy strings. */
private[spark] def getModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
@@ -768,6 +796,9 @@ private[spark] object SparkSubmitUtils {
if (coordinates == null || coordinates.trim.isEmpty) {
""
} else {
+ val sysOut = System.out
+ // To prevent ivy from logging to system out
+ System.setOut(printStream)
val artifacts = extractMavenCoordinates(coordinates)
// Default configuration name for ivy
val ivyConfName = "default"
@@ -811,19 +842,9 @@ private[spark] object SparkSubmitUtils {
val md = getModuleDescriptor
md.setDefaultConf(ivyConfName)
- // Add an exclusion rule for Spark and Scala Library
- val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
- val sparkDependencyExcludeRule =
- new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
- sparkDependencyExcludeRule.addConfiguration(ivyConfName)
- val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
- val scalaDependencyExcludeRule =
- new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
- scalaDependencyExcludeRule.addConfiguration(ivyConfName)
-
- // Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies
- md.addExcludeRule(sparkDependencyExcludeRule)
- md.addExcludeRule(scalaDependencyExcludeRule)
+ // Add exclusion rules for Spark and Scala Library
+ addExclusionRules(ivySettings, ivyConfName, md)
+ // add all supplied maven artifacts as dependencies
addDependenciesToIvy(md, artifacts, ivyConfName)
// resolve dependencies
@@ -835,7 +856,7 @@ private[spark] object SparkSubmitUtils {
ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
packagesDirectory.getAbsolutePath + File.separator + "[artifact](-[classifier]).[ext]",
retrieveOptions.setConfs(Array(ivyConfName)))
-
+ System.setOut(sysOut)
resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index ad62b35f62..8bcca92609 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -117,8 +117,20 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {
}
test("neglects Spark and Spark's dependencies") {
- val path = SparkSubmitUtils.resolveMavenCoordinates(
- "org.apache.spark:spark-core_2.10:1.2.0", None, None, true)
+ val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+ "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+ val coordinates =
+ components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
+ ",org.apache.spark:spark-core_fake:1.2.0"
+
+ val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, true)
assert(path === "", "should return empty path")
+ // Should not exclude the following dependency. Will throw an error, because it doesn't exist,
+ // but the fact that it is checking means that it wasn't excluded.
+ intercept[RuntimeException] {
+ SparkSubmitUtils.resolveMavenCoordinates(coordinates +
+ ",org.apache.spark:spark-streaming-kafka-assembly_2.10:1.2.0", None, None, true)
+ }
}
}