diff options
author | Liang-Chi Hsieh <viirya@gmail.com> | 2015-05-20 22:23:49 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-05-20 22:24:04 -0700 |
commit | e70be6987b2cd705bef7b97917cdee00e3e80aef (patch) | |
tree | b9767c314825f7968df96d757d00e61a79d7c147 | |
parent | 9711e9bf1d8c88cba2e302637be4ef3564457741 (diff) | |
download | spark-e70be6987b2cd705bef7b97917cdee00e3e80aef.tar.gz spark-e70be6987b2cd705bef7b97917cdee00e3e80aef.tar.bz2 spark-e70be6987b2cd705bef7b97917cdee00e3e80aef.zip |
[SPARK-7746][SQL] Add FetchSize parameter for JDBC driver
JIRA: https://issues.apache.org/jira/browse/SPARK-7746
Looks like an easy to add parameter but can show significant performance improvement if the JDBC driver accepts it.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes #6283 from viirya/jdbc_fetchsize and squashes the following commits:
de47f94 [Liang-Chi Hsieh] Don't keep fetchSize as single parameter.
b7bff2f [Liang-Chi Hsieh] Add FetchSize parameter for JDBC driver.
(cherry picked from commit d0eb9ffe978c663b7aa06e908cadee81767d23d1)
Signed-off-by: Reynold Xin <rxin@databricks.com>
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala | 8 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 33 |
2 files changed, 38 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala index f7b19096ea..be03a237b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala @@ -211,7 +211,8 @@ private[sql] object JDBCRDD extends Logging { fqTable, requiredColumns, filters, - parts) + parts, + properties) } } @@ -227,7 +228,8 @@ private[sql] class JDBCRDD( fqTable: String, columns: Array[String], filters: Array[Filter], - partitions: Array[Partition]) + partitions: Array[Partition], + properties: Properties) extends RDD[Row](sc, Nil) { /** @@ -356,6 +358,8 @@ private[sql] class JDBCRDD( val sqlText = s"SELECT $columnList FROM $fqTable $myWhereClause" val stmt = conn.prepareStatement(sqlText, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) + val fetchSize = properties.getProperty("fetchSize", "0").toInt + stmt.setFetchSize(fetchSize) val rs = stmt.executeQuery() val conversions = getConversions(schema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index a8dddfb9b6..347f28351f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -67,7 +67,15 @@ class JDBCSuite extends FunSuite with BeforeAndAfter { |USING org.apache.spark.sql.jdbc |OPTIONS (url '$url', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass') """.stripMargin.replaceAll("\n", " ")) - + + sql( + s""" + |CREATE TEMPORARY TABLE fetchtwo + |USING org.apache.spark.sql.jdbc + |OPTIONS (url '$url', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass', + | fetchSize '2') + """.stripMargin.replaceAll("\n", " ")) + sql( s""" |CREATE TEMPORARY TABLE parts @@ -185,6 +193,14 @@ class JDBCSuite extends FunSuite with BeforeAndAfter { assert(names(2).equals("mary")) } + test("SELECT first field when fetchSize is two") { + val names = sql("SELECT NAME FROM fetchtwo").collect().map(x => x.getString(0)).sortWith(_ < _) + assert(names.size === 3) + assert(names(0).equals("fred")) + assert(names(1).equals("joe 'foo' \"bar\"")) + assert(names(2).equals("mary")) + } + test("SELECT second field") { val ids = sql("SELECT THEID FROM foobar").collect().map(x => x.getInt(0)).sortWith(_ < _) assert(ids.size === 3) @@ -192,6 +208,14 @@ class JDBCSuite extends FunSuite with BeforeAndAfter { assert(ids(1) === 2) assert(ids(2) === 3) } + + test("SELECT second field when fetchSize is two") { + val ids = sql("SELECT THEID FROM fetchtwo").collect().map(x => x.getInt(0)).sortWith(_ < _) + assert(ids.size === 3) + assert(ids(0) === 1) + assert(ids(1) === 2) + assert(ids(2) === 3) + } test("SELECT * partitioned") { assert(sql("SELECT * FROM parts").collect().size == 3) @@ -232,6 +256,13 @@ class JDBCSuite extends FunSuite with BeforeAndAfter { urlWithUserAndPass, "TEST.PEOPLE", new Properties).collect().length === 3) } + test("Basic API with FetchSize") { + val properties = new Properties + properties.setProperty("fetchSize", "2") + assert(TestSQLContext.read.jdbc( + urlWithUserAndPass, "TEST.PEOPLE", properties).collect().length === 3) + } + test("Partitioning via JDBCPartitioningInfo API") { assert( TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties) |