diff options
author | Wilson Wu <wilson888888888@gmail.com> | 2016-03-14 09:13:29 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-03-14 09:13:29 +0000 |
commit | 31d069d4c2956306355d14087ca74ce1e6705217 (patch) | |
tree | bbbcfa1eca22c761eba73be7e91d554a6a332093 /core/src/test | |
parent | acdf21970334cea9d6cfc287e4ccb8e72de9dee1 (diff) | |
download | spark-31d069d4c2956306355d14087ca74ce1e6705217.tar.gz spark-31d069d4c2956306355d14087ca74ce1e6705217.tar.bz2 spark-31d069d4c2956306355d14087ca74ce1e6705217.zip |
[SPARK-13746][TESTS] stop using deprecated SynchronizedSet
trait SynchronizedSet in package mutable is deprecated
Author: Wilson Wu <wilson888888888@gmail.com>
Closes #11580 from wilson888888888/spark-synchronizedset.
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala | 41 |
1 files changed, 25 insertions, 16 deletions
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index d1e806b2eb..e60678b300 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark import java.lang.ref.WeakReference -import scala.collection.mutable.{HashSet, SynchronizedSet} +import scala.collection.mutable.HashSet import scala.language.existentials import scala.util.Random @@ -442,25 +442,25 @@ class CleanerTester( checkpointIds: Seq[Long] = Seq.empty) extends Logging { - val toBeCleanedRDDIds = new HashSet[Int] with SynchronizedSet[Int] ++= rddIds - val toBeCleanedShuffleIds = new HashSet[Int] with SynchronizedSet[Int] ++= shuffleIds - val toBeCleanedBroadcstIds = new HashSet[Long] with SynchronizedSet[Long] ++= broadcastIds - val toBeCheckpointIds = new HashSet[Long] with SynchronizedSet[Long] ++= checkpointIds + val toBeCleanedRDDIds = new HashSet[Int] ++= rddIds + val toBeCleanedShuffleIds = new HashSet[Int] ++= shuffleIds + val toBeCleanedBroadcstIds = new HashSet[Long] ++= broadcastIds + val toBeCheckpointIds = new HashSet[Long] ++= checkpointIds val isDistributed = !sc.isLocal val cleanerListener = new CleanerListener { def rddCleaned(rddId: Int): Unit = { - toBeCleanedRDDIds -= rddId + toBeCleanedRDDIds.synchronized { toBeCleanedRDDIds -= rddId } logInfo("RDD " + rddId + " cleaned") } def shuffleCleaned(shuffleId: Int): Unit = { - toBeCleanedShuffleIds -= shuffleId + toBeCleanedShuffleIds.synchronized { toBeCleanedShuffleIds -= shuffleId } logInfo("Shuffle " + shuffleId + " cleaned") } def broadcastCleaned(broadcastId: Long): Unit = { - toBeCleanedBroadcstIds -= broadcastId + toBeCleanedBroadcstIds.synchronized { toBeCleanedBroadcstIds -= broadcastId } logInfo("Broadcast " + broadcastId + " cleaned") } @@ -469,7 +469,7 @@ class CleanerTester( } def checkpointCleaned(rddId: Long): Unit = { - toBeCheckpointIds -= rddId + toBeCheckpointIds.synchronized { toBeCheckpointIds -= rddId } logInfo("checkpoint " + rddId + " cleaned") } } @@ -578,18 +578,27 @@ class CleanerTester( } private def uncleanedResourcesToString = { + val s1 = toBeCleanedRDDIds.synchronized { + toBeCleanedRDDIds.toSeq.sorted.mkString("[", ", ", "]") + } + val s2 = toBeCleanedShuffleIds.synchronized { + toBeCleanedShuffleIds.toSeq.sorted.mkString("[", ", ", "]") + } + val s3 = toBeCleanedBroadcstIds.synchronized { + toBeCleanedBroadcstIds.toSeq.sorted.mkString("[", ", ", "]") + } s""" - |\tRDDs = ${toBeCleanedRDDIds.toSeq.sorted.mkString("[", ", ", "]")} - |\tShuffles = ${toBeCleanedShuffleIds.toSeq.sorted.mkString("[", ", ", "]")} - |\tBroadcasts = ${toBeCleanedBroadcstIds.toSeq.sorted.mkString("[", ", ", "]")} + |\tRDDs = $s1 + |\tShuffles = $s2 + |\tBroadcasts = $s3 """.stripMargin } private def isAllCleanedUp = - toBeCleanedRDDIds.isEmpty && - toBeCleanedShuffleIds.isEmpty && - toBeCleanedBroadcstIds.isEmpty && - toBeCheckpointIds.isEmpty + toBeCleanedRDDIds.synchronized { toBeCleanedRDDIds.isEmpty } && + toBeCleanedShuffleIds.synchronized { toBeCleanedShuffleIds.isEmpty } && + toBeCleanedBroadcstIds.synchronized { toBeCleanedBroadcstIds.isEmpty } && + toBeCheckpointIds.synchronized { toBeCheckpointIds.isEmpty } private def getRDDBlocks(rddId: Int): Seq[BlockId] = { blockManager.master.getMatchingBlockIds( _ match { |