author      Reynold Xin <rxin@apache.org>    2014-07-17 16:33:30 -0700
committer   Reynold Xin <rxin@apache.org>    2014-07-17 16:33:30 -0700
commit      26c428acb7049d683a9879d8380ef4ebf03923b9 (patch)
tree        c951e435e4d95d4d0d227cf99a407f56d7191b13
parent      3bb5d2f8a08285989ea91c039adc7978fc2efae0 (diff)
[SPARK-2534] Avoid pulling in the entire RDD in various operators (branch-1.0 backport)
This backports #1450 into branch-1.0.

Author: Reynold Xin <rxin@apache.org>

Closes #1469 from rxin/closure-1.0 and squashes the following commits:

b474a92 [Reynold Xin] [SPARK-2534] Avoid pulling in the entire RDD in various operators
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala   45
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                12
2 files changed, 28 insertions, 29 deletions
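
Why swapping nested `def`s for `val` function literals helps: a nested `def` is compiled as a method on the enclosing class, so a closure that calls it (or an eta-expansion like `process _`) ends up holding a reference to `this` -- here the PairRDDFunctions/RDD wrapper -- and serializing that closure for a task drags the entire RDD along. A `val` function literal that touches only local values carries no such reference. The sketch below is not Spark code; `Wrapper` and `payload` are hypothetical stand-ins for the RDD wrapper and its data, and exact capture behavior depends on the Scala compiler version (branch-1.0 builds against Scala 2.10, where the eta-expanded instance method always retains `this`).

// Minimal sketch, assuming a serializable wrapper around some large payload.
class Wrapper(val payload: Array[Byte]) extends Serializable {

  private def inc(x: Int): Int = x + 1

  // Eta-expanding an instance method yields a function object that must hold
  // a reference to `this`, so serializing it also serializes `payload`.
  def viaDef: Int => Int = inc _

  // A function literal that references nothing from the enclosing instance
  // closes over nothing, so it serializes as a tiny, self-contained object.
  def viaVal: Int => Int = (x: Int) => x + 1
}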
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 0d3793de38..8ad93443b5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -132,7 +132,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// When deserializing, use a lazy val to create just one instance of the serializer per task
lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
- def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
+ val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
}
@@ -175,22 +175,22 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("reduceByKeyLocally() does not support array keys")
}
- def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
+ val reducePartition = (iter: Iterator[(K, V)]) => {
val map = new JHashMap[K, V]
iter.foreach { case (k, v) =>
val old = map.get(k)
map.put(k, if (old == null) v else func(old, v))
}
Iterator(map)
- }
+ } : Iterator[JHashMap[K, V]]
- def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
+ val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
m2.foreach { case (k, v) =>
val old = m1.get(k)
m1.put(k, if (old == null) v else func(old, v))
}
m1
- }
+ } : JHashMap[K, V]
self.mapPartitions(reducePartition).reduce(mergeMaps)
}
@@ -273,11 +273,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
- def createCombiner(v: V) = ArrayBuffer(v)
- def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
- def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
+ val createCombiner = (v: V) => ArrayBuffer(v)
+ val mergeValue = (buf: ArrayBuffer[V], v: V) => buf += v
+ val mergeCombiners = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => c1 ++ c2
+
val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
+ createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
bufs.mapValues(_.toIterable)
}
@@ -571,14 +572,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
self.partitioner match {
case Some(p) =>
val index = p.getPartition(key)
- def process(it: Iterator[(K, V)]): Seq[V] = {
+ val process = (it: Iterator[(K, V)]) => {
val buf = new ArrayBuffer[V]
for ((k, v) <- it if k == key) {
buf += v
}
buf
- }
- val res = self.context.runJob(self, process _, Array(index), false)
+ } : Seq[V]
+ val res = self.context.runJob(self, process, Array(index), false)
res(0)
case None =>
self.filter(_._1 == key).map(_._2).collect()
@@ -695,7 +696,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
jobFormat.checkOutputSpecs(job)
}
- def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
+ val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -716,19 +717,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val (k, v) = iter.next()
writer.write(k, v)
}
- }
- finally {
+ } finally {
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
- return 1
- }
+ 1
+ } : Int
val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
jobCommitter.setupJob(jobTaskContext)
- self.context.runJob(self, writeShard _)
+ self.context.runJob(self, writeShard)
jobCommitter.commitJob(jobTaskContext)
}
@@ -766,7 +766,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val writer = new SparkHadoopWriter(conf)
writer.preSetup()
- def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
+ val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -775,19 +775,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.open()
try {
var count = 0
- while(iter.hasNext) {
+ while (iter.hasNext) {
val record = iter.next()
count += 1
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
}
- }
- finally {
+ } finally {
writer.close()
}
writer.commit()
}
- self.context.runJob(self, writeToFile _)
+ self.context.runJob(self, writeToFile)
writer.commitJob()
}
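
A note on the recurring `val f = (args) => { ... } : ResultType` shape above: the type ascription after the closing brace annotates the result of the function body, so the explicit return type of the old `def` is preserved even though the definition is now a function literal. A small stand-alone illustration (the names are hypothetical, not taken from this patch):

// Result type ascribed after the body, mirroring the patch's style.
val reducePartitionLike = (xs: Iterator[Int]) => {
  var sum = 0
  xs.foreach(sum += _)   // fold the partition locally
  Iterator(sum)
} : Iterator[Int]

// Equivalent form with the full function type on the val instead.
val reducePartitionLike2: Iterator[Int] => Iterator[Int] = xs => {
  var sum = 0
  xs.foreach(sum += _)
  Iterator(sum)
}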
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e036c53012..da2dc58647 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -329,7 +329,7 @@ abstract class RDD[T: ClassTag](
: RDD[T] = {
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
- def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
+ val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
@@ -337,7 +337,7 @@ abstract class RDD[T: ClassTag](
position = position + 1
(position, t)
}
- }
+ } : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
@@ -886,19 +886,19 @@ abstract class RDD[T: ClassTag](
throw new SparkException("countByValue() does not support arrays")
}
// TODO: This should perhaps be distributed by default.
- def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = {
+ val countPartition = (iter: Iterator[T]) => {
val map = new OpenHashMap[T,Long]
iter.foreach {
t => map.changeValue(t, 1L, _ + 1L)
}
Iterator(map)
- }
- def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): OpenHashMap[T,Long] = {
+ } : Iterator[OpenHashMap[T,Long]]
+ val mergeMaps = (m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]) => {
m2.foreach { case (key, value) =>
m1.changeValue(key, value, _ + value)
}
m1
- }
+ } : OpenHashMap[T,Long]
val myResult = mapPartitions(countPartition).reduce(mergeMaps)
// Convert to a Scala mutable map
val mutableResult = scala.collection.mutable.Map[T,Long]()
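
One way to confirm that a rewritten closure no longer pulls in the wrapper is to measure its serialized size, since Spark ships task closures through the closure serializer (plain Java serialization by default, as the `SparkEnv.get.closureSerializer` line above suggests). The check below reuses the hypothetical `Wrapper` from the earlier sketch together with ordinary JDK serialization; the printed numbers are illustrative, not measured, and will vary by Scala version.

object ClosureSizeCheck {
  // Serialize an object with JDK serialization and report the byte count.
  private def serializedSize(obj: AnyRef): Int = {
    val bytes = new java.io.ByteArrayOutputStream()
    val out = new java.io.ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val w = new Wrapper(new Array[Byte](1 << 20)) // ~1 MB standing in for an RDD
    // The def-backed function carries `w` (and its payload); the val-backed
    // one should serialize to a few hundred bytes at most.
    println(s"def-based closure: ${serializedSize(w.viaDef)} bytes")
    println(s"val-based closure: ${serializedSize(w.viaVal)} bytes")
  }
}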