author     Hossein <hossein@databricks.com>        2016-10-30 16:17:23 -0700
committer  Felix Cheung <felixcheung@apache.org>   2016-10-30 16:17:23 -0700
commit     2881a2d1d1a650a91df2c6a01275eba14a43b42a (patch)
tree       1083f14a8b284f1ebdb9e69a0b842edf6b14116d /R/pkg
parent     8ae2da0b2551011e2f6cf02907a1e20c138a4b2f (diff)
[SPARK-17919] Make timeout to RBackend configurable in SparkR
## What changes were proposed in this pull request?

This patch makes the RBackend connection timeout configurable by the user.

## How was this patch tested?

N/A

Author: Hossein <hossein@databricks.com>

Closes #15471 from falaki/SPARK-17919.
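As a usage sketch (the SPARKR_BACKEND_CONNECTION_TIMEOUT variable and its 6000-second default come from this patch; the specific value below is illustrative), a user raises the timeout before initializing SparkR:

    # Illustrative: raise the RBackend connection timeout to two hours.
    # The variable and its "6000" (seconds) default are introduced by this
    # patch; 7200 is an arbitrary example value.
    Sys.setenv(SPARKR_BACKEND_CONNECTION_TIMEOUT = "7200")
    library(SparkR)
    sparkR.session()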
Diffstat (limited to 'R/pkg')
-rw-r--r--  R/pkg/R/backend.R           20
-rw-r--r--  R/pkg/R/client.R             2
-rw-r--r--  R/pkg/R/sparkR.R             8
-rw-r--r--  R/pkg/inst/worker/daemon.R   4
-rw-r--r--  R/pkg/inst/worker/worker.R   7
5 files changed, 32 insertions(+), 9 deletions(-)
diff --git a/R/pkg/R/backend.R b/R/pkg/R/backend.R
index 03e70bb2cb..0a789e6c37 100644
--- a/R/pkg/R/backend.R
+++ b/R/pkg/R/backend.R
@@ -108,13 +108,27 @@ invokeJava <- function(isStatic, objId, methodName, ...) {
conn <- get(".sparkRCon", .sparkREnv)
writeBin(requestMessage, conn)
- # TODO: check the status code to output error information
returnStatus <- readInt(conn)
+ handleErrors(returnStatus, conn)
+
+ # Backend will send +1 as a keep-alive value to prevent connection timeouts
+ # on very long-running jobs. See spark.r.heartBeatInterval.
+ while (returnStatus == 1) {
+ returnStatus <- readInt(conn)
+ handleErrors(returnStatus, conn)
+ }
+
+ readObject(conn)
+}
+
+# Helper function to check for returned errors and print an appropriate error message to the user
+handleErrors <- function(returnStatus, conn) {
if (length(returnStatus) == 0) {
stop("No status is returned. Java SparkR backend might have failed.")
}
- if (returnStatus != 0) {
+
+ # 0 is success and +1 is reserved for heartbeats; negative values indicate errors.
+ if (returnStatus < 0) {
stop(readString(conn))
}
- readObject(conn)
}
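The protocol the new loop implements: every backend reply starts with an integer status, where +1 is a heartbeat to discard, 0 signals success with the return value following, and a negative value means an error string follows. A minimal standalone sketch (readReply is a hypothetical name for the body of invokeJava above; readInt, readString, and readObject are SparkR's internal deserializers):

    # Sketch of the status protocol above; readReply is a hypothetical name.
    readReply <- function(conn) {
      returnStatus <- readInt(conn)
      handleErrors(returnStatus, conn)
      # +1 is a keep-alive heartbeat: discard and wait for a terminal status
      while (returnStatus == 1) {
        returnStatus <- readInt(conn)
        handleErrors(returnStatus, conn)
      }
      readObject(conn)  # status 0: the actual return value follows
    }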
diff --git a/R/pkg/R/client.R b/R/pkg/R/client.R
index 2d341d836c..9d82814211 100644
--- a/R/pkg/R/client.R
+++ b/R/pkg/R/client.R
@@ -19,7 +19,7 @@
# Creates a SparkR client connection object
# if one doesn't already exist
-connectBackend <- function(hostname, port, timeout = 6000) {
+connectBackend <- function(hostname, port, timeout) {
if (exists(".sparkRcon", envir = .sparkREnv)) {
if (isOpen(.sparkREnv[[".sparkRCon"]])) {
cat("SparkRBackend client connection already exists\n")
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index cc6d591bb2..6b4a2f2fdc 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -154,6 +154,7 @@ sparkR.sparkContext <- function(
packages <- processSparkPackages(sparkPackages)
existingPort <- Sys.getenv("EXISTING_SPARKR_BACKEND_PORT", "")
+ connectionTimeout <- as.numeric(Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT", "6000"))
if (existingPort != "") {
if (length(packages) != 0) {
warning(paste("sparkPackages has no effect when using spark-submit or sparkR shell",
@@ -187,6 +188,7 @@ sparkR.sparkContext <- function(
backendPort <- readInt(f)
monitorPort <- readInt(f)
rLibPath <- readString(f)
+ connectionTimeout <- readInt(f)
close(f)
file.remove(path)
if (length(backendPort) == 0 || backendPort == 0 ||
@@ -194,7 +196,9 @@ sparkR.sparkContext <- function(
length(rLibPath) != 1) {
stop("JVM failed to launch")
}
- assign(".monitorConn", socketConnection(port = monitorPort), envir = .sparkREnv)
+ assign(".monitorConn",
+ socketConnection(port = monitorPort, timeout = connectionTimeout),
+ envir = .sparkREnv)
assign(".backendLaunched", 1, envir = .sparkREnv)
if (rLibPath != "") {
assign(".libPath", rLibPath, envir = .sparkREnv)
@@ -204,7 +208,7 @@ sparkR.sparkContext <- function(
.sparkREnv$backendPort <- backendPort
tryCatch({
- connectBackend("localhost", backendPort)
+ connectBackend("localhost", backendPort, timeout = connectionTimeout)
},
error = function(err) {
stop("Failed to connect JVM\n")
diff --git a/R/pkg/inst/worker/daemon.R b/R/pkg/inst/worker/daemon.R
index b92e6be995..3a318b71ea 100644
--- a/R/pkg/inst/worker/daemon.R
+++ b/R/pkg/inst/worker/daemon.R
@@ -18,6 +18,7 @@
# Worker daemon
rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
+connectionTimeout <- as.integer(Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT", "6000"))
dirs <- strsplit(rLibDir, ",")[[1]]
script <- file.path(dirs[[1]], "SparkR", "worker", "worker.R")
@@ -26,7 +27,8 @@ script <- file.path(dirs[[1]], "SparkR", "worker", "worker.R")
suppressPackageStartupMessages(library(SparkR))
port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
-inputCon <- socketConnection(port = port, open = "rb", blocking = TRUE, timeout = 3600)
+inputCon <- socketConnection(
+ port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
while (TRUE) {
ready <- socketSelect(list(inputCon))
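In base R, socketConnection's timeout is the number of seconds a blocking read or write may wait, so this change replaces the daemon's hard-coded one-hour limit with the configurable value (default 6000 seconds). A condensed sketch of the pattern, assuming a server is already listening on the worker port:

    # Timeout is in seconds and bounds blocking reads on the connection;
    # before this patch the daemon used a fixed 3600 (one hour).
    connectionTimeout <- as.integer(
      Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT", "6000"))
    port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
    inputCon <- socketConnection(port = port, open = "rb", blocking = TRUE,
                                 timeout = connectionTimeout)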
diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R
index cfe41ded20..03e7450147 100644
--- a/R/pkg/inst/worker/worker.R
+++ b/R/pkg/inst/worker/worker.R
@@ -90,6 +90,7 @@ bootTime <- currentTimeSecs()
bootElap <- elapsedSecs()
rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
+connectionTimeout <- as.integer(Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT", "6000"))
dirs <- strsplit(rLibDir, ",")[[1]]
# Set libPaths to include SparkR package as loadNamespace needs this
# TODO: Figure out if we can avoid this by not loading any objects that require
@@ -98,8 +99,10 @@ dirs <- strsplit(rLibDir, ",")[[1]]
suppressPackageStartupMessages(library(SparkR))
port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
-inputCon <- socketConnection(port = port, blocking = TRUE, open = "rb")
-outputCon <- socketConnection(port = port, blocking = TRUE, open = "wb")
+inputCon <- socketConnection(
+ port = port, blocking = TRUE, open = "rb", timeout = connectionTimeout)
+outputCon <- socketConnection(
+ port = port, blocking = TRUE, open = "wb", timeout = connectionTimeout)
# read the index of the current partition inside the RDD
partition <- SparkR:::readInt(inputCon)
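For reference, SparkR's internal readInt used throughout this patch (SparkR:::readInt in the worker) is approximately a one-line wrapper over readBin; the real definition lives in R/pkg/R/deserialize.R, so treat this as an approximation:

    # Approximation of SparkR's internal readInt (R/pkg/R/deserialize.R):
    # the backend serializes integers as single big-endian 32-bit values.
    readInt <- function(con) {
      readBin(con, integer(), n = 1L, endian = "big")
    }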