author     Davies Liu <davies@databricks.com>    2015-02-17 13:36:43 -0800
committer  Josh Rosen <joshrosen@databricks.com> 2015-02-17 13:36:43 -0800
commit     445a755b884885b88c1778fd56a3151045b0b0ed (patch)
tree       e36607b0aedc8040fa1946f364ceba85aadbcf68 /core
parent     de4836f8f12c36c1b350cef288a75b5e59155735 (diff)
[SPARK-4172] [PySpark] Progress API in Python
This patch brings the pull-based progress API into Python, along with an example in Python.

Author: Davies Liu <davies@databricks.com>

Closes #3027 from davies/progress_api and squashes the following commits:

b1ba984 [Davies Liu] fix style
d3b9253 [Davies Liu] add tests, mute the exception after stop
4297327 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
969fa9d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
25590c9 [Davies Liu] update with Java API
360de2d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
c0f1021 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
023afb3 [Davies Liu] add Python API and example for progress API
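For orientation, a minimal sketch of how the new pull-based API can be polled from Python. This is not the example shipped with the patch; it assumes the PySpark StatusTracker methods this change introduces (sc.statusTracker(), getActiveStageIds(), getStageInfo()):

    import threading
    import time

    from pyspark import SparkConf, SparkContext

    def run_job(sc):
        # A deliberately slow job so there is progress to observe.
        sc.parallelize(range(10000), 10).map(lambda x: time.sleep(0.01) or x).count()

    conf = SparkConf().setMaster("local[2]").setAppName("progress-demo")
    sc = SparkContext(conf=conf)

    # Run the job on a background thread so the main thread is free to poll.
    job = threading.Thread(target=run_job, args=(sc,))
    job.start()

    tracker = sc.statusTracker()
    while job.is_alive():
        for stage_id in tracker.getActiveStageIds():
            info = tracker.getStageInfo(stage_id)
            if info:  # None once the stage has been cleaned up
                print("stage %d: %d/%d tasks complete"
                      % (stage_id, info.numCompletedTasks, info.numTasks))
        time.sleep(1)

    job.join()
    sc.stop()

Because the API is pull-based, the caller decides the polling cadence; nothing is pushed into user code, which suits PySpark's Py4J bridge (callbacks from the JVM into Python would be far more involved).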
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala | 40
1 file changed, 23 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 774f3d8cdb..3938580aee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
+import java.util.concurrent.RejectedExecutionException
 
 import scala.language.existentials
 import scala.util.control.NonFatal
@@ -95,25 +96,30 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
   def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState,
     serializedData: ByteBuffer) {
     var reason : TaskEndReason = UnknownReason
-    getTaskResultExecutor.execute(new Runnable {
-      override def run(): Unit = Utils.logUncaughtExceptions {
-        try {
-          if (serializedData != null && serializedData.limit() > 0) {
-            reason = serializer.get().deserialize[TaskEndReason](
-              serializedData, Utils.getSparkClassLoader)
+    try {
+      getTaskResultExecutor.execute(new Runnable {
+        override def run(): Unit = Utils.logUncaughtExceptions {
+          try {
+            if (serializedData != null && serializedData.limit() > 0) {
+              reason = serializer.get().deserialize[TaskEndReason](
+                serializedData, Utils.getSparkClassLoader)
+            }
+          } catch {
+            case cnd: ClassNotFoundException =>
+              // Log an error but keep going here -- the task failed, so not catastrophic
+              // if we can't deserialize the reason.
+              val loader = Utils.getContextOrSparkClassLoader
+              logError(
+                "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
+            case ex: Exception => {}
           }
-        } catch {
-          case cnd: ClassNotFoundException =>
-            // Log an error but keep going here -- the task failed, so not catastrophic if we can't
-            // deserialize the reason.
-            val loader = Utils.getContextOrSparkClassLoader
-            logError(
-              "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
-          case ex: Exception => {}
+          scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
         }
-        scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
-      }
-    })
+      })
+    } catch {
+      case e: RejectedExecutionException if sparkEnv.isStopped =>
+        // ignore it
+    }
   }
 
   def stop() {
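The core-side change guards enqueueFailedTask against a race with SparkContext.stop(): once the task-result executor has been shut down, execute() throws RejectedExecutionException, which the patch now swallows only when sparkEnv.isStopped (the "mute the exception after stop" commit above). A hypothetical Python analogue of the same race, for intuition only -- Python's pools signal rejection with RuntimeError rather than RejectedExecutionException:

    from concurrent.futures import ThreadPoolExecutor

    pool = ThreadPoolExecutor(max_workers=1)
    pool.shutdown()  # stand-in for the executor being stopped during SparkContext.stop()

    try:
        pool.submit(print, "too late")  # work enqueued after shutdown
    except RuntimeError:
        # Python's analogue of Java's RejectedExecutionException. The Scala
        # patch ignores this case only while the environment is stopping,
        # so a rejection during normal operation would still surface.
        print("rejected: pool is already shut down")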