Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala |  6 ++++++
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala                 | 14 +++++++++++---
2 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 5ba09a54af..eca7c79465 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -175,6 +175,12 @@ private[streaming] class ReceiverSupervisorImpl(
   }
 
   override protected def onStop(message: String, error: Option[Throwable]) {
+    receivedBlockHandler match {
+      case handler: WriteAheadLogBasedBlockHandler =>
+        // Write ahead log should be closed.
+        handler.stop()
+      case _ =>
+    }
     registeredBlockGenerators.asScala.foreach { _.stop() }
     env.rpcEnv.stop(endpoint)
   }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index a1e9d1e023..7fcf45e7de 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}
+import java.io._
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.ConcurrentLinkedQueue
 
@@ -629,7 +629,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
         ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
       val filenames = fileInputDStream.batchTimeToSelectedFiles.synchronized
         { fileInputDStream.batchTimeToSelectedFiles.values.flatten }
-      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
+      filenames.map(_.split("/").last.toInt).toSeq.sorted
     }
 
     try {
@@ -755,7 +755,15 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
         assert(outputBuffer.asScala.flatten.toSet === expectedOutput.toSet)
       }
     } finally {
-      Utils.deleteRecursively(testDir)
+      try {
+        // As the driver shuts down in the middle of processing and the thread above sleeps
+        // for a while, `testDir` may not be closed correctly at this point, which causes the
+        // test failure on Windows.
+        Utils.deleteRecursively(testDir)
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
     }
   }
 
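
The last hunk makes the temp-directory cleanup tolerant of a Windows-only IOException instead of letting it fail the test. The following is a self-contained sketch of the same pattern in plain Scala; it substitutes the JDK and scala.util.Properties.isWin for Spark's Utils and logWarning, which are assumptions of this sketch, not the helpers used above.

import java.io.{File, IOException}
import java.nio.file.Files

import scala.util.Properties

object CleanupDemo extends App {

  // Delete a directory tree; throws IOException if anything cannot be removed.
  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) {
      Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    }
    if (!f.delete() && f.exists()) {
      throw new IOException(s"Failed to delete ${f.getAbsolutePath}")
    }
  }

  val testDir = Files.createTempDirectory("checkpoint-test").toFile
  try {
    // The body of the real test would run against testDir here.
    println(s"pretending to run a streaming test against ${testDir.getAbsolutePath}")
  } finally {
    try {
      deleteRecursively(testDir)
    } catch {
      // On Windows the directory can still be held open while things shut down,
      // so log the failure rather than failing the test during cleanup.
      case e: IOException if Properties.isWin =>
        Console.err.println(e.getMessage)
    }
  }
}
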