aboutsummaryrefslogblamecommitdiff
path: root/R/pkg/R/context.R
blob: 720990e1c60877a525771d98ca7ab50a67f09dd2 (plain) (tree)


















                                                                          

                                                 
                                                               
                                               
   
                           

 
















                                                                                       
                                                      

                                                                          
                                                                                 

                                     
                                                                                



                                   
















                                                                                       
                                                        

                                                                          
                                                                                 

                                     
                                                                                  



                                                 

















                                                                               





















                                                                                            
                                                                                 










                                                                               


























                                                                            










                                        























                                                                             









                                                           














                                                                      


                                                                                          
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# context.R: SparkContext driven functions

getMinPartitions <- function(sc, minPartitions) {
  if (is.null(minPartitions)) {
    defaultParallelism <- callJMethod(sc, "defaultParallelism")
    minPartitions <- min(defaultParallelism, 2)
  }
  as.integer(minPartitions)
}

# Create an RDD from a text file.
#
# This function reads a text file from HDFS, a local file system (available on all
# nodes), or any Hadoop-supported file system URI, and creates an
# RDD of strings from it.
#
# @param sc SparkContext to use
# @param path Path of file to read. A vector of multiple paths is allowed.
# @param minPartitions Minimum number of partitions to be created. If NULL, the default
#  value is chosen based on available parallelism.
# @return RDD where each item is of type \code{character}
# @export
# @examples
#\dontrun{
#  sc <- sparkR.init()
#  lines <- textFile(sc, "myfile.txt")
#}
textFile <- function(sc, path, minPartitions = NULL) {
  # Allow the user to have a more flexible definiton of the text file path
  path <- suppressWarnings(normalizePath(path))
  # Convert a string vector of paths to a string containing comma separated paths
  path <- paste(path, collapse = ",")

  jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions))
  # jrdd is of type JavaRDD[String]
  RDD(jrdd, "string")
}

# Load an RDD saved as a SequenceFile containing serialized objects.
#
# The file to be loaded should be one that was previously generated by calling
# saveAsObjectFile() of the RDD class.
#
# @param sc SparkContext to use
# @param path Path of file to read. A vector of multiple paths is allowed.
# @param minPartitions Minimum number of partitions to be created. If NULL, the default
#  value is chosen based on available parallelism.
# @return RDD containing serialized R objects.
# @seealso saveAsObjectFile
# @export
# @examples
#\dontrun{
#  sc <- sparkR.init()
#  rdd <- objectFile(sc, "myfile")
#}
objectFile <- function(sc, path, minPartitions = NULL) {
  # Allow the user to have a more flexible definiton of the text file path
  path <- suppressWarnings(normalizePath(path))
  # Convert a string vector of paths to a string containing comma separated paths
  path <- paste(path, collapse = ",")

  jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions))
  # Assume the RDD contains serialized R objects.
  RDD(jrdd, "byte")
}

# Create an RDD from a homogeneous list or vector.
#
# This function creates an RDD from a local homogeneous list in R. The elements
# in the list are split into \code{numSlices} slices and distributed to nodes
# in the cluster.
#
# @param sc SparkContext to use
# @param coll collection to parallelize
# @param numSlices number of partitions to create in the RDD
# @return an RDD created from this collection
# @export
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10, 2)
# # The RDD should contain 10 elements
# length(rdd)
#}
parallelize <- function(sc, coll, numSlices = 1) {
  # TODO: bound/safeguard numSlices
  # TODO: unit tests for if the split works for all primitives
  # TODO: support matrix, data frame, etc
  if ((!is.list(coll) && !is.vector(coll)) || is.data.frame(coll)) {
    if (is.data.frame(coll)) {
      message(paste("context.R: A data frame is parallelized by columns."))
    } else {
      if (is.matrix(coll)) {
        message(paste("context.R: A matrix is parallelized by elements."))
      } else {
        message(paste("context.R: parallelize() currently only supports lists and vectors.",
                      "Calling as.list() to coerce coll into a list."))
      }
    }
    coll <- as.list(coll)
  }

  if (numSlices > length(coll))
    numSlices <- length(coll)

  sliceLen <- ceiling(length(coll) / numSlices)
  slices <- split(coll, rep(1: (numSlices + 1), each = sliceLen)[1:length(coll)])

  # Serialize each slice: obtain a list of raws, or a list of lists (slices) of
  # 2-tuples of raws
  serializedSlices <- lapply(slices, serialize, connection = NULL)

  jrdd <- callJStatic("org.apache.spark.api.r.RRDD",
                      "createRDDFromArray", sc, serializedSlices)

  RDD(jrdd, "byte")
}

# Include this specified package on all workers
#
# This function can be used to include a package on all workers before the
# user's code is executed. This is useful in scenarios where other R package
# functions are used in a function passed to functions like \code{lapply}.
# NOTE: The package is assumed to be installed on every node in the Spark
# cluster.
#
# @param sc SparkContext to use
# @param pkg Package name
#
# @export
# @examples
#\dontrun{
#  library(Matrix)
#
#  sc <- sparkR.init()
#  # Include the matrix library we will be using
#  includePackage(sc, Matrix)
#
#  generateSparse <- function(x) {
#    sparseMatrix(i=c(1, 2, 3), j=c(1, 2, 3), x=c(1, 2, 3))
#  }
#
#  rdd <- lapplyPartition(parallelize(sc, 1:2, 2L), generateSparse)
#  collect(rdd)
#}
includePackage <- function(sc, pkg) {
  pkg <- as.character(substitute(pkg))
  if (exists(".packages", .sparkREnv)) {
    packages <- .sparkREnv$.packages
  } else {
    packages <- list()
  }
  packages <- c(packages, pkg)
  .sparkREnv$.packages <- packages
}

# @title Broadcast a variable to all workers
#
# @description
# Broadcast a read-only variable to the cluster, returning a \code{Broadcast}
# object for reading it in distributed functions.
#
# @param sc Spark Context to use
# @param object Object to be broadcast
# @export
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:2, 2L)
#
# # Large Matrix object that we want to broadcast
# randomMat <- matrix(nrow=100, ncol=10, data=rnorm(1000))
# randomMatBr <- broadcast(sc, randomMat)
#
# # Use the broadcast variable inside the function
# useBroadcast <- function(x) {
#   sum(value(randomMatBr) * x)
# }
# sumRDD <- lapply(rdd, useBroadcast)
#}
broadcast <- function(sc, object) {
  objName <- as.character(substitute(object))
  serializedObj <- serialize(object, connection = NULL)

  jBroadcast <- callJMethod(sc, "broadcast", serializedObj)
  id <- as.character(callJMethod(jBroadcast, "id"))

  Broadcast(id, object, jBroadcast, objName)
}

# @title Set the checkpoint directory
#
# Set the directory under which RDDs are going to be checkpointed. The
# directory must be a HDFS path if running on a cluster.
#
# @param sc Spark Context to use
# @param dirName Directory path
# @export
# @examples
#\dontrun{
# sc <- sparkR.init()
# setCheckpointDir(sc, "~/checkpoint")
# rdd <- parallelize(sc, 1:2, 2L)
# checkpoint(rdd)
#}
setCheckpointDir <- function(sc, dirName) {
  invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
}