Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala  17
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala                 21
2 files changed, 36 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
index 1c6e29b3cd..94c2a249ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
 import scala.reflect.ClassTag
 
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
 import com.esotericsoftware.kryo.io.{Input, Output}
 import com.esotericsoftware.kryo.{Serializer, Kryo}
@@ -44,6 +45,8 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co
     kryo.register(classOf[scala.collection.Map[_,_]], new MapSerializer)
     kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
     kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
+    kryo.register(classOf[com.clearspring.analytics.stream.cardinality.HyperLogLog],
+      new HyperLogLogSerializer)
     kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
     kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer)
     kryo.setReferences(false)
@@ -81,6 +84,20 @@ private[sql] class BigDecimalSerializer extends Serializer[BigDecimal] {
   }
 }
 
+private[sql] class HyperLogLogSerializer extends Serializer[HyperLogLog] {
+  def write(kryo: Kryo, output: Output, hyperLogLog: HyperLogLog) {
+    val bytes = hyperLogLog.getBytes()
+    output.writeInt(bytes.length)
+    output.writeBytes(bytes)
+  }
+
+  def read(kryo: Kryo, input: Input, tpe: Class[HyperLogLog]): HyperLogLog = {
+    val length = input.readInt()
+    val bytes = input.readBytes(length)
+    HyperLogLog.Builder.build(bytes)
+  }
+}
+
 /**
  * Maps do not have a no arg constructor and so cannot be serialized by default. So, we serialize
  * them as `Array[(k,v)]`.
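
The new HyperLogLogSerializer simply length-prefixes the estimator's byte representation on write and rebuilds it with HyperLogLog.Builder.build on read, so the on-wire format stays stream-lib's own byte layout rather than Kryo's default field-by-field serialization. As a rough illustration of that round trip, here is a minimal standalone sketch; the object name, the 0.05 relative standard deviation, and the 4096 buffer size are made up for the example, the HyperLogLog(rsd) constructor is assumed from stream-lib, and the object sits in org.apache.spark.sql.execution only because HyperLogLogSerializer is private[sql]:

package org.apache.spark.sql.execution

import com.clearspring.analytics.stream.cardinality.HyperLogLog
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

object HyperLogLogKryoRoundTrip {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    // The same registration this patch adds to SparkSqlSerializer's Kryo instance.
    kryo.register(classOf[HyperLogLog], new HyperLogLogSerializer)

    // Build an estimator and offer some values; 0.05 is an illustrative error bound.
    val hll = new HyperLogLog(0.05)
    (1 to 1000).foreach(i => hll.offer(Int.box(i)))

    // Write: the serializer emits the length of hll.getBytes() followed by the bytes.
    val output = new Output(4096, -1)
    kryo.writeObject(output, hll)

    // Read: the serializer rebuilds the sketch with HyperLogLog.Builder.build(bytes).
    val input = new Input(output.toBytes)
    val restored = kryo.readObject(input, classOf[HyperLogLog])

    // The restored estimator reports the same approximate cardinality as the original.
    assert(restored.cardinality() == hll.cardinality())
  }
}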
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index e966d89c30..524549eb54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -96,8 +96,25 @@ class SQLQuerySuite extends QueryTest {
test("count") {
checkAnswer(
sql("SELECT COUNT(*) FROM testData2"),
- testData2.count()
- )
+ testData2.count())
+ }
+
+ test("count distinct") {
+ checkAnswer(
+ sql("SELECT COUNT(DISTINCT b) FROM testData2"),
+ 2)
+ }
+
+ test("approximate count distinct") {
+ checkAnswer(
+ sql("SELECT APPROXIMATE COUNT(DISTINCT a) FROM testData2"),
+ 3)
+ }
+
+ test("approximate count distinct with user provided standard deviation") {
+ checkAnswer(
+ sql("SELECT APPROXIMATE(0.04) COUNT(DISTINCT a) FROM testData2"),
+ 3)
}
// No support for primitive nulls yet.
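
The new tests exercise the SQL surface for this feature: COUNT(DISTINCT col) for the exact count, APPROXIMATE COUNT(DISTINCT col) for the HyperLogLog-based estimate with the default error, and APPROXIMATE(0.04) COUNT(DISTINCT col) to supply a relative standard deviation. A minimal driver-program sketch of the same queries follows; the table name, the Record case class, and the generated data are hypothetical (chosen to mirror testData2, where column a has 3 distinct values and b has 2), and it assumes the early SQLContext API of this era, namely the createSchemaRDD implicit and registerAsTable on SchemaRDD:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical schema mirroring testData2.
case class Record(a: Int, b: Int)

object ApproxCountDistinctExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("approx-count-distinct"))
    val sqlContext = new SQLContext(sc)
    import sqlContext._

    // Register some generated rows as a table (relies on the createSchemaRDD implicit).
    sc.parallelize((1 to 1000).map(i => Record(i % 3 + 1, i % 2 + 1))).registerAsTable("records")

    // Exact distinct count.
    sql("SELECT COUNT(DISTINCT a) FROM records").collect().foreach(println)

    // HyperLogLog-based estimate with the default relative standard deviation.
    sql("SELECT APPROXIMATE COUNT(DISTINCT a) FROM records").collect().foreach(println)

    // Estimate with a user-provided relative standard deviation of 4%.
    sql("SELECT APPROXIMATE(0.04) COUNT(DISTINCT a) FROM records").collect().foreach(println)

    sc.stop()
  }
}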