author     gatorsmile <gatorsmile@gmail.com>  2016-07-01 09:54:02 +0100
committer  Sean Owen <sowen@cloudera.com>     2016-07-01 09:54:02 +0100
commit     0ad6ce7e54b1d8f5946dde652fa5341d15059158 (patch)
tree       f945df79c6f34f9c5335b75607cc0896e1c76305
parent     66283ee0b25de2a5daaa21d50a05a7fadec1de77 (diff)
[SPARK-16222][SQL] JDBC Sources - Handling illegal input values for `fetchsize` and `batchsize`
#### What changes were proposed in this pull request?

For JDBC data sources, users can specify `batchsize` for multi-row inserts and `fetchsize` for multi-row fetches. A few issues exist:

- The property keys are case sensitive. Thus, the existing test cases for `fetchsize` use an incorrect name, `fetchSize`; those test cases are effectively broken.
- No test case exists for `batchsize`.
- We do not detect illegal input values for `fetchsize` and `batchsize`. For example, when `batchsize` is zero, we get the following exception:

  ```
  Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ArithmeticException: / by zero
  ```

  When `fetchsize` is less than zero, we get an exception from the underlying JDBC driver:

  ```
  Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.h2.jdbc.JdbcSQLException: Invalid value "-1" for parameter "rows" [90008-183]
  ```

This PR fixes all of the above issues and raises appropriate exceptions when illegal inputs for `fetchsize` and `batchsize` are detected. It also updates the function descriptions.

#### How was this patch tested?

Test cases are fixed and added.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #13919 from gatorsmile/jdbcProperties.
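For orientation, here is a minimal, hypothetical sketch of how an application passes the two properties validated by this patch through the public `read.jdbc`/`write.jdbc` API. The JDBC URL, table names, and credentials below are placeholders, not part of the patch:

```scala
import java.util.Properties

import org.apache.spark.sql.SparkSession

// Hypothetical end-to-end usage of the `fetchsize` and `batchsize` connection
// properties validated by this patch. URL, tables, and credentials are placeholders.
object JdbcSizeOptionsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("jdbc-size-options").getOrCreate()

    val readProps = new Properties()
    readProps.setProperty("user", "testUser")
    readProps.setProperty("password", "testPass")
    readProps.setProperty("fetchsize", "100")   // must be >= 0; 0 lets the driver decide

    val df = spark.read.jdbc("jdbc:h2:mem:testdb", "TEST.PEOPLE", readProps)

    val writeProps = new Properties()
    writeProps.setProperty("user", "testUser")
    writeProps.setProperty("password", "testPass")
    writeProps.setProperty("batchsize", "500")  // must be >= 1

    df.write.jdbc("jdbc:h2:mem:testdb", "TEST.PEOPLE_COPY", writeProps)

    spark.stop()
  }
}
```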
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                       |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                       |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala    |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala  | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala                  |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala                        | 62
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala                   | 54
7 files changed, 98 insertions, 45 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 35ba522786..e8c2885d77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -177,7 +177,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* clause expressions used to split the column `columnName` evenly.
* @param connectionProperties JDBC database connection arguments, a list of arbitrary string
* tag/value. Normally at least a "user" and "password" property
- * should be included.
+ * should be included. "fetchsize" can be used to control the
+ * number of rows per fetch.
* @since 1.4.0
*/
def jdbc(
@@ -207,7 +208,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* @param predicates Condition in the where clause for each partition.
* @param connectionProperties JDBC database connection arguments, a list of arbitrary string
* tag/value. Normally at least a "user" and "password" property
- * should be included.
+ * should be included. "fetchsize" can be used to control the
+ * number of rows per fetch.
* @since 1.4.0
*/
def jdbc(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ca3972d62d..f77af76d2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -391,7 +391,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @param table Name of the table in the external database.
* @param connectionProperties JDBC database connection arguments, a list of arbitrary string
* tag/value. Normally at least a "user" and "password" property
- * should be included.
+ * should be included. "batchsize" can be used to control the
+ * number of rows per insert.
* @since 1.4.0
*/
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 44cfbb9fbd..24e2c1a5fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -390,7 +390,11 @@ private[sql] class JDBCRDD(
val sqlText = s"SELECT $columnList FROM $fqTable $myWhereClause"
val stmt = conn.prepareStatement(sqlText,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
- val fetchSize = properties.getProperty("fetchsize", "0").toInt
+ val fetchSize = properties.getProperty(JdbcUtils.JDBC_BATCH_FETCH_SIZE, "0").toInt
+ require(fetchSize >= 0,
+ s"Invalid value `${fetchSize.toString}` for parameter " +
+ s"`${JdbcUtils.JDBC_BATCH_FETCH_SIZE}`. The minimum value is 0. When the value is 0, " +
+ "the JDBC driver ignores the value and does the estimates.")
stmt.setFetchSize(fetchSize)
val rs = stmt.executeQuery()
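As a side note on the hunk above: `require` throws `java.lang.IllegalArgumentException`, which Spark surfaces to the driver wrapped in a `SparkException` when it fires inside a task (this is what the new `intercept[SparkException]` tests below rely on). A standalone sketch of the same guard — the property name and message mirror the patch, everything else is local to this illustration:

```scala
import java.util.Properties

// Standalone illustration of the fetch-size guard added above; not the Spark code itself.
object FetchSizeGuardSketch {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.setProperty("fetchsize", "-1")

    val fetchSize = properties.getProperty("fetchsize", "0").toInt
    require(fetchSize >= 0,
      s"Invalid value `$fetchSize` for parameter `fetchsize`. The minimum value is 0. " +
        "When the value is 0, the JDBC driver ignores the value and does the estimates.")
    // Throws: java.lang.IllegalArgumentException: requirement failed: Invalid value `-1` ...
  }
}
```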
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 065c8572b0..3529ee6e3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -34,6 +34,10 @@ import org.apache.spark.sql.types._
*/
object JdbcUtils extends Logging {
+ // the property names are case sensitive
+ val JDBC_BATCH_FETCH_SIZE = "fetchsize"
+ val JDBC_BATCH_INSERT_SIZE = "batchsize"
+
/**
* Returns a factory for creating connections to the given JDBC URL.
*
@@ -154,6 +158,10 @@ object JdbcUtils extends Logging {
nullTypes: Array[Int],
batchSize: Int,
dialect: JdbcDialect): Iterator[Byte] = {
+ require(batchSize >= 1,
+ s"Invalid value `${batchSize.toString}` for parameter " +
+ s"`${JdbcUtils.JDBC_BATCH_INSERT_SIZE}`. The minimum value is 1.")
+
val conn = getConnection()
var committed = false
val supportsTransactions = try {
@@ -275,7 +283,7 @@ object JdbcUtils extends Logging {
val rddSchema = df.schema
val getConnection: () => Connection = createConnectionFactory(url, properties)
- val batchSize = properties.getProperty("batchsize", "1000").toInt
+ val batchSize = properties.getProperty(JDBC_BATCH_INSERT_SIZE, "1000").toInt
df.foreachPartition { iterator =>
savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
}
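For context on the `batchSize >= 1` requirement above: the insert path flushes a JDBC batch every `batchSize` rows with a modulo check, so a zero value previously hit integer division by zero (the `ArithmeticException: / by zero` quoted in the description). A simplified, hypothetical stand-in for that batching loop:

```scala
// Simplified, hypothetical stand-in for the batching loop; not the actual savePartition code.
def insertInBatches(rows: Iterator[Int], batchSize: Int): Unit = {
  var rowCount = 0
  rows.foreach { _ =>
    // stmt.addBatch() would be called here in the real code path
    rowCount += 1
    if (rowCount % batchSize == 0) {  // ArithmeticException: / by zero when batchSize == 0
      // stmt.executeBatch() would flush the accumulated rows here
      rowCount = 0
    }
  }
  // a final executeBatch() would flush any remaining rows
}

// Example: insertInBatches(Iterator(1, 2, 3, 4, 5), batchSize = 2)
```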
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 2d6c3974a8..6baf1b6f16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -89,7 +89,7 @@ private object PostgresDialect extends JdbcDialect {
//
// See: https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
//
- if (properties.getOrElse("fetchsize", "0").toInt > 0) {
+ if (properties.getOrElse(JdbcUtils.JDBC_BATCH_FETCH_SIZE, "0").toInt > 0) {
connection.setAutoCommit(false)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index fd6671a39b..11e66ad080 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -24,12 +24,13 @@ import java.util.{Calendar, GregorianCalendar, Properties}
import org.h2.jdbc.JdbcSQLException
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.sources._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -83,7 +84,7 @@ class JDBCSuite extends SparkFunSuite
|CREATE TEMPORARY TABLE fetchtwo
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$url', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass',
- | fetchSize '2')
+ | ${JdbcUtils.JDBC_BATCH_FETCH_SIZE} '2')
""".stripMargin.replaceAll("\n", " "))
sql(
@@ -348,38 +349,49 @@ class JDBCSuite extends SparkFunSuite
test("Basic API") {
assert(spark.read.jdbc(
- urlWithUserAndPass, "TEST.PEOPLE", new Properties).collect().length === 3)
+ urlWithUserAndPass, "TEST.PEOPLE", new Properties()).collect().length === 3)
+ }
+
+ test("Basic API with illegal FetchSize") {
+ val properties = new Properties()
+ properties.setProperty(JdbcUtils.JDBC_BATCH_FETCH_SIZE, "-1")
+ val e = intercept[SparkException] {
+ spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", properties).collect()
+ }.getMessage
+ assert(e.contains("Invalid value `-1` for parameter `fetchsize`"))
}
test("Basic API with FetchSize") {
- val properties = new Properties
- properties.setProperty("fetchSize", "2")
- assert(spark.read.jdbc(
- urlWithUserAndPass, "TEST.PEOPLE", properties).collect().length === 3)
+ (0 to 4).foreach { size =>
+ val properties = new Properties()
+ properties.setProperty(JdbcUtils.JDBC_BATCH_FETCH_SIZE, size.toString)
+ assert(spark.read.jdbc(
+ urlWithUserAndPass, "TEST.PEOPLE", properties).collect().length === 3)
+ }
}
test("Partitioning via JDBCPartitioningInfo API") {
assert(
- spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties)
+ spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties())
.collect().length === 3)
}
test("Partitioning via list-of-where-clauses API") {
val parts = Array[String]("THEID < 2", "THEID >= 2")
- assert(spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties)
+ assert(spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties())
.collect().length === 3)
}
test("Partitioning on column that might have null values.") {
assert(
- spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", "theid", 0, 4, 3, new Properties)
+ spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", "theid", 0, 4, 3, new Properties())
.collect().length === 4)
assert(
- spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", "THEID", 0, 4, 3, new Properties)
+ spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", "THEID", 0, 4, 3, new Properties())
.collect().length === 4)
// partitioning on a nullable quoted column
assert(
- spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", """"Dept"""", 0, 4, 3, new Properties)
+ spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", """"Dept"""", 0, 4, 3, new Properties())
.collect().length === 4)
}
@@ -391,7 +403,7 @@ class JDBCSuite extends SparkFunSuite
lowerBound = 0,
upperBound = 4,
numPartitions = 0,
- connectionProperties = new Properties
+ connectionProperties = new Properties()
)
assert(res.count() === 8)
}
@@ -404,7 +416,7 @@ class JDBCSuite extends SparkFunSuite
lowerBound = 1,
upperBound = 5,
numPartitions = 10,
- connectionProperties = new Properties
+ connectionProperties = new Properties()
)
assert(res.count() === 8)
}
@@ -417,7 +429,7 @@ class JDBCSuite extends SparkFunSuite
lowerBound = 5,
upperBound = 5,
numPartitions = 4,
- connectionProperties = new Properties
+ connectionProperties = new Properties()
)
assert(res.count() === 8)
}
@@ -431,7 +443,7 @@ class JDBCSuite extends SparkFunSuite
lowerBound = 5,
upperBound = 1,
numPartitions = 3,
- connectionProperties = new Properties
+ connectionProperties = new Properties()
)
}.getMessage
assert(e.contains("Operation not allowed: the lower bound of partitioning column " +
@@ -495,8 +507,8 @@ class JDBCSuite extends SparkFunSuite
test("test DATE types") {
val rows = spark.read.jdbc(
- urlWithUserAndPass, "TEST.TIMETYPES", new Properties).collect()
- val cachedRows = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+ urlWithUserAndPass, "TEST.TIMETYPES", new Properties()).collect()
+ val cachedRows = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties())
.cache().collect()
assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
assert(rows(1).getAs[java.sql.Date](1) === null)
@@ -504,8 +516,8 @@ class JDBCSuite extends SparkFunSuite
}
test("test DATE types in cache") {
- val rows = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties).collect()
- spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+ val rows = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties()).collect()
+ spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties())
.cache().createOrReplaceTempView("mycached_date")
val cachedRows = sql("select * from mycached_date").collect()
assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
@@ -514,7 +526,7 @@ class JDBCSuite extends SparkFunSuite
test("test types for null value") {
val rows = spark.read.jdbc(
- urlWithUserAndPass, "TEST.NULLTYPES", new Properties).collect()
+ urlWithUserAndPass, "TEST.NULLTYPES", new Properties()).collect()
assert((0 to 14).forall(i => rows(0).isNullAt(i)))
}
@@ -560,7 +572,7 @@ class JDBCSuite extends SparkFunSuite
test("Remap types via JdbcDialects") {
JdbcDialects.registerDialect(testH2Dialect)
- val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties)
+ val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
assert(df.schema.filter(_.dataType != org.apache.spark.sql.types.StringType).isEmpty)
val rows = df.collect()
assert(rows(0).get(0).isInstanceOf[String])
@@ -694,7 +706,7 @@ class JDBCSuite extends SparkFunSuite
// Regression test for bug SPARK-11788
val timestamp = java.sql.Timestamp.valueOf("2001-02-20 11:22:33.543543");
val date = java.sql.Date.valueOf("1995-01-01")
- val jdbcDf = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+ val jdbcDf = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties())
val rows = jdbcDf.where($"B" > date && $"C" > timestamp).collect()
assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
assert(rows(0).getAs[java.sql.Timestamp](2)
@@ -714,7 +726,7 @@ class JDBCSuite extends SparkFunSuite
}
test("test credentials in the connection url are not in the plan output") {
- val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties)
+ val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
val explain = ExplainCommand(df.queryExecution.logical, extended = true)
spark.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
r => assert(!List("testPass", "testUser").exists(r.toString.contains))
@@ -746,7 +758,7 @@ class JDBCSuite extends SparkFunSuite
urlWithUserAndPass,
"TEST.PEOPLE",
predicates = Array[String](jdbcPartitionWhereClause),
- new Properties)
+ new Properties())
df.createOrReplaceTempView("tempFrame")
assertEmptyQuery(s"SELECT * FROM tempFrame where $FALSE2")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index ff66f53fcf..2c6449fa68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -22,7 +22,9 @@ import java.util.Properties
import org.scalatest.BeforeAndAfter
+import org.apache.spark.SparkException
import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -90,10 +92,34 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
test("Basic CREATE") {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
- df.write.jdbc(url, "TEST.BASICCREATETEST", new Properties)
- assert(2 === spark.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count)
+ df.write.jdbc(url, "TEST.BASICCREATETEST", new Properties())
+ assert(2 === spark.read.jdbc(url, "TEST.BASICCREATETEST", new Properties()).count())
assert(
- 2 === spark.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length)
+ 2 === spark.read.jdbc(url, "TEST.BASICCREATETEST", new Properties()).collect()(0).length)
+ }
+
+ test("Basic CREATE with illegal batchsize") {
+ val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+
+ (-1 to 0).foreach { size =>
+ val properties = new Properties()
+ properties.setProperty(JdbcUtils.JDBC_BATCH_INSERT_SIZE, size.toString)
+ val e = intercept[SparkException] {
+ df.write.mode(SaveMode.Overwrite).jdbc(url, "TEST.BASICCREATETEST", properties)
+ }.getMessage
+ assert(e.contains(s"Invalid value `$size` for parameter `batchsize`"))
+ }
+ }
+
+ test("Basic CREATE with batchsize") {
+ val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+
+ (1 to 3).foreach { size =>
+ val properties = new Properties()
+ properties.setProperty(JdbcUtils.JDBC_BATCH_INSERT_SIZE, size.toString)
+ df.write.mode(SaveMode.Overwrite).jdbc(url, "TEST.BASICCREATETEST", properties)
+ assert(2 === spark.read.jdbc(url, "TEST.BASICCREATETEST", new Properties()).count())
+ }
}
test("CREATE with overwrite") {
@@ -101,11 +127,11 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
df.write.jdbc(url1, "TEST.DROPTEST", properties)
- assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count)
+ assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count())
assert(3 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.DROPTEST", properties)
- assert(1 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count)
+ assert(1 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count())
assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
}
@@ -113,10 +139,10 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
- df.write.jdbc(url, "TEST.APPENDTEST", new Properties)
- df2.write.mode(SaveMode.Append).jdbc(url, "TEST.APPENDTEST", new Properties)
- assert(3 === spark.read.jdbc(url, "TEST.APPENDTEST", new Properties).count)
- assert(2 === spark.read.jdbc(url, "TEST.APPENDTEST", new Properties).collect()(0).length)
+ df.write.jdbc(url, "TEST.APPENDTEST", new Properties())
+ df2.write.mode(SaveMode.Append).jdbc(url, "TEST.APPENDTEST", new Properties())
+ assert(3 === spark.read.jdbc(url, "TEST.APPENDTEST", new Properties()).count())
+ assert(2 === spark.read.jdbc(url, "TEST.APPENDTEST", new Properties()).collect()(0).length)
}
test("CREATE then INSERT to truncate") {
@@ -125,7 +151,7 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
df.write.jdbc(url1, "TEST.TRUNCATETEST", properties)
df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.TRUNCATETEST", properties)
- assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count)
+ assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
}
@@ -133,22 +159,22 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
val df2 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
- df.write.jdbc(url, "TEST.INCOMPATIBLETEST", new Properties)
+ df.write.jdbc(url, "TEST.INCOMPATIBLETEST", new Properties())
intercept[org.apache.spark.SparkException] {
- df2.write.mode(SaveMode.Append).jdbc(url, "TEST.INCOMPATIBLETEST", new Properties)
+ df2.write.mode(SaveMode.Append).jdbc(url, "TEST.INCOMPATIBLETEST", new Properties())
}
}
test("INSERT to JDBC Datasource") {
sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
- assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count)
+ assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count())
assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
}
test("INSERT to JDBC Datasource with overwrite") {
sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
sql("INSERT OVERWRITE TABLE PEOPLE1 SELECT * FROM PEOPLE")
- assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count)
+ assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count())
assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
}
}