author     Huaxin Gao <huaxing@us.ibm.com>    2016-02-22 09:44:32 +0000
committer  Sean Owen <sowen@cloudera.com>     2016-02-22 09:44:32 +0000
commit     8f35d3eac9268127512851e52864e64b0bae2f33 (patch)
tree       213baa664403ecf7d0ea53c5de11da63f9dd0322
parent     39ff15457026767a4d9ff191174fc85e7907f489 (diff)
[SPARK-13186][STREAMING] migrate away from SynchronizedMap
trait SynchronizedMap in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Change to java.util.concurrent.ConcurrentHashMap instead.

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #11250 from huaxingao/spark__13186.
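The change itself follows one pattern in every touched file: drop the deprecated mutable.SynchronizedMap mixin and instead guard each read and write of a plain mutable.HashMap with a synchronized block on the map instance. A minimal standalone sketch of that pattern (identifiers here are illustrative, not taken from the patch):

import scala.collection.mutable

object SynchronizedMapMigrationSketch {
  // Before (deprecated since Scala 2.11):
  //   val counts = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]

  // After: a plain HashMap, with every access wrapped in counts.synchronized { ... }
  private val counts = new mutable.HashMap[String, Long]()

  def add(key: String, delta: Long): Unit = counts.synchronized {
    val updated = counts.getOrElseUpdate(key, 0L) + delta
    counts(key) = updated
  }

  def snapshot: Map[String, Long] = counts.synchronized { counts.toMap }
}

This keeps the maps' declared types unchanged while moving the locking to the call sites, which is why every hunk below is essentially a synchronized wrapper around otherwise unmodified logic.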
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala          | 13
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala  | 38
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala             | 30
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala                      |  3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala               | 12
5 files changed, 55 insertions(+), 41 deletions(-)
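The deprecation notice quoted in the commit message recommends java.util.concurrent.ConcurrentHashMap; this patch instead keeps mutable.HashMap and adds synchronized blocks. For comparison only, a hypothetical sketch of the ConcurrentHashMap route (not code from this commit; names are illustrative):

import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

object ConcurrentHashMapAlternativeSketch {
  private val counts = new ConcurrentHashMap[String, java.lang.Long]()

  // merge() performs the read-modify-write atomically, so no external locking is needed.
  private val sum = new BiFunction[java.lang.Long, java.lang.Long, java.lang.Long] {
    override def apply(a: java.lang.Long, b: java.lang.Long): java.lang.Long = a.longValue + b.longValue
  }

  def add(key: String, delta: Long): Unit = counts.merge(key, delta, sum)

  def get(key: String): Long = {
    val v = counts.get(key)
    if (v == null) 0L else v.longValue
  }
}

Both approaches are thread-safe; the synchronized-block version keeps the streaming and test code closer to its original shape, which is presumably why the patch takes that route.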
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 797b07f80d..6a35ac14a8 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -65,19 +65,20 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
- val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
+ val result = new mutable.HashMap[String, Long]()
stream.map(_._2).countByValue().foreachRDD { r =>
- val ret = r.collect()
- ret.toMap.foreach { kv =>
- val count = result.getOrElseUpdate(kv._1, 0) + kv._2
- result.put(kv._1, count)
+ r.collect().foreach { kv =>
+ result.synchronized {
+ val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+ result.put(kv._1, count)
+ }
}
}
ssc.start()
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
- assert(sent === result)
+ assert(result.synchronized { sent === result })
}
}
}
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index ee6a5f0390..ca5d13da46 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -230,7 +230,6 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
val awsCredentials = KinesisTestUtils.getAWSCredentials()
val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
- with mutable.SynchronizedMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
@@ -241,13 +240,16 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
- collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+ collectedData.synchronized {
+ collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+ }
})
ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
ssc.start()
- def numBatchesWithData: Int = collectedData.count(_._2._2.nonEmpty)
+ def numBatchesWithData: Int =
+ collectedData.synchronized { collectedData.count(_._2._2.nonEmpty) }
def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty
@@ -268,21 +270,23 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
// Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
// and return the same data
- val times = collectedData.keySet
- times.foreach { time =>
- val (arrayOfSeqNumRanges, data) = collectedData(time)
- val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
- rdd shouldBe a [KinesisBackedBlockRDD[_]]
-
- // Verify the recovered sequence ranges
- val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
- assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
- arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
- assert(expected.ranges.toSeq === found.ranges.toSeq)
+ collectedData.synchronized {
+ val times = collectedData.keySet
+ times.foreach { time =>
+ val (arrayOfSeqNumRanges, data) = collectedData(time)
+ val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
+ rdd shouldBe a[KinesisBackedBlockRDD[_]]
+
+ // Verify the recovered sequence ranges
+ val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
+ assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
+ arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
+ assert(expected.ranges.toSeq === found.ranges.toSeq)
+ }
+
+ // Verify the recovered data
+ assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
}
-
- // Verify the recovered data
- assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
}
ssc.stop()
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 1c2325409b..a25dada5ea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -117,7 +117,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Map of batch-time to selected file info for the remembered batches
// This is a concurrent map because it's also accessed in unit tests
@transient private[streaming] var batchTimeToSelectedFiles =
- new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
+ new mutable.HashMap[Time, Array[String]]
// Set of files that were selected in the remembered batches
@transient private var recentlySelectedFiles = new mutable.HashSet[String]()
@@ -148,7 +148,9 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Find new files
val newFiles = findNewFiles(validTime.milliseconds)
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
- batchTimeToSelectedFiles += ((validTime, newFiles))
+ batchTimeToSelectedFiles.synchronized {
+ batchTimeToSelectedFiles += ((validTime, newFiles))
+ }
recentlySelectedFiles ++= newFiles
val rdds = Some(filesToRDD(newFiles))
// Copy newFiles to immutable.List to prevent from being modified by the user
@@ -162,14 +164,15 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
/** Clear the old time-to-files mappings along with old RDDs */
protected[streaming] override def clearMetadata(time: Time) {
- super.clearMetadata(time)
- val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
- batchTimeToSelectedFiles --= oldFiles.keys
- recentlySelectedFiles --= oldFiles.values.flatten
- logInfo("Cleared " + oldFiles.size + " old files that were older than " +
- (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
- logDebug("Cleared files are:\n" +
- oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
+ batchTimeToSelectedFiles.synchronized {
+ val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
+ batchTimeToSelectedFiles --= oldFiles.keys
+ recentlySelectedFiles --= oldFiles.values.flatten
+ logInfo("Cleared " + oldFiles.size + " old files that were older than " +
+ (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
+ logDebug("Cleared files are:\n" +
+ oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
+ }
// Delete file mod times that weren't accessed in the last round of getting new files
fileToModTime.clearOldValues(lastNewFileFindingTime - 1)
}
@@ -307,8 +310,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
logDebug(this.getClass().getSimpleName + ".readObject used")
ois.defaultReadObject()
generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
- batchTimeToSelectedFiles =
- new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
+ batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]
recentlySelectedFiles = new mutable.HashSet[String]()
fileToModTime = new TimeStampedHashMap[String, Long](true)
}
@@ -324,7 +326,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
override def update(time: Time) {
hadoopFiles.clear()
- hadoopFiles ++= batchTimeToSelectedFiles
+ batchTimeToSelectedFiles.synchronized { hadoopFiles ++= batchTimeToSelectedFiles }
}
override def cleanup(time: Time) { }
@@ -335,7 +337,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Restore the metadata in both files and generatedRDDs
logInfo("Restoring files for time " + t + " - " +
f.mkString("[", ", ", "]") )
- batchTimeToSelectedFiles += ((t, f))
+ batchTimeToSelectedFiles.synchronized { batchTimeToSelectedFiles += ((t, f)) }
recentlySelectedFiles ++= f
generatedRDDs += ((t, filesToRDD(f)))
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 1f0245a397..dada495843 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -613,7 +613,8 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
def recordedFiles(ssc: StreamingContext): Seq[Int] = {
val fileInputDStream =
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
- val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
+ val filenames = fileInputDStream.batchTimeToSelectedFiles.synchronized
+ { fileInputDStream.batchTimeToSelectedFiles.values.flatten }
filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 66f47394c7..6c60652cd6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -270,7 +270,10 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}
}
_ssc.stop()
- failureReasonsCollector.failureReasons.toMap
+ failureReasonsCollector.failureReasons.synchronized
+ {
+ failureReasonsCollector.failureReasons.toMap
+ }
}
/** Check if a sequence of numbers is in increasing order */
@@ -354,12 +357,15 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O
*/
class FailureReasonsCollector extends StreamingListener {
- val failureReasons = new HashMap[Int, String] with SynchronizedMap[Int, String]
+ val failureReasons = new HashMap[Int, String]
override def onOutputOperationCompleted(
outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
outputOperationCompleted.outputOperationInfo.failureReason.foreach { f =>
- failureReasons(outputOperationCompleted.outputOperationInfo.id) = f
+ failureReasons.synchronized
+ {
+ failureReasons(outputOperationCompleted.outputOperationInfo.id) = f
+ }
}
}
}