author     tedyu <yuzhihong@gmail.com>              2016-04-30 07:54:53 +0800
committer  Wenchen Fan <wenchen@databricks.com>     2016-04-30 07:54:53 +0800
commit     dcfaeadea7e0013af98de626dec36306325f73e7 (patch)
tree       ae22e47c973ca892191f97c1da0b8ca45f59d087
parent     83061be697f69f7e39deb9cda45742a323714231 (diff)
[SPARK-15003] Use ConcurrentHashMap in place of HashMap for NewAccumulator.originals
## What changes were proposed in this pull request?

This PR proposes to use ConcurrentHashMap in place of HashMap for NewAccumulator.originals. This should result in better performance.

## How was this patch tested?

Existing unit test suite.

cloud-fan

Author: tedyu <yuzhihong@gmail.com>

Closes #12776 from tedyu/master.
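The core of the change is replacing an externally synchronized `java.util.HashMap` with a `ConcurrentHashMap`, so the check-then-insert in `register` becomes a single atomic `putIfAbsent` call. Below is a minimal standalone sketch of that pattern; the object and method names are illustrative only, not the Spark API.

```scala
import java.lang.ref.WeakReference
import java.util.concurrent.ConcurrentHashMap

// Minimal sketch of the pattern this patch adopts: a lock-free registry
// keyed by id, storing weak references so entries can be garbage collected.
object RegistrySketch {
  private val originals = new ConcurrentHashMap[Long, WeakReference[AnyRef]]()

  // Replaces the old `synchronized { if (!containsKey(id)) put(...) }` idiom:
  // putIfAbsent performs the check and the insert as one atomic operation.
  def register(id: Long, value: AnyRef): Unit = {
    originals.putIfAbsent(id, new WeakReference[AnyRef](value))
  }

  // Lookups need no lock either; a cleared weak reference yields None.
  def get(id: Long): Option[AnyRef] =
    Option(originals.get(id)).flatMap(ref => Option(ref.get))
}
```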
-rw-r--r--  core/src/main/scala/org/apache/spark/NewAccumulator.scala           | 20
-rw-r--r--  core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala |  3
2 files changed, 10 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/NewAccumulator.scala b/core/src/main/scala/org/apache/spark/NewAccumulator.scala
index aa21ccc1ff..1571e15b76 100644
--- a/core/src/main/scala/org/apache/spark/NewAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/NewAccumulator.scala
@@ -19,6 +19,7 @@ package org.apache.spark
import java.{lang => jl}
import java.io.ObjectInputStream
+import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import javax.annotation.concurrent.GuardedBy
@@ -198,8 +199,7 @@ private[spark] object AccumulatorContext {
* once the RDDs and user-code that reference them are cleaned up.
* TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051).
*/
- @GuardedBy("AccumulatorContext")
- private val originals = new java.util.HashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]]
+ private val originals = new ConcurrentHashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]]
private[this] val nextId = new AtomicLong(0L)
@@ -209,9 +209,7 @@ private[spark] object AccumulatorContext {
*/
def newId(): Long = nextId.getAndIncrement
- def numAccums: Int = synchronized(originals.size)
-
- def accumIds: Set[Long] = synchronized(originals.keySet().asScala.toSet)
+ def numAccums: Int = originals.size
/**
* Register an [[Accumulator]] created on the driver such that it can be used on the executors.
@@ -224,23 +222,21 @@ private[spark] object AccumulatorContext {
* If an [[Accumulator]] with the same ID was already registered, this does nothing instead
* of overwriting it. We will never register same accumulator twice, this is just a sanity check.
*/
- def register(a: NewAccumulator[_, _]): Unit = synchronized {
- if (!originals.containsKey(a.id)) {
- originals.put(a.id, new jl.ref.WeakReference[NewAccumulator[_, _]](a))
- }
+ def register(a: NewAccumulator[_, _]): Unit = {
+ originals.putIfAbsent(a.id, new jl.ref.WeakReference[NewAccumulator[_, _]](a))
}
/**
* Unregister the [[Accumulator]] with the given ID, if any.
*/
- def remove(id: Long): Unit = synchronized {
+ def remove(id: Long): Unit = {
originals.remove(id)
}
/**
* Return the [[Accumulator]] registered with the given ID, if any.
*/
- def get(id: Long): Option[NewAccumulator[_, _]] = synchronized {
+ def get(id: Long): Option[NewAccumulator[_, _]] = {
Option(originals.get(id)).map { ref =>
// Since we are storing weak references, we must check whether the underlying data is valid.
val acc = ref.get
@@ -254,7 +250,7 @@ private[spark] object AccumulatorContext {
/**
* Clear all registered [[Accumulator]]s. For testing only.
*/
- def clear(): Unit = synchronized {
+ def clear(): Unit = {
originals.clear()
}
}
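Dropping `synchronized` from `numAccums`, `register`, `remove`, `get`, and `clear` above is safe because `ConcurrentHashMap` handles concurrent access internally. The following standalone sketch (not part of the patch; names are illustrative) shows the property `register` relies on: concurrent `putIfAbsent` calls need no external lock, and the first insert for a key wins.

```scala
import java.util.concurrent.ConcurrentHashMap

// Standalone sketch: many threads calling putIfAbsent on the same keys
// never corrupt the map, and each key keeps the value from the first
// successful insert.
object ConcurrentRegisterSketch {
  def main(args: Array[String]): Unit = {
    val map = new ConcurrentHashMap[Long, String]()
    val threads = (1 to 8).map { t =>
      new Thread(new Runnable {
        override def run(): Unit =
          (0L until 1000L).foreach(i => map.putIfAbsent(i, s"thread-$t"))
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    // Exactly 1000 distinct keys, regardless of thread interleaving.
    assert(map.size == 1000)
  }
}
```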
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index 972e31c411..688eb6bde9 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -194,7 +194,8 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
}
// Make sure the same set of accumulators is registered for cleanup
assert(accumsRegistered.size === numInternalAccums * 2)
- assert(accumsRegistered.toSet === AccumulatorContext.accumIds)
+ assert(accumsRegistered.toSet.size === AccumulatorContext.numAccums)
+ accumsRegistered.foreach(id => assert(AccumulatorContext.get(id) != None))
}
/**