diff options
author | Cheng Lian <lian.cs.zju@gmail.com> | 2014-09-20 16:30:49 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-09-20 16:30:49 -0700 |
commit | 7f54580c4503d8b6bfcf7d4cbc83b83458140926 (patch) | |
tree | 8d78103241777b4698483bb4b16b10199cc5f72c /sql/catalyst | |
parent | 7c8ad1c0838762f5b632f683834c88a711aef4dd (diff) | |
download | spark-7f54580c4503d8b6bfcf7d4cbc83b83458140926.tar.gz spark-7f54580c4503d8b6bfcf7d4cbc83b83458140926.tar.bz2 spark-7f54580c4503d8b6bfcf7d4cbc83b83458140926.zip |
[SPARK-3609][SQL] Adds sizeInBytes statistics for Limit operator when all output attributes are of native data types
This helps to replace shuffled hash joins with broadcast hash joins in some cases.
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #2468 from liancheng/more-stats and squashes the following commits:
32687dc [Cheng Lian] Moved the test case to PlannerSuite
5595a91 [Cheng Lian] Removes debugging code
73faf69 [Cheng Lian] Test case for auto choosing broadcast hash join
f30fe1d [Cheng Lian] Adds sizeInBytes estimation for Limit when all output types are native types
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala | 11 | ||||
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala | 10 |
2 files changed, 21 insertions, 0 deletions
```diff
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 5d10754c7b..8e8259cae6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -148,6 +148,17 @@ case class Aggregate(
 
 case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
   override def output = child.output
+
+  override lazy val statistics: Statistics =
+    if (output.forall(_.dataType.isInstanceOf[NativeType])) {
+      val limit = limitExpr.eval(null).asInstanceOf[Int]
+      val sizeInBytes = (limit: Long) * output.map { a =>
+        NativeType.defaultSizeOf(a.dataType.asInstanceOf[NativeType])
+      }.sum
+      Statistics(sizeInBytes = sizeInBytes)
+    } else {
+      Statistics(sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product)
+    }
 }
 
 case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 49520b7678..e3050e5397 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -122,6 +122,16 @@ object NativeType {
     IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)
 
   def unapply(dt: DataType): Boolean = all.contains(dt)
+
+  val defaultSizeOf: Map[NativeType, Int] = Map(
+    IntegerType -> 4,
+    BooleanType -> 1,
+    LongType -> 8,
+    DoubleType -> 8,
+    FloatType -> 4,
+    ShortType -> 2,
+    ByteType -> 1,
+    StringType -> 4096)
 }
 
 trait PrimitiveType extends DataType {
```