diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2011-05-22 17:12:29 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2011-05-22 17:12:29 -0700 |
commit | cec427e777fe2d6ef0dab285a1f4289d2ae4f89e (patch) | |
tree | 81c514b1372e21ea75b72c0be5162efb7a139e18 | |
parent | 4c888b2933cd8e3695378b5a3c09adf97ab5f827 (diff) | |
download | spark-cec427e777fe2d6ef0dab285a1f4289d2ae4f89e.tar.gz spark-cec427e777fe2d6ef0dab285a1f4289d2ae4f89e.tar.bz2 spark-cec427e777fe2d6ef0dab285a1f4289d2ae4f89e.zip |
Fixed a bug with preferred locations having changed meaning in new RDDs
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 10 |
1 files changed, 3 insertions, 7 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 45dcad54b4..6334896cb6 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -20,11 +20,13 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { // Methods that must be implemented by subclasses def splits: Array[Split] def compute(split: Split): Iterator[T] - def preferredLocations(split: Split): Seq[String] val dependencies: List[Dependency[_]] // Optionally overridden by subclasses to specify how they are partitioned val partitioner: Option[Partitioner] = None + + // Optionally overridden by subclasses to specify placement preferences + def preferredLocations(split: Split): Seq[String] = Nil def context = sc @@ -152,7 +154,6 @@ class MappedRDD[U: ClassManifest, T: ClassManifest]( prev: RDD[T], f: T => U) extends RDD[U](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = prev.iterator(split).map(f) } @@ -161,7 +162,6 @@ class FlatMappedRDD[U: ClassManifest, T: ClassManifest]( prev: RDD[T], f: T => Traversable[U]) extends RDD[U](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = prev.iterator(split).toStream.flatMap(f).iterator } @@ -170,7 +170,6 @@ class FilteredRDD[T: ClassManifest]( prev: RDD[T], f: T => Boolean) extends RDD[T](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = prev.iterator(split).filter(f) } @@ -178,7 +177,6 @@ extends RDD[T](prev.context) { class SplitRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = Iterator.fromArray(Array(prev.iterator(split).toArray)) } @@ -303,7 +301,6 @@ class MappedValuesRDD[K, V, U]( prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override val partitioner = prev.partitioner override def compute(split: Split) = @@ -314,7 +311,6 @@ class FlatMappedValuesRDD[K, V, U]( prev: RDD[(K, V)], f: V => Traversable[U]) extends RDD[(K, U)](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override val partitioner = prev.partitioner override def compute(split: Split) = { |