author      Cheng Lian <lian@databricks.com>          2016-01-26 20:12:34 -0800
committer   Reynold Xin <rxin@databricks.com>         2016-01-26 20:12:34 -0800
commit      ce38a35b764397fcf561ac81de6da96579f5c13e (patch)
tree        0f03dfb31f4840488fabc75d5b4edbdc7eb0d874 /sql/core/src/test
parent      e7f9199e709c46a6b5ad6b03c9ecf12cc19e3a41 (diff)
[SPARK-12935][SQL] DataFrame API for Count-Min Sketch
This PR integrates Count-Min Sketch from spark-sketch into DataFrame. This version resorts to `RDD.aggregate` for building the sketch. A more performant UDAF version can be built in follow-up PRs.

Author: Cheng Lian <lian@databricks.com>

Closes #10911 from liancheng/cms-df-api.
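As context, here is a conceptual sketch of the `RDD.aggregate`-based construction the commit message describes. It is not the code added by this PR, only an illustration of the pattern; the helper name `buildCountMinSketch` and the use of `df.select(...).rdd` are assumptions, while `CountMinSketch.create`, `add`, and `mergeInPlace` come from the public spark-sketch API.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.util.sketch.CountMinSketch

// Conceptual outline only: fold each partition's rows into a local sketch,
// then merge the per-partition sketches into one.
def buildCountMinSketch(
    df: DataFrame,
    colName: String,
    eps: Double,
    confidence: Double,
    seed: Int): CountMinSketch = {
  val zero = CountMinSketch.create(eps, confidence, seed)
  df.select(colName).rdd.aggregate(zero)(
    (sketch, row) => { sketch.add(row.get(0)); sketch }, // add one value to the partition-local sketch
    (s1, s2) => s1.mergeInPlace(s2)                      // combine partial sketches
  )
}
```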
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--  sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java  | 28
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala     | 36
2 files changed, 63 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index ac1607ba35..9cf94e72d3 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -35,9 +35,10 @@ import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
-import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.test.TestSQLContext;
import org.apache.spark.sql.types.*;
+import org.apache.spark.util.sketch.CountMinSketch;
+import static org.apache.spark.sql.functions.*;
import static org.apache.spark.sql.types.DataTypes.*;
public class JavaDataFrameSuite {
@@ -321,4 +322,29 @@ public class JavaDataFrameSuite {
Thread.currentThread().getContextClassLoader().getResource("text-suite2.txt").toString());
Assert.assertEquals(5L, df2.count());
}
+
+ @Test
+ public void testCountMinSketch() {
+ DataFrame df = context.range(1000);
+
+ CountMinSketch sketch1 = df.stat().countMinSketch("id", 10, 20, 42);
+ Assert.assertEquals(sketch1.totalCount(), 1000);
+ Assert.assertEquals(sketch1.depth(), 10);
+ Assert.assertEquals(sketch1.width(), 20);
+
+ CountMinSketch sketch2 = df.stat().countMinSketch(col("id"), 10, 20, 42);
+ Assert.assertEquals(sketch2.totalCount(), 1000);
+ Assert.assertEquals(sketch2.depth(), 10);
+ Assert.assertEquals(sketch2.width(), 20);
+
+ CountMinSketch sketch3 = df.stat().countMinSketch("id", 0.001, 0.99, 42);
+ Assert.assertEquals(sketch3.totalCount(), 1000);
+ Assert.assertEquals(sketch3.relativeError(), 0.001, 1e-4);
+ Assert.assertEquals(sketch3.confidence(), 0.99, 5e-3);
+
+ CountMinSketch sketch4 = df.stat().countMinSketch(col("id"), 0.001, 0.99, 42);
+ Assert.assertEquals(sketch4.totalCount(), 1000);
+ Assert.assertEquals(sketch4.relativeError(), 0.001, 1e-4);
+ Assert.assertEquals(sketch4.confidence(), 0.99, 5e-3);
+ }
}
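The Java assertions above only check sketch metadata (total count, depth, width, relative error, confidence). For readers unfamiliar with the sketch type, a minimal Scala snippet showing how such a sketch would typically be queried afterwards; `estimateCount` is the spark-sketch query method, and the `sqlContext`, column, and value used here are illustrative, mirroring the tests in this diff.

```scala
import org.apache.spark.util.sketch.CountMinSketch

val df = sqlContext.range(1000)
val sketch: CountMinSketch = df.stat.countMinSketch("id", depth = 10, width = 20, seed = 42)

// Approximate frequency of a single value; each id occurs once in this data,
// so the estimate is >= 1 (a Count-Min Sketch never under-counts).
val estimate: Long = sketch.estimateCount(42L)
```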
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index 63ad6c439a..8f3ea5a286 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -19,8 +19,11 @@ package org.apache.spark.sql
import java.util.Random
+import org.scalatest.Matchers._
+
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.DoubleType
class DataFrameStatSuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -210,4 +213,37 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
sampled.groupBy("key").count().orderBy("key"),
Seq(Row(0, 6), Row(1, 11)))
}
+
+ // This test case only verifies that `DataFrame.countMinSketch()` methods do return
+ // `CountMinSketch`es that meet required specs. Test cases for `CountMinSketch` can be found in
+ // `CountMinSketchSuite` in project spark-sketch.
+ test("countMinSketch") {
+ val df = sqlContext.range(1000)
+
+ val sketch1 = df.stat.countMinSketch("id", depth = 10, width = 20, seed = 42)
+ assert(sketch1.totalCount() === 1000)
+ assert(sketch1.depth() === 10)
+ assert(sketch1.width() === 20)
+
+ val sketch2 = df.stat.countMinSketch($"id", depth = 10, width = 20, seed = 42)
+ assert(sketch2.totalCount() === 1000)
+ assert(sketch2.depth() === 10)
+ assert(sketch2.width() === 20)
+
+ val sketch3 = df.stat.countMinSketch("id", eps = 0.001, confidence = 0.99, seed = 42)
+ assert(sketch3.totalCount() === 1000)
+ assert(sketch3.relativeError() === 0.001)
+ assert(sketch3.confidence() === 0.99 +- 5e-3)
+
+ val sketch4 = df.stat.countMinSketch($"id", eps = 0.001, confidence = 0.99, seed = 42)
+ assert(sketch4.totalCount() === 1000)
+ assert(sketch4.relativeError() === 0.001 +- 1e-4)
+ assert(sketch4.confidence() === 0.99 +- 5e-3)
+
+ intercept[IllegalArgumentException] {
+ df.select('id cast DoubleType as 'id)
+ .stat
+ .countMinSketch('id, depth = 10, width = 20, seed = 42)
+ }
+ }
}
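A note on the tolerances used in the (eps, confidence) assertions: Count-Min implementations round the requested error and confidence to integer width and depth, which is why the confidence check allows a 5e-3 slack. The arithmetic below assumes the sizing convention used by common implementations such as stream-lib (width = ceil(2 / eps), depth = ceil(-ln(1 - confidence) / ln 2)); it is an illustration, not necessarily spark-sketch's exact internals.

```scala
// Illustration only; assumes the usual Count-Min sizing convention.
val eps = 0.001
val confidence = 0.99

val width = math.ceil(2 / eps).toInt                                  // 2000
val depth = math.ceil(-math.log(1 - confidence) / math.log(2)).toInt  // 7

val achievedError      = 2.0 / width               // 0.001   (matches eps exactly)
val achievedConfidence = 1.0 - math.pow(2, -depth) // ~0.9922 (within 5e-3 of 0.99)
```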