diff options
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala | 6 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 14 |
2 files changed, 17 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 5ba09a54af..eca7c79465 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -175,6 +175,12 @@ private[streaming] class ReceiverSupervisorImpl( } override protected def onStop(message: String, error: Option[Throwable]) { + receivedBlockHandler match { + case handler: WriteAheadLogBasedBlockHandler => + // Write ahead log should be closed. + handler.stop() + case _ => + } registeredBlockGenerators.asScala.foreach { _.stop() } env.rpcEnv.stop(endpoint) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index a1e9d1e023..7fcf45e7de 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming -import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream} +import java.io._ import java.nio.charset.StandardCharsets import java.util.concurrent.ConcurrentLinkedQueue @@ -629,7 +629,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]] val filenames = fileInputDStream.batchTimeToSelectedFiles.synchronized { fileInputDStream.batchTimeToSelectedFiles.values.flatten } - filenames.map(_.split(File.separator).last.toInt).toSeq.sorted + filenames.map(_.split("/").last.toInt).toSeq.sorted } try { @@ -755,7 +755,15 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester assert(outputBuffer.asScala.flatten.toSet === expectedOutput.toSet) } } finally { - Utils.deleteRecursively(testDir) + try { + // As the driver shuts down in the middle of processing and the thread above sleeps + // for a while, `testDir` can be not closed correctly at this point which causes the + // test failure on Windows. + Utils.deleteRecursively(testDir) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } } } |