author     Sean Owen <sowen@cloudera.com>  2016-10-04 10:29:22 +0100
committer  Sean Owen <sowen@cloudera.com>  2016-10-04 10:29:22 +0100
commit     8e8de0073d71bb00baeb24c612d7841b6274f652 (patch)
tree       83892704a5534bdf8d3d9419b1ba276cf00066cd
parent     126baa8d32bc0e7bf8b43f9efa84f2728f02347d (diff)
[SPARK-17671][WEBUI] Spark 2.0 history server summary page is slow even set spark.history.ui.maxApplications
## What changes were proposed in this pull request?

Return an Iterator of applications internally in the history server, for consistency and performance. See https://github.com/apache/spark/pull/15248 for some back-story.

The code called by and calling HistoryServer.getApplicationList wants an Iterator, but this method materializes an Iterable, which potentially causes a performance problem. It's also simpler to make this internal method pass through an Iterator.

## How was this patch tested?

Existing tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #15321 from srowen/SPARK-17671.
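To make the performance point concrete, here is a minimal standalone sketch (not Spark code; the `AppInfo` type and the data are made up) contrasting an eager filter-then-size over an Iterable with a lazy count over an Iterator:

```scala
// Standalone sketch: counting via an Iterator streams over the existing values,
// while filter-then-size on an Iterable first builds a new collection.
object IteratorVsIterable {
  final case class AppInfo(id: String, completed: Boolean) // hypothetical stand-in

  def main(args: Array[String]): Unit = {
    val apps: Map[String, AppInfo] =
      (1 to 100000).map(i => s"app-$i" -> AppInfo(s"app-$i", completed = i % 2 == 0)).toMap

    // Eager: materializes an intermediate collection just to take its size.
    val eagerCount = apps.values.filter(_.completed).size

    // Lazy: no intermediate collection, a single pass over the values.
    val lazyCount = apps.values.iterator.count(_.completed)

    assert(eagerCount == lazyCount)
  }
}
```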
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala   2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala            2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala                  5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala                4
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala      38
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala            4
-rw-r--r--  project/MimaExcludes.scala                                                              2
7 files changed, 22 insertions, 35 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index ba42b4862a..ad7a0972ef 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -77,7 +77,7 @@ private[history] abstract class ApplicationHistoryProvider {
*
* @return List of all know applications.
*/
- def getListing(): Iterable[ApplicationHistoryInfo]
+ def getListing(): Iterator[ApplicationHistoryInfo]
/**
* Returns the Spark UI for a specific application.
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index c5740e4737..3c2d169f32 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -222,7 +222,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
- override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values
+ override def getListing(): Iterator[FsApplicationHistoryInfo] = applications.values.iterator
override def getApplicationInfo(appId: String): Option[FsApplicationHistoryInfo] = {
applications.get(appId)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index b4f5a6114f..95b72224e0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -29,10 +29,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
val requestedIncomplete =
Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
- val allApps = parent.getApplicationList()
- .filter(_.completed != requestedIncomplete)
- val allAppsSize = allApps.size
-
+ val allAppsSize = parent.getApplicationList().count(_.completed != requestedIncomplete)
val providerConfig = parent.getProviderConfig()
val content =
<div>
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 735aa43cfc..087c69e648 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -174,12 +174,12 @@ class HistoryServer(
*
* @return List of all known applications.
*/
- def getApplicationList(): Iterable[ApplicationHistoryInfo] = {
+ def getApplicationList(): Iterator[ApplicationHistoryInfo] = {
provider.getListing()
}
def getApplicationInfoList: Iterator[ApplicationInfo] = {
- getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
+ getApplicationList().map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
}
def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
index 075b9ba37d..76779290d4 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.status.api.v1
-import java.util.{Arrays, Date, List => JList}
+import java.util.{Date, List => JList}
import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam}
import javax.ws.rs.core.MediaType
@@ -32,33 +32,21 @@ private[v1] class ApplicationListResource(uiRoot: UIRoot) {
@DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: SimpleDateParam,
@QueryParam("limit") limit: Integer)
: Iterator[ApplicationInfo] = {
- val allApps = uiRoot.getApplicationInfoList
- val adjStatus = {
- if (status.isEmpty) {
- Arrays.asList(ApplicationStatus.values(): _*)
- } else {
- status
- }
- }
- val includeCompleted = adjStatus.contains(ApplicationStatus.COMPLETED)
- val includeRunning = adjStatus.contains(ApplicationStatus.RUNNING)
- val appList = allApps.filter { app =>
+
+ val numApps = Option(limit).map(_.toInt).getOrElse(Integer.MAX_VALUE)
+ val includeCompleted = status.isEmpty || status.contains(ApplicationStatus.COMPLETED)
+ val includeRunning = status.isEmpty || status.contains(ApplicationStatus.RUNNING)
+
+ uiRoot.getApplicationInfoList.filter { app =>
val anyRunning = app.attempts.exists(!_.completed)
- // if any attempt is still running, we consider the app to also still be running
- val statusOk = (!anyRunning && includeCompleted) ||
- (anyRunning && includeRunning)
+ // if any attempt is still running, we consider the app to also still be running;
// keep the app if *any* attempts fall in the right time window
- val dateOk = app.attempts.exists { attempt =>
- attempt.startTime.getTime >= minDate.timestamp &&
- attempt.startTime.getTime <= maxDate.timestamp
+ ((!anyRunning && includeCompleted) || (anyRunning && includeRunning)) &&
+ app.attempts.exists { attempt =>
+ val start = attempt.startTime.getTime
+ start >= minDate.timestamp && start <= maxDate.timestamp
}
- statusOk && dateOk
- }
- if (limit != null) {
- appList.take(limit)
- } else {
- appList
- }
+ }.take(numApps)
}
}
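Read on its own, the reshaped filter above boils down to the following simplified sketch; the `App`, `Attempt`, and `ApplicationStatus` types here are stand-ins for illustration, not the actual v1 API classes:

```scala
// Simplified standalone sketch of the new listing filter (stand-in types).
import java.util.{List => JList}

object ListingSketch {
  sealed trait ApplicationStatus
  case object COMPLETED extends ApplicationStatus
  case object RUNNING extends ApplicationStatus

  final case class Attempt(startTime: Long, completed: Boolean)
  final case class App(id: String, attempts: Seq[Attempt])

  def appList(
      apps: Iterator[App],
      status: JList[ApplicationStatus],
      minDate: Long,
      maxDate: Long,
      limit: Integer): Iterator[App] = {
    // A null limit (query parameter absent) means "no cap".
    val numApps = Option(limit).map(_.toInt).getOrElse(Integer.MAX_VALUE)
    // An empty status list includes both completed and running applications.
    val includeCompleted = status.isEmpty || status.contains(COMPLETED)
    val includeRunning = status.isEmpty || status.contains(RUNNING)

    apps.filter { app =>
      // An app with any still-running attempt counts as running.
      val anyRunning = app.attempts.exists(!_.completed)
      ((!anyRunning && includeCompleted) || (anyRunning && includeRunning)) &&
        // Keep the app if any attempt starts inside the requested time window.
        app.attempts.exists(a => a.startTime >= minDate && a.startTime <= maxDate)
    }.take(numApps)
  }
}
```

Because the result stays an Iterator, the limit is applied with `take` without ever building the full filtered list.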
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index ae3f5d9c01..5b316b2f6b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -447,7 +447,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
assert(4 === getNumJobsRestful(), s"two jobs back-to-back not updated, server=$server\n")
}
val jobcount = getNumJobs("/jobs")
- assert(!provider.getListing().head.completed)
+ assert(!provider.getListing().next.completed)
listApplications(false) should contain(appId)
@@ -455,7 +455,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
resetSparkContext()
// check the app is now found as completed
eventually(stdTimeout, stdInterval) {
- assert(provider.getListing().head.completed,
+ assert(provider.getListing().next.completed,
s"application never completed, server=$server\n")
}
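A note on the `.head` to `.next` change in the test: an Iterator is single-pass, so each assertion asks the provider for a fresh listing rather than reusing one. A tiny standalone illustration (not Spark code):

```scala
// Standalone illustration: a fresh Iterator starts at the head again,
// while a reused one advances past elements it has already returned.
object SinglePassSketch {
  def main(args: Array[String]): Unit = {
    def getListing(): Iterator[Int] = Iterator(1, 2, 3) // fresh iterator per call

    val reused = getListing()
    assert(reused.next() == 1)
    assert(reused.next() == 2)       // second call on the same iterator moves on

    assert(getListing().next() == 1)
    assert(getListing().next() == 1) // a new iterator begins at the first element
  }
}
```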
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 7362041428..163e3f2fde 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -37,6 +37,8 @@ object MimaExcludes {
// Exclude rules for 2.1.x
lazy val v21excludes = v20excludes ++ {
Seq(
+ // [SPARK-17671] Spark 2.0 history server summary page is slow even set spark.history.ui.maxApplications
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.deploy.history.HistoryServer.getApplicationList"),
// [SPARK-14743] Improve delegation token handling in secure cluster
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getTimeFromNowToRenewal"),
// [SPARK-16199][SQL] Add a method to list the referenced columns in data source Filter