diff options
author | zsxwing <zsxwing@gmail.com> | 2015-05-17 20:37:19 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-05-17 20:37:19 -0700 |
commit | ff71d34e00b64d70f671f9bf3e63aec39cd525e5 (patch) | |
tree | f07f4836fda6686ae235b9990586ef7afc555a29 /streaming/src | |
parent | 2f22424e9f6624097b292cb70e00787b69d80718 (diff) | |
download | spark-ff71d34e00b64d70f671f9bf3e63aec39cd525e5.tar.gz spark-ff71d34e00b64d70f671f9bf3e63aec39cd525e5.tar.bz2 spark-ff71d34e00b64d70f671f9bf3e63aec39cd525e5.zip |
[SPARK-7693][Core] Remove "import scala.concurrent.ExecutionContext.Implicits.global"
Learnt a lesson from SPARK-7655: Spark should avoid to use `scala.concurrent.ExecutionContext.Implicits.global` because the user may submit blocking actions to `scala.concurrent.ExecutionContext.Implicits.global` and exhaust all threads in it. This could crash Spark. So Spark should always use its own thread pools for safety.
This PR removes all usages of `scala.concurrent.ExecutionContext.Implicits.global` and uses proper thread pools to replace them.
Author: zsxwing <zsxwing@gmail.com>
Closes #6223 from zsxwing/SPARK-7693 and squashes the following commits:
a33ff06 [zsxwing] Decrease the max thread number from 1024 to 128
cf4b3fc [zsxwing] Remove "import scala.concurrent.ExecutionContext.Implicits.global"
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index 4943f29395..33be067ebd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -18,14 +18,14 @@ package org.apache.spark.streaming.receiver import java.nio.ByteBuffer +import java.util.concurrent.CountDownLatch import scala.collection.mutable.ArrayBuffer +import scala.concurrent._ import org.apache.spark.{Logging, SparkConf} import org.apache.spark.storage.StreamBlockId -import java.util.concurrent.CountDownLatch -import scala.concurrent._ -import ExecutionContext.Implicits.global +import org.apache.spark.util.ThreadUtils /** * Abstract class that is responsible for supervising a Receiver in the worker. @@ -46,6 +46,9 @@ private[streaming] abstract class ReceiverSupervisor( // Attach the executor to the receiver receiver.attachExecutor(this) + private val futureExecutionContext = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonCachedThreadPool("receiver-supervisor-future", 128)) + /** Receiver id */ protected val streamId = receiver.streamId @@ -111,6 +114,7 @@ private[streaming] abstract class ReceiverSupervisor( stoppingError = error.orNull stopReceiver(message, error) onStop(message, error) + futureExecutionContext.shutdownNow() stopLatch.countDown() } @@ -150,6 +154,8 @@ private[streaming] abstract class ReceiverSupervisor( /** Restart receiver with delay */ def restartReceiver(message: String, error: Option[Throwable], delay: Int) { Future { + // This is a blocking action so we should use "futureExecutionContext" which is a cached + // thread pool. logWarning("Restarting receiver with delay " + delay + " ms: " + message, error.getOrElse(null)) stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error) @@ -158,7 +164,7 @@ private[streaming] abstract class ReceiverSupervisor( logInfo("Starting receiver again") startReceiver() logInfo("Receiver started again") - } + }(futureExecutionContext) } /** Check if receiver has been marked for stopping */ |