diff options
author | hyukjinkwon <gurwls223@gmail.com> | 2016-11-15 15:44:15 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-11-15 15:44:15 -0800 |
commit | 503378f10ca92064034aa88e0feebe4718af8bbe (patch) | |
tree | 129c627c4df14410277b6c9cae26c10b93aae7ba | |
parent | 1ae4652b7e1f77a984b8459c778cb06c814192c5 (diff) | |
download | spark-503378f10ca92064034aa88e0feebe4718af8bbe.tar.gz spark-503378f10ca92064034aa88e0feebe4718af8bbe.tar.bz2 spark-503378f10ca92064034aa88e0feebe4718af8bbe.zip |
[SPARK-18423][STREAMING] ReceiverTracker should close checkpoint dir when stopped even if it was not started
## What changes were proposed in this pull request?
Several tests are being failed on Windows due to the failure of removing the checkpoint dir between each tests.
This is caused by not closed file in `ReceiverTracker`. When it is not started, it does not close it even if `stop()` is called.
```
Test org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery started
Test org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery failed: java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\1478983663710-0, took 3.828 sec
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1010)
at org.apache.spark.util.Utils.deleteRecursively(Utils.scala)
at org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery(JavaAPISuite.java:1809)
...
```
```
- mapWithState - basic operations with simple API (7 seconds, 640 milliseconds)
Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.MapWithStateSuite *** ABORTED *** (12 seconds, 688 milliseconds)
java.io.IOException: Failed to delete: C:\projects\spark\streaming\checkpoint\spark-b8486e2b-6468-4e6f-bb24-88277d2c033c
...
```
## How was this patch tested?
Tests in `JavaAPISuite` and `MapWithStateSuite`.
Manually tested via AppVeyor:
**Before**
- `org.apache.spark.streaming.JavaAPISuite`
Build: https://ci.appveyor.com/project/spark-test/spark/build/71-MapWithStateSuite-1
Diff: https://github.com/apache/spark/compare/master...spark-test:188c828e682ec45b75d15c3dfc782bcdc8ce024c
- `org.apache.spark.streaming.MapWithStateSuite`
Build: https://ci.appveyor.com/project/spark-test/spark/build/72-MapWithStateSuite-1
Diff: https://github.com/apache/spark/compare/master...spark-test:8f6945d0ccde022a23d3848f6b7fe6da1e7c902e
**After**
- `org.apache.spark.streaming.JavaAPISuite`
Build started: [Streaming] `org.apache.spark.streaming.JavaAPISuite` [![PR-15867](https://ci.appveyor.com/api/projects/status/github/spark-test/spark?branch=3D74F2D5-B0D5-4E1D-874C-685AE694FD37&svg=true)](https://ci.appveyor.com/project/spark-test/spark/branch/3D74F2D5-B0D5-4E1D-874C-685AE694FD37)
Diff: https://github.com/apache/spark/compare/master...spark-test:3D74F2D5-B0D5-4E1D-874C-685AE694FD37
- `org.apache.spark.streaming.MapWithStateSuite`
Build started: [Streaming] `org.apache.spark.streaming.MapWithStateSuite` [![PR-15867](https://ci.appveyor.com/api/projects/status/github/spark-test/spark?branch=C8E88B64-49F0-4157-9AFA-FC3ACC442351&svg=true)](https://ci.appveyor.com/project/spark-test/spark/branch/C8E88B64-49F0-4157-9AFA-FC3ACC442351)
Diff: https://github.com/apache/spark/compare/master...spark-test:C8E88B64-49F0-4157-9AFA-FC3ACC442351
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #15867 from HyukjinKwon/SPARK-18423.
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 10 |
1 files changed, 10 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index b9d898a723..8f55d982a9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -197,6 +197,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false receivedBlockTracker.stop() logInfo("ReceiverTracker stopped") trackerState = Stopped + } else if (isTrackerInitialized) { + trackerState = Stopping + // `ReceivedBlockTracker` is open when this instance is created. We should + // close this even if this `ReceiverTracker` is not started. + receivedBlockTracker.stop() + logInfo("ReceiverTracker stopped") + trackerState = Stopped } } @@ -446,6 +453,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false endpoint.send(StartAllReceivers(receivers)) } + /** Check if tracker has been marked for initiated */ + private def isTrackerInitialized: Boolean = trackerState == Initialized + /** Check if tracker has been marked for starting */ private def isTrackerStarted: Boolean = trackerState == Started |