author     Reynold Xin <rxin@apache.org>  2014-07-17 10:54:53 -0700
committer  Reynold Xin <rxin@apache.org>  2014-07-17 10:54:53 -0700
commit     d988d345d5bec0668324386f3e81787f78e75e67 (patch)
tree       47ad23ea5c1805950d32608322b9b1f0a485bc5e /core
parent     9c73822a08848a0cde545282d3eb1c3f1a4c2a82 (diff)
[SPARK-2534] Avoid pulling in the entire RDD in various operators
This should go into both master and branch-1.0.

Author: Reynold Xin <rxin@apache.org>

Closes #1450 from rxin/agg-closure and squashes the following commits:

e40f363 [Reynold Xin] Mima check excludes.
9186364 [Reynold Xin] Define the return type more explicitly.
38e348b [Reynold Xin] Fixed the cases in RDD.scala.
ea6b34d [Reynold Xin] Blah
89b9c43 [Reynold Xin] Fix other instances of accidentally pulling in extra stuff in closures.
73b2783 [Reynold Xin] [SPARK-2534] Avoid pulling in the entire RDD in groupByKey.
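The change itself is mechanical: local defs that were handed to combineByKey, runJob, etc. (often via `_` eta-expansion) become val function literals, with the former return types kept as type ascriptions on the literals. The sketch below is not Spark code; Container, payload, serializedSize, and ClosureSizeDemo are hypothetical names used only to illustrate the capture difference on the Scala compilers Spark targeted at the time (2.10.x): a local def is lifted onto the enclosing class, so eta-expanding it yields a closure over `this` and everything the instance holds, while a function literal captures only what its body references. Newer compilers may avoid the capture in some cases.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

class Container(val payload: Array[Byte]) extends Serializable {

  def run(): (Int, Int) = {
    // A local `def` is lifted onto the enclosing class, so the eta-expanded
    // `incrDef _` closes over `this` -- and therefore over `payload` as well.
    def incrDef(x: Int): Int = x + 1

    // A `val` function literal only captures what its body references,
    // so `incrVal` carries no reference back to this Container.
    val incrVal = (x: Int) => x + 1

    (serializedSize(incrDef _), serializedSize(incrVal))
  }

  private def serializedSize(f: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(f)
    out.close()
    bytes.toByteArray.length
  }
}

object ClosureSizeDemo {
  def main(args: Array[String]): Unit = {
    val (defSize, valSize) = new Container(new Array[Byte](1 << 20)).run()
    // On the compilers in question, defSize should dwarf valSize because the
    // eta-expanded def drags in the whole Container, payload included.
    println(s"def-based closure: $defSize bytes, val-based closure: $valSize bytes")
  }
}

This is also why the patch only changes how the helper functions are defined, not what they compute.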
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala  46
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala               12
2 files changed, 28 insertions(+), 30 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 9d62d53fcb..29038b0359 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -125,7 +125,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
zeroBuffer.get(zeroArray)
lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
- def createZero() = cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
+ val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner)
}
@@ -171,7 +171,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// When deserializing, use a lazy val to create just one instance of the serializer per task
lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
- def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
+ val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
}
@@ -214,22 +214,22 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("reduceByKeyLocally() does not support array keys")
}
- def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
+ val reducePartition = (iter: Iterator[(K, V)]) => {
val map = new JHashMap[K, V]
iter.foreach { case (k, v) =>
val old = map.get(k)
map.put(k, if (old == null) v else func(old, v))
}
Iterator(map)
- }
+ } : Iterator[JHashMap[K, V]]
- def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
+ val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
m2.foreach { case (k, v) =>
val old = m1.get(k)
m1.put(k, if (old == null) v else func(old, v))
}
m1
- }
+ } : JHashMap[K, V]
self.mapPartitions(reducePartition).reduce(mergeMaps)
}
@@ -361,11 +361,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
- def createCombiner(v: V) = ArrayBuffer(v)
- def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
- def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
+ val createCombiner = (v: V) => ArrayBuffer(v)
+ val mergeValue = (buf: ArrayBuffer[V], v: V) => buf += v
+ val mergeCombiners = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => c1 ++ c2
val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
+ createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
bufs.mapValues(_.toIterable)
}
@@ -710,14 +710,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
self.partitioner match {
case Some(p) =>
val index = p.getPartition(key)
- def process(it: Iterator[(K, V)]): Seq[V] = {
+ val process = (it: Iterator[(K, V)]) => {
val buf = new ArrayBuffer[V]
for ((k, v) <- it if k == key) {
buf += v
}
buf
- }
- val res = self.context.runJob(self, process _, Array(index), false)
+ } : Seq[V]
+ val res = self.context.runJob(self, process, Array(index), false)
res(0)
case None =>
self.filter(_._1 == key).map(_._2).collect()
@@ -840,7 +840,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
jobFormat.checkOutputSpecs(job)
}
- def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
+ val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -861,19 +861,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val (k, v) = iter.next()
writer.write(k, v)
}
- }
- finally {
+ } finally {
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
- return 1
- }
+ 1
+ } : Int
val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
jobCommitter.setupJob(jobTaskContext)
- self.context.runJob(self, writeShard _)
+ self.context.runJob(self, writeShard)
jobCommitter.commitJob(jobTaskContext)
}
@@ -912,7 +911,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val writer = new SparkHadoopWriter(hadoopConf)
writer.preSetup()
- def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
+ val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -921,19 +920,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.open()
try {
var count = 0
- while(iter.hasNext) {
+ while (iter.hasNext) {
val record = iter.next()
count += 1
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
}
- }
- finally {
+ } finally {
writer.close()
}
writer.commit()
}
- self.context.runJob(self, writeToFile _)
+ self.context.runJob(self, writeToFile)
writer.commitJob()
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a25f263bea..88a918aebf 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -328,7 +328,7 @@ abstract class RDD[T: ClassTag](
: RDD[T] = {
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
- def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
+ val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
@@ -336,7 +336,7 @@ abstract class RDD[T: ClassTag](
position = position + 1
(position, t)
}
- }
+ } : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
@@ -919,19 +919,19 @@ abstract class RDD[T: ClassTag](
throw new SparkException("countByValue() does not support arrays")
}
// TODO: This should perhaps be distributed by default.
- def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = {
+ val countPartition = (iter: Iterator[T]) => {
val map = new OpenHashMap[T,Long]
iter.foreach {
t => map.changeValue(t, 1L, _ + 1L)
}
Iterator(map)
- }
- def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): OpenHashMap[T,Long] = {
+ }: Iterator[OpenHashMap[T,Long]]
+ val mergeMaps = (m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]) => {
m2.foreach { case (key, value) =>
m1.changeValue(key, value, _ + value)
}
m1
- }
+ }: OpenHashMap[T,Long]
val myResult = mapPartitions(countPartition).reduce(mergeMaps)
// Convert to a Scala mutable map
val mutableResult = scala.collection.mutable.Map[T,Long]()