From e70be6987b2cd705bef7b97917cdee00e3e80aef Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 20 May 2015 22:23:49 -0700
Subject: [SPARK-7746][SQL] Add FetchSize parameter for JDBC driver

JIRA: https://issues.apache.org/jira/browse/SPARK-7746

Looks like an easy parameter to add, but it can yield a significant
performance improvement if the JDBC driver supports it.

Author: Liang-Chi Hsieh

Closes #6283 from viirya/jdbc_fetchsize and squashes the following commits:

de47f94 [Liang-Chi Hsieh] Don't keep fetchSize as single parameter.
b7bff2f [Liang-Chi Hsieh] Add FetchSize parameter for JDBC driver.

(cherry picked from commit d0eb9ffe978c663b7aa06e908cadee81767d23d1)
Signed-off-by: Reynold Xin
---
 .../scala/org/apache/spark/sql/jdbc/JDBCRDD.scala |  8 ++++--
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala     | 33 +++++++++++++++++++++-
 2 files changed, 38 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index f7b19096ea..be03a237b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -211,7 +211,8 @@ private[sql] object JDBCRDD extends Logging {
       fqTable,
       requiredColumns,
       filters,
-      parts)
+      parts,
+      properties)
   }
 }
 
@@ -227,7 +228,8 @@ private[sql] class JDBCRDD(
     fqTable: String,
     columns: Array[String],
     filters: Array[Filter],
-    partitions: Array[Partition])
+    partitions: Array[Partition],
+    properties: Properties)
   extends RDD[Row](sc, Nil) {
 
   /**
@@ -356,6 +358,8 @@ private[sql] class JDBCRDD(
     val sqlText = s"SELECT $columnList FROM $fqTable $myWhereClause"
     val stmt = conn.prepareStatement(sqlText,
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
+    val fetchSize = properties.getProperty("fetchSize", "0").toInt
+    stmt.setFetchSize(fetchSize)
     val rs = stmt.executeQuery()
 
     val conversions = getConversions(schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index a8dddfb9b6..347f28351f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -67,7 +67,15 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
         |USING org.apache.spark.sql.jdbc
         |OPTIONS (url '$url', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))
-
+
+    sql(
+      s"""
+        |CREATE TEMPORARY TABLE fetchtwo
+        |USING org.apache.spark.sql.jdbc
+        |OPTIONS (url '$url', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass',
+        |         fetchSize '2')
+      """.stripMargin.replaceAll("\n", " "))
+
     sql(
       s"""
         |CREATE TEMPORARY TABLE parts
@@ -185,6 +193,14 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
     assert(names(2).equals("mary"))
   }
 
+  test("SELECT first field when fetchSize is two") {
+    val names = sql("SELECT NAME FROM fetchtwo").collect().map(x => x.getString(0)).sortWith(_ < _)
+    assert(names.size === 3)
+    assert(names(0).equals("fred"))
+    assert(names(1).equals("joe 'foo' \"bar\""))
+    assert(names(2).equals("mary"))
+  }
+
   test("SELECT second field") {
     val ids = sql("SELECT THEID FROM foobar").collect().map(x => x.getInt(0)).sortWith(_ < _)
     assert(ids.size === 3)
@@ -192,6 +208,14 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
     assert(ids(1) === 2)
     assert(ids(2) === 3)
   }
+
+  test("SELECT second field when fetchSize is two") {
+    val ids = sql("SELECT THEID FROM fetchtwo").collect().map(x => x.getInt(0)).sortWith(_ < _)
+    assert(ids.size === 3)
+    assert(ids(0) === 1)
+    assert(ids(1) === 2)
+    assert(ids(2) === 3)
+  }
 
   test("SELECT * partitioned") {
     assert(sql("SELECT * FROM parts").collect().size == 3)
@@ -232,6 +256,13 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
       urlWithUserAndPass, "TEST.PEOPLE", new Properties).collect().length === 3)
   }
 
+  test("Basic API with FetchSize") {
+    val properties = new Properties
+    properties.setProperty("fetchSize", "2")
+    assert(TestSQLContext.read.jdbc(
+      urlWithUserAndPass, "TEST.PEOPLE", properties).collect().length === 3)
+  }
+
   test("Partitioning via JDBCPartitioningInfo API") {
     assert(
       TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties)