author     Reynold Xin <rxin@apache.org>    2014-07-17 10:54:53 -0700
committer  Reynold Xin <rxin@apache.org>    2014-07-17 10:54:53 -0700
commit     d988d345d5bec0668324386f3e81787f78e75e67 (patch)
tree       47ad23ea5c1805950d32608322b9b1f0a485bc5e /core
parent     9c73822a08848a0cde545282d3eb1c3f1a4c2a82 (diff)
[SPARK-2534] Avoid pulling in the entire RDD in various operators
This should go into both master and branch-1.0.
Author: Reynold Xin <rxin@apache.org>
Closes #1450 from rxin/agg-closure and squashes the following commits:
e40f363 [Reynold Xin] Mima check excludes.
9186364 [Reynold Xin] Define the return type more explicitly.
38e348b [Reynold Xin] Fixed the cases in RDD.scala.
ea6b34d [Reynold Xin] Blah
89b9c43 [Reynold Xin] Fix other instances of accidentally pulling in extra stuff in closures.
73b2783 [Reynold Xin] [SPARK-2534] Avoid pulling in the entire RDD in groupByKey.
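
The change itself is mechanical: nested defs that were handed to combineByKey or runJob via eta-expansion (e.g. createCombiner _) become val function literals. A nested def compiles to a method on the enclosing class, so the eta-expanded function object keeps a reference to the enclosing PairRDDFunctions or RDD instance and serializes it, together with everything it holds, into every task closure; a val literal that touches no instance members captures nothing from `this`. Below is a minimal, self-contained sketch of that difference. It is not Spark code: Enclosing, bigState and ClosureCaptureDemo are made-up names, plain Java serialization stands in for Spark's closure serializer, and the exact capture behaviour of the def variant depends on the Scala version (2.10 at the time of this commit).

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object ClosureCaptureDemo {
  // Java-serialize an object and return the byte count, as a stand-in for the
  // closure serializer Spark uses when shipping tasks.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  // Stand-in for PairRDDFunctions/RDD: an instance dragging around large state.
  class Enclosing(val bigState: Array[Byte]) extends Serializable {
    // Eta-expanding a nested def yields a function object that (on Scala 2.10,
    // and depending on the compiler in later versions) retains a reference to
    // the enclosing instance, so bigState rides along when it is serialized.
    def defBased: Int = {
      def addOne(x: Int): Int = x + 1
      serializedSize(addOne _)
    }

    // A val function literal that uses no instance members captures nothing
    // from `this`, so its serialized form stays small.
    def valBased: Int = {
      val addOne = (x: Int) => x + 1
      serializedSize(addOne)
    }
  }

  def main(args: Array[String]): Unit = {
    val e = new Enclosing(new Array[Byte](1 << 20)) // ~1 MB of driver-side state
    println(s"eta-expanded def serializes to ${e.defBased} bytes")
    println(s"val function literal serializes to ${e.valBased} bytes")
  }
}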
Diffstat (limited to 'core')
 core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 46
 core/src/main/scala/org/apache/spark/rdd/RDD.scala              | 12
 2 files changed, 28 insertions(+), 30 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 9d62d53fcb..29038b0359 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -125,7 +125,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     zeroBuffer.get(zeroArray)
     lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
-    def createZero() = cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
+    val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
     combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner)
   }
@@ -171,7 +171,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     // When deserializing, use a lazy val to create just one instance of the serializer per task
     lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
-    def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
+    val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
     combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
   }
@@ -214,22 +214,22 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("reduceByKeyLocally() does not support array keys")
     }
-    def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
+    val reducePartition = (iter: Iterator[(K, V)]) => {
       val map = new JHashMap[K, V]
       iter.foreach { case (k, v) =>
         val old = map.get(k)
         map.put(k, if (old == null) v else func(old, v))
       }
       Iterator(map)
-    }
+    } : Iterator[JHashMap[K, V]]
-    def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
+    val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
       m2.foreach { case (k, v) =>
         val old = m1.get(k)
         m1.put(k, if (old == null) v else func(old, v))
       }
       m1
-    }
+    } : JHashMap[K, V]
     self.mapPartitions(reducePartition).reduce(mergeMaps)
   }
@@ -361,11 +361,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     // groupByKey shouldn't use map side combine because map side combine does not
     // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
-    def createCombiner(v: V) = ArrayBuffer(v)
-    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
-    def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
+    val createCombiner = (v: V) => ArrayBuffer(v)
+    val mergeValue = (buf: ArrayBuffer[V], v: V) => buf += v
+    val mergeCombiners = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => c1 ++ c2
     val bufs = combineByKey[ArrayBuffer[V]](
-      createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
+      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
     bufs.mapValues(_.toIterable)
   }
@@ -710,14 +710,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     self.partitioner match {
       case Some(p) =>
         val index = p.getPartition(key)
-        def process(it: Iterator[(K, V)]): Seq[V] = {
+        val process = (it: Iterator[(K, V)]) => {
          val buf = new ArrayBuffer[V]
          for ((k, v) <- it if k == key) {
            buf += v
          }
          buf
-        }
-        val res = self.context.runJob(self, process _, Array(index), false)
+        } : Seq[V]
+        val res = self.context.runJob(self, process, Array(index), false)
        res(0)
       case None =>
        self.filter(_._1 == key).map(_._2).collect()
@@ -840,7 +840,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       jobFormat.checkOutputSpecs(job)
     }
-    def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
+    val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -861,19 +861,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
          val (k, v) = iter.next()
          writer.write(k, v)
        }
-      }
-      finally {
+      } finally {
        writer.close(hadoopContext)
      }
      committer.commitTask(hadoopContext)
-      return 1
-    }
+      1
+    } : Int
     val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
     val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
     val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
     jobCommitter.setupJob(jobTaskContext)
-    self.context.runJob(self, writeShard _)
+    self.context.runJob(self, writeShard)
     jobCommitter.commitJob(jobTaskContext)
   }
@@ -912,7 +911,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val writer = new SparkHadoopWriter(hadoopConf)
     writer.preSetup()
-    def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
+    val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -921,19 +920,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       writer.open()
       try {
        var count = 0
-        while(iter.hasNext) {
+        while (iter.hasNext) {
          val record = iter.next()
          count += 1
          writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
        }
-      }
-      finally {
+      } finally {
        writer.close()
      }
      writer.commit()
    }
-    self.context.runJob(self, writeToFile _)
+    self.context.runJob(self, writeToFile)
     writer.commitJob()
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a25f263bea..88a918aebf 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -328,7 +328,7 @@ abstract class RDD[T: ClassTag](
     : RDD[T] = {
     if (shuffle) {
       /** Distributes elements evenly across output partitions, starting from a random partition. */
-      def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
+      val distributePartition = (index: Int, items: Iterator[T]) => {
         var position = (new Random(index)).nextInt(numPartitions)
         items.map { t =>
           // Note that the hash code of the key will just be the key itself. The HashPartitioner
@@ -336,7 +336,7 @@ abstract class RDD[T: ClassTag](
           position = position + 1
           (position, t)
         }
-      }
+      } : Iterator[(Int, T)]
       // include a shuffle step so that our upstream tasks are still distributed
       new CoalescedRDD(
@@ -919,19 +919,19 @@ abstract class RDD[T: ClassTag](
       throw new SparkException("countByValue() does not support arrays")
     }
     // TODO: This should perhaps be distributed by default.
-    def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = {
+    val countPartition = (iter: Iterator[T]) => {
       val map = new OpenHashMap[T,Long]
       iter.foreach { t => map.changeValue(t, 1L, _ + 1L) }
       Iterator(map)
-    }
-    def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): OpenHashMap[T,Long] = {
+    }: Iterator[OpenHashMap[T,Long]]
+    val mergeMaps = (m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]) => {
       m2.foreach { case (key, value) =>
         m1.changeValue(key, value, _ + value)
       }
       m1
-    }
+    }: OpenHashMap[T,Long]
     val myResult = mapPartitions(countPartition).reduce(mergeMaps)
     // Convert to a Scala mutable map
     val mutableResult = scala.collection.mutable.Map[T,Long]()
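
For reference, here is how the same val-function-literal pattern looks from the public pair-RDD API side: a minimal sketch, assuming a local SparkContext and the three-argument combineByKey available since Spark 1.x; the object name, data and app name are made up.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits in Spark 1.x
import scala.collection.mutable.ArrayBuffer

object CombineByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("SPARK-2534 sketch"))
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // val function literals, mirroring the patched groupByKey: they close over
    // nothing from the enclosing object, so only the small lambdas are shipped.
    val createCombiner = (v: Int) => ArrayBuffer(v)
    val mergeValue = (buf: ArrayBuffer[Int], v: Int) => buf += v
    val mergeCombiners = (c1: ArrayBuffer[Int], c2: ArrayBuffer[Int]) => c1 ++ c2

    val grouped = pairs.combineByKey(createCombiner, mergeValue, mergeCombiners)
    grouped.collect().foreach { case (k, buf) => println(s"$k -> ${buf.toList}") }

    sc.stop()
  }
}

Because the three functions are val literals that reference nothing outside themselves, the task closures stay small regardless of how much state the surrounding driver-side objects hold.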