-rw-r--r--   R/install-dev.bat                                                     5
-rwxr-xr-x   R/install-dev.sh                                                      8
-rw-r--r--   R/pkg/DESCRIPTION                                                     1
-rw-r--r--   R/pkg/R/RDD.R                                                         2
-rw-r--r--   R/pkg/R/pairRDD.R                                                     1
-rw-r--r--   R/pkg/R/sparkR.R                                                     10
-rw-r--r--   R/pkg/R/zzz.R                                                        20
-rw-r--r--   R/pkg/inst/profile/general.R                                          4
-rw-r--r--   core/src/main/scala/org/apache/spark/api/r/RRDD.scala                21
-rw-r--r--   core/src/main/scala/org/apache/spark/api/r/RUtils.scala              65
-rw-r--r--   core/src/main/scala/org/apache/spark/deploy/RRunner.scala             7
-rw-r--r--   core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala        27
-rw-r--r--   core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala    3
-rwxr-xr-x   make-distribution.sh                                                  1
-rw-r--r--   project/MimaExcludes.scala                                           12
15 files changed, 133 insertions, 54 deletions
diff --git a/R/install-dev.bat b/R/install-dev.bat
index 008a5c668b..f32670b67d 100644
--- a/R/install-dev.bat
+++ b/R/install-dev.bat
@@ -25,3 +25,8 @@ set SPARK_HOME=%~dp0..
MKDIR %SPARK_HOME%\R\lib
R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\
+
+rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
+pushd %SPARK_HOME%\R\lib
+%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR
+popd
diff --git a/R/install-dev.sh b/R/install-dev.sh
index 1edd551f8d..4972bb9217 100755
--- a/R/install-dev.sh
+++ b/R/install-dev.sh
@@ -34,7 +34,7 @@ LIB_DIR="$FWDIR/lib"
mkdir -p $LIB_DIR
-pushd $FWDIR
+pushd $FWDIR > /dev/null
# Generate Rd files if devtools is installed
Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtools); devtools::document(pkg="./pkg", roclets=c("rd")) }'
@@ -42,4 +42,8 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo
# Install SparkR to $LIB_DIR
R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/
-popd
+# Zip the SparkR package so that it can be distributed to worker nodes on YARN
+cd $LIB_DIR
+jar cfM "$LIB_DIR/sparkr.zip" SparkR
+
+popd > /dev/null
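
Both install scripts above now finish by packaging the installed SparkR library into sparkr.zip. `jar cfM` (create, to a file, with no manifest) is used here simply as a portable zip tool, so the result is an ordinary zip whose entries are rooted at SparkR/; that layout is what YARN's archive distribution later unpacks on worker nodes. A rough Scala equivalent of this packaging step, for illustration only (the SPARK_HOME fallback and buffer size are assumptions, not part of the patch):

    import java.io.{File, FileInputStream, FileOutputStream}
    import java.util.zip.{ZipEntry, ZipOutputStream}

    object PackageSparkR {
      // Recursively add files under `dir` to the zip, with entry names relative to `base`
      // so that everything lands under a top-level "SparkR/" prefix.
      private def addDir(zip: ZipOutputStream, base: File, dir: File): Unit = {
        for (f <- Option(dir.listFiles()).getOrElse(Array.empty[File])) {
          if (f.isDirectory) {
            addDir(zip, base, f)
          } else {
            zip.putNextEntry(new ZipEntry(base.toURI.relativize(f.toURI).getPath))
            val in = new FileInputStream(f)
            val buf = new Array[Byte](8192)
            var n = in.read(buf)
            while (n != -1) { zip.write(buf, 0, n); n = in.read(buf) }
            in.close()
            zip.closeEntry()
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val libDir = new File(sys.env.getOrElse("SPARK_HOME", "."), "R" + File.separator + "lib")
        val zip = new ZipOutputStream(new FileOutputStream(new File(libDir, "sparkr.zip")))
        addDir(zip, libDir, new File(libDir, "SparkR"))
        zip.close()
      }
    }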
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index efc85bbc4b..d028821534 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -32,4 +32,3 @@ Collate:
'serialize.R'
'sparkR.R'
'utils.R'
- 'zzz.R'
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 89511141d3..d2d0967092 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -165,7 +165,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
serializedFuncArr,
rdd@env$prev_serializedMode,
packageNamesArr,
- as.character(.sparkREnv[["libname"]]),
broadcastArr,
callJMethod(prev_jrdd, "classTag"))
} else {
@@ -175,7 +174,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
rdd@env$prev_serializedMode,
serializedMode,
packageNamesArr,
- as.character(.sparkREnv[["libname"]]),
broadcastArr,
callJMethod(prev_jrdd, "classTag"))
}
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index 0f1179e0aa..ebc6ff65e9 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -215,7 +215,6 @@ setMethod("partitionBy",
serializedHashFuncBytes,
getSerializedMode(x),
packageNamesArr,
- as.character(.sparkREnv$libname),
broadcastArr,
callJMethod(jrdd, "classTag"))
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 048eb8ed54..172335809d 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -17,10 +17,6 @@
.sparkREnv <- new.env()
-sparkR.onLoad <- function(libname, pkgname) {
- .sparkREnv$libname <- libname
-}
-
# Utility function that returns TRUE if we have an active connection to the
# backend and FALSE otherwise
connExists <- function(env) {
@@ -80,7 +76,6 @@ sparkR.stop <- function() {
#' @param sparkEnvir Named list of environment variables to set on worker nodes.
#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
#' @param sparkJars Character string vector of jar files to pass to the worker nodes.
-#' @param sparkRLibDir The path where R is installed on the worker nodes.
#' @param sparkPackages Character string vector of packages from spark-packages.org
#' @export
#' @examples
@@ -101,7 +96,6 @@ sparkR.init <- function(
sparkEnvir = list(),
sparkExecutorEnv = list(),
sparkJars = "",
- sparkRLibDir = "",
sparkPackages = "") {
if (exists(".sparkRjsc", envir = .sparkREnv)) {
@@ -170,10 +164,6 @@ sparkR.init <- function(
sparkHome <- normalizePath(sparkHome)
}
- if (nchar(sparkRLibDir) != 0) {
- .sparkREnv$libname <- sparkRLibDir
- }
-
sparkEnvirMap <- new.env()
for (varname in names(sparkEnvir)) {
sparkEnvirMap[[varname]] <- sparkEnvir[[varname]]
diff --git a/R/pkg/R/zzz.R b/R/pkg/R/zzz.R
deleted file mode 100644
index 301feade65..0000000000
--- a/R/pkg/R/zzz.R
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-.onLoad <- function(libname, pkgname) {
- sparkR.onLoad(libname, pkgname)
-}
diff --git a/R/pkg/inst/profile/general.R b/R/pkg/inst/profile/general.R
index 8fe711b622..2a8a8213d0 100644
--- a/R/pkg/inst/profile/general.R
+++ b/R/pkg/inst/profile/general.R
@@ -16,7 +16,7 @@
#
.First <- function() {
- home <- Sys.getenv("SPARK_HOME")
- .libPaths(c(file.path(home, "R", "lib"), .libPaths()))
+ packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
+ .libPaths(c(packageDir, .libPaths()))
Sys.setenv(NOAWT=1)
}
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
index ff1702f7de..23a470d6af 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -39,7 +39,6 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
deserializer: String,
serializer: String,
packageNames: Array[Byte],
- rLibDir: String,
broadcastVars: Array[Broadcast[Object]])
extends RDD[U](parent) with Logging {
protected var dataStream: DataInputStream = _
@@ -60,7 +59,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
// The stdout/stderr is shared by multiple tasks, because we use one daemon
// to launch child process as worker.
- val errThread = RRDD.createRWorker(rLibDir, listenPort)
+ val errThread = RRDD.createRWorker(listenPort)
// We use two sockets to separate input and output, then it's easy to manage
// the lifecycle of them to avoid deadlock.
@@ -235,11 +234,10 @@ private class PairwiseRRDD[T: ClassTag](
hashFunc: Array[Byte],
deserializer: String,
packageNames: Array[Byte],
- rLibDir: String,
broadcastVars: Array[Object])
extends BaseRRDD[T, (Int, Array[Byte])](
parent, numPartitions, hashFunc, deserializer,
- SerializationFormats.BYTE, packageNames, rLibDir,
+ SerializationFormats.BYTE, packageNames,
broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {
override protected def readData(length: Int): (Int, Array[Byte]) = {
@@ -266,10 +264,9 @@ private class RRDD[T: ClassTag](
deserializer: String,
serializer: String,
packageNames: Array[Byte],
- rLibDir: String,
broadcastVars: Array[Object])
extends BaseRRDD[T, Array[Byte]](
- parent, -1, func, deserializer, serializer, packageNames, rLibDir,
+ parent, -1, func, deserializer, serializer, packageNames,
broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {
override protected def readData(length: Int): Array[Byte] = {
@@ -293,10 +290,9 @@ private class StringRRDD[T: ClassTag](
func: Array[Byte],
deserializer: String,
packageNames: Array[Byte],
- rLibDir: String,
broadcastVars: Array[Object])
extends BaseRRDD[T, String](
- parent, -1, func, deserializer, SerializationFormats.STRING, packageNames, rLibDir,
+ parent, -1, func, deserializer, SerializationFormats.STRING, packageNames,
broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {
override protected def readData(length: Int): String = {
@@ -392,9 +388,10 @@ private[r] object RRDD {
thread
}
- private def createRProcess(rLibDir: String, port: Int, script: String): BufferedStreamThread = {
+ private def createRProcess(port: Int, script: String): BufferedStreamThread = {
val rCommand = SparkEnv.get.conf.get("spark.sparkr.r.command", "Rscript")
val rOptions = "--vanilla"
+ val rLibDir = RUtils.sparkRPackagePath(isDriver = false)
val rExecScript = rLibDir + "/SparkR/worker/" + script
val pb = new ProcessBuilder(List(rCommand, rOptions, rExecScript))
// Unset the R_TESTS environment variable for workers.
@@ -413,7 +410,7 @@ private[r] object RRDD {
/**
* ProcessBuilder used to launch worker R processes.
*/
- def createRWorker(rLibDir: String, port: Int): BufferedStreamThread = {
+ def createRWorker(port: Int): BufferedStreamThread = {
val useDaemon = SparkEnv.get.conf.getBoolean("spark.sparkr.use.daemon", true)
if (!Utils.isWindows && useDaemon) {
synchronized {
@@ -421,7 +418,7 @@ private[r] object RRDD {
// we expect one connections
val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
val daemonPort = serverSocket.getLocalPort
- errThread = createRProcess(rLibDir, daemonPort, "daemon.R")
+ errThread = createRProcess(daemonPort, "daemon.R")
// the socket used to send out the input of task
serverSocket.setSoTimeout(10000)
val sock = serverSocket.accept()
@@ -443,7 +440,7 @@ private[r] object RRDD {
errThread
}
} else {
- createRProcess(rLibDir, port, "worker.R")
+ createRProcess(port, "worker.R")
}
}
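
The rLibDir constructor parameter disappears from every R-facing RDD: instead of the driver serializing the library path into each RDD, the executor resolves it at the moment a worker process is launched, via RUtils.sparkRPackagePath(isDriver = false). A minimal sketch of what that launch looks like after the change; the Rscript command, the environment variable name, and the port handling are simplified assumptions rather than Spark's exact wiring:

    import java.io.File

    import scala.collection.JavaConversions._

    object RWorkerLaunchSketch {
      // Sketch only: the SparkR library location is resolved on the executor side,
      // then the requested worker script ("worker.R" or "daemon.R") is run under Rscript.
      def launchRWorker(rLibDir: String, port: Int, script: String): Process = {
        val rExecScript = Seq(rLibDir, "SparkR", "worker", script).mkString(File.separator)
        val pb = new ProcessBuilder(List("Rscript", "--vanilla", rExecScript))
        // Hypothetical variable name; the real code hands the port to the worker in a similar way.
        pb.environment().put("SPARKR_WORKER_PORT", port.toString)
        pb.redirectErrorStream(true) // one merged stream, read by the worker's logging thread
        pb.start()
      }
    }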
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
new file mode 100644
index 0000000000..d53abd3408
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.r
+
+import java.io.File
+
+import org.apache.spark.{SparkEnv, SparkException}
+
+private[spark] object RUtils {
+ /**
+ * Get the SparkR package path in the local spark distribution.
+ */
+ def localSparkRPackagePath: Option[String] = {
+ val sparkHome = sys.env.get("SPARK_HOME")
+ sparkHome.map(
+ Seq(_, "R", "lib").mkString(File.separator)
+ )
+ }
+
+ /**
+ * Get the SparkR package path in various deployment modes.
+ * This assumes that Spark properties `spark.master` and `spark.submit.deployMode`
+ * and environment variable `SPARK_HOME` are set.
+ */
+ def sparkRPackagePath(isDriver: Boolean): String = {
+ val (master, deployMode) =
+ if (isDriver) {
+ (sys.props("spark.master"), sys.props("spark.submit.deployMode"))
+ } else {
+ val sparkConf = SparkEnv.get.conf
+ (sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode"))
+ }
+
+ val isYarnCluster = master.contains("yarn") && deployMode == "cluster"
+ val isYarnClient = master.contains("yarn") && deployMode == "client"
+
+ // In YARN mode, the SparkR package is distributed as an archive symbolically
+ // linked to the "sparkr" file in the current directory. Note that this does not apply
+ // to the driver in client mode because it is run outside of the cluster.
+ if (isYarnCluster || (isYarnClient && !isDriver)) {
+ new File("sparkr").getAbsolutePath
+ } else {
+ // Otherwise, assume the package is local
+ // TODO: support this for Mesos
+ localSparkRPackagePath.getOrElse {
+ throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.")
+ }
+ }
+ }
+}
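
RUtils is now the single place that answers "where does the SparkR package live?" for a given role: the driver reads spark.master and spark.submit.deployMode from system properties, executors read them from the SparkConf in SparkEnv, and in both YARN container cases the answer is the "sparkr" link that YARN creates in the container working directory when it unpacks sparkr.zip. The following standalone restatement of that decision takes its inputs explicitly so the branches can be checked without a running SparkEnv (the SPARK_HOME value in the usage comments is an assumption):

    import java.io.File

    object SparkRPathSketch {
      // Mirrors the decision in RUtils.sparkRPackagePath with explicit inputs.
      def resolve(
          master: String,
          deployMode: String,
          isDriver: Boolean,
          sparkHome: Option[String]): String = {
        val isYarnCluster = master.contains("yarn") && deployMode == "cluster"
        val isYarnClient = master.contains("yarn") && deployMode == "client"
        if (isYarnCluster || (isYarnClient && !isDriver)) {
          // Inside a YARN container: use the unpacked archive linked as "sparkr" in the cwd.
          new File("sparkr").getAbsolutePath
        } else {
          // Driver in client mode, or any non-YARN deployment: fall back to the local install.
          sparkHome.map(h => Seq(h, "R", "lib").mkString(File.separator))
            .getOrElse(throw new IllegalStateException("SPARK_HOME not set"))
        }
      }
    }

    // resolve("yarn-client", "client", isDriver = true,  Some("/opt/spark")) -> "/opt/spark/R/lib"
    // resolve("yarn-client", "client", isDriver = false, Some("/opt/spark")) -> "<container cwd>/sparkr"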
diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
index 4165740312..c0cab22fa8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
import org.apache.hadoop.fs.Path
-import org.apache.spark.api.r.RBackend
+import org.apache.spark.api.r.{RBackend, RUtils}
import org.apache.spark.util.RedirectThread
/**
@@ -71,9 +71,10 @@ object RRunner {
val builder = new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs)
val env = builder.environment()
env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString)
- val sparkHome = System.getenv("SPARK_HOME")
+ val rPackageDir = RUtils.sparkRPackagePath(isDriver = true)
+ env.put("SPARKR_PACKAGE_DIR", rPackageDir)
env.put("R_PROFILE_USER",
- Seq(sparkHome, "R", "lib", "SparkR", "profile", "general.R").mkString(File.separator))
+ Seq(rPackageDir, "SparkR", "profile", "general.R").mkString(File.separator))
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
val process = builder.start()
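
RRunner no longer assumes the profile lives under $SPARK_HOME/R/lib. It asks RUtils for the package directory, exports it as SPARKR_PACKAGE_DIR so the packaged general.R profile (see the change above) can prepend it to .libPaths(), and points R_PROFILE_USER at that profile. A condensed sketch of that wiring, with the R command and script name as placeholders:

    import java.io.File

    import scala.collection.JavaConversions._

    object DriverRLaunchSketch {
      def launch(rPackageDir: String, rFile: String): Process = {
        val builder = new ProcessBuilder(List("Rscript", rFile))
        val env = builder.environment()
        // Read by .First() in SparkR/profile/general.R and prepended to .libPaths().
        env.put("SPARKR_PACKAGE_DIR", rPackageDir)
        // R sources this profile before running the user script.
        env.put("R_PROFILE_USER",
          Seq(rPackageDir, "SparkR", "profile", "general.R").mkString(File.separator))
        builder.redirectErrorStream(true)
        builder.start()
      }
    }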
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4cec9017b8..7089a7e267 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -37,6 +37,7 @@ import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
+import org.apache.spark.api.r.RUtils
import org.apache.spark.SPARK_VERSION
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -79,6 +80,7 @@ object SparkSubmit {
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"
private val SPARKR_SHELL = "sparkr-shell"
+ private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
private val CLASS_NOT_FOUND_EXIT_STATUS = 101
@@ -262,6 +264,12 @@ object SparkSubmit {
}
}
+ // Update args.deployMode if it is null. It will be passed down as a Spark property later.
+ (args.deployMode, deployMode) match {
+ case (null, CLIENT) => args.deployMode = "client"
+ case (null, CLUSTER) => args.deployMode = "cluster"
+ case _ =>
+ }
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
@@ -347,6 +355,23 @@ object SparkSubmit {
}
}
+ // In YARN mode for an R app, add the SparkR package archive to archives
+ // that can be distributed with the job
+ if (args.isR && clusterManager == YARN) {
+ val rPackagePath = RUtils.localSparkRPackagePath
+ if (rPackagePath.isEmpty) {
+ printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
+ }
+ val rPackageFile = new File(rPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
+ if (!rPackageFile.exists()) {
+ printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
+ }
+ val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath)
+
+ // Assigns a symbol link name "sparkr" to the shipped package.
+ args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr")
+ }
+
// If we're running a R app, set the main class to our specific R runner
if (args.isR && deployMode == CLIENT) {
if (args.primaryResource == SPARKR_SHELL) {
@@ -375,6 +400,8 @@ object SparkSubmit {
// All cluster managers
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
+ OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
+ sysProp = "spark.submit.deployMode"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
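
The key detail in the SparkSubmit change is the "#sparkr" suffix on the archive URI: for entries in args.archives, YARN uses the URI fragment, when present, as the link name under which the unpacked archive appears in each container's working directory. That is what lets RUtils resolve new File("sparkr") on executors and on yarn-cluster drivers. A small illustration of that naming rule; the rule as stated here is a simplification of YARN's localization behaviour:

    import java.io.File
    import java.net.URI

    object ArchiveLinkNameSketch {
      // The fragment, if any, names the link in the container; otherwise the file name is used.
      def linkName(archive: String): String = {
        val uri = new URI(archive)
        Option(uri.getFragment).getOrElse(new File(uri.getPath).getName)
      }
    }

    // linkName("file:/opt/spark/R/lib/sparkr.zip#sparkr") == "sparkr"
    // linkName("file:/opt/spark/R/lib/sparkr.zip")        == "sparkr.zip"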
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 1b64c329b5..e7878bde6f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -246,7 +246,7 @@ class SparkSubmitSuite
mainClass should be ("org.apache.spark.deploy.Client")
}
classpath should have size 0
- sysProps should have size 8
+ sysProps should have size 9
sysProps.keys should contain ("SPARK_SUBMIT")
sysProps.keys should contain ("spark.master")
sysProps.keys should contain ("spark.app.name")
@@ -255,6 +255,7 @@ class SparkSubmitSuite
sysProps.keys should contain ("spark.driver.cores")
sysProps.keys should contain ("spark.driver.supervise")
sysProps.keys should contain ("spark.shuffle.spill")
+ sysProps.keys should contain ("spark.submit.deployMode")
sysProps("spark.shuffle.spill") should be ("false")
}
diff --git a/make-distribution.sh b/make-distribution.sh
index 9f063da3a1..cac7032bb2 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -219,6 +219,7 @@ cp -r "$SPARK_HOME/ec2" "$DISTDIR"
if [ -d "$SPARK_HOME"/R/lib/SparkR ]; then
mkdir -p "$DISTDIR"/R/lib
cp -r "$SPARK_HOME/R/lib/SparkR" "$DISTDIR"/R/lib
+ cp "$SPARK_HOME/R/lib/sparkr.zip" "$DISTDIR"/R/lib
fi
# Download and copy in tachyon, if requested
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 79089aae2a..4e4e810ec3 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -83,6 +83,18 @@ object MimaExcludes {
"org.apache.spark.streaming.scheduler.InputInfo$"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.streaming.scheduler.InputInfo")
+ ) ++ Seq(
+ // SPARK-6797 Support YARN modes for SparkR
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.r.PairwiseRRDD.this"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.r.RRDD.createRWorker"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.r.RRDD.this"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.r.StringRRDD.this"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.r.BaseRRDD.this")
)
case v if v.startsWith("1.4") =>