diff options
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 82 |
1 files changed, 82 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index ff3309874f..506971362f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.jdbc import java.sql.DriverManager import java.util.Properties +import scala.collection.JavaConverters.propertiesAsScalaMapConverter + import org.scalatest.BeforeAndAfter import org.apache.spark.SparkException @@ -208,4 +210,84 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count()) assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) } + + test("save works for format(\"jdbc\") if url and dbtable are set") { + val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + + df.write.format("jdbc") + .options(Map("url" -> url, "dbtable" -> "TEST.SAVETEST")) + .save() + + assert(2 === sqlContext.read.jdbc(url, "TEST.SAVETEST", new Properties).count) + assert( + 2 === sqlContext.read.jdbc(url, "TEST.SAVETEST", new Properties).collect()(0).length) + } + + test("save API with SaveMode.Overwrite") { + val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2) + + df.write.format("jdbc") + .option("url", url1) + .option("dbtable", "TEST.SAVETEST") + .options(properties.asScala) + .save() + df2.write.mode(SaveMode.Overwrite).format("jdbc") + .option("url", url1) + .option("dbtable", "TEST.SAVETEST") + .options(properties.asScala) + .save() + assert(1 === spark.read.jdbc(url1, "TEST.SAVETEST", properties).count()) + assert(2 === spark.read.jdbc(url1, "TEST.SAVETEST", properties).collect()(0).length) + } + + test("save errors if url is not specified") { + val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + + val e = intercept[RuntimeException] { + df.write.format("jdbc") + .option("dbtable", "TEST.SAVETEST") + .options(properties.asScala) + .save() + }.getMessage + assert(e.contains("Option 'url' is required")) + } + + test("save errors if dbtable is not specified") { + val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + + val e = intercept[RuntimeException] { + df.write.format("jdbc") + .option("url", url1) + .options(properties.asScala) + .save() + }.getMessage + assert(e.contains("Option 'dbtable' is required")) + } + + test("save errors if wrong user/password combination") { + val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + + val e = intercept[org.h2.jdbc.JdbcSQLException] { + df.write.format("jdbc") + .option("dbtable", "TEST.SAVETEST") + .option("url", url1) + .save() + }.getMessage + assert(e.contains("Wrong user name or password")) + } + + test("save errors if partitionColumn and numPartitions and bounds not set") { + val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + + val e = intercept[java.lang.IllegalArgumentException] { + df.write.format("jdbc") + .option("dbtable", "TEST.SAVETEST") + .option("url", url1) + .option("partitionColumn", "foo") + .save() + }.getMessage + assert(e.contains("If 'partitionColumn' is specified then 'lowerBound', 'upperBound'," + + " and 'numPartitions' are required.")) + } } |