author     Liang-Chi Hsieh <viirya@appier.com>    2015-07-24 12:36:44 -0700
committer  Kay Ousterhout <kayousterhout@gmail.com>    2015-07-24 12:36:44 -0700
commit     64135cbb3363e3b74dad3c0498cb9959c047d381 (patch)
tree       a16498a5db6e333411af9f8c67ba20e9592ca27c /core
parent     9a11396113d4bb0e76e0520df4fc58e7a8ec9f69 (diff)
[SPARK-9067] [SQL] Close reader in NewHadoopRDD early if there is no more data
JIRA: https://issues.apache.org/jira/browse/SPARK-9067

According to the JIRA ticket, calling `reader.close()` only after the task finishes causes memory pressure and file-open-limit problems, because the reader's resources stay occupied even after we no longer need them. This PR simply closes the reader early, as soon as we know there is no more data to read.

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #7424 from viirya/close_reader and squashes the following commits:

3ff64e5 [Liang-Chi Hsieh] For comments.
3d20267 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into close_reader
e152182 [Liang-Chi Hsieh] For comments.
5116cbe [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into close_reader
3ceb755 [Liang-Chi Hsieh] For comments.
e34d98e [Liang-Chi Hsieh] For comments.
50ed729 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into close_reader
216912f [Liang-Chi Hsieh] Fix it.
f429016 [Liang-Chi Hsieh] Release reader if we don't need it.
a305621 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into close_reader
67569da [Liang-Chi Hsieh] Close reader early if there is no more data.
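The change follows a general pattern for iterators that wrap a closable resource: release the resource as soon as hasNext first observes end-of-input, and make close() idempotent (guard on a nulled-out reference) so that a later task-completion hook can call it again safely. Below is a minimal, self-contained Scala sketch of that pattern; RecordSource, EarlyClosingIterator, and the other names in it are hypothetical illustrations, not part of Spark's API.

import java.io.Closeable

// Hypothetical stand-in for a Hadoop RecordReader: yields records until exhausted.
trait RecordSource[A] extends Closeable {
  def nextRecord(): Option[A]
}

// An iterator that closes its underlying source the moment it sees end-of-input,
// rather than waiting for an outer task-completion hook to do it.
class EarlyClosingIterator[A](private var source: RecordSource[A])
  extends Iterator[A] with Closeable {

  private var buffered: Option[A] = None
  private var finished = false

  override def hasNext: Boolean = {
    if (!finished && buffered.isEmpty) {
      buffered = source.nextRecord()
      if (buffered.isEmpty) {
        finished = true
        // Release file handles and buffers now; the task may keep running
        // long after this input is exhausted.
        close()
      }
    }
    buffered.isDefined
  }

  override def next(): A = {
    if (!hasNext) throw new NoSuchElementException("end of input")
    val record = buffered.get
    buffered = None
    record
  }

  // Idempotent: a cleanup hook may call close() again after the iterator
  // has already closed itself, so guard on the nulled-out reference.
  override def close(): Unit = {
    if (source != null) {
      source.close()
      source = null
    }
  }
}

Nulling out the reference both prevents a double close() and lets the reader (and any buffers it holds) become eligible for garbage collection while the task finishes other work, which is the same reason this patch sets `reader = null` in close().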
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala  |  37
1 file changed, 24 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index f827270ee6..f83a051f5d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -128,7 +128,7 @@ class NewHadoopRDD[K, V](
           configurable.setConf(conf)
         case _ =>
       }
-      val reader = format.createRecordReader(
+      private var reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)

@@ -141,6 +141,12 @@ class NewHadoopRDD[K, V](
       override def hasNext: Boolean = {
         if (!finished && !havePair) {
           finished = !reader.nextKeyValue
+          if (finished) {
+            // Close and release the reader here; close() will also be called when the task
+            // completes, but for tasks that read from many files, it helps to release the
+            // resources early.
+            close()
+          }
           havePair = !finished
         }
         !finished
@@ -159,18 +165,23 @@ class NewHadoopRDD[K, V](

       private def close() {
         try {
-        reader.close()
-        if (bytesReadCallback.isDefined) {
-          inputMetrics.updateBytesRead()
-        } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
-            split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
-          // If we can't get the bytes read from the FS stats, fall back to the split size,
-          // which may be inaccurate.
-          try {
-            inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
-          } catch {
-            case e: java.io.IOException =>
-              logWarning("Unable to get input size to set InputMetrics for task", e)
+        if (reader != null) {
+          // Close reader and release it
+          reader.close()
+          reader = null
+
+          if (bytesReadCallback.isDefined) {
+            inputMetrics.updateBytesRead()
+          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+              split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+            // If we can't get the bytes read from the FS stats, fall back to the split size,
+            // which may be inaccurate.
+            try {
+              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+            } catch {
+              case e: java.io.IOException =>
+                logWarning("Unable to get input size to set InputMetrics for task", e)
+            }
           }
         }
       } catch {