about summary refs log tree commit diff
path: root/core/src/main
diff options
context:
space:
mode:
author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-01 10:48:53 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-01 10:48:53 -0700
commit   53f90d0f0ea8ae605e95ece9850df6004123c787 (patch)
tree     cb1b17d507e4f223c1634e0dedcec8193f75648b /core/src/main
parent   2314132d57878152a84325f86ea320bdbc7cca31 (diff)
download spark-53f90d0f0ea8ae605e95ece9850df6004123c787.tar.gz
spark-53f90d0f0ea8ae605e95ece9850df6004123c787.tar.bz2
spark-53f90d0f0ea8ae605e95ece9850df6004123c787.zip
Use underscores instead of colons in RDD IDs
Diffstat (limited to 'core/src/main')
-rw-r--r-- core/src/main/scala/spark/BlockStoreShuffleFetcher.scala | 4
-rw-r--r-- core/src/main/scala/spark/CacheTracker.scala              | 2
-rw-r--r-- core/src/main/scala/spark/RDD.scala                       | 2
-rw-r--r-- core/src/main/scala/spark/scheduler/ShuffleMapTask.scala  | 2
-rw-r--r-- core/src/main/scala/spark/storage/BlockManager.scala      | 2
5 files changed, 6 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index 0bbdb4e432..9c42e88b68 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -29,7 +29,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
val blocksByAddress: Seq[(BlockManagerId, Seq[String])] = splitsByAddress.toSeq.map {
case (address, splits) =>
- (address, splits.map(i => "shuffleid_%d_%d_%d".format(shuffleId, i, reduceId)))
+ (address, splits.map(i => "shuffle_%d_%d_%d".format(shuffleId, i, reduceId)))
}
for ((blockId, blockOption) <- blockManager.getMultiple(blocksByAddress)) {
@@ -42,7 +42,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
}
}
case None => {
- val regex = "shuffleid_([0-9]*)_([0-9]*)_([0-9]*)".r
+ val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r
blockId match {
case regex(shufId, mapId, reduceId) =>
val addr = addresses(mapId.toInt)
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 225a5ad403..d9e0ef90b8 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -167,7 +167,7 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
// Gets or computes an RDD split
def getOrCompute[T](rdd: RDD[T], split: Split, storageLevel: StorageLevel): Iterator[T] = {
- val key = "rdd:%d:%d".format(rdd.id, split.index)
+ val key = "rdd_%d_%d".format(rdd.id, split.index)
logInfo("Cache key is " + key)
blockManager.get(key) match {
case Some(cachedValues) =>
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 784f25086e..ab8014c056 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -106,7 +106,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
// This is a hack. Ideally this should re-use the code used by the CacheTracker
// to generate the key.
- def getSplitKey(split: Split) = "rdd:%d:%d".format(this.id, split.index)
+ def getSplitKey(split: Split) = "rdd_%d_%d".format(this.id, split.index)
persist(level)
sc.runJob(this, (iter: Iterator[T]) => {} )
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index d70a061366..27d97ffee5 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -144,7 +144,7 @@ class ShuffleMapTask(
val ser = SparkEnv.get.serializer.newInstance()
val blockManager = SparkEnv.get.blockManager
for (i <- 0 until numOutputSplits) {
- val blockId = "shuffleid_" + dep.shuffleId + "_" + partition + "_" + i
+ val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i
// Get a scala iterator from java map
val iter: Iterator[(Any, Any)] = bucketIterators(i)
// TODO: This should probably be DISK_ONLY
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index a66f812662..081981c838 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -606,7 +606,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// TODO: This code will be removed when CacheTracker is gone.
private def notifyTheCacheTracker(key: String) {
- val rddInfo = key.split(":")
+ val rddInfo = key.split("_")
val rddId: Int = rddInfo(1).toInt
val splitIndex: Int = rddInfo(2).toInt
val host = System.getProperty("spark.hostname", Utils.localHostName())