From b0e8eb6d3e9e80fa62625a5b9382d93af77250db Mon Sep 17 00:00:00 2001
From: Felix Cheung
Date: Fri, 13 Jan 2017 10:08:14 -0800
Subject: [SPARK-18335][SPARKR] createDataFrame to support numPartitions
 parameter

## What changes were proposed in this pull request?

To allow specifying the number of partitions when the DataFrame is created.

## How was this patch tested?

Manual and unit tests.

Author: Felix Cheung

Closes #16512 from felixcheung/rnumpart.
---
 R/pkg/inst/tests/testthat/test_rdd.R      |  4 ++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 23 ++++++++++++++++++++++-
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R
index a3d66c245a..2c41a6b075 100644
--- a/R/pkg/inst/tests/testthat/test_rdd.R
+++ b/R/pkg/inst/tests/testthat/test_rdd.R
@@ -381,8 +381,8 @@ test_that("aggregateRDD() on RDDs", {
 test_that("zipWithUniqueId() on RDDs", {
   rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
   actual <- collectRDD(zipWithUniqueId(rdd))
-  expected <- list(list("a", 0), list("b", 3), list("c", 1),
-                   list("d", 4), list("e", 2))
+  expected <- list(list("a", 0), list("b", 1), list("c", 4),
+                   list("d", 2), list("e", 5))
   expect_equal(actual, expected)
 
   rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 3e8b96a513..26017427ab 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -196,6 +196,26 @@ test_that("create DataFrame from RDD", {
   expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
   expect_equal(as.list(collect(where(df, df$name == "John"))),
                list(name = "John", age = 19L, height = 176.5))
+  expect_equal(getNumPartitions(toRDD(df)), 1)
+
+  df <- as.DataFrame(cars, numPartitions = 2)
+  expect_equal(getNumPartitions(toRDD(df)), 2)
+  df <- createDataFrame(cars, numPartitions = 3)
+  expect_equal(getNumPartitions(toRDD(df)), 3)
+  # validate limit by num of rows
+  df <- createDataFrame(cars, numPartitions = 60)
+  expect_equal(getNumPartitions(toRDD(df)), 50)
+  # validate when 1 < (length(coll) / numSlices) << length(coll)
+  df <- createDataFrame(cars, numPartitions = 20)
+  expect_equal(getNumPartitions(toRDD(df)), 20)
+
+  df <- as.DataFrame(data.frame(0))
+  expect_is(df, "SparkDataFrame")
+  df <- createDataFrame(list(list(1)))
+  expect_is(df, "SparkDataFrame")
+  df <- as.DataFrame(data.frame(0), numPartitions = 2)
+  # no data to partition, goes to 1
+  expect_equal(getNumPartitions(toRDD(df)), 1)
 
   setHiveContext(sc)
   sql("CREATE TABLE people (name string, age double, height float)")
@@ -213,7 +233,8 @@ test_that("createDataFrame uses files for large objects", {
   # To simulate a large file scenario, we set spark.r.maxAllocationLimit to a smaller value
   conf <- callJMethod(sparkSession, "conf")
   callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
-  df <- suppressWarnings(createDataFrame(iris))
+  df <- suppressWarnings(createDataFrame(iris, numPartitions = 3))
+  expect_equal(getNumPartitions(toRDD(df)), 3)
 
   # Resetting the conf back to default value
   callJMethod(conf, "set", "spark.r.maxAllocationLimit", toString(.Machine$integer.max / 10))
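For reference, a minimal usage sketch of the new `numPartitions` parameter, mirroring the calls exercised in the tests above. This is not part of the patch; it assumes a working local SparkR installation, and it reaches the RDD helpers (`toRDD`, `getNumPartitions`) via `:::` because they are internal to the package outside the test suite.

```r
library(SparkR)

# Assumes a local Spark installation is available on this machine
sparkR.session(master = "local[2]")

# Request an explicit partition count when creating a SparkDataFrame
df <- createDataFrame(cars, numPartitions = 3)
# toRDD/getNumPartitions are internal SparkR helpers, hence the ':::'
SparkR:::getNumPartitions(SparkR:::toRDD(df))  # 3

# The effective count is capped by the number of rows: cars has 50 rows,
# so requesting 60 partitions yields 50
df <- createDataFrame(cars, numPartitions = 60)
SparkR:::getNumPartitions(SparkR:::toRDD(df))  # 50

# as.DataFrame accepts the same parameter
df <- as.DataFrame(cars, numPartitions = 2)
SparkR:::getNumPartitions(SparkR:::toRDD(df))  # 2

sparkR.session.stop()
```

As the tests assert, the requested count is honored up to the number of rows in the local data frame, and a data frame with no data to split collapses to a single partition.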