aboutsummaryrefslogtreecommitdiff
path: root/R/pkg/inst
diff options
context:
space:
mode:
authorFelix Cheung <felixcheung_m@hotmail.com>2017-01-13 10:08:14 -0800
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2017-01-13 10:08:14 -0800
commitb0e8eb6d3e9e80fa62625a5b9382d93af77250db (patch)
tree2a81369a6effca9e9b32dc69afdc91abcf206958 /R/pkg/inst
parent285a7798e267311730b0163d37d726a81465468a (diff)
downloadspark-b0e8eb6d3e9e80fa62625a5b9382d93af77250db.tar.gz
spark-b0e8eb6d3e9e80fa62625a5b9382d93af77250db.tar.bz2
spark-b0e8eb6d3e9e80fa62625a5b9382d93af77250db.zip
[SPARK-18335][SPARKR] createDataFrame to support numPartitions parameter
## What changes were proposed in this pull request? To allow specifying number of partitions when the DataFrame is created ## How was this patch tested? manual, unit tests Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #16512 from felixcheung/rnumpart.
Diffstat (limited to 'R/pkg/inst')
-rw-r--r--R/pkg/inst/tests/testthat/test_rdd.R4
-rw-r--r--R/pkg/inst/tests/testthat/test_sparkSQL.R23
2 files changed, 24 insertions, 3 deletions
diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R
index a3d66c245a..2c41a6b075 100644
--- a/R/pkg/inst/tests/testthat/test_rdd.R
+++ b/R/pkg/inst/tests/testthat/test_rdd.R
@@ -381,8 +381,8 @@ test_that("aggregateRDD() on RDDs", {
test_that("zipWithUniqueId() on RDDs", {
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
actual <- collectRDD(zipWithUniqueId(rdd))
- expected <- list(list("a", 0), list("b", 3), list("c", 1),
- list("d", 4), list("e", 2))
+ expected <- list(list("a", 0), list("b", 1), list("c", 4),
+ list("d", 2), list("e", 5))
expect_equal(actual, expected)
rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 3e8b96a513..26017427ab 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -196,6 +196,26 @@ test_that("create DataFrame from RDD", {
expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(as.list(collect(where(df, df$name == "John"))),
list(name = "John", age = 19L, height = 176.5))
+ expect_equal(getNumPartitions(toRDD(df)), 1)
+
+ df <- as.DataFrame(cars, numPartitions = 2)
+ expect_equal(getNumPartitions(toRDD(df)), 2)
+ df <- createDataFrame(cars, numPartitions = 3)
+ expect_equal(getNumPartitions(toRDD(df)), 3)
+ # validate limit by num of rows
+ df <- createDataFrame(cars, numPartitions = 60)
+ expect_equal(getNumPartitions(toRDD(df)), 50)
+ # validate when 1 < (length(coll) / numSlices) << length(coll)
+ df <- createDataFrame(cars, numPartitions = 20)
+ expect_equal(getNumPartitions(toRDD(df)), 20)
+
+ df <- as.DataFrame(data.frame(0))
+ expect_is(df, "SparkDataFrame")
+ df <- createDataFrame(list(list(1)))
+ expect_is(df, "SparkDataFrame")
+ df <- as.DataFrame(data.frame(0), numPartitions = 2)
+ # no data to partition, goes to 1
+ expect_equal(getNumPartitions(toRDD(df)), 1)
setHiveContext(sc)
sql("CREATE TABLE people (name string, age double, height float)")
@@ -213,7 +233,8 @@ test_that("createDataFrame uses files for large objects", {
# To simulate a large file scenario, we set spark.r.maxAllocationLimit to a smaller value
conf <- callJMethod(sparkSession, "conf")
callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
- df <- suppressWarnings(createDataFrame(iris))
+ df <- suppressWarnings(createDataFrame(iris, numPartitions = 3))
+ expect_equal(getNumPartitions(toRDD(df)), 3)
# Resetting the conf back to default value
callJMethod(conf, "set", "spark.r.maxAllocationLimit", toString(.Machine$integer.max / 10))