diff options
author | zsxwing <zsxwing@gmail.com> | 2015-05-16 00:44:29 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-05-16 00:44:29 -0700 |
commit | 47e7ffe36b8a8a246fe9af522aff480d19c0c8a6 (patch) | |
tree | 8cfa6ec81fbd0f72e5c9bf74e8b1be0b5217b150 /sql | |
parent | 0ac8b01a07840f199bbc79fb845762284aead6de (diff) | |
download | spark-47e7ffe36b8a8a246fe9af522aff480d19c0c8a6.tar.gz spark-47e7ffe36b8a8a246fe9af522aff480d19c0c8a6.tar.bz2 spark-47e7ffe36b8a8a246fe9af522aff480d19c0c8a6.zip |
[SPARK-7655][Core][SQL] Remove 'scala.concurrent.ExecutionContext.Implicits.global' in 'ask' and 'BroadcastHashJoin'
Because both `AkkaRpcEndpointRef.ask` and `BroadcastHashJoin` uses `scala.concurrent.ExecutionContext.Implicits.global`. However, because the tasks in `BroadcastHashJoin` are usually long-running tasks, which will occupy all threads in `global`. Then `ask` cannot get a chance to process the replies.
For `ask`, actually the tasks are very simple, so we can use `MoreExecutors.sameThreadExecutor()`. For `BroadcastHashJoin`, it's better to use `ThreadUtils.newDaemonCachedThreadPool`.
Author: zsxwing <zsxwing@gmail.com>
Closes #6200 from zsxwing/SPARK-7655-2 and squashes the following commits:
cfdc605 [zsxwing] Remove redundant imort and minor doc fix
cf83153 [zsxwing] Add "sameThread" and "newDaemonCachedThreadPool with maxThreadNumber" to ThreadUtils
08ad0ee [zsxwing] Remove 'scala.concurrent.ExecutionContext.Implicits.global' in 'ask' and 'BroadcastHashJoin'
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala | 10 |
1 files changed, 8 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala index 05dd5681ed..fe43fc4125 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala @@ -18,10 +18,10 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.rdd.RDD +import org.apache.spark.util.ThreadUtils import scala.concurrent._ import scala.concurrent.duration._ -import scala.concurrent.ExecutionContext.Implicits.global import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions.{Row, Expression} @@ -64,7 +64,7 @@ case class BroadcastHashJoin( val input: Array[Row] = buildPlan.execute().map(_.copy()).collect() val hashed = HashedRelation(input.iterator, buildSideKeyGenerator, input.length) sparkContext.broadcast(hashed) - } + }(BroadcastHashJoin.broadcastHashJoinExecutionContext) protected override def doExecute(): RDD[Row] = { val broadcastRelation = Await.result(broadcastFuture, timeout) @@ -74,3 +74,9 @@ case class BroadcastHashJoin( } } } + +object BroadcastHashJoin { + + private val broadcastHashJoinExecutionContext = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 1024)) +} |