aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala16
1 files changed, 11 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index 2f42916439..0970b98071 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -79,14 +79,20 @@ class JdbcRDD[T: ClassTag](
val conn = getConnection()
val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
- // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results,
- // rather than pulling entire resultset into memory.
- // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html
- if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
+ val url = conn.getMetaData.getURL
+ if (url.startsWith("jdbc:mysql:")) {
+ // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force
+ // streaming results, rather than pulling entire resultset into memory.
+ // See the below URL
+ // dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
+
stmt.setFetchSize(Integer.MIN_VALUE)
- logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
+ } else {
+ stmt.setFetchSize(100)
}
+ logInfo(s"statement fetch size set to: ${stmt.getFetchSize}")
+
stmt.setLong(1, part.lower)
stmt.setLong(2, part.upper)
val rs = stmt.executeQuery()