aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-05-16 00:44:29 -0700
committerReynold Xin <rxin@databricks.com>2015-05-16 00:44:29 -0700
commit47e7ffe36b8a8a246fe9af522aff480d19c0c8a6 (patch)
tree8cfa6ec81fbd0f72e5c9bf74e8b1be0b5217b150 /sql/core
parent0ac8b01a07840f199bbc79fb845762284aead6de (diff)
downloadspark-47e7ffe36b8a8a246fe9af522aff480d19c0c8a6.tar.gz
spark-47e7ffe36b8a8a246fe9af522aff480d19c0c8a6.tar.bz2
spark-47e7ffe36b8a8a246fe9af522aff480d19c0c8a6.zip
[SPARK-7655][Core][SQL] Remove 'scala.concurrent.ExecutionContext.Implicits.global' in 'ask' and 'BroadcastHashJoin'
Both `AkkaRpcEndpointRef.ask` and `BroadcastHashJoin` use `scala.concurrent.ExecutionContext.Implicits.global`. However, because the tasks in `BroadcastHashJoin` are usually long-running tasks, they will occupy all threads in `global`, so `ask` cannot get a chance to process the replies. For `ask`, the tasks are actually very simple, so we can use `MoreExecutors.sameThreadExecutor()`. For `BroadcastHashJoin`, it's better to use `ThreadUtils.newDaemonCachedThreadPool`. Author: zsxwing <zsxwing@gmail.com> Closes #6200 from zsxwing/SPARK-7655-2 and squashes the following commits: cfdc605 [zsxwing] Remove redundant import and minor doc fix cf83153 [zsxwing] Add "sameThread" and "newDaemonCachedThreadPool with maxThreadNumber" to ThreadUtils 08ad0ee [zsxwing] Remove 'scala.concurrent.ExecutionContext.Implicits.global' in 'ask' and 'BroadcastHashJoin'
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala10
1 file changed, 8 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 05dd5681ed..fe43fc4125 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -18,10 +18,10 @@
package org.apache.spark.sql.execution.joins
import org.apache.spark.rdd.RDD
+import org.apache.spark.util.ThreadUtils
import scala.concurrent._
import scala.concurrent.duration._
-import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions.{Row, Expression}
@@ -64,7 +64,7 @@ case class BroadcastHashJoin(
val input: Array[Row] = buildPlan.execute().map(_.copy()).collect()
val hashed = HashedRelation(input.iterator, buildSideKeyGenerator, input.length)
sparkContext.broadcast(hashed)
- }
+ }(BroadcastHashJoin.broadcastHashJoinExecutionContext)
protected override def doExecute(): RDD[Row] = {
val broadcastRelation = Await.result(broadcastFuture, timeout)
@@ -74,3 +74,9 @@ case class BroadcastHashJoin(
}
}
}
+
+object BroadcastHashJoin {
+
+ private val broadcastHashJoinExecutionContext = ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 1024))
+}