diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-10-07 17:08:06 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-10-07 17:24:18 -0700 |
commit | 391133f66a41cf78cc200c20c0228eb99eebc6fd (patch) | |
tree | 8534a412c03eaf19ada85be0bc01b0e23c280a36 /core | |
parent | 3745a1827fc955be6c3236e4c31d27db062f15de (diff) | |
download | spark-391133f66a41cf78cc200c20c0228eb99eebc6fd.tar.gz spark-391133f66a41cf78cc200c20c0228eb99eebc6fd.tar.bz2 spark-391133f66a41cf78cc200c20c0228eb99eebc6fd.zip |
Fix inconsistent and incorrect log messages in shuffle read path
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/CacheManager.scala | 11 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 12 |
2 files changed, 16 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 3d36761cda..048168c52b 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -34,12 +34,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel) : Iterator[T] = { val key = "rdd_%d_%d".format(rdd.id, split.index) - logInfo("Cache key is " + key) + logInfo("Looking for partition " + key) blockManager.get(key) match { - case Some(cachedValues) => - // Partition is in cache, so just return its values - logInfo("Found partition in cache!") - return cachedValues.asInstanceOf[Iterator[T]] + case Some(values) => + // Partition is already materialized, so just return its values + return values.asInstanceOf[Iterator[T]] case None => // Mark the split as loading (unless someone else marks it first) @@ -68,7 +67,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { } try { // If we got here, we have to load the split - logInfo("Computing partition " + split) + logInfo("Partition %s not found, computing it".format(key)) val computedValues = rdd.computeOrReadCheckpoint(split, context) // Persist the result, so long as the task is not running locally if (context.runningLocally) { return computedValues } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 495a72db69..37d0ddb17b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -523,7 +523,17 @@ private[spark] class BlockManager( * Get a block from the block manager (either local or remote). */ def get(blockId: String): Option[Iterator[Any]] = { - getLocal(blockId).orElse(getRemote(blockId)) + val local = getLocal(blockId) + if (local.isDefined) { + logInfo("Found block %s locally".format(blockId)) + return local + } + val remote = getRemote(blockId) + if (remote.isDefined) { + logInfo("Found block %s remotely".format(blockId)) + return remote + } + None } /** |