diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-01-13 22:43:28 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-01-13 22:43:28 -0800 |
commit | 962e9bcf94da6f5134983f2bf1e56c5cd84f2bf7 (patch) | |
tree | fa7174220efa51f56287d32bc82a379508ee4c17 /sql/hive | |
parent | e2ae7bd046f6d8d6a375c2e81e5a51d7d78ca984 (diff) | |
download | spark-962e9bcf94da6f5134983f2bf1e56c5cd84f2bf7.tar.gz spark-962e9bcf94da6f5134983f2bf1e56c5cd84f2bf7.tar.bz2 spark-962e9bcf94da6f5134983f2bf1e56c5cd84f2bf7.zip |
[SPARK-12756][SQL] use hash expression in Exchange
This PR makes bucketing and exchange share one common hash algorithm, so that we can guarantee the data distribution is the same between shuffle and bucketed data sources, which enables us to shuffle only one side when joining a bucketed table with a normal one.
This PR also fixes the tests that are broken by the new hash behaviour in shuffle.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #10703 from cloud-fan/use-hash-expr-in-shuffle.
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala | 11 |
1 file changed, 6 insertions, 5 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index b718b7cefb..3ea9826544 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -20,11 +20,11 @@ package org.apache.spark.sql.sources import java.io.File import org.apache.spark.sql.{AnalysisException, QueryTest} -import org.apache.spark.sql.catalyst.expressions.{Murmur3Hash, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.util.Utils class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { import testImplicits._ @@ -98,11 +98,12 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle val qe = readBack.select(bucketCols.map(col): _*).queryExecution val rows = qe.toRdd.map(_.copy()).collect() - val getHashCode = - UnsafeProjection.create(new Murmur3Hash(qe.analyzed.output) :: Nil, qe.analyzed.output) + val getHashCode = UnsafeProjection.create( + HashPartitioning(qe.analyzed.output, 8).partitionIdExpression :: Nil, + qe.analyzed.output) for (row <- rows) { - val actualBucketId = Utils.nonNegativeMod(getHashCode(row).getInt(0), 8) + val actualBucketId = getHashCode(row).getInt(0) assert(actualBucketId == bucketId) } } |