aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorhyukjinkwon <gurwls223@gmail.com>2016-11-15 15:44:15 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-11-15 15:44:15 -0800
commit503378f10ca92064034aa88e0feebe4718af8bbe (patch)
tree129c627c4df14410277b6c9cae26c10b93aae7ba /streaming/src
parent1ae4652b7e1f77a984b8459c778cb06c814192c5 (diff)
downloadspark-503378f10ca92064034aa88e0feebe4718af8bbe.tar.gz
spark-503378f10ca92064034aa88e0feebe4718af8bbe.tar.bz2
spark-503378f10ca92064034aa88e0feebe4718af8bbe.zip
[SPARK-18423][STREAMING] ReceiverTracker should close checkpoint dir when stopped even if it was not started
## What changes were proposed in this pull request? Several tests are being failed on Windows due to the failure of removing the checkpoint dir between each tests. This is caused by not closed file in `ReceiverTracker`. When it is not started, it does not close it even if `stop()` is called. ``` Test org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery started Test org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery failed: java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\1478983663710-0, took 3.828 sec at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1010) at org.apache.spark.util.Utils.deleteRecursively(Utils.scala) at org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery(JavaAPISuite.java:1809) ... ``` ``` - mapWithState - basic operations with simple API (7 seconds, 640 milliseconds) Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.MapWithStateSuite *** ABORTED *** (12 seconds, 688 milliseconds) java.io.IOException: Failed to delete: C:\projects\spark\streaming\checkpoint\spark-b8486e2b-6468-4e6f-bb24-88277d2c033c ... ``` ## How was this patch tested? Tests in `JavaAPISuite` and `MapWithStateSuite`. Manually tested via AppVeyor: **Before** - `org.apache.spark.streaming.JavaAPISuite` Build: https://ci.appveyor.com/project/spark-test/spark/build/71-MapWithStateSuite-1 Diff: https://github.com/apache/spark/compare/master...spark-test:188c828e682ec45b75d15c3dfc782bcdc8ce024c - `org.apache.spark.streaming.MapWithStateSuite` Build: https://ci.appveyor.com/project/spark-test/spark/build/72-MapWithStateSuite-1 Diff: https://github.com/apache/spark/compare/master...spark-test:8f6945d0ccde022a23d3848f6b7fe6da1e7c902e **After** - `org.apache.spark.streaming.JavaAPISuite` Build started: [Streaming] `org.apache.spark.streaming.JavaAPISuite` [![PR-15867](https://ci.appveyor.com/api/projects/status/github/spark-test/spark?branch=3D74F2D5-B0D5-4E1D-874C-685AE694FD37&svg=true)](https://ci.appveyor.com/project/spark-test/spark/branch/3D74F2D5-B0D5-4E1D-874C-685AE694FD37) Diff: https://github.com/apache/spark/compare/master...spark-test:3D74F2D5-B0D5-4E1D-874C-685AE694FD37 - `org.apache.spark.streaming.MapWithStateSuite` Build started: [Streaming] `org.apache.spark.streaming.MapWithStateSuite` [![PR-15867](https://ci.appveyor.com/api/projects/status/github/spark-test/spark?branch=C8E88B64-49F0-4157-9AFA-FC3ACC442351&svg=true)](https://ci.appveyor.com/project/spark-test/spark/branch/C8E88B64-49F0-4157-9AFA-FC3ACC442351) Diff: https://github.com/apache/spark/compare/master...spark-test:C8E88B64-49F0-4157-9AFA-FC3ACC442351 Author: hyukjinkwon <gurwls223@gmail.com> Closes #15867 from HyukjinKwon/SPARK-18423.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala10
1 files changed, 10 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index b9d898a723..8f55d982a9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -197,6 +197,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
receivedBlockTracker.stop()
logInfo("ReceiverTracker stopped")
trackerState = Stopped
+ } else if (isTrackerInitialized) {
+ trackerState = Stopping
+ // `ReceivedBlockTracker` is open when this instance is created. We should
+ // close this even if this `ReceiverTracker` is not started.
+ receivedBlockTracker.stop()
+ logInfo("ReceiverTracker stopped")
+ trackerState = Stopped
}
}
@@ -446,6 +453,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
endpoint.send(StartAllReceivers(receivers))
}
+ /** Check if tracker has been marked for initiated */
+ private def isTrackerInitialized: Boolean = trackerState == Initialized
+
/** Check if tracker has been marked for starting */
private def isTrackerStarted: Boolean = trackerState == Started