author     Reynold Xin <rxin@apache.org>   2013-10-07 20:45:58 -0700
committer  Reynold Xin <rxin@apache.org>   2013-10-07 20:45:58 -0700
commit     ea34c521025d3408d44d45ab5c132fd9791794f6 (patch)
tree       2eff6ba6d8055932179e71e63ed6e2dbf954e9e9
parent     02f37ee8530bcd5afefe403147bbf411464fa773 (diff)
parent     8b377718b85d31fe5b0efb0ad77a8f38ffcede89 (diff)
Merge pull request #42 from pwendell/shuffle-read-perf
Fix inconsistent and incorrect log messages in shuffle read path
The user-facing log messages generated by the CacheManager are currently wrong and somewhat misleading. This patch makes them accurate, and it also adopts a consistent representation of the partition being fetched (`rdd_xx_yy`) so that users can more easily trace what is going on when reading the logs.
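For readers unfamiliar with the naming scheme, `rdd_xx_yy` is the block key built from the RDD id and the partition index, exactly as the diff below constructs it with `"rdd_%d_%d".format(rdd.id, split.index)`. A minimal sketch of the scheme (the `rddId` and `partitionIndex` values here are made up for illustration):

    // Builds the block key for a cached RDD partition: "rdd_<rddId>_<partitionIndex>".
    // The values are illustrative; in Spark they come from rdd.id and split.index.
    val rddId = 12
    val partitionIndex = 3
    val key = "rdd_%d_%d".format(rddId, partitionIndex)  // yields "rdd_12_3"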
 core/src/main/scala/org/apache/spark/CacheManager.scala         | 17 ++++++++---------
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 12 +++++++++++-
 2 files changed, 19 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 3d36761cda..4cf7eb96da 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -34,22 +34,21 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
   def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
       : Iterator[T] = {
     val key = "rdd_%d_%d".format(rdd.id, split.index)
-    logInfo("Cache key is " + key)
+    logDebug("Looking for partition " + key)
     blockManager.get(key) match {
-      case Some(cachedValues) =>
-        // Partition is in cache, so just return its values
-        logInfo("Found partition in cache!")
-        return cachedValues.asInstanceOf[Iterator[T]]
+      case Some(values) =>
+        // Partition is already materialized, so just return its values
+        return values.asInstanceOf[Iterator[T]]
 
       case None =>
         // Mark the split as loading (unless someone else marks it first)
         loading.synchronized {
           if (loading.contains(key)) {
-            logInfo("Loading contains " + key + ", waiting...")
+            logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
             while (loading.contains(key)) {
               try {loading.wait()} catch {case _ : Throwable =>}
             }
-            logInfo("Loading no longer contains " + key + ", so returning cached result")
+            logInfo("Finished waiting for %s".format(key))
             // See whether someone else has successfully loaded it. The main way this would fail
             // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
             // partition but we didn't want to make space for it. However, that case is unlikely
@@ -59,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
               case Some(values) =>
                 return values.asInstanceOf[Iterator[T]]
               case None =>
-                logInfo("Whoever was loading " + key + " failed; we'll try it ourselves")
+                logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
                 loading.add(key)
             }
           } else {
@@ -68,7 +67,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         }
         try {
           // If we got here, we have to load the split
-          logInfo("Computing partition " + split)
+          logInfo("Partition %s not found, computing it".format(key))
           val computedValues = rdd.computeOrReadCheckpoint(split, context)
           // Persist the result, so long as the task is not running locally
           if (context.runningLocally) { return computedValues }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 495a72db69..37d0ddb17b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -523,7 +523,17 @@ private[spark] class BlockManager(
    * Get a block from the block manager (either local or remote).
    */
   def get(blockId: String): Option[Iterator[Any]] = {
-    getLocal(blockId).orElse(getRemote(blockId))
+    val local = getLocal(blockId)
+    if (local.isDefined) {
+      logInfo("Found block %s locally".format(blockId))
+      return local
+    }
+    val remote = getRemote(blockId)
+    if (remote.isDefined) {
+      logInfo("Found block %s remotely".format(blockId))
+      return remote
+    }
+    None
   }
 
   /**
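The rewritten `BlockManager.get` trades the one-line `orElse` chain for explicit branches so that each lookup path can log where the block was found. A minimal standalone sketch of that local-then-remote pattern, with hypothetical Map-backed stores standing in for Spark's real `getLocal`/`getRemote`:

    // Sketch only: the two in-memory stores below are illustrative stand-ins
    // for Spark's local block store and remote fetch path.
    object BlockLookupSketch {
      private val localStore  = Map("rdd_12_3" -> Seq(1, 2, 3))
      private val remoteStore = Map("rdd_12_4" -> Seq(4, 5, 6))

      def get(blockId: String): Option[Seq[Int]] = {
        // Try the cheap local lookup first, logging a hit.
        val local = localStore.get(blockId)
        if (local.isDefined) {
          println("Found block %s locally".format(blockId))
          return local
        }
        // Only on a local miss do we fall back to the remote store.
        val remote = remoteStore.get(blockId)
        if (remote.isDefined) {
          println("Found block %s remotely".format(blockId))
          return remote
        }
        None
      }

      def main(args: Array[String]): Unit = {
        get("rdd_12_3")   // logs: Found block rdd_12_3 locally
        get("rdd_12_4")   // logs: Found block rdd_12_4 remotely
        get("rdd_99_0")   // returns None without logging
      }
    }

Note that the explicit form keeps the original laziness: the remote fetch runs only when the local lookup misses, which `orElse` also guaranteed (its argument is by-name) but without any way to record which path won.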