diff options
author | gatorsmile <gatorsmile@gmail.com> | 2015-12-21 13:46:58 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-12-21 13:46:58 -0800 |
commit | 4883a5087d481d4de5d3beabbd709853de01399a (patch) | |
tree | 63cd07b7cf6447ab0b2ca77503789ef7a7b06d2f /sql/catalyst | |
parent | 7634fe9511e1a8fb94979624b1b617b495b48ad3 (diff) | |
download | spark-4883a5087d481d4de5d3beabbd709853de01399a.tar.gz spark-4883a5087d481d4de5d3beabbd709853de01399a.tar.bz2 spark-4883a5087d481d4de5d3beabbd709853de01399a.zip |
[SPARK-12374][SPARK-12150][SQL] Adding logical/physical operators for Range
Based on the suggestions from marmbrus , added logical/physical operators for Range for improving the performance.
Also added another API for resolving the JIRA Spark-12150.
Could you take a look at my implementation, marmbrus ? If not good, I can rework it. : )
Thank you very much!
Author: gatorsmile <gatorsmile@gmail.com>
Closes #10335 from gatorsmile/rangeOperators.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala | 32 |
1 files changed, 32 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index ec42b763f1..64ef4d7996 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -210,6 +210,38 @@ case class Sort( override def output: Seq[Attribute] = child.output } +/** Factory for constructing new `Range` nodes. */ +object Range { + def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = { + val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes + new Range(start, end, step, numSlices, output) + } +} + +case class Range( + start: Long, + end: Long, + step: Long, + numSlices: Int, + output: Seq[Attribute]) extends LeafNode { + require(step != 0, "step cannot be 0") + val numElements: BigInt = { + val safeStart = BigInt(start) + val safeEnd = BigInt(end) + if ((safeEnd - safeStart) % step == 0 || (safeEnd > safeStart) != (step > 0)) { + (safeEnd - safeStart) / step + } else { + // the remainder has the same sign with range, could add 1 more + (safeEnd - safeStart) / step + 1 + } + } + + override def statistics: Statistics = { + val sizeInBytes = LongType.defaultSize * numElements + Statistics( sizeInBytes = sizeInBytes ) + } +} + case class Aggregate( groupingExpressions: Seq[Expression], aggregateExpressions: Seq[NamedExpression], |