Diffstat (limited to 'core/src/main/scala/spark/rdd/BlockRDD.scala')
-rw-r--r--  core/src/main/scala/spark/rdd/BlockRDD.scala  24
1 file changed, 10 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala
index 2c022f88e0..7348c4f15b 100644
--- a/core/src/main/scala/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/spark/rdd/BlockRDD.scala
@@ -1,9 +1,9 @@
 package spark.rdd
 
 import scala.collection.mutable.HashMap
-import spark.{RDD, SparkContext, SparkEnv, Split, TaskContext}
+import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
 
-private[spark] class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
+private[spark] class BlockRDDPartition(val blockId: String, idx: Int) extends Partition {
   val index = idx
 }
 
@@ -11,10 +11,6 @@ private[spark]
 class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
   extends RDD[T](sc, Nil) {
 
-  @transient var splits_ : Array[Split] = (0 until blockIds.size).map(i => {
-    new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split]
-  }).toArray
-
   @transient lazy val locations_ = {
     val blockManager = SparkEnv.get.blockManager
     /*val locations = blockIds.map(id => blockManager.getLocations(id))*/
@@ -22,11 +18,14 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
     HashMap(blockIds.zip(locations):_*)
   }
 
-  override def getSplits = splits_
+  override def getPartitions: Array[Partition] = (0 until blockIds.size).map(i => {
+    new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
+  }).toArray
 
-  override def compute(split: Split, context: TaskContext): Iterator[T] = {
+
+  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
     val blockManager = SparkEnv.get.blockManager
-    val blockId = split.asInstanceOf[BlockRDDSplit].blockId
+    val blockId = split.asInstanceOf[BlockRDDPartition].blockId
     blockManager.get(blockId) match {
       case Some(block) => block.asInstanceOf[Iterator[T]]
       case None =>
@@ -34,11 +33,8 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
     }
   }
 
-  override def getPreferredLocations(split: Split) =
-    locations_(split.asInstanceOf[BlockRDDSplit].blockId)
+  override def getPreferredLocations(split: Partition): Seq[String] =
+    locations_(split.asInstanceOf[BlockRDDPartition].blockId)
 
-  override def clearDependencies() {
-    splits_ = null
-  }
 
 }
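
For reference, a minimal sketch of what a custom RDD looks like against the renamed API. This is not part of the commit: it assumes only the spark.{RDD, Partition, TaskContext} signatures visible in the hunks above, and RangeRDD/RangePartition are hypothetical names used purely for illustration.

package spark.rdd

import spark.{Partition, RDD, SparkContext, TaskContext}

// Hypothetical partition type; as with BlockRDDPartition, `index` is the only
// member the Partition contract needs here.
private[spark] class RangePartition(val start: Int, val end: Int, idx: Int) extends Partition {
  val index = idx
}

// Hypothetical RDD that yields the integers 0 until total, split across numSlices tasks.
class RangeRDD(sc: SparkContext, total: Int, numSlices: Int)
  extends RDD[Int](sc, Nil) {

  // Partitions are built on demand in getPartitions, mirroring the change to
  // BlockRDD above: no cached @transient splits_ field, so nothing to null out
  // in clearDependencies().
  override def getPartitions: Array[Partition] =
    (0 until numSlices).map { i =>
      new RangePartition(i * total / numSlices, (i + 1) * total / numSlices, i).asInstanceOf[Partition]
    }.toArray

  // compute receives the generic Partition and downcasts to the concrete type,
  // exactly as BlockRDD.compute does with BlockRDDPartition.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val p = split.asInstanceOf[RangePartition]
    (p.start until p.end).iterator
  }
}

Unlike BlockRDD, this sketch does not override getPreferredLocations; the base class default applies unless, as BlockRDD does via the block manager, the RDD can map a partition to the hosts that already hold its data.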