author     Sun Rui <rui.sun@intel.com>    2015-07-13 08:21:47 -0700
committer  Shivaram Venkataraman <shivaram@cs.berkeley.edu>    2015-07-13 08:21:47 -0700
commit     7f487c8bde14dbdd244a3493ad11a129ef2bb327 (patch)
tree       e127a2d84beb0010eadc5c0bac664d15ebc22667 /core
parent     a5bc803b7271028e34de1548d55b80ecfb812a7b (diff)
[SPARK-6797] [SPARKR] Add support for YARN cluster mode.
This PR enables SparkR to dynamically ship the SparkR binary package to the AM node in YARN cluster mode, so the SparkR package no longer needs to be installed on each worker node.

This PR uses the JDK jar tool to package the SparkR package, because jar is expected to be available on both Linux and Windows wherever a JDK is installed.

This PR does not address the R worker involved in the RDD API; that will be handled in a separate JIRA issue. It also does not address the SBT build; SparkR installation and packaging via SBT will be addressed in a separate JIRA issue.

R/install-dev.bat is not tested. shivaram, could you help test it?

Author: Sun Rui <rui.sun@intel.com>

Closes #6743 from sun-rui/SPARK-6797 and squashes the following commits:

ca63c86 [Sun Rui] Adjust MimaExcludes after rebase.
7313374 [Sun Rui] Fix unit test errors.
72695fb [Sun Rui] Fix unit test failures.
193882f [Sun Rui] Fix Mima test error.
fe25a33 [Sun Rui] Fix Mima test error.
35ecfa3 [Sun Rui] Fix comments.
c38a005 [Sun Rui] Unzipped SparkR binary package is still required for standalone and Mesos modes.
b05340c [Sun Rui] Fix scala style.
2ca5048 [Sun Rui] Fix comments.
1acefd1 [Sun Rui] Fix scala style.
0aa1e97 [Sun Rui] Fix scala style.
41d4f17 [Sun Rui] Add support for locating SparkR package for R workers required by RDD APIs.
49ff948 [Sun Rui] Invoke jar.exe with full path in install-dev.bat.
7b916c5 [Sun Rui] Use 'rem' consistently.
3bed438 [Sun Rui] Add a comment.
681afb0 [Sun Rui] Fix a bug that RRunner does not handle client deployment modes.
cedfbe2 [Sun Rui] [SPARK-6797][SPARKR] Add support for YARN cluster mode.
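At a high level, the distribution mechanism works as follows (a condensed, hedged sketch of the SparkSubmit change below; building sparkr.zip from R/lib happens at install/packaging time and is not part of this diff):

    // When an R app is submitted in YARN mode, attach the pre-built SparkR archive
    // so YARN ships it with the job and exposes its contents under a "sparkr" link
    // in each container's working directory.
    val rPackagePath = RUtils.localSparkRPackagePath              // Some("$SPARK_HOME/R/lib")
    val rPackageFile = new File(rPackagePath.get, "sparkr.zip")   // SPARKR_PACKAGE_ARCHIVE
    val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath)
    args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr")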
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/RRDD.scala               |  21
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/RUtils.scala             |  65
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/RRunner.scala           |   7
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala       |  27
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala  |   3
5 files changed, 107 insertions(+), 16 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
index ff1702f7de..23a470d6af 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -39,7 +39,6 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
deserializer: String,
serializer: String,
packageNames: Array[Byte],
- rLibDir: String,
broadcastVars: Array[Broadcast[Object]])
extends RDD[U](parent) with Logging {
protected var dataStream: DataInputStream = _
@@ -60,7 +59,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
// The stdout/stderr is shared by multiple tasks, because we use one daemon
// to launch child process as worker.
- val errThread = RRDD.createRWorker(rLibDir, listenPort)
+ val errThread = RRDD.createRWorker(listenPort)
// We use two sockets to separate input and output, then it's easy to manage
// the lifecycle of them to avoid deadlock.
@@ -235,11 +234,10 @@ private class PairwiseRRDD[T: ClassTag](
hashFunc: Array[Byte],
deserializer: String,
packageNames: Array[Byte],
- rLibDir: String,
broadcastVars: Array[Object])
extends BaseRRDD[T, (Int, Array[Byte])](
parent, numPartitions, hashFunc, deserializer,
- SerializationFormats.BYTE, packageNames, rLibDir,
+ SerializationFormats.BYTE, packageNames,
broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {
override protected def readData(length: Int): (Int, Array[Byte]) = {
@@ -266,10 +264,9 @@ private class RRDD[T: ClassTag](
deserializer: String,
serializer: String,
packageNames: Array[Byte],
- rLibDir: String,
broadcastVars: Array[Object])
extends BaseRRDD[T, Array[Byte]](
- parent, -1, func, deserializer, serializer, packageNames, rLibDir,
+ parent, -1, func, deserializer, serializer, packageNames,
broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {
override protected def readData(length: Int): Array[Byte] = {
@@ -293,10 +290,9 @@ private class StringRRDD[T: ClassTag](
func: Array[Byte],
deserializer: String,
packageNames: Array[Byte],
- rLibDir: String,
broadcastVars: Array[Object])
extends BaseRRDD[T, String](
- parent, -1, func, deserializer, SerializationFormats.STRING, packageNames, rLibDir,
+ parent, -1, func, deserializer, SerializationFormats.STRING, packageNames,
broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {
override protected def readData(length: Int): String = {
@@ -392,9 +388,10 @@ private[r] object RRDD {
thread
}
- private def createRProcess(rLibDir: String, port: Int, script: String): BufferedStreamThread = {
+ private def createRProcess(port: Int, script: String): BufferedStreamThread = {
val rCommand = SparkEnv.get.conf.get("spark.sparkr.r.command", "Rscript")
val rOptions = "--vanilla"
+ val rLibDir = RUtils.sparkRPackagePath(isDriver = false)
val rExecScript = rLibDir + "/SparkR/worker/" + script
val pb = new ProcessBuilder(List(rCommand, rOptions, rExecScript))
// Unset the R_TESTS environment variable for workers.
@@ -413,7 +410,7 @@ private[r] object RRDD {
/**
* ProcessBuilder used to launch worker R processes.
*/
- def createRWorker(rLibDir: String, port: Int): BufferedStreamThread = {
+ def createRWorker(port: Int): BufferedStreamThread = {
val useDaemon = SparkEnv.get.conf.getBoolean("spark.sparkr.use.daemon", true)
if (!Utils.isWindows && useDaemon) {
synchronized {
@@ -421,7 +418,7 @@ private[r] object RRDD {
// we expect one connections
val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
val daemonPort = serverSocket.getLocalPort
- errThread = createRProcess(rLibDir, daemonPort, "daemon.R")
+ errThread = createRProcess(daemonPort, "daemon.R")
// the socket used to send out the input of task
serverSocket.setSoTimeout(10000)
val sock = serverSocket.accept()
@@ -443,7 +440,7 @@ private[r] object RRDD {
errThread
}
} else {
- createRProcess(rLibDir, port, "worker.R")
+ createRProcess(port, "worker.R")
}
}
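With rLibDir dropped from the RRDD constructors, the worker launcher resolves the library directory on the executor side instead. A minimal sketch of the resulting launch, condensed from the createRProcess hunk above (environment setup and output redirection elided):

    val rLibDir = RUtils.sparkRPackagePath(isDriver = false)   // ".../sparkr" on YARN, else $SPARK_HOME/R/lib
    val rExecScript = rLibDir + "/SparkR/worker/worker.R"
    val pb = new ProcessBuilder(List("Rscript", "--vanilla", rExecScript))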
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
new file mode 100644
index 0000000000..d53abd3408
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.r
+
+import java.io.File
+
+import org.apache.spark.{SparkEnv, SparkException}
+
+private[spark] object RUtils {
+ /**
+ * Get the SparkR package path in the local spark distribution.
+ */
+ def localSparkRPackagePath: Option[String] = {
+ val sparkHome = sys.env.get("SPARK_HOME")
+ sparkHome.map(
+ Seq(_, "R", "lib").mkString(File.separator)
+ )
+ }
+
+ /**
+ * Get the SparkR package path in various deployment modes.
+ * This assumes that Spark properties `spark.master` and `spark.submit.deployMode`
+ * and environment variable `SPARK_HOME` are set.
+ */
+ def sparkRPackagePath(isDriver: Boolean): String = {
+ val (master, deployMode) =
+ if (isDriver) {
+ (sys.props("spark.master"), sys.props("spark.submit.deployMode"))
+ } else {
+ val sparkConf = SparkEnv.get.conf
+ (sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode"))
+ }
+
+ val isYarnCluster = master.contains("yarn") && deployMode == "cluster"
+ val isYarnClient = master.contains("yarn") && deployMode == "client"
+
+ // In YARN mode, the SparkR package is distributed as an archive symbolically
+ // linked to the "sparkr" file in the current directory. Note that this does not apply
+ // to the driver in client mode because it is run outside of the cluster.
+ if (isYarnCluster || (isYarnClient && !isDriver)) {
+ new File("sparkr").getAbsolutePath
+ } else {
+ // Otherwise, assume the package is local
+ // TODO: support this for Mesos
+ localSparkRPackagePath.getOrElse {
+ throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.")
+ }
+ }
+ }
+}
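For orientation, a hedged sketch of how the two call sites in this PR consume the new helper (the example paths are assumptions about typical layouts, not values produced by this code):

    // Driver side (RRunner): reads spark.master / spark.submit.deployMode from system properties.
    val driverLibDir = RUtils.sparkRPackagePath(isDriver = true)    // e.g. "/opt/spark/R/lib" locally
    // Executor side (RRDD worker launch): reads the same settings from SparkEnv's conf.
    val workerLibDir = RUtils.sparkRPackagePath(isDriver = false)   // e.g. "<container dir>/sparkr" on YARN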
diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
index 4165740312..c0cab22fa8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
import org.apache.hadoop.fs.Path
-import org.apache.spark.api.r.RBackend
+import org.apache.spark.api.r.{RBackend, RUtils}
import org.apache.spark.util.RedirectThread
/**
@@ -71,9 +71,10 @@ object RRunner {
val builder = new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs)
val env = builder.environment()
env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString)
- val sparkHome = System.getenv("SPARK_HOME")
+ val rPackageDir = RUtils.sparkRPackagePath(isDriver = true)
+ env.put("SPARKR_PACKAGE_DIR", rPackageDir)
env.put("R_PROFILE_USER",
- Seq(sparkHome, "R", "lib", "SparkR", "profile", "general.R").mkString(File.separator))
+ Seq(rPackageDir, "SparkR", "profile", "general.R").mkString(File.separator))
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
val process = builder.start()
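The net effect on the R process environment is that the profile script is now located relative to the resolved package directory rather than SPARK_HOME (a sketch; the literal path depends on deployment mode):

    // With rPackageDir = RUtils.sparkRPackagePath(isDriver = true):
    val profilePath = Seq(rPackageDir, "SparkR", "profile", "general.R").mkString(File.separator)
    // e.g. "<rPackageDir>/SparkR/profile/general.R", replacing the previous
    // "$SPARK_HOME/R/lib/SparkR/profile/general.R"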
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4cec9017b8..7089a7e267 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -37,6 +37,7 @@ import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
+import org.apache.spark.api.r.RUtils
import org.apache.spark.SPARK_VERSION
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -79,6 +80,7 @@ object SparkSubmit {
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"
private val SPARKR_SHELL = "sparkr-shell"
+ private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
private val CLASS_NOT_FOUND_EXIT_STATUS = 101
@@ -262,6 +264,12 @@ object SparkSubmit {
}
}
+ // Update args.deployMode if it is null. It will be passed down as a Spark property later.
+ (args.deployMode, deployMode) match {
+ case (null, CLIENT) => args.deployMode = "client"
+ case (null, CLUSTER) => args.deployMode = "cluster"
+ case _ =>
+ }
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
@@ -347,6 +355,23 @@ object SparkSubmit {
}
}
+ // In YARN mode for an R app, add the SparkR package archive to archives
+ // that can be distributed with the job
+ if (args.isR && clusterManager == YARN) {
+ val rPackagePath = RUtils.localSparkRPackagePath
+ if (rPackagePath.isEmpty) {
+ printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
+ }
+ val rPackageFile = new File(rPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
+ if (!rPackageFile.exists()) {
+ printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
+ }
+ val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath)
+
+ // Assigns a symbol link name "sparkr" to the shipped package.
+ args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr")
+ }
+
// If we're running a R app, set the main class to our specific R runner
if (args.isR && deployMode == CLIENT) {
if (args.primaryResource == SPARKR_SHELL) {
@@ -375,6 +400,8 @@ object SparkSubmit {
// All cluster managers
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
+ OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
+ sysProp = "spark.submit.deployMode"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
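Propagating spark.submit.deployMode as a system property is what lets RUtils.sparkRPackagePath distinguish YARN client from cluster mode at runtime. A hedged sketch of the consumer side, mirroring RUtils above:

    // On the driver these come from system properties; on executors from SparkEnv's conf.
    val master = sys.props("spark.master")                    // e.g. "yarn-cluster" or "yarn"
    val deployMode = sys.props("spark.submit.deployMode")     // now always "client" or "cluster"
    val isYarnCluster = master.contains("yarn") && deployMode == "cluster"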
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 1b64c329b5..e7878bde6f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -246,7 +246,7 @@ class SparkSubmitSuite
mainClass should be ("org.apache.spark.deploy.Client")
}
classpath should have size 0
- sysProps should have size 8
+ sysProps should have size 9
sysProps.keys should contain ("SPARK_SUBMIT")
sysProps.keys should contain ("spark.master")
sysProps.keys should contain ("spark.app.name")
@@ -255,6 +255,7 @@ class SparkSubmitSuite
sysProps.keys should contain ("spark.driver.cores")
sysProps.keys should contain ("spark.driver.supervise")
sysProps.keys should contain ("spark.shuffle.spill")
+ sysProps.keys should contain ("spark.submit.deployMode")
sysProps("spark.shuffle.spill") should be ("false")
}