aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/scheduler/ShuffleMapTask.scala')
-rw-r--r--core/src/main/scala/spark/scheduler/ShuffleMapTask.scala32
1 files changed, 9 insertions, 23 deletions
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 1c25605f75..f2a038576b 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -18,16 +18,9 @@
package spark.scheduler
import java.io._
-import java.util.{HashMap => JHashMap}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.collection.JavaConversions._
-
-import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-
-import com.ning.compress.lzf.LZFInputStream
-import com.ning.compress.lzf.LZFOutputStream
+import scala.collection.mutable.HashMap
import spark._
import spark.executor.ShuffleWriteMetrics
@@ -95,25 +88,18 @@ private[spark] class ShuffleMapTask(
var rdd: RDD[_],
var dep: ShuffleDependency[_,_],
var partition: Int,
- @transient private var locs: Seq[String])
+ @transient private var locs: Seq[TaskLocation])
extends Task[MapStatus](stageId)
with Externalizable
with Logging {
protected def this() = this(0, null, null, 0, null)
- @transient private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq
-
- {
- // DEBUG code
- preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs))
+ @transient private val preferredLocs: Seq[TaskLocation] = {
+ if (locs == null) Nil else locs.toSet.toSeq
}
- var split = if (rdd == null) {
- null
- } else {
- rdd.partitions(partition)
- }
+ var split = if (rdd == null) null else rdd.partitions(partition)
override def writeExternal(out: ObjectOutput) {
RDDCheckpointData.synchronized {
@@ -123,7 +109,7 @@ private[spark] class ShuffleMapTask(
out.writeInt(bytes.length)
out.write(bytes)
out.writeInt(partition)
- out.writeLong(generation)
+ out.writeLong(epoch)
out.writeObject(split)
}
}
@@ -137,7 +123,7 @@ private[spark] class ShuffleMapTask(
rdd = rdd_
dep = dep_
partition = in.readInt()
- generation = in.readLong()
+ epoch = in.readLong()
split = in.readObject().asInstanceOf[Partition]
}
@@ -159,7 +145,7 @@ private[spark] class ShuffleMapTask(
// Write the map output to its associated buckets.
for (elem <- rdd.iterator(split, taskContext)) {
- val pair = elem.asInstanceOf[(Any, Any)]
+ val pair = elem.asInstanceOf[Product2[Any, Any]]
val bucketId = dep.partitioner.getPartition(pair._1)
buckets.writers(bucketId).write(pair)
}
@@ -197,7 +183,7 @@ private[spark] class ShuffleMapTask(
}
}
- override def preferredLocations: Seq[String] = preferredLocs
+ override def preferredLocations: Seq[TaskLocation] = preferredLocs
override def toString = "ShuffleMapTask(%d, %d)".format(stageId, partition)
}