From 7f54580c4503d8b6bfcf7d4cbc83b83458140926 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 20 Sep 2014 16:30:49 -0700 Subject: [SPARK-3609][SQL] Adds sizeInBytes statistics for Limit operator when all output attributes are of native data types This helps to replace shuffled hash joins with broadcast hash joins in some cases. Author: Cheng Lian Closes #2468 from liancheng/more-stats and squashes the following commits: 32687dc [Cheng Lian] Moved the test case to PlannerSuite 5595a91 [Cheng Lian] Removes debugging code 73faf69 [Cheng Lian] Test case for auto choosing broadcast hash join f30fe1d [Cheng Lian] Adds sizeInBytes estimation for Limit when all output types are native types --- .../sql/catalyst/plans/logical/basicOperators.scala | 11 +++++++++++ .../apache/spark/sql/catalyst/types/dataTypes.scala | 10 ++++++++++ .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 9 +++++---- .../apache/spark/sql/execution/PlannerSuite.scala | 20 +++++++++++++++++++- 4 files changed, 45 insertions(+), 5 deletions(-) (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 5d10754c7b..8e8259cae6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -148,6 +148,17 @@ case class Aggregate( case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode { override def output = child.output + + override lazy val statistics: Statistics = + if (output.forall(_.dataType.isInstanceOf[NativeType])) { + val limit = limitExpr.eval(null).asInstanceOf[Int] + val sizeInBytes = (limit: Long) * output.map { a => + NativeType.defaultSizeOf(a.dataType.asInstanceOf[NativeType]) + }.sum + Statistics(sizeInBytes = sizeInBytes) + } else { + Statistics(sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product) + } } case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index 49520b7678..e3050e5397 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -122,6 +122,16 @@ object NativeType { IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType) def unapply(dt: DataType): Boolean = all.contains(dt) + + val defaultSizeOf: Map[NativeType, Int] = Map( + IntegerType -> 4, + BooleanType -> 1, + LongType -> 8, + DoubleType -> 8, + FloatType -> 4, + ShortType -> 2, + ByteType -> 1, + StringType -> 4096) } trait PrimitiveType extends DataType { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 67563b6c55..15f6bcef93 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.{ShuffledHashJoin, BroadcastHashJoin} import org.apache.spark.sql.test._ import org.scalatest.BeforeAndAfterAll import java.util.TimeZone @@ -649,24 +650,24 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { (3, null) :: (4, 2147483644) :: Nil) } - + test("SPARK-3423 BETWEEN") { checkAnswer( sql("SELECT key, value FROM testData WHERE key BETWEEN 5 and 7"), Seq((5, "5"), (6, "6"), (7, "7")) ) - + checkAnswer( sql("SELECT key, value FROM testData WHERE key BETWEEN 7 and 7"), Seq((7, "7")) ) - + checkAnswer( sql("SELECT key, value FROM testData WHERE key BETWEEN 9 and 7"), Seq() ) } - + test("cast boolean to string") { // TODO Ensure true/false string letter casing is consistent with Hive in all cases. checkAnswer( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 37d64f0de7..bfbf431a11 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -22,7 +22,7 @@ import org.scalatest.FunSuite import org.apache.spark.sql.TestData._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.execution +import org.apache.spark.sql.{SQLConf, execution} import org.apache.spark.sql.test.TestSQLContext._ import org.apache.spark.sql.test.TestSQLContext.planner._ @@ -57,4 +57,22 @@ class PlannerSuite extends FunSuite { val planned = HashAggregation(query) assert(planned.nonEmpty) } + + test("sizeInBytes estimation of limit operator for broadcast hash join optimization") { + val origThreshold = autoBroadcastJoinThreshold + setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920.toString) + + // Using a threshold that is definitely larger than the small testing table (b) below + val a = testData.as('a) + val b = testData.limit(3).as('b) + val planned = a.join(b, Inner, Some("a.key".attr === "b.key".attr)).queryExecution.executedPlan + + val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join } + val shuffledHashJoins = planned.collect { case join: ShuffledHashJoin => join } + + assert(broadcastHashJoins.size === 1, "Should use broadcast hash join") + assert(shuffledHashJoins.isEmpty, "Should not use shuffled hash join") + + setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold.toString) + } } -- cgit v1.2.3