author     larvaboy <larvaboy@gmail.com>  2014-05-13 21:26:08 -0700
committer  Reynold Xin <rxin@apache.org>  2014-05-13 21:26:08 -0700
commit     c33b8dcbf65a3a0c5ee5e65cd1dcdbc7da36aa5f (patch)
tree       497a31ae116b285966699ef51ca975160b3845de /sql/core
parent     92cebada09a7e5a00ab48bcb350a9462949c33eb (diff)
Implement ApproximateCountDistinct for SparkSql
Add an implementation of ApproximateCountDistinct to Spark SQL. We use the
HyperLogLog algorithm implemented in stream-lib and do the count in two
phases: 1) count the number of distinct elements in each partition, and
2) merge the per-partition HyperLogLog results. A simple serializer and
test cases are added as well.

Author: larvaboy <larvaboy@gmail.com>

Closes #737 from larvaboy/master and squashes the following commits:

bd8ef3f [larvaboy] Add support of user-provided standard deviation to ApproxCountDistinct.
9ba8360 [larvaboy] Fix alignment and null handling issues.
95b4067 [larvaboy] Add a test case for count distinct and approximate count distinct.
f57917d [larvaboy] Add the parser for the approximate count.
a2d5d10 [larvaboy] Add ApproximateCountDistinct aggregates and functions.
7ad273a [larvaboy] Add SparkSql serializer for HyperLogLog.
1d9aacf [larvaboy] Fix a minor typo in the toString method of the Count case class.
653542b [larvaboy] Fix a couple of minor typos.
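For intuition, the following is a minimal sketch of that two-phase scheme
written directly against stream-lib; it is not the Catalyst aggregate code
from this patch, and plain Scala collections stand in for RDD partitions:

    import com.clearspring.analytics.stream.cardinality.HyperLogLog

    // Phase 1: build one HyperLogLog sketch per partition.
    val partitions = Seq(Seq("a", "b", "b"), Seq("b", "c", "c"))
    val relativeSD = 0.05  // accuracy knob; smaller values mean bigger sketches
    val partials = partitions.map { rows =>
      val hll = new HyperLogLog(relativeSD)
      rows.foreach(v => hll.offer(v))  // offer() hashes and records each element
      hll
    }

    // Phase 2: merge the per-partition sketches, then read off the estimate.
    val merged = partials.reduce { (a, b) =>
      a.addAll(b)  // fold b's registers into a
      a
    }
    println(merged.cardinality())  // ~3 distinct values ("a", "b", "c")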
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala | 17
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 21
2 files changed, 36 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
index 1c6e29b3cd..94c2a249ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
 
 import scala.reflect.ClassTag
 
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
 import com.esotericsoftware.kryo.io.{Input, Output}
 import com.esotericsoftware.kryo.{Serializer, Kryo}
@@ -44,6 +45,8 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co
     kryo.register(classOf[scala.collection.Map[_,_]], new MapSerializer)
     kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
     kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
+    kryo.register(classOf[com.clearspring.analytics.stream.cardinality.HyperLogLog],
+      new HyperLogLogSerializer)
     kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
     kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer)
     kryo.setReferences(false)
@@ -81,6 +84,20 @@ private[sql] class BigDecimalSerializer extends Serializer[BigDecimal] {
   }
 }
 
+private[sql] class HyperLogLogSerializer extends Serializer[HyperLogLog] {
+  def write(kryo: Kryo, output: Output, hyperLogLog: HyperLogLog) {
+    val bytes = hyperLogLog.getBytes()
+    output.writeInt(bytes.length)
+    output.writeBytes(bytes)
+  }
+
+  def read(kryo: Kryo, input: Input, tpe: Class[HyperLogLog]): HyperLogLog = {
+    val length = input.readInt()
+    val bytes = input.readBytes(length)
+    HyperLogLog.Builder.build(bytes)
+  }
+}
+
 /**
  * Maps do not have a no arg constructor and so cannot be serialized by default. So, we serialize
  * them as `Array[(k,v)]`.
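A quick sanity check for the serializer above is a Kryo round trip. This
snippet is a sketch rather than part of the patch; since HyperLogLogSerializer
is private[sql] it would have to live in the same package, and the 500 sample
values are arbitrary:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

    import com.clearspring.analytics.stream.cardinality.HyperLogLog
    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.{Input, Output}

    val kryo = new Kryo()
    kryo.register(classOf[HyperLogLog], new HyperLogLogSerializer)

    // Populate a sketch with a few hundred distinct values.
    val hll = new HyperLogLog(0.05)
    (1 to 500).foreach(i => hll.offer(Int.box(i)))

    // Write the sketch out through Kryo...
    val bos = new ByteArrayOutputStream()
    val output = new Output(bos)
    kryo.writeObject(output, hll)
    output.close()

    // ...and read it back: the estimate should survive the round trip.
    val input = new Input(new ByteArrayInputStream(bos.toByteArray))
    val restored = kryo.readObject(input, classOf[HyperLogLog])
    assert(restored.cardinality() == hll.cardinality())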
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index e966d89c30..524549eb54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -96,8 +96,25 @@ class SQLQuerySuite extends QueryTest {
   test("count") {
     checkAnswer(
       sql("SELECT COUNT(*) FROM testData2"),
-      testData2.count()
-    )
+      testData2.count())
+  }
+
+  test("count distinct") {
+    checkAnswer(
+      sql("SELECT COUNT(DISTINCT b) FROM testData2"),
+      2)
+  }
+
+  test("approximate count distinct") {
+    checkAnswer(
+      sql("SELECT APPROXIMATE COUNT(DISTINCT a) FROM testData2"),
+      3)
+  }
+
+  test("approximate count distinct with user provided standard deviation") {
+    checkAnswer(
+      sql("SELECT APPROXIMATE(0.04) COUNT(DISTINCT a) FROM testData2"),
+      3)
   }
 
   // No support for primitive nulls yet.
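The last two tests exercise the new parser forms: APPROXIMATE COUNT(DISTINCT a)
uses the default relative standard deviation of ApproxCountDistinct, while
APPROXIMATE(0.04) COUNT(DISTINCT a) passes 0.04 through to the underlying
HyperLogLog. A tighter deviation buys accuracy at the cost of a larger sketch;
the standalone stream-lib loop below illustrates the trade-off (the classic
HyperLogLog error bound is roughly 1.04/sqrt(m) for m registers, though the
exact constant stream-lib uses to size a sketch from a given deviation may
differ):

    import com.clearspring.analytics.stream.cardinality.HyperLogLog

    // Smaller target deviation => more registers => bigger serialized sketch.
    for (sd <- Seq(0.10, 0.05, 0.01)) {
      val hll = new HyperLogLog(sd)
      (1 to 100000).foreach(i => hll.offer(Int.box(i)))
      println(f"sd=$sd%.2f estimate=${hll.cardinality()}%d bytes=${hll.getBytes().length}%d")
    }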