aboutsummaryrefslogtreecommitdiff
path: root/R/pkg/inst/worker/worker.R
diff options
context:
space:
mode:
Diffstat (limited to 'R/pkg/inst/worker/worker.R')
-rw-r--r--R/pkg/inst/worker/worker.R7
1 files changed, 5 insertions, 2 deletions
diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R
index cfe41ded20..03e7450147 100644
--- a/R/pkg/inst/worker/worker.R
+++ b/R/pkg/inst/worker/worker.R
@@ -90,6 +90,7 @@ bootTime <- currentTimeSecs()
bootElap <- elapsedSecs()
rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
+connectionTimeout <- as.integer(Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT", "6000"))
dirs <- strsplit(rLibDir, ",")[[1]]
# Set libPaths to include SparkR package as loadNamespace needs this
# TODO: Figure out if we can avoid this by not loading any objects that require
@@ -98,8 +99,10 @@ dirs <- strsplit(rLibDir, ",")[[1]]
suppressPackageStartupMessages(library(SparkR))
port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
-inputCon <- socketConnection(port = port, blocking = TRUE, open = "rb")
-outputCon <- socketConnection(port = port, blocking = TRUE, open = "wb")
+inputCon <- socketConnection(
+ port = port, blocking = TRUE, open = "rb", timeout = connectionTimeout)
+outputCon <- socketConnection(
+ port = port, blocking = TRUE, open = "wb", timeout = connectionTimeout)
# read the index of the current partition inside the RDD
partition <- SparkR:::readInt(inputCon)