aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Reynold Xin <rxin@apache.org> 2013-10-07 20:45:58 -0700
committer Reynold Xin <rxin@apache.org> 2013-10-07 20:45:58 -0700
commit ea34c521025d3408d44d45ab5c132fd9791794f6 (patch)
tree 2eff6ba6d8055932179e71e63ed6e2dbf954e9e9
parent 02f37ee8530bcd5afefe403147bbf411464fa773 (diff)
parent 8b377718b85d31fe5b0efb0ad77a8f38ffcede89 (diff)
download spark-ea34c521025d3408d44d45ab5c132fd9791794f6.tar.gz
spark-ea34c521025d3408d44d45ab5c132fd9791794f6.tar.bz2
spark-ea34c521025d3408d44d45ab5c132fd9791794f6.zip
Merge pull request #42 from pwendell/shuffle-read-perf
Fix inconsistent and incorrect log messages in shuffle read path. The user-facing messages generated by the CacheManager are currently wrong and somewhat misleading. This patch makes the messages more accurate. It also uses a consistent representation of the partition being fetched (`rdd_xx_yy`) so that it's easier for users to trace what is going on when reading logs.
-rw-r--r-- core/src/main/scala/org/apache/spark/CacheManager.scala 17
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockManager.scala 12
2 files changed, 19 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 3d36761cda..4cf7eb96da 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -34,22 +34,21 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
: Iterator[T] = {
val key = "rdd_%d_%d".format(rdd.id, split.index)
- logInfo("Cache key is " + key)
+ logDebug("Looking for partition " + key)
blockManager.get(key) match {
- case Some(cachedValues) =>
- // Partition is in cache, so just return its values
- logInfo("Found partition in cache!")
- return cachedValues.asInstanceOf[Iterator[T]]
+ case Some(values) =>
+ // Partition is already materialized, so just return its values
+ return values.asInstanceOf[Iterator[T]]
case None =>
// Mark the split as loading (unless someone else marks it first)
loading.synchronized {
if (loading.contains(key)) {
- logInfo("Loading contains " + key + ", waiting...")
+ logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
while (loading.contains(key)) {
try {loading.wait()} catch {case _ : Throwable =>}
}
- logInfo("Loading no longer contains " + key + ", so returning cached result")
+ logInfo("Finished waiting for %s".format(key))
// See whether someone else has successfully loaded it. The main way this would fail
// is for the RDD-level cache eviction policy if someone else has loaded the same RDD
// partition but we didn't want to make space for it. However, that case is unlikely
@@ -59,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
case Some(values) =>
return values.asInstanceOf[Iterator[T]]
case None =>
- logInfo("Whoever was loading " + key + " failed; we'll try it ourselves")
+ logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
loading.add(key)
}
} else {
@@ -68,7 +67,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}
try {
// If we got here, we have to load the split
- logInfo("Computing partition " + split)
+ logInfo("Partition %s not found, computing it".format(key))
val computedValues = rdd.computeOrReadCheckpoint(split, context)
// Persist the result, so long as the task is not running locally
if (context.runningLocally) { return computedValues }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 495a72db69..37d0ddb17b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -523,7 +523,17 @@ private[spark] class BlockManager(
* Get a block from the block manager (either local or remote).
*/
def get(blockId: String): Option[Iterator[Any]] = {
- getLocal(blockId).orElse(getRemote(blockId))
+ val local = getLocal(blockId)
+ if (local.isDefined) {
+ logInfo("Found block %s locally".format(blockId))
+ return local
+ }
+ val remote = getRemote(blockId)
+ if (remote.isDefined) {
+ logInfo("Found block %s remotely".format(blockId))
+ return remote
+ }
+ None
}
/**