aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2014-11-02 23:20:22 -0800
committerAaron Davidson <aaron@databricks.com>2014-11-02 23:20:22 -0800
commit76386e1a23c55a58c0aeea67820aab2bac71b24b (patch)
tree62d150c35362deebbe2d8762c32768baa0afeda4 /core
parent001acc446345ccb1e494af9ff1d16dd65db8034e (diff)
downloadspark-76386e1a23c55a58c0aeea67820aab2bac71b24b.tar.gz
spark-76386e1a23c55a58c0aeea67820aab2bac71b24b.tar.bz2
spark-76386e1a23c55a58c0aeea67820aab2bac71b24b.zip
[SPARK-4163][Core][WebUI] Send the fetch failure message back to Web UI
This is a PR to send the fetch failure message back to Web UI. Before: ![f1](https://cloud.githubusercontent.com/assets/1000778/4856595/1f036c80-60be-11e4-956f-335147fbccb7.png) ![f2](https://cloud.githubusercontent.com/assets/1000778/4856596/1f11cbea-60be-11e4-8fe9-9f9b2b35c884.png) After (Please ignore the meaning of exception, I threw it in the code directly because it's hard to simulate a fetch failure): ![e1](https://cloud.githubusercontent.com/assets/1000778/4856600/2657ea38-60be-11e4-9f2d-d56c5f900f10.png) ![e2](https://cloud.githubusercontent.com/assets/1000778/4856601/26595008-60be-11e4-912b-2744af786991.png) Author: zsxwing <zsxwing@gmail.com> Closes #3032 from zsxwing/SPARK-4163 and squashes the following commits: f7e1faf [zsxwing] Discard changes for FetchFailedException and minor modification 4e946f7 [zsxwing] Add e as the cause of SparkException 316767d [zsxwing] Add private[storage] to FetchResult d51b0b6 [zsxwing] Set e as the cause of FetchFailedException b88c919 [zsxwing] Use 'private[storage]' for case classes instead of 'sealed' 62103fd [zsxwing] Update as per review 0c07d1f [zsxwing] Backward-compatible support a3bca65 [zsxwing] Send the fetch failure message back to Web UI
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/TaskEndReason.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala82
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala10
-rw-r--r--core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala4
12 files changed, 92 insertions, 65 deletions
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 202fba699a..f45b463fb6 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -69,11 +69,13 @@ case class FetchFailed(
bmAddress: BlockManagerId, // Note that bmAddress can be null
shuffleId: Int,
mapId: Int,
- reduceId: Int)
+ reduceId: Int,
+ message: String)
extends TaskFailedReason {
override def toErrorString: String = {
val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
- s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId)"
+ s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, " +
+ s"message=\n$message\n)"
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index af17b5d5d2..96114c0423 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1053,7 +1053,7 @@ class DAGScheduler(
logInfo("Resubmitted " + task + ", so marking it as still running")
stage.pendingTasks += task
- case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
+ case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleToMapStage(shuffleId)
@@ -1063,7 +1063,7 @@ class DAGScheduler(
if (runningStages.contains(failedStage)) {
logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
s"due to a fetch failure from $mapStage (${mapStage.name})")
- markStageAsFinished(failedStage, Some("Fetch failure"))
+ markStageAsFinished(failedStage, Some("Fetch failure: " + failureMessage))
runningStages -= failedStage
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 54904bffdf..4e3d9de540 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -215,7 +215,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
" STAGE_ID=" + taskEnd.stageId
stageLogInfo(taskEnd.stageId, taskStatus)
- case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
+ case FetchFailed(bmAddress, shuffleId, mapId, reduceId, message) =>
taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" +
taskEnd.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
mapId + " REDUCE_ID=" + reduceId
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
index 71c08e9d5a..0c1b6f4def 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
@@ -19,6 +19,7 @@ package org.apache.spark.shuffle
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.{FetchFailed, TaskEndReason}
+import org.apache.spark.util.Utils
/**
* Failed to fetch a shuffle block. The executor catches this exception and propagates it
@@ -30,13 +31,11 @@ private[spark] class FetchFailedException(
bmAddress: BlockManagerId,
shuffleId: Int,
mapId: Int,
- reduceId: Int)
- extends Exception {
-
- override def getMessage: String =
- "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)
+ reduceId: Int,
+ message: String)
+ extends Exception(message) {
- def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId)
+ def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
}
/**
@@ -46,7 +45,4 @@ private[spark] class MetadataFetchFailedException(
shuffleId: Int,
reduceId: Int,
message: String)
- extends FetchFailedException(null, shuffleId, -1, reduceId) {
-
- override def getMessage: String = message
-}
+ extends FetchFailedException(null, shuffleId, -1, reduceId, message)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index f49917b7fe..0d5247f417 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -19,12 +19,13 @@ package org.apache.spark.shuffle.hash
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
+import scala.util.{Failure, Success, Try}
import org.apache.spark._
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
-import org.apache.spark.util.CompletionIterator
+import org.apache.spark.util.{CompletionIterator, Utils}
private[hash] object BlockStoreShuffleFetcher extends Logging {
def fetch[T](
@@ -52,21 +53,22 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
(address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
}
- def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = {
+ def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = {
val blockId = blockPair._1
val blockOption = blockPair._2
blockOption match {
- case Some(block) => {
+ case Success(block) => {
block.asInstanceOf[Iterator[T]]
}
- case None => {
+ case Failure(e) => {
blockId match {
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
- throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId)
+ throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
+ Utils.exceptionString(e))
case _ =>
throw new SparkException(
- "Failed to get block " + blockId + ", which is not a shuffle block")
+ "Failed to get block " + blockId + ", which is not a shuffle block", e)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index ee89c7e521..1e579187e4 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
+import scala.util.{Failure, Success, Try}
import org.apache.spark.{Logging, TaskContext}
import org.apache.spark.network.BlockTransferService
@@ -55,7 +56,7 @@ final class ShuffleBlockFetcherIterator(
blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
serializer: Serializer,
maxBytesInFlight: Long)
- extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
+ extends Iterator[(BlockId, Try[Iterator[Any]])] with Logging {
import ShuffleBlockFetcherIterator._
@@ -118,16 +119,18 @@ final class ShuffleBlockFetcherIterator(
private[this] def cleanup() {
isZombie = true
// Release the current buffer if necessary
- if (currentResult != null && !currentResult.failed) {
- currentResult.buf.release()
+ currentResult match {
+ case SuccessFetchResult(_, _, buf) => buf.release()
+ case _ =>
}
// Release buffers in the results queue
val iter = results.iterator()
while (iter.hasNext) {
val result = iter.next()
- if (!result.failed) {
- result.buf.release()
+ result match {
+ case SuccessFetchResult(_, _, buf) => buf.release()
+ case _ =>
}
}
}
@@ -151,7 +154,7 @@ final class ShuffleBlockFetcherIterator(
// Increment the ref count because we need to pass this to a different thread.
// This needs to be released after use.
buf.retain()
- results.put(new FetchResult(BlockId(blockId), sizeMap(blockId), buf))
+ results.put(new SuccessFetchResult(BlockId(blockId), sizeMap(blockId), buf))
shuffleMetrics.remoteBytesRead += buf.size
shuffleMetrics.remoteBlocksFetched += 1
}
@@ -160,7 +163,7 @@ final class ShuffleBlockFetcherIterator(
override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
- results.put(new FetchResult(BlockId(blockId), -1, null))
+ results.put(new FailureFetchResult(BlockId(blockId), e))
}
}
)
@@ -231,12 +234,12 @@ final class ShuffleBlockFetcherIterator(
val buf = blockManager.getBlockData(blockId)
shuffleMetrics.localBlocksFetched += 1
buf.retain()
- results.put(new FetchResult(blockId, 0, buf))
+ results.put(new SuccessFetchResult(blockId, 0, buf))
} catch {
case e: Exception =>
// If we see an exception, stop immediately.
logError(s"Error occurred while fetching local blocks", e)
- results.put(new FetchResult(blockId, -1, null))
+ results.put(new FailureFetchResult(blockId, e))
return
}
}
@@ -267,15 +270,17 @@ final class ShuffleBlockFetcherIterator(
override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch
- override def next(): (BlockId, Option[Iterator[Any]]) = {
+ override def next(): (BlockId, Try[Iterator[Any]]) = {
numBlocksProcessed += 1
val startFetchWait = System.currentTimeMillis()
currentResult = results.take()
val result = currentResult
val stopFetchWait = System.currentTimeMillis()
shuffleMetrics.fetchWaitTime += (stopFetchWait - startFetchWait)
- if (!result.failed) {
- bytesInFlight -= result.size
+
+ result match {
+ case SuccessFetchResult(_, size, _) => bytesInFlight -= size
+ case _ =>
}
// Send fetch requests up to maxBytesInFlight
while (fetchRequests.nonEmpty &&
@@ -283,20 +288,21 @@ final class ShuffleBlockFetcherIterator(
sendRequest(fetchRequests.dequeue())
}
- val iteratorOpt: Option[Iterator[Any]] = if (result.failed) {
- None
- } else {
- val is = blockManager.wrapForCompression(result.blockId, result.buf.createInputStream())
- val iter = serializer.newInstance().deserializeStream(is).asIterator
- Some(CompletionIterator[Any, Iterator[Any]](iter, {
- // Once the iterator is exhausted, release the buffer and set currentResult to null
- // so we don't release it again in cleanup.
- currentResult = null
- result.buf.release()
- }))
+ val iteratorTry: Try[Iterator[Any]] = result match {
+ case FailureFetchResult(_, e) => Failure(e)
+ case SuccessFetchResult(blockId, _, buf) => {
+ val is = blockManager.wrapForCompression(blockId, buf.createInputStream())
+ val iter = serializer.newInstance().deserializeStream(is).asIterator
+ Success(CompletionIterator[Any, Iterator[Any]](iter, {
+ // Once the iterator is exhausted, release the buffer and set currentResult to null
+ // so we don't release it again in cleanup.
+ currentResult = null
+ buf.release()
+ }))
+ }
}
- (result.blockId, iteratorOpt)
+ (result.blockId, iteratorTry)
}
}
@@ -315,14 +321,30 @@ object ShuffleBlockFetcherIterator {
}
/**
- * Result of a fetch from a remote block. A failure is represented as size == -1.
+ * Result of a fetch from a remote block.
+ */
+ private[storage] sealed trait FetchResult {
+ val blockId: BlockId
+ }
+
+ /**
+ * Result of a fetch from a remote block successfully.
* @param blockId block id
* @param size estimated size of the block, used to calculate bytesInFlight.
- * Note that this is NOT the exact bytes. -1 if failure is present.
- * @param buf [[ManagedBuffer]] for the content. null is error.
+ * Note that this is NOT the exact bytes.
+ * @param buf [[ManagedBuffer]] for the content.
*/
- case class FetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer) {
- def failed: Boolean = size == -1
- if (failed) assert(buf == null) else assert(buf != null)
+ private[storage] case class SuccessFetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer)
+ extends FetchResult {
+ require(buf != null)
+ require(size >= 0)
}
+
+ /**
+ * Result of a fetch from a remote block unsuccessfully.
+ * @param blockId block id
+ * @param e the failure exception
+ */
+ private[storage] case class FailureFetchResult(blockId: BlockId, e: Throwable)
+ extends FetchResult
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 43c7fba066..f7ae1f7f33 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -279,7 +279,8 @@ private[spark] object JsonProtocol {
("Block Manager Address" -> blockManagerAddress) ~
("Shuffle ID" -> fetchFailed.shuffleId) ~
("Map ID" -> fetchFailed.mapId) ~
- ("Reduce ID" -> fetchFailed.reduceId)
+ ("Reduce ID" -> fetchFailed.reduceId) ~
+ ("Message" -> fetchFailed.message)
case exceptionFailure: ExceptionFailure =>
val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
val metrics = exceptionFailure.metrics.map(taskMetricsToJson).getOrElse(JNothing)
@@ -629,7 +630,9 @@ private[spark] object JsonProtocol {
val shuffleId = (json \ "Shuffle ID").extract[Int]
val mapId = (json \ "Map ID").extract[Int]
val reduceId = (json \ "Reduce ID").extract[Int]
- new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId)
+ val message = Utils.jsonOption(json \ "Message").map(_.extract[String])
+ new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId,
+ message.getOrElse("Unknown reason"))
case `exceptionFailure` =>
val className = (json \ "Class Name").extract[String]
val description = (json \ "Description").extract[String]
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b402c5f334..a33046d204 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1597,7 +1597,7 @@ private[spark] object Utils extends Logging {
}
/** Return a nice string representation of the exception, including the stack trace. */
- def exceptionString(e: Exception): String = {
+ def exceptionString(e: Throwable): String = {
if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index a2e4f712db..819f95634b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -431,7 +431,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
// the 2nd ResultTask failed
complete(taskSets(1), Seq(
(Success, 42),
- (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0), null)))
+ (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
// this will get called
// blockManagerMaster.removeExecutor("exec-hostA")
// ask the scheduler to try it again
@@ -461,7 +461,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
// The first result task fails, with a fetch failure for the output from the first mapper.
runEvent(CompletionEvent(
taskSets(1).tasks(0),
- FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0),
+ FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
null,
Map[Long, Any](),
null,
@@ -472,7 +472,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
// The second ResultTask fails, with a fetch failure for the output from the second mapper.
runEvent(CompletionEvent(
taskSets(1).tasks(0),
- FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1),
+ FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"),
null,
Map[Long, Any](),
null,
@@ -624,7 +624,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
(Success, makeMapStatus("hostC", 1))))
// fail the third stage because hostA went down
complete(taskSets(2), Seq(
- (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
+ (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
// TODO assert this:
// blockManagerMaster.removeExecutor("exec-hostA")
// have DAGScheduler try again
@@ -655,7 +655,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
(Success, makeMapStatus("hostB", 1))))
// pretend stage 0 failed because hostA went down
complete(taskSets(2), Seq(
- (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
+ (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
// TODO assert this:
// blockManagerMaster.removeExecutor("exec-hostA")
// DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 28f766570e..1eaabb93ad 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -102,7 +102,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
for (i <- 0 until 5) {
assert(iterator.hasNext, s"iterator should have 5 elements but actually has $i elements")
val (blockId, subIterator) = iterator.next()
- assert(subIterator.isDefined,
+ assert(subIterator.isSuccess,
s"iterator should have 5 elements defined but actually has $i elements")
// Make sure we release the buffer once the iterator is exhausted.
@@ -230,8 +230,8 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
sem.acquire()
// The first block should be defined, and the last two are not defined (due to failure)
- assert(iterator.next()._2.isDefined === true)
- assert(iterator.next()._2.isDefined === false)
- assert(iterator.next()._2.isDefined === false)
+ assert(iterator.next()._2.isSuccess)
+ assert(iterator.next()._2.isFailure)
+ assert(iterator.next()._2.isFailure)
}
}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 6567c5ab83..2efbae6897 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -115,7 +115,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
// Go through all the failure cases to make sure we are counting them as failures.
val taskFailedReasons = Seq(
Resubmitted,
- new FetchFailed(null, 0, 0, 0),
+ new FetchFailed(null, 0, 0, 0, "ignored"),
new ExceptionFailure("Exception", "description", null, None),
TaskResultLost,
TaskKilled,
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index d235d7a0ed..a91c9ddeae 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -107,7 +107,8 @@ class JsonProtocolSuite extends FunSuite {
testJobResult(jobFailed)
// TaskEndReason
- val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19)
+ val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19,
+ "Some exception")
val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, None)
testTaskEndReason(Success)
testTaskEndReason(Resubmitted)
@@ -396,6 +397,7 @@ class JsonProtocolSuite extends FunSuite {
assert(r1.mapId === r2.mapId)
assert(r1.reduceId === r2.reduceId)
assertEquals(r1.bmAddress, r2.bmAddress)
+ assert(r1.message === r2.message)
case (r1: ExceptionFailure, r2: ExceptionFailure) =>
assert(r1.className === r2.className)
assert(r1.description === r2.description)