diff options
Diffstat (limited to 'streaming/src')
3 files changed, 27 insertions, 18 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 1c2325409b..a25dada5ea 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -117,7 +117,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( // Map of batch-time to selected file info for the remembered batches // This is a concurrent map because it's also accessed in unit tests @transient private[streaming] var batchTimeToSelectedFiles = - new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]] + new mutable.HashMap[Time, Array[String]] // Set of files that were selected in the remembered batches @transient private var recentlySelectedFiles = new mutable.HashSet[String]() @@ -148,7 +148,9 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( // Find new files val newFiles = findNewFiles(validTime.milliseconds) logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n")) - batchTimeToSelectedFiles += ((validTime, newFiles)) + batchTimeToSelectedFiles.synchronized { + batchTimeToSelectedFiles += ((validTime, newFiles)) + } recentlySelectedFiles ++= newFiles val rdds = Some(filesToRDD(newFiles)) // Copy newFiles to immutable.List to prevent from being modified by the user @@ -162,14 +164,15 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( /** Clear the old time-to-files mappings along with old RDDs */ protected[streaming] override def clearMetadata(time: Time) { - super.clearMetadata(time) - val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration)) - batchTimeToSelectedFiles --= oldFiles.keys - recentlySelectedFiles --= oldFiles.values.flatten - logInfo("Cleared " + oldFiles.size + " old files that were older than " + - (time - rememberDuration) + ": " + oldFiles.keys.mkString(", ")) - logDebug("Cleared files are:\n" + - oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n")) + batchTimeToSelectedFiles.synchronized { + val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration)) + batchTimeToSelectedFiles --= oldFiles.keys + recentlySelectedFiles --= oldFiles.values.flatten + logInfo("Cleared " + oldFiles.size + " old files that were older than " + + (time - rememberDuration) + ": " + oldFiles.keys.mkString(", ")) + logDebug("Cleared files are:\n" + + oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n")) + } // Delete file mod times that weren't accessed in the last round of getting new files fileToModTime.clearOldValues(lastNewFileFindingTime - 1) } @@ -307,8 +310,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( logDebug(this.getClass().getSimpleName + ".readObject used") ois.defaultReadObject() generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]() - batchTimeToSelectedFiles = - new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]] + batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]] recentlySelectedFiles = new mutable.HashSet[String]() fileToModTime = new TimeStampedHashMap[String, Long](true) } @@ -324,7 +326,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( override def update(time: Time) { hadoopFiles.clear() - hadoopFiles ++= batchTimeToSelectedFiles + batchTimeToSelectedFiles.synchronized { hadoopFiles ++= batchTimeToSelectedFiles } } override def cleanup(time: Time) { } @@ -335,7 +337,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( // Restore the metadata in both files and generatedRDDs logInfo("Restoring files for time " + t + " - " + f.mkString("[", ", ", "]") ) - batchTimeToSelectedFiles += ((t, f)) + batchTimeToSelectedFiles.synchronized { batchTimeToSelectedFiles += ((t, f)) } recentlySelectedFiles ++= f generatedRDDs += ((t, filesToRDD(f))) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 1f0245a397..dada495843 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -613,7 +613,8 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester def recordedFiles(ssc: StreamingContext): Seq[Int] = { val fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]] - val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten + val filenames = fileInputDStream.batchTimeToSelectedFiles.synchronized + { fileInputDStream.batchTimeToSelectedFiles.values.flatten } filenames.map(_.split(File.separator).last.toInt).toSeq.sorted } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 66f47394c7..6c60652cd6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -270,7 +270,10 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { } } _ssc.stop() - failureReasonsCollector.failureReasons.toMap + failureReasonsCollector.failureReasons.synchronized + { + failureReasonsCollector.failureReasons.toMap + } } /** Check if a sequence of numbers is in increasing order */ @@ -354,12 +357,15 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O */ class FailureReasonsCollector extends StreamingListener { - val failureReasons = new HashMap[Int, String] with SynchronizedMap[Int, String] + val failureReasons = new HashMap[Int, String] override def onOutputOperationCompleted( outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = { outputOperationCompleted.outputOperationInfo.failureReason.foreach { f => - failureReasons(outputOperationCompleted.outputOperationInfo.id) = f + failureReasons.synchronized + { + failureReasons(outputOperationCompleted.outputOperationInfo.id) = f + } } } } |