diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2010-10-07 17:17:07 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2010-10-07 17:17:07 -0700 |
commit | 630a982b88a3bac3bd362d52d64331b1495f31ff (patch) | |
tree | df948d58929e3cdf2e866249d9dd6c6b11652465 | |
parent | f9671b086b8e4faba8feb6fb6fa5a61ae4290ca3 (diff) | |
download | spark-630a982b88a3bac3bd362d52d64331b1495f31ff.tar.gz spark-630a982b88a3bac3bd362d52d64331b1495f31ff.tar.bz2 spark-630a982b88a3bac3bd362d52d64331b1495f31ff.zip |
Added a getId method to Split to force classes to specify a unique ID
for each split. This replaces the previous method of calling
split.toString, which would produce different results for the same split
each time it is deserialized (because the default implementation returns
the Java object's address).
-rw-r--r-- | src/scala/spark/HdfsFile.scala | 4 | ||||
-rw-r--r-- | src/scala/spark/ParallelArray.scala | 2 | ||||
-rw-r--r-- | src/scala/spark/RDD.scala | 16 | ||||
-rw-r--r-- | src/scala/spark/Split.scala | 12 |
4 files changed, 23 insertions, 11 deletions
diff --git a/src/scala/spark/HdfsFile.scala b/src/scala/spark/HdfsFile.scala index 886272a8ed..8637c6e30a 100644 --- a/src/scala/spark/HdfsFile.scala +++ b/src/scala/spark/HdfsFile.scala @@ -14,7 +14,9 @@ import org.apache.hadoop.mapred.Reporter @serializable class HdfsSplit(@transient s: InputSplit) extends Split { val inputSplit = new SerializableWritable[InputSplit](s) - override def toString = inputSplit.toString + + override def getId() = inputSplit.toString // Hadoop makes this unique + // for each split of each file } class HdfsTextFile(sc: SparkContext, path: String) diff --git a/src/scala/spark/ParallelArray.scala b/src/scala/spark/ParallelArray.scala index 12fbfaf4c2..a01904d61c 100644 --- a/src/scala/spark/ParallelArray.scala +++ b/src/scala/spark/ParallelArray.scala @@ -17,7 +17,7 @@ extends Split { case _ => false } - override def toString() = + override def getId() = "ParallelArraySplit(arrayId %d, slice %d)".format(arrayId, slice) } diff --git a/src/scala/spark/RDD.scala b/src/scala/spark/RDD.scala index 5236eb958f..803c063865 100644 --- a/src/scala/spark/RDD.scala +++ b/src/scala/spark/RDD.scala @@ -166,8 +166,8 @@ extends RDD[Array[T]](prev.sparkContext) { @serializable class SeededSplit(val prev: Split, val seed: Int) extends Split { - override def toString() = - "SeededSplit(" + prev.toString + ", seed " + seed + ")" + override def getId() = + "SeededSplit(" + prev.getId() + ", seed " + seed + ")" } class SampledRDD[T: ClassManifest]( @@ -216,7 +216,7 @@ extends RDD[T](prev.sparkContext) with Logging { } override def iterator(split: Split): Iterator[T] = { - val key = id + "::" + split.toString + val key = id + "::" + split.getId() logInfo("CachedRDD split key is " + key) val cache = CachedRDD.cache val loading = CachedRDD.loading @@ -271,7 +271,7 @@ private object CachedRDD { abstract class UnionSplit[T: ClassManifest] extends Split { def iterator(): Iterator[T] def preferredLocations(): Seq[String] - def toString(): String + def getId(): String 
} @serializable @@ -280,8 +280,8 @@ class UnionSplitImpl[T: ClassManifest]( extends UnionSplit[T] { override def iterator() = rdd.iterator(split) override def preferredLocations() = rdd.preferredLocations(split) - override def toString() = - "UnionSplitImpl(" + split.toString + ")" + override def getId() = + "UnionSplitImpl(" + split.getId() + ")" } @serializable @@ -304,8 +304,8 @@ extends RDD[T](sc) { } @serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split { - override def toString() = - "CartesianSplit(" + s1.toString + ", " + s2.toString + ")" + override def getId() = + "CartesianSplit(" + s1.getId() + ", " + s2.getId() + ")" } @serializable diff --git a/src/scala/spark/Split.scala b/src/scala/spark/Split.scala index 4251191814..0f7a21354d 100644 --- a/src/scala/spark/Split.scala +++ b/src/scala/spark/Split.scala @@ -1,3 +1,13 @@ package spark -abstract class Split {} +/** + * A partition of an RDD. + */ +trait Split { + /** + * Get a unique ID for this split which can be used, for example, to + * set up caches based on it. The ID should stay the same if we serialize + * and then deserialize the split. + */ + def getId(): String +} |