diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-01-04 11:00:15 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-01-04 11:00:15 -0800 |
commit | 43706bf8bdfe08010bb11848788e0718d15363b3 (patch) | |
tree | bc36f5cf3c54701c21976336afbff35869275390 /streaming/src/main | |
parent | b504b6a90a95a723210beb0031ed41a75d702f66 (diff) | |
download | spark-43706bf8bdfe08010bb11848788e0718d15363b3.tar.gz spark-43706bf8bdfe08010bb11848788e0718d15363b3.tar.bz2 spark-43706bf8bdfe08010bb11848788e0718d15363b3.zip |
[SPARK-12608][STREAMING] Remove submitJobThreadPool since submitJob doesn't create a separate thread to wait for the job result
Before #9264, submitJob would create a separate thread to wait for the job result. `submitJobThreadPool` was a workaround in `ReceiverTracker` to run these waiting-job-result threads. Now #9264 has been merged to master and resolved this blocking issue, `submitJobThreadPool` can be removed now.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #10560 from zsxwing/remove-submitJobThreadPool.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 7 |
1 files changed, 1 insertions, 6 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 9ddf176aee..678f1dc950 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -435,10 +435,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** RpcEndpoint to receive messages from the receivers. */ private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint { - // TODO Remove this thread pool after https://github.com/apache/spark/issues/7385 is merged - private val submitJobThreadPool = ExecutionContext.fromExecutorService( - ThreadUtils.newDaemonCachedThreadPool("submit-job-thread-pool")) - private val walBatchingThreadPool = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool")) @@ -610,12 +606,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false logInfo(s"Restarting Receiver $receiverId") self.send(RestartReceiver(receiver)) } - }(submitJobThreadPool) + }(ThreadUtils.sameThread) logInfo(s"Receiver ${receiver.streamId} started") } override def onStop(): Unit = { - submitJobThreadPool.shutdownNow() active = false walBatchingThreadPool.shutdown() } |