aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-06-28 21:05:03 -0700
committerReynold Xin <rxin@apache.org>2014-06-28 21:05:03 -0700
commit2053d793cc2e8e5f5776e6576ddc6f8e6168e60c (patch)
tree8039adfc4d5658f012415c9f458cc422dca6ffc2 /core
parent3c104c79d24425786cec0034f269ba19cf465b31 (diff)
downloadspark-2053d793cc2e8e5f5776e6576ddc6f8e6168e60c.tar.gz
spark-2053d793cc2e8e5f5776e6576ddc6f8e6168e60c.tar.bz2
spark-2053d793cc2e8e5f5776e6576ddc6f8e6168e60c.zip
Improve MapOutputTracker error logging.
Author: Reynold Xin <rxin@apache.org> Closes #1258 from rxin/mapOutputTracker and squashes the following commits: a7c95b6 [Reynold Xin] Improve MapOutputTracker error logging.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala17
1 files changed, 10 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 182abacc47..8940917614 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -26,10 +26,10 @@ import scala.concurrent.Await
import akka.actor._
import akka.pattern.ask
-import org.apache.spark.util._
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util._
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int)
@@ -107,14 +107,17 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
Await.result(future, timeout)
} catch {
case e: Exception =>
+ logError("Error communicating with MapOutputTracker", e)
throw new SparkException("Error communicating with MapOutputTracker", e)
}
}
/** Send a one-way message to the trackerActor, to which we expect it to reply with true. */
protected def sendTracker(message: Any) {
- if (askTracker(message) != true) {
- throw new SparkException("Error reply received from MapOutputTracker")
+ val response = askTracker(message)
+ if (response != true) {
+ throw new SparkException(
+ "Error reply received from MapOutputTracker. Expecting true, got " + response.toString)
}
}
@@ -366,9 +369,9 @@ private[spark] object MapOutputTracker {
// any of the statuses is null (indicating a missing location due to a failed mapper),
// throw a FetchFailedException.
private def convertMapStatuses(
- shuffleId: Int,
- reduceId: Int,
- statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
+ shuffleId: Int,
+ reduceId: Int,
+ statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
assert (statuses != null)
statuses.map {
status =>
@@ -403,7 +406,7 @@ private[spark] object MapOutputTracker {
if (compressedSize == 0) {
0
} else {
- math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong
+ math.pow(LOG_BASE, compressedSize & 0xFF).toLong
}
}
}