author      Reynold Xin <rxin@apache.org>    2014-07-18 23:52:47 -0700
committer   Reynold Xin <rxin@apache.org>    2014-07-18 23:52:47 -0700
commit      7b8cd175254d42c8e82f0aa8eb4b7f3508d8fde2
tree        616eedb38efffeaffca1d2d583e07f2bdf3947ca  /core/src/main/scala/org
parent      a243364b225da9a91813234027eafedffc495ecc
[SPARK-2521] Broadcast RDD object (instead of sending it along with every task).
Currently (as of Spark 1.0.1), Spark sends the RDD object (which contains closures) using Akka along with the task itself to the executors. This is inefficient because all tasks in the same stage use the same RDD object, yet the RDD object has to be sent to the executors multiple times. It is especially bad when a closure references a very large variable, and the current design has led to users having to explicitly broadcast large variables themselves.

This patch uses broadcast to send RDD objects and their closures to executors, and uses Akka to send only a reference to the broadcast RDD/closure along with the partition-specific information for each task. For those familiar with the internals, Spark already relies on broadcast to send the Hadoop JobConf every time it reads Hadoop input, because the JobConf is large.

The user-facing impact of the change includes:

1. Users no longer need to decide what to broadcast, unless they want to reuse a large object multiple times in different operations.
2. Task size gets smaller, resulting in faster scheduling and higher task dispatch throughput.

In addition, the change simplifies some internals of Spark, eliminating the need to maintain task caches and the complex logic to broadcast the JobConf (which also led to a deadlock recently).

A simple way to test this:

```scala
val a = new Array[Byte](1000*1000); scala.util.Random.nextBytes(a);
sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count
```

Numbers on 3 r3.8xlarge instances on EC2:

```
master branch:    5.648436068 s, 4.715361895 s, 5.360161877 s
with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s
```

Author: Reynold Xin <rxin@apache.org>

Closes #1452 from rxin/broadcast-task and squashes the following commits:

762e0be [Reynold Xin] Warn large broadcasts.
ade6eac [Reynold Xin] Log broadcast size.
c3b6f11 [Reynold Xin] Added a unit test for clean up.
754085f [Reynold Xin] Explain why broadcasting serialized copy of the task.
04b17f0 [Reynold Xin] [SPARK-2521] Broadcast RDD object once per TaskSet (instead of sending it for every task).
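To make the mechanism above concrete, here is a minimal spark-shell sketch of the pattern the patch applies internally: serialize a shared object once, ship the bytes with `sc.broadcast`, and let each task deserialize its own private copy. The `java.util.Properties` stand-in and the use of plain JDK serialization (instead of Spark's closure serializer) are illustration-only assumptions, not the patch's code.

```scala
// Minimal sketch of "broadcast the bytes once, deserialize per task".
// Assumes a spark-shell SparkContext `sc`; uses JDK serialization for brevity.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

val shared = new java.util.Properties()            // stands in for the RDD/closure state
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(shared)
oos.close()

val sharedBytes = sc.broadcast(bos.toByteArray)    // shipped to each executor at most once

sc.parallelize(1 to 1000, 100).map { i =>
  // Each task deserializes its own copy, so mutation inside the task is isolated.
  val in = new ObjectInputStream(new ByteArrayInputStream(sharedBytes.value))
  val copy = in.readObject().asInstanceOf[java.util.Properties]
  copy.setProperty("task", i.toString)
  i
}.count()
```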
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/Dependency.scala               |  28
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala             |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                  |  30
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala    |   9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala   |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala     | 128
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala | 125
7 files changed, 100 insertions, 226 deletions
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index 09a6057123..3935c87722 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -27,7 +27,9 @@ import org.apache.spark.shuffle.ShuffleHandle
* Base class for dependencies.
*/
@DeveloperApi
-abstract class Dependency[T](val rdd: RDD[T]) extends Serializable
+abstract class Dependency[T] extends Serializable {
+ def rdd: RDD[T]
+}
/**
@@ -36,20 +38,24 @@ abstract class Dependency[T](val rdd: RDD[T]) extends Serializable
* partition of the child RDD. Narrow dependencies allow for pipelined execution.
*/
@DeveloperApi
-abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
+abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/**
* Get the parent partitions for a child partition.
* @param partitionId a partition of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int]
+
+ override def rdd: RDD[T] = _rdd
}
/**
* :: DeveloperApi ::
- * Represents a dependency on the output of a shuffle stage.
- * @param rdd the parent RDD
+ * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
+ * the RDD is transient since we don't need it on the executor side.
+ *
+ * @param _rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
* the default serializer, as specified by `spark.serializer` config option, will
@@ -57,20 +63,22 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
*/
@DeveloperApi
class ShuffleDependency[K, V, C](
- @transient rdd: RDD[_ <: Product2[K, V]],
+ @transient _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
- extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
+ extends Dependency[Product2[K, V]] {
+
+ override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]
- val shuffleId: Int = rdd.context.newShuffleId()
+ val shuffleId: Int = _rdd.context.newShuffleId()
- val shuffleHandle: ShuffleHandle = rdd.context.env.shuffleManager.registerShuffle(
- shuffleId, rdd.partitions.size, this)
+ val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
+ shuffleId, _rdd.partitions.size, this)
- rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
+ _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
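A side note on the Dependency rewrite above: it relies on the Scala pattern of keeping the parent in a `@transient` constructor parameter and exposing it through a `def`, so serializing a ShuffleDependency does not drag the parent RDD along. Below is a self-contained sketch of just that pattern, using made-up `Dep`/`Node` classes and plain JDK serialization rather than Spark's types.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

class Node(val payload: Array[Byte]) extends Serializable

abstract class Dep[T] extends Serializable { def node: Node }

// The heavy parent stays in a @transient field; only the def accessor is part of the API.
class ShuffleDep(@transient _node: Node) extends Dep[Node] {
  override def node: Node = _node    // valid on the driver; null after deserialization
}

object TransientParentSketch {
  private def roundTrip(obj: AnyRef): AnyRef = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray)).readObject()
  }

  def main(args: Array[String]): Unit = {
    val dep = new ShuffleDep(new Node(new Array[Byte](10 * 1024 * 1024)))
    val copy = roundTrip(dep).asInstanceOf[ShuffleDep]
    println(copy.node == null)   // true: the 10 MB parent was never written to the stream
  }
}
```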
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8052499ab7..48a09657fd 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -997,8 +997,6 @@ class SparkContext(config: SparkConf) extends Logging {
// TODO: Cache.stop()?
env.stop()
SparkEnv.set(null)
- ShuffleMapTask.clearCache()
- ResultTask.clearCache()
listenerBus.stop()
eventLogger.foreach(_.stop())
logInfo("Successfully stopped SparkContext")
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 88a918aebf..2ee9a8f1a8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -35,12 +35,13 @@ import org.apache.spark.Partitioner._
import org.apache.spark.SparkContext._
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.partial.BoundedDouble
import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, CallSite, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}
@@ -1195,21 +1196,36 @@ abstract class RDD[T: ClassTag](
/**
* Return whether this RDD has been checkpointed or not
*/
- def isCheckpointed: Boolean = {
- checkpointData.map(_.isCheckpointed).getOrElse(false)
- }
+ def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
/**
* Gets the name of the file to which this RDD was checkpointed
*/
- def getCheckpointFile: Option[String] = {
- checkpointData.flatMap(_.getCheckpointFile)
- }
+ def getCheckpointFile: Option[String] = checkpointData.flatMap(_.getCheckpointFile)
// =======================================================================
// Other internal methods and fields
// =======================================================================
+ /**
+ * Broadcasted copy of this RDD, used to dispatch tasks to executors. Note that we broadcast
+ * the serialized copy of the RDD and for each task we will deserialize it, which means each
+ * task gets a different copy of the RDD. This provides stronger isolation between tasks that
+ * might modify state of objects referenced in their closures. This is necessary in Hadoop
+ * where the JobConf/Configuration object is not thread-safe.
+ */
+ @transient private[spark] lazy val broadcasted: Broadcast[Array[Byte]] = {
+ val ser = SparkEnv.get.closureSerializer.newInstance()
+ val bytes = ser.serialize(this).array()
+ val size = Utils.bytesToString(bytes.length)
+ if (bytes.length > (1L << 20)) {
+ logWarning(s"Broadcasting RDD $id ($size), which contains large objects")
+ } else {
+ logDebug(s"Broadcasting RDD $id ($size)")
+ }
+ sc.broadcast(bytes)
+ }
+
private var storageLevel: StorageLevel = StorageLevel.NONE
/** User code that created this RDD (e.g. `textFile`, `parallelize`). */
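The comment on `broadcasted` claims that deserializing the broadcast bytes once per task gives each task its own copy, isolating tasks that mutate objects referenced in their closures (e.g. a non-thread-safe JobConf). A standalone sketch of that property, using plain JDK serialization and a `java.util.Properties` stand-in rather than Spark's closure serializer:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object CopyPerTaskSketch {
  private def serialize(o: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(o)
    oos.close()
    bos.toByteArray
  }

  private def deserialize[A](bytes: Array[Byte]): A =
    new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject().asInstanceOf[A]

  def main(args: Array[String]): Unit = {
    val conf = new java.util.Properties()             // stands in for a JobConf-like object
    val broadcastBytes = serialize(conf)              // serialized once on the "driver"

    val taskCopy1 = deserialize[java.util.Properties](broadcastBytes)
    val taskCopy2 = deserialize[java.util.Properties](broadcastBytes)
    taskCopy1.setProperty("owner", "task-1")          // mutation in one "task"

    println(taskCopy1 eq taskCopy2)                   // false: distinct objects
    println(taskCopy2.getProperty("owner"))           // null: the other copy is untouched
  }
}
```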
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index c3b2a33fb5..f67e5f1857 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -106,7 +106,6 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
cpRDD = Some(newRDD)
rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions
cpState = Checkpointed
- RDDCheckpointData.clearTaskCaches()
}
logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id)
}
@@ -131,9 +130,5 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
}
}
-private[spark] object RDDCheckpointData {
- def clearTaskCaches() {
- ShuffleMapTask.clearCache()
- ResultTask.clearCache()
- }
-}
+// Used for synchronization
+private[spark] object RDDCheckpointData
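The otherwise-empty `RDDCheckpointData` companion object is retained purely as a lock for `RDDCheckpointData.synchronized { ... }` blocks elsewhere in the file. A tiny sketch of that object-as-monitor idiom, with a hypothetical `Registry` object and `Worker` class in place of Spark's code:

```scala
object Registry   // empty: exists only so callers can write Registry.synchronized { ... }

class Worker(private var state: String) {
  def markDone(): Unit = Registry.synchronized {
    // Every Worker contends on the same monitor, so state transitions are serialized.
    if (state != "done") state = "done"
  }

  def currentState: String = Registry.synchronized { state }
}

object ObjectAsLockSketch {
  def main(args: Array[String]): Unit = {
    val w = new Worker("pending")
    w.markDone()
    println(w.currentState)   // prints "done"
  }
}
```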
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ede3c7d9f0..88cb5feaaf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -376,9 +376,6 @@ class DAGScheduler(
stageIdToStage -= stageId
stageIdToJobIds -= stageId
- ShuffleMapTask.removeStage(stageId)
- ResultTask.removeStage(stageId)
-
logDebug("After removal of stage %d, remaining stages = %d"
.format(stageId, stageIdToStage.size))
}
@@ -723,7 +720,6 @@ class DAGScheduler(
}
}
-
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index bbf9f7388b..62beb0d02a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -17,134 +17,68 @@
package org.apache.spark.scheduler
-import scala.language.existentials
+import java.nio.ByteBuffer
import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-
-private[spark] object ResultTask {
-
- // A simple map between the stage id to the serialized byte array of a task.
- // Served as a cache for task serialization because serialization can be
- // expensive on the master node if it needs to launch thousands of tasks.
- private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
- def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] =
- {
- synchronized {
- val old = serializedInfoCache.get(stageId).orNull
- if (old != null) {
- old
- } else {
- val out = new ByteArrayOutputStream
- val ser = SparkEnv.get.closureSerializer.newInstance()
- val objOut = ser.serializeStream(new GZIPOutputStream(out))
- objOut.writeObject(rdd)
- objOut.writeObject(func)
- objOut.close()
- val bytes = out.toByteArray
- serializedInfoCache.put(stageId, bytes)
- bytes
- }
- }
- }
-
- def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) =
- {
- val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
- val ser = SparkEnv.get.closureSerializer.newInstance()
- val objIn = ser.deserializeStream(in)
- val rdd = objIn.readObject().asInstanceOf[RDD[_]]
- val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
- (rdd, func)
- }
-
- def removeStage(stageId: Int) {
- serializedInfoCache.remove(stageId)
- }
-
- def clearCache() {
- synchronized {
- serializedInfoCache.clear()
- }
- }
-}
-
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
/**
* A task that sends back the output to the driver application.
*
- * See [[org.apache.spark.scheduler.Task]] for more information.
+ * See [[Task]] for more information.
*
* @param stageId id of the stage this task belongs to
- * @param rdd input to func
+ * @param rddBinary broadcast version of the serialized RDD
* @param func a function to apply on a partition of the RDD
- * @param _partitionId index of the number in the RDD
+ * @param partition partition of the RDD this task is associated with
* @param locs preferred task execution locations for locality scheduling
* @param outputId index of the task in this job (a job can launch tasks on only a subset of the
* input RDD's partitions).
*/
private[spark] class ResultTask[T, U](
stageId: Int,
- var rdd: RDD[T],
- var func: (TaskContext, Iterator[T]) => U,
- _partitionId: Int,
+ val rddBinary: Broadcast[Array[Byte]],
+ val func: (TaskContext, Iterator[T]) => U,
+ val partition: Partition,
@transient locs: Seq[TaskLocation],
- var outputId: Int)
- extends Task[U](stageId, _partitionId) with Externalizable {
-
- def this() = this(0, null, null, 0, null, 0)
-
- var split = if (rdd == null) null else rdd.partitions(partitionId)
+ val outputId: Int)
+ extends Task[U](stageId, partition.index) with Serializable {
+
+ // TODO: Should we also broadcast func? For that we would need a place to
+ // keep a reference to it (perhaps in DAGScheduler's job object).
+
+ def this(
+ stageId: Int,
+ rdd: RDD[T],
+ func: (TaskContext, Iterator[T]) => U,
+ partitionId: Int,
+ locs: Seq[TaskLocation],
+ outputId: Int) = {
+ this(stageId, rdd.broadcasted, func, rdd.partitions(partitionId), locs, outputId)
+ }
- @transient private val preferredLocs: Seq[TaskLocation] = {
+ @transient private[this] val preferredLocs: Seq[TaskLocation] = {
if (locs == null) Nil else locs.toSet.toSeq
}
override def runTask(context: TaskContext): U = {
+ // Deserialize the RDD using the broadcast variable.
+ val ser = SparkEnv.get.closureSerializer.newInstance()
+ val rdd = ser.deserialize[RDD[T]](ByteBuffer.wrap(rddBinary.value),
+ Thread.currentThread.getContextClassLoader)
metrics = Some(context.taskMetrics)
try {
- func(context, rdd.iterator(split, context))
+ func(context, rdd.iterator(partition, context))
} finally {
context.executeOnCompleteCallbacks()
}
}
+ // This is only callable on the driver side.
override def preferredLocations: Seq[TaskLocation] = preferredLocs
override def toString = "ResultTask(" + stageId + ", " + partitionId + ")"
-
- override def writeExternal(out: ObjectOutput) {
- RDDCheckpointData.synchronized {
- split = rdd.partitions(partitionId)
- out.writeInt(stageId)
- val bytes = ResultTask.serializeInfo(
- stageId, rdd, func.asInstanceOf[(TaskContext, Iterator[_]) => _])
- out.writeInt(bytes.length)
- out.write(bytes)
- out.writeInt(partitionId)
- out.writeInt(outputId)
- out.writeLong(epoch)
- out.writeObject(split)
- }
- }
-
- override def readExternal(in: ObjectInput) {
- val stageId = in.readInt()
- val numBytes = in.readInt()
- val bytes = new Array[Byte](numBytes)
- in.readFully(bytes)
- val (rdd_, func_) = ResultTask.deserializeInfo(stageId, bytes)
- rdd = rdd_.asInstanceOf[RDD[T]]
- func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U]
- partitionId = in.readInt()
- outputId = in.readInt()
- epoch = in.readLong()
- split = in.readObject().asInstanceOf[Partition]
- }
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index fdaf1de83f..033c6e5286 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -17,71 +17,13 @@
package org.apache.spark.scheduler
-import scala.language.existentials
-
-import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
+import java.nio.ByteBuffer
import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
import org.apache.spark.shuffle.ShuffleWriter
-private[spark] object ShuffleMapTask {
-
- // A simple map between the stage id to the serialized byte array of a task.
- // Served as a cache for task serialization because serialization can be
- // expensive on the master node if it needs to launch thousands of tasks.
- private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
- def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_, _, _]): Array[Byte] = {
- synchronized {
- val old = serializedInfoCache.get(stageId).orNull
- if (old != null) {
- return old
- } else {
- val out = new ByteArrayOutputStream
- val ser = SparkEnv.get.closureSerializer.newInstance()
- val objOut = ser.serializeStream(new GZIPOutputStream(out))
- objOut.writeObject(rdd)
- objOut.writeObject(dep)
- objOut.close()
- val bytes = out.toByteArray
- serializedInfoCache.put(stageId, bytes)
- bytes
- }
- }
- }
-
- def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_, _, _]) = {
- val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
- val ser = SparkEnv.get.closureSerializer.newInstance()
- val objIn = ser.deserializeStream(in)
- val rdd = objIn.readObject().asInstanceOf[RDD[_]]
- val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_, _, _]]
- (rdd, dep)
- }
-
- // Since both the JarSet and FileSet have the same format this is used for both.
- def deserializeFileSet(bytes: Array[Byte]): HashMap[String, Long] = {
- val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
- val objIn = new ObjectInputStream(in)
- val set = objIn.readObject().asInstanceOf[Array[(String, Long)]].toMap
- HashMap(set.toSeq: _*)
- }
-
- def removeStage(stageId: Int) {
- serializedInfoCache.remove(stageId)
- }
-
- def clearCache() {
- synchronized {
- serializedInfoCache.clear()
- }
- }
-}
-
/**
* A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
* specified in the ShuffleDependency).
@@ -89,62 +31,47 @@ private[spark] object ShuffleMapTask {
* See [[org.apache.spark.scheduler.Task]] for more information.
*
* @param stageId id of the stage this task belongs to
- * @param rdd the final RDD in this stage
- * @param rddBinary broadcast version of the serialized RDD
* @param dep the ShuffleDependency
- * @param _partitionId index of the number in the RDD
+ * @param partition partition of the RDD this task is associated with
* @param locs preferred task execution locations for locality scheduling
*/
private[spark] class ShuffleMapTask(
stageId: Int,
- var rdd: RDD[_],
+ var rddBinary: Broadcast[Array[Byte]],
var dep: ShuffleDependency[_, _, _],
- _partitionId: Int,
+ partition: Partition,
@transient private var locs: Seq[TaskLocation])
- extends Task[MapStatus](stageId, _partitionId)
- with Externalizable
- with Logging {
-
- protected def this() = this(0, null, null, 0, null)
+ extends Task[MapStatus](stageId, partition.index) with Logging {
+
+ // TODO: Should we also broadcast the ShuffleDependency? For that we would need a place to
+ // keep a reference to it (perhaps in Stage).
+
+ def this(
+ stageId: Int,
+ rdd: RDD[_],
+ dep: ShuffleDependency[_, _, _],
+ partitionId: Int,
+ locs: Seq[TaskLocation]) = {
+ this(stageId, rdd.broadcasted, dep, rdd.partitions(partitionId), locs)
+ }
@transient private val preferredLocs: Seq[TaskLocation] = {
if (locs == null) Nil else locs.toSet.toSeq
}
- var split = if (rdd == null) null else rdd.partitions(partitionId)
-
- override def writeExternal(out: ObjectOutput) {
- RDDCheckpointData.synchronized {
- split = rdd.partitions(partitionId)
- out.writeInt(stageId)
- val bytes = ShuffleMapTask.serializeInfo(stageId, rdd, dep)
- out.writeInt(bytes.length)
- out.write(bytes)
- out.writeInt(partitionId)
- out.writeLong(epoch)
- out.writeObject(split)
- }
- }
-
- override def readExternal(in: ObjectInput) {
- val stageId = in.readInt()
- val numBytes = in.readInt()
- val bytes = new Array[Byte](numBytes)
- in.readFully(bytes)
- val (rdd_, dep_) = ShuffleMapTask.deserializeInfo(stageId, bytes)
- rdd = rdd_
- dep = dep_
- partitionId = in.readInt()
- epoch = in.readLong()
- split = in.readObject().asInstanceOf[Partition]
- }
-
override def runTask(context: TaskContext): MapStatus = {
+ // Deserialize the RDD using the broadcast variable.
+ val ser = SparkEnv.get.closureSerializer.newInstance()
+ val rdd = ser.deserialize[RDD[_]](ByteBuffer.wrap(rddBinary.value),
+ Thread.currentThread.getContextClassLoader)
+
metrics = Some(context.taskMetrics)
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
- writer.write(rdd.iterator(split, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
+ writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
return writer.stop(success = true).get
} catch {
case e: Exception =>