aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala32
1 files changed, 32 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index ec42b763f1..64ef4d7996 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -210,6 +210,38 @@ case class Sort(
override def output: Seq[Attribute] = child.output
}
+/** Factory for constructing new `Range` nodes. */
+object Range {
+ def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = {
+ val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes
+ new Range(start, end, step, numSlices, output)
+ }
+}
+
+case class Range(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int,
+ output: Seq[Attribute]) extends LeafNode {
+ require(step != 0, "step cannot be 0")
+ val numElements: BigInt = {
+ val safeStart = BigInt(start)
+ val safeEnd = BigInt(end)
+ if ((safeEnd - safeStart) % step == 0 || (safeEnd > safeStart) != (step > 0)) {
+ (safeEnd - safeStart) / step
+ } else {
+ // the remainder has the same sign with range, could add 1 more
+ (safeEnd - safeStart) / step + 1
+ }
+ }
+
+ override def statistics: Statistics = {
+ val sizeInBytes = LongType.defaultSize * numElements
+ Statistics( sizeInBytes = sizeInBytes )
+ }
+}
+
case class Aggregate(
groupingExpressions: Seq[Expression],
aggregateExpressions: Seq[NamedExpression],