From 503378f10ca92064034aa88e0feebe4718af8bbe Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 15 Nov 2016 15:44:15 -0800 Subject: [SPARK-18423][STREAMING] ReceiverTracker should close checkpoint dir when stopped even if it was not started ## What changes were proposed in this pull request? Several tests are being failed on Windows due to the failure of removing the checkpoint dir between each tests. This is caused by not closed file in `ReceiverTracker`. When it is not started, it does not close it even if `stop()` is called. ``` Test org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery started Test org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery failed: java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\1478983663710-0, took 3.828 sec at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1010) at org.apache.spark.util.Utils.deleteRecursively(Utils.scala) at org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery(JavaAPISuite.java:1809) ... ``` ``` - mapWithState - basic operations with simple API (7 seconds, 640 milliseconds) Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.MapWithStateSuite *** ABORTED *** (12 seconds, 688 milliseconds) java.io.IOException: Failed to delete: C:\projects\spark\streaming\checkpoint\spark-b8486e2b-6468-4e6f-bb24-88277d2c033c ... ``` ## How was this patch tested? Tests in `JavaAPISuite` and `MapWithStateSuite`. Manually tested via AppVeyor: **Before** - `org.apache.spark.streaming.JavaAPISuite` Build: https://ci.appveyor.com/project/spark-test/spark/build/71-MapWithStateSuite-1 Diff: https://github.com/apache/spark/compare/master...spark-test:188c828e682ec45b75d15c3dfc782bcdc8ce024c - `org.apache.spark.streaming.MapWithStateSuite` Build: https://ci.appveyor.com/project/spark-test/spark/build/72-MapWithStateSuite-1 Diff: https://github.com/apache/spark/compare/master...spark-test:8f6945d0ccde022a23d3848f6b7fe6da1e7c902e **After** - `org.apache.spark.streaming.JavaAPISuite` Build started: [Streaming] `org.apache.spark.streaming.JavaAPISuite` [![PR-15867](https://ci.appveyor.com/api/projects/status/github/spark-test/spark?branch=3D74F2D5-B0D5-4E1D-874C-685AE694FD37&svg=true)](https://ci.appveyor.com/project/spark-test/spark/branch/3D74F2D5-B0D5-4E1D-874C-685AE694FD37) Diff: https://github.com/apache/spark/compare/master...spark-test:3D74F2D5-B0D5-4E1D-874C-685AE694FD37 - `org.apache.spark.streaming.MapWithStateSuite` Build started: [Streaming] `org.apache.spark.streaming.MapWithStateSuite` [![PR-15867](https://ci.appveyor.com/api/projects/status/github/spark-test/spark?branch=C8E88B64-49F0-4157-9AFA-FC3ACC442351&svg=true)](https://ci.appveyor.com/project/spark-test/spark/branch/C8E88B64-49F0-4157-9AFA-FC3ACC442351) Diff: https://github.com/apache/spark/compare/master...spark-test:C8E88B64-49F0-4157-9AFA-FC3ACC442351 Author: hyukjinkwon Closes #15867 from HyukjinKwon/SPARK-18423. --- .../org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index b9d898a723..8f55d982a9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -197,6 +197,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false receivedBlockTracker.stop() logInfo("ReceiverTracker stopped") trackerState = Stopped + } else if (isTrackerInitialized) { + trackerState = Stopping + // `ReceivedBlockTracker` is open when this instance is created. We should + // close this even if this `ReceiverTracker` is not started. + receivedBlockTracker.stop() + logInfo("ReceiverTracker stopped") + trackerState = Stopped } } @@ -446,6 +453,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false endpoint.send(StartAllReceivers(receivers)) } + /** Check if tracker has been marked for initiated */ + private def isTrackerInitialized: Boolean = trackerState == Initialized + /** Check if tracker has been marked for starting */ private def isTrackerStarted: Boolean = trackerState == Started -- cgit v1.2.3