aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
author Liang-Chi Hsieh <viirya@gmail.com> 2015-05-20 22:23:49 -0700
committer Reynold Xin <rxin@databricks.com> 2015-05-20 22:23:49 -0700
commit d0eb9ffe978c663b7aa06e908cadee81767d23d1 (patch)
tree 655728a2901416d5d87f6d879ee0bd5c104779fa /sql
parent ddec173cba63df723cd94508121d8c06d8c153c6 (diff)
download spark-d0eb9ffe978c663b7aa06e908cadee81767d23d1.tar.gz
spark-d0eb9ffe978c663b7aa06e908cadee81767d23d1.tar.bz2
spark-d0eb9ffe978c663b7aa06e908cadee81767d23d1.zip
[SPARK-7746][SQL] Add FetchSize parameter for JDBC driver
JIRA: https://issues.apache.org/jira/browse/SPARK-7746

Looks like an easy-to-add parameter, but it can show significant performance improvement if the JDBC driver accepts it.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #6283 from viirya/jdbc_fetchsize and squashes the following commits:

de47f94 [Liang-Chi Hsieh] Don't keep fetchSize as single parameter.
b7bff2f [Liang-Chi Hsieh] Add FetchSize parameter for JDBC driver.
Diffstat (limited to 'sql')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala 8
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 33
2 files changed, 38 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index f7b19096ea..be03a237b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -211,7 +211,8 @@ private[sql] object JDBCRDD extends Logging {
fqTable,
requiredColumns,
filters,
- parts)
+ parts,
+ properties)
}
}
@@ -227,7 +228,8 @@ private[sql] class JDBCRDD(
fqTable: String,
columns: Array[String],
filters: Array[Filter],
- partitions: Array[Partition])
+ partitions: Array[Partition],
+ properties: Properties)
extends RDD[Row](sc, Nil) {
/**
@@ -356,6 +358,8 @@ private[sql] class JDBCRDD(
val sqlText = s"SELECT $columnList FROM $fqTable $myWhereClause"
val stmt = conn.prepareStatement(sqlText,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
+ val fetchSize = properties.getProperty("fetchSize", "0").toInt
+ stmt.setFetchSize(fetchSize)
val rs = stmt.executeQuery()
val conversions = getConversions(schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index a8dddfb9b6..347f28351f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -67,7 +67,15 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$url', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass')
""".stripMargin.replaceAll("\n", " "))
-
+
+ sql(
+ s"""
+ |CREATE TEMPORARY TABLE fetchtwo
+ |USING org.apache.spark.sql.jdbc
+ |OPTIONS (url '$url', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass',
+ | fetchSize '2')
+ """.stripMargin.replaceAll("\n", " "))
+
sql(
s"""
|CREATE TEMPORARY TABLE parts
@@ -185,6 +193,14 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
assert(names(2).equals("mary"))
}
+ test("SELECT first field when fetchSize is two") {
+ val names = sql("SELECT NAME FROM fetchtwo").collect().map(x => x.getString(0)).sortWith(_ < _)
+ assert(names.size === 3)
+ assert(names(0).equals("fred"))
+ assert(names(1).equals("joe 'foo' \"bar\""))
+ assert(names(2).equals("mary"))
+ }
+
test("SELECT second field") {
val ids = sql("SELECT THEID FROM foobar").collect().map(x => x.getInt(0)).sortWith(_ < _)
assert(ids.size === 3)
@@ -192,6 +208,14 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
assert(ids(1) === 2)
assert(ids(2) === 3)
}
+
+ test("SELECT second field when fetchSize is two") {
+ val ids = sql("SELECT THEID FROM fetchtwo").collect().map(x => x.getInt(0)).sortWith(_ < _)
+ assert(ids.size === 3)
+ assert(ids(0) === 1)
+ assert(ids(1) === 2)
+ assert(ids(2) === 3)
+ }
test("SELECT * partitioned") {
assert(sql("SELECT * FROM parts").collect().size == 3)
@@ -232,6 +256,13 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
urlWithUserAndPass, "TEST.PEOPLE", new Properties).collect().length === 3)
}
+ test("Basic API with FetchSize") {
+ val properties = new Properties
+ properties.setProperty("fetchSize", "2")
+ assert(TestSQLContext.read.jdbc(
+ urlWithUserAndPass, "TEST.PEOPLE", properties).collect().length === 3)
+ }
+
test("Partitioning via JDBCPartitioningInfo API") {
assert(
TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties)