author     Sean Zhong <seanzhong@databricks.com>    2016-09-12 11:30:06 -0700
committer  Josh Rosen <joshrosen@databricks.com>    2016-09-12 11:30:06 -0700
commit     1742c3ab86d75ce3d352f7cddff65e62fb7c8dd4 (patch)
tree       cdb3dea3db732a647977adc1cb9ddc86b8121840 /core/src/main/scala/org
parent     8087ecf8daad1587d0ce9040991b14320628a65e (diff)
[SPARK-17503][CORE] Fix memory leak in Memory store when unable to cache the whole RDD in memory
## What changes were proposed in this pull request?

MemoryStore may throw OutOfMemoryError when trying to cache a super big RDD that cannot fit in memory.

```
scala> sc.parallelize(1 to 1000000000, 100).map(x => new Array[Long](1000)).cache().count()

java.lang.OutOfMemoryError: Java heap space
  at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:24)
  at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$JoinIterator.next(Iterator.scala:232)
  at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:683)
  at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
  at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1684)
  at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
  at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
  at org.apache.spark.scheduler.Task.run(Task.scala:86)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
```

Spark MemoryStore uses SizeTrackingVector as a temporary unrolling buffer to store all input values that it has read so far before transferring the values to storage memory cache. The problem is that when the input RDD is too big for caching in memory, the temporary unrolling memory SizeTrackingVector is not garbage collected in time. As SizeTrackingVector can occupy all available storage memory, it may cause the executor JVM to run out of memory quickly.

More info can be found at https://issues.apache.org/jira/browse/SPARK-17503

## How was this patch tested?

Unit test and manual test.

### Before change

Heap memory consumption
<img width="702" alt="screen shot 2016-09-12 at 4 16 15 pm" src="https://cloud.githubusercontent.com/assets/2595532/18429524/60d73a26-7906-11e6-9768-6f286f5c58c8.png">

Heap dump
<img width="1402" alt="screen shot 2016-09-12 at 4 34 19 pm" src="https://cloud.githubusercontent.com/assets/2595532/18429577/cbc1ef20-7906-11e6-847b-b5903f450b3b.png">

### After change

Heap memory consumption
<img width="706" alt="screen shot 2016-09-12 at 4 29 10 pm" src="https://cloud.githubusercontent.com/assets/2595532/18429503/4abe9342-7906-11e6-844a-b2f815072624.png">

Author: Sean Zhong <seanzhong@databricks.com>

Closes #15056 from clockfly/memory_store_leak.
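As an aside, here is a minimal standalone sketch of the retention pattern behind this leak (hypothetical code for illustration only; `LeakySketch`, `LeakyChain`, and the buffer sizes are invented and are not Spark classes). A wrapper that keeps the buffered prefix in an immutable field holds the underlying buffer strongly reachable for its whole lifetime, even after the prefix has been fully consumed and only the remaining input is still being read:

```scala
object LeakySketch {
  /** Chains a buffered prefix with the remaining input, but never lets go of the prefix. */
  class LeakyChain[T](prefix: Iterator[T], rest: Iterator[T]) extends Iterator[T] {
    // The prefix iterator lives in a val for this object's whole lifetime, so the large
    // buffer backing it stays strongly reachable even after every buffered element has
    // been consumed and only `rest` is still being read.
    private[this] val buffered: Iterator[T] = prefix

    override def hasNext: Boolean = buffered.hasNext || rest.hasNext
    override def next(): T = if (buffered.hasNext) buffered.next() else rest.next()
  }

  def main(args: Array[String]): Unit = {
    // The big buffer is only reachable through the iterator handed to LeakyChain...
    val chained = new LeakyChain(
      Vector.fill(1024)(new Array[Long](1024)).iterator, // ~8 MB, no other reference kept
      Iterator.range(0, 10).map(_ => Array(0L)))
    // ...yet it stays pinned in memory for as long as `chained` is alive, even after all
    // 1024 buffered elements have been read, because `buffered` still points at its iterator.
    println(chained.count(_ => true)) // 1034
  }
}
```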
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--    core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala    40
1 file changed, 26 insertions, 14 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index d220ab51d1..1a3bf2bb67 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -663,31 +663,43 @@ private[spark] class MemoryStore(
 private[storage] class PartiallyUnrolledIterator[T](
     memoryStore: MemoryStore,
     unrollMemory: Long,
-    unrolled: Iterator[T],
+    private[this] var unrolled: Iterator[T],
     rest: Iterator[T])
   extends Iterator[T] {
 
-  private[this] var unrolledIteratorIsConsumed: Boolean = false
-  private[this] var iter: Iterator[T] = {
-    val completionIterator = CompletionIterator[T, Iterator[T]](unrolled, {
-      unrolledIteratorIsConsumed = true
-      memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
-    })
-    completionIterator ++ rest
+  private def releaseUnrollMemory(): Unit = {
+    memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
+    // SPARK-17503: Garbage collects the unrolling memory before the life end of
+    // PartiallyUnrolledIterator.
+    unrolled = null
   }
 
-  override def hasNext: Boolean = iter.hasNext
-  override def next(): T = iter.next()
+  override def hasNext: Boolean = {
+    if (unrolled == null) {
+      rest.hasNext
+    } else if (!unrolled.hasNext) {
+      releaseUnrollMemory()
+      rest.hasNext
+    } else {
+      true
+    }
+  }
+
+  override def next(): T = {
+    if (unrolled == null) {
+      rest.next()
+    } else {
+      unrolled.next()
+    }
+  }
 
   /**
    * Called to dispose of this iterator and free its memory.
    */
   def close(): Unit = {
-    if (!unrolledIteratorIsConsumed) {
-      memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
-      unrolledIteratorIsConsumed = true
+    if (unrolled != null) {
+      releaseUnrollMemory()
     }
-    iter = null
   }
 }
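For reference, a standalone sketch of the pattern the patch applies (hypothetical code; `ReleasingSketch` and `ReleasingChain` are invented names, not the actual `PartiallyUnrolledIterator`): keep the buffered prefix behind a mutable reference and drop it as soon as the prefix is exhausted, so the buffer becomes eligible for garbage collection while the rest of the input is still being consumed.

```scala
object ReleasingSketch {
  /** Chains a buffered prefix with the remaining input, dropping the reference to the
    * prefix as soon as it is exhausted so its buffer can be garbage collected. */
  class ReleasingChain[T](private[this] var prefix: Iterator[T], rest: Iterator[T])
    extends Iterator[T] {

    override def hasNext: Boolean =
      if (prefix == null) {
        rest.hasNext
      } else if (prefix.hasNext) {
        true
      } else {
        prefix = null // drop the buffer before `rest` starts being consumed
        rest.hasNext
      }

    override def next(): T =
      if (prefix != null && prefix.hasNext) {
        prefix.next()
      } else {
        prefix = null // also drop it if next() is called without a preceding hasNext()
        rest.next()
      }
  }

  def main(args: Array[String]): Unit = {
    // The big buffer is only reachable through the iterator handed to ReleasingChain.
    val chained = new ReleasingChain(
      Vector.fill(1024)(new Array[Long](1024)).iterator, // ~8 MB, no other reference kept
      Iterator.range(0, 10).map(_ => Array(0L)))
    // Once the 1024 buffered elements have been read, `prefix` is nulled out, so the
    // buffer becomes unreachable and can be reclaimed while `rest` is still consumed.
    println(chained.count(_ => true)) // 1034
  }
}
```

Unlike the real PartiallyUnrolledIterator, this sketch only models the reference handling; the patched class additionally releases the task's unroll-memory accounting via releaseUnrollMemoryForThisTask before dropping the reference.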