diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-02-09 16:31:00 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-02-09 16:31:00 -0800 |
commit | fae830d15846f7ffdfe49eeb45e175a3cdd2c670 (patch) | |
tree | f75dca9c52c908321c1592d838e4ec3874f4181a /core | |
parent | 7fe4fe630a3fc9755ebd0325bb595d76381633e8 (diff) | |
download | spark-fae830d15846f7ffdfe49eeb45e175a3cdd2c670.tar.gz spark-fae830d15846f7ffdfe49eeb45e175a3cdd2c670.tar.bz2 spark-fae830d15846f7ffdfe49eeb45e175a3cdd2c670.zip |
[SPARK-13245][CORE] Call shuffleMetrics methods only in one thread for ShuffleBlockFetcherIterator
Call shuffleMetrics' `incRemoteBytesRead` and `incRemoteBlocksFetched` when a FetchResult is polled from `results`, instead of in the fetch-success callback, so that shuffleMetrics is always used from a single thread.
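Also fix a race condition that could cause a memory leak: `isZombie` is now read and written only while holding the iterator's lock, so a fetch callback can no longer retain a buffer and put it into `results` after `cleanup()` has marked the iterator as a zombie.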
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #11138 from zsxwing/SPARK-13245.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 38 |
1 file changed, 27 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index c6065df64a..c368a39e62 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -19,6 +19,7 @@ package org.apache.spark.storage import java.io.InputStream import java.util.concurrent.LinkedBlockingQueue +import javax.annotation.concurrent.GuardedBy import scala.collection.mutable.{ArrayBuffer, HashSet, Queue} import scala.util.control.NonFatal @@ -107,7 +108,8 @@ final class ShuffleBlockFetcherIterator( * Whether the iterator is still active. If isZombie is true, the callback interface will no * longer place fetched blocks into [[results]]. */ - @volatile private[this] var isZombie = false + @GuardedBy("this") + private[this] var isZombie = false initialize() @@ -126,14 +128,22 @@ final class ShuffleBlockFetcherIterator( * Mark the iterator as zombie, and release all buffers that haven't been deserialized yet. */ private[this] def cleanup() { - isZombie = true + synchronized { + isZombie = true + } releaseCurrentResultBuffer() // Release buffers in the results queue val iter = results.iterator() while (iter.hasNext) { val result = iter.next() result match { - case SuccessFetchResult(_, _, _, buf) => buf.release() + case SuccessFetchResult(_, address, _, buf) => { + if (address != blockManager.blockManagerId) { + shuffleMetrics.incRemoteBytesRead(buf.size) + shuffleMetrics.incRemoteBlocksFetched(1) + } + buf.release() + } case _ => } } @@ -154,13 +164,13 @@ final class ShuffleBlockFetcherIterator( override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = { // Only add the buffer to results queue if the iterator is not zombie, // i.e. cleanup() has not been called yet. 
- if (!isZombie) { - // Increment the ref count because we need to pass this to a different thread. - // This needs to be released after use. - buf.retain() - results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf)) - shuffleMetrics.incRemoteBytesRead(buf.size) - shuffleMetrics.incRemoteBlocksFetched(1) + ShuffleBlockFetcherIterator.this.synchronized { + if (!isZombie) { + // Increment the ref count because we need to pass this to a different thread. + // This needs to be released after use. + buf.retain() + results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf)) + } } logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) } @@ -289,7 +299,13 @@ final class ShuffleBlockFetcherIterator( shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait) result match { - case SuccessFetchResult(_, _, size, _) => bytesInFlight -= size + case SuccessFetchResult(_, address, size, buf) => { + if (address != blockManager.blockManagerId) { + shuffleMetrics.incRemoteBytesRead(buf.size) + shuffleMetrics.incRemoteBlocksFetched(1) + } + bytesInFlight -= size + } case _ => } // Send fetch requests up to maxBytesInFlight |