path: root/core
author     Josh Rosen <joshrosen@databricks.com>    2016-01-13 21:02:54 -0800
committer  Reynold Xin <rxin@databricks.com>        2016-01-13 21:02:54 -0800
commit     e2ae7bd046f6d8d6a375c2e81e5a51d7d78ca984 (patch)
tree       1a8f377d53cdfc3f91f52d8085fb011e6a12f20d /core
parent     20d8ef858af6e13db59df118b562ea33cba5464d (diff)
[SPARK-12819] Deprecate TaskContext.isRunningLocally()
We've already removed local execution but didn't deprecate
`TaskContext.isRunningLocally()`; we should deprecate it for 2.0.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #10751 from JoshRosen/remove-local-exec-from-taskcontext.
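For user code, the practical effect is that `isRunningLocally()` still compiles but now warns under -deprecation and always returns false, so any branch on it is dead. A minimal sketch of the caller-side cleanup (the partition-processing function here is hypothetical):

    import org.apache.spark.TaskContext

    // Hypothetical user code that branched on local execution. After this
    // patch the branch is dead: isRunningLocally() always returns false,
    // and compiling with -deprecation warns at the call site.
    def processPartition(iter: Iterator[Int]): Iterator[Int] = {
      val ctx = TaskContext.get()
      if (ctx.isRunningLocally()) {
        iter                // unreachable: local execution no longer exists
      } else {
        iter.map(_ * 2)     // the only path that can run
      }
    }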
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/CacheManager.scala      | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContext.scala       | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContextImpl.scala   | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala    | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/CacheManagerSuite.scala | 9
5 files changed, 4 insertions, 19 deletions
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 4d20c73693..36b536e89c 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -68,11 +68,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
- // If the task is running locally, do not persist the result
- if (context.isRunningLocally) {
- return computedValues
- }
-
// Otherwise, cache the values and keep track of any updates in block statuses
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
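The net effect of this hunk is that getOrCompute no longer has a path that skips persistence: every computed partition of a cached RDD now goes through putInBlockManager. A hedged illustration of the resulting uniform behavior, assuming an active SparkContext named `sc`:

    import org.apache.spark.storage.StorageLevel

    // With local execution gone, caching is uniform: the first action
    // computes and persists every partition via the BlockManager, and
    // subsequent actions read the cached blocks.
    val data = sc.parallelize(1 to 4).persist(StorageLevel.MEMORY_ONLY)
    data.count() // computes and caches all partitions
    data.count() // served from the cache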
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index e25ed0fdd7..7704abc134 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -97,8 +97,9 @@ abstract class TaskContext extends Serializable {
 
   /**
    * Returns true if the task is running locally in the driver program.
-   * @return
+   * @return false
    */
+  @deprecated("Local execution was removed, so this always returns false", "2.0.0")
   def isRunningLocally(): Boolean
 
   /**
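For reference, Scala's @deprecated takes a message and a "since" version, and callers get a compiler warning (an error under -Xfatal-warnings). A self-contained sketch of the same pattern as the annotation added above:

    object DeprecationSketch {
      // Same shape as the annotation added to TaskContext above.
      @deprecated("Local execution was removed, so this always returns false", "2.0.0")
      def isRunningLocally(): Boolean = false

      def main(args: Array[String]): Unit = {
        // scalac -deprecation warns here:
        //   method isRunningLocally in object DeprecationSketch is deprecated
        println(isRunningLocally()) // prints: false
      }
    }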
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 6c49363099..94ff884b74 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -33,7 +33,6 @@ private[spark] class TaskContextImpl(
     override val taskMemoryManager: TaskMemoryManager,
     @transient private val metricsSystem: MetricsSystem,
     internalAccumulators: Seq[Accumulator[Long]],
-    val runningLocally: Boolean = false,
     val taskMetrics: TaskMetrics = TaskMetrics.empty)
   extends TaskContext
   with Logging {
@@ -85,7 +84,7 @@ private[spark] class TaskContextImpl(
 
   override def isCompleted(): Boolean = completed
 
-  override def isRunningLocally(): Boolean = runningLocally
+  override def isRunningLocally(): Boolean = false
 
   override def isInterrupted(): Boolean = interrupted
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 0379ca2af6..fca57928ec 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -74,8 +74,7 @@ private[spark] abstract class Task[T](
       attemptNumber,
       taskMemoryManager,
       metricsSystem,
-      internalAccumulators,
-      runningLocally = false)
+      internalAccumulators)
     TaskContext.setTaskContext(context)
     context.taskMetrics.setHostname(Utils.localHostName())
     context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
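This call site has to change in lockstep with the constructor: deleting a parameter breaks even callers that passed it explicitly rather than relying on the default. A toy sketch of the same two-step refactoring, with hypothetical names:

    // Before: a defaulted flag that call sites still passed explicitly.
    class CtxBefore(val id: Int, val runningLocally: Boolean = false)
    val old = new CtxBefore(1, runningLocally = false)

    // After: the parameter is deleted, so the call site shrinks with it;
    // leaving "runningLocally = false" in place would no longer compile.
    class CtxAfter(val id: Int)
    val updated = new CtxAfter(1)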
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index cb8bd04e49..30aa94c8a5 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -82,15 +82,6 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before
     assert(value.toList === List(5, 6, 7))
   }
 
-  test("get uncached local rdd") {
-    // Local computation should not persist the resulting value, so don't expect a put().
-    when(blockManager.get(RDDBlockId(0, 0))).thenReturn(None)
-
-    val context = new TaskContextImpl(0, 0, 0, 0, null, null, Seq.empty, runningLocally = true)
-    val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
-    assert(value.toList === List(1, 2, 3, 4))
-  }
-
   test("verify task metrics updated correctly") {
     cacheManager = sc.env.cacheManager
     val context = TaskContext.empty()
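With runningLocally gone from the constructor, any test that still builds a TaskContextImpl simply drops that argument. Based on the signature visible in the TaskContextImpl hunk above, a post-change construction would look roughly like this (null placeholders as in the deleted test; a sketch, not code from the patch):

    // Roughly the deleted test's context, minus the removed flag.
    val context = new TaskContextImpl(0, 0, 0, 0, null, null, Seq.empty)
    assert(context.isRunningLocally() === false) // now unconditionally false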