diff options
-rw-r--r-- | core/src/main/scala/spark/ui/exec/ExecutorsUI.scala | 14 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/IndexPage.scala | 56 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/JobProgressUI.scala | 64 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/StagePage.scala | 35 | ||||
-rwxr-xr-x | ec2/spark_ec2.py | 39 |
5 files changed, 141 insertions, 67 deletions
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala index db1c902955..b70153fd30 100644 --- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala @@ -113,7 +113,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { } private[spark] class ExecutorsListener extends SparkListener with Logging { - val executorToTasksActive = HashMap[String, HashSet[Long]]() + val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]() val executorToTasksComplete = HashMap[String, Int]() val executorToTasksFailed = HashMap[String, Int]() val executorToTaskInfos = @@ -121,9 +121,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { override def onTaskStart(taskStart: SparkListenerTaskStart) { val eid = taskStart.taskInfo.executorId - if (!executorToTasksActive.contains(eid)) - executorToTasksActive(eid) = HashSet[Long]() - executorToTasksActive(eid) += taskStart.taskInfo.taskId + val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]()) + activeTasks += taskStart.taskInfo val taskList = executorToTaskInfos.getOrElse( eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) taskList += ((taskStart.taskInfo, None, None)) @@ -132,9 +131,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { val eid = taskEnd.taskInfo.executorId - if (!executorToTasksActive.contains(eid)) - executorToTasksActive(eid) = HashSet[Long]() - executorToTasksActive(eid) -= taskEnd.taskInfo.taskId + val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]()) + activeTasks -= taskEnd.taskInfo val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = taskEnd.reason match { case e: ExceptionFailure => @@ -142,7 +140,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { (Some(e), e.metrics) case _ => executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1 - (None, Some(taskEnd.taskMetrics)) + (None, Option(taskEnd.taskMetrics)) } val taskList = executorToTaskInfos.getOrElse( eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala index f31af3cda6..646ae5ecbc 100644 --- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala @@ -25,9 +25,10 @@ import scala.Some import scala.xml.{NodeSeq, Node} import spark.scheduler.Stage -import spark.ui.UIUtils._ -import spark.ui.Page._ import spark.storage.StorageLevel +import spark.ui.Page._ +import spark.ui.UIUtils._ +import spark.Utils /** Page showing list of all ongoing and recently finished stages */ private[spark] class IndexPage(parent: JobProgressUI) { @@ -38,6 +39,12 @@ private[spark] class IndexPage(parent: JobProgressUI) { val activeStages = listener.activeStages.toSeq val completedStages = listener.completedStages.reverse.toSeq val failedStages = listener.failedStages.reverse.toSeq + val now = System.currentTimeMillis() + + var activeTime = 0L + for (tasks <- listener.stageToTasksActive.values; t <- tasks) { + activeTime += t.timeRunning(now) + } /** Special table which merges two header cells. */ def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = { @@ -48,7 +55,8 @@ private[spark] class IndexPage(parent: JobProgressUI) { <th>Submitted</th> <th>Duration</th> <th colspan="2">Tasks: Complete/Total</th> - <th>Shuffle Activity</th> + <th>Shuffle Read</th> + <th>Shuffle Write</th> <th>Stored RDD</th> </thead> <tbody> @@ -57,11 +65,33 @@ private[spark] class IndexPage(parent: JobProgressUI) { </table> } + val summary: NodeSeq = + <div> + <ul class="unstyled"> + <li> + <strong>CPU time: </strong> + {parent.formatDuration(listener.totalTime + activeTime)} + </li> + {if (listener.totalShuffleRead > 0) + <li> + <strong>Shuffle read: </strong> + {Utils.memoryBytesToString(listener.totalShuffleRead)} + </li> + } + {if (listener.totalShuffleWrite > 0) + <li> + <strong>Shuffle write: </strong> + {Utils.memoryBytesToString(listener.totalShuffleWrite)} + </li> + } + </ul> + </div> val activeStageTable: NodeSeq = stageTable(stageRow, activeStages) val completedStageTable = stageTable(stageRow, completedStages) val failedStageTable: NodeSeq = stageTable(stageRow, failedStages) - val content = <h2>Active Stages</h2> ++ activeStageTable ++ + val content = summary ++ + <h2>Active Stages</h2> ++ activeStageTable ++ <h2>Completed Stages</h2> ++ completedStageTable ++ <h2>Failed Stages</h2> ++ failedStageTable @@ -94,13 +124,16 @@ private[spark] class IndexPage(parent: JobProgressUI) { case Some(t) => dateFmt.format(new Date(t)) case None => "Unknown" } - val (read, write) = (listener.hasShuffleRead(s.id), listener.hasShuffleWrite(s.id)) - val shuffleInfo = (read, write) match { - case (true, true) => "Read/Write" - case (true, false) => "Read" - case (false, true) => "Write" - case _ => "" + + val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match { + case 0 => "" + case b => Utils.memoryBytesToString(b) + } + val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match { + case 0 => "" + case b => Utils.memoryBytesToString(b) } + val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0) val totalTasks = s.numPartitions @@ -117,7 +150,8 @@ private[spark] class IndexPage(parent: JobProgressUI) { case _ => }} </td> - <td>{shuffleInfo}</td> + <td>{shuffleRead}</td> + <td>{shuffleWrite}</td> <td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) { <a href={"/storage/rdd?id=%s".format(s.rdd.id)}> {Option(s.rdd.name).getOrElse(s.rdd.id)} diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala index 6e332415db..09d24b6302 100644 --- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala @@ -65,7 +65,15 @@ private[spark] class JobProgressListener extends SparkListener { val completedStages = ListBuffer[Stage]() val failedStages = ListBuffer[Stage]() - val stageToTasksActive = HashMap[Int, HashSet[Long]]() + // Total metrics reflect metrics only for completed tasks + var totalTime = 0L + var totalShuffleRead = 0L + var totalShuffleWrite = 0L + + val stageToTime = HashMap[Int, Long]() + val stageToShuffleRead = HashMap[Int, Long]() + val stageToShuffleWrite = HashMap[Int, Long]() + val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]() val stageToTasksComplete = HashMap[Int, Int]() val stageToTasksFailed = HashMap[Int, Int]() val stageToTaskInfos = @@ -86,6 +94,12 @@ private[spark] class JobProgressListener extends SparkListener { val toRemove = RETAINED_STAGES / 10 stages.takeRight(toRemove).foreach( s => { stageToTaskInfos.remove(s.id) + stageToTime.remove(s.id) + stageToShuffleRead.remove(s.id) + stageToShuffleWrite.remove(s.id) + stageToTasksActive.remove(s.id) + stageToTasksComplete.remove(s.id) + stageToTasksFailed.remove(s.id) }) stages.trimEnd(toRemove) } @@ -96,9 +110,8 @@ private[spark] class JobProgressListener extends SparkListener { override def onTaskStart(taskStart: SparkListenerTaskStart) { val sid = taskStart.task.stageId - if (!stageToTasksActive.contains(sid)) - stageToTasksActive(sid) = HashSet[Long]() - stageToTasksActive(sid) += taskStart.taskInfo.taskId + val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]()) + tasksActive += taskStart.taskInfo val taskList = stageToTaskInfos.getOrElse( sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) taskList += ((taskStart.taskInfo, None, None)) @@ -107,9 +120,8 @@ private[spark] class JobProgressListener extends SparkListener { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { val sid = taskEnd.task.stageId - if (!stageToTasksActive.contains(sid)) - stageToTasksActive(sid) = HashSet[Long]() - stageToTasksActive(sid) -= taskEnd.taskInfo.taskId + val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]()) + tasksActive -= taskEnd.taskInfo val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = taskEnd.reason match { case e: ExceptionFailure => @@ -117,8 +129,26 @@ private[spark] class JobProgressListener extends SparkListener { (Some(e), e.metrics) case _ => stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1 - (None, Some(taskEnd.taskMetrics)) + (None, Option(taskEnd.taskMetrics)) } + + stageToTime.getOrElseUpdate(sid, 0L) + val time = metrics.map(m => m.executorRunTime).getOrElse(0) + stageToTime(sid) += time + totalTime += time + + stageToShuffleRead.getOrElseUpdate(sid, 0L) + val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s => + s.remoteBytesRead).getOrElse(0L) + stageToShuffleRead(sid) += shuffleRead + totalShuffleRead += shuffleRead + + stageToShuffleWrite.getOrElseUpdate(sid, 0L) + val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s => + s.shuffleBytesWritten).getOrElse(0L) + stageToShuffleWrite(sid) += shuffleWrite + totalShuffleWrite += shuffleWrite + val taskList = stageToTaskInfos.getOrElse( sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) taskList -= ((taskEnd.taskInfo, None, None)) @@ -139,22 +169,4 @@ private[spark] class JobProgressListener extends SparkListener { case _ => } } - - /** Is this stage's input from a shuffle read. */ - def hasShuffleRead(stageID: Int): Boolean = { - // This is written in a slightly complicated way to avoid having to scan all tasks - for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) { - if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined - } - return false // No tasks have finished for this stage - } - - /** Is this stage's output to a shuffle write. */ - def hasShuffleWrite(stageID: Int): Boolean = { - // This is written in a slightly complicated way to avoid having to scan all tasks - for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) { - if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined - } - return false // No tasks have finished for this stage - } } diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala index 654f347723..e327cb3947 100644 --- a/core/src/main/scala/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala @@ -37,6 +37,7 @@ private[spark] class StagePage(parent: JobProgressUI) { def render(request: HttpServletRequest): Seq[Node] = { val stageId = request.getParameter("id").toInt + val now = System.currentTimeMillis() if (!listener.stageToTaskInfos.contains(stageId)) { val content = @@ -49,8 +50,35 @@ private[spark] class StagePage(parent: JobProgressUI) { val tasks = listener.stageToTaskInfos(stageId) - val shuffleRead = listener.hasShuffleRead(stageId) - val shuffleWrite = listener.hasShuffleWrite(stageId) + val shuffleRead = listener.stageToShuffleRead(stageId) > 0 + val shuffleWrite = listener.stageToShuffleWrite(stageId) > 0 + + var activeTime = 0L + listener.stageToTasksActive(stageId).foreach { t => + activeTime += t.timeRunning(now) + } + + val summary = + <div> + <ul class="unstyled"> + <li> + <strong>CPU time: </strong> + {parent.formatDuration(listener.stageToTime(stageId) + activeTime)} + </li> + {if (shuffleRead) + <li> + <strong>Shuffle read: </strong> + {Utils.memoryBytesToString(listener.stageToShuffleRead(stageId))} + </li> + } + {if (shuffleWrite) + <li> + <strong>Shuffle write: </strong> + {Utils.memoryBytesToString(listener.stageToShuffleWrite(stageId))} + </li> + } + </ul> + </div> val taskHeaders: Seq[String] = Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++ @@ -98,7 +126,8 @@ private[spark] class StagePage(parent: JobProgressUI) { } val content = - <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++ <h2>Tasks</h2> ++ taskTable; + summary ++ <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++ + <h2>Tasks</h2> ++ taskTable; headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs) } diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 2ec3c007fb..740ec08542 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -9,9 +9,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -53,7 +53,7 @@ def parse_args(): help="Seconds to wait for nodes to start (default: 120)") parser.add_option("-k", "--key-pair", help="Key pair to use on instances") - parser.add_option("-i", "--identity-file", + parser.add_option("-i", "--identity-file", help="SSH private key file to use for logging into instances") parser.add_option("-t", "--instance-type", default="m1.large", help="Type of instance to launch (default: m1.large). " + @@ -69,7 +69,7 @@ def parse_args(): parser.add_option("-a", "--ami", default="latest", help="Amazon Machine Image ID to use, or 'latest' to use latest " + "available AMI (default: latest)") - parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port", + parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port", help="Use SSH dynamic port forwarding to create a SOCKS proxy at " + "the given local address (for use with login)") parser.add_option("--resume", action="store_true", default=False, @@ -99,7 +99,7 @@ def parse_args(): help="The SSH user you want to connect as (default: root)") parser.add_option("--delete-groups", action="store_true", default=False, help="When destroying a cluster, delete the security groups that were created") - + (opts, args) = parser.parse_args() if len(args) != 2: parser.print_help() @@ -112,7 +112,7 @@ def parse_args(): if opts.cluster_type not in ["mesos", "standalone"] and action == "launch": print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type) sys.exit(1) - + # Boto config check # http://boto.cloudhackers.com/en/latest/boto_config_tut.html home_dir = os.getenv('HOME') @@ -178,6 +178,7 @@ def launch_cluster(conn, opts, cluster_name): master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0') master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0') master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0') + master_group.authorize('tcp', 33000, 33010, '0.0.0.0/0') if opts.cluster_type == "mesos": master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0') if opts.ganglia: @@ -257,7 +258,7 @@ def launch_cluster(conn, opts, cluster_name): block_device_map = block_map) my_req_ids += [req.id for req in slave_reqs] i += 1 - + print "Waiting for spot instances to be granted..." try: while True: @@ -413,7 +414,7 @@ def setup_standalone_cluster(master, slave_nodes, opts): slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes]) ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips)) ssh(master, opts, "/root/spark/bin/start-all.sh") - + def setup_spark_cluster(master, opts): ssh(master, opts, "chmod u+x spark-ec2/setup.sh") ssh(master, opts, "spark-ec2/setup.sh") @@ -528,7 +529,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes, dest.write(text) dest.close() # rsync the whole directory over to the master machine - command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " + + command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " + "'%s/' '%s@%s:/'") % (opts.identity_file, tmp_dir, opts.user, active_master)) subprocess.check_call(command, shell=True) # Remove the temp directory we created above @@ -557,9 +558,9 @@ def ssh(host, opts, command): print "Error connecting to host {0}, sleeping 30".format(e) time.sleep(30) tries = tries + 1 - - - + + + # Gets a list of zones to launch instances in @@ -618,12 +619,12 @@ def main(): print "Terminating zoo..." for inst in zoo_nodes: inst.terminate() - + # Delete security groups as well if opts.delete_groups: print "Deleting security groups (this will take some time)..." group_names = [cluster_name + "-master", cluster_name + "-slaves", cluster_name + "-zoo"] - + attempt = 1; while attempt <= 3: print "Attempt %d" % attempt @@ -639,7 +640,7 @@ def main(): from_port=rule.from_port, to_port=rule.to_port, src_group=grant) - + # Sleep for AWS eventual-consistency to catch up, and for instances # to terminate time.sleep(30) # Yes, it does have to be this long :-( @@ -650,13 +651,13 @@ def main(): except boto.exception.EC2ResponseError: success = False; print "Failed to delete security group " + group.name - + # Unfortunately, group.revoke() returns True even if a rule was not # deleted, so this needs to be rerun if something fails if success: break; - + attempt += 1 - + if not success: print "Failed to delete all security groups after 3 tries." print "Try re-running in a few minutes." @@ -679,7 +680,7 @@ def main(): elif action == "stop": response = raw_input("Are you sure you want to stop the cluster " + cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " + - "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" + + "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" + "AMAZON EBS IF IT IS EBS-BACKED!!\n" + "Stop cluster " + cluster_name + " (y/N): ") if response == "y": |