-rw-r--r--  src/scala/spark/HdfsFile.scala        |  4
-rw-r--r--  src/scala/spark/ParallelArray.scala   |  2
-rw-r--r--  src/scala/spark/RDD.scala             | 16
-rw-r--r--  src/scala/spark/Split.scala           | 12
4 files changed, 23 insertions(+), 11 deletions(-)
diff --git a/src/scala/spark/HdfsFile.scala b/src/scala/spark/HdfsFile.scala
index 886272a8ed..8637c6e30a 100644
--- a/src/scala/spark/HdfsFile.scala
+++ b/src/scala/spark/HdfsFile.scala
@@ -14,7 +14,9 @@ import org.apache.hadoop.mapred.Reporter
@serializable class HdfsSplit(@transient s: InputSplit)
extends Split {
val inputSplit = new SerializableWritable[InputSplit](s)
- override def toString = inputSplit.toString
+
+ override def getId() = inputSplit.toString // Hadoop makes this unique
+ // for each split of each file
}
class HdfsTextFile(sc: SparkContext, path: String)
diff --git a/src/scala/spark/ParallelArray.scala b/src/scala/spark/ParallelArray.scala
index 12fbfaf4c2..a01904d61c 100644
--- a/src/scala/spark/ParallelArray.scala
+++ b/src/scala/spark/ParallelArray.scala
@@ -17,7 +17,7 @@ extends Split {
case _ => false
}
- override def toString() =
+ override def getId() =
"ParallelArraySplit(arrayId %d, slice %d)".format(arrayId, slice)
}
diff --git a/src/scala/spark/RDD.scala b/src/scala/spark/RDD.scala
index 5236eb958f..803c063865 100644
--- a/src/scala/spark/RDD.scala
+++ b/src/scala/spark/RDD.scala
@@ -166,8 +166,8 @@ extends RDD[Array[T]](prev.sparkContext) {
@serializable class SeededSplit(val prev: Split, val seed: Int) extends Split {
- override def toString() =
- "SeededSplit(" + prev.toString + ", seed " + seed + ")"
+ override def getId() =
+ "SeededSplit(" + prev.getId() + ", seed " + seed + ")"
}
class SampledRDD[T: ClassManifest](
@@ -216,7 +216,7 @@ extends RDD[T](prev.sparkContext) with Logging {
}
override def iterator(split: Split): Iterator[T] = {
- val key = id + "::" + split.toString
+ val key = id + "::" + split.getId()
logInfo("CachedRDD split key is " + key)
val cache = CachedRDD.cache
val loading = CachedRDD.loading
@@ -271,7 +271,7 @@ private object CachedRDD {
abstract class UnionSplit[T: ClassManifest] extends Split {
def iterator(): Iterator[T]
def preferredLocations(): Seq[String]
- def toString(): String
+ def getId(): String
}
@serializable
@@ -280,8 +280,8 @@ class UnionSplitImpl[T: ClassManifest](
extends UnionSplit[T] {
override def iterator() = rdd.iterator(split)
override def preferredLocations() = rdd.preferredLocations(split)
- override def toString() =
- "UnionSplitImpl(" + split.toString + ")"
+ override def getId() =
+ "UnionSplitImpl(" + split.getId() + ")"
}
@serializable
@@ -304,8 +304,8 @@ extends RDD[T](sc) {
}
@serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split {
- override def toString() =
- "CartesianSplit(" + s1.toString + ", " + s2.toString + ")"
+ override def getId() =
+ "CartesianSplit(" + s1.getId() + ", " + s2.getId() + ")"
}
@serializable
diff --git a/src/scala/spark/Split.scala b/src/scala/spark/Split.scala
index 4251191814..0f7a21354d 100644
--- a/src/scala/spark/Split.scala
+++ b/src/scala/spark/Split.scala
@@ -1,3 +1,13 @@
package spark
-abstract class Split {}
+/**
+ * A partition of an RDD.
+ */
+trait Split {
+ /**
+ * Get a unique ID for this split which can be used, for example, to
+ * set up caches based on it. The ID should stay the same if we serialize
+ * and then deserialize the split.
+ */
+ def getId(): String
+}
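Below is a minimal sketch (not part of this commit) of how a concrete Split might implement getId() and how a cache key in the style of CachedRDD.iterator above could be built from it. It assumes the spark.Split trait added in Split.scala; the names ExampleSplit, ExampleSplitDemo, and rddId are hypothetical and used only for illustration.

package spark

// Hypothetical split whose identity depends only on stable fields,
// so getId() stays the same across serialization round-trips.
class ExampleSplit(val arrayId: Long, val slice: Int) extends Split {
  override def getId() =
    "ExampleSplit(arrayId %d, slice %d)".format(arrayId, slice)
}

object ExampleSplitDemo {
  def main(args: Array[String]): Unit = {
    val split: Split = new ExampleSplit(42L, 3)
    val rddId = 7 // hypothetical RDD id
    // Same key scheme as CachedRDD.iterator in the diff above.
    val key = rddId + "::" + split.getId()
    println(key) // prints: 7::ExampleSplit(arrayId 42, slice 3)
  }
}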