aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala49
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala2
3 files changed, 53 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index a838aac6e8..4312d3a417 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -21,6 +21,9 @@ import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
+/**
+ * An RDD that applies the provided function to every partition of the parent RDD.
+ */
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
new file mode 100644
index 0000000000..b475bd8d79
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Partition, Partitioner, TaskContext}
+
+/**
+ * An RDD that applies a user provided function to every partition of the parent RDD, and
+ * additionally allows the user to prepare each partition before computing the parent partition.
+ */
+private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M: ClassTag](
+ prev: RDD[T],
+ preparePartition: () => M,
+ executePartition: (TaskContext, Int, M, Iterator[T]) => Iterator[U],
+ preservesPartitioning: Boolean = false)
+ extends RDD[U](prev) {
+
+ override val partitioner: Option[Partitioner] = {
+ if (preservesPartitioning) firstParent[T].partitioner else None
+ }
+
+ override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+ /**
+ * Prepare a partition before computing it from its parent.
+ */
+ override def compute(partition: Partition, context: TaskContext): Iterator[U] = {
+ val preparedArgument = preparePartition()
+ val parentIterator = firstParent[T].iterator(partition, context)
+ executePartition(context, partition.index, preparedArgument, parentIterator)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
index 00c1e078a4..e3d229cc99 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
@@ -124,7 +124,7 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
}
}
-private object ShuffleMemoryManager {
+private[spark] object ShuffleMemoryManager {
/**
* Figure out the shuffle memory limit from a SparkConf. We currently have both a fraction
* of the memory pool and a safety factor since collections can sometimes grow bigger than