diff options
author | zsxwing <zsxwing@gmail.com> | 2015-07-30 15:39:46 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-07-30 15:39:46 -0700 |
commit | 0dbd6963d589a8f6ad344273f3da7df680ada515 (patch) | |
tree | b631e05cb48185de381418dd007494cffe80bd88 /streaming/src/main | |
parent | 89cda69ecd5ef942a68ad13fc4e1f4184010f087 (diff) | |
download | spark-0dbd6963d589a8f6ad344273f3da7df680ada515.tar.gz spark-0dbd6963d589a8f6ad344273f3da7df680ada515.tar.bz2 spark-0dbd6963d589a8f6ad344273f3da7df680ada515.zip |
[SPARK-9479] [STREAMING] [TESTS] Fix ReceiverTrackerSuite failure for maven build and other potential test failures in Streaming
See https://issues.apache.org/jira/browse/SPARK-9479 for the failure cause.
The PR includes the following changes:
1. Make ReceiverTrackerSuite create StreamingContext in the test body.
2. Fix places that don't stop StreamingContext. I verified no SparkContext was stopped in the shutdown hook locally after this fix.
3. Fix an issue that `ReceiverTracker.endpoint` may be null.
4. Make sure stopping SparkContext in non-main thread won't fail other tests.
Author: zsxwing <zsxwing@gmail.com>
Closes #7797 from zsxwing/fix-ReceiverTrackerSuite and squashes the following commits:
3a4bb98 [zsxwing] Fix another potential NPE
d7497df [zsxwing] Fix ReceiverTrackerSuite; make sure StreamingContext in tests is closed
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 12 |
1 files changed, 9 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 6270137951..e076fb5ea1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -223,7 +223,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // Signal the receivers to delete old block data if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) { logInfo(s"Cleanup old received batch data: $cleanupThreshTime") - endpoint.send(CleanupOldBlocks(cleanupThreshTime)) + synchronized { + if (isTrackerStarted) { + endpoint.send(CleanupOldBlocks(cleanupThreshTime)) + } + } } } @@ -285,8 +289,10 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false } /** Update a receiver's maximum ingestion rate */ - def sendRateUpdate(streamUID: Int, newRate: Long): Unit = { - endpoint.send(UpdateReceiverRateLimit(streamUID, newRate)) + def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized { + if (isTrackerStarted) { + endpoint.send(UpdateReceiverRateLimit(streamUID, newRate)) + } } /** Add new blocks for the given stream */ |