diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-01-13 22:43:28 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-01-13 22:43:28 -0800 |
commit | 962e9bcf94da6f5134983f2bf1e56c5cd84f2bf7 (patch) | |
tree | fa7174220efa51f56287d32bc82a379508ee4c17 /sql/core/src/test/java | |
parent | e2ae7bd046f6d8d6a375c2e81e5a51d7d78ca984 (diff) | |
download | spark-962e9bcf94da6f5134983f2bf1e56c5cd84f2bf7.tar.gz spark-962e9bcf94da6f5134983f2bf1e56c5cd84f2bf7.tar.bz2 spark-962e9bcf94da6f5134983f2bf1e56c5cd84f2bf7.zip |
[SPARK-12756][SQL] use hash expression in Exchange
This PR makes bucketing and exchange share one common hash algorithm, so that we can guarantee the data distribution is the same between a shuffle and a bucketed data source, which enables us to shuffle only one side when joining a bucketed table with a normal one.
This PR also fixes the tests that are broken by the new hash behaviour in shuffle.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #10703 from cloud-fan/use-hash-expr-in-shuffle.
Diffstat (limited to 'sql/core/src/test/java')
-rw-r--r-- | sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java | 4 | ||||
-rw-r--r-- | sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java | 33 |
2 files changed, 21 insertions, 16 deletions
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java index 8e0b2dbca4..ac1607ba35 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java @@ -237,8 +237,8 @@ public class JavaDataFrameSuite { DataFrame crosstab = df.stat().crosstab("a", "b"); String[] columnNames = crosstab.schema().fieldNames(); Assert.assertEquals("a_b", columnNames[0]); - Assert.assertEquals("1", columnNames[1]); - Assert.assertEquals("2", columnNames[2]); + Assert.assertEquals("2", columnNames[1]); + Assert.assertEquals("1", columnNames[2]); Row[] rows = crosstab.collect(); Arrays.sort(rows, crosstabRowComparator); Integer count = 1; diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java index 9f8db39e33..1a3df1b117 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java @@ -187,7 +187,7 @@ public class JavaDatasetSuite implements Serializable { } }, Encoders.STRING()); - Assert.assertEquals(Arrays.asList("1a", "3foobar"), mapped.collectAsList()); + Assert.assertEquals(asSet("1a", "3foobar"), toSet(mapped.collectAsList())); Dataset<String> flatMapped = grouped.flatMapGroups( new FlatMapGroupsFunction<Integer, String, String>() { @@ -202,7 +202,7 @@ public class JavaDatasetSuite implements Serializable { }, Encoders.STRING()); - Assert.assertEquals(Arrays.asList("1a", "3foobar"), flatMapped.collectAsList()); + Assert.assertEquals(asSet("1a", "3foobar"), toSet(flatMapped.collectAsList())); Dataset<Tuple2<Integer, String>> reduced = grouped.reduce(new ReduceFunction<String>() { @Override @@ -212,8 +212,8 @@ public class JavaDatasetSuite implements Serializable { }); 
Assert.assertEquals( - Arrays.asList(tuple2(1, "a"), tuple2(3, "foobar")), - reduced.collectAsList()); + asSet(tuple2(1, "a"), tuple2(3, "foobar")), + toSet(reduced.collectAsList())); List<Integer> data2 = Arrays.asList(2, 6, 10); Dataset<Integer> ds2 = context.createDataset(data2, Encoders.INT()); @@ -245,7 +245,7 @@ public class JavaDatasetSuite implements Serializable { }, Encoders.STRING()); - Assert.assertEquals(Arrays.asList("1a#2", "3foobar#6", "5#10"), cogrouped.collectAsList()); + Assert.assertEquals(asSet("1a#2", "3foobar#6", "5#10"), toSet(cogrouped.collectAsList())); } @Test @@ -268,7 +268,7 @@ public class JavaDatasetSuite implements Serializable { }, Encoders.STRING()); - Assert.assertEquals(Arrays.asList("1a", "3foobar"), mapped.collectAsList()); + Assert.assertEquals(asSet("1a", "3foobar"), toSet(mapped.collectAsList())); } @Test @@ -290,9 +290,7 @@ public class JavaDatasetSuite implements Serializable { List<String> data = Arrays.asList("abc", "abc", "xyz"); Dataset<String> ds = context.createDataset(data, Encoders.STRING()); - Assert.assertEquals( - Arrays.asList("abc", "xyz"), - sort(ds.distinct().collectAsList().toArray(new String[0]))); + Assert.assertEquals(asSet("abc", "xyz"), toSet(ds.distinct().collectAsList())); List<String> data2 = Arrays.asList("xyz", "foo", "foo"); Dataset<String> ds2 = context.createDataset(data2, Encoders.STRING()); @@ -302,16 +300,23 @@ public class JavaDatasetSuite implements Serializable { Dataset<String> unioned = ds.union(ds2); Assert.assertEquals( - Arrays.asList("abc", "abc", "foo", "foo", "xyz", "xyz"), - sort(unioned.collectAsList().toArray(new String[0]))); + Arrays.asList("abc", "abc", "xyz", "xyz", "foo", "foo"), + unioned.collectAsList()); Dataset<String> subtracted = ds.subtract(ds2); Assert.assertEquals(Arrays.asList("abc", "abc"), subtracted.collectAsList()); } - private <T extends Comparable<T>> List<T> sort(T[] data) { - Arrays.sort(data); - return Arrays.asList(data); + private <T> Set<T> 
toSet(List<T> records) { + Set<T> set = new HashSet<T>(); + for (T record : records) { + set.add(record); + } + return set; + } + + private <T> Set<T> asSet(T... records) { + return toSet(Arrays.asList(records)); } @Test |