author    Sandy Ryza <sandy@cloudera.com>  2014-07-20 01:24:32 -0700
committer Reynold Xin <rxin@apache.org>    2014-07-20 01:24:32 -0700
commit    98ab4112255d4e0fdb6e084bd3fe65807c5b209b
tree      4d9957c2ef533bfb1729c1fa8d19c5e8d3d0b0cf
parent    4da01e3813f0a0413fe691358c14278bbd5508ed
SPARK-2519 part 2. Remove pattern matching on Tuple2 in critical sections of CoGroupedRDD and PairRDDFunctions

This also removes an unnecessary tuple creation in cogroup.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #1447 from sryza/sandy-spark-2519-2 and squashes the following commits:

b6d9699 [Sandy Ryza] Remove missed Tuple2 match in CoGroupedRDD
a109828 [Sandy Ryza] Remove another pattern matching in MappedValuesRDD and revert some changes in PairRDDFunctions
be10f8a [Sandy Ryza] SPARK-2519 part 2. Remove pattern matching on Tuple2 in critical sections of CoGroupedRDD and PairRDDFunctions
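Editorial note: the rewrite is mechanical. In hot loops, `{ case (k, v) => ... }` becomes a plain function that reads `_1` and `_2` directly, skipping the per-element match (and, where the static type is Product2 rather than Tuple2, a runtime type test and an Option allocation from `Product2.unapply`). A minimal standalone sketch of the two styles; the names `pairs`, `before`, and `after` are invented for this illustration and do not appear in the commit:

    val pairs = Seq(("a", 1), ("b", 2))

    // Before: destructure each element with a pattern match.
    val before = pairs.map { case (k, v) => (k, v + 1) }

    // After: read the tuple fields directly, as this commit does
    // in the RDD hot paths.
    val after = pairs.map { pair => (pair._1, pair._2 + 1) }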
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala     |  4 ++--
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala  |  2 +-
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------------------------------------------------------------------
 3 files changed, 33 insertions(+), 33 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 5366c1a1cc..aca235a62a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -170,12 +170,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
val newCombiner = Array.fill(numRdds)(new CoGroup)
- value match { case (v, depNum) => newCombiner(depNum) += v }
+ newCombiner(value._2) += value._1
newCombiner
}
val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
(combiner, value) => {
- value match { case (v, depNum) => combiner(depNum) += v }
+ combiner(value._2) += value._1
combiner
}
val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
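Editorial note: a self-contained sketch of the two combiner functions above in their rewritten form. The type aliases stand in for the file's private definitions and are assumptions for the sketch, not the actual Spark source:

    object CoGroupSketch {
      import scala.collection.mutable.ArrayBuffer

      // Assumed stand-ins: a CoGroupValue pairs a value with the index
      // of the parent RDD it came from; a CoGroup buffers values; a
      // combiner holds one CoGroup per parent RDD.
      type CoGroup = ArrayBuffer[Any]
      type CoGroupValue = (Any, Int)
      type CoGroupCombiner = Array[CoGroup]

      def createCombiner(numRdds: Int)(value: CoGroupValue): CoGroupCombiner = {
        val newCombiner = Array.fill(numRdds)(new CoGroup)
        // Direct field access instead of
        // `value match { case (v, depNum) => ... }`.
        newCombiner(value._2) += value._1
        newCombiner
      }

      def mergeValue(combiner: CoGroupCombiner, value: CoGroupValue): CoGroupCombiner = {
        combiner(value._2) += value._1
        combiner
      }
    }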
diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
index 2bc47eb9fc..a60952eee5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
@@ -28,6 +28,6 @@ class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
override val partitioner = firstParent[Product2[K, U]].partitioner
override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
- firstParent[Product2[K, V]].iterator(split, context).map { case Product2(k ,v) => (k, f(v)) }
+ firstParent[Product2[K, V]].iterator(split, context).map { pair => (pair._1, f(pair._2)) }
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 29038b0359..a6b9204672 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -216,17 +216,17 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val reducePartition = (iter: Iterator[(K, V)]) => {
val map = new JHashMap[K, V]
- iter.foreach { case (k, v) =>
- val old = map.get(k)
- map.put(k, if (old == null) v else func(old, v))
+ iter.foreach { pair =>
+ val old = map.get(pair._1)
+ map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
}
Iterator(map)
} : Iterator[JHashMap[K, V]]
val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
- m2.foreach { case (k, v) =>
- val old = m1.get(k)
- m1.put(k, if (old == null) v else func(old, v))
+ m2.foreach { pair =>
+ val old = m1.get(pair._1)
+ m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
}
m1
} : JHashMap[K, V]
@@ -401,9 +401,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
- this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
- for (v <- vs; w <- ws) yield (v, w)
- }
+ this.cogroup(other, partitioner).flatMapValues( pair =>
+ for (v <- pair._1; w <- pair._2) yield (v, w)
+ )
}
/**
@@ -413,11 +413,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* partition the output RDD.
*/
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
- this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
- if (ws.isEmpty) {
- vs.map(v => (v, None))
+ this.cogroup(other, partitioner).flatMapValues { pair =>
+ if (pair._2.isEmpty) {
+ pair._1.map(v => (v, None))
} else {
- for (v <- vs; w <- ws) yield (v, Some(w))
+ for (v <- pair._1; w <- pair._2) yield (v, Some(w))
}
}
}
@@ -430,11 +430,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*/
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Option[V], W))] = {
- this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
- if (vs.isEmpty) {
- ws.map(w => (None, w))
+ this.cogroup(other, partitioner).flatMapValues { pair =>
+ if (pair._1.isEmpty) {
+ pair._2.map(w => (None, w))
} else {
- for (v <- vs; w <- ws) yield (Some(v), w)
+ for (v <- pair._1; w <- pair._2) yield (Some(v), w)
}
}
}
@@ -535,7 +535,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val data = self.collect()
val map = new mutable.HashMap[K, V]
map.sizeHint(data.length)
- data.foreach { case (k, v) => map.put(k, v) }
+ data.foreach { pair => map.put(pair._1, pair._2) }
map
}
@@ -572,10 +572,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
- (vs.asInstanceOf[Seq[V]],
- w1s.asInstanceOf[Seq[W1]],
- w2s.asInstanceOf[Seq[W2]],
- w3s.asInstanceOf[Seq[W3]])
+ (vs.asInstanceOf[Seq[V]],
+ w1s.asInstanceOf[Seq[W1]],
+ w2s.asInstanceOf[Seq[W2]],
+ w3s.asInstanceOf[Seq[W3]])
}
}
@@ -589,8 +589,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
- cg.mapValues { case Seq(vs, ws) =>
- (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
+ cg.mapValues { case Seq(vs, w1s) =>
+ (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]])
}
}
@@ -606,8 +606,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
cg.mapValues { case Seq(vs, w1s, w2s) =>
(vs.asInstanceOf[Seq[V]],
- w1s.asInstanceOf[Seq[W1]],
- w2s.asInstanceOf[Seq[W2]])
+ w1s.asInstanceOf[Seq[W1]],
+ w2s.asInstanceOf[Seq[W2]])
}
}
@@ -712,8 +712,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val index = p.getPartition(key)
val process = (it: Iterator[(K, V)]) => {
val buf = new ArrayBuffer[V]
- for ((k, v) <- it if k == key) {
- buf += v
+ for (pair <- it if pair._1 == key) {
+ buf += pair._2
}
buf
} : Seq[V]
@@ -858,8 +858,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
try {
while (iter.hasNext) {
- val (k, v) = iter.next()
- writer.write(k, v)
+ val pair = iter.next()
+ writer.write(pair._1, pair._2)
}
} finally {
writer.close(hadoopContext)
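Editorial note: the PairRDDFunctions hunks follow the same pattern as the reduceByKeyLocally change near the top of that file. A minimal standalone sketch of the per-partition combine loop in the new style; the Spark plumbing around it is elided, and the object and method names here are invented for the example:

    import java.util.{HashMap => JHashMap}

    object ReduceSketch {
      // Combine values per key without a `case (k, v)` match:
      // each element is read via _1/_2 directly.
      def reducePartition[K, V](iter: Iterator[(K, V)], func: (V, V) => V): JHashMap[K, V] = {
        val map = new JHashMap[K, V]
        iter.foreach { pair =>
          val old = map.get(pair._1)
          map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
        }
        map
      }

      def main(args: Array[String]): Unit = {
        val merged = reducePartition(Iterator(("a", 1), ("a", 2), ("b", 3)), (x: Int, y: Int) => x + y)
        println(merged) // a -> 3, b -> 3 (map iteration order not guaranteed)
      }
    }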