author     Nezih Yigitbasi <nyigitbasi@netflix.com>   2016-06-15 14:07:36 -0700
committer  Marcelo Vanzin <vanzin@cloudera.com>       2016-06-15 14:07:36 -0700
commit     4df8df5c2e68f5a5d231c401b04d762d7a648159
tree       3d5ff81f6e22fa75167e92ede32a53bbf3887545
parent     5389013acc99367729dfc6deeb2cecc9edd1e24c
[SPARK-15782][YARN] Set spark.jars system property in client mode
## What changes were proposed in this pull request?

When `--packages` is specified with `spark-shell`, the classes from those packages cannot be found, which I think is due to some of the changes in `SPARK-12343`. In particular, `SPARK-12343` removes a line that sets the `spark.jars` system property in client mode, which is used by the repl main class to set the classpath.

## How was this patch tested?

Tested manually. This system property is used by the repl to populate its classpath. If it is not set properly, the classes for external packages cannot be found.

tgravescs vanzin as you may be familiar with this part of the code.

Author: Nezih Yigitbasi <nyigitbasi@netflix.com>

Closes #13527 from nezihyigitbasi/repl-fix.
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala               |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                 | 20
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala    | 12
-rw-r--r--  repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala | 11
-rw-r--r--  repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala       |  4
5 files changed, 43 insertions(+), 6 deletions(-)
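
Before the diffs, a minimal standalone sketch of the behavior this patch adds. The method names follow the diff below, but the real helpers live in the `private[spark]` object `org.apache.spark.util.Utils` and take a `SparkConf`; this mirror takes the relevant settings as plain parameters so it can run without Spark on the classpath.

```scala
// Standalone mirror of the helpers added by this patch (illustrative only).
object UserJarsSketch {

  /** Unions two comma-separated lists of files and filters out empty strings. */
  def unionFileLists(leftList: Option[String], rightList: Option[String]): Set[String] = {
    var allFiles = Set[String]()
    leftList.foreach { value => allFiles ++= value.split(",") }
    rightList.foreach { value => allFiles ++= value.split(",") }
    allFiles.filter(_.nonEmpty)
  }

  /**
   * On YARN the user jars are the union of spark.jars and spark.yarn.dist.jars;
   * everywhere else spark.jars alone is split and cleaned up.
   */
  def getUserJars(
      master: String,
      sparkJars: Option[String],
      yarnDistJars: Option[String]): Seq[String] = {
    if (master == "yarn") {
      unionFileLists(sparkJars, yarnDistJars).toSeq
    } else {
      sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
    }
  }

  def main(args: Array[String]): Unit = {
    // Jars resolved by --packages show up in spark.jars; on YARN they may also be
    // listed in spark.yarn.dist.jars, so duplicates collapse in the union.
    println(getUserJars("yarn", Some("/tmp/a.jar"), Some("/tmp/a.jar,/tmp/b.jar")))
    // e.g. List(/tmp/a.jar, /tmp/b.jar)
    println(getUserJars("local[*]", Some("/tmp/a.jar,"), None))
    // e.g. List(/tmp/a.jar)
  }
}
```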
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d56946e932..d8701812eb 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -391,7 +391,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
- _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
+ _jars = Utils.getUserJars(_conf)
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
.toSeq.flatten
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f9d05409e1..aebd98b3b0 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2352,6 +2352,26 @@ private[spark] object Utils extends Logging {
log.info(s"Started daemon with process name: ${Utils.getProcessName()}")
SignalUtils.registerLogger(log)
}
+
+ /**
+ * Unions two comma-separated lists of files and filters out empty strings.
+ */
+ def unionFileLists(leftList: Option[String], rightList: Option[String]): Set[String] = {
+ var allFiles = Set[String]()
+ leftList.foreach { value => allFiles ++= value.split(",") }
+ rightList.foreach { value => allFiles ++= value.split(",") }
+ allFiles.filter { _.nonEmpty }
+ }
+
+ def getUserJars(conf: SparkConf): Seq[String] = {
+ val sparkJars = conf.getOption("spark.jars")
+ if (conf.get("spark.master") == "yarn") {
+ val yarnJars = conf.getOption("spark.yarn.dist.jars")
+ unionFileLists(sparkJars, yarnJars).toSeq
+ } else {
+ sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
+ }
+ }
}
/**
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 2718976992..0b020592b0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -570,6 +570,18 @@ class SparkSubmitSuite
appArgs.executorMemory should be ("2.3g")
}
}
+
+ test("comma separated list of files are unioned correctly") {
+ val left = Option("/tmp/a.jar,/tmp/b.jar")
+ val right = Option("/tmp/c.jar,/tmp/a.jar")
+ val emptyString = Option("")
+ Utils.unionFileLists(left, right) should be (Set("/tmp/a.jar", "/tmp/b.jar", "/tmp/c.jar"))
+ Utils.unionFileLists(emptyString, emptyString) should be (Set.empty)
+ Utils.unionFileLists(Option("/tmp/a.jar"), emptyString) should be (Set("/tmp/a.jar"))
+ Utils.unionFileLists(emptyString, Option("/tmp/a.jar")) should be (Set("/tmp/a.jar"))
+ Utils.unionFileLists(None, Option("/tmp/a.jar")) should be (Set("/tmp/a.jar"))
+ Utils.unionFileLists(Option("/tmp/a.jar"), None) should be (Set("/tmp/a.jar"))
+ }
// scalastyle:on println
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index b1e95d8fdb..66de5e462a 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -1067,12 +1067,19 @@ object SparkILoop extends Logging {
private def echo(msg: String) = Console println msg
def getAddedJars: Array[String] = {
+ val conf = new SparkConf().setMaster(getMaster())
val envJars = sys.env.get("ADD_JARS")
if (envJars.isDefined) {
logWarning("ADD_JARS environment variable is deprecated, use --jar spark submit argument instead")
}
- val propJars = sys.props.get("spark.jars").flatMap { p => if (p == "") None else Some(p) }
- val jars = propJars.orElse(envJars).getOrElse("")
+ val jars = {
+ val userJars = Utils.getUserJars(conf)
+ if (userJars.isEmpty) {
+ envJars.getOrElse("")
+ } else {
+ userJars.mkString(",")
+ }
+ }
Utils.resolveURIs(jars).split(",").filter(_.nonEmpty)
}
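
The precedence in the 2.10 repl's `getAddedJars` after this change can be summed up in a small sketch. The helper name below is hypothetical, and `Utils.resolveURIs` is left out; each entry is used as-is.

```scala
object ReplJarPrecedenceSketch {
  // Jars derived from the conf win; the deprecated ADD_JARS env var is only
  // consulted when the conf yields nothing.
  def selectReplJars(userJars: Seq[String], envJars: Option[String]): Array[String] = {
    val jars = if (userJars.isEmpty) envJars.getOrElse("") else userJars.mkString(",")
    // The real code first passes `jars` through Utils.resolveURIs to normalize
    // each entry to a URI before splitting.
    jars.split(",").filter(_.nonEmpty)
  }

  def main(args: Array[String]): Unit = {
    println(selectReplJars(Seq("/tmp/a.jar"), Some("/tmp/env.jar")).toSeq)  // List(/tmp/a.jar)
    println(selectReplJars(Seq.empty, Some("/tmp/env.jar")).toSeq)          // List(/tmp/env.jar)
  }
}
```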
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 771670fa55..28fe84d6fe 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -54,9 +54,7 @@ object Main extends Logging {
// Visible for testing
private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
interp = _interp
- val jars = conf.getOption("spark.jars")
- .map(_.replace(",", File.pathSeparator))
- .getOrElse("")
+ val jars = Utils.getUserJars(conf).mkString(File.pathSeparator)
val interpArguments = List(
"-Yrepl-class-based",
"-Yrepl-outdir", s"${outputDir.getAbsolutePath}",