-rw-r--r--  core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala | 41
-rw-r--r--  external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala | 22
2 files changed, 39 insertions, 24 deletions
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index d1e806b2eb..e60678b300 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
import java.lang.ref.WeakReference
-import scala.collection.mutable.{HashSet, SynchronizedSet}
+import scala.collection.mutable.HashSet
import scala.language.existentials
import scala.util.Random
@@ -442,25 +442,25 @@ class CleanerTester(
checkpointIds: Seq[Long] = Seq.empty)
extends Logging {
- val toBeCleanedRDDIds = new HashSet[Int] with SynchronizedSet[Int] ++= rddIds
- val toBeCleanedShuffleIds = new HashSet[Int] with SynchronizedSet[Int] ++= shuffleIds
- val toBeCleanedBroadcstIds = new HashSet[Long] with SynchronizedSet[Long] ++= broadcastIds
- val toBeCheckpointIds = new HashSet[Long] with SynchronizedSet[Long] ++= checkpointIds
+ val toBeCleanedRDDIds = new HashSet[Int] ++= rddIds
+ val toBeCleanedShuffleIds = new HashSet[Int] ++= shuffleIds
+ val toBeCleanedBroadcstIds = new HashSet[Long] ++= broadcastIds
+ val toBeCheckpointIds = new HashSet[Long] ++= checkpointIds
val isDistributed = !sc.isLocal
val cleanerListener = new CleanerListener {
def rddCleaned(rddId: Int): Unit = {
- toBeCleanedRDDIds -= rddId
+ toBeCleanedRDDIds.synchronized { toBeCleanedRDDIds -= rddId }
logInfo("RDD " + rddId + " cleaned")
}
def shuffleCleaned(shuffleId: Int): Unit = {
- toBeCleanedShuffleIds -= shuffleId
+ toBeCleanedShuffleIds.synchronized { toBeCleanedShuffleIds -= shuffleId }
logInfo("Shuffle " + shuffleId + " cleaned")
}
def broadcastCleaned(broadcastId: Long): Unit = {
- toBeCleanedBroadcstIds -= broadcastId
+ toBeCleanedBroadcstIds.synchronized { toBeCleanedBroadcstIds -= broadcastId }
logInfo("Broadcast " + broadcastId + " cleaned")
}
@@ -469,7 +469,7 @@ class CleanerTester(
}
def checkpointCleaned(rddId: Long): Unit = {
- toBeCheckpointIds -= rddId
+ toBeCheckpointIds.synchronized { toBeCheckpointIds -= rddId }
logInfo("checkpoint " + rddId + " cleaned")
}
}
@@ -578,18 +578,27 @@ class CleanerTester(
}
private def uncleanedResourcesToString = {
+ val s1 = toBeCleanedRDDIds.synchronized {
+ toBeCleanedRDDIds.toSeq.sorted.mkString("[", ", ", "]")
+ }
+ val s2 = toBeCleanedShuffleIds.synchronized {
+ toBeCleanedShuffleIds.toSeq.sorted.mkString("[", ", ", "]")
+ }
+ val s3 = toBeCleanedBroadcstIds.synchronized {
+ toBeCleanedBroadcstIds.toSeq.sorted.mkString("[", ", ", "]")
+ }
s"""
- |\tRDDs = ${toBeCleanedRDDIds.toSeq.sorted.mkString("[", ", ", "]")}
- |\tShuffles = ${toBeCleanedShuffleIds.toSeq.sorted.mkString("[", ", ", "]")}
- |\tBroadcasts = ${toBeCleanedBroadcstIds.toSeq.sorted.mkString("[", ", ", "]")}
+ |\tRDDs = $s1
+ |\tShuffles = $s2
+ |\tBroadcasts = $s3
""".stripMargin
}
private def isAllCleanedUp =
- toBeCleanedRDDIds.isEmpty &&
- toBeCleanedShuffleIds.isEmpty &&
- toBeCleanedBroadcstIds.isEmpty &&
- toBeCheckpointIds.isEmpty
+ toBeCleanedRDDIds.synchronized { toBeCleanedRDDIds.isEmpty } &&
+ toBeCleanedShuffleIds.synchronized { toBeCleanedShuffleIds.isEmpty } &&
+ toBeCleanedBroadcstIds.synchronized { toBeCleanedBroadcstIds.isEmpty } &&
+ toBeCheckpointIds.synchronized { toBeCheckpointIds.isEmpty }
private def getRDDBlocks(rddId: Int): Seq[BlockId] = {
blockManager.master.getMatchingBlockIds( _ match {
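
The change in ContextCleanerSuite.scala drops the scala.collection.mutable.SynchronizedSet mixin (deprecated since Scala 2.11) in favor of plain HashSets whose reads and writes are wrapped in synchronized blocks on the set itself. A minimal sketch of that pattern, with illustrative names (Tracker, resourceCleaned, isAllCleanedUp) that are not taken from the suite:

import scala.collection.mutable.HashSet

class Tracker(initialIds: Seq[Int]) {
  // Plain mutable set; every access below takes the set's own monitor.
  private val pending = new HashSet[Int] ++= initialIds

  // Called from cleaner threads: mutation happens inside the lock.
  def resourceCleaned(id: Int): Unit = pending.synchronized { pending -= id }

  // Called from the test thread: reads take the same lock so they see a
  // consistent snapshot of the set.
  def isAllCleanedUp: Boolean = pending.synchronized { pending.isEmpty }

  def pendingToString: String =
    pending.synchronized { pending.toSeq.sorted.mkString("[", ", ", "]") }
}

Synchronizing on the collection itself keeps the locking local to each field, mirroring how the suite guards toBeCleanedRDDIds, toBeCleanedShuffleIds, and the other sets independently.
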
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index ca5d13da46..4460b6bcca 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -180,17 +180,20 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
Seconds(10), StorageLevel.MEMORY_ONLY,
awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
- val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+ val collected = new mutable.HashSet[Int]
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
- collected ++= rdd.collect()
- logInfo("Collected = " + collected.mkString(", "))
+ collected.synchronized {
+ collected ++= rdd.collect()
+ logInfo("Collected = " + collected.mkString(", "))
+ }
}
ssc.start()
val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) {
testUtils.pushData(testData, aggregateTestData)
- assert(collected === testData.toSet, "\nData received does not match data sent")
+ assert(collected.synchronized { collected === testData.toSet },
+ "\nData received does not match data sent")
}
ssc.stop(stopSparkContext = false)
}
@@ -205,10 +208,12 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
stream shouldBe a [ReceiverInputDStream[_]]
- val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+ val collected = new mutable.HashSet[Int]
stream.foreachRDD { rdd =>
- collected ++= rdd.collect()
- logInfo("Collected = " + collected.mkString(", "))
+ collected.synchronized {
+ collected ++= rdd.collect()
+ logInfo("Collected = " + collected.mkString(", "))
+ }
}
ssc.start()
@@ -216,7 +221,8 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
eventually(timeout(120 seconds), interval(10 second)) {
testUtils.pushData(testData, aggregateTestData)
val modData = testData.map(_ + 5)
- assert(collected === modData.toSet, "\nData received does not match data sent")
+ assert(collected.synchronized { collected === modData.toSet },
+ "\nData received does not match data sent")
}
ssc.stop(stopSparkContext = false)
}
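
Both Kinesis tests follow the same recipe: the foreachRDD callback (running on the streaming thread) adds received values to a mutable HashSet under synchronized, and the eventually block in the test thread compares the set against the expected data under the same lock. A self-contained sketch of that producer/consumer handshake, with the Spark pieces stubbed out by a plain thread (names here are illustrative, not the suite's):

import scala.collection.mutable

object SynchronizedCollectExample {
  def main(args: Array[String]): Unit = {
    val collected = new mutable.HashSet[Int]

    // Stands in for the foreachRDD callback: mutates the set under its lock.
    val producer = new Thread(new Runnable {
      override def run(): Unit = {
        (1 to 10).grouped(2).foreach { batch =>
          collected.synchronized { collected ++= batch }
          Thread.sleep(10)
        }
      }
    })
    producer.start()
    producer.join()

    // Stands in for the eventually { assert(...) } block: reads under the
    // same lock so the comparison sees a consistent view of the set.
    val ok = collected.synchronized { collected == (1 to 10).toSet }
    assert(ok, "Data received does not match data sent")
  }
}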