diff options
author | maji2014 <maji3@asiainfo.com> | 2014-12-09 13:13:12 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2014-12-09 13:13:12 -0800 |
commit | b31074466a83d3d1387fc1e4337dfab9e164fc04 (patch) | |
tree | 8c25eb24d1949e4cd5dfdc938230fd15bbdb5610 | |
parent | 61f1a7022767e64ab092aa91c0c5aa1b2fdbef7c (diff) | |
download | spark-b31074466a83d3d1387fc1e4337dfab9e164fc04.tar.gz spark-b31074466a83d3d1387fc1e4337dfab9e164fc04.tar.bz2 spark-b31074466a83d3d1387fc1e4337dfab9e164fc04.zip |
[SPARK-4691][shuffle] Restructure a few lines in shuffle code
In HashShuffleReader.scala and HashShuffleWriter.scala, no need to judge "dep.aggregator.isEmpty" again as this is judged by "dep.aggregator.isDefined"
In SortShuffleWriter.scala, "dep.aggregator.isEmpty" is better than "!dep.aggregator.isDefined" ?
Author: maji2014 <maji3@asiainfo.com>
Closes #3553 from maji2014/spark-4691 and squashes the following commits:
bf7b14d [maji2014] change a elegant way for SortShuffleWriter.scala
10d0cf0 [maji2014] change a elegant way
d8f52dc [maji2014] code optimization for judgement
3 files changed, 4 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index 5baf45db45..de72148ccc 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -45,9 +45,9 @@ private[spark] class HashShuffleReader[K, C]( } else { new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context)) } - } else if (dep.aggregator.isEmpty && dep.mapSideCombine) { - throw new IllegalStateException("Aggregator is empty for map-side combine") } else { + require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") + // Convert the Product2s to pairs since this is what downstream RDDs currently expect iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2)) } diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala index 183a30373b..755f17d6aa 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala @@ -56,9 +56,8 @@ private[spark] class HashShuffleWriter[K, V]( } else { records } - } else if (dep.aggregator.isEmpty && dep.mapSideCombine) { - throw new IllegalStateException("Aggregator is empty for map-side combine") } else { + require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") records } diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index d75f9d7311..27496c5a28 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -50,9 +50,7 @@ private[spark] class SortShuffleWriter[K, V, C]( /** Write a bunch of records to this task's output */ override def write(records: Iterator[_ <: Product2[K, V]]): Unit = { if (dep.mapSideCombine) { - if (!dep.aggregator.isDefined) { - throw new IllegalStateException("Aggregator is empty for map-side combine") - } + require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") sorter = new ExternalSorter[K, V, C]( dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) sorter.insertAll(records) |