author    Sean Zhong <seanzhong@databricks.com>  2016-09-12 11:30:06 -0700
committer Josh Rosen <joshrosen@databricks.com>  2016-09-12 11:30:06 -0700
commit    1742c3ab86d75ce3d352f7cddff65e62fb7c8dd4 (patch)
tree      cdb3dea3db732a647977adc1cb9ddc86b8121840 /core/src
parent    8087ecf8daad1587d0ce9040991b14320628a65e (diff)
[SPARK-17503][CORE] Fix memory leak in MemoryStore when unable to cache the whole RDD in memory
## What changes were proposed in this pull request?

MemoryStore may throw OutOfMemoryError when trying to cache a very large RDD that cannot fit in memory.

```
scala> sc.parallelize(1 to 1000000000, 100).map(x => new Array[Long](1000)).cache().count()
java.lang.OutOfMemoryError: Java heap space
  at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:24)
  at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$JoinIterator.next(Iterator.scala:232)
  at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:683)
  at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
  at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1684)
  at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
  at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
  at org.apache.spark.scheduler.Task.run(Task.scala:86)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
```

Spark's MemoryStore uses a SizeTrackingVector as a temporary unrolling buffer to hold all input values read so far, before transferring the values to the storage memory cache. The problem is that when the input RDD is too big to cache in memory, this temporary unrolling buffer is not garbage collected in time. Since the SizeTrackingVector can occupy all available storage memory, the executor JVM may quickly run out of memory.

More info can be found at https://issues.apache.org/jira/browse/SPARK-17503

## How was this patch tested?

Unit test and manual test.

### Before change

Heap memory consumption:

![Heap memory consumption before the change](https://cloud.githubusercontent.com/assets/2595532/18429524/60d73a26-7906-11e6-9768-6f286f5c58c8.png)

Heap dump:

![Heap dump before the change](https://cloud.githubusercontent.com/assets/2595532/18429577/cbc1ef20-7906-11e6-847b-b5903f450b3b.png)

### After change

Heap memory consumption:

![Heap memory consumption after the change](https://cloud.githubusercontent.com/assets/2595532/18429503/4abe9342-7906-11e6-844a-b2f815072624.png)

Author: Sean Zhong <seanzhong@databricks.com>

Closes #15056 from clockfly/memory_store_leak.
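To make the shape of the fix easier to see before reading the diff, here is a minimal runnable sketch of the same pattern. The names (`BufferBackedIterator`, `onRelease`) are hypothetical stand-ins, not Spark's API: the point is that the reference to the unrolled buffer is dropped the moment it is exhausted, instead of living as long as the enclosing iterator.

```scala
// Hypothetical, simplified analogue of PartiallyUnrolledIterator: `buffer`
// stands in for the unrolled SizeTrackingVector's iterator, `onRelease` for
// MemoryStore.releaseUnrollMemoryForThisTask.
class BufferBackedIterator[T](
    private[this] var buffer: Iterator[T],
    rest: Iterator[T],
    onRelease: () => Unit)
  extends Iterator[T] {

  private def release(): Unit = {
    onRelease()    // hand the unroll memory back to the accounting layer
    buffer = null  // drop the only reference so the buffer can be GCed
  }

  override def hasNext: Boolean = {
    if (buffer == null) {
      rest.hasNext
    } else if (!buffer.hasNext) {
      release()
      rest.hasNext
    } else {
      true
    }
  }

  override def next(): T =
    if (buffer == null) rest.next() else buffer.next()
}

object BufferBackedIteratorDemo extends App {
  val it = new BufferBackedIterator[Int](
    (1 to 3).iterator, (4 to 6).iterator, () => println("buffer released"))
  it.foreach(println)  // prints 1 2 3, then "buffer released", then 4 5 6
}
```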
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala             | 40
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/PartiallyUnrolledIteratorSuite.scala | 61
2 files changed, 87 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index d220ab51d1..1a3bf2bb67 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -663,31 +663,43 @@ private[spark] class MemoryStore(
private[storage] class PartiallyUnrolledIterator[T](
memoryStore: MemoryStore,
unrollMemory: Long,
- unrolled: Iterator[T],
+ private[this] var unrolled: Iterator[T],
rest: Iterator[T])
extends Iterator[T] {
- private[this] var unrolledIteratorIsConsumed: Boolean = false
- private[this] var iter: Iterator[T] = {
- val completionIterator = CompletionIterator[T, Iterator[T]](unrolled, {
- unrolledIteratorIsConsumed = true
- memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
- })
- completionIterator ++ rest
+ private def releaseUnrollMemory(): Unit = {
+ memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
+ // SPARK-17503: Null out the unrolled iterator so its unroll buffer can be
+ // garbage collected before this PartiallyUnrolledIterator goes out of scope.
+ unrolled = null
}
- override def hasNext: Boolean = iter.hasNext
- override def next(): T = iter.next()
+ override def hasNext: Boolean = {
+ if (unrolled == null) {
+ rest.hasNext
+ } else if (!unrolled.hasNext) {
+ releaseUnrollMemory()
+ rest.hasNext
+ } else {
+ true
+ }
+ }
+
+ override def next(): T = {
+ if (unrolled == null) {
+ rest.next()
+ } else {
+ unrolled.next()
+ }
+ }
/**
* Called to dispose of this iterator and free its memory.
*/
def close(): Unit = {
- if (!unrolledIteratorIsConsumed) {
- memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
- unrolledIteratorIsConsumed = true
+ if (unrolled != null) {
+ releaseUnrollMemory()
}
- iter = null
}
}
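Why the rewrite above helps: in the old code, `iter` was built as `completionIterator ++ rest`, so the join iterator (the `scala.collection.Iterator$JoinIterator` visible in the stack trace) kept the drained left-hand side, and through it the SizeTrackingVector, strongly reachable until `close()` set `iter = null`; the `unrolled` constructor parameter was also retained as a field. A small sketch of that retention pattern, using made-up names and assuming Scala 2.11-era `++` behavior:

```scala
object RetentionSketch {
  def main(args: Array[String]): Unit = {
    // A 32 MB array standing in for the unrolled SizeTrackingVector.
    def bigIterator(): Iterator[Long] = new Array[Long](1 << 22).iterator

    val joined = bigIterator() ++ Iterator(1L, 2L, 3L)
    while (joined.hasNext) joined.next()

    // Even though the left side is fully drained, `joined` (a 2.11-era
    // JoinIterator) still references it, and through it the 32 MB array:
    // nothing can be collected until `joined` itself becomes unreachable.
    println("done iterating; the large array is still reachable via `joined`")
  }
}
```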
diff --git a/core/src/test/scala/org/apache/spark/storage/PartiallyUnrolledIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/PartiallyUnrolledIteratorSuite.scala
new file mode 100644
index 0000000000..02c2331dc3
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/PartiallyUnrolledIteratorSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.mockito.Matchers
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.memory.MemoryMode.ON_HEAP
+import org.apache.spark.storage.memory.{MemoryStore, PartiallyUnrolledIterator}
+
+class PartiallyUnrolledIteratorSuite extends SparkFunSuite with MockitoSugar {
+ test("join two iterators") {
+ val unrollSize = 1000
+ val unroll = (0 until unrollSize).iterator
+ val restSize = 500
+ val rest = (unrollSize until restSize + unrollSize).iterator
+
+ val memoryStore = mock[MemoryStore]
+ val joinIterator = new PartiallyUnrolledIterator(memoryStore, unrollSize, unroll, rest)
+
+ // First, iterate over the unrolled portion of the data
+ (0 until unrollSize).foreach { value =>
+ assert(joinIterator.hasNext)
+ assert(joinIterator.hasNext)
+ assert(joinIterator.next() == value)
+ }
+
+ joinIterator.hasNext
+ joinIterator.hasNext
+ verify(memoryStore, times(1))
+ .releaseUnrollMemoryForThisTask(Matchers.eq(ON_HEAP), Matchers.eq(unrollSize.toLong))
+
+ // Then iterate over the rest iterator
+ (unrollSize until unrollSize + restSize).foreach { value =>
+ assert(joinIterator.hasNext)
+ assert(joinIterator.hasNext)
+ assert(joinIterator.next() == value)
+ }
+
+ joinIterator.close()
+ // MemoryStore.releaseUnrollMemoryForThisTask is called only once
+ verifyNoMoreInteractions(memoryStore)
+ }
+}
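To run the new suite on its own, the usual Spark developer workflow of that era should apply. This invocation is an assumption based on standard Spark practice, not part of the patch:

```
build/sbt "core/testOnly *PartiallyUnrolledIteratorSuite"
```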