-rw-r--r--  R/install-dev.bat                                                     6
-rwxr-xr-x  R/install-dev.sh                                                      4
-rw-r--r--  R/pkg/R/sparkR.R                                                     14
-rw-r--r--  R/pkg/inst/profile/general.R                                          3
-rw-r--r--  R/pkg/inst/worker/daemon.R                                            5
-rw-r--r--  R/pkg/inst/worker/worker.R                                            3
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/RBackend.scala             1
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/RRDD.scala                 4
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/RUtils.scala              37
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala      26
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/RRunner.scala             5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala        43
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala    5
-rwxr-xr-x  make-distribution.sh                                                  1
14 files changed, 121 insertions, 36 deletions
diff --git a/R/install-dev.bat b/R/install-dev.bat
index 008a5c668b..ed1c91ae3a 100644
--- a/R/install-dev.bat
+++ b/R/install-dev.bat
@@ -25,3 +25,9 @@ set SPARK_HOME=%~dp0..
MKDIR %SPARK_HOME%\R\lib
R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\
+
+rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
+pushd %SPARK_HOME%\R\lib
+%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR
+popd
+
diff --git a/R/install-dev.sh b/R/install-dev.sh
index 59d98c9c7a..4972bb9217 100755
--- a/R/install-dev.sh
+++ b/R/install-dev.sh
@@ -42,4 +42,8 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo
# Install SparkR to $LIB_DIR
R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/
+# Zip the SparkR package so that it can be distributed to worker nodes on YARN
+cd $LIB_DIR
+jar cfM "$LIB_DIR/sparkr.zip" SparkR
+
popd > /dev/null
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index ebe2b2b8dc..7ff3fa628b 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -48,6 +48,12 @@ sparkR.stop <- function() {
}
}
+ # Remove the R package lib path from .libPaths()
+ if (exists(".libPath", envir = env)) {
+ libPath <- get(".libPath", envir = env)
+ .libPaths(.libPaths()[.libPaths() != libPath])
+ }
+
if (exists(".backendLaunched", envir = env)) {
callJStatic("SparkRHandler", "stopBackend")
}
@@ -155,14 +161,20 @@ sparkR.init <- function(
f <- file(path, open="rb")
backendPort <- readInt(f)
monitorPort <- readInt(f)
+ rLibPath <- readString(f)
close(f)
file.remove(path)
if (length(backendPort) == 0 || backendPort == 0 ||
- length(monitorPort) == 0 || monitorPort == 0) {
+ length(monitorPort) == 0 || monitorPort == 0 ||
+ length(rLibPath) != 1) {
stop("JVM failed to launch")
}
assign(".monitorConn", socketConnection(port = monitorPort), envir = .sparkREnv)
assign(".backendLaunched", 1, envir = .sparkREnv)
+ if (rLibPath != "") {
+ assign(".libPath", rLibPath, envir = .sparkREnv)
+ .libPaths(c(rLibPath, .libPaths()))
+ }
}
.sparkREnv$backendPort <- backendPort
diff --git a/R/pkg/inst/profile/general.R b/R/pkg/inst/profile/general.R
index 2a8a8213d0..c55fe9ba7a 100644
--- a/R/pkg/inst/profile/general.R
+++ b/R/pkg/inst/profile/general.R
@@ -17,6 +17,7 @@
.First <- function() {
packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
- .libPaths(c(packageDir, .libPaths()))
+ dirs <- strsplit(packageDir, ",")[[1]]
+ .libPaths(c(dirs, .libPaths()))
Sys.setenv(NOAWT=1)
}
diff --git a/R/pkg/inst/worker/daemon.R b/R/pkg/inst/worker/daemon.R
index 3584b418a7..f55beac6c8 100644
--- a/R/pkg/inst/worker/daemon.R
+++ b/R/pkg/inst/worker/daemon.R
@@ -18,10 +18,11 @@
# Worker daemon
rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
-script <- paste(rLibDir, "SparkR/worker/worker.R", sep = "/")
+dirs <- strsplit(rLibDir, ",")[[1]]
+script <- file.path(dirs[[1]], "SparkR", "worker", "worker.R")
# preload SparkR package, speedup worker
-.libPaths(c(rLibDir, .libPaths()))
+.libPaths(c(dirs, .libPaths()))
suppressPackageStartupMessages(library(SparkR))
port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R
index 0c3b0d1f4b..3ae072beca 100644
--- a/R/pkg/inst/worker/worker.R
+++ b/R/pkg/inst/worker/worker.R
@@ -35,10 +35,11 @@ bootTime <- currentTimeSecs()
bootElap <- elapsedSecs()
rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
+dirs <- strsplit(rLibDir, ",")[[1]]
# Set libPaths to include SparkR package as loadNamespace needs this
# TODO: Figure out if we can avoid this by not loading any objects that require
# SparkR namespace
-.libPaths(c(rLibDir, .libPaths()))
+.libPaths(c(dirs, .libPaths()))
suppressPackageStartupMessages(library(SparkR))
port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
index b7e72d4d0e..8b3be0da2c 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
@@ -113,6 +113,7 @@ private[spark] object RBackend extends Logging {
val dos = new DataOutputStream(new FileOutputStream(f))
dos.writeInt(boundPort)
dos.writeInt(listenPort)
+ SerDe.writeString(dos, RUtils.rPackages.getOrElse(""))
dos.close()
f.renameTo(new File(path))
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
index 6b418e908c..7509b3d3f4 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -400,14 +400,14 @@ private[r] object RRDD {
val rOptions = "--vanilla"
val rLibDir = RUtils.sparkRPackagePath(isDriver = false)
- val rExecScript = rLibDir + "/SparkR/worker/" + script
+ val rExecScript = rLibDir(0) + "/SparkR/worker/" + script
val pb = new ProcessBuilder(Arrays.asList(rCommand, rOptions, rExecScript))
// Unset the R_TESTS environment variable for workers.
// This is set by R CMD check as startup.Rs
// (http://svn.r-project.org/R/trunk/src/library/tools/R/testing.R)
// and confuses worker script which tries to load a non-existent file
pb.environment().put("R_TESTS", "")
- pb.environment().put("SPARKR_RLIBDIR", rLibDir)
+ pb.environment().put("SPARKR_RLIBDIR", rLibDir.mkString(","))
pb.environment().put("SPARKR_WORKER_PORT", port.toString)
pb.redirectErrorStream(true) // redirect stderr into stdout
val proc = pb.start()
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
index fd5646b5b6..16157414fd 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -23,6 +23,10 @@ import java.util.Arrays
import org.apache.spark.{SparkEnv, SparkException}
private[spark] object RUtils {
+ // Local path where R binary packages built from R source code contained in the spark
+ // packages specified with "--packages" or "--jars" command line option reside.
+ var rPackages: Option[String] = None
+
/**
* Get the SparkR package path in the local spark distribution.
*/
@@ -34,11 +38,15 @@ private[spark] object RUtils {
}
/**
- * Get the SparkR package path in various deployment modes.
+ * Get the list of paths for R packages in various deployment modes, of which the first
+ * path is for the SparkR package itself. The second path is for R packages built as
+ * part of Spark Packages, if any exist. Spark Packages can be provided through the
+ * "--packages" or "--jars" command line options.
+ *
* This assumes that Spark properties `spark.master` and `spark.submit.deployMode`
* and environment variable `SPARK_HOME` are set.
*/
- def sparkRPackagePath(isDriver: Boolean): String = {
+ def sparkRPackagePath(isDriver: Boolean): Seq[String] = {
val (master, deployMode) =
if (isDriver) {
(sys.props("spark.master"), sys.props("spark.submit.deployMode"))
@@ -51,15 +59,30 @@ private[spark] object RUtils {
val isYarnClient = master != null && master.contains("yarn") && deployMode == "client"
// In YARN mode, the SparkR package is distributed as an archive symbolically
- // linked to the "sparkr" file in the current directory. Note that this does not apply
- // to the driver in client mode because it is run outside of the cluster.
+ // linked to the "sparkr" file in the current directory and additional R packages
+ // are distributed as an archive symbolically linked to the "rpkg" file in the
+ // current directory.
+ //
+ // Note that this does not apply to the driver in client mode because it is run
+ // outside of the cluster.
if (isYarnCluster || (isYarnClient && !isDriver)) {
- new File("sparkr").getAbsolutePath
+ val sparkRPkgPath = new File("sparkr").getAbsolutePath
+ val rPkgPath = new File("rpkg")
+ if (rPkgPath.exists()) {
+ Seq(sparkRPkgPath, rPkgPath.getAbsolutePath)
+ } else {
+ Seq(sparkRPkgPath)
+ }
} else {
// Otherwise, assume the package is local
// TODO: support this for Mesos
- localSparkRPackagePath.getOrElse {
- throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.")
+ val sparkRPkgPath = localSparkRPackagePath.getOrElse {
+ throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.")
+ }
+ if (!rPackages.isEmpty) {
+ Seq(sparkRPkgPath, rPackages.get)
+ } else {
+ Seq(sparkRPkgPath)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index 7d160b6790..d46dc87a92 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -100,20 +100,29 @@ private[deploy] object RPackageUtils extends Logging {
* Runs the standard R package installation code to build the R package from source.
* Multiple runs don't cause problems.
*/
- private def rPackageBuilder(dir: File, printStream: PrintStream, verbose: Boolean): Boolean = {
+ private def rPackageBuilder(
+ dir: File,
+ printStream: PrintStream,
+ verbose: Boolean,
+ libDir: String): Boolean = {
// this code should be always running on the driver.
- val pathToSparkR = RUtils.localSparkRPackagePath.getOrElse(
- throw new SparkException("SPARK_HOME not set. Can't locate SparkR package."))
val pathToPkg = Seq(dir, "R", "pkg").mkString(File.separator)
- val installCmd = baseInstallCmd ++ Seq(pathToSparkR, pathToPkg)
+ val installCmd = baseInstallCmd ++ Seq(libDir, pathToPkg)
if (verbose) {
print(s"Building R package with the command: $installCmd", printStream)
}
try {
val builder = new ProcessBuilder(installCmd.asJava)
builder.redirectErrorStream(true)
+
+ // Put the SparkR package directory into R library search paths in case this R package
+ // may depend on SparkR.
val env = builder.environment()
- env.clear()
+ val rPackageDir = RUtils.sparkRPackagePath(isDriver = true)
+ env.put("SPARKR_PACKAGE_DIR", rPackageDir.mkString(","))
+ env.put("R_PROFILE_USER",
+ Seq(rPackageDir(0), "SparkR", "profile", "general.R").mkString(File.separator))
+
val process = builder.start()
new RedirectThread(process.getInputStream, printStream, "redirect R packaging").start()
process.waitFor() == 0
@@ -170,8 +179,11 @@ private[deploy] object RPackageUtils extends Logging {
if (checkManifestForR(jar)) {
print(s"$file contains R source code. Now installing package.", printStream, Level.INFO)
val rSource = extractRFolder(jar, printStream, verbose)
+ if (RUtils.rPackages.isEmpty) {
+ RUtils.rPackages = Some(Utils.createTempDir().getAbsolutePath)
+ }
try {
- if (!rPackageBuilder(rSource, printStream, verbose)) {
+ if (!rPackageBuilder(rSource, printStream, verbose, RUtils.rPackages.get)) {
print(s"ERROR: Failed to build R package in $file.", printStream)
print(RJarDoc, printStream)
}
@@ -208,7 +220,7 @@ private[deploy] object RPackageUtils extends Logging {
}
}
- /** Zips all the libraries found with SparkR in the R/lib directory for distribution with Yarn. */
+ /** Zips all the R libraries built for distribution to the cluster. */
private[deploy] def zipRLibraries(dir: File, name: String): File = {
val filesToBundle = listFilesRecursively(dir, Seq(".zip"))
// create a zip file from scratch, do not append to existing file.
diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
index ed183cf16a..661f7317c6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -82,9 +82,10 @@ object RRunner {
val env = builder.environment()
env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString)
val rPackageDir = RUtils.sparkRPackagePath(isDriver = true)
- env.put("SPARKR_PACKAGE_DIR", rPackageDir)
+ // Put the R package directories into an env variable of comma-separated paths
+ env.put("SPARKR_PACKAGE_DIR", rPackageDir.mkString(","))
env.put("R_PROFILE_USER",
- Seq(rPackageDir, "SparkR", "profile", "general.R").mkString(File.separator))
+ Seq(rPackageDir(0), "SparkR", "profile", "general.R").mkString(File.separator))
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
val process = builder.start()
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 09d2ec90c9..2e912b59af 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -83,6 +83,7 @@ object SparkSubmit {
private val PYSPARK_SHELL = "pyspark-shell"
private val SPARKR_SHELL = "sparkr-shell"
private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
+ private val R_PACKAGE_ARCHIVE = "rpkg.zip"
private val CLASS_NOT_FOUND_EXIT_STATUS = 101
@@ -362,22 +363,46 @@ object SparkSubmit {
}
}
- // In YARN mode for an R app, add the SparkR package archive to archives
- // that can be distributed with the job
+ // In YARN mode for an R app, add the SparkR package archive and the R package
+ // archive containing all of the built R libraries to archives so that they can
+ // be distributed with the job
if (args.isR && clusterManager == YARN) {
- val rPackagePath = RUtils.localSparkRPackagePath
- if (rPackagePath.isEmpty) {
+ val sparkRPackagePath = RUtils.localSparkRPackagePath
+ if (sparkRPackagePath.isEmpty) {
printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
}
- val rPackageFile =
- RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_PACKAGE_ARCHIVE)
- if (!rPackageFile.exists()) {
+ val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
+ if (!sparkRPackageFile.exists()) {
printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
}
- val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath)
+ val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString
+ // Distribute the SparkR package.
// Assigns a symbol link name "sparkr" to the shipped package.
- args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr")
+ args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr")
+
+ // Distribute the R package archive containing all the built R packages.
+ if (!RUtils.rPackages.isEmpty) {
+ val rPackageFile =
+ RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE)
+ if (!rPackageFile.exists()) {
+ printErrorAndExit("Failed to zip all the built R packages.")
+ }
+
+ val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString
+ // Assigns a symbol link name "rpkg" to the shipped package.
+ args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")
+ }
+ }
+
+ // TODO: Support distributing R packages with standalone cluster
+ if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) {
+ printErrorAndExit("Distributing R packages with standalone cluster is not supported.")
+ }
+
+ // TODO: Support SparkR with mesos cluster
+ if (args.isR && clusterManager == MESOS) {
+ printErrorAndExit("SparkR is not supported for Mesos cluster.")
}
// If we're running a R app, set the main class to our specific R runner
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 66a5051200..42e748ec6d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
import org.apache.spark._
+import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.SparkSubmit._
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -369,9 +370,6 @@ class SparkSubmitSuite
}
test("correctly builds R packages included in a jar with --packages") {
- // TODO(SPARK-9603): Building a package to $SPARK_HOME/R/lib is unavailable on Jenkins.
- // It's hard to write the test in SparkR (because we can't create the repository dynamically)
- /*
assume(RUtils.isRInstalled, "R isn't installed on this machine.")
val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
@@ -389,7 +387,6 @@ class SparkSubmitSuite
rScriptDir)
runSparkSubmit(args)
}
- */
}
test("resolves command line argument paths correctly") {
diff --git a/make-distribution.sh b/make-distribution.sh
index e1c2afdbc6..d7d27e253f 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -220,6 +220,7 @@ cp -r "$SPARK_HOME/ec2" "$DISTDIR"
if [ -d "$SPARK_HOME"/R/lib/SparkR ]; then
mkdir -p "$DISTDIR"/R/lib
cp -r "$SPARK_HOME/R/lib/SparkR" "$DISTDIR"/R/lib
+ cp "$SPARK_HOME/R/lib/sparkr.zip" "$DISTDIR"/R/lib
fi
# Download and copy in tachyon, if requested