aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/java
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-04-05 21:22:20 -0500
committerReynold Xin <rxin@databricks.com>2016-04-05 21:22:20 -0500
commit7d29c72f64f8637d8182fb7c495f87ab7ce86ea0 (patch)
tree23ad6102159b3cbd3c7e41dcb1a916831a00bdda /sql/core/src/test/java
parent1146c534d6c3806f3e920043ba06838ef02cd7e8 (diff)
downloadspark-7d29c72f64f8637d8182fb7c495f87ab7ce86ea0.tar.gz
spark-7d29c72f64f8637d8182fb7c495f87ab7ce86ea0.tar.bz2
spark-7d29c72f64f8637d8182fb7c495f87ab7ce86ea0.zip
[SPARK-14359] Unit tests for java 8 lambda syntax with typed aggregates
## What changes were proposed in this pull request? Adds unit tests for java 8 lambda syntax with typed aggregates as a follow-up to #12168 ## How was this patch tested? Unit tests. Author: Eric Liang <ekl@databricks.com> Closes #12181 from ericl/sc-2794-2.
Diffstat (limited to 'sql/core/src/test/java')
-rw-r--r--sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java86
1 files changed, 45 insertions, 41 deletions
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java
index c8d0eecd5c..594f4675bd 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java
@@ -41,46 +41,7 @@ import org.apache.spark.sql.test.TestSQLContext;
/**
* Suite for testing the aggregate functionality of Datasets in Java.
*/
-public class JavaDatasetAggregatorSuite implements Serializable {
- private transient JavaSparkContext jsc;
- private transient TestSQLContext context;
-
- @Before
- public void setUp() {
- // Trigger static initializer of TestData
- SparkContext sc = new SparkContext("local[*]", "testing");
- jsc = new JavaSparkContext(sc);
- context = new TestSQLContext(sc);
- context.loadTestData();
- }
-
- @After
- public void tearDown() {
- context.sparkContext().stop();
- context = null;
- jsc = null;
- }
-
- private <T1, T2> Tuple2<T1, T2> tuple2(T1 t1, T2 t2) {
- return new Tuple2<>(t1, t2);
- }
-
- private KeyValueGroupedDataset<String, Tuple2<String, Integer>> generateGroupedDataset() {
- Encoder<Tuple2<String, Integer>> encoder = Encoders.tuple(Encoders.STRING(), Encoders.INT());
- List<Tuple2<String, Integer>> data =
- Arrays.asList(tuple2("a", 1), tuple2("a", 2), tuple2("b", 3));
- Dataset<Tuple2<String, Integer>> ds = context.createDataset(data, encoder);
-
- return ds.groupByKey(
- new MapFunction<Tuple2<String, Integer>, String>() {
- @Override
- public String call(Tuple2<String, Integer> value) throws Exception {
- return value._1();
- }
- },
- Encoders.STRING());
- }
-
+public class JavaDatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase {
@Test
public void testTypedAggregationAnonClass() {
KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
@@ -100,7 +61,6 @@ public class JavaDatasetAggregatorSuite implements Serializable {
}
static class IntSumOf extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {
-
@Override
public Integer zero() {
return 0;
@@ -170,3 +130,47 @@ public class JavaDatasetAggregatorSuite implements Serializable {
Assert.assertEquals(Arrays.asList(tuple2("a", 3), tuple2("b", 3)), agged.collectAsList());
}
}
+
+/**
+ * Common test base shared across this and Java8DatasetAggregatorSuite.
+ */
+class JavaDatasetAggregatorSuiteBase implements Serializable {
+ protected transient JavaSparkContext jsc;
+ protected transient TestSQLContext context;
+
+ @Before
+ public void setUp() {
+ // Trigger static initializer of TestData
+ SparkContext sc = new SparkContext("local[*]", "testing");
+ jsc = new JavaSparkContext(sc);
+ context = new TestSQLContext(sc);
+ context.loadTestData();
+ }
+
+ @After
+ public void tearDown() {
+ context.sparkContext().stop();
+ context = null;
+ jsc = null;
+ }
+
+ protected <T1, T2> Tuple2<T1, T2> tuple2(T1 t1, T2 t2) {
+ return new Tuple2<>(t1, t2);
+ }
+
+ protected KeyValueGroupedDataset<String, Tuple2<String, Integer>> generateGroupedDataset() {
+ Encoder<Tuple2<String, Integer>> encoder = Encoders.tuple(Encoders.STRING(), Encoders.INT());
+ List<Tuple2<String, Integer>> data =
+ Arrays.asList(tuple2("a", 1), tuple2("a", 2), tuple2("b", 3));
+ Dataset<Tuple2<String, Integer>> ds = context.createDataset(data, encoder);
+
+ return ds.groupByKey(
+ new MapFunction<Tuple2<String, Integer>, String>() {
+ @Override
+ public String call(Tuple2<String, Integer> value) throws Exception {
+ return value._1();
+ }
+ },
+ Encoders.STRING());
+ }
+}