author     zsxwing <zsxwing@gmail.com>         2015-05-17 20:37:19 -0700
committer  Reynold Xin <rxin@databricks.com>   2015-05-17 20:37:19 -0700
commit     ff71d34e00b64d70f671f9bf3e63aec39cd525e5 (patch)
tree       f07f4836fda6686ae235b9990586ef7afc555a29
parent     2f22424e9f6624097b292cb70e00787b69d80718 (diff)
[SPARK-7693][Core] Remove "import scala.concurrent.ExecutionContext.Implicits.global"
Learnt a lesson from SPARK-7655: Spark should avoid using `scala.concurrent.ExecutionContext.Implicits.global`, because a user may submit blocking actions to it and exhaust all of its threads. This could crash Spark. So Spark should always use its own thread pools for safety. This PR removes all usages of `scala.concurrent.ExecutionContext.Implicits.global` and replaces them with proper thread pools.

Author: zsxwing <zsxwing@gmail.com>

Closes #6223 from zsxwing/SPARK-7693 and squashes the following commits:

a33ff06 [zsxwing] Decrease the max thread number from 1024 to 128
cf4b3fc [zsxwing] Remove "import scala.concurrent.ExecutionContext.Implicits.global"
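As a standalone illustration of the failure mode described above (hypothetical code, not part of this patch): `Implicits.global` is sized to the number of available processors, so enough blocking tasks submitted without `scala.concurrent.blocking` can occupy every thread and starve all later callbacks.

    import java.util.concurrent.CountDownLatch
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    object GlobalPoolStarvation {
      def main(args: Array[String]): Unit = {
        val latch = new CountDownLatch(1)
        // Occupy every thread in the global pool with a blocking task.
        for (_ <- 1 to Runtime.getRuntime.availableProcessors) {
          Future { latch.await() }
        }
        // This future cannot be scheduled until the blockers are released.
        Future { println("finally scheduled") }
        Thread.sleep(1000)
        latch.countDown() // release the pool
      }
    }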
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala      |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala                        | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala                       | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala                 | 29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala  |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala | 14
6 files changed, 58 insertions(+), 26 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index ed159dec4f..f3a26f54a8 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -33,7 +33,7 @@ import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
@@ -55,18 +55,19 @@ private[spark] class CoarseGrainedExecutorBackend(
private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()
override def onStart() {
- import scala.concurrent.ExecutionContext.Implicits.global
logInfo("Connecting to driver: " + driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
+ // This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
ref.ask[RegisteredExecutor.type](
RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
- } onComplete {
+ }(ThreadUtils.sameThread).onComplete {
+ // This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) => Utils.tryLogNonFatalError {
Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
}
case Failure(e) => logError(s"Cannot register with driver: $driverUrl", e)
- }
+ }(ThreadUtils.sameThread)
}
def extractLogUrls: Map[String, String] = {
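`ThreadUtils.sameThread` runs a future's callbacks on the thread that completes the future, which is only safe for cheap, non-blocking actions like the registration callbacks above. A minimal sketch of the idea (Spark's actual implementation may differ):

    import scala.concurrent.ExecutionContext

    // Runs every task immediately on the calling thread; suitable only for
    // callbacks that are fast and never block.
    val sameThreadSketch: ExecutionContext = new ExecutionContext {
      def execute(runnable: Runnable): Unit = runnable.run()
      def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
    }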
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index ec185340c3..bbf1b83af0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -19,8 +19,10 @@ package org.apache.spark.rdd
import java.util.concurrent.atomic.AtomicLong
+import org.apache.spark.util.ThreadUtils
+
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag
import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
@@ -66,6 +68,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
val f = new ComplexFutureAction[Seq[T]]
f.run {
+ // This is a blocking action so we should use "AsyncRDDActions.futureExecutionContext" which
+ // is a cached thread pool.
val results = new ArrayBuffer[T](num)
val totalParts = self.partitions.length
var partsScanned = 0
@@ -101,7 +105,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
partsScanned += numPartsToTry
}
results.toSeq
- }
+ }(AsyncRDDActions.futureExecutionContext)
f
}
@@ -123,3 +127,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
(index, data) => Unit, Unit)
}
}
+
+private object AsyncRDDActions {
+ val futureExecutionContext = ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonCachedThreadPool("AsyncRDDActions-future", 128))
+}
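`ThreadUtils.newDaemonCachedThreadPool` itself is not shown in this diff; the following is a plausible sketch of such a bounded, cached daemon pool (names and exact parameters are assumptions, not Spark's actual implementation):

    import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
    import java.util.concurrent.atomic.AtomicInteger

    // Threads are created on demand up to maxThreads, idle threads expire
    // after 60s, and daemon status keeps the pool from blocking JVM shutdown.
    def daemonCachedThreadPool(prefix: String, maxThreads: Int): ThreadPoolExecutor = {
      val counter = new AtomicInteger(0)
      val factory = new ThreadFactory {
        def newThread(r: Runnable): Thread = {
          val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
          t.setDaemon(true)
          t
        }
      }
      val pool = new ThreadPoolExecutor(
        maxThreads, maxThreads, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue[Runnable], factory)
      pool.allowCoreThreadTimeOut(true)
      pool
    }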
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index cc794e5c90..16d67cbfca 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -21,8 +21,7 @@ import java.io.{BufferedOutputStream, ByteArrayOutputStream, File, InputStream,
import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.concurrent.{Await, Future}
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.{ExecutionContext, Await, Future}
import scala.concurrent.duration._
import scala.util.Random
@@ -77,6 +76,9 @@ private[spark] class BlockManager(
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
+ private val futureExecutionContext = ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
+
// Actual storage of where blocks are kept
private var externalBlockStoreInitialized = false
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
@@ -266,11 +268,13 @@ private[spark] class BlockManager(
asyncReregisterLock.synchronized {
if (asyncReregisterTask == null) {
asyncReregisterTask = Future[Unit] {
+ // This is a blocking action and should run in futureExecutionContext which is a cached
+ // thread pool
reregister()
asyncReregisterLock.synchronized {
asyncReregisterTask = null
}
- }
+ }(futureExecutionContext)
}
}
}
@@ -744,7 +748,11 @@ private[spark] class BlockManager(
case b: ByteBufferValues if putLevel.replication > 1 =>
// Duplicate doesn't copy the bytes, but just creates a wrapper
val bufferView = b.buffer.duplicate()
- Future { replicate(blockId, bufferView, putLevel) }
+ Future {
+ // This is a blocking action and should run in futureExecutionContext which is a cached
+ // thread pool
+ replicate(blockId, bufferView, putLevel)
+ }(futureExecutionContext)
case _ => null
}
@@ -1218,6 +1226,7 @@ private[spark] class BlockManager(
}
metadataCleaner.cancel()
broadcastCleaner.cancel()
+ futureExecutionContext.shutdownNow()
logInfo("BlockManager stopped")
}
}
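The BlockManager changes all follow one pattern: blocking work goes to a dedicated pool that is passed explicitly, and stop() shuts that pool down. A hypothetical condensed sketch of the lifecycle:

    import java.util.concurrent.Executors
    import scala.concurrent.{ExecutionContext, Future}

    class ComponentSketch {
      private val ec = ExecutionContext.fromExecutorService(
        Executors.newCachedThreadPool())

      def asyncBlockingWork(): Future[Unit] = Future {
        Thread.sleep(100) // stands in for blocking calls such as reregister()
      }(ec)

      def stop(): Unit = ec.shutdownNow() // no orphaned threads after shutdown
    }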
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index a85e1c7632..abcad9438b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -17,13 +17,14 @@
package org.apache.spark.storage
+import scala.collection.Iterable
+import scala.collection.generic.CanBuildFrom
import scala.concurrent.{Await, Future}
-import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.RpcUtils
+import org.apache.spark.util.{ThreadUtils, RpcUtils}
private[spark]
class BlockManagerMaster(
@@ -102,8 +103,8 @@ class BlockManagerMaster(
val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
future.onFailure {
case e: Exception =>
- logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}")
- }
+ logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}", e)
+ }(ThreadUtils.sameThread)
if (blocking) {
Await.result(future, timeout)
}
@@ -114,8 +115,8 @@ class BlockManagerMaster(
val future = driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
future.onFailure {
case e: Exception =>
- logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}")
- }
+ logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}", e)
+ }(ThreadUtils.sameThread)
if (blocking) {
Await.result(future, timeout)
}
@@ -128,8 +129,8 @@ class BlockManagerMaster(
future.onFailure {
case e: Exception =>
logWarning(s"Failed to remove broadcast $broadcastId" +
- s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}")
- }
+ s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}", e)
+ }(ThreadUtils.sameThread)
if (blocking) {
Await.result(future, timeout)
}
@@ -169,11 +170,17 @@ class BlockManagerMaster(
val response = driverEndpoint.
askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
val (blockManagerIds, futures) = response.unzip
- val result = Await.result(Future.sequence(futures), timeout)
- if (result == null) {
+ implicit val sameThread = ThreadUtils.sameThread
+ val cbf =
+ implicitly[
+ CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
+ Option[BlockStatus],
+ Iterable[Option[BlockStatus]]]]
+ val blockStatus = Await.result(
+ Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread), timeout)
+ if (blockStatus == null) {
throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
}
- val blockStatus = result.asInstanceOf[Iterable[Option[BlockStatus]]]
blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
status.map { s => (blockManagerId, s) }
}.toMap
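Without an implicit `ExecutionContext` in scope, both implicit parameters of `Future.sequence` (the `CanBuildFrom` and the executor) must be supplied explicitly, as the hunk above does. A minimal standalone sketch of the same call shape (Scala 2.10/2.11 collections API; the same-thread context here is an assumption for illustration):

    import scala.collection.generic.CanBuildFrom
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    val sameThread: ExecutionContext = new ExecutionContext {
      def execute(r: Runnable): Unit = r.run()
      def reportFailure(t: Throwable): Unit = t.printStackTrace()
    }
    val futures: Iterable[Future[Int]] = Seq(Future.successful(1), Future.successful(2))
    val cbf = implicitly[CanBuildFrom[Iterable[Future[Int]], Int, Iterable[Int]]]
    // Both implicit argument lists of Future.sequence are passed explicitly.
    val results = Await.result(
      Future.sequence[Int, Iterable](futures)(cbf, sameThread), 1.second)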
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index fe43fc4125..b8b12be875 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -78,5 +78,5 @@ case class BroadcastHashJoin(
object BroadcastHashJoin {
private val broadcastHashJoinExecutionContext = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 1024))
+ ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 128))
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index 4943f29395..33be067ebd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -18,14 +18,14 @@
package org.apache.spark.streaming.receiver
import java.nio.ByteBuffer
+import java.util.concurrent.CountDownLatch
import scala.collection.mutable.ArrayBuffer
+import scala.concurrent._
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StreamBlockId
-import java.util.concurrent.CountDownLatch
-import scala.concurrent._
-import ExecutionContext.Implicits.global
+import org.apache.spark.util.ThreadUtils
/**
* Abstract class that is responsible for supervising a Receiver in the worker.
@@ -46,6 +46,9 @@ private[streaming] abstract class ReceiverSupervisor(
// Attach the executor to the receiver
receiver.attachExecutor(this)
+ private val futureExecutionContext = ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonCachedThreadPool("receiver-supervisor-future", 128))
+
/** Receiver id */
protected val streamId = receiver.streamId
@@ -111,6 +114,7 @@ private[streaming] abstract class ReceiverSupervisor(
stoppingError = error.orNull
stopReceiver(message, error)
onStop(message, error)
+ futureExecutionContext.shutdownNow()
stopLatch.countDown()
}
@@ -150,6 +154,8 @@ private[streaming] abstract class ReceiverSupervisor(
/** Restart receiver with delay */
def restartReceiver(message: String, error: Option[Throwable], delay: Int) {
Future {
+ // This is a blocking action so we should use "futureExecutionContext" which is a cached
+ // thread pool.
logWarning("Restarting receiver with delay " + delay + " ms: " + message,
error.getOrElse(null))
stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
@@ -158,7 +164,7 @@ private[streaming] abstract class ReceiverSupervisor(
logInfo("Starting receiver again")
startReceiver()
logInfo("Receiver started again")
- }
+ }(futureExecutionContext)
}
/** Check if receiver has been marked for stopping */
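The restart path stops the receiver, sleeps for the configured delay, and starts it again; all of that blocks, so it now runs on the supervisor's dedicated cached pool. A hypothetical condensed sketch of the pattern:

    import scala.concurrent.{ExecutionContext, Future}

    // stopReceiver/startReceiver stand in for the blocking calls above.
    def restartWithDelay(delayMs: Int)(implicit ec: ExecutionContext): Future[Unit] =
      Future {
        // stopReceiver(...)   // blocking: waits for the receiver to stop
        Thread.sleep(delayMs)  // deliberate restart delay
        // startReceiver()     // blocking: brings the receiver back up
      }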