author    Reynold Xin <rxin@apache.org>  2014-06-26 21:12:16 -0700
committer Reynold Xin <rxin@apache.org>  2014-06-26 21:12:16 -0700
commit    bf578deaf2493081ceeb78dfd7617def5699a06e (patch)
tree      e0ef5fae0ee5a8b040c4594515e730867f6fd429 /core
parent    981bde9b056ef5e91aed553e0b5930f12e1ff797 (diff)
Removed throwable field from FetchFailedException and added MetadataFetchFailedException
FetchFailedException used to have a Throwable field, but in reality we never propagate any of those throwables/exceptions back to the driver, because Executor explicitly looks for FetchFailedException and then sends FetchFailed as the TaskEndReason. This pull request removes the Throwable field and adds a MetadataFetchFailedException that extends FetchFailedException (so MapOutputTracker now throws MetadataFetchFailedException instead).

Author: Reynold Xin <rxin@apache.org>

Closes #1227 from rxin/metadataFetchException and squashes the following commits:

5cb1e0a [Reynold Xin] MetadataFetchFailedException extends FetchFailedException.
8861ee2 [Reynold Xin] Throw MetadataFetchFailedException in MapOutputTracker.
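For quick reference, the resulting exception hierarchy condenses to the sketch below. It mirrors the full diff that follows: the Throwable field and the old auxiliary constructors are gone, and only the FetchFailed value from toTaskEndReason ever reaches the driver.

    import org.apache.spark.storage.BlockManagerId
    import org.apache.spark.{FetchFailed, TaskEndReason}

    // Fetch of a shuffle block failed; bmAddress may be null (metadata failures).
    private[spark] class FetchFailedException(
        bmAddress: BlockManagerId,
        shuffleId: Int,
        mapId: Int,
        reduceId: Int)
      extends Exception {

      override def getMessage: String =
        "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)

      // The executor sends this value, not the exception itself, to the driver.
      def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId)
    }

    // Shuffle metadata lookup failed in MapOutputTracker: no block manager
    // address (null) and no map id (-1), only a descriptive message.
    private[spark] class MetadataFetchFailedException(
        shuffleId: Int,
        reduceId: Int,
        message: String)
      extends FetchFailedException(null, shuffleId, -1, reduceId) {
      override def getMessage: String = message
    }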
Diffstat (limited to 'core')
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala                      | 12
 core/src/main/scala/org/apache/spark/TaskEndReason.scala                         |  2
 core/src/main/scala/org/apache/spark/executor/Executor.scala                     |  2
 core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala             |  4
 core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala (renamed from core/src/main/scala/org/apache/spark/FetchFailedException.scala) | 43
 core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala |  5
 core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala                 |  1
7 files changed, 42 insertions, 27 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index ee82d9fa78..182abacc47 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -25,9 +25,11 @@ import scala.concurrent.Await
import akka.actor._
import akka.pattern.ask
+
+import org.apache.spark.util._
import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util._
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int)
@@ -168,8 +170,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
}
} else {
- throw new FetchFailedException(null, shuffleId, -1, reduceId,
- new Exception("Missing all output locations for shuffle " + shuffleId))
+ throw new MetadataFetchFailedException(
+ shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
}
} else {
statuses.synchronized {
@@ -371,8 +373,8 @@ private[spark] object MapOutputTracker {
statuses.map {
status =>
if (status == null) {
- throw new FetchFailedException(null, shuffleId, -1, reduceId,
- new Exception("Missing an output location for shuffle " + shuffleId))
+ throw new MetadataFetchFailedException(
+ shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
} else {
(status.location, decompressSize(status.compressedSizes(reduceId)))
}
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 5e8bd8c8e5..df42d679b4 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -65,7 +65,7 @@ case object Resubmitted extends TaskFailedReason {
*/
@DeveloperApi
case class FetchFailed(
- bmAddress: BlockManagerId,
+ bmAddress: BlockManagerId, // Note that bmAddress can be null
shuffleId: Int,
mapId: Int,
reduceId: Int)
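Since bmAddress is now documented as nullable, any consumer that formats a FetchFailed reason should guard against null. A hypothetical helper (not part of this commit, for illustration only):

    import org.apache.spark.FetchFailed

    // Hypothetical helper: format a FetchFailed reason defensively, since
    // bmAddress is null for failures raised by MetadataFetchFailedException.
    def describeFetchFailure(f: FetchFailed): String = {
      val where = Option(f.bmAddress).map(_.toString).getOrElse("<unknown address>")
      s"shuffle ${f.shuffleId} map ${f.mapId} reduce ${f.reduceId} at $where"
    }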
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 557b9a3f46..4d3ba11633 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,8 +26,8 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import org.apache.spark._
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
+import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util.{AkkaUtils, Utils}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
index 1481d70db4..4c96b9e5fe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
@@ -21,6 +21,10 @@ import java.nio.ByteBuffer
import org.apache.spark.util.SerializableBuffer
+/**
+ * Description of a task that gets passed onto executors to be executed, usually created by
+ * [[TaskSetManager.resourceOffer]].
+ */
private[spark] class TaskDescription(
val taskId: Long,
val executorId: String,
diff --git a/core/src/main/scala/org/apache/spark/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
index 8eaa26bdb1..71c08e9d5a 100644
--- a/core/src/main/scala/org/apache/spark/FetchFailedException.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
@@ -15,31 +15,38 @@
* limitations under the License.
*/
-package org.apache.spark
+package org.apache.spark.shuffle
import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.{FetchFailed, TaskEndReason}
+/**
+ * Failed to fetch a shuffle block. The executor catches this exception and propagates it
+ * back to DAGScheduler (through TaskEndReason) so we'd resubmit the previous stage.
+ *
+ * Note that bmAddress can be null.
+ */
private[spark] class FetchFailedException(
- taskEndReason: TaskEndReason,
- message: String,
- cause: Throwable)
+ bmAddress: BlockManagerId,
+ shuffleId: Int,
+ mapId: Int,
+ reduceId: Int)
extends Exception {
- def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int,
- cause: Throwable) =
- this(FetchFailed(bmAddress, shuffleId, mapId, reduceId),
- "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId),
- cause)
-
- def this (shuffleId: Int, reduceId: Int, cause: Throwable) =
- this(FetchFailed(null, shuffleId, -1, reduceId),
- "Unable to fetch locations from master: %d %d".format(shuffleId, reduceId), cause)
-
- override def getMessage(): String = message
+ override def getMessage: String =
+ "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)
+ def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId)
+}
- override def getCause(): Throwable = cause
-
- def toTaskEndReason: TaskEndReason = taskEndReason
+/**
+ * Failed to get shuffle metadata from [[org.apache.spark.MapOutputTracker]].
+ */
+private[spark] class MetadataFetchFailedException(
+ shuffleId: Int,
+ reduceId: Int,
+ message: String)
+ extends FetchFailedException(null, shuffleId, -1, reduceId) {
+ override def getMessage: String = message
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index b05b6ea345..a932455776 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -20,11 +20,12 @@ package org.apache.spark.shuffle.hash
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
+import org.apache.spark._
import org.apache.spark.executor.ShuffleReadMetrics
import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId}
import org.apache.spark.util.CompletionIterator
-import org.apache.spark._
private[hash] object BlockStoreShuffleFetcher extends Logging {
def fetch[T](
@@ -63,7 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
blockId match {
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
- throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)
+ throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId)
case _ =>
throw new SparkException(
"Failed to get block " + blockId + ", which is not a shuffle block")
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 95ba273f16..9702838085 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -24,6 +24,7 @@ import akka.testkit.TestActorRef
import org.scalatest.FunSuite
import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AkkaUtils
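The Executor change above only swaps an import; the catch-and-convert pattern the commit message relies on is unchanged. A hypothetical distillation of that pattern (the real Executor.run body is not part of this diff, and this helper name is illustrative):

    import org.apache.spark.TaskEndReason
    import org.apache.spark.shuffle.FetchFailedException

    // Hypothetical helper, for illustration only: run a task body and, if a
    // fetch failure (of either subclass) escapes, return the TaskEndReason the
    // executor would report to the driver. The original cause never travels.
    def fetchFailureReason(body: => Unit): Option[TaskEndReason] =
      try { body; None } catch {
        case ffe: FetchFailedException => Some(ffe.toTaskEndReason)
      }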