diff options
author | Davies Liu <davies@databricks.com> | 2015-08-31 15:55:22 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-08-31 15:55:22 -0700 |
commit | 540bdee93103a73736d282b95db6a8cda8f6a2b1 (patch) | |
tree | 4296a5e874aec24a2bad82cae5091a0ebf24ddb8 | |
parent | 5b3245d6dff65972fc39c73f90d5cbdf84d19129 (diff) | |
download | spark-540bdee93103a73736d282b95db6a8cda8f6a2b1.tar.gz spark-540bdee93103a73736d282b95db6a8cda8f6a2b1.tar.bz2 spark-540bdee93103a73736d282b95db6a8cda8f6a2b1.zip |
[SPARK-10341] [SQL] fix memory starving in unsafe SMJ
In SMJ, the first ExternalSorter could consume all the memory before spilling, then the second can not even acquire the first page.
Before we have a better memory allocator, SMJ should call prepare() before call any compute() of it's children.
cc rxin JoshRosen
Author: Davies Liu <davies@databricks.com>
Closes #8511 from davies/smj_memory.
3 files changed, 42 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala index b475bd8d79..1f2213d0c4 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala @@ -17,6 +17,7 @@ package org.apache.spark.rdd +import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import org.apache.spark.{Partition, Partitioner, TaskContext} @@ -38,12 +39,28 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M override def getPartitions: Array[Partition] = firstParent[T].partitions + // In certain join operations, prepare can be called on the same partition multiple times. + // In this case, we need to ensure that each call to compute gets a separate prepare argument. + private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M] + + /** + * Prepare a partition for a single call to compute. + */ + def prepare(): Unit = { + preparedArguments += preparePartition() + } + /** * Prepare a partition before computing it from its parent. */ override def compute(partition: Partition, context: TaskContext): Iterator[U] = { - val preparedArgument = preparePartition() + val prepared = + if (preparedArguments.isEmpty) { + preparePartition() + } else { + preparedArguments.remove(0) + } val parentIterator = firstParent[T].iterator(partition, context) - executePartition(context, partition.index, preparedArgument, parentIterator) + executePartition(context, partition.index, prepared, parentIterator) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala index 81f40ad33a..b3c64394ab 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala @@ -73,6 +73,16 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag]( super.clearDependencies() rdds = null } + + /** + * Call the prepare method of every parent that has one. + * This is needed for reserving execution memory in advance. + */ + protected def tryPrepareParents(): Unit = { + rdds.collect { + case rdd: MapPartitionsWithPreparationRDD[_, _, _] => rdd.prepare() + } + } } private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]( @@ -84,6 +94,7 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag] extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { + tryPrepareParents() val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context)) } @@ -107,6 +118,7 @@ private[spark] class ZippedPartitionsRDD3 extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { + tryPrepareParents() val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context), @@ -134,6 +146,7 @@ private[spark] class ZippedPartitionsRDD4 extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { + tryPrepareParents() val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context), diff --git a/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala index c16930e7d6..e281e817e4 100644 --- a/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala @@ -46,11 +46,17 @@ class MapPartitionsWithPreparationRDDSuite extends SparkFunSuite with LocalSpark } // Verify that the numbers are pushed in the order expected - val result = { - new MapPartitionsWithPreparationRDD[Int, Int, Unit]( - parent, preparePartition, executePartition).collect() - } + val rdd = new MapPartitionsWithPreparationRDD[Int, Int, Unit]( + parent, preparePartition, executePartition) + val result = rdd.collect() assert(result === Array(10, 20, 30)) + + TestObject.things.clear() + // Zip two of these RDDs, both should be prepared before the parent is executed + val rdd2 = new MapPartitionsWithPreparationRDD[Int, Int, Unit]( + parent, preparePartition, executePartition) + val result2 = rdd.zipPartitions(rdd2)((a, b) => a).collect() + assert(result2 === Array(10, 10, 20, 30, 20, 30)) } } |