aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src
diff options
context:
space:
mode:
authorCheng Lian <lian.cs.zju@gmail.com>2014-09-20 16:30:49 -0700
committerMichael Armbrust <michael@databricks.com>2014-09-20 16:30:49 -0700
commit7f54580c4503d8b6bfcf7d4cbc83b83458140926 (patch)
tree8d78103241777b4698483bb4b16b10199cc5f72c /sql/catalyst/src
parent7c8ad1c0838762f5b632f683834c88a711aef4dd (diff)
downloadspark-7f54580c4503d8b6bfcf7d4cbc83b83458140926.tar.gz
spark-7f54580c4503d8b6bfcf7d4cbc83b83458140926.tar.bz2
spark-7f54580c4503d8b6bfcf7d4cbc83b83458140926.zip
[SPARK-3609][SQL] Adds sizeInBytes statistics for Limit operator when all output attributes are of native data types
This helps to replace shuffled hash joins with broadcast hash joins in some cases. Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #2468 from liancheng/more-stats and squashes the following commits: 32687dc [Cheng Lian] Moved the test case to PlannerSuite 5595a91 [Cheng Lian] Removes debugging code 73faf69 [Cheng Lian] Test case for auto choosing broadcast hash join f30fe1d [Cheng Lian] Adds sizeInBytes estimation for Limit when all output types are native types
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala11
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala10
2 files changed, 21 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 5d10754c7b..8e8259cae6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -148,6 +148,17 @@ case class Aggregate(
/**
 * Returns the first `limitExpr` rows of `child`.
 *
 * When every output attribute has a [[NativeType]], we can bound the result size at
 * planning time: at most `limit` rows, each no wider than the sum of the per-type
 * default sizes. This tighter estimate lets the planner pick broadcast joins.
 */
case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
  override def output = child.output

  override lazy val statistics: Statistics = {
    val allNative = output.forall(_.dataType.isInstanceOf[NativeType])
    if (allNative) {
      // limitExpr is expected to be a foldable integer literal, so evaluating
      // against a null input row is safe here.
      val rowCount = limitExpr.eval(null).asInstanceOf[Int]
      val rowSize = output
        .map(a => NativeType.defaultSizeOf(a.dataType.asInstanceOf[NativeType]))
        .sum
      // Widen to Long before multiplying to avoid Int overflow.
      Statistics(sizeInBytes = rowCount.toLong * rowSize)
    } else {
      // Fall back to the generic estimate: product of the children's sizes.
      Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
    }
  }
}
case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 49520b7678..e3050e5397 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -122,6 +122,16 @@ object NativeType {
IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)
def unapply(dt: DataType): Boolean = all.contains(dt)
+
+ val defaultSizeOf: Map[NativeType, Int] = Map(
+ IntegerType -> 4,
+ BooleanType -> 1,
+ LongType -> 8,
+ DoubleType -> 8,
+ FloatType -> 4,
+ ShortType -> 2,
+ ByteType -> 1,
+ StringType -> 4096)
}
trait PrimitiveType extends DataType {