aboutsummaryrefslogtreecommitdiff
path: root/R/pkg
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-05-23 00:00:30 -0700
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2015-05-23 00:01:40 -0700
commit7af3818c6b2bf35bfa531ab7cc3a4a714385015e (patch)
treee7dcb33da71845eaed6808045725882d6ba07796 /R/pkg
parent4583cf4be17155c68178155acf6866d7cc8f7df0 (diff)
downloadspark-7af3818c6b2bf35bfa531ab7cc3a4a714385015e.tar.gz
spark-7af3818c6b2bf35bfa531ab7cc3a4a714385015e.tar.bz2
spark-7af3818c6b2bf35bfa531ab7cc3a4a714385015e.zip
[SPARK-6806] [SPARKR] [DOCS] Fill in SparkR examples in programming guide
sqlCtx -> sqlContext You can check the docs by: ``` $ cd docs $ SKIP_SCALADOC=1 jekyll serve ``` cc shivaram Author: Davies Liu <davies@databricks.com> Closes #5442 from davies/r_docs and squashes the following commits: 7a12ec6 [Davies Liu] remove rdd in R docs 8496b26 [Davies Liu] remove the docs related to RDD e23b9d6 [Davies Liu] delete R docs for RDD API 222e4ff [Davies Liu] Merge branch 'master' into r_docs 89684ce [Davies Liu] Merge branch 'r_docs' of github.com:davies/spark into r_docs f0a10e1 [Davies Liu] address comments from @shivaram f61de71 [Davies Liu] Update pairRDD.R 3ef7cf3 [Davies Liu] use + instead of function(a,b) a+b 2f10a77 [Davies Liu] address comments from @cafreeman 9c2a062 [Davies Liu] mention R api together with Python API 23f751a [Davies Liu] Fill in SparkR examples in programming guide
Diffstat (limited to 'R/pkg')
-rw-r--r--R/pkg/R/DataFrame.R176
-rw-r--r--R/pkg/R/RDD.R2
-rw-r--r--R/pkg/R/SQLContext.R165
-rw-r--r--R/pkg/R/pairRDD.R4
-rw-r--r--R/pkg/R/sparkR.R10
-rw-r--r--R/pkg/inst/profile/shell.R6
-rw-r--r--R/pkg/inst/tests/test_sparkSQL.R156
7 files changed, 259 insertions, 260 deletions
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index a7fa32e291..ed8093c80d 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -65,9 +65,9 @@ dataFrame <- function(sdf, isCached = FALSE) {
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' printSchema(df)
#'}
setMethod("printSchema",
@@ -88,9 +88,9 @@ setMethod("printSchema",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' dfSchema <- schema(df)
#'}
setMethod("schema",
@@ -110,9 +110,9 @@ setMethod("schema",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' explain(df, TRUE)
#'}
setMethod("explain",
@@ -139,9 +139,9 @@ setMethod("explain",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' isLocal(df)
#'}
setMethod("isLocal",
@@ -162,9 +162,9 @@ setMethod("isLocal",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' showDF(df)
#'}
setMethod("showDF",
@@ -185,9 +185,9 @@ setMethod("showDF",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' df
#'}
setMethod("show", "DataFrame",
@@ -210,9 +210,9 @@ setMethod("show", "DataFrame",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' dtypes(df)
#'}
setMethod("dtypes",
@@ -234,9 +234,9 @@ setMethod("dtypes",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' columns(df)
#'}
setMethod("columns",
@@ -267,11 +267,11 @@ setMethod("names",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' registerTempTable(df, "json_df")
-#' new_df <- sql(sqlCtx, "SELECT * FROM json_df")
+#' new_df <- sql(sqlContext, "SELECT * FROM json_df")
#'}
setMethod("registerTempTable",
signature(x = "DataFrame", tableName = "character"),
@@ -293,9 +293,9 @@ setMethod("registerTempTable",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' df <- read.df(sqlCtx, path, "parquet")
-#' df2 <- read.df(sqlCtx, path2, "parquet")
+#' sqlContext <- sparkRSQL.init(sc)
+#' df <- read.df(sqlContext, path, "parquet")
+#' df2 <- read.df(sqlContext, path2, "parquet")
#' registerTempTable(df, "table1")
#' insertInto(df2, "table1", overwrite = TRUE)
#'}
@@ -316,9 +316,9 @@ setMethod("insertInto",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' cache(df)
#'}
setMethod("cache",
@@ -341,9 +341,9 @@ setMethod("cache",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' persist(df, "MEMORY_AND_DISK")
#'}
setMethod("persist",
@@ -366,9 +366,9 @@ setMethod("persist",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' persist(df, "MEMORY_AND_DISK")
#' unpersist(df)
#'}
@@ -391,9 +391,9 @@ setMethod("unpersist",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' newDF <- repartition(df, 2L)
#'}
setMethod("repartition",
@@ -415,9 +415,9 @@ setMethod("repartition",
# @examples
#\dontrun{
# sc <- sparkR.init()
-# sqlCtx <- sparkRSQL.init(sc)
+# sqlContext <- sparkRSQL.init(sc)
# path <- "path/to/file.json"
-# df <- jsonFile(sqlCtx, path)
+# df <- jsonFile(sqlContext, path)
# newRDD <- toJSON(df)
#}
setMethod("toJSON",
@@ -440,9 +440,9 @@ setMethod("toJSON",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' saveAsParquetFile(df, "/tmp/sparkr-tmp/")
#'}
setMethod("saveAsParquetFile",
@@ -461,9 +461,9 @@ setMethod("saveAsParquetFile",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' distinctDF <- distinct(df)
#'}
setMethod("distinct",
@@ -486,9 +486,9 @@ setMethod("distinct",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' collect(sample(df, FALSE, 0.5))
#' collect(sample(df, TRUE, 0.5))
#'}
@@ -523,9 +523,9 @@ setMethod("sample_frac",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' count(df)
#' }
setMethod("count",
@@ -545,9 +545,9 @@ setMethod("count",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' collected <- collect(df)
#' firstName <- collected[[1]]$name
#' }
@@ -580,9 +580,9 @@ setMethod("collect",
#' @examples
#' \dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' limitedDF <- limit(df, 10)
#' }
setMethod("limit",
@@ -599,9 +599,9 @@ setMethod("limit",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' take(df, 2)
#' }
setMethod("take",
@@ -626,9 +626,9 @@ setMethod("take",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' head(df)
#' }
setMethod("head",
@@ -647,9 +647,9 @@ setMethod("head",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' first(df)
#' }
setMethod("first",
@@ -669,9 +669,9 @@ setMethod("first",
# @examples
#\dontrun{
# sc <- sparkR.init()
-# sqlCtx <- sparkRSQL.init(sc)
+# sqlContext <- sparkRSQL.init(sc)
# path <- "path/to/file.json"
-# df <- jsonFile(sqlCtx, path)
+# df <- jsonFile(sqlContext, path)
# rdd <- toRDD(df)
# }
setMethod("toRDD",
@@ -938,9 +938,9 @@ setMethod("select",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' selectExpr(df, "col1", "(col2 * 5) as newCol")
#' }
setMethod("selectExpr",
@@ -964,9 +964,9 @@ setMethod("selectExpr",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' newDF <- withColumn(df, "newCol", df$col1 * 5)
#' }
setMethod("withColumn",
@@ -988,9 +988,9 @@ setMethod("withColumn",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' newDF <- mutate(df, newCol = df$col1 * 5, newCol2 = df$col1 * 2)
#' names(newDF) # Will contain newCol, newCol2
#' }
@@ -1024,9 +1024,9 @@ setMethod("mutate",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' newDF <- withColumnRenamed(df, "col1", "newCol1")
#' }
setMethod("withColumnRenamed",
@@ -1055,9 +1055,9 @@ setMethod("withColumnRenamed",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' newDF <- rename(df, col1 = df$newCol1)
#' }
setMethod("rename",
@@ -1095,9 +1095,9 @@ setClassUnion("characterOrColumn", c("character", "Column"))
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' arrange(df, df$col1)
#' arrange(df, "col1")
#' arrange(df, asc(df$col1), desc(abs(df$col2)))
@@ -1137,9 +1137,9 @@ setMethod("orderBy",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' filter(df, "col1 > 0")
#' filter(df, df$col2 != "abcdefg")
#' }
@@ -1177,9 +1177,9 @@ setMethod("where",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' df1 <- jsonFile(sqlCtx, path)
-#' df2 <- jsonFile(sqlCtx, path2)
+#' sqlContext <- sparkRSQL.init(sc)
+#' df1 <- jsonFile(sqlContext, path)
+#' df2 <- jsonFile(sqlContext, path2)
#' join(df1, df2) # Performs a Cartesian
#' join(df1, df2, df1$col1 == df2$col2) # Performs an inner join based on expression
#' join(df1, df2, df1$col1 == df2$col2, "right_outer")
@@ -1219,9 +1219,9 @@ setMethod("join",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' df1 <- jsonFile(sqlCtx, path)
-#' df2 <- jsonFile(sqlCtx, path2)
+#' sqlContext <- sparkRSQL.init(sc)
+#' df1 <- jsonFile(sqlContext, path)
+#' df2 <- jsonFile(sqlContext, path2)
#' unioned <- unionAll(df, df2)
#' }
setMethod("unionAll",
@@ -1244,9 +1244,9 @@ setMethod("unionAll",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' df1 <- jsonFile(sqlCtx, path)
-#' df2 <- jsonFile(sqlCtx, path2)
+#' sqlContext <- sparkRSQL.init(sc)
+#' df1 <- jsonFile(sqlContext, path)
+#' df2 <- jsonFile(sqlContext, path2)
#' intersectDF <- intersect(df, df2)
#' }
setMethod("intersect",
@@ -1269,9 +1269,9 @@ setMethod("intersect",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' df1 <- jsonFile(sqlCtx, path)
-#' df2 <- jsonFile(sqlCtx, path2)
+#' sqlContext <- sparkRSQL.init(sc)
+#' df1 <- jsonFile(sqlContext, path)
+#' df2 <- jsonFile(sqlContext, path2)
#' exceptDF <- except(df, df2)
#' }
#' @rdname except
@@ -1308,9 +1308,9 @@ setMethod("except",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' write.df(df, "myfile", "parquet", "overwrite")
#' }
setMethod("write.df",
@@ -1318,8 +1318,8 @@ setMethod("write.df",
mode = 'character'),
function(df, path = NULL, source = NULL, mode = "append", ...){
if (is.null(source)) {
- sqlCtx <- get(".sparkRSQLsc", envir = .sparkREnv)
- source <- callJMethod(sqlCtx, "getConf", "spark.sql.sources.default",
+ sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
+ source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
"org.apache.spark.sql.parquet")
}
allModes <- c("append", "overwrite", "error", "ignore")
@@ -1371,9 +1371,9 @@ setMethod("saveDF",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' saveAsTable(df, "myfile")
#' }
setMethod("saveAsTable",
@@ -1381,8 +1381,8 @@ setMethod("saveAsTable",
mode = 'character'),
function(df, tableName, source = NULL, mode="append", ...){
if (is.null(source)) {
- sqlCtx <- get(".sparkRSQLsc", envir = .sparkREnv)
- source <- callJMethod(sqlCtx, "getConf", "spark.sql.sources.default",
+ sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
+ source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
"org.apache.spark.sql.parquet")
}
allModes <- c("append", "overwrite", "error", "ignore")
@@ -1408,9 +1408,9 @@ setMethod("saveAsTable",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' describe(df)
#' describe(df, "col1")
#' describe(df, "col1", "col2")
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index d3a68fff78..0513299515 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -239,7 +239,7 @@ setMethod("cache",
# @aliases persist,RDD-method
setMethod("persist",
signature(x = "RDD", newLevel = "character"),
- function(x, newLevel) {
+ function(x, newLevel = "MEMORY_ONLY") {
callJMethod(getJRDD(x), "persist", getStorageLevel(newLevel))
x@env$isCached <- TRUE
x
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 531442e845..36cc612875 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -69,7 +69,7 @@ infer_type <- function(x) {
#'
#' Converts an RDD to a DataFrame by infer the types.
#'
-#' @param sqlCtx A SQLContext
+#' @param sqlContext A SQLContext
#' @param data An RDD or list or data.frame
#' @param schema a list of column names or named list (StructType), optional
#' @return an DataFrame
@@ -77,13 +77,13 @@ infer_type <- function(x) {
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
-#' df <- createDataFrame(sqlCtx, rdd)
+#' df <- createDataFrame(sqlContext, rdd)
#' }
# TODO(davies): support sampling and infer type from NA
-createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
+createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0) {
if (is.data.frame(data)) {
# get the names of columns, they will be put into RDD
schema <- names(data)
@@ -102,7 +102,7 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
})
}
if (is.list(data)) {
- sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sqlCtx)
+ sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sqlContext)
rdd <- parallelize(sc, data)
} else if (inherits(data, "RDD")) {
rdd <- data
@@ -146,7 +146,7 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
srdd <- callJMethod(jrdd, "rdd")
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createDF",
- srdd, schema$jobj, sqlCtx)
+ srdd, schema$jobj, sqlContext)
dataFrame(sdf)
}
@@ -161,7 +161,7 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
# @examples
#\dontrun{
# sc <- sparkR.init()
-# sqlCtx <- sparkRSQL.init(sc)
+# sqlContext <- sparkRSQL.init(sc)
# rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
# df <- toDF(rdd)
# }
@@ -170,14 +170,14 @@ setGeneric("toDF", function(x, ...) { standardGeneric("toDF") })
setMethod("toDF", signature(x = "RDD"),
function(x, ...) {
- sqlCtx <- if (exists(".sparkRHivesc", envir = .sparkREnv)) {
+ sqlContext <- if (exists(".sparkRHivesc", envir = .sparkREnv)) {
get(".sparkRHivesc", envir = .sparkREnv)
} else if (exists(".sparkRSQLsc", envir = .sparkREnv)) {
get(".sparkRSQLsc", envir = .sparkREnv)
} else {
stop("no SQL context available")
}
- createDataFrame(sqlCtx, x, ...)
+ createDataFrame(sqlContext, x, ...)
})
#' Create a DataFrame from a JSON file.
@@ -185,24 +185,24 @@ setMethod("toDF", signature(x = "RDD"),
#' Loads a JSON file (one object per line), returning the result as a DataFrame
#' It goes through the entire dataset once to determine the schema.
#'
-#' @param sqlCtx SQLContext to use
+#' @param sqlContext SQLContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @return DataFrame
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' }
-jsonFile <- function(sqlCtx, path) {
+jsonFile <- function(sqlContext, path) {
# Allow the user to have a more flexible definiton of the text file path
path <- normalizePath(path)
# Convert a string vector of paths to a string containing comma separated paths
path <- paste(path, collapse = ",")
- sdf <- callJMethod(sqlCtx, "jsonFile", path)
+ sdf <- callJMethod(sqlContext, "jsonFile", path)
dataFrame(sdf)
}
@@ -211,7 +211,7 @@ jsonFile <- function(sqlCtx, path) {
#
# Loads an RDD storing one JSON object per string as a DataFrame.
#
-# @param sqlCtx SQLContext to use
+# @param sqlContext SQLContext to use
# @param rdd An RDD of JSON string
# @param schema A StructType object to use as schema
# @param samplingRatio The ratio of simpling used to infer the schema
@@ -220,16 +220,16 @@ jsonFile <- function(sqlCtx, path) {
# @examples
#\dontrun{
# sc <- sparkR.init()
-# sqlCtx <- sparkRSQL.init(sc)
+# sqlContext <- sparkRSQL.init(sc)
# rdd <- texFile(sc, "path/to/json")
-# df <- jsonRDD(sqlCtx, rdd)
+# df <- jsonRDD(sqlContext, rdd)
# }
# TODO: support schema
-jsonRDD <- function(sqlCtx, rdd, schema = NULL, samplingRatio = 1.0) {
+jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
rdd <- serializeToString(rdd)
if (is.null(schema)) {
- sdf <- callJMethod(sqlCtx, "jsonRDD", callJMethod(getJRDD(rdd), "rdd"), samplingRatio)
+ sdf <- callJMethod(sqlContext, "jsonRDD", callJMethod(getJRDD(rdd), "rdd"), samplingRatio)
dataFrame(sdf)
} else {
stop("not implemented")
@@ -241,64 +241,63 @@ jsonRDD <- function(sqlCtx, rdd, schema = NULL, samplingRatio = 1.0) {
#'
#' Loads a Parquet file, returning the result as a DataFrame.
#'
-#' @param sqlCtx SQLContext to use
+#' @param sqlContext SQLContext to use
#' @param ... Path(s) of parquet file(s) to read.
#' @return DataFrame
#' @export
# TODO: Implement saveasParquetFile and write examples for both
-parquetFile <- function(sqlCtx, ...) {
+parquetFile <- function(sqlContext, ...) {
# Allow the user to have a more flexible definiton of the text file path
paths <- lapply(list(...), normalizePath)
- sdf <- callJMethod(sqlCtx, "parquetFile", paths)
+ sdf <- callJMethod(sqlContext, "parquetFile", paths)
dataFrame(sdf)
}
#' SQL Query
-#'
+#'
#' Executes a SQL query using Spark, returning the result as a DataFrame.
#'
-#' @param sqlCtx SQLContext to use
+#' @param sqlContext SQLContext to use
#' @param sqlQuery A character vector containing the SQL query
#' @return DataFrame
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' registerTempTable(df, "table")
-#' new_df <- sql(sqlCtx, "SELECT * FROM table")
+#' new_df <- sql(sqlContext, "SELECT * FROM table")
#' }
-sql <- function(sqlCtx, sqlQuery) {
- sdf <- callJMethod(sqlCtx, "sql", sqlQuery)
- dataFrame(sdf)
+sql <- function(sqlContext, sqlQuery) {
+ sdf <- callJMethod(sqlContext, "sql", sqlQuery)
+ dataFrame(sdf)
}
-
#' Create a DataFrame from a SparkSQL Table
#'
#' Returns the specified Table as a DataFrame. The Table must have already been registered
#' in the SQLContext.
#'
-#' @param sqlCtx SQLContext to use
+#' @param sqlContext SQLContext to use
#' @param tableName The SparkSQL Table to convert to a DataFrame.
#' @return DataFrame
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' registerTempTable(df, "table")
-#' new_df <- table(sqlCtx, "table")
+#' new_df <- table(sqlContext, "table")
#' }
-table <- function(sqlCtx, tableName) {
- sdf <- callJMethod(sqlCtx, "table", tableName)
+table <- function(sqlContext, tableName) {
+ sdf <- callJMethod(sqlContext, "table", tableName)
dataFrame(sdf)
}
@@ -307,22 +306,22 @@ table <- function(sqlCtx, tableName) {
#'
#' Returns a DataFrame containing names of tables in the given database.
#'
-#' @param sqlCtx SQLContext to use
+#' @param sqlContext SQLContext to use
#' @param databaseName name of the database
#' @return a DataFrame
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' tables(sqlCtx, "hive")
+#' sqlContext <- sparkRSQL.init(sc)
+#' tables(sqlContext, "hive")
#' }
-tables <- function(sqlCtx, databaseName = NULL) {
+tables <- function(sqlContext, databaseName = NULL) {
jdf <- if (is.null(databaseName)) {
- callJMethod(sqlCtx, "tables")
+ callJMethod(sqlContext, "tables")
} else {
- callJMethod(sqlCtx, "tables", databaseName)
+ callJMethod(sqlContext, "tables", databaseName)
}
dataFrame(jdf)
}
@@ -332,22 +331,22 @@ tables <- function(sqlCtx, databaseName = NULL) {
#'
#' Returns the names of tables in the given database as an array.
#'
-#' @param sqlCtx SQLContext to use
+#' @param sqlContext SQLContext to use
#' @param databaseName name of the database
#' @return a list of table names
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' tableNames(sqlCtx, "hive")
+#' sqlContext <- sparkRSQL.init(sc)
+#' tableNames(sqlContext, "hive")
#' }
-tableNames <- function(sqlCtx, databaseName = NULL) {
+tableNames <- function(sqlContext, databaseName = NULL) {
if (is.null(databaseName)) {
- callJMethod(sqlCtx, "tableNames")
+ callJMethod(sqlContext, "tableNames")
} else {
- callJMethod(sqlCtx, "tableNames", databaseName)
+ callJMethod(sqlContext, "tableNames", databaseName)
}
}
@@ -356,58 +355,58 @@ tableNames <- function(sqlCtx, databaseName = NULL) {
#'
#' Caches the specified table in-memory.
#'
-#' @param sqlCtx SQLContext to use
+#' @param sqlContext SQLContext to use
#' @param tableName The name of the table being cached
#' @return DataFrame
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' registerTempTable(df, "table")
-#' cacheTable(sqlCtx, "table")
+#' cacheTable(sqlContext, "table")
#' }
-cacheTable <- function(sqlCtx, tableName) {
- callJMethod(sqlCtx, "cacheTable", tableName)
+cacheTable <- function(sqlContext, tableName) {
+ callJMethod(sqlContext, "cacheTable", tableName)
}
#' Uncache Table
#'
#' Removes the specified table from the in-memory cache.
#'
-#' @param sqlCtx SQLContext to use
+#' @param sqlContext SQLContext to use
#' @param tableName The name of the table being uncached
#' @return DataFrame
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- jsonFile(sqlContext, path)
#' registerTempTable(df, "table")
-#' uncacheTable(sqlCtx, "table")
+#' uncacheTable(sqlContext, "table")
#' }
-uncacheTable <- function(sqlCtx, tableName) {
- callJMethod(sqlCtx, "uncacheTable", tableName)
+uncacheTable <- function(sqlContext, tableName) {
+ callJMethod(sqlContext, "uncacheTable", tableName)
}
#' Clear Cache
#'
#' Removes all cached tables from the in-memory cache.
#'
-#' @param sqlCtx SQLContext to use
+#' @param sqlContext SQLContext to use
#' @examples
#' \dontrun{
-#' clearCache(sqlCtx)
+#' clearCache(sqlContext)
#' }
-clearCache <- function(sqlCtx) {
- callJMethod(sqlCtx, "clearCache")
+clearCache <- function(sqlContext) {
+ callJMethod(sqlContext, "clearCache")
}
#' Drop Temporary Table
@@ -415,22 +414,22 @@ clearCache <- function(sqlCtx) {
#' Drops the temporary table with the given table name in the catalog.
#' If the table has been cached/persisted before, it's also unpersisted.
#'
-#' @param sqlCtx SQLContext to use
+#' @param sqlContext SQLContext to use
#' @param tableName The name of the SparkSQL table to be dropped.
#' @examples
#' \dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' df <- read.df(sqlCtx, path, "parquet")
+#' sqlContext <- sparkRSQL.init(sc)
+#' df <- read.df(sqlContext, path, "parquet")
#' registerTempTable(df, "table")
-#' dropTempTable(sqlCtx, "table")
+#' dropTempTable(sqlContext, "table")
#' }
-dropTempTable <- function(sqlCtx, tableName) {
+dropTempTable <- function(sqlContext, tableName) {
if (class(tableName) != "character") {
stop("tableName must be a string.")
}
- callJMethod(sqlCtx, "dropTempTable", tableName)
+ callJMethod(sqlContext, "dropTempTable", tableName)
}
#' Load an DataFrame
@@ -441,7 +440,7 @@ dropTempTable <- function(sqlCtx, tableName) {
#' If `source` is not specified, the default data source configured by
#' "spark.sql.sources.default" will be used.
#'
-#' @param sqlCtx SQLContext to use
+#' @param sqlContext SQLContext to use
#' @param path The path of files to load
#' @param source the name of external data source
#' @return DataFrame
@@ -449,24 +448,24 @@ dropTempTable <- function(sqlCtx, tableName) {
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' df <- read.df(sqlCtx, "path/to/file.json", source = "json")
+#' sqlContext <- sparkRSQL.init(sc)
+#' df <- read.df(sqlContext, "path/to/file.json", source = "json")
#' }
-read.df <- function(sqlCtx, path = NULL, source = NULL, ...) {
+read.df <- function(sqlContext, path = NULL, source = NULL, ...) {
options <- varargsToEnv(...)
if (!is.null(path)) {
options[['path']] <- path
}
- sdf <- callJMethod(sqlCtx, "load", source, options)
+ sdf <- callJMethod(sqlContext, "load", source, options)
dataFrame(sdf)
}
#' @aliases loadDF
#' @export
-loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
- read.df(sqlCtx, path, source, ...)
+loadDF <- function(sqlContext, path = NULL, source = NULL, ...) {
+ read.df(sqlContext, path, source, ...)
}
#' Create an external table
@@ -478,7 +477,7 @@ loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
#' If `source` is not specified, the default data source configured by
#' "spark.sql.sources.default" will be used.
#'
-#' @param sqlCtx SQLContext to use
+#' @param sqlContext SQLContext to use
#' @param tableName A name of the table
#' @param path The path of files to load
#' @param source the name of external data source
@@ -487,15 +486,15 @@ loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' df <- sparkRSQL.createExternalTable(sqlCtx, "myjson", path="path/to/json", source="json")
+#' sqlContext <- sparkRSQL.init(sc)
+#' df <- sparkRSQL.createExternalTable(sqlContext, "myjson", path="path/to/json", source="json")
#' }
-createExternalTable <- function(sqlCtx, tableName, path = NULL, source = NULL, ...) {
+createExternalTable <- function(sqlContext, tableName, path = NULL, source = NULL, ...) {
options <- varargsToEnv(...)
if (!is.null(path)) {
options[['path']] <- path
}
- sdf <- callJMethod(sqlCtx, "createExternalTable", tableName, source, options)
+ sdf <- callJMethod(sqlContext, "createExternalTable", tableName, source, options)
dataFrame(sdf)
}
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index 7694652856..1e24286dbc 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -329,7 +329,7 @@ setMethod("reduceByKey",
convertEnvsToList(keys, vals)
}
locallyReduced <- lapplyPartition(x, reduceVals)
- shuffled <- partitionBy(locallyReduced, numPartitions)
+ shuffled <- partitionBy(locallyReduced, numToInt(numPartitions))
lapplyPartition(shuffled, reduceVals)
})
@@ -436,7 +436,7 @@ setMethod("combineByKey",
convertEnvsToList(keys, combiners)
}
locallyCombined <- lapplyPartition(x, combineLocally)
- shuffled <- partitionBy(locallyCombined, numPartitions)
+ shuffled <- partitionBy(locallyCombined, numToInt(numPartitions))
mergeAfterShuffle <- function(part) {
combiners <- new.env()
keys <- new.env()
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index bc82df01f0..68387f0f53 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -222,7 +222,7 @@ sparkR.init <- function(
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
+#' sqlContext <- sparkRSQL.init(sc)
#'}
sparkRSQL.init <- function(jsc) {
@@ -230,11 +230,11 @@ sparkRSQL.init <- function(jsc) {
return(get(".sparkRSQLsc", envir = .sparkREnv))
}
- sqlCtx <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+ sqlContext <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
"createSQLContext",
jsc)
- assign(".sparkRSQLsc", sqlCtx, envir = .sparkREnv)
- sqlCtx
+ assign(".sparkRSQLsc", sqlContext, envir = .sparkREnv)
+ sqlContext
}
#' Initialize a new HiveContext.
@@ -246,7 +246,7 @@ sparkRSQL.init <- function(jsc) {
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' sqlCtx <- sparkRHive.init(sc)
+#' sqlContext <- sparkRHive.init(sc)
#'}
sparkRHive.init <- function(jsc) {
diff --git a/R/pkg/inst/profile/shell.R b/R/pkg/inst/profile/shell.R
index 33478d9e29..ca94f1d4e7 100644
--- a/R/pkg/inst/profile/shell.R
+++ b/R/pkg/inst/profile/shell.R
@@ -26,8 +26,8 @@
sc <- SparkR::sparkR.init(Sys.getenv("MASTER", unset = ""))
assign("sc", sc, envir=.GlobalEnv)
- sqlCtx <- SparkR::sparkRSQL.init(sc)
- assign("sqlCtx", sqlCtx, envir=.GlobalEnv)
+ sqlContext <- SparkR::sparkRSQL.init(sc)
+ assign("sqlContext", sqlContext, envir=.GlobalEnv)
cat("\n Welcome to SparkR!")
- cat("\n Spark context is available as sc, SQL context is available as sqlCtx\n")
+ cat("\n Spark context is available as sc, SQL context is available as sqlContext\n")
}
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index 1768c57fd0..1857e636e8 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -23,7 +23,7 @@ context("SparkSQL functions")
sc <- sparkR.init()
-sqlCtx <- sparkRSQL.init(sc)
+sqlContext <- sparkRSQL.init(sc)
mockLines <- c("{\"name\":\"Michael\"}",
"{\"name\":\"Andy\", \"age\":30}",
@@ -67,25 +67,25 @@ test_that("structType and structField", {
test_that("create DataFrame from RDD", {
rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
- df <- createDataFrame(sqlCtx, rdd, list("a", "b"))
+ df <- createDataFrame(sqlContext, rdd, list("a", "b"))
expect_true(inherits(df, "DataFrame"))
expect_true(count(df) == 10)
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
- df <- createDataFrame(sqlCtx, rdd)
+ df <- createDataFrame(sqlContext, rdd)
expect_true(inherits(df, "DataFrame"))
expect_equal(columns(df), c("_1", "_2"))
schema <- structType(structField(x = "a", type = "integer", nullable = TRUE),
structField(x = "b", type = "string", nullable = TRUE))
- df <- createDataFrame(sqlCtx, rdd, schema)
+ df <- createDataFrame(sqlContext, rdd, schema)
expect_true(inherits(df, "DataFrame"))
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
rdd <- lapply(parallelize(sc, 1:10), function(x) { list(a = x, b = as.character(x)) })
- df <- createDataFrame(sqlCtx, rdd)
+ df <- createDataFrame(sqlContext, rdd)
expect_true(inherits(df, "DataFrame"))
expect_true(count(df) == 10)
expect_equal(columns(df), c("a", "b"))
@@ -121,17 +121,17 @@ test_that("toDF", {
test_that("create DataFrame from list or data.frame", {
l <- list(list(1, 2), list(3, 4))
- df <- createDataFrame(sqlCtx, l, c("a", "b"))
+ df <- createDataFrame(sqlContext, l, c("a", "b"))
expect_equal(columns(df), c("a", "b"))
l <- list(list(a=1, b=2), list(a=3, b=4))
- df <- createDataFrame(sqlCtx, l)
+ df <- createDataFrame(sqlContext, l)
expect_equal(columns(df), c("a", "b"))
a <- 1:3
b <- c("a", "b", "c")
ldf <- data.frame(a, b)
- df <- createDataFrame(sqlCtx, ldf)
+ df <- createDataFrame(sqlContext, ldf)
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
expect_equal(count(df), 3)
@@ -142,7 +142,7 @@ test_that("create DataFrame from list or data.frame", {
test_that("create DataFrame with different data types", {
l <- list(a = 1L, b = 2, c = TRUE, d = "ss", e = as.Date("2012-12-13"),
f = as.POSIXct("2015-03-15 12:13:14.056"))
- df <- createDataFrame(sqlCtx, list(l))
+ df <- createDataFrame(sqlContext, list(l))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "double"), c("c", "boolean"),
c("d", "string"), c("e", "date"), c("f", "timestamp")))
expect_equal(count(df), 1)
@@ -154,7 +154,7 @@ test_that("create DataFrame with different data types", {
# e <- new.env()
# assign("n", 3L, envir = e)
# l <- list(1:10, list("a", "b"), e, list(a="aa", b=3L))
-# df <- createDataFrame(sqlCtx, list(l), c("a", "b", "c", "d"))
+# df <- createDataFrame(sqlContext, list(l), c("a", "b", "c", "d"))
# expect_equal(dtypes(df), list(c("a", "array<int>"), c("b", "array<string>"),
# c("c", "map<string,int>"), c("d", "struct<a:string,b:int>")))
# expect_equal(count(df), 1)
@@ -163,7 +163,7 @@ test_that("create DataFrame with different data types", {
#})
test_that("jsonFile() on a local file returns a DataFrame", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
expect_true(inherits(df, "DataFrame"))
expect_true(count(df) == 3)
})
@@ -171,88 +171,88 @@ test_that("jsonFile() on a local file returns a DataFrame", {
test_that("jsonRDD() on a RDD with json string", {
rdd <- parallelize(sc, mockLines)
expect_true(count(rdd) == 3)
- df <- jsonRDD(sqlCtx, rdd)
+ df <- jsonRDD(sqlContext, rdd)
expect_true(inherits(df, "DataFrame"))
expect_true(count(df) == 3)
rdd2 <- flatMap(rdd, function(x) c(x, x))
- df <- jsonRDD(sqlCtx, rdd2)
+ df <- jsonRDD(sqlContext, rdd2)
expect_true(inherits(df, "DataFrame"))
expect_true(count(df) == 6)
})
test_that("test cache, uncache and clearCache", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
registerTempTable(df, "table1")
- cacheTable(sqlCtx, "table1")
- uncacheTable(sqlCtx, "table1")
- clearCache(sqlCtx)
- dropTempTable(sqlCtx, "table1")
+ cacheTable(sqlContext, "table1")
+ uncacheTable(sqlContext, "table1")
+ clearCache(sqlContext)
+ dropTempTable(sqlContext, "table1")
})
test_that("test tableNames and tables", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
registerTempTable(df, "table1")
- expect_true(length(tableNames(sqlCtx)) == 1)
- df <- tables(sqlCtx)
+ expect_true(length(tableNames(sqlContext)) == 1)
+ df <- tables(sqlContext)
expect_true(count(df) == 1)
- dropTempTable(sqlCtx, "table1")
+ dropTempTable(sqlContext, "table1")
})
test_that("registerTempTable() results in a queryable table and sql() results in a new DataFrame", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
registerTempTable(df, "table1")
- newdf <- sql(sqlCtx, "SELECT * FROM table1 where name = 'Michael'")
+ newdf <- sql(sqlContext, "SELECT * FROM table1 where name = 'Michael'")
expect_true(inherits(newdf, "DataFrame"))
expect_true(count(newdf) == 1)
- dropTempTable(sqlCtx, "table1")
+ dropTempTable(sqlContext, "table1")
})
test_that("insertInto() on a registered table", {
- df <- read.df(sqlCtx, jsonPath, "json")
+ df <- read.df(sqlContext, jsonPath, "json")
write.df(df, parquetPath, "parquet", "overwrite")
- dfParquet <- read.df(sqlCtx, parquetPath, "parquet")
+ dfParquet <- read.df(sqlContext, parquetPath, "parquet")
lines <- c("{\"name\":\"Bob\", \"age\":24}",
"{\"name\":\"James\", \"age\":35}")
jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".tmp")
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
writeLines(lines, jsonPath2)
- df2 <- read.df(sqlCtx, jsonPath2, "json")
+ df2 <- read.df(sqlContext, jsonPath2, "json")
write.df(df2, parquetPath2, "parquet", "overwrite")
- dfParquet2 <- read.df(sqlCtx, parquetPath2, "parquet")
+ dfParquet2 <- read.df(sqlContext, parquetPath2, "parquet")
registerTempTable(dfParquet, "table1")
insertInto(dfParquet2, "table1")
- expect_true(count(sql(sqlCtx, "select * from table1")) == 5)
- expect_true(first(sql(sqlCtx, "select * from table1 order by age"))$name == "Michael")
- dropTempTable(sqlCtx, "table1")
+ expect_true(count(sql(sqlContext, "select * from table1")) == 5)
+ expect_true(first(sql(sqlContext, "select * from table1 order by age"))$name == "Michael")
+ dropTempTable(sqlContext, "table1")
registerTempTable(dfParquet, "table1")
insertInto(dfParquet2, "table1", overwrite = TRUE)
- expect_true(count(sql(sqlCtx, "select * from table1")) == 2)
- expect_true(first(sql(sqlCtx, "select * from table1 order by age"))$name == "Bob")
- dropTempTable(sqlCtx, "table1")
+ expect_true(count(sql(sqlContext, "select * from table1")) == 2)
+ expect_true(first(sql(sqlContext, "select * from table1 order by age"))$name == "Bob")
+ dropTempTable(sqlContext, "table1")
})
test_that("table() returns a new DataFrame", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
registerTempTable(df, "table1")
- tabledf <- table(sqlCtx, "table1")
+ tabledf <- table(sqlContext, "table1")
expect_true(inherits(tabledf, "DataFrame"))
expect_true(count(tabledf) == 3)
- dropTempTable(sqlCtx, "table1")
+ dropTempTable(sqlContext, "table1")
})
test_that("toRDD() returns an RRDD", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
testRDD <- toRDD(df)
expect_true(inherits(testRDD, "RDD"))
expect_true(count(testRDD) == 3)
})
test_that("union on two RDDs created from DataFrames returns an RRDD", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
RDD1 <- toRDD(df)
RDD2 <- toRDD(df)
unioned <- unionRDD(RDD1, RDD2)
@@ -274,7 +274,7 @@ test_that("union on mixed serialization types correctly returns a byte RRDD", {
writeLines(textLines, textPath)
textRDD <- textFile(sc, textPath)
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
dfRDD <- toRDD(df)
unionByte <- unionRDD(rdd, dfRDD)
@@ -292,7 +292,7 @@ test_that("union on mixed serialization types correctly returns a byte RRDD", {
test_that("objectFile() works with row serialization", {
objectPath <- tempfile(pattern="spark-test", fileext=".tmp")
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
dfRDD <- toRDD(df)
saveAsObjectFile(coalesce(dfRDD, 1L), objectPath)
objectIn <- objectFile(sc, objectPath)
@@ -303,7 +303,7 @@ test_that("objectFile() works with row serialization", {
})
test_that("lapply() on a DataFrame returns an RDD with the correct columns", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
testRDD <- lapply(df, function(row) {
row$newCol <- row$age + 5
row
@@ -315,7 +315,7 @@ test_that("lapply() on a DataFrame returns an RDD with the correct columns", {
})
test_that("collect() returns a data.frame", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
rdf <- collect(df)
expect_true(is.data.frame(rdf))
expect_true(names(rdf)[1] == "age")
@@ -324,20 +324,20 @@ test_that("collect() returns a data.frame", {
})
test_that("limit() returns DataFrame with the correct number of rows", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
dfLimited <- limit(df, 2)
expect_true(inherits(dfLimited, "DataFrame"))
expect_true(count(dfLimited) == 2)
})
test_that("collect() and take() on a DataFrame return the same number of rows and columns", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
expect_true(nrow(collect(df)) == nrow(take(df, 10)))
expect_true(ncol(collect(df)) == ncol(take(df, 10)))
})
test_that("multiple pipeline transformations starting with a DataFrame result in an RDD with the correct values", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
first <- lapply(df, function(row) {
row$age <- row$age + 5
row
@@ -354,7 +354,7 @@ test_that("multiple pipeline transformations starting with a DataFrame result in
})
test_that("cache(), persist(), and unpersist() on a DataFrame", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
expect_false(df@env$isCached)
cache(df)
expect_true(df@env$isCached)
@@ -373,7 +373,7 @@ test_that("cache(), persist(), and unpersist() on a DataFrame", {
})
test_that("schema(), dtypes(), columns(), names() return the correct values/format", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
testSchema <- schema(df)
expect_true(length(testSchema$fields()) == 2)
expect_true(testSchema$fields()[[1]]$dataType.toString() == "LongType")
@@ -394,7 +394,7 @@ test_that("schema(), dtypes(), columns(), names() return the correct values/form
})
test_that("head() and first() return the correct data", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
testHead <- head(df)
expect_true(nrow(testHead) == 3)
expect_true(ncol(testHead) == 2)
@@ -415,14 +415,14 @@ test_that("distinct() on DataFrames", {
jsonPathWithDup <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(lines, jsonPathWithDup)
- df <- jsonFile(sqlCtx, jsonPathWithDup)
+ df <- jsonFile(sqlContext, jsonPathWithDup)
uniques <- distinct(df)
expect_true(inherits(uniques, "DataFrame"))
expect_true(count(uniques) == 3)
})
test_that("sample on a DataFrame", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
sampled <- sample(df, FALSE, 1.0)
expect_equal(nrow(collect(sampled)), count(df))
expect_true(inherits(sampled, "DataFrame"))
@@ -435,7 +435,7 @@ test_that("sample on a DataFrame", {
})
test_that("select operators", {
- df <- select(jsonFile(sqlCtx, jsonPath), "name", "age")
+ df <- select(jsonFile(sqlContext, jsonPath), "name", "age")
expect_true(inherits(df$name, "Column"))
expect_true(inherits(df[[2]], "Column"))
expect_true(inherits(df[["age"]], "Column"))
@@ -461,7 +461,7 @@ test_that("select operators", {
})
test_that("select with column", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
df1 <- select(df, "name")
expect_true(columns(df1) == c("name"))
expect_true(count(df1) == 3)
@@ -472,7 +472,7 @@ test_that("select with column", {
})
test_that("selectExpr() on a DataFrame", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
selected <- selectExpr(df, "age * 2")
expect_true(names(selected) == "(age * 2)")
expect_equal(collect(selected), collect(select(df, df$age * 2L)))
@@ -483,7 +483,7 @@ test_that("selectExpr() on a DataFrame", {
})
test_that("column calculation", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
d <- collect(select(df, alias(df$age + 1, "age2")))
expect_true(names(d) == c("age2"))
df2 <- select(df, lower(df$name), abs(df$age))
@@ -492,15 +492,15 @@ test_that("column calculation", {
})
test_that("read.df() from json file", {
- df <- read.df(sqlCtx, jsonPath, "json")
+ df <- read.df(sqlContext, jsonPath, "json")
expect_true(inherits(df, "DataFrame"))
expect_true(count(df) == 3)
})
test_that("write.df() as parquet file", {
- df <- read.df(sqlCtx, jsonPath, "json")
+ df <- read.df(sqlContext, jsonPath, "json")
write.df(df, parquetPath, "parquet", mode="overwrite")
- df2 <- read.df(sqlCtx, parquetPath, "parquet")
+ df2 <- read.df(sqlContext, parquetPath, "parquet")
expect_true(inherits(df2, "DataFrame"))
expect_true(count(df2) == 3)
})
@@ -553,7 +553,7 @@ test_that("column binary mathfunctions", {
"{\"a\":4, \"b\":8}")
jsonPathWithDup <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(lines, jsonPathWithDup)
- df <- jsonFile(sqlCtx, jsonPathWithDup)
+ df <- jsonFile(sqlContext, jsonPathWithDup)
expect_equal(collect(select(df, atan2(df$a, df$b)))[1, "ATAN2(a, b)"], atan2(1, 5))
expect_equal(collect(select(df, atan2(df$a, df$b)))[2, "ATAN2(a, b)"], atan2(2, 6))
expect_equal(collect(select(df, atan2(df$a, df$b)))[3, "ATAN2(a, b)"], atan2(3, 7))
@@ -565,7 +565,7 @@ test_that("column binary mathfunctions", {
})
test_that("string operators", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
expect_equal(count(where(df, like(df$name, "A%"))), 1)
expect_equal(count(where(df, startsWith(df$name, "A"))), 1)
expect_equal(first(select(df, substr(df$name, 1, 2)))[[1]], "Mi")
@@ -573,7 +573,7 @@ test_that("string operators", {
})
test_that("group by", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
df1 <- agg(df, name = "max", age = "sum")
expect_true(1 == count(df1))
df1 <- agg(df, age2 = max(df$age))
@@ -610,7 +610,7 @@ test_that("group by", {
})
test_that("arrange() and orderBy() on a DataFrame", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
sorted <- arrange(df, df$age)
expect_true(collect(sorted)[1,2] == "Michael")
@@ -627,7 +627,7 @@ test_that("arrange() and orderBy() on a DataFrame", {
})
test_that("filter() on a DataFrame", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
filtered <- filter(df, "age > 20")
expect_true(count(filtered) == 1)
expect_true(collect(filtered)$name == "Andy")
@@ -637,7 +637,7 @@ test_that("filter() on a DataFrame", {
})
test_that("join() on a DataFrame", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
mockLines2 <- c("{\"name\":\"Michael\", \"test\": \"yes\"}",
"{\"name\":\"Andy\", \"test\": \"no\"}",
@@ -645,7 +645,7 @@ test_that("join() on a DataFrame", {
"{\"name\":\"Bob\", \"test\": \"yes\"}")
jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(mockLines2, jsonPath2)
- df2 <- jsonFile(sqlCtx, jsonPath2)
+ df2 <- jsonFile(sqlContext, jsonPath2)
joined <- join(df, df2)
expect_equal(names(joined), c("age", "name", "name", "test"))
@@ -668,7 +668,7 @@ test_that("join() on a DataFrame", {
})
test_that("toJSON() returns an RDD of the correct values", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
testRDD <- toJSON(df)
expect_true(inherits(testRDD, "RDD"))
expect_true(SparkR:::getSerializedMode(testRDD) == "string")
@@ -676,25 +676,25 @@ test_that("toJSON() returns an RDD of the correct values", {
})
test_that("showDF()", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
s <- capture.output(showDF(df))
expect_output(s , "+----+-------+\n| age| name|\n+----+-------+\n|null|Michael|\n| 30| Andy|\n| 19| Justin|\n+----+-------+\n")
})
test_that("isLocal()", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
expect_false(isLocal(df))
})
test_that("unionAll(), except(), and intersect() on a DataFrame", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
lines <- c("{\"name\":\"Bob\", \"age\":24}",
"{\"name\":\"Andy\", \"age\":30}",
"{\"name\":\"James\", \"age\":35}")
jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(lines, jsonPath2)
- df2 <- read.df(sqlCtx, jsonPath2, "json")
+ df2 <- read.df(sqlContext, jsonPath2, "json")
unioned <- arrange(unionAll(df, df2), df$age)
expect_true(inherits(unioned, "DataFrame"))
@@ -713,7 +713,7 @@ test_that("unionAll(), except(), and intersect() on a DataFrame", {
})
test_that("withColumn() and withColumnRenamed()", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
newDF <- withColumn(df, "newAge", df$age + 2)
expect_true(length(columns(newDF)) == 3)
expect_true(columns(newDF)[3] == "newAge")
@@ -725,7 +725,7 @@ test_that("withColumn() and withColumnRenamed()", {
})
test_that("mutate() and rename()", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
newDF <- mutate(df, newAge = df$age + 2)
expect_true(length(columns(newDF)) == 3)
expect_true(columns(newDF)[3] == "newAge")
@@ -737,25 +737,25 @@ test_that("mutate() and rename()", {
})
test_that("write.df() on DataFrame and works with parquetFile", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
write.df(df, parquetPath, "parquet", mode="overwrite")
- parquetDF <- parquetFile(sqlCtx, parquetPath)
+ parquetDF <- parquetFile(sqlContext, parquetPath)
expect_true(inherits(parquetDF, "DataFrame"))
expect_equal(count(df), count(parquetDF))
})
test_that("parquetFile works with multiple input paths", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
write.df(df, parquetPath, "parquet", mode="overwrite")
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
write.df(df, parquetPath2, "parquet", mode="overwrite")
- parquetDF <- parquetFile(sqlCtx, parquetPath, parquetPath2)
+ parquetDF <- parquetFile(sqlContext, parquetPath, parquetPath2)
expect_true(inherits(parquetDF, "DataFrame"))
expect_true(count(parquetDF) == count(df)*2)
})
test_that("describe() on a DataFrame", {
- df <- jsonFile(sqlCtx, jsonPath)
+ df <- jsonFile(sqlContext, jsonPath)
stats <- describe(df, "age")
expect_equal(collect(stats)[1, "summary"], "count")
expect_equal(collect(stats)[2, "age"], "24.5")