aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/NewAccumulator.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/NewAccumulator.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/NewAccumulator.scala10
1 files changed, 8 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/NewAccumulator.scala b/core/src/main/scala/org/apache/spark/NewAccumulator.scala
index edb9b741a8..aa21ccc1ff 100644
--- a/core/src/main/scala/org/apache/spark/NewAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/NewAccumulator.scala
@@ -22,6 +22,8 @@ import java.io.ObjectInputStream
import java.util.concurrent.atomic.AtomicLong
import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
+
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.util.Utils
@@ -57,7 +59,7 @@ abstract class NewAccumulator[IN, OUT] extends Serializable {
* registered before ues, or it will throw exception.
*/
final def isRegistered: Boolean =
- metadata != null && AccumulatorContext.originals.containsKey(metadata.id)
+ metadata != null && AccumulatorContext.get(metadata.id).isDefined
private def assertMetadataNotNull(): Unit = {
if (metadata == null) {
@@ -197,7 +199,7 @@ private[spark] object AccumulatorContext {
* TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051).
*/
@GuardedBy("AccumulatorContext")
- val originals = new java.util.HashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]]
+ private val originals = new java.util.HashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]]
private[this] val nextId = new AtomicLong(0L)
@@ -207,6 +209,10 @@ private[spark] object AccumulatorContext {
*/
def newId(): Long = nextId.getAndIncrement
+ def numAccums: Int = synchronized(originals.size)
+
+ def accumIds: Set[Long] = synchronized(originals.keySet().asScala.toSet)
+
/**
* Register an [[Accumulator]] created on the driver such that it can be used on the executors.
*