From dcfaeadea7e0013af98de626dec36306325f73e7 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sat, 30 Apr 2016 07:54:53 +0800 Subject: [SPARK-15003] Use ConcurrentHashMap in place of HashMap for NewAccumulator.originals ## What changes were proposed in this pull request? This PR proposes to use ConcurrentHashMap in place of HashMap for NewAccumulator.originals. This should result in better performance, because accesses to the map no longer need to synchronize on AccumulatorContext. ## How was this patch tested? Existing unit test suite. cloud-fan Author: tedyu Closes #12776 from tedyu/master. --- .../main/scala/org/apache/spark/NewAccumulator.scala | 20 ++++++++------------ .../org/apache/spark/InternalAccumulatorSuite.scala | 3 ++- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/NewAccumulator.scala b/core/src/main/scala/org/apache/spark/NewAccumulator.scala index aa21ccc1ff..1571e15b76 100644 --- a/core/src/main/scala/org/apache/spark/NewAccumulator.scala +++ b/core/src/main/scala/org/apache/spark/NewAccumulator.scala @@ -19,6 +19,7 @@ package org.apache.spark import java.{lang => jl} import java.io.ObjectInputStream +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong import javax.annotation.concurrent.GuardedBy @@ -198,8 +199,7 @@ private[spark] object AccumulatorContext { * once the RDDs and user-code that reference them are cleaned up. * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051). 
*/ - @GuardedBy("AccumulatorContext") - private val originals = new java.util.HashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]] + private val originals = new ConcurrentHashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]] private[this] val nextId = new AtomicLong(0L) @@ -209,9 +209,7 @@ private[spark] object AccumulatorContext { */ def newId(): Long = nextId.getAndIncrement - def numAccums: Int = synchronized(originals.size) - - def accumIds: Set[Long] = synchronized(originals.keySet().asScala.toSet) + def numAccums: Int = originals.size /** * Register an [[Accumulator]] created on the driver such that it can be used on the executors. @@ -224,23 +222,21 @@ private[spark] object AccumulatorContext { * If an [[Accumulator]] with the same ID was already registered, this does nothing instead * of overwriting it. We will never register same accumulator twice, this is just a sanity check. */ - def register(a: NewAccumulator[_, _]): Unit = synchronized { - if (!originals.containsKey(a.id)) { - originals.put(a.id, new jl.ref.WeakReference[NewAccumulator[_, _]](a)) - } + def register(a: NewAccumulator[_, _]): Unit = { + originals.putIfAbsent(a.id, new jl.ref.WeakReference[NewAccumulator[_, _]](a)) } /** * Unregister the [[Accumulator]] with the given ID, if any. */ - def remove(id: Long): Unit = synchronized { + def remove(id: Long): Unit = { originals.remove(id) } /** * Return the [[Accumulator]] registered with the given ID, if any. */ - def get(id: Long): Option[NewAccumulator[_, _]] = synchronized { + def get(id: Long): Option[NewAccumulator[_, _]] = { Option(originals.get(id)).map { ref => // Since we are storing weak references, we must check whether the underlying data is valid. val acc = ref.get @@ -254,7 +250,7 @@ private[spark] object AccumulatorContext { /** * Clear all registered [[Accumulator]]s. For testing only. 
*/ - def clear(): Unit = synchronized { + def clear(): Unit = { originals.clear() } } diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index 972e31c411..688eb6bde9 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -194,7 +194,8 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { } // Make sure the same set of accumulators is registered for cleanup assert(accumsRegistered.size === numInternalAccums * 2) - assert(accumsRegistered.toSet === AccumulatorContext.accumIds) + assert(accumsRegistered.toSet.size === AccumulatorContext.numAccums) + accumsRegistered.foreach(id => assert(AccumulatorContext.get(id) != None)) } /** -- cgit v1.2.3