Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala  |  82
1 file changed, 82 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index ff3309874f..506971362f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.jdbc
 import java.sql.DriverManager
 import java.util.Properties
 
+import scala.collection.JavaConverters.propertiesAsScalaMapConverter
+
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkException
@@ -208,4 +210,84 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count())
     assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
   }
+
+  test("save works for format(\"jdbc\") if url and dbtable are set") {
+    val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+
+    df.write.format("jdbc")
+      .options(Map("url" -> url, "dbtable" -> "TEST.SAVETEST"))
+      .save()
+
+    assert(2 === sqlContext.read.jdbc(url, "TEST.SAVETEST", new Properties).count)
+    assert(
+      2 === sqlContext.read.jdbc(url, "TEST.SAVETEST", new Properties).collect()(0).length)
+  }
+
+  test("save API with SaveMode.Overwrite") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+    val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
+
+    df.write.format("jdbc")
+      .option("url", url1)
+      .option("dbtable", "TEST.SAVETEST")
+      .options(properties.asScala)
+      .save()
+    df2.write.mode(SaveMode.Overwrite).format("jdbc")
+      .option("url", url1)
+      .option("dbtable", "TEST.SAVETEST")
+      .options(properties.asScala)
+      .save()
+    assert(1 === spark.read.jdbc(url1, "TEST.SAVETEST", properties).count())
+    assert(2 === spark.read.jdbc(url1, "TEST.SAVETEST", properties).collect()(0).length)
+  }
+
+  test("save errors if url is not specified") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+
+    val e = intercept[RuntimeException] {
+      df.write.format("jdbc")
+        .option("dbtable", "TEST.SAVETEST")
+        .options(properties.asScala)
+        .save()
+    }.getMessage
+    assert(e.contains("Option 'url' is required"))
+  }
+
+  test("save errors if dbtable is not specified") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+
+    val e = intercept[RuntimeException] {
+      df.write.format("jdbc")
+        .option("url", url1)
+        .options(properties.asScala)
+        .save()
+    }.getMessage
+    assert(e.contains("Option 'dbtable' is required"))
+  }
+
+  test("save errors if wrong user/password combination") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+
+    val e = intercept[org.h2.jdbc.JdbcSQLException] {
+      df.write.format("jdbc")
+        .option("dbtable", "TEST.SAVETEST")
+        .option("url", url1)
+        .save()
+    }.getMessage
+    assert(e.contains("Wrong user name or password"))
+  }
+
+  test("save errors if partitionColumn and numPartitions and bounds not set") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+
+    val e = intercept[java.lang.IllegalArgumentException] {
+      df.write.format("jdbc")
+        .option("dbtable", "TEST.SAVETEST")
+        .option("url", url1)
+        .option("partitionColumn", "foo")
+        .save()
+    }.getMessage
+    assert(e.contains("If 'partitionColumn' is specified then 'lowerBound', 'upperBound'," +
+      " and 'numPartitions' are required."))
+  }
 }
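
Note: all of the tests added above drive the generic DataFrameWriter path, df.write.format("jdbc") ... .save(), which requires the 'url' and 'dbtable' options and otherwise takes the same connection properties as the jdbc() reader. Below is a minimal standalone sketch of that usage pattern outside the test suite; the session setup, the H2 in-memory URL, the object and table names, and the sample rows are illustrative assumptions, not part of this patch, and an H2 driver is assumed to be on the classpath.

import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

object JdbcSaveSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("jdbc-save-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Illustrative H2 in-memory URL; DB_CLOSE_DELAY=-1 keeps the database
    // alive for the lifetime of the JVM so the read-back below can see it.
    val url = "jdbc:h2:mem:sketchdb;DB_CLOSE_DELAY=-1;user=testUser;password=testPass"

    // Sample rows standing in for the suite's arr2x2 / schema2 fixtures.
    val df = Seq(("dave", 42), ("mary", 222)).toDF("name", "id")

    // Write through the generic save() path; 'url' and 'dbtable' are the
    // required options that the new tests assert on.
    df.write.format("jdbc")
      .mode(SaveMode.Overwrite)
      .option("url", url)
      .option("dbtable", "SAVETEST")
      .save()

    // Read the table back through the existing jdbc() reader.
    spark.read.jdbc(url, "SAVETEST", new Properties).show()

    spark.stop()
  }
}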