aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
authorwangda.tan <wheeleast@gmail.com>2013-12-17 17:57:27 +0800
committerwangda.tan <wheeleast@gmail.com>2013-12-17 17:57:27 +0800
commit59e53fa21caa202a57093c74ada128fca2be5bac (patch)
tree1eb0bd8b005dd4019637a0d0ca97335e4260be58 /core/src/main/scala/org/apache
parent36060f4f50ead2632117bb12e8c5bc1fb4f91f1e (diff)
downloadspark-59e53fa21caa202a57093c74ada128fca2be5bac.tar.gz
spark-59e53fa21caa202a57093c74ada128fca2be5bac.tar.bz2
spark-59e53fa21caa202a57093c74ada128fca2be5bac.zip
spark-968, changes for avoid a NPE
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala30
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala24
2 files changed, 29 insertions, 25 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index 808bbe8c8f..f62ae37466 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -150,7 +150,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
activeTasks += taskStart.taskInfo
}
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
val newDuration = executorToDuration.getOrElse(eid, 0L) + taskEnd.taskInfo.duration
@@ -168,20 +168,22 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
}
// update shuffle read/write
- val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
- shuffleRead match {
- case Some(s) =>
- val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead
- executorToShuffleRead.put(eid, newShuffleRead)
- case _ => {}
- }
- val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
- shuffleWrite match {
- case Some(s) => {
- val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten
- executorToShuffleWrite.put(eid, newShuffleWrite)
+ if (null != taskEnd.taskMetrics) {
+ val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
+ shuffleRead match {
+ case Some(s) =>
+ val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead
+ executorToShuffleRead.put(eid, newShuffleRead)
+ case _ => {}
+ }
+ val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
+ shuffleWrite match {
+ case Some(s) => {
+ val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten
+ executorToShuffleWrite.put(eid, newShuffleWrite)
+ }
+ case _ => {}
}
- case _ => {}
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 8c92ff19a6..64ce715993 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -147,18 +147,20 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
y.duration += taskEnd.taskInfo.duration
// update shuffle read/write
- val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
- shuffleRead match {
- case Some(s) =>
- y.shuffleRead += s.remoteBytesRead
- case _ => {}
- }
- val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
- shuffleWrite match {
- case Some(s) => {
- y.shuffleWrite += s.shuffleBytesWritten
+ if (null != taskEnd.taskMetrics) {
+ val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
+ shuffleRead match {
+ case Some(s) =>
+ y.shuffleRead += s.remoteBytesRead
+ case _ => {}
+ }
+ val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
+ shuffleWrite match {
+ case Some(s) => {
+ y.shuffleWrite += s.shuffleBytesWritten
+ }
+ case _ => {}
}
- case _ => {}
}
}
case _ => {}