From bdfab9f942dcad7c1f3de9b6df5c01dee2392055 Mon Sep 17 00:00:00 2001 From: Prince J Wesley Date: Sun, 7 Aug 2016 12:18:11 +0100 Subject: [SPARK-16909][SPARK CORE] Streaming for postgreSQL JDBC driver As per the postgreSQL JDBC driver [implementation](https://github.com/pgjdbc/pgjdbc/blob/ab2a6d89081fc2c1fdb2a8600f413db33669022c/pgjdbc/src/main/java/org/postgresql/PGProperty.java#L99), the default record fetch size is 0(which means, it caches all record) This fix enforces default record fetch size as 10 to enable streaming of data. Author: Prince J Wesley Closes #14502 from princejwesley/spark-postgres. --- core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala index 2f42916439..0970b98071 100644 --- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala @@ -79,14 +79,20 @@ class JdbcRDD[T: ClassTag]( val conn = getConnection() val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) - // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results, - // rather than pulling entire resultset into memory. - // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html - if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) { + val url = conn.getMetaData.getURL + if (url.startsWith("jdbc:mysql:")) { + // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force + // streaming results, rather than pulling entire resultset into memory. + // See the below URL + // dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html + stmt.setFetchSize(Integer.MIN_VALUE) - logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ") + } else { + stmt.setFetchSize(100) } + logInfo(s"statement fetch size set to: ${stmt.getFetchSize}") + stmt.setLong(1, part.lower) stmt.setLong(2, part.upper) val rs = stmt.executeQuery() -- cgit v1.2.3