diff options
Diffstat (limited to 'core/src/main/scala/spark/scheduler/ShuffleMapTask.scala')
-rw-r--r-- | core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 32 |
1 files changed, 9 insertions, 23 deletions
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index 1c25605f75..f2a038576b 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -18,16 +18,9 @@ package spark.scheduler import java.io._ -import java.util.{HashMap => JHashMap} import java.util.zip.{GZIPInputStream, GZIPOutputStream} -import scala.collection.mutable.{ArrayBuffer, HashMap} -import scala.collection.JavaConversions._ - -import it.unimi.dsi.fastutil.io.FastBufferedOutputStream - -import com.ning.compress.lzf.LZFInputStream -import com.ning.compress.lzf.LZFOutputStream +import scala.collection.mutable.HashMap import spark._ import spark.executor.ShuffleWriteMetrics @@ -95,25 +88,18 @@ private[spark] class ShuffleMapTask( var rdd: RDD[_], var dep: ShuffleDependency[_,_], var partition: Int, - @transient private var locs: Seq[String]) + @transient private var locs: Seq[TaskLocation]) extends Task[MapStatus](stageId) with Externalizable with Logging { protected def this() = this(0, null, null, 0, null) - @transient private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq - - { - // DEBUG code - preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs)) + @transient private val preferredLocs: Seq[TaskLocation] = { + if (locs == null) Nil else locs.toSet.toSeq } - var split = if (rdd == null) { - null - } else { - rdd.partitions(partition) - } + var split = if (rdd == null) null else rdd.partitions(partition) override def writeExternal(out: ObjectOutput) { RDDCheckpointData.synchronized { @@ -123,7 +109,7 @@ private[spark] class ShuffleMapTask( out.writeInt(bytes.length) out.write(bytes) out.writeInt(partition) - out.writeLong(generation) + out.writeLong(epoch) out.writeObject(split) } } @@ -137,7 +123,7 @@ private[spark] class ShuffleMapTask( rdd = rdd_ dep = dep_ partition = in.readInt() - generation = in.readLong() + epoch = in.readLong() split = in.readObject().asInstanceOf[Partition] } @@ -159,7 +145,7 @@ private[spark] class ShuffleMapTask( // Write the map output to its associated buckets. for (elem <- rdd.iterator(split, taskContext)) { - val pair = elem.asInstanceOf[(Any, Any)] + val pair = elem.asInstanceOf[Product2[Any, Any]] val bucketId = dep.partitioner.getPartition(pair._1) buckets.writers(bucketId).write(pair) } @@ -197,7 +183,7 @@ private[spark] class ShuffleMapTask( } } - override def preferredLocations: Seq[String] = preferredLocs + override def preferredLocations: Seq[TaskLocation] = preferredLocs override def toString = "ShuffleMapTask(%d, %d)".format(stageId, partition) } |