aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala40
1 files changed, 26 insertions, 14 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index d220ab51d1..1a3bf2bb67 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -663,31 +663,43 @@ private[spark] class MemoryStore(
private[storage] class PartiallyUnrolledIterator[T](
memoryStore: MemoryStore,
unrollMemory: Long,
- unrolled: Iterator[T],
+ private[this] var unrolled: Iterator[T],
rest: Iterator[T])
extends Iterator[T] {
- private[this] var unrolledIteratorIsConsumed: Boolean = false
- private[this] var iter: Iterator[T] = {
- val completionIterator = CompletionIterator[T, Iterator[T]](unrolled, {
- unrolledIteratorIsConsumed = true
- memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
- })
- completionIterator ++ rest
+ private def releaseUnrollMemory(): Unit = {
+ memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
+ // SPARK-17503: Garbage collects the unrolling memory before the life end of
+ // PartiallyUnrolledIterator.
+ unrolled = null
}
- override def hasNext: Boolean = iter.hasNext
- override def next(): T = iter.next()
+ override def hasNext: Boolean = {
+ if (unrolled == null) {
+ rest.hasNext
+ } else if (!unrolled.hasNext) {
+ releaseUnrollMemory()
+ rest.hasNext
+ } else {
+ true
+ }
+ }
+
+ override def next(): T = {
+ if (unrolled == null) {
+ rest.next()
+ } else {
+ unrolled.next()
+ }
+ }
/**
* Called to dispose of this iterator and free its memory.
*/
def close(): Unit = {
- if (!unrolledIteratorIsConsumed) {
- memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
- unrolledIteratorIsConsumed = true
+ if (unrolled != null) {
+ releaseUnrollMemory()
}
- iter = null
}
}