author     Jacky Li <jacky.likun@huawei.com>          2014-12-16 15:34:59 -0800
committer  Michael Armbrust <michael@databricks.com>  2014-12-16 15:34:59 -0800
commit     fa66ef6c97e87c9255b67b03836a4ba50598ebae (patch)
tree       22c45c07db924fa4b53b8b97917d5e06499322a6 /sql
parent     a66c23e134a0b1ad9540626fb7436d70d577d929 (diff)
[SPARK-4269][SQL] make wait time configurable in BroadcastHashJoin
In BroadcastHashJoin, the wait for the execution and broadcast of the small table currently uses a hard-coded value (5 minutes). This should be a configurable value, since a broadcast may take longer than 5 minutes in some cases, for example in a busy or congested network environment.

Author: Jacky Li <jacky.likun@huawei.com>

Closes #3133 from jackylk/timeout-config and squashes the following commits:

733ac08 [Jacky Li] add spark.sql.broadcastTimeout in SQLConf.scala
557acd4 [Jacky Li] switch to sqlContext.getConf
81a5e20 [Jacky Li] make wait time configurable in BroadcastHashJoin
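For context, a minimal usage sketch of the new setting; the local-mode setup and the 10-minute value below are illustrative assumptions, not part of the patch:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  // Illustrative setup only: a local SQLContext.
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("broadcast-timeout-demo"))
  val sqlContext = new SQLContext(sc)

  // Raise the broadcast wait time from the default 5 * 60 = 300 seconds to 10 minutes.
  // The value is interpreted as seconds by the new broadcastTimeout getter.
  sqlContext.setConf("spark.sql.broadcastTimeout", (10 * 60).toString)

A negative value disables the timeout entirely, as the BroadcastHashJoin change below shows.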
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                             7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala  11
2 files changed, 17 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 9697beb132..f5abf71d6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -38,6 +38,7 @@ private[spark] object SQLConf {
val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
+ val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout"
// Options that control which operators can be chosen by the query planner. These should be
// considered hints and may be ignored by future versions of Spark SQL.
@@ -148,6 +149,12 @@ private[sql] trait SQLConf {
private[spark] def columnNameOfCorruptRecord: String =
getConf(COLUMN_NAME_OF_CORRUPT_RECORD, "_corrupt_record")
+  /**
+   * Timeout in seconds for the broadcast wait time in hash join
+   */
+  private[spark] def broadcastTimeout: Int =
+    getConf(BROADCAST_TIMEOUT, (5 * 60).toString).toInt
+
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
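Continuing the sketch above, the key should also be reachable at runtime through Spark SQL's SET command, which writes into this conf; that path is an assumption here, not something shown in this patch:

  // Assumption: SET key=value is routed into SQLConf, so the timeout can be
  // changed from SQL as well; -1 means "wait forever" per the change below.
  sqlContext.sql("SET spark.sql.broadcastTimeout=-1")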
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 5cf2a785ad..fbe1d06ed2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -42,6 +42,15 @@ case class BroadcastHashJoin(
right: SparkPlan)
extends BinaryNode with HashJoin {
+  val timeout = {
+    val timeoutValue = sqlContext.broadcastTimeout
+    if (timeoutValue < 0) {
+      Duration.Inf
+    } else {
+      timeoutValue.seconds
+    }
+  }
+
override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
override def requiredChildDistribution =
@@ -56,7 +65,7 @@ case class BroadcastHashJoin(
}
override def execute() = {
- val broadcastRelation = Await.result(broadcastFuture, 5.minute)
+ val broadcastRelation = Await.result(broadcastFuture, timeout)
streamedPlan.execute().mapPartitions { streamedIter =>
hashJoin(streamedIter, broadcastRelation.value)
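To illustrate the negative-means-infinite convention introduced by the patch, a small standalone sketch; the helper name resolveTimeout is hypothetical:

  import scala.concurrent.duration._

  // Hypothetical helper mirroring the patch's timeout resolution:
  // any negative configured value is treated as "wait indefinitely".
  def resolveTimeout(timeoutSeconds: Int): Duration =
    if (timeoutSeconds < 0) Duration.Inf else timeoutSeconds.seconds

  // resolveTimeout(300) == 300.seconds (the default); resolveTimeout(-1) == Duration.Inf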