about summary refs log tree commit diff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2010-10-07 17:17:07 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2010-10-07 17:17:07 -0700
commit630a982b88a3bac3bd362d52d64331b1495f31ff (patch)
treedf948d58929e3cdf2e866249d9dd6c6b11652465
parentf9671b086b8e4faba8feb6fb6fa5a61ae4290ca3 (diff)
downloadspark-630a982b88a3bac3bd362d52d64331b1495f31ff.tar.gz
spark-630a982b88a3bac3bd362d52d64331b1495f31ff.tar.bz2
spark-630a982b88a3bac3bd362d52d64331b1495f31ff.zip
Added a getId method to Split to force classes to specify a unique ID
for each split. This replaces the previous method of calling split.toString, which would produce different results for the same split each time it is deserialized (because the default implementation returns the Java object's address).
-rw-r--r--src/scala/spark/HdfsFile.scala4
-rw-r--r--src/scala/spark/ParallelArray.scala2
-rw-r--r--src/scala/spark/RDD.scala16
-rw-r--r--src/scala/spark/Split.scala12
4 files changed, 23 insertions, 11 deletions
diff --git a/src/scala/spark/HdfsFile.scala b/src/scala/spark/HdfsFile.scala
index 886272a8ed..8637c6e30a 100644
--- a/src/scala/spark/HdfsFile.scala
+++ b/src/scala/spark/HdfsFile.scala
@@ -14,7 +14,9 @@ import org.apache.hadoop.mapred.Reporter
@serializable class HdfsSplit(@transient s: InputSplit)
extends Split {
val inputSplit = new SerializableWritable[InputSplit](s)
- override def toString = inputSplit.toString
+
+ override def getId() = inputSplit.toString // Hadoop makes this unique
+ // for each split of each file
}
class HdfsTextFile(sc: SparkContext, path: String)
diff --git a/src/scala/spark/ParallelArray.scala b/src/scala/spark/ParallelArray.scala
index 12fbfaf4c2..a01904d61c 100644
--- a/src/scala/spark/ParallelArray.scala
+++ b/src/scala/spark/ParallelArray.scala
@@ -17,7 +17,7 @@ extends Split {
case _ => false
}
- override def toString() =
+ override def getId() =
"ParallelArraySplit(arrayId %d, slice %d)".format(arrayId, slice)
}
diff --git a/src/scala/spark/RDD.scala b/src/scala/spark/RDD.scala
index 5236eb958f..803c063865 100644
--- a/src/scala/spark/RDD.scala
+++ b/src/scala/spark/RDD.scala
@@ -166,8 +166,8 @@ extends RDD[Array[T]](prev.sparkContext) {
@serializable class SeededSplit(val prev: Split, val seed: Int) extends Split {
- override def toString() =
- "SeededSplit(" + prev.toString + ", seed " + seed + ")"
+ override def getId() =
+ "SeededSplit(" + prev.getId() + ", seed " + seed + ")"
}
class SampledRDD[T: ClassManifest](
@@ -216,7 +216,7 @@ extends RDD[T](prev.sparkContext) with Logging {
}
override def iterator(split: Split): Iterator[T] = {
- val key = id + "::" + split.toString
+ val key = id + "::" + split.getId()
logInfo("CachedRDD split key is " + key)
val cache = CachedRDD.cache
val loading = CachedRDD.loading
@@ -271,7 +271,7 @@ private object CachedRDD {
abstract class UnionSplit[T: ClassManifest] extends Split {
def iterator(): Iterator[T]
def preferredLocations(): Seq[String]
- def toString(): String
+ def getId(): String
}
@serializable
@@ -280,8 +280,8 @@ class UnionSplitImpl[T: ClassManifest](
extends UnionSplit[T] {
override def iterator() = rdd.iterator(split)
override def preferredLocations() = rdd.preferredLocations(split)
- override def toString() =
- "UnionSplitImpl(" + split.toString + ")"
+ override def getId() =
+ "UnionSplitImpl(" + split.getId() + ")"
}
@serializable
@@ -304,8 +304,8 @@ extends RDD[T](sc) {
}
@serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split {
- override def toString() =
- "CartesianSplit(" + s1.toString + ", " + s2.toString + ")"
+ override def getId() =
+ "CartesianSplit(" + s1.getId() + ", " + s2.getId() + ")"
}
@serializable
diff --git a/src/scala/spark/Split.scala b/src/scala/spark/Split.scala
index 4251191814..0f7a21354d 100644
--- a/src/scala/spark/Split.scala
+++ b/src/scala/spark/Split.scala
@@ -1,3 +1,13 @@
package spark
-abstract class Split {}
+/**
+ * A partition of an RDD.
+ */
+trait Split {
+ /**
+ * Get a unique ID for this split which can be used, for example, to
+ * set up caches based on it. The ID should stay the same if we serialize
+ * and then deserialize the split.
+ */
+ def getId(): String
+}