aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2015-12-21 13:46:58 -0800
committerMichael Armbrust <michael@databricks.com>2015-12-21 13:46:58 -0800
commit4883a5087d481d4de5d3beabbd709853de01399a (patch)
tree63cd07b7cf6447ab0b2ca77503789ef7a7b06d2f /sql/catalyst
parent7634fe9511e1a8fb94979624b1b617b495b48ad3 (diff)
downloadspark-4883a5087d481d4de5d3beabbd709853de01399a.tar.gz
spark-4883a5087d481d4de5d3beabbd709853de01399a.tar.bz2
spark-4883a5087d481d4de5d3beabbd709853de01399a.zip
[SPARK-12374][SPARK-12150][SQL] Adding logical/physical operators for Range
Based on the suggestions from marmbrus , added logical/physical operators for Range for improving the performance. Also added another API for resolving the JIRA Spark-12150. Could you take a look at my implementation, marmbrus ? If not good, I can rework it. : ) Thank you very much! Author: gatorsmile <gatorsmile@gmail.com> Closes #10335 from gatorsmile/rangeOperators.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala32
1 file changed, 32 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index ec42b763f1..64ef4d7996 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -210,6 +210,38 @@ case class Sort(
override def output: Seq[Attribute] = child.output
}
+/** Factory for constructing new `Range` nodes. */
+object Range {
+ def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = {
+ val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes
+ new Range(start, end, step, numSlices, output)
+ }
+}
+
+case class Range(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int,
+ output: Seq[Attribute]) extends LeafNode {
+ require(step != 0, "step cannot be 0")
+ val numElements: BigInt = {
+ val safeStart = BigInt(start)
+ val safeEnd = BigInt(end)
+ if ((safeEnd - safeStart) % step == 0 || (safeEnd > safeStart) != (step > 0)) {
+ (safeEnd - safeStart) / step
+ } else {
+ // the remainder has the same sign with range, could add 1 more
+ (safeEnd - safeStart) / step + 1
+ }
+ }
+
+ override def statistics: Statistics = {
+ val sizeInBytes = LongType.defaultSize * numElements
+ Statistics( sizeInBytes = sizeInBytes )
+ }
+}
+
case class Aggregate(
groupingExpressions: Seq[Expression],
aggregateExpressions: Seq[NamedExpression],