author    zsxwing <zsxwing@gmail.com>  2015-05-17 20:37:19 -0700
committer Reynold Xin <rxin@databricks.com>  2015-05-17 20:37:19 -0700
commit    ff71d34e00b64d70f671f9bf3e63aec39cd525e5 (patch)
tree      f07f4836fda6686ae235b9990586ef7afc555a29 /streaming
parent    2f22424e9f6624097b292cb70e00787b69d80718 (diff)
[SPARK-7693][Core] Remove "import scala.concurrent.ExecutionContext.Implicits.global"
Learnt a lesson from SPARK-7655: Spark should avoid using `scala.concurrent.ExecutionContext.Implicits.global`, because a user may submit blocking actions to it and exhaust all of its threads, which could crash Spark. Spark should always use its own thread pools for safety. This PR removes all usages of `scala.concurrent.ExecutionContext.Implicits.global` and replaces them with proper thread pools.

Author: zsxwing <zsxwing@gmail.com>

Closes #6223 from zsxwing/SPARK-7693 and squashes the following commits:

a33ff06 [zsxwing] Decrease the max thread number from 1024 to 128
cf4b3fc [zsxwing] Remove "import scala.concurrent.ExecutionContext.Implicits.global"
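The replacement pattern is visible in the diff below: the supervisor creates its own execution context and passes it explicitly to `Future`. Here is a minimal, self-contained sketch of that pattern; `newDaemonCachedThreadPool` below is a hypothetical stand-in for Spark's private `ThreadUtils.newDaemonCachedThreadPool` helper, and the object name is illustrative only.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DedicatedPoolSketch {
  // Stand-in for Spark's ThreadUtils.newDaemonCachedThreadPool: a bounded
  // cached pool whose threads carry a recognizable name prefix and are
  // daemons, so they never keep the JVM alive.
  def newDaemonCachedThreadPool(prefix: String, maxThreads: Int): ThreadPoolExecutor = {
    val factory = new ThreadFactory {
      private val count = new AtomicInteger(0)
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${count.incrementAndGet()}")
        t.setDaemon(true)
        t
      }
    }
    val pool = new ThreadPoolExecutor(
      maxThreads, maxThreads, 60L, TimeUnit.SECONDS,
      new LinkedBlockingQueue[Runnable], factory)
    pool.allowCoreThreadTimeOut(true) // idle threads die off, like a cached pool
    pool
  }

  def main(args: Array[String]): Unit = {
    // A context owned by the component, instead of the shared
    // ExecutionContext.Implicits.global.
    val futureExecutionContext = ExecutionContext.fromExecutorService(
      newDaemonCachedThreadPool("receiver-supervisor-future", 128))

    // Blocking work runs on the private pool and cannot starve global.
    val restart = Future {
      Thread.sleep(200) // stand-in for a blocking action such as stopReceiver
      "receiver restarted"
    }(futureExecutionContext)

    println(Await.result(restart, 5.seconds))
    futureExecutionContext.shutdown() // shut the pool down when the owner stops
  }
}
```

Capping the pool at 128 threads (reduced from 1024 in the squashed commits above) bounds resource usage while still allowing many concurrent blocking restarts.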
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala | 14
1 file changed, 10 insertions(+), 4 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index 4943f29395..33be067ebd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -18,14 +18,14 @@
package org.apache.spark.streaming.receiver
import java.nio.ByteBuffer
+import java.util.concurrent.CountDownLatch
import scala.collection.mutable.ArrayBuffer
+import scala.concurrent._
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StreamBlockId
-import java.util.concurrent.CountDownLatch
-import scala.concurrent._
-import ExecutionContext.Implicits.global
+import org.apache.spark.util.ThreadUtils
/**
* Abstract class that is responsible for supervising a Receiver in the worker.
@@ -46,6 +46,9 @@ private[streaming] abstract class ReceiverSupervisor(
// Attach the executor to the receiver
receiver.attachExecutor(this)
+ private val futureExecutionContext = ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonCachedThreadPool("receiver-supervisor-future", 128))
+
/** Receiver id */
protected val streamId = receiver.streamId
@@ -111,6 +114,7 @@ private[streaming] abstract class ReceiverSupervisor(
stoppingError = error.orNull
stopReceiver(message, error)
onStop(message, error)
+ futureExecutionContext.shutdownNow()
stopLatch.countDown()
}
@@ -150,6 +154,8 @@ private[streaming] abstract class ReceiverSupervisor(
/** Restart receiver with delay */
def restartReceiver(message: String, error: Option[Throwable], delay: Int) {
Future {
+ // This is a blocking action so we should use "futureExecutionContext" which is a cached
+ // thread pool.
logWarning("Restarting receiver with delay " + delay + " ms: " + message,
error.getOrElse(null))
stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
@@ -158,7 +164,7 @@ private[streaming] abstract class ReceiverSupervisor(
logInfo("Starting receiver again")
startReceiver()
logInfo("Receiver started again")
- }
+ }(futureExecutionContext)
}
/** Check if receiver has been marked for stopping */
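For context on the motivation, here is a minimal sketch (not from the Spark code base; all names are illustrative) of the failure mode this commit defends against: `scala.concurrent.ExecutionContext.Implicits.global` is backed by a pool sized to the number of CPU cores, so a handful of blocking tasks can starve everything else scheduled on it.

```scala
import java.util.concurrent.CountDownLatch
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object GlobalStarvationSketch {
  def main(args: Array[String]): Unit = {
    val latch = new CountDownLatch(1)
    val cores = Runtime.getRuntime.availableProcessors()

    // Fill every worker of the shared global pool with a blocking task.
    (1 to cores).foreach { i =>
      Future {
        println(s"task $i now blocking a global worker thread")
        latch.await() // blocks; no scala.concurrent.blocking {} wrapper
      }
    }

    // While the workers above are blocked, this future cannot be scheduled;
    // in Spark this is the kind of stall that SPARK-7655 hit.
    Future { println("I may never run while global is exhausted") }

    Thread.sleep(1000)
    latch.countDown() // release the workers so the demo can finish
  }
}
```

Wrapping the blocking call in `scala.concurrent.blocking { }` would let the global pool spawn compensation threads, but a component-owned pool, as introduced in the diff above, keeps the blast radius of blocking actions contained and lets the owner shut the pool down deterministically.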