From 7de0ea4d426e9964dee8173c7dca19601ad0f4b1 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Tue, 22 Oct 2013 10:04:41 -0700
Subject: Response to code review and adding some more tests

---
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  6 +-
 .../org/apache/spark/scheduler/JobLogger.scala     |  4 +-
 .../org/apache/spark/scheduler/SparkListener.scala |  2 +-
 .../scala/org/apache/spark/scheduler/Stage.scala   |  3 +-
 .../org/apache/spark/scheduler/StageInfo.scala     |  5 +-
 .../scala/org/apache/spark/ui/jobs/IndexPage.scala |  2 +-
 .../apache/spark/ui/jobs/JobProgressListener.scala | 72 +++++++++++-----------
 .../scala/org/apache/spark/ui/jobs/StagePage.scala | 12 ++--
 .../org/apache/spark/ui/jobs/StageTable.scala      | 14 ++---
 .../spark/scheduler/SparkListenerSuite.scala       | 61 ++++++++++++++----
 10 files changed, 107 insertions(+), 74 deletions(-)

(limited to 'core')

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b412ee2e2b..ff68255583 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -212,7 +212,7 @@ class DAGScheduler(
     val stage =
       new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
     stageIdToStage(id) = stage
-    stageToInfos(stage) = StageInfo(stage)
+    stageToInfos(stage) = new StageInfo(stage)
     stage
   }
 
@@ -610,7 +610,6 @@ class DAGScheduler(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
       stage.submissionTime = Some(System.currentTimeMillis())
       stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
-
     } else {
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
@@ -637,7 +636,6 @@ class DAGScheduler(
       case _ => "Unkown"
     }
     logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
-    stage.completionTime = Some(System.currentTimeMillis)
     stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
     listenerBus.post(StageCompleted(stageToInfos(stage)))
     running -= stage
@@ -808,7 +806,7 @@ class DAGScheduler(
    */
  private def abortStage(failedStage: Stage, reason: String) {
    val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
-    failedStage.completionTime = Some(System.currentTimeMillis())
+    stageToInfos(failedStage).completionTime = Some(System.currentTimeMillis())
    for (resultStage <- dependentStages) {
      val job = resultStageToJob(resultStage)
      val error = new SparkException("Job aborted: " + reason)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index e16436a9c7..12b0d74fb5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -212,8 +212,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   }
 
   override def onStageCompleted(stageCompleted: StageCompleted) {
-    stageLogInfo(stageCompleted.stage.stage.id, "STAGE_ID=%d STATUS=COMPLETED".format(
-      stageCompleted.stage.stage.id))
+    stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
+      stageCompleted.stage.stageId))
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) { }
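[Editor's note: with these changes the DAGScheduler stamps submission and completion times on the StageInfo it publishes, not only on the internal, mutable Stage. A minimal sketch of what that enables (not part of this patch; the class name is invented, and it is written against the 0.8-era listener API shown in these diffs) — a listener can compute wall-clock stage duration from the event alone:

    import org.apache.spark.scheduler.{SparkListener, StageCompleted}

    // Hypothetical listener: derives a stage's duration purely from the
    // StageInfo carried by the StageCompleted event.
    class StageDurationListener extends SparkListener {
      override def onStageCompleted(stageCompleted: StageCompleted) {
        val info = stageCompleted.stage
        // Both fields are Options; the loop body runs only when both are set.
        for (start <- info.submissionTime; end <- info.completionTime) {
          println("Stage %d (%s) took %d ms".format(info.stageId, info.name, end - start))
        }
      }
    }
]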
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 1391993147..324cd639b0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -111,7 +111,7 @@ object StatsReportListener extends Logging {
 
   val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
 
   def extractDoubleDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Double]): Option[Distribution] = {
-    Distribution(stage.stage.taskInfos.flatMap{
+    Distribution(stage.stage.taskInfos.flatMap {
       case ((info,metric)) => getMetric(info, metric)})
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index b320d8aa65..d06633d7d2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -51,9 +51,8 @@ private[spark] class Stage(
   val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
   var numAvailableOutputs = 0
 
-  /** When first task was submitted to scheduler. */
+  /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
-  var completionTime: Option[Long] = None
 
   private var nextAttemptId = 0
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 52002f48e3..37bb0891f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -21,14 +21,15 @@ import scala.collection._
 
 import org.apache.spark.executor.TaskMetrics
 
-case class StageInfo(
+class StageInfo(
     stage: Stage,
     val taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] =
       mutable.Buffer[(TaskInfo, TaskMetrics)]()
 ) {
   val stageId = stage.id
+  /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
   var completionTime: Option[Long] = None
-  val rddName = stage.rdd.toString
+  val rddName = stage.rdd.name
   val name = stage.name
   val numPartitions = stage.numPartitions
   val numTasks = stage.numTasks
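[Editor's note: dropping the `case` modifier on StageInfo matters beyond style. A case class derives equals and hashCode from its constructor arguments, and taskInfos is a mutable Buffer that grows as tasks finish — while JobProgressListener (below) stores StageInfo objects in HashSets. A hypothetical, standalone sketch of that hazard (names invented for illustration): an object hashed on mutable state becomes unfindable in a hash set once that state changes. A plain class gets reference equality, which stays stable.

    import scala.collection.mutable

    // Hypothetical stand-in for a case class keyed on a mutable constructor field.
    case class Info(tasks: mutable.Buffer[Int] = mutable.Buffer[Int]())

    object CaseClassPitfall extends App {
      val info = Info()
      val active = mutable.HashSet[Info]()
      active += info                  // hashed while tasks is still empty
      info.tasks += 1                 // mutation changes equals/hashCode
      println(active.contains(info))  // false: the lookup now misses
      active -= info                  // silently removes nothing
      println(active.size)            // 1: a stale entry is left behind
    }
]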
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index b39c0e9769..ca5a28625b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -38,7 +38,7 @@ private[spark] class IndexPage(parent: JobProgressUI) {
     val now = System.currentTimeMillis()
 
     var activeTime = 0L
-    for (tasks <- listener.stageToTasksActive.values; t <- tasks) {
+    for (tasks <- listener.stageIdToTasksActive.values; t <- tasks) {
       activeTime += t.timeRunning(now)
     }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index c048e9b1e8..9bb8a13ec4 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -36,8 +36,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
   val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
   val DEFAULT_POOL_NAME = "default"
 
-  val stageToPool = new HashMap[Int, String]()
-  val stageToDescription = new HashMap[Int, String]()
+  val stageIdToPool = new HashMap[Int, String]()
+  val stageIdToDescription = new HashMap[Int, String]()
   val poolToActiveStages = new HashMap[String, HashSet[StageInfo]]()
 
   val activeStages = HashSet[StageInfo]()
@@ -49,20 +49,20 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
   var totalShuffleRead = 0L
   var totalShuffleWrite = 0L
 
-  val stageToTime = HashMap[Int, Long]()
-  val stageToShuffleRead = HashMap[Int, Long]()
-  val stageToShuffleWrite = HashMap[Int, Long]()
-  val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
-  val stageToTasksComplete = HashMap[Int, Int]()
-  val stageToTasksFailed = HashMap[Int, Int]()
-  val stageToTaskInfos =
+  val stageIdToTime = HashMap[Int, Long]()
+  val stageIdToShuffleRead = HashMap[Int, Long]()
+  val stageIdToShuffleWrite = HashMap[Int, Long]()
+  val stageIdToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
+  val stageIdToTasksComplete = HashMap[Int, Int]()
+  val stageIdToTasksFailed = HashMap[Int, Int]()
+  val stageIdToTaskInfos =
     HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
 
   override def onJobStart(jobStart: SparkListenerJobStart) {}
 
   override def onStageCompleted(stageCompleted: StageCompleted) = synchronized {
     val stage = stageCompleted.stage
-    poolToActiveStages(stageToPool(stage.stageId)) -= stage
+    poolToActiveStages(stageIdToPool(stage.stageId)) -= stage
     activeStages -= stage
     completedStages += stage
     trimIfNecessary(completedStages)
@@ -73,15 +73,15 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
     if (stages.size > RETAINED_STAGES) {
       val toRemove = RETAINED_STAGES / 10
       stages.takeRight(toRemove).foreach( s => {
-        stageToTaskInfos.remove(s.stageId)
-        stageToTime.remove(s.stageId)
-        stageToShuffleRead.remove(s.stageId)
-        stageToShuffleWrite.remove(s.stageId)
-        stageToTasksActive.remove(s.stageId)
-        stageToTasksComplete.remove(s.stageId)
-        stageToTasksFailed.remove(s.stageId)
-        stageToPool.remove(s.stageId)
-        if (stageToDescription.contains(s.stageId)) {stageToDescription.remove(s.stageId)}
+        stageIdToTaskInfos.remove(s.stageId)
+        stageIdToTime.remove(s.stageId)
+        stageIdToShuffleRead.remove(s.stageId)
+        stageIdToShuffleWrite.remove(s.stageId)
+        stageIdToTasksActive.remove(s.stageId)
+        stageIdToTasksComplete.remove(s.stageId)
+        stageIdToTasksFailed.remove(s.stageId)
+        stageIdToPool.remove(s.stageId)
+        if (stageIdToDescription.contains(s.stageId)) {stageIdToDescription.remove(s.stageId)}
       })
       stages.trimEnd(toRemove)
     }
@@ -95,12 +95,12 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
     val poolName = Option(stageSubmitted.properties).map {
       p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
     }.getOrElse(DEFAULT_POOL_NAME)
-    stageToPool(stage.stageId) = poolName
+    stageIdToPool(stage.stageId) = poolName
 
     val description = Option(stageSubmitted.properties).flatMap {
       p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
     }
-    description.map(d => stageToDescription(stage.stageId) = d)
+    description.map(d => stageIdToDescription(stage.stageId) = d)
 
     val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[StageInfo]())
     stages += stage
@@ -108,50 +108,50 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
     val sid = taskStart.task.stageId
-    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+    val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
     tasksActive += taskStart.taskInfo
-    val taskList = stageToTaskInfos.getOrElse(
+    val taskList = stageIdToTaskInfos.getOrElse(
       sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
     taskList += ((taskStart.taskInfo, None, None))
-    stageToTaskInfos(sid) = taskList
+    stageIdToTaskInfos(sid) = taskList
   }
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
     val sid = taskEnd.task.stageId
-    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+    val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
     tasksActive -= taskEnd.taskInfo
     val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
       taskEnd.reason match {
         case e: ExceptionFailure =>
-          stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
+          stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
           (Some(e), e.metrics)
         case _ =>
-          stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
+          stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
           (None, Option(taskEnd.taskMetrics))
       }
 
-    stageToTime.getOrElseUpdate(sid, 0L)
+    stageIdToTime.getOrElseUpdate(sid, 0L)
     val time = metrics.map(m => m.executorRunTime).getOrElse(0)
-    stageToTime(sid) += time
+    stageIdToTime(sid) += time
     totalTime += time
 
-    stageToShuffleRead.getOrElseUpdate(sid, 0L)
+    stageIdToShuffleRead.getOrElseUpdate(sid, 0L)
     val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
       s.remoteBytesRead).getOrElse(0L)
-    stageToShuffleRead(sid) += shuffleRead
+    stageIdToShuffleRead(sid) += shuffleRead
     totalShuffleRead += shuffleRead
 
-    stageToShuffleWrite.getOrElseUpdate(sid, 0L)
+    stageIdToShuffleWrite.getOrElseUpdate(sid, 0L)
     val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
       s.shuffleBytesWritten).getOrElse(0L)
-    stageToShuffleWrite(sid) += shuffleWrite
+    stageIdToShuffleWrite(sid) += shuffleWrite
     totalShuffleWrite += shuffleWrite
 
-    val taskList = stageToTaskInfos.getOrElse(
+    val taskList = stageIdToTaskInfos.getOrElse(
      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
     taskList -= ((taskEnd.taskInfo, None, None))
     taskList += ((taskEnd.taskInfo, metrics, failureInfo))
-    stageToTaskInfos(sid) = taskList
+    stageIdToTaskInfos(sid) = taskList
   }
 
   override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized {
@@ -164,7 +164,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
         val stageInfo = activeStages.filter(s => s.stageId == stage.id).headOption
         stageInfo.foreach {s =>
           activeStages -= s
-          poolToActiveStages(stageToPool(stage.id)) -= s
+          poolToActiveStages(stageIdToPool(stage.id)) -= s
           failedStages += s
           trimIfNecessary(failedStages)
         }
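[Editor's note: the rename makes the keying explicit — every map is keyed on the immutable Int stage id rather than on a mutable Stage or StageInfo reference, which is what lets trimIfNecessary drop old entries map by map. A minimal sketch of the same pattern in a custom listener (hypothetical class, written against the 0.8-era interfaces shown in this patch; import paths may differ on other branches):

    import scala.collection.mutable.HashMap

    import org.apache.spark.ExceptionFailure
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, StageCompleted}

    class TaskCountListener extends SparkListener {
      // Keyed on the immutable stage id, never on a mutable object.
      val stageIdToTasksComplete = HashMap[Int, Int]()
      val stageIdToTasksFailed = HashMap[Int, Int]()

      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
        val sid = taskEnd.task.stageId
        taskEnd.reason match {
          case e: ExceptionFailure =>
            stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
          case _ =>
            stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
        }
      }

      override def onStageCompleted(stageCompleted: StageCompleted) = synchronized {
        val sid = stageCompleted.stage.stageId
        println("Stage %d: %d succeeded, %d failed".format(
          sid, stageIdToTasksComplete.getOrElse(sid, 0), stageIdToTasksFailed.getOrElse(sid, 0)))
        // Drop the entries once reported, in the spirit of trimIfNecessary above.
        stageIdToTasksComplete.remove(sid)
        stageIdToTasksFailed.remove(sid)
      }
    }
]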
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index b7c81d091c..0dd57235e3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -40,7 +40,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
     val stageId = request.getParameter("id").toInt
     val now = System.currentTimeMillis()
 
-    if (!listener.stageToTaskInfos.contains(stageId)) {
+    if (!listener.stageIdToTaskInfos.contains(stageId)) {
       val content =
         <div>
           <h4>Summary Metrics</h4> No tasks have started yet
@@ -49,23 +49,23 @@ private[spark] class StagePage(parent: JobProgressUI) {
       return headerSparkPage(content, parent.sc, "Details for Stage %s".format(stageId), Stages)
     }
 
-    val tasks = listener.stageToTaskInfos(stageId).toSeq.sortBy(_._1.launchTime)
+    val tasks = listener.stageIdToTaskInfos(stageId).toSeq.sortBy(_._1.launchTime)
 
     val numCompleted = tasks.count(_._1.finished)
-    val shuffleReadBytes = listener.stageToShuffleRead.getOrElse(stageId, 0L)
+    val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L)
     val hasShuffleRead = shuffleReadBytes > 0
-    val shuffleWriteBytes = listener.stageToShuffleWrite.getOrElse(stageId, 0L)
+    val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
     val hasShuffleWrite = shuffleWriteBytes > 0
 
     var activeTime = 0L
-    listener.stageToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
+    listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
 
     val summary =
       <div>
         <ul class="unstyled">
           <li>
             <strong>CPU time: </strong>
-            {parent.formatDuration(listener.stageToTime.getOrElse(stageId, 0L) + activeTime)}
+            {parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
           </li>
           {if (hasShuffleRead)
            <li>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index b08cbe8d22..2c29ea7d44 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -79,28 +79,28 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
       case None => "Unknown"
     }
 
-    val shuffleRead = listener.stageToShuffleRead.getOrElse(s.stageId, 0L) match {
+    val shuffleRead = listener.stageIdToShuffleRead.getOrElse(s.stageId, 0L) match {
       case 0 => ""
       case b => Utils.bytesToString(b)
     }
-    val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.stageId, 0L) match {
+    val shuffleWrite = listener.stageIdToShuffleWrite.getOrElse(s.stageId, 0L) match {
       case 0 => ""
       case b => Utils.bytesToString(b)
     }
 
-    val startedTasks = listener.stageToTasksActive.getOrElse(s.stageId, HashSet[TaskInfo]()).size
-    val completedTasks = listener.stageToTasksComplete.getOrElse(s.stageId, 0)
-    val failedTasks = listener.stageToTasksFailed.getOrElse(s.stageId, 0) match {
+    val startedTasks = listener.stageIdToTasksActive.getOrElse(s.stageId, HashSet[TaskInfo]()).size
+    val completedTasks = listener.stageIdToTasksComplete.getOrElse(s.stageId, 0)
+    val failedTasks = listener.stageIdToTasksFailed.getOrElse(s.stageId, 0) match {
       case f if f > 0 => "(%s failed)".format(f)
       case _ => ""
     }
     val totalTasks = s.numTasks
 
-    val poolName = listener.stageToPool.get(s.stageId)
+    val poolName = listener.stageIdToPool.get(s.stageId)
 
     val nameLink = <a href={"/stages/stage?id=%s".format(s.stageId)}>{s.toString}</a>
-    val description = listener.stageToDescription.get(s.stageId)
+    val description = listener.stageIdToDescription.get(s.stageId)
       .map(d =>
        <div>{d}</div>
        <div>{nameLink}</div>
      ).getOrElse(nameLink)
     val finishTime = s.completionTime.getOrElse(System.currentTimeMillis())
     val duration = s.submissionTime.map(t => finishTime - t)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 1fe4e19e89..5ce92bc8d5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -17,16 +17,57 @@
 
 package org.apache.spark.scheduler
 
-import org.scalatest.FunSuite
-import org.apache.spark.{SparkContext, LocalSparkContext}
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.{TaskContext, SparkContext, LocalSparkContext}
 
 import scala.collection.mutable
 import org.scalatest.matchers.ShouldMatchers
 import org.apache.spark.SparkContext._
 
-class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
+class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
+  with BeforeAndAfter {
+  /** Length of time to wait while draining listener events. */
+  val WAIT_TIMEOUT_MILLIS = 10000
+
+  before {
+    sc = new SparkContext("local", "DAGSchedulerSuite")
+  }
+
+  test("basic creation of StageInfo") {
+    val listener = new SaveStageInfo
+    sc.addSparkListener(listener)
+    val rdd1 = sc.parallelize(1 to 100, 4)
+    val rdd2 = rdd1.map(x => x.toString)
+    rdd2.setName("Target RDD")
+    rdd2.count
+
+    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+
+    listener.stageInfos.size should be {1}
+    val first = listener.stageInfos.head
+    first.rddName should be {"Target RDD"}
+    first.numTasks should be {4}
+    first.numPartitions should be {4}
+    first.submissionTime should be ('defined)
+    first.completionTime should be ('defined)
+    first.taskInfos.length should be {4}
+  }
+
+  test("StageInfo with fewer tasks than partitions") {
+    val listener = new SaveStageInfo
+    sc.addSparkListener(listener)
+    val rdd1 = sc.parallelize(1 to 100, 4)
+    val rdd2 = rdd1.map(x => x.toString)
+    sc.runJob(rdd2, (items: Iterator[String]) => items.size, Seq(0, 1), true)
+
+    assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+
+    listener.stageInfos.size should be {1}
+    val first = listener.stageInfos.head
+    first.numTasks should be {2}
+    first.numPartitions should be {4}
+  }
 
   test("local metrics") {
-    sc = new SparkContext("local[4]", "test")
     val listener = new SaveStageInfo
     sc.addSparkListener(listener)
     sc.addSparkListener(new StatsReportListener)
@@ -39,7 +80,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
     val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)}
     d.count()
-    val WAIT_TIMEOUT_MILLIS = 10000
     assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
 
     listener.stageInfos.size should be (1)
@@ -64,7 +104,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
       checkNonZeroAvg(
         stageInfo.taskInfos.map{_._2.executorDeserializeTime.toLong},
         stageInfo + " executorDeserializeTime")
-      if (stageInfo.stage.rdd.name == d4.name) {
+      if (stageInfo.rddName == d4.name) {
         checkNonZeroAvg(
           stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime},
           stageInfo + " fetchWaitTime")
@@ -72,11 +112,11 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
       stageInfo.taskInfos.foreach { case (taskInfo, taskMetrics) =>
         taskMetrics.resultSize should be > (0l)
-        if (isStage(stageInfo, Set(d2.name, d3.name), Set(d4.name))) {
+        if (stageInfo.rddName == d2.name || stageInfo.rddName == d3.name) {
           taskMetrics.shuffleWriteMetrics should be ('defined)
           taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
         }
-        if (stageInfo.stage.rdd.name == d4.name) {
+        if (stageInfo.rddName == d4.name) {
           taskMetrics.shuffleReadMetrics should be ('defined)
           val sm = taskMetrics.shuffleReadMetrics.get
           sm.totalBlocksFetched should be > (0)
@@ -93,11 +133,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     assert(m.sum / m.size.toDouble > 0.0, msg)
   }
 
-  def isStage(stageInfo: StageInfo, rddNames: Set[String], excludedNames: Set[String]) = {
-    val names = Set(stageInfo.stage.rdd.name) ++ stageInfo.stage.rdd.dependencies.map{_.rdd.name}
-    !names.intersect(rddNames).isEmpty && names.intersect(excludedNames).isEmpty
-  }
-
   class SaveStageInfo extends SparkListener {
     val stageInfos = mutable.Buffer[StageInfo]()
     override def onStageCompleted(stage: StageCompleted) {
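[Editor's note: the second new test exercises a job that touches only part of its RDD — SparkContext.runJob takes an explicit sequence of partition indices (plus an allowLocal flag in this era's signature), so the resulting stage reports numTasks == 2 against numPartitions == 4. A standalone usage sketch of the same call, under the same assumptions as the test:

    import org.apache.spark.SparkContext

    object PartialJob {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "PartialJob")
        val rdd = sc.parallelize(1 to 100, 4).map(_.toString)
        // Run the closure on partitions 0 and 1 only; partitions 2 and 3 are
        // never scheduled, so the stage has two tasks over four partitions.
        val sizes = sc.runJob(rdd, (items: Iterator[String]) => items.size, Seq(0, 1), true)
        println(sizes.mkString(", "))  // one count per selected partition
        sc.stop()
      }
    }
]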