diff options
author | koeninger <ckoeninger@digby.com> | 2013-04-22 21:12:52 -0500 |
---|---|---|
committer | koeninger <ckoeninger@digby.com> | 2013-04-22 21:12:52 -0500 |
commit | dfac0aa5c2e5f46955b008b1e8d9ee5d8069efa5 (patch) | |
tree | f62994743e2f3bd08420bf4e15e9944863232418 /core | |
parent | b2a3f24dde7a69587a5fea50d3e1e4e8f02a2dc3 (diff) | |
download | spark-dfac0aa5c2e5f46955b008b1e8d9ee5d8069efa5.tar.gz spark-dfac0aa5c2e5f46955b008b1e8d9ee5d8069efa5.tar.bz2 spark-dfac0aa5c2e5f46955b008b1e8d9ee5d8069efa5.zip |
prevent mysql driver from pulling entire resultset into memory. explicitly close resultset and statement.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/rdd/JdbcRDD.scala | 25 |
1 files changed, 20 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/rdd/JdbcRDD.scala b/core/src/main/scala/spark/rdd/JdbcRDD.scala index c8a5d76012..4c3054465c 100644 --- a/core/src/main/scala/spark/rdd/JdbcRDD.scala +++ b/core/src/main/scala/spark/rdd/JdbcRDD.scala @@ -15,7 +15,7 @@ import spark.util.NextIterator @param lowerBound the minimum value of the first placeholder @param upperBound the maximum value of the second placeholder The lower and upper bounds are inclusive. - @param numPartitions the amount of parallelism. + @param numPartitions the number of partitions. Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2, the query would be executed twice, once with (1, 10) and once with (11, 20) @param mapRow a function from a ResultSet to a single row of the desired result type(s). @@ -40,10 +40,15 @@ class JdbcRDD[T: ClassManifest]( toArray override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] { + context.addOnCompleteCallback{ () => closeIfNeeded() } val part = thePart.asInstanceOf[JdbcPartition] val conn = getConnection() - context.addOnCompleteCallback{ () => closeIfNeeded() } - val stmt = conn.prepareStatement(sql) + val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) + // force mysql driver to stream rather than pull entire resultset into memory + if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) { + stmt.setFetchSize(Integer.MIN_VALUE) + logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ") + } stmt.setLong(1, part.lower) stmt.setLong(2, part.upper) val rs = stmt.executeQuery() @@ -59,8 +64,18 @@ class JdbcRDD[T: ClassManifest]( override def close() { try { - logInfo("closing connection") - conn.close() + if (null != rs && ! rs.isClosed()) rs.close() + } catch { + case e: Exception => logWarning("Exception closing resultset", e) + } + try { + if (null != stmt && ! stmt.isClosed()) stmt.close() + } catch { + case e: Exception => logWarning("Exception closing statement", e) + } + try { + if (null != conn && ! stmt.isClosed()) conn.close() + logInfo("closed connection") } catch { case e: Exception => logWarning("Exception closing connection", e) } |