aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-01-04 11:00:15 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-01-04 11:00:15 -0800
commit43706bf8bdfe08010bb11848788e0718d15363b3 (patch)
treebc36f5cf3c54701c21976336afbff35869275390 /streaming
parentb504b6a90a95a723210beb0031ed41a75d702f66 (diff)
downloadspark-43706bf8bdfe08010bb11848788e0718d15363b3.tar.gz
spark-43706bf8bdfe08010bb11848788e0718d15363b3.tar.bz2
spark-43706bf8bdfe08010bb11848788e0718d15363b3.zip
[SPARK-12608][STREAMING] Remove submitJobThreadPool since submitJob doesn't create a separate thread to wait for the job result
Before #9264, submitJob would create a separate thread to wait for the job result. `submitJobThreadPool` was a workaround in `ReceiverTracker` to run these waiting-job-result threads. Now #9264 has been merged to master and resolved this blocking issue, `submitJobThreadPool` can be removed now. Author: Shixiong Zhu <shixiong@databricks.com> Closes #10560 from zsxwing/remove-submitJobThreadPool.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala7
1 files changed, 1 insertions, 6 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 9ddf176aee..678f1dc950 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -435,10 +435,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
/** RpcEndpoint to receive messages from the receivers. */
private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
- // TODO Remove this thread pool after https://github.com/apache/spark/issues/7385 is merged
- private val submitJobThreadPool = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonCachedThreadPool("submit-job-thread-pool"))
-
private val walBatchingThreadPool = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool"))
@@ -610,12 +606,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
- }(submitJobThreadPool)
+ }(ThreadUtils.sameThread)
logInfo(s"Receiver ${receiver.streamId} started")
}
override def onStop(): Unit = {
- submitJobThreadPool.shutdownNow()
active = false
walBatchingThreadPool.shutdown()
}