author: Mark Hamstra <markhamstra@gmail.com>	2016-11-29 15:01:12 -0800
committer: Yin Huai <yhuai@databricks.com>	2016-11-29 15:01:12 -0800
commit: f8878a4c6f7c4ebb16e4aef26ad0869ba12eb9fc (patch)
tree: fda5146965a7165fa709a9cb666392fcaf92c1c8 /sql/core/src/test
parent: d57a594b8b4cb1a6942dbd2fd30c3a97e0dd031e (diff)
download: spark-f8878a4c6f7c4ebb16e4aef26ad0869ba12eb9fc.tar.gz
          spark-f8878a4c6f7c4ebb16e4aef26ad0869ba12eb9fc.tar.bz2
          spark-f8878a4c6f7c4ebb16e4aef26ad0869ba12eb9fc.zip
[SPARK-18631][SQL] Changed ExchangeCoordinator re-partitioning to avoid more data skew
## What changes were proposed in this pull request?

The re-partitioning logic in ExchangeCoordinator is changed so that another pre-shuffle partition is not added to the current post-shuffle partition if doing so would push the size of that post-shuffle partition past the target partition size.

## How was this patch tested?

Existing tests updated to reflect the new expectations.

Author: Mark Hamstra <markhamstra@gmail.com>

Closes #16065 from markhamstra/SPARK-17064.
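For illustration, the following is a minimal Scala sketch of the adjusted merging rule, not the actual ExchangeCoordinator implementation: the function name and the 100-byte target in the usage comment are assumptions chosen to match the expectations in the test hunks below. A pre-shuffle partition is only folded into the current post-shuffle partition if the result stays at or below the target size; otherwise a new post-shuffle partition starts at that index.

```scala
// Sketch only: given the total bytes of each pre-shuffle partition
// (summed across all shuffles) and a target post-shuffle partition size,
// emit the start indices of the post-shuffle partitions. A pre-shuffle
// partition is appended to the current post-shuffle partition only if
// the combined size does not exceed the target.
def estimatePartitionStartIndices(
    bytesByPartitionId: Array[Long],
    targetPostShuffleInputSize: Long): Array[Int] = {
  val startIndices = scala.collection.mutable.ArrayBuffer(0)
  var currentSize = 0L
  bytesByPartitionId.zipWithIndex.foreach { case (bytes, i) =>
    if (i > 0 && currentSize + bytes > targetPostShuffleInputSize) {
      // Adding this pre-shuffle partition would exceed the target,
      // so start a new post-shuffle partition here instead.
      startIndices += i
      currentSize = 0L
    }
    currentSize += bytes
  }
  startIndices.toArray
}

// With a (hypothetical) target of 100 bytes, the first test case below now
// yields one post-shuffle partition per pre-shuffle partition:
// estimatePartitionStartIndices(Array(110L, 10L, 100L, 110L, 0L), 100L)
//   == Array(0, 1, 2, 3, 4)
```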
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--	sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala | 40
1 file changed, 20 insertions(+), 20 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
index 2803b62462..06bce9a240 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
@@ -85,7 +85,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
{
// There are a few large pre-shuffle partitions.
val bytesByPartitionId = Array[Long](110, 10, 100, 110, 0)
- val expectedPartitionStartIndices = Array[Int](0, 1, 3, 4)
+ val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
}
@@ -146,7 +146,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
// 2 post-shuffle partition are needed.
val bytesByPartitionId1 = Array[Long](0, 10, 0, 20, 0)
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
- val expectedPartitionStartIndices = Array[Int](0, 3)
+ val expectedPartitionStartIndices = Array[Int](0, 2, 4)
checkEstimation(
coordinator,
Array(bytesByPartitionId1, bytesByPartitionId2),
@@ -154,10 +154,10 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
}
{
- // 2 post-shuffle partition are needed.
+ // 4 post-shuffle partition are needed.
val bytesByPartitionId1 = Array[Long](0, 99, 0, 20, 0)
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
- val expectedPartitionStartIndices = Array[Int](0, 2)
+ val expectedPartitionStartIndices = Array[Int](0, 1, 2, 4)
checkEstimation(
coordinator,
Array(bytesByPartitionId1, bytesByPartitionId2),
@@ -168,7 +168,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
// 2 post-shuffle partition are needed.
val bytesByPartitionId1 = Array[Long](0, 100, 0, 30, 0)
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
- val expectedPartitionStartIndices = Array[Int](0, 2, 4)
+ val expectedPartitionStartIndices = Array[Int](0, 1, 2, 4)
checkEstimation(
coordinator,
Array(bytesByPartitionId1, bytesByPartitionId2),
@@ -179,7 +179,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
// There are a few large pre-shuffle partitions.
val bytesByPartitionId1 = Array[Long](0, 100, 40, 30, 0)
val bytesByPartitionId2 = Array[Long](30, 0, 60, 0, 110)
- val expectedPartitionStartIndices = Array[Int](0, 2, 3)
+ val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
checkEstimation(
coordinator,
Array(bytesByPartitionId1, bytesByPartitionId2),
@@ -228,7 +228,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
// The number of post-shuffle partitions is determined by the coordinator.
val bytesByPartitionId1 = Array[Long](10, 50, 20, 80, 20)
val bytesByPartitionId2 = Array[Long](40, 10, 0, 10, 30)
- val expectedPartitionStartIndices = Array[Int](0, 2, 4)
+ val expectedPartitionStartIndices = Array[Int](0, 1, 3, 4)
checkEstimation(
coordinator,
Array(bytesByPartitionId1, bytesByPartitionId2),
@@ -272,13 +272,13 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "-1")
}
- val spark = SparkSession.builder
+ val spark = SparkSession.builder()
.config(sparkConf)
.getOrCreate()
try f(spark) finally spark.stop()
}
- Seq(Some(3), None).foreach { minNumPostShufflePartitions =>
+ Seq(Some(5), None).foreach { minNumPostShufflePartitions =>
val testNameNote = minNumPostShufflePartitions match {
case Some(numPartitions) => "(minNumPostShufflePartitions: 3)"
case None => ""
@@ -290,7 +290,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
spark
.range(0, 1000, 1, numInputPartitions)
.selectExpr("id % 20 as key", "id as value")
- val agg = df.groupBy("key").count
+ val agg = df.groupBy("key").count()
// Check the answer first.
checkAnswer(
@@ -308,7 +308,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
exchanges.foreach {
case e: ShuffleExchange =>
assert(e.coordinator.isDefined)
- assert(e.outputPartitioning.numPartitions === 3)
+ assert(e.outputPartitioning.numPartitions === 5)
case o =>
}
@@ -316,7 +316,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
exchanges.foreach {
case e: ShuffleExchange =>
assert(e.coordinator.isDefined)
- assert(e.outputPartitioning.numPartitions === 2)
+ assert(e.outputPartitioning.numPartitions === 3)
case o =>
}
}
@@ -359,7 +359,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
exchanges.foreach {
case e: ShuffleExchange =>
assert(e.coordinator.isDefined)
- assert(e.outputPartitioning.numPartitions === 3)
+ assert(e.outputPartitioning.numPartitions === 5)
case o =>
}
@@ -383,14 +383,14 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
.range(0, 1000, 1, numInputPartitions)
.selectExpr("id % 500 as key1", "id as value1")
.groupBy("key1")
- .count
+ .count()
.toDF("key1", "cnt1")
val df2 =
spark
.range(0, 1000, 1, numInputPartitions)
.selectExpr("id % 500 as key2", "id as value2")
.groupBy("key2")
- .count
+ .count()
.toDF("key2", "cnt2")
val join = df1.join(df2, col("key1") === col("key2")).select(col("key1"), col("cnt2"))
@@ -415,13 +415,13 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
exchanges.foreach {
case e: ShuffleExchange =>
assert(e.coordinator.isDefined)
- assert(e.outputPartitioning.numPartitions === 3)
+ assert(e.outputPartitioning.numPartitions === 5)
case o =>
}
case None =>
assert(exchanges.forall(_.coordinator.isDefined))
- assert(exchanges.map(_.outputPartitioning.numPartitions).toSeq.toSet === Set(1, 2))
+ assert(exchanges.map(_.outputPartitioning.numPartitions).toSet === Set(2, 3))
}
}
@@ -435,7 +435,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
.range(0, 1000, 1, numInputPartitions)
.selectExpr("id % 500 as key1", "id as value1")
.groupBy("key1")
- .count
+ .count()
.toDF("key1", "cnt1")
val df2 =
spark
@@ -467,13 +467,13 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
exchanges.foreach {
case e: ShuffleExchange =>
assert(e.coordinator.isDefined)
- assert(e.outputPartitioning.numPartitions === 3)
+ assert(e.outputPartitioning.numPartitions === 5)
case o =>
}
case None =>
assert(exchanges.forall(_.coordinator.isDefined))
- assert(exchanges.map(_.outputPartitioning.numPartitions).toSeq.toSet === Set(2, 3))
+ assert(exchanges.map(_.outputPartitioning.numPartitions).toSet === Set(5, 3))
}
}