diff options
Diffstat (limited to 'core/src')
8 files changed, 80 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 9e912d3adb..f344804b4c 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -245,6 +245,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { } /** + * Return an array that contains all of the elements in a specific partition of this RDD. + */ + def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = { + // This is useful for implementing `take` from other language frontends + // like Python where the data is serialized. + import scala.collection.JavaConversions._ + val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds, true) + res.map(x => new java.util.ArrayList(x.toSeq)).toArray + } + + /** * Reduces the elements of this RDD using the specified commutative and associative binary operator. */ def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index a659cc06c2..ca42c76928 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -235,10 +235,6 @@ private[spark] object PythonRDD { file.close() } - def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = { - implicit val cm : ClassTag[T] = rdd.elementClassTag - rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator - } } private class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] { diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 0b0a60ee60..0f19d7a96b 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -222,18 +222,22 @@ private[spark] class Executor( return } + val resultSer = SparkEnv.get.serializer.newInstance() + val beforeSerialization = System.currentTimeMillis() + val valueBytes = resultSer.serialize(value) + val afterSerialization = System.currentTimeMillis() + for (m <- task.metrics) { m.hostname = Utils.localHostName() m.executorDeserializeTime = (taskStart - startTime).toInt m.executorRunTime = (taskFinish - taskStart).toInt m.jvmGCTime = gcTime - startGCTime + m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt } - // TODO I'd also like to track the time it takes to serialize the task results, but that is - // huge headache, b/c we need to serialize the task metrics first. If TaskMetrics had a - // custom serialized format, we could just change the relevants bytes in the byte buffer + val accumUpdates = Accumulators.values - val directResult = new DirectTaskResult(value, accumUpdates, task.metrics.getOrElse(null)) + val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null)) val serializedDirectResult = ser.serialize(directResult) logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit) val serializedResult = { diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index c0ce46e379..bb1471d9ee 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -44,6 +44,11 @@ class TaskMetrics extends Serializable { var jvmGCTime: Long = _ /** + * Amount of time spent serializing the task result + */ + var resultSerializationTime: Long = _ + + /** * If this task reads from shuffle output, metrics on getting shuffle data will be collected here */ var shuffleReadMetrics: Option[ShuffleReadMetrics] = None diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index 7e468d0d67..e80cc6b0f6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -35,18 +35,15 @@ case class IndirectTaskResult[T](blockId: BlockId) extends TaskResult[T] with Se /** A TaskResult that contains the task's return value and accumulator updates. */ private[spark] -class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics) +class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics) extends TaskResult[T] with Externalizable { - def this() = this(null.asInstanceOf[T], null, null) + def this() = this(null.asInstanceOf[ByteBuffer], null, null) override def writeExternal(out: ObjectOutput) { - val objectSer = SparkEnv.get.serializer.newInstance() - val bb = objectSer.serialize(value) - - out.writeInt(bb.remaining()) - Utils.writeByteBuffer(bb, out) + out.writeInt(valueBytes.remaining); + Utils.writeByteBuffer(valueBytes, out) out.writeInt(accumUpdates.size) for ((key, value) <- accumUpdates) { @@ -58,12 +55,10 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me override def readExternal(in: ObjectInput) { - val objectSer = SparkEnv.get.serializer.newInstance() - val blen = in.readInt() val byteVal = new Array[Byte](blen) in.readFully(byteVal) - value = objectSer.deserialize(ByteBuffer.wrap(byteVal)) + valueBytes = ByteBuffer.wrap(byteVal) val numUpdates = in.readInt if (numUpdates == 0) { @@ -76,4 +71,9 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me } metrics = in.readObject().asInstanceOf[TaskMetrics] } + + def value(): T = { + val resultSer = SparkEnv.get.serializer.newInstance() + return resultSer.deserialize(valueBytes) + } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 69f9446bab..996e1b4d1a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -86,7 +86,7 @@ private[spark] class StagePage(parent: JobProgressUI) { val taskHeaders: Seq[String] = Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++ - Seq("Duration", "GC Time") ++ + Seq("Duration", "GC Time", "Result Ser Time") ++ {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++ {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++ Seq("Errors") @@ -101,6 +101,11 @@ private[spark] class StagePage(parent: JobProgressUI) { None } else { + val serializationTimes = validTasks.map{case (info, metrics, exception) => + metrics.get.resultSerializationTime.toDouble} + val serializationQuantiles = "Result serialization time" +: Distribution(serializationTimes).get.getQuantiles().map( + ms => parent.formatDuration(ms.toLong)) + val serviceTimes = validTasks.map{case (info, metrics, exception) => metrics.get.executorRunTime.toDouble} val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map( @@ -149,6 +154,7 @@ private[spark] class StagePage(parent: JobProgressUI) { val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes) val listings: Seq[Seq[String]] = Seq( + serializationQuantiles, serviceQuantiles, gettingResultQuantiles, schedulerDelayQuantiles, @@ -183,6 +189,7 @@ private[spark] class StagePage(parent: JobProgressUI) { val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration) else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("") val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L) + val serializationTime = metrics.map(m => m.resultSerializationTime).getOrElse(0L) val maybeShuffleRead = metrics.flatMap{m => m.shuffleReadMetrics}.map{s => s.remoteBytesRead} val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("") @@ -210,6 +217,9 @@ private[spark] class StagePage(parent: JobProgressUI) { <td sorttable_customkey={gcTime.toString}> {if (gcTime > 0) parent.formatDuration(gcTime) else ""} </td> + <td sorttable_customkey={serializationTime.toString}> + {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""} + </td> {if (shuffleRead) { <td sorttable_customkey={shuffleReadSortable}> {shuffleReadReadable} diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java index 4234f6eac7..79913dc718 100644 --- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java @@ -897,4 +897,37 @@ public class JavaAPISuite implements Serializable { new Tuple2<Integer, Integer>(0, 4)), rdd3.collect()); } + + @Test + public void collectPartitions() { + JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3); + + JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() { + @Override + public Tuple2<Integer, Integer> call(Integer i) throws Exception { + return new Tuple2<Integer, Integer>(i, i % 2); + } + }); + + List[] parts = rdd1.collectPartitions(new int[] {0}); + Assert.assertEquals(Arrays.asList(1, 2), parts[0]); + + parts = rdd1.collectPartitions(new int[] {1, 2}); + Assert.assertEquals(Arrays.asList(3, 4), parts[0]); + Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]); + + Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1), + new Tuple2<Integer, Integer>(2, 0)), + rdd2.collectPartitions(new int[] {0})[0]); + + parts = rdd2.collectPartitions(new int[] {1, 2}); + Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1), + new Tuple2<Integer, Integer>(4, 0)), + parts[0]); + Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1), + new Tuple2<Integer, Integer>(6, 0), + new Tuple2<Integer, Integer>(7, 1)), + parts[1]); + } + } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala index 29c4cc5d9c..bb28a31a99 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala @@ -313,6 +313,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo } def createTaskResult(id: Int): DirectTaskResult[Int] = { - new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics) + val valueSer = SparkEnv.get.serializer.newInstance() + new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics) } } |