diff options
author | Reynold Xin <rxin@apache.org> | 2014-06-28 21:05:03 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-06-28 21:05:03 -0700 |
commit | 2053d793cc2e8e5f5776e6576ddc6f8e6168e60c (patch) | |
tree | 8039adfc4d5658f012415c9f458cc422dca6ffc2 | |
parent | 3c104c79d24425786cec0034f269ba19cf465b31 (diff) | |
download | spark-2053d793cc2e8e5f5776e6576ddc6f8e6168e60c.tar.gz spark-2053d793cc2e8e5f5776e6576ddc6f8e6168e60c.tar.bz2 spark-2053d793cc2e8e5f5776e6576ddc6f8e6168e60c.zip |
Improve MapOutputTracker error logging.
Author: Reynold Xin <rxin@apache.org>
Closes #1258 from rxin/mapOutputTracker and squashes the following commits:
a7c95b6 [Reynold Xin] Improve MapOutputTracker error logging.
-rw-r--r-- | core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 17 |
1 files changed, 10 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 182abacc47..8940917614 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -26,10 +26,10 @@ import scala.concurrent.Await import akka.actor._ import akka.pattern.ask -import org.apache.spark.util._ import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle.MetadataFetchFailedException import org.apache.spark.storage.BlockManagerId +import org.apache.spark.util._ private[spark] sealed trait MapOutputTrackerMessage private[spark] case class GetMapOutputStatuses(shuffleId: Int) @@ -107,14 +107,17 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging Await.result(future, timeout) } catch { case e: Exception => + logError("Error communicating with MapOutputTracker", e) throw new SparkException("Error communicating with MapOutputTracker", e) } } /** Send a one-way message to the trackerActor, to which we expect it to reply with true. */ protected def sendTracker(message: Any) { - if (askTracker(message) != true) { - throw new SparkException("Error reply received from MapOutputTracker") + val response = askTracker(message) + if (response != true) { + throw new SparkException( + "Error reply received from MapOutputTracker. Expecting true, got " + response.toString) } } @@ -366,9 +369,9 @@ private[spark] object MapOutputTracker { // any of the statuses is null (indicating a missing location due to a failed mapper), // throw a FetchFailedException. private def convertMapStatuses( - shuffleId: Int, - reduceId: Int, - statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = { + shuffleId: Int, + reduceId: Int, + statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = { assert (statuses != null) statuses.map { status => @@ -403,7 +406,7 @@ private[spark] object MapOutputTracker { if (compressedSize == 0) { 0 } else { - math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong + math.pow(LOG_BASE, compressedSize & 0xFF).toLong } } } |