diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-12-23 15:38:41 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-12-23 15:38:41 -0800 |
commit | a848f0ba84e37fd95d0f47863ec68326e3296b33 (patch) | |
tree | 5e3c630e3e269e8c1132133617bda59fc5461bc4 | |
parent | 1311448ea8da38f9998f1ace79968af6e47d0278 (diff) | |
download | spark-a848f0ba84e37fd95d0f47863ec68326e3296b33.tar.gz spark-a848f0ba84e37fd95d0f47863ec68326e3296b33.tar.bz2 spark-a848f0ba84e37fd95d0f47863ec68326e3296b33.zip |
[SPARK-18991][CORE] Change ContextCleaner.referenceBuffer to use ConcurrentHashMap to make it faster
## What changes were proposed in this pull request?
The time complexity of ConcurrentHashMap's `remove` is O(1). Changing ContextCleaner.referenceBuffer's type from `ConcurrentLinkedQueue` to `ConcurrentHashMap` will make the removal much faster.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16390 from zsxwing/SPARK-18991.
-rw-r--r-- | core/src/main/scala/org/apache/spark/ContextCleaner.scala | 18 |
1 files changed, 12 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index af913454fc..4d884dec07 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -18,7 +18,8 @@ package org.apache.spark

 import java.lang.ref.{ReferenceQueue, WeakReference}
-import java.util.concurrent.{ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit}
+import java.util.Collections
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit}

 import scala.collection.JavaConverters._
@@ -58,7 +59,12 @@ private class CleanupTaskWeakReference(
  */
 private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

-  private val referenceBuffer = new ConcurrentLinkedQueue[CleanupTaskWeakReference]()
+  /**
+   * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they
+   * have not been handled by the reference queue.
+   */
+  private val referenceBuffer =
+    Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)

   private val referenceQueue = new ReferenceQueue[AnyRef]
@@ -176,10 +182,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
           .map(_.asInstanceOf[CleanupTaskWeakReference])
         // Synchronize here to avoid being interrupted on stop()
         synchronized {
-          reference.map(_.task).foreach { task =>
-            logDebug("Got cleaning task " + task)
-            referenceBuffer.remove(reference.get)
-            task match {
+          reference.foreach { ref =>
+            logDebug("Got cleaning task " + ref.task)
+            referenceBuffer.remove(ref)
+            ref.task match {
             case CleanRDD(rddId) =>
               doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
             case CleanShuffle(shuffleId) =>