aboutsummaryrefslogtreecommitdiff
path: root/R/pkg/inst/tests
diff options
context:
space:
mode:
authorfelixcheung <felixcheung_m@hotmail.com>2016-04-19 15:59:47 -0700
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2016-04-19 15:59:47 -0700
commitecd877e8335ff6bb06c96d3045ccade80676e714 (patch)
tree95af2af9dc9d84807f1f8b4386fa91b681c5b2d0 /R/pkg/inst/tests
parent008a8bbef0d3475610c13fff778a425900912650 (diff)
downloadspark-ecd877e8335ff6bb06c96d3045ccade80676e714.tar.gz
spark-ecd877e8335ff6bb06c96d3045ccade80676e714.tar.bz2
spark-ecd877e8335ff6bb06c96d3045ccade80676e714.zip
[SPARK-12224][SPARKR] R support for JDBC source
Add R API for `read.jdbc`, `write.jdbc`. Tested this quite a bit manually with different combinations of parameters. It's not clear if we could have automated tests in R for this - Scala `JDBCSuite` depends on Java H2 in-memory database. Refactored some code into util so they could be tested. Core's R SerDe code needs to be updated to allow access to java.util.Properties as `jobj` handle which is required by DataFrameReader/Writer's `jdbc` method. It would be possible, though more code to add a `sql/r/SQLUtils` helper function. Tested: ``` # with postgresql ../bin/sparkR --driver-class-path /usr/share/java/postgresql-9.4.1207.jre7.jar # read.jdbc df <- read.jdbc(sqlContext, "jdbc:postgresql://localhost/db", "films2", user = "user", password = "12345") df <- read.jdbc(sqlContext, "jdbc:postgresql://localhost/db", "films2", user = "user", password = 12345) # partitionColumn and numPartitions test df <- read.jdbc(sqlContext, "jdbc:postgresql://localhost/db", "films2", partitionColumn = "did", lowerBound = 0, upperBound = 200, numPartitions = 4, user = "user", password = 12345) a <- SparkR:::toRDD(df) SparkR:::getNumPartitions(a) [1] 4 SparkR:::collectPartition(a, 2L) # defaultParallelism test df <- read.jdbc(sqlContext, "jdbc:postgresql://localhost/db", "films2", partitionColumn = "did", lowerBound = 0, upperBound = 200, user = "user", password = 12345) SparkR:::getNumPartitions(a) [1] 2 # predicates test df <- read.jdbc(sqlContext, "jdbc:postgresql://localhost/db", "films2", predicates = list("did<=105"), user = "user", password = 12345) count(df) == 1 # write.jdbc, default save mode "error" irisDf <- as.DataFrame(sqlContext, iris) write.jdbc(irisDf, "jdbc:postgresql://localhost/db", "films2", user = "user", password = "12345") "error, already exists" write.jdbc(irisDf, "jdbc:postgresql://localhost/db", "iris", user = "user", password = "12345") ``` Author: felixcheung <felixcheung_m@hotmail.com> Closes #10480 from felixcheung/rreadjdbc.
Diffstat (limited to 'R/pkg/inst/tests')
-rw-r--r--R/pkg/inst/tests/testthat/test_utils.R24
1 files changed, 24 insertions, 0 deletions
diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R
index 4218138f64..01694ab5c4 100644
--- a/R/pkg/inst/tests/testthat/test_utils.R
+++ b/R/pkg/inst/tests/testthat/test_utils.R
@@ -140,3 +140,27 @@ test_that("cleanClosure on R functions", {
expect_equal(ls(env), "aBroadcast")
expect_equal(get("aBroadcast", envir = env, inherits = FALSE), aBroadcast)
})
+
+test_that("varargsToJProperties", {
+ jprops <- newJObject("java.util.Properties")
+ expect_true(class(jprops) == "jobj")
+
+ jprops <- varargsToJProperties(abc = "123")
+ expect_true(class(jprops) == "jobj")
+ expect_equal(callJMethod(jprops, "getProperty", "abc"), "123")
+
+ jprops <- varargsToJProperties(abc = "abc", b = 1)
+ expect_equal(callJMethod(jprops, "getProperty", "abc"), "abc")
+ expect_equal(callJMethod(jprops, "getProperty", "b"), "1")
+
+ jprops <- varargsToJProperties()
+ expect_equal(callJMethod(jprops, "size"), 0L)
+})
+
+test_that("convertToJSaveMode", {
+ s <- convertToJSaveMode("error")
+ expect_true(class(s) == "jobj")
+ expect_match(capture.output(print.jobj(s)), "Java ref type org.apache.spark.sql.SaveMode id ")
+ expect_error(convertToJSaveMode("foo"),
+ 'mode should be one of "append", "overwrite", "error", "ignore"') #nolint
+})