aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-05-01 17:41:55 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-05-01 17:41:55 -0700
commitebc25a4ddfe07a67668217cec59893bc3b8cf730 (patch)
treedb4c85e869140c20ff1199042b91002273b96887
parent98e7045805282988da907793a844fa53d4c293c9 (diff)
downloadspark-ebc25a4ddfe07a67668217cec59893bc3b8cf730.tar.gz
spark-ebc25a4ddfe07a67668217cec59893bc3b8cf730.tar.bz2
spark-ebc25a4ddfe07a67668217cec59893bc3b8cf730.zip
[SPARK-7309] [CORE] [STREAMING] Shutdown the thread pools in ReceivedBlockHandler and DAGScheduler
Shutdown the thread pools in ReceivedBlockHandler and DAGScheduler when stopping them. Author: zsxwing <zsxwing@gmail.com> Closes #5845 from zsxwing/SPARK-7309 and squashes the following commits: 6c004fd [zsxwing] Shutdown the thread pools in ReceivedBlockHandler and DAGScheduler
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala1
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala1
2 files changed, 2 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 05b8ab0d0a..5d812918a1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1399,6 +1399,7 @@ class DAGScheduler(
def stop() {
logInfo("Stopping DAGScheduler")
+ messageScheduler.shutdownNow()
eventProcessLoop.stop()
taskScheduler.stop()
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 4b3d9ee4b0..651b534ac1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -190,6 +190,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
def stop() {
writeAheadLog.close()
+ executionContext.shutdown()
}
}