diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/NewAccumulator.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/NewAccumulator.scala | 10 |
1 files changed, 8 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/NewAccumulator.scala b/core/src/main/scala/org/apache/spark/NewAccumulator.scala index edb9b741a8..aa21ccc1ff 100644 --- a/core/src/main/scala/org/apache/spark/NewAccumulator.scala +++ b/core/src/main/scala/org/apache/spark/NewAccumulator.scala @@ -22,6 +22,8 @@ import java.io.ObjectInputStream import java.util.concurrent.atomic.AtomicLong import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ + import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.util.Utils @@ -57,7 +59,7 @@ abstract class NewAccumulator[IN, OUT] extends Serializable { * registered before ues, or it will throw exception. */ final def isRegistered: Boolean = - metadata != null && AccumulatorContext.originals.containsKey(metadata.id) + metadata != null && AccumulatorContext.get(metadata.id).isDefined private def assertMetadataNotNull(): Unit = { if (metadata == null) { @@ -197,7 +199,7 @@ private[spark] object AccumulatorContext { * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051). */ @GuardedBy("AccumulatorContext") - val originals = new java.util.HashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]] + private val originals = new java.util.HashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]] private[this] val nextId = new AtomicLong(0L) @@ -207,6 +209,10 @@ private[spark] object AccumulatorContext { */ def newId(): Long = nextId.getAndIncrement + def numAccums: Int = synchronized(originals.size) + + def accumIds: Set[Long] = synchronized(originals.keySet().asScala.toSet) + /** * Register an [[Accumulator]] created on the driver such that it can be used on the executors. * |