diff options
author | Prince J Wesley <princejohnwesley@gmail.com> | 2016-08-07 12:18:11 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-08-07 12:18:11 +0100 |
commit | bdfab9f942dcad7c1f3de9b6df5c01dee2392055 (patch) | |
tree | 24e18856afa0d25f3480aec6c367a30e63b12e33 /core/src/main | |
parent | 6c1ecb191bc086290e33d56b6a5706d962e84a3a (diff) | |
download | spark-bdfab9f942dcad7c1f3de9b6df5c01dee2392055.tar.gz spark-bdfab9f942dcad7c1f3de9b6df5c01dee2392055.tar.bz2 spark-bdfab9f942dcad7c1f3de9b6df5c01dee2392055.zip |
[SPARK-16909][SPARK CORE] Streaming for postgreSQL JDBC driver
As per the postgreSQL JDBC driver [implementation](https://github.com/pgjdbc/pgjdbc/blob/ab2a6d89081fc2c1fdb2a8600f413db33669022c/pgjdbc/src/main/java/org/postgresql/PGProperty.java#L99), the default record fetch size is 0(which means, it caches all record)
This fix enforces default record fetch size as 10 to enable streaming of data.
Author: Prince J Wesley <princejohnwesley@gmail.com>
Closes #14502 from princejwesley/spark-postgres.
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala | 16 |
1 files changed, 11 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala index 2f42916439..0970b98071 100644 --- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala @@ -79,14 +79,20 @@ class JdbcRDD[T: ClassTag]( val conn = getConnection() val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) - // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results, - // rather than pulling entire resultset into memory. - // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html - if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) { + val url = conn.getMetaData.getURL + if (url.startsWith("jdbc:mysql:")) { + // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force + // streaming results, rather than pulling entire resultset into memory. + // See the below URL + // dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html + stmt.setFetchSize(Integer.MIN_VALUE) - logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ") + } else { + stmt.setFetchSize(100) } + logInfo(s"statement fetch size set to: ${stmt.getFetchSize}") + stmt.setLong(1, part.lower) stmt.setLong(2, part.upper) val rs = stmt.executeQuery() |