aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-07-30 15:39:46 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-07-30 15:39:46 -0700
commit0dbd6963d589a8f6ad344273f3da7df680ada515 (patch)
treeb631e05cb48185de381418dd007494cffe80bd88 /streaming/src/main
parent89cda69ecd5ef942a68ad13fc4e1f4184010f087 (diff)
downloadspark-0dbd6963d589a8f6ad344273f3da7df680ada515.tar.gz
spark-0dbd6963d589a8f6ad344273f3da7df680ada515.tar.bz2
spark-0dbd6963d589a8f6ad344273f3da7df680ada515.zip
[SPARK-9479] [STREAMING] [TESTS] Fix ReceiverTrackerSuite failure for maven build and other potential test failures in Streaming
See https://issues.apache.org/jira/browse/SPARK-9479 for the failure cause. The PR includes the following changes: 1. Make ReceiverTrackerSuite create StreamingContext in the test body. 2. Fix places that don't stop StreamingContext. I verified no SparkContext was stopped in the shutdown hook locally after this fix. 3. Fix an issue that `ReceiverTracker.endpoint` may be null. 4. Make sure stopping SparkContext in non-main thread won't fail other tests. Author: zsxwing <zsxwing@gmail.com> Closes #7797 from zsxwing/fix-ReceiverTrackerSuite and squashes the following commits: 3a4bb98 [zsxwing] Fix another potential NPE d7497df [zsxwing] Fix ReceiverTrackerSuite; make sure StreamingContext in tests is closed
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala12
1 files changed, 9 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 6270137951..e076fb5ea1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -223,7 +223,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
// Signal the receivers to delete old block data
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
logInfo(s"Cleanup old received batch data: $cleanupThreshTime")
- endpoint.send(CleanupOldBlocks(cleanupThreshTime))
+ synchronized {
+ if (isTrackerStarted) {
+ endpoint.send(CleanupOldBlocks(cleanupThreshTime))
+ }
+ }
}
}
@@ -285,8 +289,10 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
/** Update a receiver's maximum ingestion rate */
- def sendRateUpdate(streamUID: Int, newRate: Long): Unit = {
- endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
+ def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
+ if (isTrackerStarted) {
+ endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
+ }
}
/** Add new blocks for the given stream */