aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authormaji2014 <maji3@asiainfo.com>2014-12-09 13:13:12 -0800
committerAndrew Or <andrew@databricks.com>2014-12-09 13:13:12 -0800
commitb31074466a83d3d1387fc1e4337dfab9e164fc04 (patch)
tree8c25eb24d1949e4cd5dfdc938230fd15bbdb5610 /core
parent61f1a7022767e64ab092aa91c0c5aa1b2fdbef7c (diff)
downloadspark-b31074466a83d3d1387fc1e4337dfab9e164fc04.tar.gz
spark-b31074466a83d3d1387fc1e4337dfab9e164fc04.tar.bz2
spark-b31074466a83d3d1387fc1e4337dfab9e164fc04.zip
[SPARK-4691][shuffle] Restructure a few lines in shuffle code
In HashShuffleReader.scala and HashShuffleWriter.scala, there is no need to check "dep.aggregator.isEmpty" again, as this is already covered by the "dep.aggregator.isDefined" check. In SortShuffleWriter.scala, is "dep.aggregator.isEmpty" better than "!dep.aggregator.isDefined"? Author: maji2014 <maji3@asiainfo.com> Closes #3553 from maji2014/spark-4691 and squashes the following commits: bf7b14d [maji2014] change a elegant way for SortShuffleWriter.scala 10d0cf0 [maji2014] change a elegant way d8f52dc [maji2014] code optimization for judgement
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala | 4
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala | 3
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 4
3 files changed, 4 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
index 5baf45db45..de72148ccc 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
@@ -45,9 +45,9 @@ private[spark] class HashShuffleReader[K, C](
} else {
new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
}
- } else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
- throw new IllegalStateException("Aggregator is empty for map-side combine")
} else {
+ require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
+
// Convert the Product2s to pairs since this is what downstream RDDs currently expect
iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 183a30373b..755f17d6aa 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -56,9 +56,8 @@ private[spark] class HashShuffleWriter[K, V](
} else {
records
}
- } else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
- throw new IllegalStateException("Aggregator is empty for map-side combine")
} else {
+ require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
records
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index d75f9d7311..27496c5a28 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -50,9 +50,7 @@ private[spark] class SortShuffleWriter[K, V, C](
/** Write a bunch of records to this task's output */
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
if (dep.mapSideCombine) {
- if (!dep.aggregator.isDefined) {
- throw new IllegalStateException("Aggregator is empty for map-side combine")
- }
+ require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
sorter = new ExternalSorter[K, V, C](
dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.insertAll(records)