author     Davies Liu <davies@databricks.com>   2015-10-30 15:47:40 -0700
committer  Davies Liu <davies.liu@gmail.com>    2015-10-30 15:47:40 -0700
commit     45029bfdea42eb8964f2ba697859687393d2a558 (patch)
tree       45173a40ba6548f69f797d307ffb3a299bf6872e /core
parent     bb5a2af034196620d869fc9b1a400e014e718b8c (diff)
[SPARK-11423] remove MapPartitionsWithPreparationRDD
Since we do not need to preserve a page before calling compute(), MapPartitionsWithPreparationRDD is not needed anymore. This PR basically reverts #8543, #8511, #8038 and #8011.

Author: Davies Liu <davies@databricks.com>

Closes #9381 from davies/remove_prepare2.
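For context, a minimal sketch of the pattern this removal leaves behind (hypothetical names, not code from this commit): any per-partition setup that preparePartition() used to perform can simply run at the top of an ordinary mapPartitions closure, since nothing has to be reserved before compute() starts.

import org.apache.spark.{SparkConf, SparkContext}

object InlineSetupSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("sketch"))
    val parent = sc.parallelize(1 to 100, 2)

    // Setup that previously lived in preparePartition() now runs inline,
    // inside the same task, just before the partition is processed.
    val doubled = parent.mapPartitions { iter =>
      val scratch = new Array[Int](1) // stands in for the old prepare() work
      iter.map { x => scratch(0) = x; x * 2 }
    }

    assert(doubled.collect().sameElements((1 to 100).map(_ * 2)))
    sc.stop()
  }
}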
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala       | 66 -
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala                   | 13 -
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala  | 66 -
3 files changed, 0 insertions, 145 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
deleted file mode 100644
index 417ff5278d..0000000000
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import scala.collection.mutable.ArrayBuffer
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, Partitioner, TaskContext}
-
-/**
- * An RDD that applies a user provided function to every partition of the parent RDD, and
- * additionally allows the user to prepare each partition before computing the parent partition.
- */
-private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M: ClassTag](
- prev: RDD[T],
- preparePartition: () => M,
- executePartition: (TaskContext, Int, M, Iterator[T]) => Iterator[U],
- preservesPartitioning: Boolean = false)
- extends RDD[U](prev) {
-
- override val partitioner: Option[Partitioner] = {
- if (preservesPartitioning) firstParent[T].partitioner else None
- }
-
- override def getPartitions: Array[Partition] = firstParent[T].partitions
-
- // In certain join operations, prepare can be called on the same partition multiple times.
- // In this case, we need to ensure that each call to compute gets a separate prepare argument.
- private[this] val preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
-
- /**
- * Prepare a partition for a single call to compute.
- */
- def prepare(): Unit = {
- preparedArguments += preparePartition()
- }
-
- /**
- * Prepare a partition before computing it from its parent.
- */
- override def compute(partition: Partition, context: TaskContext): Iterator[U] = {
- val prepared =
- if (preparedArguments.isEmpty) {
- preparePartition()
- } else {
- preparedArguments.remove(0)
- }
- val parentIterator = firstParent[T].iterator(partition, context)
- executePartition(context, partition.index, prepared, parentIterator)
- }
-}
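Should the removed three-argument executePartition shape ever be wanted again, it can be recovered from stock operators. A hedged sketch follows (the helper name is hypothetical, not part of this commit); note that preparePartition here runs inside the task after the parent iterator has been created, deliberately not reproducing the prepare-before-parent-compute ordering that this commit deems unnecessary.

import scala.reflect.ClassTag

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD

// Hypothetical replacement built on mapPartitionsWithIndex. Nothing runs
// before the parent's compute(), which is acceptable now that no execution
// memory must be reserved up front.
def mapWithPreparation[T, M, U: ClassTag](
    prev: RDD[T],
    preparePartition: () => M,
    executePartition: (TaskContext, Int, M, Iterator[T]) => Iterator[U]): RDD[U] = {
  prev.mapPartitionsWithIndex { (index, iter) =>
    val prepared = preparePartition() // per-partition setup, inside the task
    executePartition(TaskContext.get(), index, prepared, iter)
  }
}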
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index 70bf04de64..4333a679c8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -73,16 +73,6 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag](
super.clearDependencies()
rdds = null
}
-
- /**
- * Call the prepare method of every parent that has one.
- * This is needed for reserving execution memory in advance.
- */
- protected def tryPrepareParents(): Unit = {
- rdds.collect {
- case rdd: MapPartitionsWithPreparationRDD[_, _, _] => rdd.prepare()
- }
- }
}
private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
@@ -94,7 +84,6 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]
extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) {
override def compute(s: Partition, context: TaskContext): Iterator[V] = {
- tryPrepareParents()
val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context))
}
@@ -118,7 +107,6 @@ private[spark] class ZippedPartitionsRDD3
extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), preservesPartitioning) {
override def compute(s: Partition, context: TaskContext): Iterator[V] = {
- tryPrepareParents()
val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
f(rdd1.iterator(partitions(0), context),
rdd2.iterator(partitions(1), context),
@@ -146,7 +134,6 @@ private[spark] class ZippedPartitionsRDD4
extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4), preservesPartitioning) {
override def compute(s: Partition, context: TaskContext): Iterator[V] = {
- tryPrepareParents()
val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
f(rdd1.iterator(partitions(0), context),
rdd2.iterator(partitions(1), context),
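With tryPrepareParents() gone, each ZippedPartitionsRDD variant's compute() is reduced to the plain iterator calls shown above. A small usage example of the public API these classes back, assuming an active SparkContext sc:

// zipPartitions pairs co-partitioned iterators; each parent iterator is now
// created directly inside compute(), with no prepare pass beforehand.
val nums  = sc.parallelize(1 to 4, 2)
val chars = sc.parallelize(Seq('a', 'b', 'c', 'd'), 2)
val pairs = nums.zipPartitions(chars) { (ni, ci) => ni.zip(ci) }
// pairs.collect() == Array((1,'a'), (2,'b'), (3,'c'), (4,'d'))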
diff --git a/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala
deleted file mode 100644
index e281e817e4..0000000000
--- a/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import scala.collection.mutable
-
-import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite, TaskContext}
-
-class MapPartitionsWithPreparationRDDSuite extends SparkFunSuite with LocalSparkContext {
-
- test("prepare called before parent partition is computed") {
- sc = new SparkContext("local", "test")
-
- // Have the parent partition push a number to the list
- val parent = sc.parallelize(1 to 100, 1).mapPartitions { iter =>
- TestObject.things.append(20)
- iter
- }
-
- // Push a different number during the prepare phase
- val preparePartition = () => { TestObject.things.append(10) }
-
- // Push yet another number during the execution phase
- val executePartition = (
- taskContext: TaskContext,
- partitionIndex: Int,
- notUsed: Unit,
- parentIterator: Iterator[Int]) => {
- TestObject.things.append(30)
- TestObject.things.iterator
- }
-
- // Verify that the numbers are pushed in the order expected
- val rdd = new MapPartitionsWithPreparationRDD[Int, Int, Unit](
- parent, preparePartition, executePartition)
- val result = rdd.collect()
- assert(result === Array(10, 20, 30))
-
- TestObject.things.clear()
- // Zip two of these RDDs, both should be prepared before the parent is executed
- val rdd2 = new MapPartitionsWithPreparationRDD[Int, Int, Unit](
- parent, preparePartition, executePartition)
- val result2 = rdd.zipPartitions(rdd2)((a, b) => a).collect()
- assert(result2 === Array(10, 10, 20, 30, 20, 30))
- }
-
-}
-
-private object TestObject {
- val things = new mutable.ListBuffer[Int]
-}
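For completeness, the ordering this suite pinned down changes in a knowable way once the class is gone. A hedged sketch of the new behavior (hypothetical code, local mode, JVM-singleton buffer in the style of the removed TestObject): with plain mapPartitions the parent's closure runs when its iterator is created, i.e. before the child's body, so the observed order becomes 20, 10, 30 rather than 10, 20, 30; acceptable, because nothing needs to happen before the parent partition is computed anymore.

import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkContext}

// Shared by driver and task in local mode, like the removed TestObject.
object Events { val log = new mutable.ListBuffer[Int] }

object OrderingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("sketch"))
    val parent = sc.parallelize(1 to 100, 1).mapPartitions { iter =>
      Events.log += 20 // runs when the parent's iterator is created
      iter
    }
    parent.mapPartitions { iter =>
      Events.log += 10 // the old "prepare" step, now after the parent's closure
      val out = iter.toArray
      Events.log += 30 // the old "execute" step
      out.iterator
    }.collect()
    assert(Events.log == mutable.ListBuffer(20, 10, 30))
    sc.stop()
  }
}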