author    | Reynold Xin <rxin@databricks.com> | 2015-05-05 19:27:30 -0700
committer | Reynold Xin <rxin@databricks.com> | 2015-05-05 19:27:30 -0700
commit    | 51b3d41e160a1326a04536241b427e65b39ed8df (patch)
tree      | b65623a33e4da5aaab1425bb7a420dbe060f037f /core/src/main/scala
parent    | 1fd31ba08928f8554f74609f48f4344bd69444e5 (diff)
Revert "[SPARK-3454] separate json endpoints for data in the UI"
This reverts commit d49735800db27239c11478aac4b0f2ec9df91a3f.
The commit broke Spark on Windows.
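For context, the reverted change exposed UI data through JAX-RS resources. The sketch below shows the shape of that API, copied from the OneApplicationResource.scala deleted further down in this diff; UIRoot, ApplicationInfo, and NotFoundException are companion types from the likewise-deleted org.apache.spark.status.api.v1 package, so this compiles only against that reverted code:

    import javax.ws.rs.core.MediaType
    import javax.ws.rs.{Produces, PathParam, GET}

    @Produces(Array(MediaType.APPLICATION_JSON))
    private[v1] class OneApplicationResource(uiRoot: UIRoot) {

      // Resolve a single application by id, or surface a 404 via NotFoundException.
      @GET
      def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
        val apps = uiRoot.getApplicationInfoList.find { _.id == appId }
        apps.getOrElse(throw new NotFoundException("unknown app: " + appId))
      }
    }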
Diffstat (limited to 'core/src/main/scala')
37 files changed, 129 insertions(+), 1803 deletions(-)
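Most of the deletions below remove the org.apache.spark.status.api.v1 package wholesale. Its central abstraction was the UIRoot trait, shared by the HistoryServer, the Master UI, and the application UI; the excerpt below is reproduced from the deleted JsonRootResource.scala further down and shows how every JSON endpoint resolved a SparkUI before handling a request (the inline comment is editorial):

    private[spark] trait UIRoot {
      def getSparkUI(appKey: String): Option[SparkUI]
      def getApplicationInfoList: Iterator[ApplicationInfo]

      // Look up the UI for (appId, attemptId) and apply f to it;
      // an unknown app becomes a NotFoundException, i.e. an HTTP 404.
      def withSparkUI[T](appId: String, attemptId: Option[String])(f: SparkUI => T): T = {
        val appKey = attemptId.map(appId + "/" + _).getOrElse(appId)
        getSparkUI(appKey) match {
          case Some(ui) =>
            f(ui)
          case None => throw new NotFoundException("no such app: " + appId)
        }
      }
      def securityManager: SecurityManager
    }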
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2ca6882c8d..682dec44ac 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -428,7 +428,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _ui =
       if (conf.getBoolean("spark.ui.enabled", true)) {
         Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
-          _env.securityManager,appName, startTime = startTime))
+          _env.securityManager,appName))
       } else {
         // For tests, do not enable the UI
         None
diff --git a/core/src/main/scala/org/apache/spark/annotation/Private.java b/core/src/main/scala/org/apache/spark/annotation/Private.java
deleted file mode 100644
index 9082fcf0c8..0000000000
--- a/core/src/main/scala/org/apache/spark/annotation/Private.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.annotation;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * A class that is considered private to the internals of Spark -- there is a high-likelihood
- * they will be changed in future versions of Spark.
- *
- * This should be used only when the standard Scala / Java means of protecting classes are
- * insufficient. In particular, Java has no equivalent of private[spark], so we use this annotation
- * in its place.
- *
- * NOTE: If there exists a Scaladoc comment that immediately precedes this annotation, the first
- * line of the comment must be ":: Private ::" with no trailing blank line. This is because
- * of the known issue that Scaladoc displays only either the annotation or the comment, whichever
- * comes first.
- */
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER,
-    ElementType.CONSTRUCTOR, ElementType.LOCAL_VARIABLE, ElementType.PACKAGE})
-public @interface Private {}
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 298a820196..6a5011af17 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.history
 
 import org.apache.spark.ui.SparkUI
 
-private[spark] case class ApplicationAttemptInfo(
+private[history] case class ApplicationAttemptInfo(
     attemptId: Option[String],
     startTime: Long,
     endTime: Long,
@@ -27,7 +27,7 @@
     sparkUser: String,
     completed: Boolean = false)
 
-private[spark] case class ApplicationHistoryInfo(
+private[history] case class ApplicationHistoryInfo(
     id: String,
     name: String,
     attempts: List[ApplicationAttemptInfo])
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 45c2be34c8..993763f3aa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,21 +17,23 @@
 package org.apache.spark.deploy.history
 
-import java.io.{BufferedInputStream, FileNotFoundException, IOException, InputStream}
+import java.io.{IOException, BufferedInputStream, FileNotFoundException, InputStream}
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 
 import scala.collection.mutable
+import scala.concurrent.duration.Duration
 
-import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.fs.permission.AccessControlException
+import com.google.common.util.concurrent.ThreadFactoryBuilder
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import com.google.common.util.concurrent.MoreExecutors
+import org.apache.hadoop.fs.permission.AccessControlException
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 /**
  * A class that provides application history from event logs stored in the file system.
@@ -149,7 +151,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val conf = this.conf.clone()
       val appSecManager = new SecurityManager(conf)
       SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
-        HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
+        HistoryServer.getAttemptURI(appId, attempt.attemptId))
       // Do not call ui.bind() to avoid creating a new server for each application
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 50522e69dc..754c8e9b66 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -25,7 +25,6 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.status.api.v1.{ApplicationInfo, ApplicationsListResource, JsonRootResource, UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{SignalLogger, Utils}
@@ -46,7 +45,7 @@ class HistoryServer(
     provider: ApplicationHistoryProvider,
     securityManager: SecurityManager,
     port: Int)
-  extends WebUI(securityManager, port, conf) with Logging with UIRoot {
+  extends WebUI(securityManager, port, conf) with Logging {
 
   // How many applications to retain
   private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
@@ -57,7 +56,7 @@ class HistoryServer(
         require(parts.length == 1 || parts.length == 2, s"Invalid app key $key")
         val ui = provider
           .getAppUI(parts(0), if (parts.length > 1) Some(parts(1)) else None)
-          .getOrElse(throw new NoSuchElementException(s"no app with key $key"))
+          .getOrElse(throw new NoSuchElementException())
         attachSparkUI(ui)
         ui
       }
@@ -114,10 +113,6 @@ class HistoryServer(
     }
   }
 
-  def getSparkUI(appKey: String): Option[SparkUI] = {
-    Option(appCache.get(appKey))
-  }
-
   initialize()
 
   /**
@@ -128,9 +123,6 @@ class HistoryServer(
    */
   def initialize() {
     attachPage(new HistoryPage(this))
-
-    attachHandler(JsonRootResource.getJsonServlet(this))
-
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
 
     val contextHandler = new ServletContextHandler
@@ -168,13 +160,7 @@ class HistoryServer(
    *
    * @return List of all known applications.
    */
-  def getApplicationList(): Iterable[ApplicationHistoryInfo] = {
-    provider.getListing()
-  }
-
-  def getApplicationInfoList: Iterator[ApplicationInfo] = {
-    getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
-  }
+  def getApplicationList(): Iterable[ApplicationHistoryInfo] = provider.getListing()
 
   /**
    * Returns the provider configuration to show in the listing page.
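An aside on the Private annotation deleted above: its Javadoc prescribes a ":: Private ::" first Scaladoc line so that both the annotation and the comment render. A hypothetical usage sketch, with an invented class name (InternalRegistry is not part of this diff):

    import org.apache.spark.annotation.Private

    /**
     * :: Private ::
     * Internal bookkeeping; may change in any Spark release.
     * (Invented example class, shown only to illustrate the convention.)
     */
    @Private
    class InternalRegistry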
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 1620e95bea..f59d550d4f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.ApplicationDescription
 import org.apache.spark.util.Utils
 
-private[spark] class ApplicationInfo(
+private[deploy] class ApplicationInfo(
     val startTime: Long,
     val id: String,
     val desc: ApplicationDescription,
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 53e1903a3d..0fac3cdcf5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -754,9 +754,9 @@ private[master] class Master(
 
   /**
    * Rebuild a new SparkUI from the given application's event logs.
-   * Return the UI if successful, else None
+   * Return whether this is successful.
    */
-  private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
+  private def rebuildSparkUI(app: ApplicationInfo): Boolean = {
     val appName = app.desc.name
     val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
     try {
@@ -764,7 +764,7 @@
         .getOrElse {
           // Event logging is not enabled for this application
           app.desc.appUiUrl = notFoundBasePath
-          return None
+          return false
         }
 
       val eventLogFilePrefix = EventLoggingListener.getLogPath(
@@ -787,7 +787,7 @@
       val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
       val replayBus = new ReplayListenerBus()
       val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
-        appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", app.startTime)
+        appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
       val maybeTruncated = eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS)
       try {
         replayBus.replay(logInput, eventLogFile, maybeTruncated)
@@ -798,7 +798,7 @@
       webUi.attachSparkUI(ui)
       // Application UI is successfully rebuilt, so link the Master UI to it
       app.desc.appUiUrl = ui.basePath
-      Some(ui)
+      true
     } catch {
       case fnf: FileNotFoundException =>
         // Event logging is enabled for this application, but no event logs are found
@@ -808,7 +808,7 @@
         msg += " Did you specify the correct logging directory?"
         msg = URLEncoder.encode(msg, "UTF-8")
         app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
-        None
+        false
       case e: Exception =>
         // Relay exception message to application UI page
         val title = s"Application history load error (${app.id})"
@@ -817,7 +817,7 @@
         logError(msg, e)
         msg = URLEncoder.encode(msg, "UTF-8")
         app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title"
-        None
+        false
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 06e265f99e..273f077bd8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -23,8 +23,10 @@ import scala.concurrent.Await
 import scala.xml.Node
 
 import akka.pattern.ask
+import org.json4s.JValue
+import org.json4s.JsonAST.JNothing
 
-import org.apache.spark.deploy.ExecutorState
+import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.ExecutorDesc
 import org.apache.spark.ui.{UIUtils, WebUIPage}
@@ -36,6 +38,21 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
   private val timeout = parent.timeout
 
   /** Executor details for a particular application */
+  override def renderJson(request: HttpServletRequest): JValue = {
+    val appId = request.getParameter("appId")
+    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
+    val state = Await.result(stateFuture, timeout)
+    val app = state.activeApps.find(_.id == appId).getOrElse({
+      state.completedApps.find(_.id == appId).getOrElse(null)
+    })
+    if (app == null) {
+      JNothing
+    } else {
+      JsonProtocol.writeApplicationInfo(app)
+    }
+  }
+
+  /** Executor details for a particular application */
   def render(request: HttpServletRequest): Seq[Node] = {
     val appId = request.getParameter("appId")
     val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 756927682c..1f2c3fdbfb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -35,13 +35,10 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
   private val master = parent.masterActorRef
   private val timeout = parent.timeout
 
-  def getMasterState: MasterStateResponse = {
-    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
-    Await.result(stateFuture, timeout)
-  }
-
   override def renderJson(request: HttpServletRequest): JValue = {
-    JsonProtocol.writeMasterState(getMasterState)
+    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
+    val state = Await.result(stateFuture, timeout)
+    JsonProtocol.writeMasterState(state)
   }
 
   def handleAppKillRequest(request: HttpServletRequest): Unit = {
@@ -71,7 +68,8 @@
 
   /** Index view listing applications and executors */
   def render(request: HttpServletRequest): Seq[Node] = {
-    val state = getMasterState
+    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
+    val state = Await.result(stateFuture, timeout)
 
     val workerHeaders = Seq("Worker Id", "Address", "State", "Cores", "Memory")
     val workers = state.workers.sortBy(_.id)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index eb26e9f99c..dea0a65eee 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -19,7 +19,6 @@ package org.apache.spark.deploy.master.ui
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.status.api.v1.{ApplicationsListResource, ApplicationInfo, JsonRootResource, UIRoot}
 import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.RpcUtils
@@ -29,15 +28,12 @@ import org.apache.spark.util.RpcUtils
  */
 private[master]
 class MasterWebUI(val master: Master, requestedPort: Int)
-  extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging
-  with UIRoot {
+  extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging {
 
   val masterActorRef = master.self
   val timeout = RpcUtils.askTimeout(master.conf)
   val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
 
-  val masterPage = new MasterPage(this)
-
   initialize()
 
   /** Initialize all components of the server. */
@@ -47,7 +43,6 @@ class MasterWebUI(val master: Master, requestedPort: Int)
     attachPage(new HistoryNotFoundPage(this))
     attachPage(masterPage)
     attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
-    attachHandler(JsonRootResource.getJsonServlet(this))
     attachHandler(createRedirectHandler(
       "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
     attachHandler(createRedirectHandler(
@@ -65,23 +60,6 @@
     assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
     ui.getHandlers.foreach(detachHandler)
   }
-
-  def getApplicationInfoList: Iterator[ApplicationInfo] = {
-    val state = masterPage.getMasterState
-    val activeApps = state.activeApps.sortBy(_.startTime).reverse
-    val completedApps = state.completedApps.sortBy(_.endTime).reverse
-    activeApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, false) } ++
-      completedApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, true) }
-  }
-
-  def getSparkUI(appId: String): Option[SparkUI] = {
-    val state = masterPage.getMasterState
-    val activeApps = state.activeApps.sortBy(_.startTime).reverse
-    val completedApps = state.completedApps.sortBy(_.endTime).reverse
-    (activeApps ++ completedApps).find { _.id == appId }.flatMap {
-      master.rebuildSparkUI
-    }
-  }
 }
 
 private[master] object MasterWebUI {
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
deleted file mode 100644
index 5783df5d82..0000000000
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import java.util.{Arrays, Date, List => JList} -import javax.ws.rs._ -import javax.ws.rs.core.MediaType - -import org.apache.spark.JobExecutionStatus -import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.jobs.JobProgressListener -import org.apache.spark.ui.jobs.UIData.JobUIData - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class AllJobsResource(ui: SparkUI) { - - @GET - def jobsList(@QueryParam("status") statuses: JList[JobExecutionStatus]): Seq[JobData] = { - val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] = - AllJobsResource.getStatusToJobs(ui) - val adjStatuses: JList[JobExecutionStatus] = { - if (statuses.isEmpty) { - Arrays.asList(JobExecutionStatus.values(): _*) - } else { - statuses - } - } - val jobInfos = for { - (status, jobs) <- statusToJobs - job <- jobs if adjStatuses.contains(status) - } yield { - AllJobsResource.convertJobData(job, ui.jobProgressListener, false) - } - jobInfos.sortBy{- _.jobId} - } - -} - -private[v1] object AllJobsResource { - - def getStatusToJobs(ui: SparkUI): Seq[(JobExecutionStatus, Seq[JobUIData])] = { - val statusToJobs = ui.jobProgressListener.synchronized { - Seq( - JobExecutionStatus.RUNNING -> ui.jobProgressListener.activeJobs.values.toSeq, - JobExecutionStatus.SUCCEEDED -> ui.jobProgressListener.completedJobs.toSeq, - JobExecutionStatus.FAILED -> ui.jobProgressListener.failedJobs.reverse.toSeq - ) - } - statusToJobs - } - - def convertJobData( - job: JobUIData, - listener: JobProgressListener, - includeStageDetails: Boolean): JobData = { - listener.synchronized { - val lastStageInfo = listener.stageIdToInfo.get(job.stageIds.max) - val lastStageData = lastStageInfo.flatMap { s => - listener.stageIdToData.get((s.stageId, s.attemptId)) - } - val lastStageName = lastStageInfo.map { _.name }.getOrElse("(Unknown Stage Name)") - val lastStageDescription = lastStageData.flatMap { _.description } - new JobData( - jobId = job.jobId, - name = lastStageName, - description = lastStageDescription, - submissionTime = job.submissionTime.map{new Date(_)}, - completionTime = job.completionTime.map{new Date(_)}, - stageIds = job.stageIds, - jobGroup = job.jobGroup, - status = job.status, - numTasks = job.numTasks, - numActiveTasks = job.numActiveTasks, - numCompletedTasks = job.numCompletedTasks, - numSkippedTasks = job.numCompletedTasks, - numFailedTasks = job.numFailedTasks, - numActiveStages = job.numActiveStages, - numCompletedStages = job.completedStageIndices.size, - numSkippedStages = job.numSkippedStages, - numFailedStages = job.numFailedStages - ) - } - } -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala deleted file mode 100644 index 645ede26a0..0000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to 
the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import javax.ws.rs.{GET, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.storage.{RDDInfo, StorageStatus, StorageUtils} -import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.storage.StorageListener - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class AllRDDResource(ui: SparkUI) { - - @GET - def rddList(): Seq[RDDStorageInfo] = { - val storageStatusList = ui.storageListener.storageStatusList - val rddInfos = ui.storageListener.rddInfoList - rddInfos.map{rddInfo => - AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList, - includeDetails = false) - } - } - -} - -private[spark] object AllRDDResource { - - def getRDDStorageInfo( - rddId: Int, - listener: StorageListener, - includeDetails: Boolean): Option[RDDStorageInfo] = { - val storageStatusList = listener.storageStatusList - listener.rddInfoList.find { _.id == rddId }.map { rddInfo => - getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails) - } - } - - def getRDDStorageInfo( - rddId: Int, - rddInfo: RDDInfo, - storageStatusList: Seq[StorageStatus], - includeDetails: Boolean): RDDStorageInfo = { - val workers = storageStatusList.map { (rddId, _) } - val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList) - val blocks = storageStatusList - .flatMap { _.rddBlocksById(rddId) } - .sortWith { _._1.name < _._1.name } - .map { case (blockId, status) => - (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown"))) - } - - val dataDistribution = if (includeDetails) { - Some(storageStatusList.map { status => - new RDDDataDistribution( - address = status.blockManagerId.hostPort, - memoryUsed = status.memUsedByRdd(rddId), - memoryRemaining = status.memRemaining, - diskUsed = status.diskUsedByRdd(rddId) - ) } ) - } else { - None - } - val partitions = if (includeDetails) { - Some(blocks.map { case (id, block, locations) => - new RDDPartitionInfo( - blockName = id.name, - storageLevel = block.storageLevel.description, - memoryUsed = block.memSize, - diskUsed = block.diskSize, - executors = locations - ) - } ) - } else { - None - } - - new RDDStorageInfo( - id = rddId, - name = rddInfo.name, - numPartitions = rddInfo.numPartitions, - numCachedPartitions = rddInfo.numCachedPartitions, - storageLevel = rddInfo.storageLevel.description, - memoryUsed = rddInfo.memSize, - diskUsed = rddInfo.diskSize, - dataDistribution = dataDistribution, - partitions = partitions - ) - } -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala deleted file mode 100644 index 50608588f0..0000000000 --- 
a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ /dev/null @@ -1,309 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import java.util.{Arrays, Date, List => JList} -import javax.ws.rs.{GET, PathParam, Produces, QueryParam} -import javax.ws.rs.core.MediaType - -import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, OutputMetrics => InternalOutputMetrics, ShuffleReadMetrics => InternalShuffleReadMetrics, ShuffleWriteMetrics => InternalShuffleWriteMetrics, TaskMetrics => InternalTaskMetrics} -import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo} -import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData} -import org.apache.spark.util.Distribution - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class AllStagesResource(ui: SparkUI) { - - @GET - def stageList(@QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = { - val listener = ui.jobProgressListener - val stageAndStatus = AllStagesResource.stagesAndStatus(ui) - val adjStatuses = { - if (statuses.isEmpty()) { - Arrays.asList(StageStatus.values(): _*) - } else { - statuses - } - } - for { - (status, stageList) <- stageAndStatus - stageInfo: StageInfo <- stageList if adjStatuses.contains(status) - stageUiData: StageUIData <- listener.synchronized { - listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)) - } - } yield { - AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, includeDetails = false) - } - } -} - -private[v1] object AllStagesResource { - def stageUiToStageData( - status: StageStatus, - stageInfo: StageInfo, - stageUiData: StageUIData, - includeDetails: Boolean): StageData = { - - val taskData = if (includeDetails) { - Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) } ) - } else { - None - } - val executorSummary = if (includeDetails) { - Some(stageUiData.executorSummary.map { case (k, summary) => - k -> new ExecutorStageSummary( - taskTime = summary.taskTime, - failedTasks = summary.failedTasks, - succeededTasks = summary.succeededTasks, - inputBytes = summary.inputBytes, - outputBytes = summary.outputBytes, - shuffleRead = summary.shuffleRead, - shuffleWrite = summary.shuffleWrite, - memoryBytesSpilled = summary.memoryBytesSpilled, - diskBytesSpilled = summary.diskBytesSpilled - ) - }) - } else { - None - } - - val accumulableInfo = stageUiData.accumulables.values.map { convertAccumulableInfo }.toSeq - - new StageData( - status = status, - stageId = stageInfo.stageId, - attemptId = stageInfo.attemptId, - numActiveTasks = stageUiData.numActiveTasks, - numCompleteTasks = stageUiData.numCompleteTasks, - numFailedTasks = 
stageUiData.numFailedTasks, - executorRunTime = stageUiData.executorRunTime, - inputBytes = stageUiData.inputBytes, - inputRecords = stageUiData.inputRecords, - outputBytes = stageUiData.outputBytes, - outputRecords = stageUiData.outputRecords, - shuffleReadBytes = stageUiData.shuffleReadTotalBytes, - shuffleReadRecords = stageUiData.shuffleReadRecords, - shuffleWriteBytes = stageUiData.shuffleWriteBytes, - shuffleWriteRecords = stageUiData.shuffleWriteRecords, - memoryBytesSpilled = stageUiData.memoryBytesSpilled, - diskBytesSpilled = stageUiData.diskBytesSpilled, - schedulingPool = stageUiData.schedulingPool, - name = stageInfo.name, - details = stageInfo.details, - accumulatorUpdates = accumulableInfo, - tasks = taskData, - executorSummary = executorSummary - ) - } - - def stagesAndStatus(ui: SparkUI): Seq[(StageStatus, Seq[StageInfo])] = { - val listener = ui.jobProgressListener - listener.synchronized { - Seq( - StageStatus.ACTIVE -> listener.activeStages.values.toSeq, - StageStatus.COMPLETE -> listener.completedStages.reverse.toSeq, - StageStatus.FAILED -> listener.failedStages.reverse.toSeq, - StageStatus.PENDING -> listener.pendingStages.values.toSeq - ) - } - } - - def convertTaskData(uiData: TaskUIData): TaskData = { - new TaskData( - taskId = uiData.taskInfo.taskId, - index = uiData.taskInfo.index, - attempt = uiData.taskInfo.attempt, - launchTime = new Date(uiData.taskInfo.launchTime), - executorId = uiData.taskInfo.executorId, - host = uiData.taskInfo.host, - taskLocality = uiData.taskInfo.taskLocality.toString(), - speculative = uiData.taskInfo.speculative, - accumulatorUpdates = uiData.taskInfo.accumulables.map { convertAccumulableInfo }, - errorMessage = uiData.errorMessage, - taskMetrics = uiData.taskMetrics.map { convertUiTaskMetrics } - ) - } - - def taskMetricDistributions( - allTaskData: Iterable[TaskUIData], - quantiles: Array[Double]): TaskMetricDistributions = { - - val rawMetrics = allTaskData.flatMap{_.taskMetrics}.toSeq - - def metricQuantiles(f: InternalTaskMetrics => Double): IndexedSeq[Double] = - Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles) - - // We need to do a lot of similar munging to nested metrics here. For each one, - // we want (a) extract the values for nested metrics (b) make a distribution for each metric - // (c) shove the distribution into the right field in our return type and (d) only return - // a result if the option is defined for any of the tasks. MetricHelper is a little util - // to make it a little easier to deal w/ all of the nested options. Mostly it lets us just - // implement one "build" method, which just builds the quantiles for each field. 
- - val inputMetrics: Option[InputMetricDistributions] = - new MetricHelper[InternalInputMetrics, InputMetricDistributions](rawMetrics, quantiles) { - def getSubmetrics(raw: InternalTaskMetrics): Option[InternalInputMetrics] = { - raw.inputMetrics - } - - def build: InputMetricDistributions = new InputMetricDistributions( - bytesRead = submetricQuantiles(_.bytesRead), - recordsRead = submetricQuantiles(_.recordsRead) - ) - }.metricOption - - val outputMetrics: Option[OutputMetricDistributions] = - new MetricHelper[InternalOutputMetrics, OutputMetricDistributions](rawMetrics, quantiles) { - def getSubmetrics(raw:InternalTaskMetrics): Option[InternalOutputMetrics] = { - raw.outputMetrics - } - def build: OutputMetricDistributions = new OutputMetricDistributions( - bytesWritten = submetricQuantiles(_.bytesWritten), - recordsWritten = submetricQuantiles(_.recordsWritten) - ) - }.metricOption - - val shuffleReadMetrics: Option[ShuffleReadMetricDistributions] = - new MetricHelper[InternalShuffleReadMetrics, ShuffleReadMetricDistributions](rawMetrics, - quantiles) { - def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleReadMetrics] = { - raw.shuffleReadMetrics - } - def build: ShuffleReadMetricDistributions = new ShuffleReadMetricDistributions( - readBytes = submetricQuantiles(_.totalBytesRead), - readRecords = submetricQuantiles(_.recordsRead), - remoteBytesRead = submetricQuantiles(_.remoteBytesRead), - remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched), - localBlocksFetched = submetricQuantiles(_.localBlocksFetched), - totalBlocksFetched = submetricQuantiles(_.totalBlocksFetched), - fetchWaitTime = submetricQuantiles(_.fetchWaitTime) - ) - }.metricOption - - val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions] = - new MetricHelper[InternalShuffleWriteMetrics, ShuffleWriteMetricDistributions](rawMetrics, - quantiles) { - def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleWriteMetrics] = { - raw.shuffleWriteMetrics - } - def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions( - writeBytes = submetricQuantiles(_.shuffleBytesWritten), - writeRecords = submetricQuantiles(_.shuffleRecordsWritten), - writeTime = submetricQuantiles(_.shuffleWriteTime) - ) - }.metricOption - - new TaskMetricDistributions( - quantiles = quantiles, - executorDeserializeTime = metricQuantiles(_.executorDeserializeTime), - executorRunTime = metricQuantiles(_.executorRunTime), - resultSize = metricQuantiles(_.resultSize), - jvmGcTime = metricQuantiles(_.jvmGCTime), - resultSerializationTime = metricQuantiles(_.resultSerializationTime), - memoryBytesSpilled = metricQuantiles(_.memoryBytesSpilled), - diskBytesSpilled = metricQuantiles(_.diskBytesSpilled), - inputMetrics = inputMetrics, - outputMetrics = outputMetrics, - shuffleReadMetrics = shuffleReadMetrics, - shuffleWriteMetrics = shuffleWriteMetrics - ) - } - - def convertAccumulableInfo(acc: InternalAccumulableInfo): AccumulableInfo = { - new AccumulableInfo(acc.id, acc.name, acc.update, acc.value) - } - - def convertUiTaskMetrics(internal: InternalTaskMetrics): TaskMetrics = { - new TaskMetrics( - executorDeserializeTime = internal.executorDeserializeTime, - executorRunTime = internal.executorRunTime, - resultSize = internal.resultSize, - jvmGcTime = internal.jvmGCTime, - resultSerializationTime = internal.resultSerializationTime, - memoryBytesSpilled = internal.memoryBytesSpilled, - diskBytesSpilled = internal.diskBytesSpilled, - inputMetrics = internal.inputMetrics.map { 
convertInputMetrics }, - outputMetrics = Option(internal.outputMetrics).flatten.map { convertOutputMetrics }, - shuffleReadMetrics = internal.shuffleReadMetrics.map { convertShuffleReadMetrics }, - shuffleWriteMetrics = internal.shuffleWriteMetrics.map { convertShuffleWriteMetrics } - ) - } - - def convertInputMetrics(internal: InternalInputMetrics): InputMetrics = { - new InputMetrics( - bytesRead = internal.bytesRead, - recordsRead = internal.recordsRead - ) - } - - def convertOutputMetrics(internal: InternalOutputMetrics): OutputMetrics = { - new OutputMetrics( - bytesWritten = internal.bytesWritten, - recordsWritten = internal.recordsWritten - ) - } - - def convertShuffleReadMetrics(internal: InternalShuffleReadMetrics): ShuffleReadMetrics = { - new ShuffleReadMetrics( - remoteBlocksFetched = internal.remoteBlocksFetched, - localBlocksFetched = internal.localBlocksFetched, - fetchWaitTime = internal.fetchWaitTime, - remoteBytesRead = internal.remoteBytesRead, - totalBlocksFetched = internal.totalBlocksFetched, - recordsRead = internal.recordsRead - ) - } - - def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = { - new ShuffleWriteMetrics( - bytesWritten = internal.shuffleBytesWritten, - writeTime = internal.shuffleWriteTime, - recordsWritten = internal.shuffleRecordsWritten - ) - } -} - -/** - * Helper for getting distributions from nested metric types. Many of the metrics we want are - * contained in options inside TaskMetrics (eg., ShuffleWriteMetrics). This makes it easy to handle - * the options (returning None if the metrics are all empty), and extract the quantiles for each - * metric. After creating an instance, call metricOption to get the result type. - */ -private[v1] abstract class MetricHelper[I,O]( - rawMetrics: Seq[InternalTaskMetrics], - quantiles: Array[Double]) { - - def getSubmetrics(raw: InternalTaskMetrics): Option[I] - - def build: O - - val data: Seq[I] = rawMetrics.flatMap(getSubmetrics) - - /** applies the given function to all input metrics, and returns the quantiles */ - def submetricQuantiles(f: I => Double): IndexedSeq[Double] = { - Distribution(data.map { d => f(d) }).get.getQuantiles(quantiles) - } - - def metricOption: Option[O] = { - if (data.isEmpty) { - None - } else { - Some(build) - } - } -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala deleted file mode 100644 index 17b521f3e1..0000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.status.api.v1 - -import java.util.{Arrays, Date, List => JList} -import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam} -import javax.ws.rs.core.MediaType - -import org.apache.spark.deploy.history.ApplicationHistoryInfo -import org.apache.spark.deploy.master.{ApplicationInfo => InternalApplicationInfo} - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class ApplicationListResource(uiRoot: UIRoot) { - - @GET - def appList( - @QueryParam("status") status: JList[ApplicationStatus], - @DefaultValue("2010-01-01") @QueryParam("minDate") minDate: SimpleDateParam, - @DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: SimpleDateParam) - : Iterator[ApplicationInfo] = { - val allApps = uiRoot.getApplicationInfoList - val adjStatus = { - if (status.isEmpty) { - Arrays.asList(ApplicationStatus.values(): _*) - } else { - status - } - } - val includeCompleted = adjStatus.contains(ApplicationStatus.COMPLETED) - val includeRunning = adjStatus.contains(ApplicationStatus.RUNNING) - allApps.filter { app => - val anyRunning = app.attempts.exists(!_.completed) - // if any attempt is still running, we consider the app to also still be running - val statusOk = (!anyRunning && includeCompleted) || - (anyRunning && includeRunning) - // keep the app if *any* attempts fall in the right time window - val dateOk = app.attempts.exists { attempt => - attempt.startTime.getTime >= minDate.timestamp && - attempt.startTime.getTime <= maxDate.timestamp - } - statusOk && dateOk - } - } -} - -private[spark] object ApplicationsListResource { - def appHistoryInfoToPublicAppInfo(app: ApplicationHistoryInfo): ApplicationInfo = { - new ApplicationInfo( - id = app.id, - name = app.name, - attempts = app.attempts.map { internalAttemptInfo => - new ApplicationAttemptInfo( - attemptId = internalAttemptInfo.attemptId, - startTime = new Date(internalAttemptInfo.startTime), - endTime = new Date(internalAttemptInfo.endTime), - sparkUser = internalAttemptInfo.sparkUser, - completed = internalAttemptInfo.completed - ) - } - ) - } - - def convertApplicationInfo( - internal: InternalApplicationInfo, - completed: Boolean): ApplicationInfo = { - // standalone application info always has just one attempt - new ApplicationInfo( - id = internal.id, - name = internal.desc.name, - attempts = Seq(new ApplicationAttemptInfo( - attemptId = None, - startTime = new Date(internal.startTime), - endTime = new Date(internal.endTime), - sparkUser = internal.desc.user, - completed = completed - )) - ) - } - -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala deleted file mode 100644 index 8ad4656b4d..0000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. 
You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.spark.status.api.v1 - -import javax.ws.rs.{GET, PathParam, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.exec.ExecutorsPage - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class ExecutorListResource(ui: SparkUI) { - - @GET - def executorList(): Seq[ExecutorSummary] = { - val listener = ui.executorsListener - val storageStatusList = listener.storageStatusList - (0 until storageStatusList.size).map { statusId => - ExecutorsPage.getExecInfo(listener, statusId) - } - } -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala deleted file mode 100644 index 202a5191ad..0000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import java.io.OutputStream -import java.lang.annotation.Annotation -import java.lang.reflect.Type -import java.text.SimpleDateFormat -import java.util.{Calendar, SimpleTimeZone} -import javax.ws.rs.Produces -import javax.ws.rs.core.{MediaType, MultivaluedMap} -import javax.ws.rs.ext.{MessageBodyWriter, Provider} - -import com.fasterxml.jackson.annotation.JsonInclude -import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature} - -/** - * This class converts the POJO metric responses into json, using jackson. - * - * This doesn't follow the standard jersey-jackson plugin options, because we want to stick - * with an old version of jersey (since we have it from yarn anyway) and don't want to pull in lots - * of dependencies from a new plugin. - * - * Note that jersey automatically discovers this class based on its package and its annotations. 
- */ -@Provider -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{ - - val mapper = new ObjectMapper() { - override def writeValueAsString(t: Any): String = { - super.writeValueAsString(t) - } - } - mapper.registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule) - mapper.enable(SerializationFeature.INDENT_OUTPUT) - mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) - mapper.setDateFormat(JacksonMessageWriter.makeISODateFormat) - - override def isWriteable( - aClass: Class[_], - `type`: Type, - annotations: Array[Annotation], - mediaType: MediaType): Boolean = { - true - } - - override def writeTo( - t: Object, - aClass: Class[_], - `type`: Type, - annotations: Array[Annotation], - mediaType: MediaType, - multivaluedMap: MultivaluedMap[String, AnyRef], - outputStream: OutputStream): Unit = { - t match { - case ErrorWrapper(err) => outputStream.write(err.getBytes("utf-8")) - case _ => mapper.writeValue(outputStream, t) - } - } - - override def getSize( - t: Object, - aClass: Class[_], - `type`: Type, - annotations: Array[Annotation], - mediaType: MediaType): Long = { - -1L - } -} - -private[spark] object JacksonMessageWriter { - def makeISODateFormat: SimpleDateFormat = { - val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'") - val cal = Calendar.getInstance(new SimpleTimeZone(0, "GMT")) - iso8601.setCalendar(cal) - iso8601 - } -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala deleted file mode 100644 index c3ec45f546..0000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import javax.servlet.ServletContext -import javax.ws.rs._ -import javax.ws.rs.core.{Context, Response} - -import com.sun.jersey.api.core.ResourceConfig -import com.sun.jersey.spi.container.servlet.ServletContainer -import org.eclipse.jetty.server.handler.ContextHandler -import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} - -import org.apache.spark.SecurityManager -import org.apache.spark.ui.SparkUI - -/** - * Main entry point for serving spark application metrics as json, using JAX-RS. - * - * Each resource should have endpoints that return **public** classes defined in api.scala. Mima - * binary compatibility checks ensure that we don't inadvertently make changes that break the api. - * The returned objects are automatically converted to json by jackson with JacksonMessageWriter. 
- * In addition, there are a number of tests in HistoryServerSuite that compare the json to "golden - * files". Any changes and additions should be reflected there as well -- see the notes in - * HistoryServerSuite. - */ -@Path("/v1") -private[v1] class JsonRootResource extends UIRootFromServletContext { - - @Path("applications") - def getApplicationList(): ApplicationListResource = { - new ApplicationListResource(uiRoot) - } - - @Path("applications/{appId}") - def getApplication(): OneApplicationResource = { - new OneApplicationResource(uiRoot) - } - - @Path("applications/{appId}/{attemptId}/jobs") - def getJobs( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): AllJobsResource = { - uiRoot.withSparkUI(appId, Some(attemptId)) { ui => - new AllJobsResource(ui) - } - } - - @Path("applications/{appId}/jobs") - def getJobs(@PathParam("appId") appId: String): AllJobsResource = { - uiRoot.withSparkUI(appId, None) { ui => - new AllJobsResource(ui) - } - } - - @Path("applications/{appId}/jobs/{jobId: \\d+}") - def getJob(@PathParam("appId") appId: String): OneJobResource = { - uiRoot.withSparkUI(appId, None) { ui => - new OneJobResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/jobs/{jobId: \\d+}") - def getJob( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): OneJobResource = { - uiRoot.withSparkUI(appId, Some(attemptId)) { ui => - new OneJobResource(ui) - } - } - - @Path("applications/{appId}/executors") - def getExecutors(@PathParam("appId") appId: String): ExecutorListResource = { - uiRoot.withSparkUI(appId, None) { ui => - new ExecutorListResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/executors") - def getExecutors( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): ExecutorListResource = { - uiRoot.withSparkUI(appId, Some(attemptId)) { ui => - new ExecutorListResource(ui) - } - } - - - @Path("applications/{appId}/stages") - def getStages(@PathParam("appId") appId: String): AllStagesResource= { - uiRoot.withSparkUI(appId, None) { ui => - new AllStagesResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/stages") - def getStages( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): AllStagesResource= { - uiRoot.withSparkUI(appId, Some(attemptId)) { ui => - new AllStagesResource(ui) - } - } - - @Path("applications/{appId}/stages/{stageId: \\d+}") - def getStage(@PathParam("appId") appId: String): OneStageResource= { - uiRoot.withSparkUI(appId, None) { ui => - new OneStageResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/stages/{stageId: \\d+}") - def getStage( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): OneStageResource = { - uiRoot.withSparkUI(appId, Some(attemptId)) { ui => - new OneStageResource(ui) - } - } - - @Path("applications/{appId}/storage/rdd") - def getRdds(@PathParam("appId") appId: String): AllRDDResource = { - uiRoot.withSparkUI(appId, None) { ui => - new AllRDDResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/storage/rdd") - def getRdds( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): AllRDDResource = { - uiRoot.withSparkUI(appId, Some(attemptId)) { ui => - new AllRDDResource(ui) - } - } - - @Path("applications/{appId}/storage/rdd/{rddId: \\d+}") - def getRdd(@PathParam("appId") appId: String): OneRDDResource = { - uiRoot.withSparkUI(appId, None) { ui => - new OneRDDResource(ui) - } - } - - 
@Path("applications/{appId}/{attemptId}/storage/rdd/{rddId: \\d+}") - def getRdd( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): OneRDDResource = { - uiRoot.withSparkUI(appId, Some(attemptId)) { ui => - new OneRDDResource(ui) - } - } - -} - -private[spark] object JsonRootResource { - - def getJsonServlet(uiRoot: UIRoot): ServletContextHandler = { - val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS) - jerseyContext.setContextPath("/json") - val holder:ServletHolder = new ServletHolder(classOf[ServletContainer]) - holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass", - "com.sun.jersey.api.core.PackagesResourceConfig") - holder.setInitParameter("com.sun.jersey.config.property.packages", - "org.apache.spark.status.api.v1") - holder.setInitParameter(ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS, - classOf[SecurityFilter].getCanonicalName) - UIRootFromServletContext.setUiRoot(jerseyContext, uiRoot) - jerseyContext.addServlet(holder, "/*") - jerseyContext - } -} - -/** - * This trait is shared by the all the root containers for application UI information -- - * the HistoryServer, the Master UI, and the application UI. This provides the common - * interface needed for them all to expose application info as json. - */ -private[spark] trait UIRoot { - def getSparkUI(appKey: String): Option[SparkUI] - def getApplicationInfoList: Iterator[ApplicationInfo] - - /** - * Get the spark UI with the given appID, and apply a function - * to it. If there is no such app, throw an appropriate exception - */ - def withSparkUI[T](appId: String, attemptId: Option[String])(f: SparkUI => T): T = { - val appKey = attemptId.map(appId + "/" + _).getOrElse(appId) - getSparkUI(appKey) match { - case Some(ui) => - f(ui) - case None => throw new NotFoundException("no such app: " + appId) - } - } - def securityManager: SecurityManager -} - -private[v1] object UIRootFromServletContext { - - private val attribute = getClass.getCanonicalName - - def setUiRoot(contextHandler: ContextHandler, uiRoot: UIRoot): Unit = { - contextHandler.setAttribute(attribute, uiRoot) - } - - def getUiRoot(context: ServletContext): UIRoot = { - context.getAttribute(attribute).asInstanceOf[UIRoot] - } -} - -private[v1] trait UIRootFromServletContext { - @Context - var servletContext: ServletContext = _ - - def uiRoot: UIRoot = UIRootFromServletContext.getUiRoot(servletContext) -} - -private[v1] class NotFoundException(msg: String) extends WebApplicationException( - new NoSuchElementException(msg), - Response - .status(Response.Status.NOT_FOUND) - .entity(ErrorWrapper(msg)) - .build() -) - -private[v1] class BadParameterException(msg: String) extends WebApplicationException( - new IllegalArgumentException(msg), - Response - .status(Response.Status.BAD_REQUEST) - .entity(ErrorWrapper(msg)) - .build() -) { - def this(param: String, exp: String, actual: String) = { - this(raw"""Bad value for parameter "$param". Expected a $exp, got "$actual"""") - } -} - -/** - * Signal to JacksonMessageWriter to not convert the message into json (which would result in an - * extra set of quotes). 
- */ -private[v1] case class ErrorWrapper(s: String) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala deleted file mode 100644 index b5ef72649e..0000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import javax.ws.rs.core.MediaType -import javax.ws.rs.{Produces, PathParam, GET} - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneApplicationResource(uiRoot: UIRoot) { - - @GET - def getApp(@PathParam("appId") appId: String): ApplicationInfo = { - val apps = uiRoot.getApplicationInfoList.find { _.id == appId } - apps.getOrElse(throw new NotFoundException("unknown app: " + appId)) - } - -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala deleted file mode 100644 index 6d8a60d480..0000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.status.api.v1 - -import javax.ws.rs.{PathParam, GET, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.JobExecutionStatus -import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.jobs.UIData.JobUIData - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneJobResource(ui: SparkUI) { - - @GET - def oneJob(@PathParam("jobId") jobId: Int): JobData = { - val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] = - AllJobsResource.getStatusToJobs(ui) - val jobOpt = statusToJobs.map {_._2} .flatten.find { jobInfo => jobInfo.jobId == jobId} - jobOpt.map { job => - AllJobsResource.convertJobData(job, ui.jobProgressListener, false) - }.getOrElse { - throw new NotFoundException("unknown job: " + jobId) - } - } - -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala deleted file mode 100644 index 07b224fac4..0000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import javax.ws.rs.{PathParam, GET, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.ui.SparkUI - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneRDDResource(ui: SparkUI) { - - @GET - def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = { - AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse( - throw new NotFoundException(s"no rdd found w/ id $rddId") - ) - } - -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala deleted file mode 100644 index fd24aea63a..0000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.status.api.v1 - -import javax.ws.rs._ -import javax.ws.rs.core.MediaType - -import org.apache.spark.SparkException -import org.apache.spark.scheduler.StageInfo -import org.apache.spark.status.api.v1.StageStatus._ -import org.apache.spark.status.api.v1.TaskSorting._ -import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.jobs.JobProgressListener -import org.apache.spark.ui.jobs.UIData.StageUIData - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneStageResource(ui: SparkUI) { - - @GET - @Path("") - def stageData(@PathParam("stageId") stageId: Int): Seq[StageData] = { - withStage(stageId){ stageAttempts => - stageAttempts.map { stage => - AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui, - includeDetails = true) - } - } - } - - @GET - @Path("/{stageAttemptId: \\d+}") - def oneAttemptData( - @PathParam("stageId") stageId: Int, - @PathParam("stageAttemptId") stageAttemptId: Int): StageData = { - withStageAttempt(stageId, stageAttemptId) { stage => - AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui, - includeDetails = true) - } - } - - @GET - @Path("/{stageAttemptId: \\d+}/taskSummary") - def taskSummary( - @PathParam("stageId") stageId: Int, - @PathParam("stageAttemptId") stageAttemptId: Int, - @DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String) - : TaskMetricDistributions = { - withStageAttempt(stageId, stageAttemptId) { stage => - val quantiles = quantileString.split(",").map { s => - try { - s.toDouble - } catch { - case nfe: NumberFormatException => - throw new BadParameterException("quantiles", "double", s) - } - } - AllStagesResource.taskMetricDistributions(stage.ui.taskData.values, quantiles) - } - } - - @GET - @Path("/{stageAttemptId: \\d+}/taskList") - def taskList( - @PathParam("stageId") stageId: Int, - @PathParam("stageAttemptId") stageAttemptId: Int, - @DefaultValue("0") @QueryParam("offset") offset: Int, - @DefaultValue("20") @QueryParam("length") length: Int, - @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = { - withStageAttempt(stageId, stageAttemptId) { stage => - val tasks = stage.ui.taskData.values.map{AllStagesResource.convertTaskData}.toIndexedSeq - .sorted(OneStageResource.ordering(sortBy)) - tasks.slice(offset, offset + length) - } - } - - private case class StageStatusInfoUi(status: StageStatus, info: StageInfo, ui: StageUIData) - - private def withStage[T](stageId: Int)(f: Seq[StageStatusInfoUi] => T): T = { - val stageAttempts = findStageStatusUIData(ui.jobProgressListener, stageId) - if (stageAttempts.isEmpty) { - throw new NotFoundException("unknown stage: " + stageId) - } else { - f(stageAttempts) - } - } - - private def findStageStatusUIData( - listener: JobProgressListener, - stageId: Int): Seq[StageStatusInfoUi] = { - listener.synchronized { - def getStatusInfoUi(status: StageStatus, infos: Seq[StageInfo]): Seq[StageStatusInfoUi] = { - infos.filter { _.stageId == stageId }.map { info => - val ui = listener.stageIdToData.getOrElse((info.stageId, info.attemptId), - // this is an internal error -- we should always have uiData - throw new SparkException( - s"no stage ui data found for stage: ${info.stageId}:${info.attemptId}") - ) - StageStatusInfoUi(status, info, ui) - } - } - getStatusInfoUi(ACTIVE, listener.activeStages.values.toSeq) ++ - getStatusInfoUi(COMPLETE, listener.completedStages) ++ - getStatusInfoUi(FAILED, listener.failedStages) ++ - getStatusInfoUi(PENDING, 
listener.pendingStages.values.toSeq) - } - } - - private def withStageAttempt[T]( - stageId: Int, - stageAttemptId: Int) - (f: StageStatusInfoUi => T): T = { - withStage(stageId) { attempts => - val oneAttempt = attempts.find { stage => stage.info.attemptId == stageAttemptId } - oneAttempt match { - case Some(stage) => - f(stage) - case None => - val stageAttempts = attempts.map { _.info.attemptId } - throw new NotFoundException(s"unknown attempt for stage $stageId. " + - s"Found attempts: ${stageAttempts.mkString("[", ",", "]")}") - } - } - } -} - -object OneStageResource { - def ordering(taskSorting: TaskSorting): Ordering[TaskData] = { - val extractor: (TaskData => Long) = td => - taskSorting match { - case ID => td.taskId - case INCREASING_RUNTIME => td.taskMetrics.map{_.executorRunTime}.getOrElse(-1L) - case DECREASING_RUNTIME => -td.taskMetrics.map{_.executorRunTime}.getOrElse(-1L) - } - Ordering.by(extractor) - } -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/SecurityFilter.scala b/core/src/main/scala/org/apache/spark/status/api/v1/SecurityFilter.scala deleted file mode 100644 index 95fbd96ade..0000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/SecurityFilter.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import javax.ws.rs.WebApplicationException -import javax.ws.rs.core.Response - -import com.sun.jersey.spi.container.{ContainerRequest, ContainerRequestFilter} - -private[v1] class SecurityFilter extends ContainerRequestFilter with UIRootFromServletContext { - def filter(req: ContainerRequest): ContainerRequest = { - val user = Option(req.getUserPrincipal).map { _.getName }.orNull - if (uiRoot.securityManager.checkUIViewPermissions(user)) { - req - } else { - throw new WebApplicationException( - Response - .status(Response.Status.FORBIDDEN) - .entity(raw"""user "$user" is not authorized""") - .build() - ) - } - } -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/SimpleDateParam.scala b/core/src/main/scala/org/apache/spark/status/api/v1/SimpleDateParam.scala deleted file mode 100644 index cee29786c3..0000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/SimpleDateParam.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import java.text.SimpleDateFormat -import java.util.TimeZone -import javax.ws.rs.WebApplicationException -import javax.ws.rs.core.Response -import javax.ws.rs.core.Response.Status - -import scala.util.Try - -private[v1] class SimpleDateParam(val originalValue: String) { - val timestamp: Long = { - SimpleDateParam.formats.collectFirst { - case fmt if Try(fmt.parse(originalValue)).isSuccess => - fmt.parse(originalValue).getTime() - }.getOrElse( - throw new WebApplicationException( - Response - .status(Status.BAD_REQUEST) - .entity("Couldn't parse date: " + originalValue) - .build() - ) - ) - } -} - -private[v1] object SimpleDateParam { - - val formats: Seq[SimpleDateFormat] = { - - val gmtDay = new SimpleDateFormat("yyyy-MM-dd") - gmtDay.setTimeZone(TimeZone.getTimeZone("GMT")) - - Seq( - new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSz"), - gmtDay - ) - } -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala deleted file mode 100644 index ef3c8570d8..0000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.status.api.v1 - -import java.util.Date - -import scala.collection.Map - -import org.apache.spark.JobExecutionStatus - -class ApplicationInfo private[spark]( - val id: String, - val name: String, - val attempts: Seq[ApplicationAttemptInfo]) - -class ApplicationAttemptInfo private[spark]( - val attemptId: Option[String], - val startTime: Date, - val endTime: Date, - val sparkUser: String, - val completed: Boolean = false) - -class ExecutorStageSummary private[spark]( - val taskTime : Long, - val failedTasks : Int, - val succeededTasks : Int, - val inputBytes : Long, - val outputBytes : Long, - val shuffleRead : Long, - val shuffleWrite : Long, - val memoryBytesSpilled : Long, - val diskBytesSpilled : Long) - -class ExecutorSummary private[spark]( - val id: String, - val hostPort: String, - val rddBlocks: Int, - val memoryUsed: Long, - val diskUsed: Long, - val activeTasks: Int, - val failedTasks: Int, - val completedTasks: Int, - val totalTasks: Int, - val totalDuration: Long, - val totalInputBytes: Long, - val totalShuffleRead: Long, - val totalShuffleWrite: Long, - val maxMemory: Long, - val executorLogs: Map[String, String]) - -class JobData private[spark]( - val jobId: Int, - val name: String, - val description: Option[String], - val submissionTime: Option[Date], - val completionTime: Option[Date], - val stageIds: Seq[Int], - val jobGroup: Option[String], - val status: JobExecutionStatus, - val numTasks: Int, - val numActiveTasks: Int, - val numCompletedTasks: Int, - val numSkippedTasks: Int, - val numFailedTasks: Int, - val numActiveStages: Int, - val numCompletedStages: Int, - val numSkippedStages: Int, - val numFailedStages: Int) - -// Q: should Tachyon size go in here as well? currently the UI only shows it on the overall storage -// page ... does anybody pay attention to it? 
-class RDDStorageInfo private[spark]( - val id: Int, - val name: String, - val numPartitions: Int, - val numCachedPartitions: Int, - val storageLevel: String, - val memoryUsed: Long, - val diskUsed: Long, - val dataDistribution: Option[Seq[RDDDataDistribution]], - val partitions: Option[Seq[RDDPartitionInfo]]) - -class RDDDataDistribution private[spark]( - val address: String, - val memoryUsed: Long, - val memoryRemaining: Long, - val diskUsed: Long) - -class RDDPartitionInfo private[spark]( - val blockName: String, - val storageLevel: String, - val memoryUsed: Long, - val diskUsed: Long, - val executors: Seq[String]) - -class StageData private[spark]( - val status: StageStatus, - val stageId: Int, - val attemptId: Int, - val numActiveTasks: Int , - val numCompleteTasks: Int, - val numFailedTasks: Int, - - val executorRunTime: Long, - - val inputBytes: Long, - val inputRecords: Long, - val outputBytes: Long, - val outputRecords: Long, - val shuffleReadBytes: Long, - val shuffleReadRecords: Long, - val shuffleWriteBytes: Long, - val shuffleWriteRecords: Long, - val memoryBytesSpilled: Long, - val diskBytesSpilled: Long, - - val name: String, - val details: String, - val schedulingPool: String, - - val accumulatorUpdates: Seq[AccumulableInfo], - val tasks: Option[Map[Long, TaskData]], - val executorSummary:Option[Map[String,ExecutorStageSummary]]) - -class TaskData private[spark]( - val taskId: Long, - val index: Int, - val attempt: Int, - val launchTime: Date, - val executorId: String, - val host: String, - val taskLocality: String, - val speculative: Boolean, - val accumulatorUpdates: Seq[AccumulableInfo], - val errorMessage: Option[String] = None, - val taskMetrics: Option[TaskMetrics] = None) - -class TaskMetrics private[spark]( - val executorDeserializeTime: Long, - val executorRunTime: Long, - val resultSize: Long, - val jvmGcTime: Long, - val resultSerializationTime: Long, - val memoryBytesSpilled: Long, - val diskBytesSpilled: Long, - val inputMetrics: Option[InputMetrics], - val outputMetrics: Option[OutputMetrics], - val shuffleReadMetrics: Option[ShuffleReadMetrics], - val shuffleWriteMetrics: Option[ShuffleWriteMetrics]) - -class InputMetrics private[spark]( - val bytesRead: Long, - val recordsRead: Long) - -class OutputMetrics private[spark]( - val bytesWritten: Long, - val recordsWritten: Long) - -class ShuffleReadMetrics private[spark]( - val remoteBlocksFetched: Int, - val localBlocksFetched: Int, - val fetchWaitTime: Long, - val remoteBytesRead: Long, - val totalBlocksFetched: Int, - val recordsRead: Long) - -class ShuffleWriteMetrics private[spark]( - val bytesWritten: Long, - val writeTime: Long, - val recordsWritten: Long) - -class TaskMetricDistributions private[spark]( - val quantiles: IndexedSeq[Double], - - val executorDeserializeTime: IndexedSeq[Double], - val executorRunTime: IndexedSeq[Double], - val resultSize: IndexedSeq[Double], - val jvmGcTime: IndexedSeq[Double], - val resultSerializationTime: IndexedSeq[Double], - val memoryBytesSpilled: IndexedSeq[Double], - val diskBytesSpilled: IndexedSeq[Double], - - val inputMetrics: Option[InputMetricDistributions], - val outputMetrics: Option[OutputMetricDistributions], - val shuffleReadMetrics: Option[ShuffleReadMetricDistributions], - val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions]) - -class InputMetricDistributions private[spark]( - val bytesRead: IndexedSeq[Double], - val recordsRead: IndexedSeq[Double]) - -class OutputMetricDistributions private[spark]( - val bytesWritten: IndexedSeq[Double], - 
val recordsWritten: IndexedSeq[Double]) - -class ShuffleReadMetricDistributions private[spark]( - val readBytes: IndexedSeq[Double], - val readRecords: IndexedSeq[Double], - val remoteBlocksFetched: IndexedSeq[Double], - val localBlocksFetched: IndexedSeq[Double], - val fetchWaitTime: IndexedSeq[Double], - val remoteBytesRead: IndexedSeq[Double], - val totalBlocksFetched: IndexedSeq[Double]) - -class ShuffleWriteMetricDistributions private[spark]( - val writeBytes: IndexedSeq[Double], - val writeRecords: IndexedSeq[Double], - val writeTime: IndexedSeq[Double]) - -class AccumulableInfo private[spark]( - val id: Long, - val name: String, - val update: Option[String], - val value: String) diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala index ec711480eb..7d75929b96 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala @@ -25,17 +25,13 @@ import org.apache.spark.scheduler._ /** * :: DeveloperApi :: * A SparkListener that maintains executor storage status. - * - * This class is thread-safe (unlike JobProgressListener) */ @DeveloperApi class StorageStatusListener extends SparkListener { // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE) private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]() - def storageStatusList: Seq[StorageStatus] = synchronized { - executorIdToStorageStatus.values.toSeq - } + def storageStatusList: Seq[StorageStatus] = executorIdToStorageStatus.values.toSeq /** Update storage status list to reflect updated block statuses */ private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) { diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index bfe4a180e8..a5271f0574 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -17,9 +17,6 @@ package org.apache.spark.ui -import java.util.Date - -import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo, JsonRootResource, UIRoot} import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} import org.apache.spark.scheduler._ import org.apache.spark.storage.StorageStatusListener @@ -36,7 +33,7 @@ import org.apache.spark.ui.scope.RDDOperationGraphListener private[spark] class SparkUI private ( val sc: Option[SparkContext], val conf: SparkConf, - securityManager: SecurityManager, + val securityManager: SecurityManager, val environmentListener: EnvironmentListener, val storageStatusListener: StorageStatusListener, val executorsListener: ExecutorsListener, @@ -44,27 +41,22 @@ private[spark] class SparkUI private ( val storageListener: StorageListener, val operationGraphListener: RDDOperationGraphListener, var appName: String, - val basePath: String, - val startTime: Long) + val basePath: String) extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI") - with Logging - with UIRoot { + with Logging { val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false) - - val stagesTab = new StagesTab(this) - /** Initialize all components of the server. 
*/ def initialize() { attachTab(new JobsTab(this)) + val stagesTab = new StagesTab(this) attachTab(stagesTab) attachTab(new StorageTab(this)) attachTab(new EnvironmentTab(this)) attachTab(new ExecutorsTab(this)) attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath)) - attachHandler(JsonRootResource.getJsonServlet(this)) // This should be POST only, but, the YARN AM proxy won't proxy POSTs attachHandler(createRedirectHandler( "/stages/stage/kill", "/stages", stagesTab.handleKillRequest, @@ -91,24 +83,6 @@ private[spark] class SparkUI private ( private[spark] def appUIHostPort = publicHostName + ":" + boundPort private[spark] def appUIAddress = s"http://$appUIHostPort" - - def getSparkUI(appId: String): Option[SparkUI] = { - if (appId == appName) Some(this) else None - } - - def getApplicationInfoList: Iterator[ApplicationInfo] = { - Iterator(new ApplicationInfo( - id = appName, - name = appName, - attempts = Seq(new ApplicationAttemptInfo( - attemptId = None, - startTime = new Date(startTime), - endTime = new Date(-1), - sparkUser = "", - completed = false - )) - )) - } } private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String) @@ -135,10 +109,9 @@ private[spark] object SparkUI { listenerBus: SparkListenerBus, jobProgressListener: JobProgressListener, securityManager: SecurityManager, - appName: String, - startTime: Long): SparkUI = { + appName: String): SparkUI = { create(Some(sc), conf, listenerBus, securityManager, appName, - jobProgressListener = Some(jobProgressListener), startTime = startTime) + jobProgressListener = Some(jobProgressListener)) } def createHistoryUI( @@ -146,9 +119,8 @@ private[spark] object SparkUI { listenerBus: SparkListenerBus, securityManager: SecurityManager, appName: String, - basePath: String, - startTime: Long): SparkUI = { - create(None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime) + basePath: String): SparkUI = { + create(None, conf, listenerBus, securityManager, appName, basePath) } /** @@ -165,8 +137,7 @@ private[spark] object SparkUI { securityManager: SecurityManager, appName: String, basePath: String = "", - jobProgressListener: Option[JobProgressListener] = None, - startTime: Long): SparkUI = { + jobProgressListener: Option[JobProgressListener] = None): SparkUI = { val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { val listener = new JobProgressListener(conf) @@ -188,6 +159,6 @@ private[spark] object SparkUI { new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, executorsListener, _jobProgressListener, storageListener, operationGraphListener, - appName, basePath, startTime) + appName, basePath) } } diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 384f2ad26e..f9860d1a5c 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -37,7 +37,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf} * pages. The use of tabs is optional, however; a WebUI may choose to include pages directly. 
*/ private[spark] abstract class WebUI( - val securityManager: SecurityManager, + securityManager: SecurityManager, port: Int, conf: SparkConf, basePath: String = "", @@ -77,9 +77,15 @@ private[spark] abstract class WebUI( val pagePath = "/" + page.prefix val renderHandler = createServletHandler(pagePath, (request: HttpServletRequest) => page.render(request), securityManager, basePath) + val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json", + (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath) attachHandler(renderHandler) + attachHandler(renderJsonHandler) pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]()) .append(renderHandler) + pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]()) + .append(renderJsonHandler) + } /** Attach a handler to this UI. */ diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index b247e4cdc3..956608d7c0 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -22,11 +22,11 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.status.api.v1.ExecutorSummary import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} import org.apache.spark.util.Utils -// This isn't even used anymore -- but we need to keep it b/c of a MiMa false positive +/** Summary information about an executor to display in the UI. */ +// Needs to be private[ui] because of a false positive MiMa failure. private[ui] case class ExecutorSummaryInfo( id: String, hostPort: String, @@ -44,7 +44,6 @@ private[ui] case class ExecutorSummaryInfo( maxMemory: Long, executorLogs: Map[String, String]) - private[ui] class ExecutorsPage( parent: ExecutorsTab, threadDumpEnabled: Boolean) @@ -56,8 +55,7 @@ private[ui] class ExecutorsPage( val maxMem = storageStatusList.map(_.maxMem).sum val memUsed = storageStatusList.map(_.memUsed).sum val diskUsed = storageStatusList.map(_.diskUsed).sum - val execInfo = for (statusId <- 0 until storageStatusList.size) yield - ExecutorsPage.getExecInfo(listener, statusId) + val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId) val execInfoSorted = execInfo.sortBy(_.id) val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty @@ -113,7 +111,7 @@ private[ui] class ExecutorsPage( } /** Render an HTML row representing an executor */ - private def execRow(info: ExecutorSummary, logsExist: Boolean): Seq[Node] = { + private def execRow(info: ExecutorSummaryInfo, logsExist: Boolean): Seq[Node] = { val maximumMemory = info.maxMemory val memoryUsed = info.memoryUsed val diskUsed = info.diskUsed @@ -172,11 +170,8 @@ private[ui] class ExecutorsPage( </tr> } -} - -private[spark] object ExecutorsPage { /** Represent an executor's info as a map given a storage status index */ - def getExecInfo(listener: ExecutorsListener, statusId: Int): ExecutorSummary = { + private def getExecInfo(statusId: Int): ExecutorSummaryInfo = { val status = listener.storageStatusList(statusId) val execId = status.blockManagerId.executorId val hostPort = status.blockManagerId.hostPort @@ -194,7 +189,7 @@ private[spark] object ExecutorsPage { val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L) val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty) - new ExecutorSummary( + new ExecutorSummaryInfo( 
execId, hostPort, rddBlocks, diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index 09323d1d80..f6abf27db4 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -271,12 +271,6 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { val shouldShowCompletedJobs = completedJobs.nonEmpty val shouldShowFailedJobs = failedJobs.nonEmpty - val completedJobNumStr = if (completedJobs.size == listener.numCompletedJobs) { - s"${completedJobs.size}" - } else { - s"${listener.numCompletedJobs}, only showing ${completedJobs.size}" - } - val summary: NodeSeq = <div> <ul class="unstyled"> @@ -301,9 +295,9 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { } { if (shouldShowCompletedJobs) { - <li id="completed-summary"> + <li> <a href="#completed"><strong>Completed Jobs:</strong></a> - {completedJobNumStr} + {completedJobs.size} </li> } } @@ -311,7 +305,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { if (shouldShowFailedJobs) { <li> <a href="#failed"><strong>Failed Jobs:</strong></a> - {listener.numFailedJobs} + {failedJobs.size} </li> } } @@ -328,7 +322,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { activeJobsTable } if (shouldShowCompletedJobs) { - content ++= <h4 id="completed">Completed Jobs ({completedJobNumStr})</h4> ++ + content ++= <h4 id="completed">Completed Jobs ({completedJobs.size})</h4> ++ completedJobsTable } if (shouldShowFailedJobs) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index a37f739ab9..236bc8ea92 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -64,12 +64,6 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { val shouldShowCompletedStages = completedStages.nonEmpty val shouldShowFailedStages = failedStages.nonEmpty - val completedStageNumStr = if (numCompletedStages == completedStages.size) { - s"$numCompletedStages" - } else { - s"$numCompletedStages, only showing ${completedStages.size}" - } - val summary: NodeSeq = <div> <ul class="unstyled"> @@ -104,9 +98,9 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { } { if (shouldShowCompletedStages) { - <li id="completed-summary"> + <li> <a href="#completed"><strong>Completed Stages:</strong></a> - {completedStageNumStr} + {numCompletedStages} </li> } } @@ -138,7 +132,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { pendingStagesTable.toNodeSeq } if (shouldShowCompletedStages) { - content ++= <h4 id="completed">Completed Stages ({completedStageNumStr})</h4> ++ + content ++= <h4 id="completed">Completed Stages ({numCompletedStages})</h4> ++ completedStagesTable.toNodeSeq } if (shouldShowFailedStages) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index 7163217e1f..96cc3d78d0 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -187,7 +187,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { val jobDataOption = listener.jobIdToData.get(jobId) if (jobDataOption.isEmpty) { val content 
= - <div id="no-info"> + <div> <p>No information to display for job {jobId}</p> </div> return UIUtils.headerSparkPage( diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 246e191d64..8f9aa9fdec 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -74,8 +74,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { // JobProgressListener's retention limits. var numCompletedStages = 0 var numFailedStages = 0 - var numCompletedJobs = 0 - var numFailedJobs = 0 // Misc: val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]() @@ -219,12 +217,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { completedJobs += jobData trimJobsIfNecessary(completedJobs) jobData.status = JobExecutionStatus.SUCCEEDED - numCompletedJobs += 1 case JobFailed(exception) => failedJobs += jobData trimJobsIfNecessary(failedJobs) jobData.status = JobExecutionStatus.FAILED - numFailedJobs += 1 } for (stageId <- jobData.stageIds) { stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage => diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index f3e0b38523..d725b9d856 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.scheduler.StageInfo +import org.apache.spark.scheduler.{Schedulable, StageInfo} import org.apache.spark.ui.{WebUIPage, UIUtils} /** Page showing specific pool details */ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 89d175b06b..579310070c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -25,11 +25,11 @@ import scala.xml.{Elem, Node, Unparsed} import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.executor.TaskMetrics -import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils} import org.apache.spark.ui.jobs.UIData._ import org.apache.spark.ui.scope.RDDOperationGraph import org.apache.spark.util.{Utils, Distribution} +import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} /** Page showing statistics and task list for a given stage */ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { @@ -48,22 +48,14 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val stageAttemptId = parameterAttempt.toInt val stageDataOption = progressListener.stageIdToData.get((stageId, stageAttemptId)) - val stageHeader = s"Details for Stage $stageId (Attempt $stageAttemptId)" - if (stageDataOption.isEmpty) { - val content = - <div id="no-info"> - <p>No information to display for Stage {stageId} (Attempt {stageAttemptId})</p> - </div> - return UIUtils.headerSparkPage(stageHeader, content, parent) - - } - if (stageDataOption.get.taskData.isEmpty) { + if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) { val content = <div> <h4>Summary Metrics</h4> No tasks have started yet <h4>Tasks</h4> No tasks have started yet </div> - 
return UIUtils.headerSparkPage(stageHeader, content, parent) + return UIUtils.headerSparkPage( + s"Details for Stage $stageId (Attempt $stageAttemptId)", content, parent) } val stageData = stageDataOption.get @@ -454,7 +446,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { maybeAccumulableTable ++ <h4>Tasks</h4> ++ taskTable - UIUtils.headerSparkPage(stageHeader, content, parent, showVisualization = true) + UIUtils.headerSparkPage( + "Details for Stage %d".format(stageId), content, parent, showVisualization = true) } } diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 05f94a7507..199f731b92 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -21,8 +21,8 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.status.api.v1.{AllRDDResource, RDDDataDistribution, RDDPartitionInfo} -import org.apache.spark.ui.{UIUtils, WebUIPage} +import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils /** Page showing storage details for a given RDD */ @@ -32,19 +32,28 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { def render(request: HttpServletRequest): Seq[Node] = { val parameterId = request.getParameter("id") require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") + val rddId = parameterId.toInt - val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener,includeDetails = true) - .getOrElse { - // Rather than crashing, render an "RDD Not Found" page - return UIUtils.headerSparkPage("RDD Not Found", Seq[Node](), parent) - } + val storageStatusList = listener.storageStatusList + val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse { + // Rather than crashing, render an "RDD Not Found" page + return UIUtils.headerSparkPage("RDD Not Found", Seq[Node](), parent) + } // Worker table - val workerTable = UIUtils.listingTable(workerHeader, workerRow, - rddStorageInfo.dataDistribution.get, id = Some("rdd-storage-by-worker-table")) + val workers = storageStatusList.map((rddId, _)) + val workerTable = UIUtils.listingTable(workerHeader, workerRow, workers, + id = Some("rdd-storage-by-worker-table")) // Block table - val blockTable = UIUtils.listingTable(blockHeader, blockRow, rddStorageInfo.partitions.get, + val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList) + val blocks = storageStatusList + .flatMap(_.rddBlocksById(rddId)) + .sortWith(_._1.name < _._1.name) + .map { case (blockId, status) => + (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown"))) + } + val blockTable = UIUtils.listingTable(blockHeader, blockRow, blocks, id = Some("rdd-storage-by-block-table")) val content = @@ -53,23 +62,23 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { <ul class="unstyled"> <li> <strong>Storage Level:</strong> - {rddStorageInfo.storageLevel} + {rddInfo.storageLevel.description} </li> <li> <strong>Cached Partitions:</strong> - {rddStorageInfo.numCachedPartitions} + {rddInfo.numCachedPartitions} </li> <li> <strong>Total Partitions:</strong> - {rddStorageInfo.numPartitions} + {rddInfo.numPartitions} </li> <li> <strong>Memory Size:</strong> - {Utils.bytesToString(rddStorageInfo.memoryUsed)} + 
{Utils.bytesToString(rddInfo.memSize)} </li> <li> <strong>Disk Size:</strong> - {Utils.bytesToString(rddStorageInfo.diskUsed)} + {Utils.bytesToString(rddInfo.diskSize)} </li> </ul> </div> @@ -77,19 +86,19 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { <div class="row-fluid"> <div class="span12"> - <h4> Data Distribution on {rddStorageInfo.dataDistribution.size} Executors </h4> + <h4> Data Distribution on {workers.size} Executors </h4> {workerTable} </div> </div> <div class="row-fluid"> <div class="span12"> - <h4> {rddStorageInfo.partitions.size} Partitions </h4> + <h4> {blocks.size} Partitions </h4> {blockTable} </div> </div>; - UIUtils.headerSparkPage("RDD Storage Info for " + rddStorageInfo.name, content, parent) + UIUtils.headerSparkPage("RDD Storage Info for " + rddInfo.name, content, parent) } /** Header fields for the worker table */ @@ -107,32 +116,34 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { "Executors") /** Render an HTML row representing a worker */ - private def workerRow(worker: RDDDataDistribution): Seq[Node] = { + private def workerRow(worker: (Int, StorageStatus)): Seq[Node] = { + val (rddId, status) = worker <tr> - <td>{worker.address}</td> + <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td> <td> - {Utils.bytesToString(worker.memoryUsed)} - ({Utils.bytesToString(worker.memoryRemaining)} Remaining) + {Utils.bytesToString(status.memUsedByRdd(rddId))} + ({Utils.bytesToString(status.memRemaining)} Remaining) </td> - <td>{Utils.bytesToString(worker.diskUsed)}</td> + <td>{Utils.bytesToString(status.diskUsedByRdd(rddId))}</td> </tr> } /** Render an HTML row representing a block */ - private def blockRow(row: RDDPartitionInfo): Seq[Node] = { + private def blockRow(row: (BlockId, BlockStatus, Seq[String])): Seq[Node] = { + val (id, block, locations) = row <tr> - <td>{row.blockName}</td> + <td>{id}</td> <td> - {row.storageLevel} + {block.storageLevel.description} </td> - <td sorttable_customkey={row.memoryUsed.toString}> - {Utils.bytesToString(row.memoryUsed)} + <td sorttable_customkey={block.memSize.toString}> + {Utils.bytesToString(block.memSize)} </td> - <td sorttable_customkey={row.diskUsed.toString}> - {Utils.bytesToString(row.diskUsed)} + <td sorttable_customkey={block.diskSize.toString}> + {Utils.bytesToString(block.diskSize)} </td> <td> - {row.executors.map(l => <span>{l}<br/></span>)} + {locations.map(l => <span>{l}<br/></span>)} </td> </tr> } diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala index 07db783c57..59dc6b547c 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala @@ -22,7 +22,7 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.storage.RDDInfo -import org.apache.spark.ui.{UIUtils, WebUIPage} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils /** Page showing list of RDD's currently stored in the cluster */ diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala index 0351749700..045bd78499 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala @@ -35,8 +35,6 @@ private[ui] class StorageTab(parent: SparkUI) extends SparkUITab(parent, 
"storag /** * :: DeveloperApi :: * A SparkListener that prepares information to be displayed on the BlockManagerUI. - * - * This class is thread-safe (unlike JobProgressListener) */ @DeveloperApi class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener { @@ -45,9 +43,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Spar def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList /** Filter RDD info to include only those with cached partitions */ - def rddInfoList: Seq[RDDInfo] = synchronized { - _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq - } + def rddInfoList: Seq[RDDInfo] = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq /** Update the storage info of the RDDs whose blocks are among the given updated blocks */ private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = { |