From 73178c75565e20f53e6ee1478f3d976732c64438 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@databricks.com>
Date: Fri, 27 May 2016 21:20:02 -0700
Subject: [SPARK-15633][MINOR] Make package name for Java tests consistent

## What changes were proposed in this pull request?
This is a simple patch that makes package names for Java 8 test suites consistent.
I moved everything to test.org.apache.spark so we can test package private APIs
properly. Also added "java8" as the package name so we can easily run all the
tests related to Java 8.

## How was this patch tested?
This is a test only change.

Author: Reynold Xin <rxin@databricks.com>

Closes #13364 from rxin/SPARK-15633.
---
 .../spark/sql/JavaDatasetAggregatorSuite.java      | 134 +++++++++++++++++++++
 .../spark/sql/JavaDatasetAggregatorSuiteBase.java  |  75 ++++++++++++
 .../org/apache/spark/sql/JavaSaveLoadSuite.java    | 106 ++++++++++++++++
 .../sql/sources/JavaDatasetAggregatorSuite.java    | 134 ---------------------
 .../sources/JavaDatasetAggregatorSuiteBase.java    |  75 ------------
 .../spark/sql/sources/JavaSaveLoadSuite.java       | 106 ----------------
 6 files changed, 315 insertions(+), 315 deletions(-)
 create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
 create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuiteBase.java
 create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java
 delete mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java
 delete mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuiteBase.java
 delete mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java

(limited to 'sql/core/src/test/java')

diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
new file mode 100644
index 0000000000..fe86371516
--- /dev/null
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql;
+
+import java.util.Arrays;
+
+import scala.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.KeyValueGroupedDataset;
+import org.apache.spark.sql.expressions.Aggregator;
+import org.apache.spark.sql.expressions.javalang.typed;
+
+/**
+ * Suite for testing the aggregate functionality of Datasets in Java.
+ */
+public class JavaDatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase {
+  @Test
+  public void testTypedAggregationAnonClass() {
+    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
+
+    Dataset<Tuple2<String, Integer>> agged = grouped.agg(new IntSumOf().toColumn());
+    Assert.assertEquals(Arrays.asList(tuple2("a", 3), tuple2("b", 3)), agged.collectAsList());
+
+    Dataset<Tuple2<String, Integer>> agged2 = grouped.agg(new IntSumOf().toColumn())
+      .as(Encoders.tuple(Encoders.STRING(), Encoders.INT()));
+    Assert.assertEquals(
+      Arrays.asList(
+        new Tuple2<>("a", 3),
+        new Tuple2<>("b", 3)),
+      agged2.collectAsList());
+  }
+
+  static class IntSumOf extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {
+    @Override
+    public Integer zero() {
+      return 0;
+    }
+
+    @Override
+    public Integer reduce(Integer l, Tuple2<String, Integer> t) {
+      return l + t._2();
+    }
+
+    @Override
+    public Integer merge(Integer b1, Integer b2) {
+      return b1 + b2;
+    }
+
+    @Override
+    public Integer finish(Integer reduction) {
+      return reduction;
+    }
+
+    @Override
+    public Encoder<Integer> bufferEncoder() {
+      return Encoders.INT();
+    }
+
+    @Override
+    public Encoder<Integer> outputEncoder() {
+      return Encoders.INT();
+    }
+  }
+
+  @Test
+  public void testTypedAggregationAverage() {
+    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
+    Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.avg(
+      new MapFunction<Tuple2<String, Integer>, Double>() {
+        public Double call(Tuple2<String, Integer> value) throws Exception {
+          return (double)(value._2() * 2);
+        }
+      }));
+    Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 6.0)), agged.collectAsList());
+  }
+
+  @Test
+  public void testTypedAggregationCount() {
+    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
+    Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.count(
+      new MapFunction<Tuple2<String, Integer>, Object>() {
+        public Object call(Tuple2<String, Integer> value) throws Exception {
+          return value;
+        }
+      }));
+    Assert.assertEquals(Arrays.asList(tuple2("a", 2), tuple2("b", 1)), agged.collectAsList());
+  }
+
+  @Test
+  public void testTypedAggregationSumDouble() {
+    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
+    Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.sum(
+      new MapFunction<Tuple2<String, Integer>, Double>() {
+        public Double call(Tuple2<String, Integer> value) throws Exception {
+          return (double)value._2();
+        }
+      }));
+    Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 3.0)), agged.collectAsList());
+  }
+
+  @Test
+  public void testTypedAggregationSumLong() {
+    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
+    Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.sumLong(
+      new MapFunction<Tuple2<String, Integer>, Long>() {
+        public Long call(Tuple2<String, Integer> value) throws Exception {
+          return (long)value._2();
+        }
+      }));
+    Assert.assertEquals(Arrays.asList(tuple2("a", 3), tuple2("b", 3)), agged.collectAsList());
+  }
+}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuiteBase.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuiteBase.java
new file mode 100644
index 0000000000..8fc4eff55d
--- /dev/null
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuiteBase.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import scala.Tuple2;
+
+import org.junit.After;
+import org.junit.Before;
+
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.KeyValueGroupedDataset;
+import org.apache.spark.sql.test.TestSparkSession;
+
+/**
+ * Common test base shared across this and Java8DatasetAggregatorSuite.
+ */
+public class JavaDatasetAggregatorSuiteBase implements Serializable {
+  private transient TestSparkSession spark;
+
+  @Before
+  public void setUp() {
+    // Trigger static initializer of TestData
+    spark = new TestSparkSession();
+    spark.loadTestData();
+  }
+
+  @After
+  public void tearDown() {
+    spark.stop();
+    spark = null;
+  }
+
+  protected <T1, T2> Tuple2<T1, T2> tuple2(T1 t1, T2 t2) {
+    return new Tuple2<>(t1, t2);
+  }
+
+  protected KeyValueGroupedDataset<String, Tuple2<String, Integer>> generateGroupedDataset() {
+    Encoder<Tuple2<String, Integer>> encoder = Encoders.tuple(Encoders.STRING(), Encoders.INT());
+    List<Tuple2<String, Integer>> data =
+      Arrays.asList(tuple2("a", 1), tuple2("a", 2), tuple2("b", 3));
+    Dataset<Tuple2<String, Integer>> ds = spark.createDataset(data, encoder);
+
+    return ds.groupByKey(
+      new MapFunction<Tuple2<String, Integer>, String>() {
+        @Override
+        public String call(Tuple2<String, Integer> value) throws Exception {
+          return value._1();
+        }
+      },
+      Encoders.STRING());
+  }
+}
+
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java
new file mode 100644
index 0000000000..6941c86dfc
--- /dev/null
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.Utils;
+
+public class JavaSaveLoadSuite {
+
+  private transient SparkSession spark;
+  private transient JavaSparkContext jsc;
+
+  File path;
+  Dataset<Row> df;
+
+  private static void checkAnswer(Dataset<Row> actual, List<Row> expected) {
+    String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected);
+    if (errorMessage != null) {
+      Assert.fail(errorMessage);
+    }
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    spark = SparkSession.builder()
+      .master("local[*]")
+      .appName("testing")
+      .getOrCreate();
+    jsc = new JavaSparkContext(spark.sparkContext());
+
+    path =
+      Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile();
+    if (path.exists()) {
+      path.delete();
+    }
+
+    List<String> jsonObjects = new ArrayList<>(10);
+    for (int i = 0; i < 10; i++) {
+      jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
+    }
+    JavaRDD<String> rdd = jsc.parallelize(jsonObjects);
+    df = spark.read().json(rdd);
+    df.createOrReplaceTempView("jsonTable");
+  }
+
+  @After
+  public void tearDown() {
+    spark.stop();
+    spark = null;
+  }
+
+  @Test
+  public void saveAndLoad() {
+    Map<String, String> options = new HashMap<>();
+    options.put("path", path.toString());
+    df.write().mode(SaveMode.ErrorIfExists).format("json").options(options).save();
+    Dataset<Row> loadedDF = spark.read().format("json").options(options).load();
+    checkAnswer(loadedDF, df.collectAsList());
+  }
+
+  @Test
+  public void saveAndLoadWithSchema() {
+    Map<String, String> options = new HashMap<>();
+    options.put("path", path.toString());
+    df.write().format("json").mode(SaveMode.ErrorIfExists).options(options).save();
+
+    List<StructField> fields = new ArrayList<>();
+    fields.add(DataTypes.createStructField("b", DataTypes.StringType, true));
+    StructType schema = DataTypes.createStructType(fields);
+    Dataset<Row> loadedDF = spark.read().format("json").schema(schema).options(options).load();
+
+    checkAnswer(loadedDF, spark.sql("SELECT b FROM jsonTable").collectAsList());
+  }
+}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java
deleted file mode 100644
index f9842e130b..0000000000
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package test.org.apache.spark.sql.sources;
-
-import java.util.Arrays;
-
-import scala.Tuple2;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.KeyValueGroupedDataset;
-import org.apache.spark.sql.expressions.Aggregator;
-import org.apache.spark.sql.expressions.javalang.typed;
-
-/**
- * Suite for testing the aggregate functionality of Datasets in Java.
- */
-public class JavaDatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase {
-  @Test
-  public void testTypedAggregationAnonClass() {
-    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-
-    Dataset<Tuple2<String, Integer>> agged = grouped.agg(new IntSumOf().toColumn());
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3), tuple2("b", 3)), agged.collectAsList());
-
-    Dataset<Tuple2<String, Integer>> agged2 = grouped.agg(new IntSumOf().toColumn())
-      .as(Encoders.tuple(Encoders.STRING(), Encoders.INT()));
-    Assert.assertEquals(
-      Arrays.asList(
-        new Tuple2<>("a", 3),
-        new Tuple2<>("b", 3)),
-      agged2.collectAsList());
-  }
-
-  static class IntSumOf extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {
-    @Override
-    public Integer zero() {
-      return 0;
-    }
-
-    @Override
-    public Integer reduce(Integer l, Tuple2<String, Integer> t) {
-      return l + t._2();
-    }
-
-    @Override
-    public Integer merge(Integer b1, Integer b2) {
-      return b1 + b2;
-    }
-
-    @Override
-    public Integer finish(Integer reduction) {
-      return reduction;
-    }
-
-    @Override
-    public Encoder<Integer> bufferEncoder() {
-      return Encoders.INT();
-    }
-
-    @Override
-    public Encoder<Integer> outputEncoder() {
-      return Encoders.INT();
-    }
-  }
-
-  @Test
-  public void testTypedAggregationAverage() {
-    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.avg(
-      new MapFunction<Tuple2<String, Integer>, Double>() {
-        public Double call(Tuple2<String, Integer> value) throws Exception {
-          return (double)(value._2() * 2);
-        }
-      }));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 6.0)), agged.collectAsList());
-  }
-
-  @Test
-  public void testTypedAggregationCount() {
-    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.count(
-      new MapFunction<Tuple2<String, Integer>, Object>() {
-        public Object call(Tuple2<String, Integer> value) throws Exception {
-          return value;
-        }
-      }));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 2), tuple2("b", 1)), agged.collectAsList());
-  }
-
-  @Test
-  public void testTypedAggregationSumDouble() {
-    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.sum(
-      new MapFunction<Tuple2<String, Integer>, Double>() {
-        public Double call(Tuple2<String, Integer> value) throws Exception {
-          return (double)value._2();
-        }
-      }));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 3.0)), agged.collectAsList());
-  }
-
-  @Test
-  public void testTypedAggregationSumLong() {
-    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.sumLong(
-      new MapFunction<Tuple2<String, Integer>, Long>() {
-        public Long call(Tuple2<String, Integer> value) throws Exception {
-          return (long)value._2();
-        }
-      }));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3), tuple2("b", 3)), agged.collectAsList());
-  }
-}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuiteBase.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuiteBase.java
deleted file mode 100644
index 059c2d9f2c..0000000000
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuiteBase.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package test.org.apache.spark.sql.sources;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import scala.Tuple2;
-
-import org.junit.After;
-import org.junit.Before;
-
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.KeyValueGroupedDataset;
-import org.apache.spark.sql.test.TestSparkSession;
-
-/**
- * Common test base shared across this and Java8DatasetAggregatorSuite.
- */
-public class JavaDatasetAggregatorSuiteBase implements Serializable {
-  private transient TestSparkSession spark;
-
-  @Before
-  public void setUp() {
-    // Trigger static initializer of TestData
-    spark = new TestSparkSession();
-    spark.loadTestData();
-  }
-
-  @After
-  public void tearDown() {
-    spark.stop();
-    spark = null;
-  }
-
-  protected <T1, T2> Tuple2<T1, T2> tuple2(T1 t1, T2 t2) {
-    return new Tuple2<>(t1, t2);
-  }
-
-  protected KeyValueGroupedDataset<String, Tuple2<String, Integer>> generateGroupedDataset() {
-    Encoder<Tuple2<String, Integer>> encoder = Encoders.tuple(Encoders.STRING(), Encoders.INT());
-    List<Tuple2<String, Integer>> data =
-      Arrays.asList(tuple2("a", 1), tuple2("a", 2), tuple2("b", 3));
-    Dataset<Tuple2<String, Integer>> ds = spark.createDataset(data, encoder);
-
-    return ds.groupByKey(
-      new MapFunction<Tuple2<String, Integer>, String>() {
-        @Override
-        public String call(Tuple2<String, Integer> value) throws Exception {
-          return value._1();
-        }
-      },
-      Encoders.STRING());
-  }
-}
-
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
deleted file mode 100644
index 9840bc46f9..0000000000
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package test.org.apache.spark.sql.sources;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.*;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.util.Utils;
-
-public class JavaSaveLoadSuite {
-
-  private transient SparkSession spark;
-  private transient JavaSparkContext jsc;
-
-  File path;
-  Dataset<Row> df;
-
-  private static void checkAnswer(Dataset<Row> actual, List<Row> expected) {
-    String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected);
-    if (errorMessage != null) {
-      Assert.fail(errorMessage);
-    }
-  }
-
-  @Before
-  public void setUp() throws IOException {
-    spark = SparkSession.builder()
-      .master("local[*]")
-      .appName("testing")
-      .getOrCreate();
-    jsc = new JavaSparkContext(spark.sparkContext());
-
-    path =
-      Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile();
-    if (path.exists()) {
-      path.delete();
-    }
-
-    List<String> jsonObjects = new ArrayList<>(10);
-    for (int i = 0; i < 10; i++) {
-      jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
-    }
-    JavaRDD<String> rdd = jsc.parallelize(jsonObjects);
-    df = spark.read().json(rdd);
-    df.createOrReplaceTempView("jsonTable");
-  }
-
-  @After
-  public void tearDown() {
-    spark.stop();
-    spark = null;
-  }
-
-  @Test
-  public void saveAndLoad() {
-    Map<String, String> options = new HashMap<>();
-    options.put("path", path.toString());
-    df.write().mode(SaveMode.ErrorIfExists).format("json").options(options).save();
-    Dataset<Row> loadedDF = spark.read().format("json").options(options).load();
-    checkAnswer(loadedDF, df.collectAsList());
-  }
-
-  @Test
-  public void saveAndLoadWithSchema() {
-    Map<String, String> options = new HashMap<>();
-    options.put("path", path.toString());
-    df.write().format("json").mode(SaveMode.ErrorIfExists).options(options).save();
-
-    List<StructField> fields = new ArrayList<>();
-    fields.add(DataTypes.createStructField("b", DataTypes.StringType, true));
-    StructType schema = DataTypes.createStructType(fields);
-    Dataset<Row> loadedDF = spark.read().format("json").schema(schema).options(options).load();
-
-    checkAnswer(loadedDF, spark.sql("SELECT b FROM jsonTable").collectAsList());
-  }
-}
-- 
cgit v1.2.3
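For context, the typed aggregation API exercised by the moved suites is used the same way from application code. Below is a minimal, self-contained sketch, not part of the patch: the class name, session settings, and variable names are illustrative, while the API calls mirror generateGroupedDataset() and testTypedAggregationSumDouble() above.

import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.KeyValueGroupedDataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.javalang.typed;

// Illustrative example (hypothetical class name), not part of SPARK-15633.
public class TypedAggExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
      .master("local[*]")
      .appName("typed-agg-example")
      .getOrCreate();

    // Same shape of data as generateGroupedDataset() in the suite above.
    Encoder<Tuple2<String, Integer>> encoder = Encoders.tuple(Encoders.STRING(), Encoders.INT());
    Dataset<Tuple2<String, Integer>> ds = spark.createDataset(
      Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3)), encoder);

    // Group by the String key of each pair.
    KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = ds.groupByKey(
      new MapFunction<Tuple2<String, Integer>, String>() {
        @Override
        public String call(Tuple2<String, Integer> value) {
          return value._1();
        }
      },
      Encoders.STRING());

    // Apply a typed sum over the Integer value of each pair.
    Dataset<Tuple2<String, Double>> sums = grouped.agg(typed.sum(
      new MapFunction<Tuple2<String, Integer>, Double>() {
        @Override
        public Double call(Tuple2<String, Integer> value) {
          return (double) value._2();
        }
      }));

    sums.show();  // expected rows: (a, 3.0) and (b, 3.0), as asserted in the suite
    spark.stop();
  }
}

The anonymous MapFunction classes match the style of the suites in this patch, which deliberately target the pre-Java 8 API surface; the parallel Java 8 suites mentioned in the commit message cover the equivalent lambda forms.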