author     Wenchen Fan <wenchen@databricks.com>    2016-04-28 21:57:58 -0700
committer  Reynold Xin <rxin@databricks.com>       2016-04-28 21:57:58 -0700
commit     6f9a18fe311925056cce83a44f187f122b6591cb (patch)
tree       ffbbd73a0399c6aad5ef6871e2e264c7ac9b3336
parent     9c7c42bc6a35679cfffcdfb6feb26af834fec2e1 (diff)
[HOTFIX][CORE] fix a concurrency issue in NewAccumulator
## What changes were proposed in this pull request?

`AccumulatorContext` is not thread-safe, which is why all of its methods are synchronized. However, there is one exception: `AccumulatorContext.originals`. `NewAccumulator` uses it to check whether it is registered, which is wrong because that access is not synchronized. This PR marks `AccumulatorContext.originals` as `private`, so all access to `AccumulatorContext` now goes through synchronized methods.

## How was this patch tested?

I verified it locally. To be safe, we can let Jenkins run the tests many times to make sure the problem is gone.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12773 from cloud-fan/debug.
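To make the race concrete, here is a minimal sketch of the pattern, under the same locking discipline the patch enforces. It is not the actual Spark code: `Registry` is a hypothetical stand-in for `AccumulatorContext`, and the map holds plain `String`s instead of weak references to accumulators.

```scala
import java.{util => ju}

object Registry {
  // Before the fix, the real map was public ("val originals = ..."), so
  // callers such as isRegistered could read the non-thread-safe
  // java.util.HashMap without holding the lock, racing with concurrent
  // register/remove calls that mutate it. Making it private forces every
  // access through the synchronized methods below.
  private val originals = new ju.HashMap[Long, String]

  def register(id: Long, name: String): Unit = synchronized {
    originals.put(id, name)
  }

  def remove(id: Long): Unit = synchronized {
    originals.remove(id)
  }

  // The fixed read path: lookups also take the Registry lock, which is what
  // NewAccumulator.isRegistered now does via AccumulatorContext.get(id).
  def get(id: Long): Option[String] = synchronized {
    Option(originals.get(id))
  }

  // A synchronized view for tests, mirroring the numAccums/accumIds helpers
  // this patch adds so the test suites no longer touch the map directly.
  def size: Int = synchronized(originals.size)
}

object Demo extends App {
  Registry.register(1L, "counter")
  assert(Registry.get(1L).isDefined) // safe from any thread
  Registry.remove(1L)
  assert(Registry.size == 0)
}
```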
-rw-r--r--  core/src/main/scala/org/apache/spark/NewAccumulator.scala           | 10
-rw-r--r--  core/src/test/scala/org/apache/spark/AccumulatorSuite.scala         |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala |  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala |  4
4 files changed, 14 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/NewAccumulator.scala b/core/src/main/scala/org/apache/spark/NewAccumulator.scala
index edb9b741a8..aa21ccc1ff 100644
--- a/core/src/main/scala/org/apache/spark/NewAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/NewAccumulator.scala
@@ -22,6 +22,8 @@ import java.io.ObjectInputStream
import java.util.concurrent.atomic.AtomicLong
import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
+
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.util.Utils
@@ -57,7 +59,7 @@ abstract class NewAccumulator[IN, OUT] extends Serializable {
* registered before use, or it will throw an exception.
*/
final def isRegistered: Boolean =
- metadata != null && AccumulatorContext.originals.containsKey(metadata.id)
+ metadata != null && AccumulatorContext.get(metadata.id).isDefined
private def assertMetadataNotNull(): Unit = {
if (metadata == null) {
@@ -197,7 +199,7 @@ private[spark] object AccumulatorContext {
* TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051).
*/
@GuardedBy("AccumulatorContext")
- val originals = new java.util.HashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]]
+ private val originals = new java.util.HashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]]
private[this] val nextId = new AtomicLong(0L)
@@ -207,6 +209,10 @@ private[spark] object AccumulatorContext {
*/
def newId(): Long = nextId.getAndIncrement
+ def numAccums: Int = synchronized(originals.size)
+
+ def accumIds: Set[Long] = synchronized(originals.keySet().asScala.toSet)
+
/**
* Register an [[Accumulator]] created on the driver such that it can be used on the executors.
*
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 5f97e58845..9c90049715 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -191,7 +191,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext
assert(ref.get.isEmpty)
AccumulatorContext.remove(accId)
- assert(!AccumulatorContext.originals.containsKey(accId))
+ assert(!AccumulatorContext.get(accId).isDefined)
}
test("get accum") {
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index e4474bb813..972e31c411 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -183,18 +183,18 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
private val myCleaner = new SaveAccumContextCleaner(this)
override def cleaner: Option[ContextCleaner] = Some(myCleaner)
}
- assert(AccumulatorContext.originals.isEmpty)
+ assert(AccumulatorContext.numAccums == 0)
sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count()
val numInternalAccums = TaskMetrics.empty.internalAccums.length
// We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage
- assert(AccumulatorContext.originals.size === numInternalAccums * 2)
+ assert(AccumulatorContext.numAccums === numInternalAccums * 2)
val accumsRegistered = sc.cleaner match {
case Some(cleaner: SaveAccumContextCleaner) => cleaner.accumsRegisteredForCleanup
case _ => Seq.empty[Long]
}
// Make sure the same set of accumulators is registered for cleanup
assert(accumsRegistered.size === numInternalAccums * 2)
- assert(accumsRegistered.toSet === AccumulatorContext.originals.keySet().asScala)
+ assert(accumsRegistered.toSet === AccumulatorContext.accumIds)
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 0e6356b578..1095a73c58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -334,10 +334,10 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
sql("SELECT * FROM t2").count()
AccumulatorContext.synchronized {
- val accsSize = AccumulatorContext.originals.size
+ val accsSize = AccumulatorContext.numAccums
sqlContext.uncacheTable("t1")
sqlContext.uncacheTable("t2")
- assert((accsSize - 2) == AccumulatorContext.originals.size)
+ assert((accsSize - 2) == AccumulatorContext.numAccums)
}
}