aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorWilson Wu <wilson888888888@gmail.com>2016-03-14 09:13:29 +0000
committerSean Owen <sowen@cloudera.com>2016-03-14 09:13:29 +0000
commit31d069d4c2956306355d14087ca74ce1e6705217 (patch)
treebbbcfa1eca22c761eba73be7e91d554a6a332093 /core
parentacdf21970334cea9d6cfc287e4ccb8e72de9dee1 (diff)
downloadspark-31d069d4c2956306355d14087ca74ce1e6705217.tar.gz
spark-31d069d4c2956306355d14087ca74ce1e6705217.tar.bz2
spark-31d069d4c2956306355d14087ca74ce1e6705217.zip
[SPARK-13746][TESTS] stop using deprecated SynchronizedSet
trait SynchronizedSet in package mutable is deprecated Author: Wilson Wu <wilson888888888@gmail.com> Closes #11580 from wilson888888888/spark-synchronizedset.
Diffstat (limited to 'core')
-rw-r--r--core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala41
1 files changed, 25 insertions, 16 deletions
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index d1e806b2eb..e60678b300 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
import java.lang.ref.WeakReference
-import scala.collection.mutable.{HashSet, SynchronizedSet}
+import scala.collection.mutable.HashSet
import scala.language.existentials
import scala.util.Random
@@ -442,25 +442,25 @@ class CleanerTester(
checkpointIds: Seq[Long] = Seq.empty)
extends Logging {
- val toBeCleanedRDDIds = new HashSet[Int] with SynchronizedSet[Int] ++= rddIds
- val toBeCleanedShuffleIds = new HashSet[Int] with SynchronizedSet[Int] ++= shuffleIds
- val toBeCleanedBroadcstIds = new HashSet[Long] with SynchronizedSet[Long] ++= broadcastIds
- val toBeCheckpointIds = new HashSet[Long] with SynchronizedSet[Long] ++= checkpointIds
+ val toBeCleanedRDDIds = new HashSet[Int] ++= rddIds
+ val toBeCleanedShuffleIds = new HashSet[Int] ++= shuffleIds
+ val toBeCleanedBroadcstIds = new HashSet[Long] ++= broadcastIds
+ val toBeCheckpointIds = new HashSet[Long] ++= checkpointIds
val isDistributed = !sc.isLocal
val cleanerListener = new CleanerListener {
def rddCleaned(rddId: Int): Unit = {
- toBeCleanedRDDIds -= rddId
+ toBeCleanedRDDIds.synchronized { toBeCleanedRDDIds -= rddId }
logInfo("RDD " + rddId + " cleaned")
}
def shuffleCleaned(shuffleId: Int): Unit = {
- toBeCleanedShuffleIds -= shuffleId
+ toBeCleanedShuffleIds.synchronized { toBeCleanedShuffleIds -= shuffleId }
logInfo("Shuffle " + shuffleId + " cleaned")
}
def broadcastCleaned(broadcastId: Long): Unit = {
- toBeCleanedBroadcstIds -= broadcastId
+ toBeCleanedBroadcstIds.synchronized { toBeCleanedBroadcstIds -= broadcastId }
logInfo("Broadcast " + broadcastId + " cleaned")
}
@@ -469,7 +469,7 @@ class CleanerTester(
}
def checkpointCleaned(rddId: Long): Unit = {
- toBeCheckpointIds -= rddId
+ toBeCheckpointIds.synchronized { toBeCheckpointIds -= rddId }
logInfo("checkpoint " + rddId + " cleaned")
}
}
@@ -578,18 +578,27 @@ class CleanerTester(
}
private def uncleanedResourcesToString = {
+ val s1 = toBeCleanedRDDIds.synchronized {
+ toBeCleanedRDDIds.toSeq.sorted.mkString("[", ", ", "]")
+ }
+ val s2 = toBeCleanedShuffleIds.synchronized {
+ toBeCleanedShuffleIds.toSeq.sorted.mkString("[", ", ", "]")
+ }
+ val s3 = toBeCleanedBroadcstIds.synchronized {
+ toBeCleanedBroadcstIds.toSeq.sorted.mkString("[", ", ", "]")
+ }
s"""
- |\tRDDs = ${toBeCleanedRDDIds.toSeq.sorted.mkString("[", ", ", "]")}
- |\tShuffles = ${toBeCleanedShuffleIds.toSeq.sorted.mkString("[", ", ", "]")}
- |\tBroadcasts = ${toBeCleanedBroadcstIds.toSeq.sorted.mkString("[", ", ", "]")}
+ |\tRDDs = $s1
+ |\tShuffles = $s2
+ |\tBroadcasts = $s3
""".stripMargin
}
private def isAllCleanedUp =
- toBeCleanedRDDIds.isEmpty &&
- toBeCleanedShuffleIds.isEmpty &&
- toBeCleanedBroadcstIds.isEmpty &&
- toBeCheckpointIds.isEmpty
+ toBeCleanedRDDIds.synchronized { toBeCleanedRDDIds.isEmpty } &&
+ toBeCleanedShuffleIds.synchronized { toBeCleanedShuffleIds.isEmpty } &&
+ toBeCleanedBroadcstIds.synchronized { toBeCleanedBroadcstIds.isEmpty } &&
+ toBeCheckpointIds.synchronized { toBeCheckpointIds.isEmpty }
private def getRDDBlocks(rddId: Int): Seq[BlockId] = {
blockManager.master.getMatchingBlockIds( _ match {