aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/RDD.scala10
1 files changed, 3 insertions, 7 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 45dcad54b4..6334896cb6 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -20,11 +20,13 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
// Methods that must be implemented by subclasses
def splits: Array[Split]
def compute(split: Split): Iterator[T]
- def preferredLocations(split: Split): Seq[String]
val dependencies: List[Dependency[_]]
// Optionally overridden by subclasses to specify how they are partitioned
val partitioner: Option[Partitioner] = None
+
+ // Optionally overridden by subclasses to specify placement preferences
+ def preferredLocations(split: Split): Seq[String] = Nil
def context = sc
@@ -152,7 +154,6 @@ class MappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T], f: T => U)
extends RDD[U](prev.context) {
override def splits = prev.splits
- override def preferredLocations(split: Split) = prev.preferredLocations(split)
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = prev.iterator(split).map(f)
}
@@ -161,7 +162,6 @@ class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T], f: T => Traversable[U])
extends RDD[U](prev.context) {
override def splits = prev.splits
- override def preferredLocations(split: Split) = prev.preferredLocations(split)
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = prev.iterator(split).toStream.flatMap(f).iterator
}
@@ -170,7 +170,6 @@ class FilteredRDD[T: ClassManifest](
prev: RDD[T], f: T => Boolean)
extends RDD[T](prev.context) {
override def splits = prev.splits
- override def preferredLocations(split: Split) = prev.preferredLocations(split)
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = prev.iterator(split).filter(f)
}
@@ -178,7 +177,6 @@ extends RDD[T](prev.context) {
class SplitRDD[T: ClassManifest](prev: RDD[T])
extends RDD[Array[T]](prev.context) {
override def splits = prev.splits
- override def preferredLocations(split: Split) = prev.preferredLocations(split)
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = Iterator.fromArray(Array(prev.iterator(split).toArray))
}
@@ -303,7 +301,6 @@ class MappedValuesRDD[K, V, U](
prev: RDD[(K, V)], f: V => U)
extends RDD[(K, U)](prev.context) {
override def splits = prev.splits
- override def preferredLocations(split: Split) = prev.preferredLocations(split)
override val dependencies = List(new OneToOneDependency(prev))
override val partitioner = prev.partitioner
override def compute(split: Split) =
@@ -314,7 +311,6 @@ class FlatMappedValuesRDD[K, V, U](
prev: RDD[(K, V)], f: V => Traversable[U])
extends RDD[(K, U)](prev.context) {
override def splits = prev.splits
- override def preferredLocations(split: Split) = prev.preferredLocations(split)
override val dependencies = List(new OneToOneDependency(prev))
override val partitioner = prev.partitioner
override def compute(split: Split) = {