author      Sandy Ryza <sandy@cloudera.com>    2014-07-16 11:07:16 -0700
committer   Reynold Xin <rxin@apache.org>      2014-07-16 11:07:16 -0700
commit      fc7edc9e76f97b25e456ae7b72ef8636656f4f1a (patch)
tree        c5798dcb2ddc412c147c177f6e6398bae4046a4b /core/src/main/scala
parent      1c5739f68510c2336bf6cb3e18aea03d85988bfb (diff)
SPARK-2519. Eliminate pattern-matching on Tuple2 in performance-critical aggregation code

Author: Sandy Ryza <sandy@cloudera.com>

Closes #1435 from sryza/sandy-spark-2519 and squashes the following commits:

640706a [Sandy Ryza] SPARK-2519. Eliminate pattern-matching on Tuple2 in performance-critical aggregation code
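For context: destructuring an element with "val (k, v) = iter.next()" desugars to a pattern match against Tuple2 on every iteration, while binding the pair once and reading _1 and _2 is two plain accessor calls. A minimal sketch of the two styles, runnable on its own (the names pairs, sumValuesDestructured, and sumValuesDirect are hypothetical, for illustration only):

// Style removed by this commit: destructure inside the hot loop.
def sumValuesDestructured(pairs: Iterator[(String, Int)]): Int = {
  var total = 0
  while (pairs.hasNext) {
    val (_, v) = pairs.next() // compiles to a Tuple2 pattern match per element
    total += v
  }
  total
}

// Style introduced by this commit: bind once, read fields directly.
def sumValuesDirect(pairs: Iterator[(String, Int)]): Int = {
  var total = 0
  while (pairs.hasNext) {
    val pair = pairs.next()
    total += pair._2 // plain accessor, no match
  }
  total
}

Both return the same result, e.g. sumValuesDirect(Iterator("a" -> 1, "b" -> 2)) == 3; only the per-element work in the loop body differs.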
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala                            |  8 ++++----
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 12 +++++++-----
2 files changed, 11 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 59fdf659c9..1d640579ef 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -56,8 +56,8 @@ case class Aggregator[K, V, C] (
} else {
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
- val (k, v) = iter.next()
- combiners.insert(k, v)
+ val pair = iter.next()
+ combiners.insert(pair._1, pair._2)
}
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
@@ -85,8 +85,8 @@ case class Aggregator[K, V, C] (
} else {
val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
while (iter.hasNext) {
- val (k, c) = iter.next()
- combiners.insert(k, c)
+ val pair = iter.next()
+ combiners.insert(pair._1, pair._2)
}
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
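Why the change helps in these two hunks: "val (k, v) = iter.next()" is shorthand that scalac expands into roughly the following (a sketch of the desugaring under Scala-2.10-era compilers, not literal compiler output):

// Approximately what the compiler emits for: val (k, v) = iter.next()
val x$1: (K, V) = iter.next() match {
  case Tuple2(k, v) => (k, v) // matches, then may re-wrap the components
}
val k: K = x$1._1
val v: V = x$1._2

The rewritten loops keep the iter.next() result in a single local and call _1 and _2 directly, so the per-record match (and any re-wrapping) disappears from code that runs once for every value being aggregated.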
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 292d0962f4..765254bf4c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -268,10 +268,10 @@ class ExternalAppendOnlyMap[K, V, C](
private def mergeIfKeyExists(key: K, baseCombiner: C, buffer: StreamBuffer): C = {
var i = 0
while (i < buffer.pairs.length) {
- val (k, c) = buffer.pairs(i)
- if (k == key) {
+ val pair = buffer.pairs(i)
+ if (pair._1 == key) {
buffer.pairs.remove(i)
- return mergeCombiners(baseCombiner, c)
+ return mergeCombiners(baseCombiner, pair._2)
}
i += 1
}
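A standalone analogue of the rewritten loop, runnable outside Spark (mergeBuffered and its signature are hypothetical; the real method works on ExternalAppendOnlyMap's private StreamBuffer):

import scala.collection.mutable.ArrayBuffer

// Scan buffered (key, combiner) pairs; if the key is present, remove that
// entry and fold its combiner into the base, reading tuple fields directly.
def mergeBuffered[K, C](key: K, base: C, pairs: ArrayBuffer[(K, C)],
                        mergeCombiners: (C, C) => C): C = {
  var i = 0
  while (i < pairs.length) {
    val pair = pairs(i)            // bind once instead of destructuring
    if (pair._1 == key) {
      pairs.remove(i)
      return mergeCombiners(base, pair._2)
    }
    i += 1
  }
  base
}

For example, mergeBuffered("a", 1, ArrayBuffer("a" -> 2, "b" -> 3), (x: Int, y: Int) => x + y) returns 3 and leaves only "b" -> 3 in the buffer.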
@@ -293,9 +293,11 @@ class ExternalAppendOnlyMap[K, V, C](
}
// Select a key from the StreamBuffer that holds the lowest key hash
val minBuffer = mergeHeap.dequeue()
- val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
+ val minPairs = minBuffer.pairs
+ val minHash = minBuffer.minKeyHash
val minPair = minPairs.remove(0)
- var (minKey, minCombiner) = minPair
+ val minKey = minPair._1
+ var minCombiner = minPair._2
assert(getKeyHashCode(minPair) == minHash)
// For all other streams that may have this key (i.e. have the same minimum key hash),
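This last hunk removes a subtler cost: the old "val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)" first constructs a fresh Tuple2 from two fields the code already holds, then immediately pattern-matches it apart, paying an allocation per call that carries no information; "var (minKey, minCombiner) = minPair" likewise destructures a pair it could read directly. A minimal sketch of the fixed shape (DemoBuffer is a hypothetical stand-in for the real StreamBuffer):

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for StreamBuffer, for illustration only.
case class DemoBuffer(pairs: ArrayBuffer[(String, Int)], minKeyHash: Int)

def headOfMin(buf: DemoBuffer): (String, Int) = {
  // Removed: val (minPairs, minHash) = (buf.pairs, buf.minKeyHash)
  val minPairs = buf.pairs        // direct field reads: no intermediate Tuple2
  val minHash  = buf.minKeyHash
  val minPair  = minPairs.remove(0)
  assert(minPair._1.hashCode == minHash) // mirrors the getKeyHashCode assertion
  minPair
}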