aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorKan Zhang <kzhang@apache.org>2014-05-25 00:06:42 -0700
committerReynold Xin <rxin@apache.org>2014-05-25 00:06:42 -0700
commit6052db9dc10c996215658485e805200e4f0cf549 (patch)
treec5575603da300b1dbdcce95f7bdff9457bc26094 /sql/catalyst
parent6e9fb6320bec3371bc9c010ccbc1b915f500486b (diff)
downloadspark-6052db9dc10c996215658485e805200e4f0cf549.tar.gz
spark-6052db9dc10c996215658485e805200e4f0cf549.tar.bz2
spark-6052db9dc10c996215658485e805200e4f0cf549.zip
[SPARK-1822] SchemaRDD.count() should use query optimizer
Author: Kan Zhang <kzhang@apache.org> Closes #841 from kanzhang/SPARK-1822 and squashes the following commits: 2f8072a [Kan Zhang] [SPARK-1822] Minor style update cf4baa4 [Kan Zhang] [SPARK-1822] Adding Scaladoc e67c910 [Kan Zhang] [SPARK-1822] SchemaRDD.count() should use optimizer
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala6
1 files changed, 3 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
index 5dbaaa3b0c..1bcd4e2276 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -151,7 +151,7 @@ case class MaxFunction(expr: Expression, base: AggregateExpression) extends Aggr
case class Count(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
override def references = child.references
override def nullable = false
- override def dataType = IntegerType
+ override def dataType = LongType
override def toString = s"COUNT($child)"
override def asPartial: SplitEvaluation = {
@@ -295,12 +295,12 @@ case class AverageFunction(expr: Expression, base: AggregateExpression)
case class CountFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
def this() = this(null, null) // Required for serialization.
- var count: Int = _
+ var count: Long = _
override def update(input: Row): Unit = {
val evaluatedExpr = expr.map(_.eval(input))
if (evaluatedExpr.map(_ != null).reduceLeft(_ || _)) {
- count += 1
+ count += 1L
}
}