aboutsummaryrefslogtreecommitdiff
path: root/R
diff options
context:
space:
mode:
Diffstat (limited to 'R')
-rw-r--r--R/install-dev.bat6
-rwxr-xr-xR/install-dev.sh4
-rw-r--r--R/pkg/R/sparkR.R14
-rw-r--r--R/pkg/inst/profile/general.R3
-rw-r--r--R/pkg/inst/worker/daemon.R5
-rw-r--r--R/pkg/inst/worker/worker.R3
6 files changed, 30 insertions, 5 deletions
diff --git a/R/install-dev.bat b/R/install-dev.bat
index 008a5c668b..ed1c91ae3a 100644
--- a/R/install-dev.bat
+++ b/R/install-dev.bat
@@ -25,3 +25,9 @@ set SPARK_HOME=%~dp0..
MKDIR %SPARK_HOME%\R\lib
R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\
+
+rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
+pushd %SPARK_HOME%\R\lib
+%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR
+popd
+
diff --git a/R/install-dev.sh b/R/install-dev.sh
index 59d98c9c7a..4972bb9217 100755
--- a/R/install-dev.sh
+++ b/R/install-dev.sh
@@ -42,4 +42,8 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo
# Install SparkR to $LIB_DIR
R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/
+# Zip the SparkR package so that it can be distributed to worker nodes on YARN
+cd $LIB_DIR
+jar cfM "$LIB_DIR/sparkr.zip" SparkR
+
popd > /dev/null
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index ebe2b2b8dc..7ff3fa628b 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -48,6 +48,12 @@ sparkR.stop <- function() {
}
}
+ # Remove the R package lib path from .libPaths()
+ if (exists(".libPath", envir = env)) {
+ libPath <- get(".libPath", envir = env)
+ .libPaths(.libPaths()[.libPaths() != libPath])
+ }
+
if (exists(".backendLaunched", envir = env)) {
callJStatic("SparkRHandler", "stopBackend")
}
@@ -155,14 +161,20 @@ sparkR.init <- function(
f <- file(path, open="rb")
backendPort <- readInt(f)
monitorPort <- readInt(f)
+ rLibPath <- readString(f)
close(f)
file.remove(path)
if (length(backendPort) == 0 || backendPort == 0 ||
- length(monitorPort) == 0 || monitorPort == 0) {
+ length(monitorPort) == 0 || monitorPort == 0 ||
+ length(rLibPath) != 1) {
stop("JVM failed to launch")
}
assign(".monitorConn", socketConnection(port = monitorPort), envir = .sparkREnv)
assign(".backendLaunched", 1, envir = .sparkREnv)
+ if (rLibPath != "") {
+ assign(".libPath", rLibPath, envir = .sparkREnv)
+ .libPaths(c(rLibPath, .libPaths()))
+ }
}
.sparkREnv$backendPort <- backendPort
diff --git a/R/pkg/inst/profile/general.R b/R/pkg/inst/profile/general.R
index 2a8a8213d0..c55fe9ba7a 100644
--- a/R/pkg/inst/profile/general.R
+++ b/R/pkg/inst/profile/general.R
@@ -17,6 +17,7 @@
.First <- function() {
packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
- .libPaths(c(packageDir, .libPaths()))
+ dirs <- strsplit(packageDir, ",")[[1]]
+ .libPaths(c(dirs, .libPaths()))
Sys.setenv(NOAWT=1)
}
diff --git a/R/pkg/inst/worker/daemon.R b/R/pkg/inst/worker/daemon.R
index 3584b418a7..f55beac6c8 100644
--- a/R/pkg/inst/worker/daemon.R
+++ b/R/pkg/inst/worker/daemon.R
@@ -18,10 +18,11 @@
# Worker daemon
rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
-script <- paste(rLibDir, "SparkR/worker/worker.R", sep = "/")
+dirs <- strsplit(rLibDir, ",")[[1]]
+script <- file.path(dirs[[1]], "SparkR", "worker", "worker.R")
# preload SparkR package, speedup worker
-.libPaths(c(rLibDir, .libPaths()))
+.libPaths(c(dirs, .libPaths()))
suppressPackageStartupMessages(library(SparkR))
port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R
index 0c3b0d1f4b..3ae072beca 100644
--- a/R/pkg/inst/worker/worker.R
+++ b/R/pkg/inst/worker/worker.R
@@ -35,10 +35,11 @@ bootTime <- currentTimeSecs()
bootElap <- elapsedSecs()
rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
+dirs <- strsplit(rLibDir, ",")[[1]]
# Set libPaths to include SparkR package as loadNamespace needs this
# TODO: Figure out if we can avoid this by not loading any objects that require
# SparkR namespace
-.libPaths(c(rLibDir, .libPaths()))
+.libPaths(c(dirs, .libPaths()))
suppressPackageStartupMessages(library(SparkR))
port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))