diff options
-rw-r--r-- src/scala/spark/HdfsFile.scala      |  4
-rw-r--r-- src/scala/spark/ParallelArray.scala |  2
-rw-r--r-- src/scala/spark/RDD.scala           | 16
-rw-r--r-- src/scala/spark/Split.scala         | 12
4 files changed, 23 insertions(+), 11 deletions(-)
diff --git a/src/scala/spark/HdfsFile.scala b/src/scala/spark/HdfsFile.scala
index 886272a8ed..8637c6e30a 100644
--- a/src/scala/spark/HdfsFile.scala
+++ b/src/scala/spark/HdfsFile.scala
@@ -14,7 +14,9 @@ import org.apache.hadoop.mapred.Reporter
 
 @serializable class HdfsSplit(@transient s: InputSplit) extends Split {
   val inputSplit = new SerializableWritable[InputSplit](s)
-  override def toString = inputSplit.toString
+
+  override def getId() = inputSplit.toString // Hadoop makes this unique
+                                             // for each split of each file
 }
 
 class HdfsTextFile(sc: SparkContext, path: String)
diff --git a/src/scala/spark/ParallelArray.scala b/src/scala/spark/ParallelArray.scala
index 12fbfaf4c2..a01904d61c 100644
--- a/src/scala/spark/ParallelArray.scala
+++ b/src/scala/spark/ParallelArray.scala
@@ -17,7 +17,7 @@ extends Split {
     case _ => false
   }
 
-  override def toString() =
+  override def getId() =
     "ParallelArraySplit(arrayId %d, slice %d)".format(arrayId, slice)
 }
 
diff --git a/src/scala/spark/RDD.scala b/src/scala/spark/RDD.scala
index 5236eb958f..803c063865 100644
--- a/src/scala/spark/RDD.scala
+++ b/src/scala/spark/RDD.scala
@@ -166,8 +166,8 @@ extends RDD[Array[T]](prev.sparkContext) {
 
 @serializable
 class SeededSplit(val prev: Split, val seed: Int) extends Split {
-  override def toString() =
-    "SeededSplit(" + prev.toString + ", seed " + seed + ")"
+  override def getId() =
+    "SeededSplit(" + prev.getId() + ", seed " + seed + ")"
 }
 
 class SampledRDD[T: ClassManifest](
@@ -216,7 +216,7 @@ extends RDD[T](prev.sparkContext) with Logging {
   }
 
   override def iterator(split: Split): Iterator[T] = {
-    val key = id + "::" + split.toString
+    val key = id + "::" + split.getId()
     logInfo("CachedRDD split key is " + key)
     val cache = CachedRDD.cache
     val loading = CachedRDD.loading
@@ -271,7 +271,7 @@ private object CachedRDD {
 abstract class UnionSplit[T: ClassManifest] extends Split {
   def iterator(): Iterator[T]
   def preferredLocations(): Seq[String]
-  def toString(): String
+  def getId(): String
 }
 
 @serializable
@@ -280,8 +280,8 @@ class UnionSplitImpl[T: ClassManifest](
 extends UnionSplit[T] {
   override def iterator() = rdd.iterator(split)
   override def preferredLocations() = rdd.preferredLocations(split)
-  override def toString() =
-    "UnionSplitImpl(" + split.toString + ")"
+  override def getId() =
+    "UnionSplitImpl(" + split.getId() + ")"
 }
 
 @serializable
@@ -304,8 +304,8 @@ extends RDD[T](sc) {
 }
 
 @serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split {
-  override def toString() =
-    "CartesianSplit(" + s1.toString + ", " + s2.toString + ")"
+  override def getId() =
+    "CartesianSplit(" + s1.getId() + ", " + s2.getId() + ")"
 }
 
 @serializable
diff --git a/src/scala/spark/Split.scala b/src/scala/spark/Split.scala
index 4251191814..0f7a21354d 100644
--- a/src/scala/spark/Split.scala
+++ b/src/scala/spark/Split.scala
@@ -1,3 +1,13 @@
 package spark
 
-abstract class Split {}
+/**
+ * A partition of an RDD.
+ */
+trait Split {
+  /**
+   * Get a unique ID for this split which can be used, for example, to
+   * set up caches based on it. The ID should stay the same if we serialize
+   * and then deserialize the split.
+   */
+  def getId(): String
+}