diff options
author | larvaboy <larvaboy@gmail.com> | 2014-05-13 21:26:08 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-05-13 21:26:08 -0700 |
commit | c33b8dcbf65a3a0c5ee5e65cd1dcdbc7da36aa5f (patch) | |
tree | 497a31ae116b285966699ef51ca975160b3845de /sql/core | |
parent | 92cebada09a7e5a00ab48bcb350a9462949c33eb (diff) | |
download | spark-c33b8dcbf65a3a0c5ee5e65cd1dcdbc7da36aa5f.tar.gz spark-c33b8dcbf65a3a0c5ee5e65cd1dcdbc7da36aa5f.tar.bz2 spark-c33b8dcbf65a3a0c5ee5e65cd1dcdbc7da36aa5f.zip |
Implement ApproximateCountDistinct for SparkSql
Add the implementation for ApproximateCountDistinct to SparkSql. We use the HyperLogLog algorithm implemented in stream-lib, and do the count in two phases: 1) counting the number of distinct elements in each partitions, and 2) merge the HyperLogLog results from different partitions.
A simple serializer and test cases are added as well.
Author: larvaboy <larvaboy@gmail.com>
Closes #737 from larvaboy/master and squashes the following commits:
bd8ef3f [larvaboy] Add support of user-provided standard deviation to ApproxCountDistinct.
9ba8360 [larvaboy] Fix alignment and null handling issues.
95b4067 [larvaboy] Add a test case for count distinct and approximate count distinct.
f57917d [larvaboy] Add the parser for the approximate count.
a2d5d10 [larvaboy] Add ApproximateCountDistinct aggregates and functions.
7ad273a [larvaboy] Add SparkSql serializer for HyperLogLog.
1d9aacf [larvaboy] Fix a minor typo in the toString method of the Count case class.
653542b [larvaboy] Fix a couple of minor typos.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala | 17 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 21 |
2 files changed, 36 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala index 1c6e29b3cd..94c2a249ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala @@ -21,6 +21,7 @@ import java.nio.ByteBuffer import scala.reflect.ClassTag +import com.clearspring.analytics.stream.cardinality.HyperLogLog import com.esotericsoftware.kryo.io.{Input, Output} import com.esotericsoftware.kryo.{Serializer, Kryo} @@ -44,6 +45,8 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co kryo.register(classOf[scala.collection.Map[_,_]], new MapSerializer) kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow]) kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow]) + kryo.register(classOf[com.clearspring.analytics.stream.cardinality.HyperLogLog], + new HyperLogLogSerializer) kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]]) kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer) kryo.setReferences(false) @@ -81,6 +84,20 @@ private[sql] class BigDecimalSerializer extends Serializer[BigDecimal] { } } +private[sql] class HyperLogLogSerializer extends Serializer[HyperLogLog] { + def write(kryo: Kryo, output: Output, hyperLogLog: HyperLogLog) { + val bytes = hyperLogLog.getBytes() + output.writeInt(bytes.length) + output.writeBytes(bytes) + } + + def read(kryo: Kryo, input: Input, tpe: Class[HyperLogLog]): HyperLogLog = { + val length = input.readInt() + val bytes = input.readBytes(length) + HyperLogLog.Builder.build(bytes) + } +} + /** * Maps do not have a no arg constructor and so cannot be serialized by default. So, we serialize * them as `Array[(k,v)]`. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e966d89c30..524549eb54 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -96,8 +96,25 @@ class SQLQuerySuite extends QueryTest { test("count") { checkAnswer( sql("SELECT COUNT(*) FROM testData2"), - testData2.count() - ) + testData2.count()) + } + + test("count distinct") { + checkAnswer( + sql("SELECT COUNT(DISTINCT b) FROM testData2"), + 2) + } + + test("approximate count distinct") { + checkAnswer( + sql("SELECT APPROXIMATE COUNT(DISTINCT a) FROM testData2"), + 3) + } + + test("approximate count distinct with user provided standard deviation") { + checkAnswer( + sql("SELECT APPROXIMATE(0.04) COUNT(DISTINCT a) FROM testData2"), + 3) } // No support for primitive nulls yet. |