author    Prince J Wesley <princejohnwesley@gmail.com>  2016-08-07 12:18:11 +0100
committer Sean Owen <sowen@cloudera.com>                2016-08-07 12:18:11 +0100
commit    bdfab9f942dcad7c1f3de9b6df5c01dee2392055 (patch)
tree      24e18856afa0d25f3480aec6c367a30e63b12e33
parent    6c1ecb191bc086290e33d56b6a5706d962e84a3a (diff)
[SPARK-16909][SPARK CORE] Streaming for postgreSQL JDBC driver
As per the PostgreSQL JDBC driver [implementation](https://github.com/pgjdbc/pgjdbc/blob/ab2a6d89081fc2c1fdb2a8600f413db33669022c/pgjdbc/src/main/java/org/postgresql/PGProperty.java#L99), the default record fetch size is 0, which means the driver caches the entire result set in memory. This fix sets the fetch size to 100 for non-MySQL connections so that rows are streamed in batches.

Author: Prince J Wesley <princejohnwesley@gmail.com>

Closes #14502 from princejwesley/spark-postgres.
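For context, here is a minimal standalone sketch (not part of this patch) of the behaviour the fix relies on: the PostgreSQL driver only streams rows over a server-side cursor when the fetch size is positive, autocommit is off, and the result set is forward-only; with the driver default of 0 it pulls the whole result set into memory. The connection URL, credentials and query below are assumed placeholders.

    // Hypothetical example, not part of the patch: how fetch size controls
    // streaming in the PostgreSQL JDBC driver. URL/credentials/query are placeholders.
    import java.sql.{DriverManager, ResultSet}

    val conn = DriverManager.getConnection("jdbc:postgresql://localhost/testdb", "user", "secret")
    conn.setAutoCommit(false)  // pgjdbc uses a server-side cursor only when autocommit is off
    val stmt = conn.prepareStatement(
      "SELECT id, payload FROM events",
      ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
    stmt.setFetchSize(100)     // the driver default of 0 would buffer every row in memory at once
    val rs = stmt.executeQuery()
    while (rs.next()) {
      // rows arrive in batches of at most 100
      println(rs.getLong("id") + " -> " + rs.getString("payload"))
    }
    rs.close(); stmt.close(); conn.close()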
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala | 16
1 file changed, 11 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index 2f42916439..0970b98071 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -79,14 +79,20 @@ class JdbcRDD[T: ClassTag](
     val conn = getConnection()
     val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
-    // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results,
-    // rather than pulling entire resultset into memory.
-    // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html
-    if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
+    val url = conn.getMetaData.getURL
+    if (url.startsWith("jdbc:mysql:")) {
+      // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force
+      // streaming results, rather than pulling entire resultset into memory.
+      // See the below URL
+      // dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
+
       stmt.setFetchSize(Integer.MIN_VALUE)
-      logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
+    } else {
+      stmt.setFetchSize(100)
     }
+    logInfo(s"statement fetch size set to: ${stmt.getFetchSize}")
+
     stmt.setLong(1, part.lower)
     stmt.setLong(2, part.upper)
     val rs = stmt.executeQuery()
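For reference, a minimal usage sketch showing how this code path is exercised. The JdbcRDD constructor is the existing Spark Core API; the database URL, table, bounds and partition count are assumed placeholders.

    // Hypothetical usage, not part of the patch: the two '?' placeholders in the
    // query are bound to part.lower and part.upper inside compute().
    import java.sql.{DriverManager, ResultSet}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.JdbcRDD

    val sc = new SparkContext(new SparkConf().setAppName("jdbc-rdd-example").setMaster("local[2]"))
    val rdd = new JdbcRDD(
      sc,
      () => DriverManager.getConnection("jdbc:postgresql://localhost/testdb", "user", "secret"),
      "SELECT id, payload FROM events WHERE id >= ? AND id <= ?",
      1L,        // lowerBound, bound to the first '?'
      1000000L,  // upperBound, bound to the second '?'
      10,        // numPartitions
      (rs: ResultSet) => rs.getString("payload"))
    println(rdd.count())   // with this patch, the statement's fetch size is 100 for PostgreSQL
    sc.stop()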