aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorHuaxin Gao <huaxing@us.ibm.com>2016-02-22 09:44:32 +0000
committerSean Owen <sowen@cloudera.com>2016-02-22 09:44:32 +0000
commit8f35d3eac9268127512851e52864e64b0bae2f33 (patch)
tree213baa664403ecf7d0ea53c5de11da63f9dd0322 /streaming/src
parent39ff15457026767a4d9ff191174fc85e7907f489 (diff)
downloadspark-8f35d3eac9268127512851e52864e64b0bae2f33.tar.gz
spark-8f35d3eac9268127512851e52864e64b0bae2f33.tar.bz2
spark-8f35d3eac9268127512851e52864e64b0bae2f33.zip
[SPARK-13186][STREAMING] migrate away from SynchronizedMap
trait SynchronizedMap in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Change to java.util.concurrent.ConcurrentHashMap instead. Author: Huaxin Gao <huaxing@us.ibm.com> Closes #11250 from huaxingao/spark__13186.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala30
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala3
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala12
3 files changed, 27 insertions, 18 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 1c2325409b..a25dada5ea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -117,7 +117,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Map of batch-time to selected file info for the remembered batches
// This is a concurrent map because it's also accessed in unit tests
@transient private[streaming] var batchTimeToSelectedFiles =
- new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
+ new mutable.HashMap[Time, Array[String]]
// Set of files that were selected in the remembered batches
@transient private var recentlySelectedFiles = new mutable.HashSet[String]()
@@ -148,7 +148,9 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Find new files
val newFiles = findNewFiles(validTime.milliseconds)
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
- batchTimeToSelectedFiles += ((validTime, newFiles))
+ batchTimeToSelectedFiles.synchronized {
+ batchTimeToSelectedFiles += ((validTime, newFiles))
+ }
recentlySelectedFiles ++= newFiles
val rdds = Some(filesToRDD(newFiles))
// Copy newFiles to immutable.List to prevent from being modified by the user
@@ -162,14 +164,15 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
/** Clear the old time-to-files mappings along with old RDDs */
protected[streaming] override def clearMetadata(time: Time) {
- super.clearMetadata(time)
- val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
- batchTimeToSelectedFiles --= oldFiles.keys
- recentlySelectedFiles --= oldFiles.values.flatten
- logInfo("Cleared " + oldFiles.size + " old files that were older than " +
- (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
- logDebug("Cleared files are:\n" +
- oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
+ batchTimeToSelectedFiles.synchronized {
+ val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
+ batchTimeToSelectedFiles --= oldFiles.keys
+ recentlySelectedFiles --= oldFiles.values.flatten
+ logInfo("Cleared " + oldFiles.size + " old files that were older than " +
+ (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
+ logDebug("Cleared files are:\n" +
+ oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
+ }
// Delete file mod times that weren't accessed in the last round of getting new files
fileToModTime.clearOldValues(lastNewFileFindingTime - 1)
}
@@ -307,8 +310,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
logDebug(this.getClass().getSimpleName + ".readObject used")
ois.defaultReadObject()
generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
- batchTimeToSelectedFiles =
- new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
+ batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]
recentlySelectedFiles = new mutable.HashSet[String]()
fileToModTime = new TimeStampedHashMap[String, Long](true)
}
@@ -324,7 +326,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
override def update(time: Time) {
hadoopFiles.clear()
- hadoopFiles ++= batchTimeToSelectedFiles
+ batchTimeToSelectedFiles.synchronized { hadoopFiles ++= batchTimeToSelectedFiles }
}
override def cleanup(time: Time) { }
@@ -335,7 +337,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Restore the metadata in both files and generatedRDDs
logInfo("Restoring files for time " + t + " - " +
f.mkString("[", ", ", "]") )
- batchTimeToSelectedFiles += ((t, f))
+ batchTimeToSelectedFiles.synchronized { batchTimeToSelectedFiles += ((t, f)) }
recentlySelectedFiles ++= f
generatedRDDs += ((t, filesToRDD(f)))
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 1f0245a397..dada495843 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -613,7 +613,8 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
def recordedFiles(ssc: StreamingContext): Seq[Int] = {
val fileInputDStream =
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
- val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
+ val filenames = fileInputDStream.batchTimeToSelectedFiles.synchronized
+ { fileInputDStream.batchTimeToSelectedFiles.values.flatten }
filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 66f47394c7..6c60652cd6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -270,7 +270,10 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}
}
_ssc.stop()
- failureReasonsCollector.failureReasons.toMap
+ failureReasonsCollector.failureReasons.synchronized
+ {
+ failureReasonsCollector.failureReasons.toMap
+ }
}
/** Check if a sequence of numbers is in increasing order */
@@ -354,12 +357,15 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O
*/
class FailureReasonsCollector extends StreamingListener {
- val failureReasons = new HashMap[Int, String] with SynchronizedMap[Int, String]
+ val failureReasons = new HashMap[Int, String]
override def onOutputOperationCompleted(
outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
outputOperationCompleted.outputOperationInfo.failureReason.foreach { f =>
- failureReasons(outputOperationCompleted.outputOperationInfo.id) = f
+ failureReasons.synchronized
+ {
+ failureReasons(outputOperationCompleted.outputOperationInfo.id) = f
+ }
}
}
}