#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# context.R: SparkContext driven functions
# Resolve the minimum number of partitions to request from the JVM.
#
# If the caller supplied a value it is used as-is; otherwise the smaller of the
# context's "defaultParallelism" and 2 is chosen. The result is always coerced
# with as.integer().
getMinPartitions <- function(sc, minPartitions) {
  # Caller-provided value wins; no JVM round trip is needed in this case
  if (!is.null(minPartitions)) {
    return(as.integer(minPartitions))
  }
  parallelism <- callJMethod(sc, "defaultParallelism")
  as.integer(min(parallelism, 2))
}
#' Create an RDD from a text file.
#'
#' This function reads a text file from HDFS, a local file system (available on all
#' nodes), or any Hadoop-supported file system URI, and creates an
#' RDD of strings from it.
#'
#' @param sc SparkContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
#' value is chosen based on available parallelism.
#' @return RDD where each item is of type \code{character}
#' @noRd
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' lines <- textFile(sc, "myfile.txt")
#'}
textFile <- function(sc, path, minPartitions = NULL) {
  # Give the user a more flexible definition of the text file path by running
  # every entry through normalizePath(); any warnings it raises are suppressed.
  normalizedPaths <- suppressWarnings(normalizePath(path))
  # Collapse the (possibly multi-element) path vector into one comma-separated string
  joinedPaths <- paste(normalizedPaths, collapse = ",")
  numPartitions <- getMinPartitions(sc, minPartitions)
  # The JVM call returns a JavaRDD[String], hence the "string" serialization mode
  RDD(callJMethod(sc, "textFile", joinedPaths, numPartitions), "string")
}
#' Load an RDD saved as a SequenceFile containing serialized objects.
#'
#' The file to be loaded should be one that was previously generated by calling
#' saveAsObjectFile() of the RDD class.
#'
#' @param sc SparkContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
#' value is chosen based on available parallelism.
#' @return RDD containing serialized R objects.
#' @seealso saveAsObjectFile
#' @noRd
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- objectFile(sc, "myfile")
#'}
objectFile <- function(sc, path, minPartitions = NULL) {
  # Normalize each supplied path (warnings suppressed), then join the vector of
  # paths into a single comma-separated string for the JVM call.
  joinedPaths <- paste(suppressWarnings(normalizePath(path)), collapse = ",")
  jrdd <- callJMethod(
    sc,
    "objectFile",
    joinedPaths,
    getMinPartitions(sc, minPartitions)
  )
  # The RDD is assumed to hold serialized R objects, so it is wrapped in "byte" mode
  RDD(jrdd, "byte")
}
#' Create an RDD from a homogeneous list or vector.
#'
#' This function creates an RDD from a local homogeneous list in R. The elements
#' in the list are split into \code{numSlices} slices and distributed to nodes
#' in the cluster.
#'
#' @param sc SparkContext to use
#' @param coll collection to parallelize
#' @param numSlices number of partitions to create in the RDD
#' @return an RDD created from this collection
#' @noRd
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 2)
#' # The RDD should contain 10 elements
#' length(rdd)
#'}
parallelize <- function(sc, coll, numSlices = 1) {
  # Splits `coll` into slices, serializes each slice with serialize(), and hands
  # the serialized slices to RRDD.createRDDFromArray on the JVM side.
  #
  # An empty collection produces a single empty slice (numSlices is ignored in
  # that case). For a non-empty collection numSlices must be a single
  # non-missing number >= 1; it is capped at length(coll).
  #
  # TODO: unit tests for if the split works for all primitives
  # TODO: support matrix, data frame, etc

  # nolint start
  # suppress lintr warning: Place a space before left parenthesis, except in a function call.
  if ((!is.list(coll) && !is.vector(coll)) || is.data.frame(coll)) {
  # nolint end
    if (is.data.frame(coll)) {
      message(paste("context.R: A data frame is parallelized by columns."))
    } else if (is.matrix(coll)) {
      message(paste("context.R: A matrix is parallelized by elements."))
    } else {
      message(paste("context.R: parallelize() currently only supports lists and vectors.",
                    "Calling as.list() to coerce coll into a list."))
    }
    coll <- as.list(coll)
  }

  len <- length(coll)
  if (len == 0) {
    # Without this guard the slice length would be ceiling(0 / 0) = NaN and
    # rep() would fail. Emit one empty slice, named "1" like the groups that
    # split() produces below.
    slices <- list("1" = coll)
  } else {
    if (length(numSlices) != 1 || is.na(numSlices) || numSlices < 1) {
      stop("numSlices must be a single number greater than or equal to 1",
           call. = FALSE)
    }
    # Never ask for more slices than there are elements
    if (numSlices > len) {
      numSlices <- len
    }
    sliceLen <- ceiling(len / numSlices)
    # Group ids 1, 1, ..., 2, 2, ... (sliceLen of each), truncated to the
    # collection length. numSlices >= 1 here, so the 1:n sequences are safe.
    sliceIds <- rep(1:(numSlices + 1), each = sliceLen)[seq_len(len)]
    slices <- split(coll, sliceIds)
  }

  # Serialize each slice: obtain a list of raws, or a list of lists (slices) of
  # 2-tuples of raws
  serializedSlices <- lapply(slices, serialize, connection = NULL)

  jrdd <- callJStatic("org.apache.spark.api.r.RRDD",
                      "createRDDFromArray", sc, serializedSlices)

  RDD(jrdd, "byte")
}
#' Include this specified package on all workers
#'
#' This function can be used to include a package on all workers before the
#' user's code is executed. This is useful in scenarios where other R package
#' functions are used in a function passed to functions like \code{lapply}.
#' NOTE: The package is assumed to be installed on every node in the Spark
#' cluster.
#'
#' @param sc SparkContext to use
#' @param pkg Package name
#' @noRd
#' @examples
#'\dontrun{
#' library(Matrix)
#'
#' sc <- sparkR.init()
#' # Include the matrix library we will be using
#' includePackage(sc, Matrix)
#'
#' generateSparse <- function(x) {
#' sparseMatrix(i=c(1, 2, 3), j=c(1, 2, 3), x=c(1, 2, 3))
#' }
#'
#' rdd <- lapplyPartition(parallelize(sc, 1:2, 2L), generateSparse)
#' collect(rdd)
#'}
includePackage <- function(sc, pkg) {
  # Capture the unevaluated `pkg` argument as text, so callers can write
  # includePackage(sc, Matrix) without quoting the package name.
  pkgName <- as.character(substitute(pkg))

  # Read the registry directly from .sparkREnv. `[[` on an environment does not
  # search enclosing environments and yields NULL when the binding is absent.
  # The previous exists(".packages", .sparkREnv) check used inherits = TRUE and
  # could therefore be satisfied by base::.packages() instead of our own
  # binding, which made its "not yet registered" branch unreachable.
  registered <- .sparkREnv[[".packages"]]

  # c(NULL, pkgName) is just pkgName, so the first registration starts the
  # vector and later ones append to it.
  .sparkREnv[[".packages"]] <- c(registered, pkgName)
}
#' Broadcast a variable to all workers
#'
#' Broadcast a read-only variable to the cluster, returning a \code{Broadcast}
#' object for reading it in distributed functions.
#'
#' @param sc Spark Context to use
#' @param object Object to be broadcast
#' @noRd
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:2, 2L)
#'
#' # Large Matrix object that we want to broadcast
#' randomMat <- matrix(nrow=100, ncol=10, data=rnorm(1000))
#' randomMatBr <- broadcast(sc, randomMat)
#'
#' # Use the broadcast variable inside the function
#' useBroadcast <- function(x) {
#' sum(value(randomMatBr) * x)
#' }
#' sumRDD <- lapply(rdd, useBroadcast)
#'}
broadcast <- function(sc, object) {
  # Record the expression the caller wrote for `object` as text; it is passed
  # on to Broadcast() as the last argument.
  varName <- as.character(substitute(object))

  # Ship the serialized bytes to the JVM and keep the returned broadcast handle
  payload <- serialize(object, connection = NULL)
  jvmBroadcast <- callJMethod(sc, "broadcast", payload)

  # The JVM-side id is stored as a character string
  broadcastId <- as.character(callJMethod(jvmBroadcast, "id"))

  Broadcast(broadcastId, object, jvmBroadcast, varName)
}
#' Set the checkpoint directory
#'
#' Set the directory under which RDDs are going to be checkpointed. The
#' directory must be an HDFS path if running on a cluster.
#'
#' @param sc Spark Context to use
#' @param dirName Directory path
#' @noRd
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' setCheckpointDir(sc, "~/checkpoint")
#' rdd <- parallelize(sc, 1:2, 2L)
#' checkpoint(rdd)
#'}
setCheckpointDir <- function(sc, dirName) {
  # Normalize the directory path first; normalizePath() warnings are suppressed
  checkpointPath <- suppressWarnings(normalizePath(dirName))
  jvmResult <- callJMethod(sc, "setCheckpointDir", checkpointPath)
  # Hand back the JVM call's value without auto-printing it
  invisible(jvmResult)
}
#' Run a function over a list of elements, distributing the computations with Spark
#'
#' Run a function over a list of elements, distributing the computations with Spark. Applies a
#' function in a manner that is similar to doParallel or lapply to elements of a list.
#' The computations are distributed using Spark. It is conceptually the same as the following code:
#' lapply(list, func)
#'
#' Known limitations:
#' \itemize{
#' \item variable scoping and capture: compared to R's rich support for variable resolutions,
#' the distributed nature of SparkR limits how variables are resolved at runtime. All the
#' variables that are available through lexical scoping are embedded in the closure of the
#' function and available as read-only variables within the function. The environment variables
#' should be stored into temporary variables outside the function, and not directly accessed
#' within the function.
#'
#' \item loading external packages: In order to use a package, you need to load it inside the
#' closure. For example, if you rely on the MASS module, here is how you would use it:
#' \preformatted{
#' train <- function(hyperparam) {
#' library(MASS)
#' model <- lm.ridge("y ~ x+z", data, lambda=hyperparam)
#' model
#' }
#' }
#' }
#'
#' @rdname spark.lapply
#' @param list the list of elements
#' @param func a function that takes one argument.
#' @return a list of results (the exact type being determined by the function)
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' doubled <- spark.lapply(1:10, function(x){2 * x})
#'}
#' @note spark.lapply since 2.0.0
spark.lapply <- function(list, func) {
  # Nothing to distribute: behave like lapply(), which returns an empty list
  # for empty input. Without this guard, parallelize() would be asked for zero
  # slices. base::list is spelled out because the `list` parameter shadows the
  # constructor's name inside this function.
  if (length(list) == 0) {
    return(base::list())
  }

  sc <- getSparkContext()
  # Request one slice per element of the input
  rdd <- parallelize(sc, list, length(list))
  results <- map(rdd, func)
  # Bring the per-element results back to the driver as a local list
  collectRDD(results)
}
#' Set new log level
#'
#' Set new log level: "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"
#'
#' @rdname setLogLevel
#' @param level New log level
#' @export
#' @examples
#'\dontrun{
#' setLogLevel("ERROR")
#'}
#' @note setLogLevel since 2.0.0
setLogLevel <- function(level) {
  # Look up the active SparkContext and forward the requested level to its
  # "setLogLevel" method; the JVM call's value is returned as-is.
  callJMethod(getSparkContext(), "setLogLevel", level)
}