author    Wenchen Fan <wenchen@databricks.com>  2016-01-13 22:43:28 -0800
committer Reynold Xin <rxin@databricks.com>    2016-01-13 22:43:28 -0800
commit 962e9bcf94da6f5134983f2bf1e56c5cd84f2bf7 (patch)
tree   fa7174220efa51f56287d32bc82a379508ee4c17 /sql/hive
parent e2ae7bd046f6d8d6a375c2e81e5a51d7d78ca984 (diff)
[SPARK-12756][SQL] use hash expression in Exchange
This PR makes bucketing and exchange share one common hash algorithm, so that we can guarantee the data distribution is the same between shuffle and bucketed data sources. This enables us to shuffle only one side when joining a bucketed table with a normal one. This PR also fixes the tests that were broken by the new hash behaviour in shuffle.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #10703 from cloud-fan/use-hash-expr-in-shuffle.
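For context, the guarantee rests on both sides deriving the bucket/partition id from the same hash of the same columns, reduced to a non-negative modulo of the bucket count. A minimal sketch of the idea in plain Scala, not Spark's actual implementation (sharedHash is a hypothetical stand-in for the shared hash expression; Spark uses Murmur3):

object BucketIdSketch {
  // Hypothetical stand-in for the shared hash expression over the bucket columns.
  def sharedHash(bucketColumns: Seq[Any]): Int =
    bucketColumns.foldLeft(42)((seed, v) => 31 * seed + v.hashCode())

  // Non-negative modulo, so negative hashes still land in [0, numBuckets).
  def bucketId(bucketColumns: Seq[Any], numBuckets: Int): Int = {
    val raw = sharedHash(bucketColumns) % numBuckets
    if (raw < 0) raw + numBuckets else raw
  }

  def main(args: Array[String]): Unit = {
    // The bucketed writer and the shuffle compute the same id for the same row,
    // which is what lets a join skip shuffling the bucketed side.
    println(bucketId(Seq("alice", 1), 8))
  }
}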
Diffstat (limited to 'sql/hive')
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala | 11
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index b718b7cefb..3ea9826544 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -20,11 +20,11 @@ package org.apache.spark.sql.sources
import java.io.File
import org.apache.spark.sql.{AnalysisException, QueryTest}
-import org.apache.spark.sql.catalyst.expressions.{Murmur3Hash, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.util.Utils
class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
import testImplicits._
@@ -98,11 +98,12 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
val qe = readBack.select(bucketCols.map(col): _*).queryExecution
val rows = qe.toRdd.map(_.copy()).collect()
- val getHashCode =
- UnsafeProjection.create(new Murmur3Hash(qe.analyzed.output) :: Nil, qe.analyzed.output)
+ val getHashCode = UnsafeProjection.create(
+ HashPartitioning(qe.analyzed.output, 8).partitionIdExpression :: Nil,
+ qe.analyzed.output)
for (row <- rows) {
- val actualBucketId = Utils.nonNegativeMod(getHashCode(row).getInt(0), 8)
+ val actualBucketId = getHashCode(row).getInt(0)
assert(actualBucketId == bucketId)
}
}
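The updated test leans on this change: HashPartitioning.partitionIdExpression already yields a non-negative partition id, so the manual Utils.nonNegativeMod wrapper becomes redundant. A minimal sketch of the equivalence in plain Scala (nonNegativeMod mirrors the removed helper; the exact expression Spark builds internally is an assumption here):

// Non-negative modulo, as the removed Utils.nonNegativeMod computed it.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

// Old test: wrap a raw Murmur3 hash by hand; new test: the projection's
// partition-id expression already applies the modulo, so getInt(0) is the bucket id.
val rawHash = -3
assert(nonNegativeMod(rawHash, 8) == 5)  // plain % would have given -3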