aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-02-09 16:31:00 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-02-09 16:31:00 -0800
commitfae830d15846f7ffdfe49eeb45e175a3cdd2c670 (patch)
treef75dca9c52c908321c1592d838e4ec3874f4181a
parent7fe4fe630a3fc9755ebd0325bb595d76381633e8 (diff)
downloadspark-fae830d15846f7ffdfe49eeb45e175a3cdd2c670.tar.gz
spark-fae830d15846f7ffdfe49eeb45e175a3cdd2c670.tar.bz2
spark-fae830d15846f7ffdfe49eeb45e175a3cdd2c670.zip
[SPARK-13245][CORE] Call shuffleMetrics methods only in one thread for ShuffleBlockFetcherIterator
Call shuffleMetrics's incRemoteBytesRead and incRemoteBlocksFetched when polling FetchResult from `results` so as to always use shuffleMetrics in one thread. Also fix a race condition that could cause memory leak. Author: Shixiong Zhu <shixiong@databricks.com> Closes #11138 from zsxwing/SPARK-13245.
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala38
1 files changed, 27 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index c6065df64a..c368a39e62 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
import java.io.InputStream
import java.util.concurrent.LinkedBlockingQueue
+import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
import scala.util.control.NonFatal
@@ -107,7 +108,8 @@ final class ShuffleBlockFetcherIterator(
* Whether the iterator is still active. If isZombie is true, the callback interface will no
* longer place fetched blocks into [[results]].
*/
- @volatile private[this] var isZombie = false
+ @GuardedBy("this")
+ private[this] var isZombie = false
initialize()
@@ -126,14 +128,22 @@ final class ShuffleBlockFetcherIterator(
* Mark the iterator as zombie, and release all buffers that haven't been deserialized yet.
*/
private[this] def cleanup() {
- isZombie = true
+ synchronized {
+ isZombie = true
+ }
releaseCurrentResultBuffer()
// Release buffers in the results queue
val iter = results.iterator()
while (iter.hasNext) {
val result = iter.next()
result match {
- case SuccessFetchResult(_, _, _, buf) => buf.release()
+ case SuccessFetchResult(_, address, _, buf) => {
+ if (address != blockManager.blockManagerId) {
+ shuffleMetrics.incRemoteBytesRead(buf.size)
+ shuffleMetrics.incRemoteBlocksFetched(1)
+ }
+ buf.release()
+ }
case _ =>
}
}
@@ -154,13 +164,13 @@ final class ShuffleBlockFetcherIterator(
override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
// Only add the buffer to results queue if the iterator is not zombie,
// i.e. cleanup() has not been called yet.
- if (!isZombie) {
- // Increment the ref count because we need to pass this to a different thread.
- // This needs to be released after use.
- buf.retain()
- results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf))
- shuffleMetrics.incRemoteBytesRead(buf.size)
- shuffleMetrics.incRemoteBlocksFetched(1)
+ ShuffleBlockFetcherIterator.this.synchronized {
+ if (!isZombie) {
+ // Increment the ref count because we need to pass this to a different thread.
+ // This needs to be released after use.
+ buf.retain()
+ results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf))
+ }
}
logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
}
@@ -289,7 +299,13 @@ final class ShuffleBlockFetcherIterator(
shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
result match {
- case SuccessFetchResult(_, _, size, _) => bytesInFlight -= size
+ case SuccessFetchResult(_, address, size, buf) => {
+ if (address != blockManager.blockManagerId) {
+ shuffleMetrics.incRemoteBytesRead(buf.size)
+ shuffleMetrics.incRemoteBlocksFetched(1)
+ }
+ bytesInFlight -= size
+ }
case _ =>
}
// Send fetch requests up to maxBytesInFlight