author: gatorsmile <gatorsmile@gmail.com> (2016-07-01 09:54:02 +0100)
committer: Sean Owen <sowen@cloudera.com> (2016-07-01 09:54:02 +0100)
commit: 0ad6ce7e54b1d8f5946dde652fa5341d15059158 (patch)
tree: f945df79c6f34f9c5335b75607cc0896e1c76305 /sql/core
parent: 66283ee0b25de2a5daaa21d50a05a7fadec1de77 (diff)
[SPARK-16222][SQL] JDBC Sources - Handling illegal input values for `fetchsize` and `batchsize`
#### What changes were proposed in this pull request?
For JDBC data sources, users can specify `batchsize` for multi-row inserts and `fetchsize` for multi-row fetches. A few issues exist:
- The property keys are case sensitive, so the existing test cases for `fetchsize` use an incorrect name, `fetchSize`, and the option is silently ignored; the test cases are effectively broken.
- No test case exists for `batchsize`.
- Illegal input values for `fetchsize` and `batchsize` are not detected.
For example, when `batchsize` is zero, we get the following exception:
```
Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ArithmeticException: / by zero
```
When `fetchsize` is less than zero, we get an exception from the underlying JDBC driver:
```
Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.h2.jdbc.JdbcSQLException: Invalid value "-1" for parameter "rows" [90008-183]
```
This PR fixes all of the above issues and raises appropriate exceptions when illegal inputs are detected for `fetchsize` and `batchsize`. It also updates the affected function descriptions.
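As illustration only (not part of the patch), here is a minimal sketch of how these options are passed; the session setup, H2-style URL, and table names are hypothetical placeholders in the style of the test suites:

```scala
import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical setup, mirroring the H2-based test suites.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val url = "jdbc:h2:mem:testdb0;user=testUser;password=testPass"

// Read path: "fetchsize" (all lowercase) controls the number of rows per fetch.
// 0 (the default) lets the driver estimate; negative values now fail fast.
val readProps = new Properties()
readProps.setProperty("fetchsize", "2")
val df = spark.read.jdbc(url, "TEST.PEOPLE", readProps)

// Write path: "batchsize" controls the number of rows per multi-row insert.
// Values below 1 now fail fast (a zero value previously caused a
// divide-by-zero at runtime).
val writeProps = new Properties()
writeProps.setProperty("batchsize", "1000")
df.write.mode(SaveMode.Overwrite).jdbc(url, "TEST.PEOPLE_COPY", writeProps)
```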
#### How was this patch tested?
Test cases are fixed and added.
Author: gatorsmile <gatorsmile@gmail.com>
Closes #13919 from gatorsmile/jdbcProperties.
Diffstat (limited to 'sql/core')
7 files changed, 98 insertions, 45 deletions
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 35ba522786..e8c2885d77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -177,7 +177,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    *                         clause expressions used to split the column `columnName` evenly.
    * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
    *                             tag/value. Normally at least a "user" and "password" property
-   *                             should be included.
+   *                             should be included. "fetchsize" can be used to control the
+   *                             number of rows per fetch.
    * @since 1.4.0
    */
   def jdbc(
@@ -207,7 +208,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @param predicates Condition in the where clause for each partition.
    * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
    *                             tag/value. Normally at least a "user" and "password" property
-   *                             should be included.
+   *                             should be included. "fetchsize" can be used to control the
+   *                             number of rows per fetch.
    * @since 1.4.0
    */
   def jdbc(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ca3972d62d..f77af76d2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -391,7 +391,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @param table Name of the table in the external database.
    * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
    *                             tag/value. Normally at least a "user" and "password" property
-   *                             should be included.
+   *                             should be included. "batchsize" can be used to control the
+   *                             number of rows per insert.
    * @since 1.4.0
    */
   def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 44cfbb9fbd..24e2c1a5fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -390,7 +390,11 @@ private[sql] class JDBCRDD(
     val sqlText = s"SELECT $columnList FROM $fqTable $myWhereClause"
     val stmt = conn.prepareStatement(sqlText,
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
-    val fetchSize = properties.getProperty("fetchsize", "0").toInt
+    val fetchSize = properties.getProperty(JdbcUtils.JDBC_BATCH_FETCH_SIZE, "0").toInt
+    require(fetchSize >= 0,
+      s"Invalid value `${fetchSize.toString}` for parameter " +
+        s"`${JdbcUtils.JDBC_BATCH_FETCH_SIZE}`. The minimum value is 0. When the value is 0, " +
+        "the JDBC driver ignores the value and does the estimates.")
     stmt.setFetchSize(fetchSize)
     val rs = stmt.executeQuery()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 065c8572b0..3529ee6e3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -34,6 +34,10 @@ import org.apache.spark.sql.types._
  */
 object JdbcUtils extends Logging {
 
+  // the property names are case sensitive
+  val JDBC_BATCH_FETCH_SIZE = "fetchsize"
+  val JDBC_BATCH_INSERT_SIZE = "batchsize"
+
   /**
    * Returns a factory for creating connections to the given JDBC URL.
    *
@@ -154,6 +158,10 @@ object JdbcUtils extends Logging {
       nullTypes: Array[Int],
       batchSize: Int,
       dialect: JdbcDialect): Iterator[Byte] = {
+    require(batchSize >= 1,
+      s"Invalid value `${batchSize.toString}` for parameter " +
+        s"`${JdbcUtils.JDBC_BATCH_INSERT_SIZE}`. The minimum value is 1.")
+
     val conn = getConnection()
     var committed = false
     val supportsTransactions = try {
@@ -275,7 +283,7 @@ object JdbcUtils extends Logging {
     val rddSchema = df.schema
     val getConnection: () => Connection = createConnectionFactory(url, properties)
-    val batchSize = properties.getProperty("batchsize", "1000").toInt
+    val batchSize = properties.getProperty(JDBC_BATCH_INSERT_SIZE, "1000").toInt
     df.foreachPartition { iterator =>
       savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 2d6c3974a8..6baf1b6f16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -89,7 +89,7 @@ private object PostgresDialect extends JdbcDialect {
     //
     // See: https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
     //
-    if (properties.getOrElse("fetchsize", "0").toInt > 0) {
+    if (properties.getOrElse(JdbcUtils.JDBC_BATCH_FETCH_SIZE, "0").toInt > 0) {
       connection.setAutoCommit(false)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index fd6671a39b..11e66ad080 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -24,12 +24,13 @@ import java.util.{Calendar, GregorianCalendar, Properties}
 import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -83,7 +84,7 @@ class JDBCSuite extends SparkFunSuite
       |CREATE TEMPORARY TABLE fetchtwo
       |USING org.apache.spark.sql.jdbc
       |OPTIONS (url '$url', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass',
-      |         fetchSize '2')
+      |         ${JdbcUtils.JDBC_BATCH_FETCH_SIZE} '2')
      """.stripMargin.replaceAll("\n", " "))
 
     sql(
@@ -348,38 +349,49 @@ class JDBCSuite extends SparkFunSuite
 
   test("Basic API") {
     assert(spark.read.jdbc(
-      urlWithUserAndPass, "TEST.PEOPLE", new Properties).collect().length === 3)
+      urlWithUserAndPass, "TEST.PEOPLE", new Properties()).collect().length === 3)
+  }
+
+  test("Basic API with illegal FetchSize") {
+    val properties = new Properties()
+    properties.setProperty(JdbcUtils.JDBC_BATCH_FETCH_SIZE, "-1")
+    val e = intercept[SparkException] {
+      spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", properties).collect()
+    }.getMessage
+    assert(e.contains("Invalid value `-1` for parameter `fetchsize`"))
   }
 
   test("Basic API with FetchSize") {
-    val properties = new Properties
-    properties.setProperty("fetchSize", "2")
-    assert(spark.read.jdbc(
-      urlWithUserAndPass, "TEST.PEOPLE", properties).collect().length === 3)
+    (0 to 4).foreach { size =>
+      val properties = new Properties()
+      properties.setProperty(JdbcUtils.JDBC_BATCH_FETCH_SIZE, size.toString)
+      assert(spark.read.jdbc(
+        urlWithUserAndPass, "TEST.PEOPLE", properties).collect().length === 3)
+    }
   }
 
   test("Partitioning via JDBCPartitioningInfo API") {
     assert(
-      spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties)
+      spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties())
       .collect().length === 3)
   }
 
   test("Partitioning via list-of-where-clauses API") {
     val parts = Array[String]("THEID < 2", "THEID >= 2")
-    assert(spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties)
+    assert(spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties())
       .collect().length === 3)
   }
 
   test("Partitioning on column that might have null values.") {
     assert(
-      spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", "theid", 0, 4, 3, new Properties)
+      spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", "theid", 0, 4, 3, new Properties())
      .collect().length === 4)
     assert(
-      spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", "THEID", 0, 4, 3, new Properties)
+      spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", "THEID", 0, 4, 3, new Properties())
      .collect().length === 4)
     // partitioning on a nullable quoted column
     assert(
-      spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", """"Dept"""", 0, 4, 3, new Properties)
+      spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", """"Dept"""", 0, 4, 3, new Properties())
      .collect().length === 4)
   }
 
@@ -391,7 +403,7 @@ class JDBCSuite extends SparkFunSuite
       lowerBound = 0,
       upperBound = 4,
       numPartitions = 0,
-      connectionProperties = new Properties
+      connectionProperties = new Properties()
     )
     assert(res.count() === 8)
   }
@@ -404,7 +416,7 @@ class JDBCSuite extends SparkFunSuite
       lowerBound = 1,
       upperBound = 5,
       numPartitions = 10,
-      connectionProperties = new Properties
+      connectionProperties = new Properties()
     )
     assert(res.count() === 8)
   }
@@ -417,7 +429,7 @@ class JDBCSuite extends SparkFunSuite
       lowerBound = 5,
       upperBound = 5,
       numPartitions = 4,
-      connectionProperties = new Properties
+      connectionProperties = new Properties()
     )
     assert(res.count() === 8)
   }
@@ -431,7 +443,7 @@ class JDBCSuite extends SparkFunSuite
         lowerBound = 5,
         upperBound = 1,
         numPartitions = 3,
-        connectionProperties = new Properties
+        connectionProperties = new Properties()
       )
     }.getMessage
     assert(e.contains("Operation not allowed: the lower bound of partitioning column " +
@@ -495,8 +507,8 @@ class JDBCSuite extends SparkFunSuite
 
   test("test DATE types") {
     val rows = spark.read.jdbc(
-      urlWithUserAndPass, "TEST.TIMETYPES", new Properties).collect()
-    val cachedRows = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+      urlWithUserAndPass, "TEST.TIMETYPES", new Properties()).collect()
+    val cachedRows = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties())
       .cache().collect()
     assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
     assert(rows(1).getAs[java.sql.Date](1) === null)
@@ -504,8 +516,8 @@ class JDBCSuite extends SparkFunSuite
   }
 
   test("test DATE types in cache") {
-    val rows = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties).collect()
-    spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+    val rows = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties()).collect()
+    spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties())
       .cache().createOrReplaceTempView("mycached_date")
     val cachedRows = sql("select * from mycached_date").collect()
     assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
@@ -514,7 +526,7 @@ class JDBCSuite extends SparkFunSuite
 
   test("test types for null value") {
     val rows = spark.read.jdbc(
-      urlWithUserAndPass, "TEST.NULLTYPES", new Properties).collect()
+      urlWithUserAndPass, "TEST.NULLTYPES", new Properties()).collect()
     assert((0 to 14).forall(i => rows(0).isNullAt(i)))
   }
 
@@ -560,7 +572,7 @@ class JDBCSuite extends SparkFunSuite
 
   test("Remap types via JdbcDialects") {
     JdbcDialects.registerDialect(testH2Dialect)
-    val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties)
+    val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
     assert(df.schema.filter(_.dataType != org.apache.spark.sql.types.StringType).isEmpty)
     val rows = df.collect()
     assert(rows(0).get(0).isInstanceOf[String])
@@ -694,7 +706,7 @@ class JDBCSuite extends SparkFunSuite
     // Regression test for bug SPARK-11788
     val timestamp = java.sql.Timestamp.valueOf("2001-02-20 11:22:33.543543");
     val date = java.sql.Date.valueOf("1995-01-01")
-    val jdbcDf = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+    val jdbcDf = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties())
     val rows = jdbcDf.where($"B" > date && $"C" > timestamp).collect()
     assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
     assert(rows(0).getAs[java.sql.Timestamp](2)
@@ -714,7 +726,7 @@ class JDBCSuite extends SparkFunSuite
   }
 
   test("test credentials in the connection url are not in the plan output") {
-    val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties)
+    val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
     val explain = ExplainCommand(df.queryExecution.logical, extended = true)
     spark.sessionState.executePlan(explain).executedPlan.executeCollect().foreach { r =>
       assert(!List("testPass", "testUser").exists(r.toString.contains))
@@ -746,7 +758,7 @@ class JDBCSuite extends SparkFunSuite
       urlWithUserAndPass,
       "TEST.PEOPLE",
       predicates = Array[String](jdbcPartitionWhereClause),
-      new Properties)
+      new Properties())
     df.createOrReplaceTempView("tempFrame")
     assertEmptyQuery(s"SELECT * FROM tempFrame where $FALSE2")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index ff66f53fcf..2c6449fa68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -22,7 +22,9 @@ import java.util.Properties
 
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -90,10 +92,34 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
 
   test("Basic CREATE") {
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
 
-    df.write.jdbc(url, "TEST.BASICCREATETEST", new Properties)
-    assert(2 === spark.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count)
+    df.write.jdbc(url, "TEST.BASICCREATETEST", new Properties())
+    assert(2 === spark.read.jdbc(url, "TEST.BASICCREATETEST", new Properties()).count())
     assert(
-      2 === spark.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length)
+      2 === spark.read.jdbc(url, "TEST.BASICCREATETEST", new Properties()).collect()(0).length)
+  }
+
+  test("Basic CREATE with illegal batchsize") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+
+    (-1 to 0).foreach { size =>
+      val properties = new Properties()
+      properties.setProperty(JdbcUtils.JDBC_BATCH_INSERT_SIZE, size.toString)
+      val e = intercept[SparkException] {
+        df.write.mode(SaveMode.Overwrite).jdbc(url, "TEST.BASICCREATETEST", properties)
+      }.getMessage
+      assert(e.contains(s"Invalid value `$size` for parameter `batchsize`"))
+    }
+  }
+
+  test("Basic CREATE with batchsize") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+
+    (1 to 3).foreach { size =>
+      val properties = new Properties()
+      properties.setProperty(JdbcUtils.JDBC_BATCH_INSERT_SIZE, size.toString)
+      df.write.mode(SaveMode.Overwrite).jdbc(url, "TEST.BASICCREATETEST", properties)
+      assert(2 === spark.read.jdbc(url, "TEST.BASICCREATETEST", new Properties()).count())
+    }
   }
 
   test("CREATE with overwrite") {
@@ -101,11 +127,11 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
 
     df.write.jdbc(url1, "TEST.DROPTEST", properties)
-    assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count)
+    assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count())
     assert(3 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
 
     df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.DROPTEST", properties)
-    assert(1 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count)
+    assert(1 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count())
     assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
   }
 
@@ -113,10 +139,10 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
     val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
 
-    df.write.jdbc(url, "TEST.APPENDTEST", new Properties)
-    df2.write.mode(SaveMode.Append).jdbc(url, "TEST.APPENDTEST", new Properties)
-    assert(3 === spark.read.jdbc(url, "TEST.APPENDTEST", new Properties).count)
-    assert(2 === spark.read.jdbc(url, "TEST.APPENDTEST", new Properties).collect()(0).length)
+    df.write.jdbc(url, "TEST.APPENDTEST", new Properties())
+    df2.write.mode(SaveMode.Append).jdbc(url, "TEST.APPENDTEST", new Properties())
+    assert(3 === spark.read.jdbc(url, "TEST.APPENDTEST", new Properties()).count())
+    assert(2 === spark.read.jdbc(url, "TEST.APPENDTEST", new Properties()).collect()(0).length)
   }
 
   test("CREATE then INSERT to truncate") {
@@ -125,7 +151,7 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
 
     df.write.jdbc(url1, "TEST.TRUNCATETEST", properties)
     df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.TRUNCATETEST", properties)
-    assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count)
+    assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
     assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
   }
 
@@ -133,22 +159,22 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
     val df2 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
 
-    df.write.jdbc(url, "TEST.INCOMPATIBLETEST", new Properties)
+    df.write.jdbc(url, "TEST.INCOMPATIBLETEST", new Properties())
     intercept[org.apache.spark.SparkException] {
-      df2.write.mode(SaveMode.Append).jdbc(url, "TEST.INCOMPATIBLETEST", new Properties)
+      df2.write.mode(SaveMode.Append).jdbc(url, "TEST.INCOMPATIBLETEST", new Properties())
     }
   }
 
   test("INSERT to JDBC Datasource") {
     sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
-    assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count)
+    assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count())
     assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
   }
 
   test("INSERT to JDBC Datasource with overwrite") {
     sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
     sql("INSERT OVERWRITE TABLE PEOPLE1 SELECT * FROM PEOPLE")
-    assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count)
+    assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count())
     assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
   }
 }
```
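As a closing illustration (a sketch, not part of the patch; `spark` and `url` mirror the hypothetical placeholders introduced earlier), the new validation surfaces as a descriptive `SparkException` message rather than a driver-level error:

```scala
import java.util.Properties

import scala.util.{Failure, Success, Try}

// Hypothetical placeholders in the style of the suites above.
val badProps = new Properties()
badProps.setProperty("fetchsize", "-1")

Try(spark.read.jdbc(url, "TEST.PEOPLE", badProps).collect()) match {
  // The message now reads: Invalid value `-1` for parameter `fetchsize`. ...
  case Failure(e) => println(e.getMessage)
  case Success(_) => () // unreachable with an illegal fetchsize
}
```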