aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorkoeninger <ckoeninger@digby.com>2013-04-22 21:12:52 -0500
committerkoeninger <ckoeninger@digby.com>2013-04-22 21:12:52 -0500
commitdfac0aa5c2e5f46955b008b1e8d9ee5d8069efa5 (patch)
treef62994743e2f3bd08420bf4e15e9944863232418 /core
parentb2a3f24dde7a69587a5fea50d3e1e4e8f02a2dc3 (diff)
downloadspark-dfac0aa5c2e5f46955b008b1e8d9ee5d8069efa5.tar.gz
spark-dfac0aa5c2e5f46955b008b1e8d9ee5d8069efa5.tar.bz2
spark-dfac0aa5c2e5f46955b008b1e8d9ee5d8069efa5.zip
prevent mysql driver from pulling entire resultset into memory. explicitly close resultset and statement.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/rdd/JdbcRDD.scala25
1 files changed, 20 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/rdd/JdbcRDD.scala b/core/src/main/scala/spark/rdd/JdbcRDD.scala
index c8a5d76012..4c3054465c 100644
--- a/core/src/main/scala/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/spark/rdd/JdbcRDD.scala
@@ -15,7 +15,7 @@ import spark.util.NextIterator
@param lowerBound the minimum value of the first placeholder
@param upperBound the maximum value of the second placeholder
The lower and upper bounds are inclusive.
- @param numPartitions the amount of parallelism.
+ @param numPartitions the number of partitions.
Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
the query would be executed twice, once with (1, 10) and once with (11, 20)
@param mapRow a function from a ResultSet to a single row of the desired result type(s).
@@ -40,10 +40,15 @@ class JdbcRDD[T: ClassManifest](
toArray
override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
+ context.addOnCompleteCallback{ () => closeIfNeeded() }
val part = thePart.asInstanceOf[JdbcPartition]
val conn = getConnection()
- context.addOnCompleteCallback{ () => closeIfNeeded() }
- val stmt = conn.prepareStatement(sql)
+ val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
+ // force mysql driver to stream rather than pull entire resultset into memory
+ if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
+ stmt.setFetchSize(Integer.MIN_VALUE)
+ logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
+ }
stmt.setLong(1, part.lower)
stmt.setLong(2, part.upper)
val rs = stmt.executeQuery()
@@ -59,8 +64,18 @@ class JdbcRDD[T: ClassManifest](
override def close() {
try {
- logInfo("closing connection")
- conn.close()
+ if (null != rs && ! rs.isClosed()) rs.close()
+ } catch {
+ case e: Exception => logWarning("Exception closing resultset", e)
+ }
+ try {
+ if (null != stmt && ! stmt.isClosed()) stmt.close()
+ } catch {
+ case e: Exception => logWarning("Exception closing statement", e)
+ }
+ try {
+ if (null != conn && ! stmt.isClosed()) conn.close()
+ logInfo("closed connection")
} catch {
case e: Exception => logWarning("Exception closing connection", e)
}