author     Cheng Lian <lian.cs.zju@gmail.com>         2014-09-20 16:30:49 -0700
committer  Michael Armbrust <michael@databricks.com>  2014-09-20 16:30:49 -0700
commit     7f54580c4503d8b6bfcf7d4cbc83b83458140926 (patch)
tree       8d78103241777b4698483bb4b16b10199cc5f72c /sql
parent     7c8ad1c0838762f5b632f683834c88a711aef4dd (diff)
[SPARK-3609][SQL] Adds sizeInBytes statistics for Limit operator when all output attributes are of native data types
This helps to replace shuffled hash joins with broadcast hash joins in some cases.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2468 from liancheng/more-stats and squashes the following commits:

32687dc [Cheng Lian] Moved the test case to PlannerSuite
5595a91 [Cheng Lian] Removes debugging code
73faf69 [Cheng Lian] Test case for auto choosing broadcast hash join
f30fe1d [Cheng Lian] Adds sizeInBytes estimation for Limit when all output types are native types
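For context on why a tighter sizeInBytes estimate matters: the planner compares each join side's estimated size against spark.sql.autoBroadcastJoinThreshold and broadcasts the small side instead of shuffling both. A minimal sketch of that decision, using simplified stand-in types rather than Spark's actual planner classes:

    // Standalone sketch (not Spark source): how a planner can pick a join
    // strategy from sizeInBytes estimates and a broadcast threshold.
    case class Statistics(sizeInBytes: BigInt)

    sealed trait JoinStrategy
    case object Broadcast extends JoinStrategy
    case object Shuffle   extends JoinStrategy

    def pickJoin(left: Statistics, right: Statistics, threshold: Long): JoinStrategy =
      // Broadcasting the provably small side avoids shuffling the large one,
      // which is why a Limit that can report its size unlocks the cheaper plan.
      if (left.sizeInBytes <= threshold || right.sizeInBytes <= threshold) Broadcast
      else Shuffle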
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala | 11
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala              | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala                             |  9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala                    | 20
4 files changed, 45 insertions(+), 5 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 5d10754c7b..8e8259cae6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -148,6 +148,17 @@ case class Aggregate(

 case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
   override def output = child.output
+
+  override lazy val statistics: Statistics =
+    if (output.forall(_.dataType.isInstanceOf[NativeType])) {
+      val limit = limitExpr.eval(null).asInstanceOf[Int]
+      val sizeInBytes = (limit: Long) * output.map { a =>
+        NativeType.defaultSizeOf(a.dataType.asInstanceOf[NativeType])
+      }.sum
+      Statistics(sizeInBytes = sizeInBytes)
+    } else {
+      Statistics(sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product)
+    }
 }

 case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
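To make the new estimate concrete: for LIMIT 3 over one IntegerType and one StringType column, the defaults added in the next hunk give 3 * (4 + 4096) = 12300 bytes, small enough to fall under a typical broadcast threshold. A standalone sketch of the same arithmetic (the Map here mirrors NativeType.defaultSizeOf; it is illustrative, not Spark code):

    // The Limit estimate is: limit * sum(default size of each output column).
    val defaultSizeOf = Map("IntegerType" -> 4, "StringType" -> 4096)

    val limit = 3
    val rowWidth = Seq("IntegerType", "StringType").map(defaultSizeOf).sum // 4 + 4096 = 4100
    val sizeInBytes = limit.toLong * rowWidth                              // 3 * 4100 = 12300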
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 49520b7678..e3050e5397 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -122,6 +122,16 @@ object NativeType {
     IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)

   def unapply(dt: DataType): Boolean = all.contains(dt)
+
+  val defaultSizeOf: Map[NativeType, Int] = Map(
+    IntegerType -> 4,
+    BooleanType -> 1,
+    LongType -> 8,
+    DoubleType -> 8,
+    FloatType -> 4,
+    ShortType -> 2,
+    ByteType -> 1,
+    StringType -> 4096)
 }

 trait PrimitiveType extends DataType {
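The fixed-width entries match the JVM widths of the corresponding primitives; StringType's 4096 is a deliberately conservative guess, since string lengths are unknown at planning time and underestimating could broadcast a table that does not fit in memory. A hypothetical helper (not part of the patch) showing how these defaults compose into a per-row width:

    // Hypothetical helper, assuming the NativeType.defaultSizeOf map added above:
    // estimated width in bytes of one row whose columns are all native types.
    def estimatedRowWidth(schema: Seq[NativeType]): Int =
      schema.map(NativeType.defaultSizeOf).sum

    // e.g. estimatedRowWidth(Seq(IntegerType, LongType, StringType)) == 4 + 8 + 4096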
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 67563b6c55..15f6bcef93 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql

 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{ShuffledHashJoin, BroadcastHashJoin}
 import org.apache.spark.sql.test._
 import org.scalatest.BeforeAndAfterAll
 import java.util.TimeZone
@@ -649,24 +650,24 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       (3, null) ::
       (4, 2147483644) :: Nil)
   }
-
+
   test("SPARK-3423 BETWEEN") {
     checkAnswer(
       sql("SELECT key, value FROM testData WHERE key BETWEEN 5 and 7"),
       Seq((5, "5"), (6, "6"), (7, "7"))
     )
-
+
     checkAnswer(
       sql("SELECT key, value FROM testData WHERE key BETWEEN 7 and 7"),
       Seq((7, "7"))
     )
-
+
     checkAnswer(
       sql("SELECT key, value FROM testData WHERE key BETWEEN 9 and 7"),
       Seq()
     )
   }
-
+
   test("cast boolean to string") {
     // TODO Ensure true/false string letter casing is consistent with Hive in all cases.
     checkAnswer(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 37d64f0de7..bfbf431a11 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -22,7 +22,7 @@ import org.scalatest.FunSuite

 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.execution
+import org.apache.spark.sql.{SQLConf, execution}
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.sql.test.TestSQLContext.planner._
@@ -57,4 +57,22 @@ class PlannerSuite extends FunSuite {
     val planned = HashAggregation(query)
     assert(planned.nonEmpty)
   }
+
+  test("sizeInBytes estimation of limit operator for broadcast hash join optimization") {
+    val origThreshold = autoBroadcastJoinThreshold
+    setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920.toString)
+
+    // Using a threshold that is definitely larger than the small testing table (b) below
+    val a = testData.as('a)
+    val b = testData.limit(3).as('b)
+    val planned = a.join(b, Inner, Some("a.key".attr === "b.key".attr)).queryExecution.executedPlan
+
+    val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join }
+    val shuffledHashJoins = planned.collect { case join: ShuffledHashJoin => join }
+
+    assert(broadcastHashJoins.size === 1, "Should use broadcast hash join")
+    assert(shuffledHashJoins.isEmpty, "Should not use shuffled hash join")
+
+    setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold.toString)
+  }
 }
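The test saves and restores AUTO_BROADCASTJOIN_THRESHOLD by hand, which keeps the override from leaking into other tests as long as the assertions pass; a failure between the two setConf calls would still leak it. A hypothetical try/finally wrapper (not part of the patch, assuming the same setConf and autoBroadcastJoinThreshold members used in the test above) that makes the restore unconditional:

    // Run `body` with a temporary broadcast threshold, restoring the original
    // value even if an assertion inside `body` fails.
    def withBroadcastThreshold[T](threshold: Long)(body: => T): T = {
      val orig = autoBroadcastJoinThreshold
      setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, threshold.toString)
      try body
      finally setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, orig.toString)
    }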