aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-04-29 01:07:26 -0700
committerReynold Xin <rxin@databricks.com>2015-04-29 01:07:26 -0700
commitf49284b5bf3a69ed91a5e3e6e0ed3be93a6ab9e4 (patch)
tree64c1145ffc8a444d5f8865e055d0161e0041bb44 /core
parent1fd6ed9a56ac4671f4a3d25a42823ba3bf01f60f (diff)
downloadspark-f49284b5bf3a69ed91a5e3e6e0ed3be93a6ab9e4.tar.gz
spark-f49284b5bf3a69ed91a5e3e6e0ed3be93a6ab9e4.tar.bz2
spark-f49284b5bf3a69ed91a5e3e6e0ed3be93a6ab9e4.zip
[SPARK-7076][SPARK-7077][SPARK-7080][SQL] Use managed memory for aggregations
This patch adds managed-memory-based aggregation to Spark SQL / DataFrames. Instead of working with Java objects, this new aggregation path uses `sun.misc.Unsafe` to manipulate raw memory. This reduces the memory footprint for aggregations, resulting in fewer spills, OutOfMemoryErrors, and garbage collection pauses. As a result, this allows for higher memory utilization. It can also result in better cache locality since objects will be stored closer together in memory. This feature can be eanbled by setting `spark.sql.unsafe.enabled=true`. For now, this feature is only supported when codegen is enabled and only supports aggregations for which the grouping columns are primitive numeric types or strings and aggregated values are numeric. ### Managing memory with sun.misc.Unsafe This patch supports both on- and off-heap managed memory. - In on-heap mode, memory addresses are identified by the combination of a base Object and an offset within that object. - In off-heap mode, memory is addressed directly with 64-bit long addresses. To support both modes, functions that manipulate memory accept both `baseObject` and `baseOffset` fields. In off-heap mode, we simply pass `null` as `baseObject`. We allocate memory in large chunks, so memory fragmentation and allocation speed are not significant bottlenecks. By default, we use on-heap mode. To enable off-heap mode, set `spark.unsafe.offHeap=true`. To track allocated memory, this patch extends `SparkEnv` with an `ExecutorMemoryManager` and supplies each `TaskContext` with a `TaskMemoryManager`. These classes work together to track allocations and detect memory leaks. ### Compact tuple format This patch introduces `UnsafeRow`, a compact row layout. In this format, each tuple has three parts: a null bit set, fixed length values, and variable-length values: ![image](https://cloud.githubusercontent.com/assets/50748/7328538/2fdb65ce-ea8b-11e4-9743-6c0f02bb7d1f.png) - Rows are always 8-byte word aligned (so their sizes will always be a multiple of 8 bytes) - The bit set is used for null tracking: - Position _i_ is set if and only if field _i_ is null - The bit set is aligned to an 8-byte word boundary. - Every field appears as an 8-byte word in the fixed-length values part: - If a field is null, we zero out the values. - If a field is variable-length, the word stores a relative offset (w.r.t. the base of the tuple) that points to the beginning of the field's data in the variable-length part. - Each variable-length data type can have its own encoding: - For strings, the first word stores the length of the string and is followed by UTF-8 encoded bytes. If necessary, the end of the string is padded with empty bytes in order to ensure word-alignment. For example, a tuple that consists 3 fields of type (int, string, string), with value (null, “data”, “bricks”) would look like this: ![image](https://cloud.githubusercontent.com/assets/50748/7328526/1e21959c-ea8b-11e4-9a28-a4350fe4a7b5.png) This format allows us to compare tuples for equality by directly comparing their raw bytes. This also enables fast hashing of tuples. ### Hash map for performing aggregations This patch introduces `UnsafeFixedWidthAggregationMap`, a hash map for performing aggregations where the aggregation result columns are fixed-with. This map's keys and values are `Row` objects. `UnsafeFixedWidthAggregationMap` is implemented on top of `BytesToBytesMap`, an append-only map which supports byte-array keys and values. `BytesToBytesMap` stores pointers to key and value tuples. For each record with a new key, we copy the key and create the aggregation value buffer for that key and put them in a buffer. The hash table then simply stores pointers to the key and value. For each record with an existing key, we simply run the aggregation function to update the values in place. This map is implemented using open hashing with triangular sequence probing. Each entry stores two words in a long array: the first word stores the address of the key and the second word stores the relative offset from the key tuple to the value tuple, as well as the key's 32-bit hashcode. By storing the full hashcode, we reduce the number of equality checks that need to be performed to handle position collisions ()since the chance of hashcode collision is much lower than position collision). `UnsafeFixedWidthAggregationMap` allows regular Spark SQL `Row` objects to be used when probing the map. Internally, it encodes these rows into `UnsafeRow` format using `UnsafeRowConverter`. This conversion has a small overhead that can be eliminated in the future once we use UnsafeRows in other operators. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5725) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Closes #5725 from JoshRosen/unsafe and squashes the following commits: eeee512 [Josh Rosen] Add converters for Null, Boolean, Byte, and Short columns. 81f34f8 [Josh Rosen] Follow 'place children last' convention for GeneratedAggregate 1bc36cc [Josh Rosen] Refactor UnsafeRowConverter to avoid unnecessary boxing. 017b2dc [Josh Rosen] Remove BytesToBytesMap.finalize() 50e9671 [Josh Rosen] Throw memory leak warning even in case of error; add warning about code duplication 70a39e4 [Josh Rosen] Split MemoryManager into ExecutorMemoryManager and TaskMemoryManager: 6e4b192 [Josh Rosen] Remove an unused method from ByteArrayMethods. de5e001 [Josh Rosen] Fix debug vs. trace in logging message. a19e066 [Josh Rosen] Rename unsafe Java test suites to match Scala test naming convention. 78a5b84 [Josh Rosen] Add logging to MemoryManager ce3c565 [Josh Rosen] More comments, formatting, and code cleanup. 529e571 [Josh Rosen] Measure timeSpentResizing in nanoseconds instead of milliseconds. 3ca84b2 [Josh Rosen] Only zero the used portion of groupingKeyConversionScratchSpace 162caf7 [Josh Rosen] Fix test compilation b45f070 [Josh Rosen] Don't redundantly store the offset from key to value, since we can compute this from the key size. a8e4a3f [Josh Rosen] Introduce MemoryManager interface; add to SparkEnv. 0925847 [Josh Rosen] Disable MiMa checks for new unsafe module cde4132 [Josh Rosen] Add missing pom.xml 9c19fc0 [Josh Rosen] Add configuration options for heap vs. offheap 6ffdaa1 [Josh Rosen] Null handling improvements in UnsafeRow. 31eaabc [Josh Rosen] Lots of TODO and doc cleanup. a95291e [Josh Rosen] Cleanups to string handling code afe8dca [Josh Rosen] Some Javadoc cleanup f3dcbfe [Josh Rosen] More mod replacement 854201a [Josh Rosen] Import and comment cleanup 06e929d [Josh Rosen] More warning cleanup ef6b3d3 [Josh Rosen] Fix a bunch of FindBugs and IntelliJ inspections 29a7575 [Josh Rosen] Remove debug logging 49aed30 [Josh Rosen] More long -> int conversion. b26f1d3 [Josh Rosen] Fix bug in murmur hash implementation. 765243d [Josh Rosen] Enable optional performance metrics for hash map. 23a440a [Josh Rosen] Bump up default hash map size 628f936 [Josh Rosen] Use ints intead of longs for indexing. 92d5a06 [Josh Rosen] Address a number of minor code review comments. 1f4b716 [Josh Rosen] Merge Unsafe code into the regular GeneratedAggregate, guarded by a configuration flag; integrate planner support and re-enable all tests. d85eeff [Josh Rosen] Add basic sanity test for UnsafeFixedWidthAggregationMap bade966 [Josh Rosen] Comment update (bumping to refresh GitHub cache...) b3eaccd [Josh Rosen] Extract aggregation map into its own class. d2bb986 [Josh Rosen] Update to implement new Row methods added upstream 58ac393 [Josh Rosen] Use UNSAFE allocator in GeneratedAggregate (TODO: make this configurable) 7df6008 [Josh Rosen] Optimizations related to zeroing out memory: c1b3813 [Josh Rosen] Fix bug in UnsafeMemoryAllocator.free(): 738fa33 [Josh Rosen] Add feature flag to guard UnsafeGeneratedAggregate c55bf66 [Josh Rosen] Free buffer once iterator has been fully consumed. 62ab054 [Josh Rosen] Optimize for fact that get() is only called on String columns. c7f0b56 [Josh Rosen] Reuse UnsafeRow pointer in UnsafeRowConverter ae39694 [Josh Rosen] Add finalizer as "cleanup method of last resort" c754ae1 [Josh Rosen] Now that the store*() contract has been stregthened, we can remove an extra lookup f764d13 [Josh Rosen] Simplify address + length calculation in Location. 079f1bf [Josh Rosen] Some clarification of the BytesToBytesMap.lookup() / set() contract. 1a483c5 [Josh Rosen] First version that passes some aggregation tests: fc4c3a8 [Josh Rosen] Sketch how the converters will be used in UnsafeGeneratedAggregate 53ba9b7 [Josh Rosen] Start prototyping Java Row -> UnsafeRow converters 1ff814d [Josh Rosen] Add reminder to free memory on iterator completion 8a8f9df [Josh Rosen] Add skeleton for GeneratedAggregate integration. 5d55cef [Josh Rosen] Add skeleton for Row implementation. f03e9c1 [Josh Rosen] Play around with Unsafe implementations of more string methods. ab68e08 [Josh Rosen] Begin merging the UTF8String implementations. 480a74a [Josh Rosen] Initial import of code from Databricks unsafe utils repo.
Diffstat (limited to 'core')
-rw-r--r--core/pom.xml5
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/TaskContext.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/TaskContextImpl.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala19
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala22
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Task.scala16
-rw-r--r--core/src/test/java/org/apache/spark/JavaAPISuite.java2
-rw-r--r--core/src/test/scala/org/apache/spark/CacheManagerSuite.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala6
12 files changed, 87 insertions, 15 deletions
diff --git a/core/pom.xml b/core/pom.xml
index 459ef66712..2dfb00d7ec 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -96,6 +96,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-unsafe_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</dependency>
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 959aefabd8..0c4d28f786 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -40,6 +40,7 @@ import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinato
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
import org.apache.spark.storage._
+import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator}
import org.apache.spark.util.{RpcUtils, Utils}
/**
@@ -69,6 +70,7 @@ class SparkEnv (
val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
val shuffleMemoryManager: ShuffleMemoryManager,
+ val executorMemoryManager: ExecutorMemoryManager,
val outputCommitCoordinator: OutputCommitCoordinator,
val conf: SparkConf) extends Logging {
@@ -382,6 +384,15 @@ object SparkEnv extends Logging {
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
+ val executorMemoryManager: ExecutorMemoryManager = {
+ val allocator = if (conf.getBoolean("spark.unsafe.offHeap", false)) {
+ MemoryAllocator.UNSAFE
+ } else {
+ MemoryAllocator.HEAP
+ }
+ new ExecutorMemoryManager(allocator)
+ }
+
val envInstance = new SparkEnv(
executorId,
rpcEnv,
@@ -398,6 +409,7 @@ object SparkEnv extends Logging {
sparkFilesDir,
metricsSystem,
shuffleMemoryManager,
+ executorMemoryManager,
outputCommitCoordinator,
conf)
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 7d7fe1a446..d09e17dea0 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -21,6 +21,7 @@ import java.io.Serializable
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util.TaskCompletionListener
@@ -133,4 +134,9 @@ abstract class TaskContext extends Serializable {
/** ::DeveloperApi:: */
@DeveloperApi
def taskMetrics(): TaskMetrics
+
+ /**
+ * Returns the manager for this task's managed memory.
+ */
+ private[spark] def taskMemoryManager(): TaskMemoryManager
}
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 337c8e4ebe..b4d572cb52 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -18,6 +18,7 @@
package org.apache.spark
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util.{TaskCompletionListener, TaskCompletionListenerException}
import scala.collection.mutable.ArrayBuffer
@@ -27,6 +28,7 @@ private[spark] class TaskContextImpl(
val partitionId: Int,
override val taskAttemptId: Long,
override val attemptNumber: Int,
+ override val taskMemoryManager: TaskMemoryManager,
val runningLocally: Boolean = false,
val taskMetrics: TaskMetrics = TaskMetrics.empty)
extends TaskContext
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index f57e215c3f..dd1c48e6cb 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -32,6 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
+import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util._
/**
@@ -178,6 +179,7 @@ private[spark] class Executor(
}
override def run(): Unit = {
+ val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)
val deserializeStartTime = System.currentTimeMillis()
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = env.closureSerializer.newInstance()
@@ -190,6 +192,7 @@ private[spark] class Executor(
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
+ task.setTaskMemoryManager(taskMemoryManager)
// If this task has been killed before we deserialized it, let's quit now. Otherwise,
// continue executing the task.
@@ -206,7 +209,21 @@ private[spark] class Executor(
// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
- val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
+ val value = try {
+ task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
+ } finally {
+ // Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread;
+ // when changing this, make sure to update both copies.
+ val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
+ if (freedMemory > 0) {
+ val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
+ if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
+ throw new SparkException(errMsg)
+ } else {
+ logError(errMsg)
+ }
+ }
+ }
val taskFinish = System.currentTimeMillis()
// If the task has been killed, let's fail it.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8c4bff4e83..b7901c06a1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -34,6 +34,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
+import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util._
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -643,8 +644,15 @@ class DAGScheduler(
try {
val rdd = job.finalStage.rdd
val split = rdd.partitions(job.partitions(0))
- val taskContext = new TaskContextImpl(job.finalStage.id, job.partitions(0), taskAttemptId = 0,
- attemptNumber = 0, runningLocally = true)
+ val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)
+ val taskContext =
+ new TaskContextImpl(
+ job.finalStage.id,
+ job.partitions(0),
+ taskAttemptId = 0,
+ attemptNumber = 0,
+ taskMemoryManager = taskMemoryManager,
+ runningLocally = true)
TaskContext.setTaskContext(taskContext)
try {
val result = job.func(taskContext, rdd.iterator(split, taskContext))
@@ -652,6 +660,16 @@ class DAGScheduler(
} finally {
taskContext.markTaskCompleted()
TaskContext.unset()
+ // Note: this memory freeing logic is duplicated in Executor.run(); when changing this,
+ // make sure to update both copies.
+ val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
+ if (freedMemory > 0) {
+ if (sc.getConf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
+ throw new SparkException(s"Managed memory leak detected; size = $freedMemory bytes")
+ } else {
+ logError(s"Managed memory leak detected; size = $freedMemory bytes")
+ }
+ }
}
} catch {
case e: Exception =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index b09b19e2ac..586d1e0620 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.HashMap
import org.apache.spark.{TaskContextImpl, TaskContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.serializer.SerializerInstance
+import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util.ByteBufferInputStream
import org.apache.spark.util.Utils
@@ -52,8 +53,13 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
* @return the result of the task
*/
final def run(taskAttemptId: Long, attemptNumber: Int): T = {
- context = new TaskContextImpl(stageId = stageId, partitionId = partitionId,
- taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false)
+ context = new TaskContextImpl(
+ stageId = stageId,
+ partitionId = partitionId,
+ taskAttemptId = taskAttemptId,
+ attemptNumber = attemptNumber,
+ taskMemoryManager = taskMemoryManager,
+ runningLocally = false)
TaskContext.setTaskContext(context)
context.taskMetrics.setHostname(Utils.localHostName())
taskThread = Thread.currentThread()
@@ -68,6 +74,12 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
}
}
+ private var taskMemoryManager: TaskMemoryManager = _
+
+ def setTaskMemoryManager(taskMemoryManager: TaskMemoryManager): Unit = {
+ this.taskMemoryManager = taskMemoryManager
+ }
+
def runTask(context: TaskContext): T
def preferredLocations: Seq[TaskLocation] = Nil
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 8a4f2a08fe..34ac9361d4 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1009,7 +1009,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void iterator() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
- TaskContext context = new TaskContextImpl(0, 0, 0L, 0, false, new TaskMetrics());
+ TaskContext context = new TaskContextImpl(0, 0, 0L, 0, null, false, new TaskMetrics());
Assert.assertEquals(1, rdd.iterator(rdd.partitions().get(0), context).next().intValue());
}
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 70529d9216..668ddf9f5f 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -65,7 +65,7 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf
// in blockManager.put is a losing battle. You have been warned.
blockManager = sc.env.blockManager
cacheManager = sc.env.cacheManager
- val context = new TaskContextImpl(0, 0, 0, 0)
+ val context = new TaskContextImpl(0, 0, 0, 0, null)
val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
val getValue = blockManager.get(RDDBlockId(rdd.id, split.index))
assert(computeValue.toList === List(1, 2, 3, 4))
@@ -77,7 +77,7 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf
val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
when(blockManager.get(RDDBlockId(0, 0))).thenReturn(Some(result))
- val context = new TaskContextImpl(0, 0, 0, 0)
+ val context = new TaskContextImpl(0, 0, 0, 0, null)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(5, 6, 7))
}
@@ -86,14 +86,14 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf
// Local computation should not persist the resulting value, so don't expect a put().
when(blockManager.get(RDDBlockId(0, 0))).thenReturn(None)
- val context = new TaskContextImpl(0, 0, 0, 0, true)
+ val context = new TaskContextImpl(0, 0, 0, 0, null, true)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(1, 2, 3, 4))
}
test("verify task metrics updated correctly") {
cacheManager = sc.env.cacheManager
- val context = new TaskContextImpl(0, 0, 0, 0)
+ val context = new TaskContextImpl(0, 0, 0, 0, null)
cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
assert(context.taskMetrics.updatedBlocks.getOrElse(Seq()).size === 2)
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index aea76c1adc..85eb2a1d07 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -176,7 +176,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
}
val hadoopPart1 = generateFakeHadoopPartition()
val pipedRdd = new PipedRDD(nums, "printenv " + varName)
- val tContext = new TaskContextImpl(0, 0, 0, 0)
+ val tContext = new TaskContextImpl(0, 0, 0, 0, null)
val rddIter = pipedRdd.compute(hadoopPart1, tContext)
val arr = rddIter.toArray
assert(arr(0) == "/some/path")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 057e226916..83ae870124 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -51,7 +51,7 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte
}
test("all TaskCompletionListeners should be called even if some fail") {
- val context = new TaskContextImpl(0, 0, 0, 0)
+ val context = new TaskContextImpl(0, 0, 0, 0, null)
val listener = mock(classOf[TaskCompletionListener])
context.addTaskCompletionListener(_ => throw new Exception("blah"))
context.addTaskCompletionListener(listener)
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 37b593b2c5..2080c432d7 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -89,7 +89,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
)
val iterator = new ShuffleBlockFetcherIterator(
- new TaskContextImpl(0, 0, 0, 0),
+ new TaskContextImpl(0, 0, 0, 0, null),
transfer,
blockManager,
blocksByAddress,
@@ -154,7 +154,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
(remoteBmId, blocks.keys.map(blockId => (blockId, 1.asInstanceOf[Long])).toSeq))
- val taskContext = new TaskContextImpl(0, 0, 0, 0)
+ val taskContext = new TaskContextImpl(0, 0, 0, 0, null)
val iterator = new ShuffleBlockFetcherIterator(
taskContext,
transfer,
@@ -217,7 +217,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
(remoteBmId, blocks.keys.map(blockId => (blockId, 1.asInstanceOf[Long])).toSeq))
- val taskContext = new TaskContextImpl(0, 0, 0, 0)
+ val taskContext = new TaskContextImpl(0, 0, 0, 0, null)
val iterator = new ShuffleBlockFetcherIterator(
taskContext,
transfer,