aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-03-25 17:40:00 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-04-02 13:12:32 -0700
commita6664dcd88a0bdaa8985844cd485d3c4a71eba1b (patch)
tree80ba3c737963592fb6b28c307ccebec859cc8724
parentee2bd70a48e35ab045a440328f97d88055b4dc28 (diff)
downloadspark-a6664dcd88a0bdaa8985844cd485d3c4a71eba1b.tar.gz
spark-a6664dcd88a0bdaa8985844cd485d3c4a71eba1b.tar.bz2
spark-a6664dcd88a0bdaa8985844cd485d3c4a71eba1b.zip
[SPARK-6079] Use index to speed up StatusTracker.getJobIdsForGroup()
`StatusTracker.getJobIdsForGroup()` is implemented via a linear scan over a HashMap rather than using an index, which might be an expensive operation if there are many (e.g. thousands of) retained jobs. This patch adds a new map to `JobProgressListener` in order to speed up these lookups. Author: Josh Rosen <joshrosen@databricks.com> Closes #4830 from JoshRosen/statustracker-job-group-indexing and squashes the following commits: e39c5c7 [Josh Rosen] Address review feedback 6709fb2 [Josh Rosen] Merge remote-tracking branch 'origin/master' into statustracker-job-group-indexing 2c49614 [Josh Rosen] getOrElse 97275a7 [Josh Rosen] Add jobGroup to jobId index to JobProgressListener (cherry picked from commit d44a3362ed8cf3068f8ff233e13851a39da42219) Signed-off-by: Josh Rosen <joshrosen@databricks.com>
-rw-r--r--core/src/main/scala/org/apache/spark/SparkStatusTracker.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala23
-rw-r--r--core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala31
3 files changed, 51 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index edbdda8a0b..34ee3a48f8 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -45,8 +45,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
*/
def getJobIdsForGroup(jobGroup: String): Array[Int] = {
jobProgressListener.synchronized {
- val jobData = jobProgressListener.jobIdToData.valuesIterator
- jobData.filter(_.jobGroup.orNull == jobGroup).map(_.jobId).toArray
+ jobProgressListener.jobGroupToJobIds.getOrElse(jobGroup, Seq.empty).toArray
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 937d95a934..78eae535c6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -44,6 +44,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
// These type aliases are public because they're used in the types of public fields:
type JobId = Int
+ type JobGroupId = String
type StageId = Int
type StageAttemptId = Int
type PoolName = String
@@ -54,6 +55,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val completedJobs = ListBuffer[JobUIData]()
val failedJobs = ListBuffer[JobUIData]()
val jobIdToData = new HashMap[JobId, JobUIData]
+ val jobGroupToJobIds = new HashMap[JobGroupId, HashSet[JobId]]
// Stages:
val pendingStages = new HashMap[StageId, StageInfo]
@@ -119,7 +121,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
Map(
"jobIdToData" -> jobIdToData.size,
"stageIdToData" -> stageIdToData.size,
- "stageIdToStageInfo" -> stageIdToInfo.size
+ "stageIdToStageInfo" -> stageIdToInfo.size,
+ "jobGroupToJobIds" -> jobGroupToJobIds.values.map(_.size).sum,
+ // Since jobGroupToJobIds is a map of sets, check that we don't leak keys with empty values:
+ "jobGroupToJobIds keySet" -> jobGroupToJobIds.keys.size
)
}
@@ -140,7 +145,19 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
if (jobs.size > retainedJobs) {
val toRemove = math.max(retainedJobs / 10, 1)
jobs.take(toRemove).foreach { job =>
- jobIdToData.remove(job.jobId)
+ // Remove the job's UI data, if it exists
+ jobIdToData.remove(job.jobId).foreach { removedJob =>
+ // A null jobGroupId is used for jobs that are run without a job group
+ val jobGroupId = removedJob.jobGroup.orNull
+ // Remove the job group -> job mapping entry, if it exists
+ jobGroupToJobIds.get(jobGroupId).foreach { jobsInGroup =>
+ jobsInGroup.remove(job.jobId)
+ // If this was the last job in this job group, remove the map entry for the job group
+ if (jobsInGroup.isEmpty) {
+ jobGroupToJobIds.remove(jobGroupId)
+ }
+ }
+ }
}
jobs.trimStart(toRemove)
}
@@ -158,6 +175,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageIds = jobStart.stageIds,
jobGroup = jobGroup,
status = JobExecutionStatus.RUNNING)
+ // A null jobGroupId is used for jobs that are run without a job group
+ jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new HashSet[JobId]).add(jobStart.jobId)
jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
// Compute (a potential underestimate of) the number of tasks that will be run by this job.
// This may be an underestimate because the job start event references all of the result
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 730a4b54f5..c0c28cb60e 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.ui.jobs
+import java.util.Properties
+
import org.scalatest.FunSuite
import org.scalatest.Matchers
@@ -44,11 +46,19 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
SparkListenerStageCompleted(stageInfo)
}
- private def createJobStartEvent(jobId: Int, stageIds: Seq[Int]) = {
+ private def createJobStartEvent(
+ jobId: Int,
+ stageIds: Seq[Int],
+ jobGroup: Option[String] = None): SparkListenerJobStart = {
val stageInfos = stageIds.map { stageId =>
new StageInfo(stageId, 0, stageId.toString, 0, null, "")
}
- SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos)
+ val properties: Option[Properties] = jobGroup.map { groupId =>
+ val props = new Properties()
+ props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+ props
+ }
+ SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos, properties.orNull)
}
private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
@@ -110,6 +120,23 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
listener.stageIdToActiveJobIds.size should be (0)
}
+ test("test clearing of jobGroupToJobIds") {
+ val conf = new SparkConf()
+ conf.set("spark.ui.retainedJobs", 5.toString)
+ val listener = new JobProgressListener(conf)
+
+ // Run 51 jobs (ids 0 to 50, inclusive), each with one stage
+ for (jobId <- 0 to 50) {
+ listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString)))
+ listener.onStageSubmitted(createStageStartEvent(0))
+ listener.onStageCompleted(createStageEndEvent(0, failed = false))
+ listener.onJobEnd(createJobEndEvent(jobId, false))
+ }
+ assertActiveJobsStateIsEmpty(listener)
+ // This collection won't become empty, but it should be bounded by spark.ui.retainedJobs
+ listener.jobGroupToJobIds.size should be (5)
+ }
+
test("test LRU eviction of jobs") {
val conf = new SparkConf()
conf.set("spark.ui.retainedStages", 5.toString)