From 6f9a18fe311925056cce83a44f187f122b6591cb Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 28 Apr 2016 21:57:58 -0700
Subject: [HOTFIX][CORE] fix a concurrency issue in NewAccumulator

## What changes were proposed in this pull request?

`AccumulatorContext` is not thread-safe, which is why all of its methods are synchronized. However, there is one exception: `AccumulatorContext.originals`. `NewAccumulator` uses it directly to check whether it is registered, which is wrong because that access is not synchronized. This PR marks `AccumulatorContext.originals` as `private`, so all access to `AccumulatorContext` now goes through synchronized methods.
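The fix follows a common pattern: keep the non-thread-safe map `private` and route every read and write through methods that synchronize on the enclosing object. Below is a minimal, self-contained sketch of that pattern. The object name `SketchAccumulatorContext`, the `AnyRef` value type, and the simplified `register`/`get` bodies are illustrative assumptions, not Spark's actual implementation; only `numAccums` and `accumIds` mirror the helpers this patch adds.

```scala
import java.{lang => jl}

import scala.collection.JavaConverters._

// Illustrative stand-in for AccumulatorContext; not the real Spark object.
object SketchAccumulatorContext {
  // A plain HashMap is not thread-safe, so it must never be exposed directly.
  private val originals = new java.util.HashMap[Long, jl.ref.WeakReference[AnyRef]]

  // Hypothetical, simplified registration: every write holds the object lock.
  def register(id: Long, acc: AnyRef): Unit = synchronized {
    originals.put(id, new jl.ref.WeakReference(acc))
  }

  // Hypothetical, simplified lookup: every read holds the object lock too.
  def get(id: Long): Option[AnyRef] = synchronized {
    Option(originals.get(id)).flatMap(ref => Option(ref.get))
  }

  // Mirrors one helper added by this patch: a synchronized size read.
  def numAccums: Int = synchronized(originals.size)

  // Mirrors the other added helper: snapshot the keys into an immutable Set,
  // so callers never iterate the live map outside the lock.
  def accumIds: Set[Long] = synchronized(originals.keySet().asScala.toSet)
}
```

With this shape, the registration check becomes `AccumulatorContext.get(metadata.id).isDefined`, which is exactly how `isRegistered` is rewritten in the diff below.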
## How was this patch tested?

I verified it locally. To be safe, we can let jenkins test it many times to make sure this problem is gone.

Author: Wenchen Fan

Closes #12773 from cloud-fan/debug.
---
 core/src/main/scala/org/apache/spark/NewAccumulator.scala      | 10 ++++++++--
 core/src/test/scala/org/apache/spark/AccumulatorSuite.scala    |  2 +-
 .../test/scala/org/apache/spark/InternalAccumulatorSuite.scala |  6 +++---
 .../src/test/scala/org/apache/spark/sql/CachedTableSuite.scala |  4 ++--
 4 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/NewAccumulator.scala b/core/src/main/scala/org/apache/spark/NewAccumulator.scala
index edb9b741a8..aa21ccc1ff 100644
--- a/core/src/main/scala/org/apache/spark/NewAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/NewAccumulator.scala
@@ -22,6 +22,8 @@ import java.io.ObjectInputStream
 import java.util.concurrent.atomic.AtomicLong
 import javax.annotation.concurrent.GuardedBy
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.util.Utils
 
@@ -57,7 +59,7 @@ abstract class NewAccumulator[IN, OUT] extends Serializable {
    * registered before ues, or it will throw exception.
    */
   final def isRegistered: Boolean =
-    metadata != null && AccumulatorContext.originals.containsKey(metadata.id)
+    metadata != null && AccumulatorContext.get(metadata.id).isDefined
 
   private def assertMetadataNotNull(): Unit = {
     if (metadata == null) {
@@ -197,7 +199,7 @@ private[spark] object AccumulatorContext {
    * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051).
    */
   @GuardedBy("AccumulatorContext")
-  val originals = new java.util.HashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]]
+  private val originals = new java.util.HashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]]
 
   private[this] val nextId = new AtomicLong(0L)
 
@@ -207,6 +209,10 @@
   def newId(): Long = nextId.getAndIncrement
 
+  def numAccums: Int = synchronized(originals.size)
+
+  def accumIds: Set[Long] = synchronized(originals.keySet().asScala.toSet)
+
   /**
    * Register an [[Accumulator]] created on the driver such that it can be used on the executors.
    *
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 5f97e58845..9c90049715 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -191,7 +191,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     assert(ref.get.isEmpty)
 
     AccumulatorContext.remove(accId)
-    assert(!AccumulatorContext.originals.containsKey(accId))
+    assert(!AccumulatorContext.get(accId).isDefined)
   }
 
   test("get accum") {
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index e4474bb813..972e31c411 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -183,18 +183,18 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
       private val myCleaner = new SaveAccumContextCleaner(this)
       override def cleaner: Option[ContextCleaner] = Some(myCleaner)
     }
-    assert(AccumulatorContext.originals.isEmpty)
+    assert(AccumulatorContext.numAccums == 0)
     sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count()
     val numInternalAccums = TaskMetrics.empty.internalAccums.length
     // We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage
-    assert(AccumulatorContext.originals.size === numInternalAccums * 2)
+    assert(AccumulatorContext.numAccums === numInternalAccums * 2)
     val accumsRegistered = sc.cleaner match {
       case Some(cleaner: SaveAccumContextCleaner) => cleaner.accumsRegisteredForCleanup
       case _ => Seq.empty[Long]
     }
     // Make sure the same set of accumulators is registered for cleanup
     assert(accumsRegistered.size === numInternalAccums * 2)
-    assert(accumsRegistered.toSet === AccumulatorContext.originals.keySet().asScala)
+    assert(accumsRegistered.toSet === AccumulatorContext.accumIds)
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 0e6356b578..1095a73c58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -334,10 +334,10 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     sql("SELECT * FROM t2").count()
 
     AccumulatorContext.synchronized {
-      val accsSize = AccumulatorContext.originals.size
+      val accsSize = AccumulatorContext.numAccums
       sqlContext.uncacheTable("t1")
       sqlContext.uncacheTable("t2")
-      assert((accsSize - 2) == AccumulatorContext.originals.size)
+      assert((accsSize - 2) == AccumulatorContext.numAccums)
     }
   }
-- 
cgit v1.2.3