author    Andrew Or <andrewor14@gmail.com>        2014-05-08 12:13:07 -0700
committer Aaron Davidson <aaron@databricks.com>   2014-05-08 12:13:07 -0700
commit    c3f8b78c211df6c5adae74f37e39fb55baeff723
tree      aedc2a8c201e2d697080b405af9820f932f68ecf
parent    44dd57fb66bb676d753ad8d9757f9f4c03364113
[SPARK-1745] Move interrupted flag from TaskContext constructor (minor)
It makes little sense to construct a TaskContext that is already interrupted. Indeed, I searched all call sites and did not find a single one where `interrupted` is true on construction.
This was inspired by reviewing #640, which adds a similar `@volatile var completed`. Neither change is urgent, but I wanted to push them out before I forget.
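For context, here is a minimal sketch of the shape this change produces, condensed from the real class (`taskMetrics` and the completion-callback machinery are omitted, and the demo object is illustrative): the cancellation flag becomes a field that always starts out false and is only flipped after construction.

```scala
// Condensed sketch of the refactored TaskContext (not the full Spark class).
class TaskContext(
    val stageId: Int,
    val partitionId: Int,
    val attemptId: Long,
    val runningLocally: Boolean = false) extends Serializable {

  // Whether the corresponding task has been killed. No longer a constructor
  // parameter: it is always false on construction and only set afterwards.
  // @volatile makes a write by the killing thread visible to the task's thread.
  @volatile var interrupted: Boolean = false
}

// Illustrative usage; `TaskContextSketch` is a name invented for this example.
object TaskContextSketch extends App {
  val context = new TaskContext(stageId = 0, partitionId = 0, attemptId = 0L)
  assert(!context.interrupted) // holds at every construction site, per the survey above
  context.interrupted = true   // flipped later, e.g. when the task is killed
}
```

The same reasoning applies to `completed` in #640: both are lifecycle flags with exactly one sensible initial value, so keeping them out of the constructor removes a parameter that every caller had to pass as `false`.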
Author: Andrew Or <andrewor14@gmail.com>
Closes #675 from andrewor14/task-context and squashes the following commits:
9575e02 [Andrew Or] Add space
69455d1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into task-context
c471490 [Andrew Or] Oops, removed one flag too many. Adding it back.
85311f8 [Andrew Or] Move interrupted flag from TaskContext constructor
5 files changed, 17 insertions(+), 22 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index fc4812753d..51f40c339d 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -28,13 +28,12 @@ import org.apache.spark.executor.TaskMetrics
  */
 @DeveloperApi
 class TaskContext(
-  val stageId: Int,
-  val partitionId: Int,
-  val attemptId: Long,
-  val runningLocally: Boolean = false,
-  @volatile var interrupted: Boolean = false,
-  private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty
-) extends Serializable {
+    val stageId: Int,
+    val partitionId: Int,
+    val attemptId: Long,
+    val runningLocally: Boolean = false,
+    private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty)
+  extends Serializable {
 
   @deprecated("use partitionId", "0.8.1")
   def splitId = partitionId
@@ -42,7 +41,10 @@ class TaskContext(
   // List of callback functions to execute when the task completes.
   @transient private val onCompleteCallbacks = new ArrayBuffer[() => Unit]
 
-  // Set to true when the task is completed, before the onCompleteCallbacks are executed.
+  // Whether the corresponding task has been killed.
+  @volatile var interrupted: Boolean = false
+
+  // Whether the task has completed, before the onCompleteCallbacks are executed.
   @volatile var completed: Boolean = false
 
   /**
@@ -58,6 +60,6 @@ class TaskContext(
   def executeOnCompleteCallbacks() {
     completed = true
     // Process complete callbacks in the reverse order of registration
-    onCompleteCallbacks.reverse.foreach{_()}
+    onCompleteCallbacks.reverse.foreach { _() }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 2259df0b56..4b0324f2b5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -23,7 +23,6 @@ import java.io._
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
 import scala.collection.mutable.HashMap
-import scala.util.Try
 
 import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
@@ -70,7 +69,7 @@ private[spark] object ShuffleMapTask {
   }
 
   // Since both the JarSet and FileSet have the same format this is used for both.
-  def deserializeFileSet(bytes: Array[Byte]) : HashMap[String, Long] = {
+  def deserializeFileSet(bytes: Array[Byte]): HashMap[String, Long] = {
     val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
     val objIn = new ObjectInputStream(in)
     val set = objIn.readObject().asInstanceOf[Array[(String, Long)]].toMap
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index c3e03cea91..1912015827 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -597,7 +597,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void iterator() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
-    TaskContext context = new TaskContext(0, 0, 0, false, false, new TaskMetrics());
+    TaskContext context = new TaskContext(0, 0, 0, false, new TaskMetrics());
     Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue());
   }
 
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index fd5b0906e6..4f178db40f 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -23,7 +23,6 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.scalatest.mock.EasyMockSugar
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage._
 
 // TODO: Test the CacheManager's thread-safety aspects
@@ -59,8 +58,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     }
 
     whenExecuting(blockManager) {
-      val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
-        taskMetrics = TaskMetrics.empty)
+      val context = new TaskContext(0, 0, 0)
       val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
       assert(value.toList === List(1, 2, 3, 4))
     }
@@ -72,8 +70,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     }
 
     whenExecuting(blockManager) {
-      val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
-        taskMetrics = TaskMetrics.empty)
+      val context = new TaskContext(0, 0, 0)
       val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
       assert(value.toList === List(5, 6, 7))
     }
@@ -86,8 +83,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     }
 
     whenExecuting(blockManager) {
-      val context = new TaskContext(0, 0, 0, runningLocally = true, interrupted = false,
-        taskMetrics = TaskMetrics.empty)
+      val context = new TaskContext(0, 0, 0, runningLocally = true)
       val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
       assert(value.toList === List(1, 2, 3, 4))
     }
diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
index 0bb6a6b09c..db56a4acdd 100644
--- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
@@ -178,14 +178,12 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
       }
       val hadoopPart1 = generateFakeHadoopPartition()
       val pipedRdd = new PipedRDD(nums, "printenv " + varName)
-      val tContext = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
-        taskMetrics = TaskMetrics.empty)
+      val tContext = new TaskContext(0, 0, 0)
       val rddIter =
         pipedRdd.compute(hadoopPart1, tContext)
       val arr = rddIter.toArray
       assert(arr(0) == "/some/path")
     } else {
       // printenv isn't available so just pass the test
-      assert(true)
     }
   }
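To illustrate why the relocated flag is `@volatile`, here is a self-contained sketch of the cooperative-cancellation pattern it supports. This is not Spark code; `Ctx` and `InterruptedFlagDemo` are names invented for the example, with `Ctx` standing in for TaskContext. The thread running the work polls the flag while another thread sets it, and `@volatile` guarantees the write is visible across threads.

```scala
// Hypothetical demo of the cooperative-cancellation pattern behind `interrupted`.
object InterruptedFlagDemo {
  // Stand-in for TaskContext: just the flag that matters here.
  final class Ctx { @volatile var interrupted: Boolean = false }

  def main(args: Array[String]): Unit = {
    val ctx = new Ctx
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        var n = 0L
        // The worker polls the flag instead of relying on Thread.interrupt().
        while (!ctx.interrupted) n += 1
        println("stopped after " + n + " iterations")
      }
    })
    worker.start()
    Thread.sleep(50)
    ctx.interrupted = true // the "killer" thread flips the flag; @volatile publishes it
    worker.join()
  }
}
```

Because the flag participates in this cross-thread handshake rather than in construction, a field with a fixed initial value of `false` expresses the contract more directly than a constructor parameter ever did.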