aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-12-23 15:38:41 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-12-23 15:38:41 -0800
commita848f0ba84e37fd95d0f47863ec68326e3296b33 (patch)
tree5e3c630e3e269e8c1132133617bda59fc5461bc4
parent1311448ea8da38f9998f1ace79968af6e47d0278 (diff)
downloadspark-a848f0ba84e37fd95d0f47863ec68326e3296b33.tar.gz
spark-a848f0ba84e37fd95d0f47863ec68326e3296b33.tar.bz2
spark-a848f0ba84e37fd95d0f47863ec68326e3296b33.zip
[SPARK-18991][CORE] Change ContextCleaner.referenceBuffer to use ConcurrentHashMap to make it faster
## What changes were proposed in this pull request? The time complexity of ConcurrentHashMap's `remove` is O(1). Changing ContextCleaner.referenceBuffer's type from `ConcurrentLinkedQueue` to `ConcurrentHashMap's` will make the removal much faster. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes #16390 from zsxwing/SPARK-18991.
-rw-r--r--core/src/main/scala/org/apache/spark/ContextCleaner.scala18
1 files changed, 12 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index af913454fc..4d884dec07 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -18,7 +18,8 @@
package org.apache.spark
import java.lang.ref.{ReferenceQueue, WeakReference}
-import java.util.concurrent.{ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit}
+import java.util.Collections
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit}
import scala.collection.JavaConverters._
@@ -58,7 +59,12 @@ private class CleanupTaskWeakReference(
*/
private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
- private val referenceBuffer = new ConcurrentLinkedQueue[CleanupTaskWeakReference]()
+ /**
+ * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they
+ * have not been handled by the reference queue.
+ */
+ private val referenceBuffer =
+ Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)
private val referenceQueue = new ReferenceQueue[AnyRef]
@@ -176,10 +182,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
.map(_.asInstanceOf[CleanupTaskWeakReference])
// Synchronize here to avoid being interrupted on stop()
synchronized {
- reference.map(_.task).foreach { task =>
- logDebug("Got cleaning task " + task)
- referenceBuffer.remove(reference.get)
- task match {
+ reference.foreach { ref =>
+ logDebug("Got cleaning task " + ref.task)
+ referenceBuffer.remove(ref)
+ ref.task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>