aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-10-07 17:08:06 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-10-07 17:24:18 -0700
commit391133f66a41cf78cc200c20c0228eb99eebc6fd (patch)
tree8534a412c03eaf19ada85be0bc01b0e23c280a36 /core
parent3745a1827fc955be6c3236e4c31d27db062f15de (diff)
downloadspark-391133f66a41cf78cc200c20c0228eb99eebc6fd.tar.gz
spark-391133f66a41cf78cc200c20c0228eb99eebc6fd.tar.bz2
spark-391133f66a41cf78cc200c20c0228eb99eebc6fd.zip
Fix inconsistent and incorrect log messages in shuffle read path
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/CacheManager.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala12
2 files changed, 16 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 3d36761cda..048168c52b 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -34,12 +34,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
: Iterator[T] = {
val key = "rdd_%d_%d".format(rdd.id, split.index)
- logInfo("Cache key is " + key)
+ logInfo("Looking for partition " + key)
blockManager.get(key) match {
- case Some(cachedValues) =>
- // Partition is in cache, so just return its values
- logInfo("Found partition in cache!")
- return cachedValues.asInstanceOf[Iterator[T]]
+ case Some(values) =>
+ // Partition is already materialized, so just return its values
+ return values.asInstanceOf[Iterator[T]]
case None =>
// Mark the split as loading (unless someone else marks it first)
@@ -68,7 +67,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}
try {
// If we got here, we have to load the split
- logInfo("Computing partition " + split)
+ logInfo("Partition %s not found, computing it".format(key))
val computedValues = rdd.computeOrReadCheckpoint(split, context)
// Persist the result, so long as the task is not running locally
if (context.runningLocally) { return computedValues }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 495a72db69..37d0ddb17b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -523,7 +523,17 @@ private[spark] class BlockManager(
* Get a block from the block manager (either local or remote).
*/
def get(blockId: String): Option[Iterator[Any]] = {
- getLocal(blockId).orElse(getRemote(blockId))
+ val local = getLocal(blockId)
+ if (local.isDefined) {
+ logInfo("Found block %s locally".format(blockId))
+ return local
+ }
+ val remote = getRemote(blockId)
+ if (remote.isDefined) {
+ logInfo("Found block %s remotely".format(blockId))
+ return remote
+ }
+ None
}
/**