author    Stephen Haberman <stephen@exigencecorp.com>  2013-02-05 22:14:18 -0600
committer Stephen Haberman <stephen@exigencecorp.com>  2013-02-05 22:16:59 -0600
commit    a9c8d53cfa0bd09565799cec88344b286d7cc436 (patch)
tree      86e00d50f288de93191c98186dd24cdf3201aac0
parent    f4d43cb43e64ec3436a129cf3f7a177374451060 (diff)
Clean up RDDs, mainly to use getSplits.
Also made sure clearDependencies() calls super, so that the cached getSplits/getDependencies vars in the RDD base class get cleaned up.
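The pattern is the same across all of the RDDs below: drop the hand-cached @transient splits_ (and deps_) vars in favor of plain getSplits/getDependencies overrides, and let the base class own both the caching and the cleanup. A minimal before/after sketch of the idea, assuming the 2013-era spark.RDD API; ExampleRDD and ExampleSplit are hypothetical stand-ins, not classes touched by this commit:

import spark.{RDD, Split, TaskContext}

// Hypothetical one-to-one split wrapping a parent split.
private class ExampleSplit(val prev: Split) extends Split {
  override val index: Int = prev.index
}

// Before: the subclass caches its splits itself and has to remember
// to null them out in clearDependencies().
class ExampleRDDBefore[T: ClassManifest](prev: RDD[T]) extends RDD[T](prev) {

  @transient var splits_ : Array[Split] =
    firstParent[T].splits.map(s => new ExampleSplit(s).asInstanceOf[Split])

  override def getSplits = splits_

  override def compute(split: Split, context: TaskContext) =
    firstParent[T].iterator(split.asInstanceOf[ExampleSplit].prev, context)

  override def clearDependencies() {
    splits_ = null // easy to forget, and super often wasn't called
  }
}

// After: getSplits builds the array on demand; the base class caches the
// result in its own vars, and super.clearDependencies() clears those caches.
class ExampleRDDAfter[T: ClassManifest](prev: RDD[T]) extends RDD[T](prev) {

  override def getSplits: Array[Split] =
    firstParent[T].splits.map(s => new ExampleSplit(s).asInstanceOf[Split])

  override def compute(split: Split, context: TaskContext) =
    firstParent[T].iterator(split.asInstanceOf[ExampleSplit].prev, context)

  // Only needed when the subclass has parent references of its own to
  // forget (e.g. CartesianRDD nulls rdd1/rdd2); super must come first.
  override def clearDependencies() {
    super.clearDependencies()
  }
}

Subclasses like CartesianRDD, CoalescedRDD, and ZippedRDD still override clearDependencies() to null their own parent-RDD vars, but now call super first so the base-class caches are cleared too; BlockRDD, SampledRDD, and UnionRDD no longer need the override at all.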
-rw-r--r-- core/src/main/scala/spark/RDD.scala | 1
-rw-r--r-- core/src/main/scala/spark/rdd/BlockRDD.scala | 12
-rw-r--r-- core/src/main/scala/spark/rdd/CartesianRDD.scala | 3
-rw-r--r-- core/src/main/scala/spark/rdd/CheckpointRDD.scala | 4
-rw-r--r-- core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 21
-rw-r--r-- core/src/main/scala/spark/rdd/CoalescedRDD.scala | 13
-rw-r--r-- core/src/main/scala/spark/rdd/HadoopRDD.scala | 7
-rw-r--r-- core/src/main/scala/spark/rdd/NewHadoopRDD.scala | 6
-rw-r--r-- core/src/main/scala/spark/rdd/SampledRDD.scala | 8
-rw-r--r-- core/src/main/scala/spark/rdd/UnionRDD.scala | 8
-rw-r--r-- core/src/main/scala/spark/rdd/ZippedRDD.scala | 6
11 files changed, 30 insertions, 59 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index f0bc85865c..5f99591fd5 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -656,7 +656,6 @@ abstract class RDD[T: ClassManifest](
*/
private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {
clearDependencies()
- dependencies_ = null
splits_ = null
deps = null // Forget the constructor argument for dependencies too
}
diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala
index 2c022f88e0..4214817c65 100644
--- a/core/src/main/scala/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/spark/rdd/BlockRDD.scala
@@ -11,10 +11,6 @@ private[spark]
class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
extends RDD[T](sc, Nil) {
- @transient var splits_ : Array[Split] = (0 until blockIds.size).map(i => {
- new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split]
- }).toArray
-
@transient lazy val locations_ = {
val blockManager = SparkEnv.get.blockManager
/*val locations = blockIds.map(id => blockManager.getLocations(id))*/
@@ -22,7 +18,10 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
HashMap(blockIds.zip(locations):_*)
}
- override def getSplits = splits_
+ override def getSplits = (0 until blockIds.size).map(i => {
+ new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split]
+ }).toArray
+
override def compute(split: Split, context: TaskContext): Iterator[T] = {
val blockManager = SparkEnv.get.blockManager
@@ -37,8 +36,5 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
override def getPreferredLocations(split: Split) =
locations_(split.asInstanceOf[BlockRDDSplit].blockId)
- override def clearDependencies() {
- splits_ = null
- }
}
diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala
index 0f9ca06531..2f572a1941 100644
--- a/core/src/main/scala/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala
@@ -35,7 +35,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
val numSplitsInRdd2 = rdd2.splits.size
- override def getSplits: Array[Split] = {
+ override def getSplits = {
// create the cross product split
val array = new Array[Split](rdd1.splits.size * rdd2.splits.size)
for (s1 <- rdd1.splits; s2 <- rdd2.splits) {
@@ -66,6 +66,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
)
override def clearDependencies() {
+ super.clearDependencies()
rdd1 = null
rdd2 = null
}
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 96b593ba7c..7cde523f11 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -20,7 +20,7 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: String)
@transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
- @transient val splits_ : Array[Split] = {
+ override def getSplits = {
val dirContents = fs.listStatus(new Path(checkpointPath))
val splitFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
val numSplits = splitFiles.size
@@ -34,8 +34,6 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: String)
checkpointData = Some(new RDDCheckpointData[T](this))
checkpointData.get.cpFile = Some(checkpointPath)
- override def getSplits = splits_
-
override def getPreferredLocations(split: Split): Seq[String] = {
val status = fs.getFileStatus(new Path(checkpointPath))
val locations = fs.getFileBlockLocations(status, 0, status.getLen)
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 021118c8ba..d31ce13706 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -43,26 +43,22 @@ private[spark] class CoGroupAggregator
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) with Logging {
- val aggr = new CoGroupAggregator
+ private val aggr = new CoGroupAggregator
- @transient var deps_ = {
- val deps = new ArrayBuffer[Dependency[_]]
- for (rdd <- rdds) {
+ override def getDependencies = {
+ rdds.map { rdd =>
if (rdd.partitioner == Some(part)) {
logInfo("Adding one-to-one dependency with " + rdd)
- deps += new OneToOneDependency(rdd)
+ new OneToOneDependency(rdd)
} else {
logInfo("Adding shuffle dependency with " + rdd)
val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
- deps += new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part)
+ new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part)
}
}
- deps.toList
}
- override def getDependencies = deps_
-
- @transient var splits_ : Array[Split] = {
+ override def getSplits = {
val array = new Array[Split](part.numPartitions)
for (i <- 0 until array.size) {
// Each CoGroupSplit will have a dependency per contributing RDD
@@ -79,8 +75,6 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
array
}
- override def getSplits = splits_
-
override val partitioner = Some(part)
override def compute(s: Split, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
@@ -117,8 +111,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
}
override def clearDependencies() {
- deps_ = null
- splits_ = null
+ super.clearDependencies()
rdds = null
}
}
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 4c57434b65..a1aa7a30b0 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -31,7 +31,7 @@ class CoalescedRDD[T: ClassManifest](
maxPartitions: Int)
extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies
- override def getSplits: Array[Split] = {
+ override def getSplits = {
val prevSplits = prev.splits
if (prevSplits.length < maxPartitions) {
prevSplits.map(_.index).map{idx => new CoalescedRDDSplit(idx, prev, Array(idx)) }
@@ -50,14 +50,13 @@ class CoalescedRDD[T: ClassManifest](
}
}
- override def getDependencies: Seq[Dependency[_]] = List(
- new NarrowDependency(prev) {
- def getParents(id: Int): Seq[Int] =
- splits(id).asInstanceOf[CoalescedRDDSplit].parentsIndices
- }
- )
+ override def getDependencies = Seq(new NarrowDependency(prev) {
+ def getParents(id: Int): Seq[Int] =
+ splits(id).asInstanceOf[CoalescedRDDSplit].parentsIndices
+ })
override def clearDependencies() {
+ super.clearDependencies()
prev = null
}
}
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index f547f53812..cd948de967 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -45,10 +45,9 @@ class HadoopRDD[K, V](
extends RDD[(K, V)](sc, Nil) {
// A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
- val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+ private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
- @transient
- val splits_ : Array[Split] = {
+ override def getSplits = {
val inputFormat = createInputFormat(conf)
val inputSplits = inputFormat.getSplits(conf, minSplits)
val array = new Array[Split](inputSplits.size)
@@ -63,8 +62,6 @@ class HadoopRDD[K, V](
.asInstanceOf[InputFormat[K, V]]
}
- override def getSplits = splits_
-
override def compute(theSplit: Split, context: TaskContext) = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopSplit]
var reader: RecordReader[K, V] = null
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index c3b155fcbd..2d000f5c68 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -29,7 +29,7 @@ class NewHadoopRDD[K, V](
with HadoopMapReduceUtil {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
- val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+ private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
// private val serializableConf = new SerializableWritable(conf)
private val jobtrackerId: String = {
@@ -39,7 +39,7 @@ class NewHadoopRDD[K, V](
@transient private val jobId = new JobID(jobtrackerId, id)
- @transient private val splits_ : Array[Split] = {
+ override def getSplits = {
val inputFormat = inputFormatClass.newInstance
val jobContext = newJobContext(conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
@@ -50,8 +50,6 @@ class NewHadoopRDD[K, V](
result
}
- override def getSplits = splits_
-
override def compute(theSplit: Split, context: TaskContext) = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[NewHadoopSplit]
val conf = confBroadcast.value.value
diff --git a/core/src/main/scala/spark/rdd/SampledRDD.scala b/core/src/main/scala/spark/rdd/SampledRDD.scala
index e24ad23b21..81626d5009 100644
--- a/core/src/main/scala/spark/rdd/SampledRDD.scala
+++ b/core/src/main/scala/spark/rdd/SampledRDD.scala
@@ -19,13 +19,11 @@ class SampledRDD[T: ClassManifest](
seed: Int)
extends RDD[T](prev) {
- @transient var splits_ : Array[Split] = {
+ override def getSplits = {
val rg = new Random(seed)
firstParent[T].splits.map(x => new SampledRDDSplit(x, rg.nextInt))
}
- override def getSplits = splits_
-
override def getPreferredLocations(split: Split) =
firstParent[T].preferredLocations(split.asInstanceOf[SampledRDDSplit].prev)
@@ -48,8 +46,4 @@ class SampledRDD[T: ClassManifest](
firstParent[T].iterator(split.prev, context).filter(x => (rand.nextDouble <= frac))
}
}
-
- override def clearDependencies() {
- splits_ = null
- }
}
diff --git a/core/src/main/scala/spark/rdd/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala
index 26a2d511f2..5ac24d2ffc 100644
--- a/core/src/main/scala/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/spark/rdd/UnionRDD.scala
@@ -28,7 +28,7 @@ class UnionRDD[T: ClassManifest](
@transient var rdds: Seq[RDD[T]])
extends RDD[T](sc, Nil) { // Nil since we implement getDependencies
- override def getSplits: Array[Split] = {
+ override def getSplits = {
val array = new Array[Split](rdds.map(_.splits.size).sum)
var pos = 0
for (rdd <- rdds; split <- rdd.splits) {
@@ -38,7 +38,7 @@ class UnionRDD[T: ClassManifest](
array
}
- override def getDependencies: Seq[Dependency[_]] = {
+ override def getDependencies = {
val deps = new ArrayBuffer[Dependency[_]]
var pos = 0
for (rdd <- rdds) {
@@ -53,8 +53,4 @@ class UnionRDD[T: ClassManifest](
override def getPreferredLocations(s: Split): Seq[String] =
s.asInstanceOf[UnionSplit[T]].preferredLocations()
-
- override def clearDependencies() {
- rdds = null
- }
}
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
index e5df6d8c72..a079720a93 100644
--- a/core/src/main/scala/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -29,10 +29,9 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
sc: SparkContext,
var rdd1: RDD[T],
var rdd2: RDD[U])
- extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2)))
- with Serializable {
+ extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2))) {
- override def getSplits: Array[Split] = {
+ override def getSplits = {
if (rdd1.splits.size != rdd2.splits.size) {
throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
}
@@ -54,6 +53,7 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
}
override def clearDependencies() {
+ super.clearDependencies()
rdd1 = null
rdd2 = null
}