aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala7
1 files changed, 1 insertions, 6 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 9ddf176aee..678f1dc950 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -435,10 +435,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
/** RpcEndpoint to receive messages from the receivers. */
private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
- // TODO Remove this thread pool after https://github.com/apache/spark/issues/7385 is merged
- private val submitJobThreadPool = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonCachedThreadPool("submit-job-thread-pool"))
-
private val walBatchingThreadPool = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool"))
@@ -610,12 +606,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
- }(submitJobThreadPool)
+ }(ThreadUtils.sameThread)
logInfo(s"Receiver ${receiver.streamId} started")
}
override def onStop(): Unit = {
- submitJobThreadPool.shutdownNow()
active = false
walBatchingThreadPool.shutdown()
}