-rw-r--r--	R/pkg/R/install.R	64
-rw-r--r--	R/pkg/R/sparkR.R	51
-rw-r--r--	R/pkg/R/utils.R	4
3 files changed, 80 insertions, 39 deletions
diff --git a/R/pkg/R/install.R b/R/pkg/R/install.R
index c6ed88e032..69b0a523b8 100644
--- a/R/pkg/R/install.R
+++ b/R/pkg/R/install.R
@@ -70,9 +70,9 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
localDir = NULL, overwrite = FALSE) {
version <- paste0("spark-", packageVersion("SparkR"))
hadoopVersion <- tolower(hadoopVersion)
- hadoopVersionName <- hadoop_version_name(hadoopVersion)
+ hadoopVersionName <- hadoopVersionName(hadoopVersion)
packageName <- paste(version, "bin", hadoopVersionName, sep = "-")
- localDir <- ifelse(is.null(localDir), spark_cache_path(),
+ localDir <- ifelse(is.null(localDir), sparkCachePath(),
normalizePath(localDir, mustWork = FALSE))
if (is.na(file.info(localDir)$isdir)) {
@@ -88,12 +88,14 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
# can use dir.exists(packageLocalDir) under R 3.2.0 or later
if (!is.na(file.info(packageLocalDir)$isdir) && !overwrite) {
- fmt <- "Spark %s for Hadoop %s is found, and SPARK_HOME set to %s"
+ fmt <- "%s for Hadoop %s found, with SPARK_HOME set to %s"
msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
packageLocalDir)
message(msg)
Sys.setenv(SPARK_HOME = packageLocalDir)
return(invisible(packageLocalDir))
+ } else {
+ message("Spark not found in the cache directory. Installation will start.")
}
packageLocalPath <- paste0(packageLocalDir, ".tgz")
@@ -102,7 +104,7 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
if (tarExists && !overwrite) {
message("tar file found.")
} else {
- robust_download_tar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
+ robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
}
message(sprintf("Installing to %s", localDir))
@@ -116,33 +118,37 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
invisible(packageLocalDir)
}
-robust_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
+robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
# step 1: use user-provided url
if (!is.null(mirrorUrl)) {
msg <- sprintf("Use user-provided mirror site: %s.", mirrorUrl)
message(msg)
- success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
+ success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
packageName, packageLocalPath)
- if (success) return()
+ if (success) {
+ return()
+ } else {
+ message(paste0("Unable to download from mirrorUrl: ", mirrorUrl))
+ }
} else {
- message("Mirror site not provided.")
+ message("MirrorUrl not provided.")
}
# step 2: use url suggested from apache website
- message("Looking for site suggested from apache website...")
- mirrorUrl <- get_preferred_mirror(version, packageName)
+ message("Looking for preferred site from apache website...")
+ mirrorUrl <- getPreferredMirror(version, packageName)
if (!is.null(mirrorUrl)) {
- success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
+ success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
packageName, packageLocalPath)
if (success) return()
} else {
- message("Unable to find suggested mirror site.")
+ message("Unable to find preferred mirror site.")
}
# step 3: use backup option
message("To use backup site...")
- mirrorUrl <- default_mirror_url()
- success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
+ mirrorUrl <- defaultMirrorUrl()
+ success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
packageName, packageLocalPath)
if (success) {
return(packageLocalPath)
@@ -155,7 +161,7 @@ robust_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName,
}
}
-get_preferred_mirror <- function(version, packageName) {
+getPreferredMirror <- function(version, packageName) {
jsonUrl <- paste0("http://www.apache.org/dyn/closer.cgi?path=",
file.path("spark", version, packageName),
".tgz&as_json=1")
@@ -175,10 +181,10 @@ get_preferred_mirror <- function(version, packageName) {
mirrorPreferred
}
-direct_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
+directDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
packageRemotePath <- paste0(
file.path(mirrorUrl, version, packageName), ".tgz")
- fmt <- paste("Downloading Spark %s for Hadoop %s from:\n- %s")
+ fmt <- "Downloading %s for Hadoop %s from:\n- %s"
msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
packageRemotePath)
message(msg)
@@ -192,11 +198,11 @@ direct_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName,
!isFail
}
-default_mirror_url <- function() {
+defaultMirrorUrl <- function() {
"http://www-us.apache.org/dist/spark"
}
-hadoop_version_name <- function(hadoopVersion) {
+hadoopVersionName <- function(hadoopVersion) {
if (hadoopVersion == "without") {
"without-hadoop"
} else if (grepl("^[0-9]+\\.[0-9]+$", hadoopVersion, perl = TRUE)) {
@@ -208,7 +214,7 @@ hadoop_version_name <- function(hadoopVersion) {
# The implementation refers to appdirs package: https://pypi.python.org/pypi/appdirs and
# adapt to Spark context
-spark_cache_path <- function() {
+sparkCachePath <- function() {
if (.Platform$OS.type == "windows") {
winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
if (is.na(winAppPath)) {
@@ -231,3 +237,21 @@ spark_cache_path <- function() {
}
normalizePath(path, mustWork = FALSE)
}
+
+
+installInstruction <- function(mode) {
+ if (mode == "remote") {
+ paste0("Connecting to a remote Spark master. ",
+ "Please make sure Spark package is also installed in this machine.\n",
+ "- If there is one, set the path in sparkHome parameter or ",
+ "environment variable SPARK_HOME.\n",
+ "- If not, you may run install.spark function to do the job. ",
+ "Please make sure the Spark and the Hadoop versions ",
+ "match the versions on the cluster. ",
+ "SparkR package is compatible with Spark ", packageVersion("SparkR"), ".",
+ "If you need further help, ",
+ "contact the administrators of the cluster.")
+ } else {
+ stop(paste0("No instruction found for ", mode, " mode."))
+ }
+}
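
As a usage sketch (not part of the patch), the renamed helpers above are exercised through install.spark(); the calls below are illustrative, using the signature shown in the first hunk, with a placeholder mirror URL and cache directory:

  # Default: Spark built for Hadoop 2.7, cached under sparkCachePath()
  install.spark()

  # Pin a specific Hadoop build, mirror, and cache directory, overwriting any
  # cached copy. The mirrorUrl and localDir values are placeholders.
  install.spark(hadoopVersion = "2.6",
                mirrorUrl = "http://archive.apache.org/dist/spark",
                localDir = "~/spark",
                overwrite = TRUE)
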
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 85815af1f3..de53b0bf79 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -366,25 +366,10 @@ sparkR.session <- function(
}
overrideEnvs(sparkConfigMap, paramMap)
}
- # do not download if it is run in the sparkR shell
- if (!nzchar(master) || is_master_local(master)) {
- if (!is_sparkR_shell()) {
- if (is.na(file.info(sparkHome)$isdir)) {
- msg <- paste0("Spark not found in SPARK_HOME: ",
- sparkHome,
- " .\nTo search in the cache directory. ",
- "Installation will start if not found.")
- message(msg)
- packageLocalDir <- install.spark()
- sparkHome <- packageLocalDir
- } else {
- msg <- paste0("Spark package is found in SPARK_HOME: ", sparkHome)
- message(msg)
- }
- }
- }
if (!exists(".sparkRjsc", envir = .sparkREnv)) {
+ retHome <- sparkCheckInstall(sparkHome, master)
+ if (!is.null(retHome)) sparkHome <- retHome
sparkExecutorEnvMap <- new.env()
sparkR.sparkContext(master, appName, sparkHome, sparkConfigMap, sparkExecutorEnvMap,
sparkJars, sparkPackages)
@@ -547,3 +532,35 @@ processSparkPackages <- function(packages) {
}
splittedPackages
}
+
+# Utility function that checks for Spark and installs it to a local directory if not found
+#
+# Installation will not be triggered if it is called from the sparkR shell
+# or if the master URL is not local
+#
+# @param sparkHome directory in which to look for the Spark package.
+# @param master the Spark master URL, used to check for local or remote mode.
+# @return NULL if sparkHome does not need to be updated; the new sparkHome otherwise.
+sparkCheckInstall <- function(sparkHome, master) {
+ if (!isSparkRShell()) {
+ if (!is.na(file.info(sparkHome)$isdir)) {
+ msg <- paste0("Spark package found in SPARK_HOME: ", sparkHome)
+ message(msg)
+ NULL
+ } else {
+ if (!nzchar(master) || isMasterLocal(master)) {
+ msg <- paste0("Spark not found in SPARK_HOME: ",
+ sparkHome)
+ message(msg)
+ packageLocalDir <- install.spark()
+ packageLocalDir
+ } else {
+ msg <- paste0("Spark not found in SPARK_HOME: ",
+ sparkHome, "\n", installInstruction("remote"))
+ stop(msg)
+ }
+ }
+ } else {
+ NULL
+ }
+}
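
sparkCheckInstall() is internal and is normally reached through sparkR.session(); as a hedged sketch of its three outcomes (the master URLs below are illustrative):

  # SPARK_HOME points at an existing installation: message, return NULL,
  # so sparkR.session() keeps the sparkHome it already has.
  # Local (or empty) master and no installation found: install.spark() runs
  # and the new package directory is returned.
  sparkCheckInstall(sparkHome = Sys.getenv("SPARK_HOME"), master = "local[*]")

  # Remote master and no installation found: stop() with installInstruction("remote").
  sparkCheckInstall(sparkHome = Sys.getenv("SPARK_HOME"), master = "spark://host:7077")
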
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index d78c0a7a53..2809ce5d37 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -690,10 +690,10 @@ getSparkContext <- function() {
sc
}
-is_master_local <- function(master) {
+isMasterLocal <- function(master) {
grepl("^local(\\[([0-9]+|\\*)\\])?$", master, perl = TRUE)
}
-is_sparkR_shell <- function() {
+isSparkRShell <- function() {
grepl(".*shell\\.R$", Sys.getenv("R_PROFILE_USER"), perl = TRUE)
}