diff options
9 files changed, 179 insertions, 29 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index f2da417724..21dc8f0b65 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -18,9 +18,11 @@ package org.apache.spark.ui import java.text.SimpleDateFormat -import java.util.{Locale, Date} +import java.util.{Date, Locale} -import scala.xml.{Node, Text, Unparsed} +import scala.util.control.NonFatal +import scala.xml._ +import scala.xml.transform.{RewriteRule, RuleTransformer} import org.apache.spark.Logging import org.apache.spark.ui.scope.RDDOperationGraph @@ -395,4 +397,60 @@ private[spark] object UIUtils extends Logging { </script> } + /** + * Returns HTML rendering of a job or stage description. It will try to parse the string as HTML + * and make sure that it only contains anchors with root-relative links. Otherwise, + * the whole string will rendered as a simple escaped text. + * + * Note: In terms of security, only anchor tags with root relative links are supported. So any + * attempts to embed links outside Spark UI, or other tags like <script> will cause in the whole + * description to be treated as plain text. + */ + def makeDescription(desc: String, basePathUri: String): NodeSeq = { + import scala.language.postfixOps + + // If the description can be parsed as HTML and has only relative links, then render + // as HTML, otherwise render as escaped string + try { + // Try to load the description as unescaped HTML + val xml = XML.loadString(s"""<span class="description-input">$desc</span>""") + + // Verify that this has only anchors and span (we are wrapping in span) + val allowedNodeLabels = Set("a", "span") + val illegalNodes = xml \\ "_" filterNot { case node: Node => + allowedNodeLabels.contains(node.label) + } + if (illegalNodes.nonEmpty) { + throw new IllegalArgumentException( + "Only HTML anchors allowed in job descriptions\n" + + illegalNodes.map { n => s"${n.label} in $n"}.mkString("\n\t")) + } + + // Verify that all links are relative links starting with "/" + val allLinks = + xml \\ "a" flatMap { _.attributes } filter { _.key == "href" } map { _.value.toString } + if (allLinks.exists { ! _.startsWith ("/") }) { + throw new IllegalArgumentException( + "Links in job descriptions must be root-relative:\n" + allLinks.mkString("\n\t")) + } + + // Prepend the relative links with basePathUri + val rule = new RewriteRule() { + override def transform(n: Node): Seq[Node] = { + n match { + case e: Elem if e \ "@href" nonEmpty => + val relativePath = e.attribute("href").get.toString + val fullUri = s"${basePathUri.stripSuffix("/")}/${relativePath.stripPrefix("/")}" + e % Attribute(null, "href", fullUri, Null) + case _ => n + } + } + } + new RuleTransformer(rule).transform(xml) + } catch { + case NonFatal(e) => + logWarning(s"Invalid job description: $desc ", e) + <span class="description-input">{desc}</span> + } + } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index e72547df72..041cd55ea4 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -17,15 +17,15 @@ package org.apache.spark.ui.jobs -import scala.collection.mutable.{HashMap, ListBuffer} -import scala.xml.{Node, NodeSeq, Unparsed, Utility} - import java.util.Date import javax.servlet.http.HttpServletRequest -import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} -import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData} +import scala.collection.mutable.{HashMap, ListBuffer} +import scala.xml._ + import org.apache.spark.JobExecutionStatus +import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData} +import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} /** Page showing list of all ongoing and recently finished jobs */ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { @@ -224,6 +224,8 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { } val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown") val formattedSubmissionTime = job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown") + val jobDescription = UIUtils.makeDescription(lastStageDescription, parent.basePath) + val detailUrl = "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId) <tr id={"job-" + job.jobId}> @@ -231,7 +233,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")} </td> <td> - <span class="description-input" title={lastStageDescription}>{lastStageDescription}</span> + {jobDescription} <a href={detailUrl} class="name-link">{lastStageName}</a> </td> <td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}> diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 99812db491..ea806d09b6 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -17,11 +17,10 @@ package org.apache.spark.ui.jobs -import scala.xml.Node -import scala.xml.Text - import java.util.Date +import scala.xml.{Node, Text} + import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.scheduler.StageInfo @@ -116,7 +115,7 @@ private[ui] class StageTableBase( stageData <- listener.stageIdToData.get((s.stageId, s.attemptId)) desc <- stageData.description } yield { - <span class="description-input" title={desc}>{desc}</span> + UIUtils.makeDescription(desc, basePathUri) } <div>{stageDesc.getOrElse("")} {killLink} {nameLink} {details}</div> } diff --git a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala new file mode 100644 index 0000000000..2b693c1651 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui + +import scala.xml.Elem + +import org.apache.spark.SparkFunSuite + +class UIUtilsSuite extends SparkFunSuite { + import UIUtils._ + + test("makeDescription") { + verify( + """test <a href="/link"> text </a>""", + <span class="description-input">test <a href="/link"> text </a></span>, + "Correctly formatted text with only anchors and relative links should generate HTML" + ) + + verify( + """test <a href="/link" text </a>""", + <span class="description-input">{"""test <a href="/link" text </a>"""}</span>, + "Badly formatted text should make the description be treated as a streaming instead of HTML" + ) + + verify( + """test <a href="link"> text </a>""", + <span class="description-input">{"""test <a href="link"> text </a>"""}</span>, + "Non-relative links should make the description be treated as a string instead of HTML" + ) + + verify( + """test<a><img></img></a>""", + <span class="description-input">{"""test<a><img></img></a>"""}</span>, + "Non-anchor elements should make the description be treated as a string instead of HTML" + ) + + verify( + """test <a href="/link"> text </a>""", + <span class="description-input">test <a href="base/link"> text </a></span>, + baseUrl = "base", + errorMsg = "Base URL should be prepended to html links" + ) + } + + private def verify( + desc: String, expected: Elem, errorMsg: String = "", baseUrl: String = ""): Unit = { + val generated = makeDescription(desc, baseUrl) + assert(generated.sameElements(expected), + s"\n$errorMsg\n\nExpected:\n$expected\nGenerated:\n$generated") + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 6720ba4f72..94fea63f55 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -200,6 +200,8 @@ class StreamingContext private[streaming] ( private val startSite = new AtomicReference[CallSite](null) + private[streaming] def getStartSite(): CallSite = startSite.get() + private var shutdownHookRef: AnyRef = _ conf.getOption("spark.streaming.checkpoint.directory").foreach(checkpoint) @@ -744,7 +746,7 @@ object StreamingContext extends Logging { throw new IllegalStateException( "Only one StreamingContext may be started in this JVM. " + "Currently running StreamingContext was started at" + - activeContext.get.startSite.get.longForm) + activeContext.get.getStartSite().longForm) } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 0cd39594ee..32d995dc42 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -25,6 +25,7 @@ import scala.util.{Failure, Success} import org.apache.spark.Logging import org.apache.spark.rdd.PairRDDFunctions import org.apache.spark.streaming._ +import org.apache.spark.streaming.ui.UIUtils import org.apache.spark.util.{EventLoop, ThreadUtils} @@ -190,10 +191,20 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { } private class JobHandler(job: Job) extends Runnable with Logging { + import JobScheduler._ + def run() { - ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString) - ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString) try { + val formattedTime = UIUtils.formatBatchTime( + job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false) + val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}" + val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]" + + ssc.sc.setJobDescription( + s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""") + ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString) + ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString) + // We need to assign `eventLoop` to a temp variable. Otherwise, because // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then // it's possible that when `post` is called, `eventLoop` happens to null. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index f86fd44b48..204e6142fd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -30,7 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.rpc._ import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.receiver._ -import org.apache.spark.util.{ThreadUtils, SerializableConfiguration} +import org.apache.spark.util.{Utils, ThreadUtils, SerializableConfiguration} /** Enumeration to identify current state of a Receiver */ @@ -554,6 +554,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false ssc.sc.makeRDD(Seq(receiver -> scheduledExecutors)) } receiverRDD.setName(s"Receiver $receiverId") + ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId") + ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite())) + val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit]( receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ()) // We will keep restarting the receiver job until ReceiverTracker is stopped diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala index 90d1b0fade..9129c1f26a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala @@ -19,14 +19,14 @@ package org.apache.spark.streaming.ui import javax.servlet.http.HttpServletRequest -import scala.xml.{NodeSeq, Node, Text, Unparsed} +import scala.xml._ import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.streaming.Time -import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage} -import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId} +import org.apache.spark.streaming.ui.StreamingJobProgressListener.{OutputOpId, SparkJobId} import org.apache.spark.ui.jobs.UIData.JobUIData +import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage} private[ui] case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData]) @@ -207,16 +207,25 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { sparkListener.stageIdToInfo.get(sparkJob.stageIds.max) } } - val lastStageData = lastStageInfo.flatMap { s => - sparkListener.stageIdToData.get((s.stageId, s.attemptId)) - } - - val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)") - val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("") + lastStageInfo match { + case Some(stageInfo) => + val details = if (stageInfo.details.nonEmpty) { + <span + onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')" + class="expand-details"> + +details + </span> ++ + <div class="stage-details collapsed"> + <pre>{stageInfo.details}</pre> + </div> + } else { + NodeSeq.Empty + } - <span class="description-input" title={lastStageDescription}> - {lastStageDescription} - </span> ++ Text(lastStageName) + <div> {stageInfo.name} {details} </div> + case None => + Text("(Unknown)") + } } private def failureReasonCell(failureReason: String): Seq[Node] = { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 3b9d0d15ea..c7a877142b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -204,7 +204,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo // Verify streaming jobs have expected thread-local properties assert(jobGroupFound === null) - assert(jobDescFound === null) + assert(jobDescFound.contains("Streaming job from")) assert(jobInterruptFound === "false") // Verify current thread's thread-local properties have not changed |