aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorlarvaboy <larvaboy@gmail.com>2014-05-13 21:26:08 -0700
committerReynold Xin <rxin@apache.org>2014-05-13 21:26:08 -0700
commitc33b8dcbf65a3a0c5ee5e65cd1dcdbc7da36aa5f (patch)
tree497a31ae116b285966699ef51ca975160b3845de /sql/catalyst
parent92cebada09a7e5a00ab48bcb350a9462949c33eb (diff)
downloadspark-c33b8dcbf65a3a0c5ee5e65cd1dcdbc7da36aa5f.tar.gz
spark-c33b8dcbf65a3a0c5ee5e65cd1dcdbc7da36aa5f.tar.bz2
spark-c33b8dcbf65a3a0c5ee5e65cd1dcdbc7da36aa5f.zip
Implement ApproximateCountDistinct for SparkSql
Add the implementation for ApproximateCountDistinct to SparkSql. We use the HyperLogLog algorithm implemented in stream-lib, and do the count in two phases: 1) counting the number of distinct elements in each partitions, and 2) merge the HyperLogLog results from different partitions. A simple serializer and test cases are added as well. Author: larvaboy <larvaboy@gmail.com> Closes #737 from larvaboy/master and squashes the following commits: bd8ef3f [larvaboy] Add support of user-provided standard deviation to ApproxCountDistinct. 9ba8360 [larvaboy] Fix alignment and null handling issues. 95b4067 [larvaboy] Add a test case for count distinct and approximate count distinct. f57917d [larvaboy] Add the parser for the approximate count. a2d5d10 [larvaboy] Add ApproximateCountDistinct aggregates and functions. 7ad273a [larvaboy] Add SparkSql serializer for HyperLogLog. 1d9aacf [larvaboy] Fix a minor typo in the toString method of the Count case class. 653542b [larvaboy] Fix a couple of minor typos.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala7
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala78
2 files changed, 83 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index b3a3a1ef1b..f2b9b2c1a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -93,6 +93,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected val AND = Keyword("AND")
protected val AS = Keyword("AS")
protected val ASC = Keyword("ASC")
+ protected val APPROXIMATE = Keyword("APPROXIMATE")
protected val AVG = Keyword("AVG")
protected val BY = Keyword("BY")
protected val CAST = Keyword("CAST")
@@ -318,6 +319,12 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
COUNT ~> "(" ~ "*" <~ ")" ^^ { case _ => Count(Literal(1)) } |
COUNT ~> "(" ~ expression <~ ")" ^^ { case dist ~ exp => Count(exp) } |
COUNT ~> "(" ~> DISTINCT ~> expression <~ ")" ^^ { case exp => CountDistinct(exp :: Nil) } |
+ APPROXIMATE ~> COUNT ~> "(" ~> DISTINCT ~> expression <~ ")" ^^ {
+ case exp => ApproxCountDistinct(exp)
+ } |
+ APPROXIMATE ~> "(" ~> floatLit ~ ")" ~ COUNT ~ "(" ~ DISTINCT ~ expression <~ ")" ^^ {
+ case s ~ _ ~ _ ~ _ ~ _ ~ e => ApproxCountDistinct(e, s.toDouble)
+ } |
FIRST ~> "(" ~> expression <~ ")" ^^ { case exp => First(exp) } |
AVG ~> "(" ~> expression <~ ")" ^^ { case exp => Average(exp) } |
MIN ~> "(" ~> expression <~ ")" ^^ { case exp => Min(exp) } |
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
index 7777d37290..5dbaaa3b0c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.expressions
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
+
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.errors.TreeNodeException
@@ -146,7 +148,6 @@ case class MaxFunction(expr: Expression, base: AggregateExpression) extends Aggr
override def eval(input: Row): Any = currentMax
}
-
case class Count(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
override def references = child.references
override def nullable = false
@@ -166,10 +167,47 @@ case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpressi
override def references = expressions.flatMap(_.references).toSet
override def nullable = false
override def dataType = IntegerType
- override def toString = s"COUNT(DISTINCT ${expressions.mkString(",")}})"
+ override def toString = s"COUNT(DISTINCT ${expressions.mkString(",")})"
override def newInstance() = new CountDistinctFunction(expressions, this)
}
+case class ApproxCountDistinctPartition(child: Expression, relativeSD: Double)
+ extends AggregateExpression with trees.UnaryNode[Expression] {
+ override def references = child.references
+ override def nullable = false
+ override def dataType = child.dataType
+ override def toString = s"APPROXIMATE COUNT(DISTINCT $child)"
+ override def newInstance() = new ApproxCountDistinctPartitionFunction(child, this, relativeSD)
+}
+
+case class ApproxCountDistinctMerge(child: Expression, relativeSD: Double)
+ extends AggregateExpression with trees.UnaryNode[Expression] {
+ override def references = child.references
+ override def nullable = false
+ override def dataType = IntegerType
+ override def toString = s"APPROXIMATE COUNT(DISTINCT $child)"
+ override def newInstance() = new ApproxCountDistinctMergeFunction(child, this, relativeSD)
+}
+
+case class ApproxCountDistinct(child: Expression, relativeSD: Double = 0.05)
+ extends PartialAggregate with trees.UnaryNode[Expression] {
+ override def references = child.references
+ override def nullable = false
+ override def dataType = IntegerType
+ override def toString = s"APPROXIMATE COUNT(DISTINCT $child)"
+
+ override def asPartial: SplitEvaluation = {
+ val partialCount =
+ Alias(ApproxCountDistinctPartition(child, relativeSD), "PartialApproxCountDistinct")()
+
+ SplitEvaluation(
+ ApproxCountDistinctMerge(partialCount.toAttribute, relativeSD),
+ partialCount :: Nil)
+ }
+
+ override def newInstance() = new CountDistinctFunction(child :: Nil, this)
+}
+
case class Average(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
override def references = child.references
override def nullable = false
@@ -269,6 +307,42 @@ case class CountFunction(expr: Expression, base: AggregateExpression) extends Ag
override def eval(input: Row): Any = count
}
+case class ApproxCountDistinctPartitionFunction(
+ expr: Expression,
+ base: AggregateExpression,
+ relativeSD: Double)
+ extends AggregateFunction {
+ def this() = this(null, null, 0) // Required for serialization.
+
+ private val hyperLogLog = new HyperLogLog(relativeSD)
+
+ override def update(input: Row): Unit = {
+ val evaluatedExpr = expr.eval(input)
+ if (evaluatedExpr != null) {
+ hyperLogLog.offer(evaluatedExpr)
+ }
+ }
+
+ override def eval(input: Row): Any = hyperLogLog
+}
+
+case class ApproxCountDistinctMergeFunction(
+ expr: Expression,
+ base: AggregateExpression,
+ relativeSD: Double)
+ extends AggregateFunction {
+ def this() = this(null, null, 0) // Required for serialization.
+
+ private val hyperLogLog = new HyperLogLog(relativeSD)
+
+ override def update(input: Row): Unit = {
+ val evaluatedExpr = expr.eval(input)
+ hyperLogLog.addAll(evaluatedExpr.asInstanceOf[HyperLogLog])
+ }
+
+ override def eval(input: Row): Any = hyperLogLog.cardinality()
+}
+
case class SumFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
def this() = this(null, null) // Required for serialization.