diff options
5 files changed, 43 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d56946e932..d8701812eb 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -391,7 +391,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER) - _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten + _jars = Utils.getUserJars(_conf) _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty)) .toSeq.flatten diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index f9d05409e1..aebd98b3b0 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2352,6 +2352,26 @@ private[spark] object Utils extends Logging { log.info(s"Started daemon with process name: ${Utils.getProcessName()}") SignalUtils.registerLogger(log) } + + /** + * Unions two comma-separated lists of files and filters out empty strings. + */ + def unionFileLists(leftList: Option[String], rightList: Option[String]): Set[String] = { + var allFiles = Set[String]() + leftList.foreach { value => allFiles ++= value.split(",") } + rightList.foreach { value => allFiles ++= value.split(",") } + allFiles.filter { _.nonEmpty } + } + + def getUserJars(conf: SparkConf): Seq[String] = { + val sparkJars = conf.getOption("spark.jars") + if (conf.get("spark.master") == "yarn") { + val yarnJars = conf.getOption("spark.yarn.dist.jars") + unionFileLists(sparkJars, yarnJars).toSeq + } else { + sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten + } + } } /** diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 2718976992..0b020592b0 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -570,6 +570,18 @@ class SparkSubmitSuite appArgs.executorMemory should be ("2.3g") } } + + test("comma separated list of files are unioned correctly") { + val left = Option("/tmp/a.jar,/tmp/b.jar") + val right = Option("/tmp/c.jar,/tmp/a.jar") + val emptyString = Option("") + Utils.unionFileLists(left, right) should be (Set("/tmp/a.jar", "/tmp/b.jar", "/tmp/c.jar")) + Utils.unionFileLists(emptyString, emptyString) should be (Set.empty) + Utils.unionFileLists(Option("/tmp/a.jar"), emptyString) should be (Set("/tmp/a.jar")) + Utils.unionFileLists(emptyString, Option("/tmp/a.jar")) should be (Set("/tmp/a.jar")) + Utils.unionFileLists(None, Option("/tmp/a.jar")) should be (Set("/tmp/a.jar")) + Utils.unionFileLists(Option("/tmp/a.jar"), None) should be (Set("/tmp/a.jar")) + } // scalastyle:on println // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala index b1e95d8fdb..66de5e462a 100644 --- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -1067,12 +1067,19 @@ object SparkILoop extends Logging { private def echo(msg: String) = Console println msg def getAddedJars: Array[String] = { + val conf = new SparkConf().setMaster(getMaster()) val envJars = sys.env.get("ADD_JARS") if (envJars.isDefined) { logWarning("ADD_JARS environment variable is deprecated, use --jar spark submit argument instead") } - val propJars = sys.props.get("spark.jars").flatMap { p => if (p == "") None else Some(p) } - val jars = propJars.orElse(envJars).getOrElse("") + val jars = { + val userJars = Utils.getUserJars(conf) + if (userJars.isEmpty) { + envJars.getOrElse("") + } else { + userJars.mkString(",") + } + } Utils.resolveURIs(jars).split(",").filter(_.nonEmpty) } diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala index 771670fa55..28fe84d6fe 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala @@ -54,9 +54,7 @@ object Main extends Logging { // Visible for testing private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = { interp = _interp - val jars = conf.getOption("spark.jars") - .map(_.replace(",", File.pathSeparator)) - .getOrElse("") + val jars = Utils.getUserJars(conf).mkString(File.pathSeparator) val interpArguments = List( "-Yrepl-class-based", "-Yrepl-outdir", s"${outputDir.getAbsolutePath}", |