diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-03-31 12:07:19 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-03-31 12:07:19 -0700 |
commit | 0abee534f0ad9bbe84d8d3d3478ecaa594f1e0f4 (patch) | |
tree | cf2edfa11e4e4dc5dd9c3b95bbfdef6464e62a22 /core/src/main/scala | |
parent | 4d93b653f7294698526674950d3dc303691260f8 (diff) | |
download | spark-0abee534f0ad9bbe84d8d3d3478ecaa594f1e0f4.tar.gz spark-0abee534f0ad9bbe84d8d3d3478ecaa594f1e0f4.tar.bz2 spark-0abee534f0ad9bbe84d8d3d3478ecaa594f1e0f4.zip |
[SPARK-14069][SQL] Improve SparkStatusTracker to also track executor information
## What changes were proposed in this pull request?
Track executor information like host and port, cache size, running tasks.
TODO: tests
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes #11888 from cloud-fan/status-tracker.
Diffstat (limited to 'core/src/main/scala')
5 files changed, 47 insertions, 16 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index dcb41f3a40..d7cb253d69 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -147,8 +147,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli appName: String, sparkHome: String = null, jars: Seq[String] = Nil, - environment: Map[String, String] = Map()) = - { + environment: Map[String, String] = Map()) = { this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment)) } diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala index 34ee3a48f8..52c4656c27 100644 --- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala +++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala @@ -17,6 +17,8 @@ package org.apache.spark +import org.apache.spark.scheduler.TaskSchedulerImpl + /** * Low-level status reporting APIs for monitoring job and stage progress. * @@ -104,4 +106,22 @@ class SparkStatusTracker private[spark] (sc: SparkContext) { } } } + + /** + * Returns information of all known executors, including host, port, cacheSize, numRunningTasks. + */ + def getExecutorInfos: Array[SparkExecutorInfo] = { + val executorIdToRunningTasks: Map[String, Int] = + sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors() + + sc.getExecutorStorageStatus.map { status => + val bmId = status.blockManagerId + new SparkExecutorInfoImpl( + bmId.host, + bmId.port, + status.cacheSize, + executorIdToRunningTasks.getOrElse(bmId.executorId, 0) + ) + } + } } diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala index e5c7c8d0db..c1f24a6377 100644 --- a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala +++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala @@ -18,18 +18,25 @@ package org.apache.spark private class SparkJobInfoImpl ( - val jobId: Int, - val stageIds: Array[Int], - val status: JobExecutionStatus) - extends SparkJobInfo + val jobId: Int, + val stageIds: Array[Int], + val status: JobExecutionStatus) + extends SparkJobInfo private class SparkStageInfoImpl( - val stageId: Int, - val currentAttemptId: Int, - val submissionTime: Long, - val name: String, - val numTasks: Int, - val numActiveTasks: Int, - val numCompletedTasks: Int, - val numFailedTasks: Int) - extends SparkStageInfo + val stageId: Int, + val currentAttemptId: Int, + val submissionTime: Long, + val name: String, + val numTasks: Int, + val numActiveTasks: Int, + val numCompletedTasks: Int, + val numFailedTasks: Int) + extends SparkStageInfo + +private class SparkExecutorInfoImpl( + val host: String, + val port: Int, + val cacheSize: Long, + val numRunningTasks: Int) + extends SparkExecutorInfo diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index f7790fccc6..daed2ff50e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -90,6 +90,8 @@ private[spark] class TaskSchedulerImpl( // Number of tasks running on each executor private val executorIdToTaskCount = new HashMap[String, Int] + def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap + // The set of executors we have on each host; this is used to compute hostsAlive, which // in turn is used to decide when we can attain data locality on a given host protected val executorsByHost = new HashMap[String, HashSet[String]] diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 199a5fc270..fb9941bbd9 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -175,7 +175,10 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) { def memRemaining: Long = maxMem - memUsed /** Return the memory used by this block manager. */ - def memUsed: Long = _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum + def memUsed: Long = _nonRddStorageInfo._1 + cacheSize + + /** Return the memory used by caching RDDs */ + def cacheSize: Long = _rddBlocks.keys.toSeq.map(memUsedByRdd).sum /** Return the disk space used by this block manager. */ def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum |