diff options
6 files changed, 80 insertions, 16 deletions
diff --git a/core/src/main/java/org/apache/spark/SparkExecutorInfo.java b/core/src/main/java/org/apache/spark/SparkExecutorInfo.java new file mode 100644 index 0000000000..dc3e826475 --- /dev/null +++ b/core/src/main/java/org/apache/spark/SparkExecutorInfo.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; + +/** + * Exposes information about Spark Executors. + * + * This interface is not designed to be implemented outside of Spark. We may add additional methods + * which may break binary compatibility with outside implementations. + */ +public interface SparkExecutorInfo extends Serializable { + String host(); + int port(); + long cacheSize(); + int numRunningTasks(); +} diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index dcb41f3a40..d7cb253d69 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -147,8 +147,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli appName: String, sparkHome: String = null, jars: Seq[String] = Nil, - environment: Map[String, String] = Map()) = - { + environment: Map[String, String] = Map()) = { this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment)) } diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala index 34ee3a48f8..52c4656c27 100644 --- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala +++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala @@ -17,6 +17,8 @@ package org.apache.spark +import org.apache.spark.scheduler.TaskSchedulerImpl + /** * Low-level status reporting APIs for monitoring job and stage progress. * @@ -104,4 +106,22 @@ class SparkStatusTracker private[spark] (sc: SparkContext) { } } } + + /** + * Returns information of all known executors, including host, port, cacheSize, numRunningTasks. + */ + def getExecutorInfos: Array[SparkExecutorInfo] = { + val executorIdToRunningTasks: Map[String, Int] = + sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors() + + sc.getExecutorStorageStatus.map { status => + val bmId = status.blockManagerId + new SparkExecutorInfoImpl( + bmId.host, + bmId.port, + status.cacheSize, + executorIdToRunningTasks.getOrElse(bmId.executorId, 0) + ) + } + } } diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala index e5c7c8d0db..c1f24a6377 100644 --- a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala +++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala @@ -18,18 +18,25 @@ package org.apache.spark private class SparkJobInfoImpl ( - val jobId: Int, - val stageIds: Array[Int], - val status: JobExecutionStatus) - extends SparkJobInfo + val jobId: Int, + val stageIds: Array[Int], + val status: JobExecutionStatus) + extends SparkJobInfo private class SparkStageInfoImpl( - val stageId: Int, - val currentAttemptId: Int, - val submissionTime: Long, - val name: String, - val numTasks: Int, - val numActiveTasks: Int, - val numCompletedTasks: Int, - val numFailedTasks: Int) - extends SparkStageInfo + val stageId: Int, + val currentAttemptId: Int, + val submissionTime: Long, + val name: String, + val numTasks: Int, + val numActiveTasks: Int, + val numCompletedTasks: Int, + val numFailedTasks: Int) + extends SparkStageInfo + +private class SparkExecutorInfoImpl( + val host: String, + val port: Int, + val cacheSize: Long, + val numRunningTasks: Int) + extends SparkExecutorInfo diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index f7790fccc6..daed2ff50e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -90,6 +90,8 @@ private[spark] class TaskSchedulerImpl( // Number of tasks running on each executor private val executorIdToTaskCount = new HashMap[String, Int] + def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap + // The set of executors we have on each host; this is used to compute hostsAlive, which // in turn is used to decide when we can attain data locality on a given host protected val executorsByHost = new HashMap[String, HashSet[String]] diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 199a5fc270..fb9941bbd9 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -175,7 +175,10 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) { def memRemaining: Long = maxMem - memUsed /** Return the memory used by this block manager. */ - def memUsed: Long = _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum + def memUsed: Long = _nonRddStorageInfo._1 + cacheSize + + /** Return the memory used by caching RDDs */ + def cacheSize: Long = _rddBlocks.keys.toSeq.map(memUsedByRdd).sum /** Return the disk space used by this block manager. */ def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum |