author    Wenchen Fan <wenchen@databricks.com>  2016-03-31 12:07:19 -0700
committer Andrew Or <andrew@databricks.com>     2016-03-31 12:07:19 -0700
commit    0abee534f0ad9bbe84d8d3d3478ecaa594f1e0f4 (patch)
tree      cf2edfa11e4e4dc5dd9c3b95bbfdef6464e62a22
parent    4d93b653f7294698526674950d3dc303691260f8 (diff)
[SPARK-14069][SQL] Improve SparkStatusTracker to also track executor information
## What changes were proposed in this pull request?

Track executor information such as host and port, cache size, and the number of running tasks.

TODO: tests

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #11888 from cloud-fan/status-tracker.
-rw-r--r--  core/src/main/java/org/apache/spark/SparkExecutorInfo.java              33
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                  3
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkStatusTracker.scala           20
-rw-r--r--  core/src/main/scala/org/apache/spark/StatusAPIImpl.scala                33
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala   2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageUtils.scala          5
6 files changed, 80 insertions(+), 16 deletions(-)
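The patch's one public entry point is the new SparkStatusTracker.getExecutorInfos method shown in the diff below. A minimal driver-side sketch of calling it (the local master, app name, and output format here are illustrative, not part of the patch):

    import org.apache.spark.{SparkConf, SparkContext}

    object ExecutorInfoDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("executor-info-demo")
        val sc = new SparkContext(conf)
        try {
          // getExecutorInfos is the API added by this commit.
          for (info <- sc.statusTracker.getExecutorInfos) {
            println(s"${info.host}:${info.port} cached=${info.cacheSize} bytes, running=${info.numRunningTasks} tasks")
          }
        } finally {
          sc.stop()
        }
      }
    }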
diff --git a/core/src/main/java/org/apache/spark/SparkExecutorInfo.java b/core/src/main/java/org/apache/spark/SparkExecutorInfo.java
new file mode 100644
index 0000000000..dc3e826475
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/SparkExecutorInfo.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+
+/**
+ * Exposes information about Spark Executors.
+ *
+ * This interface is not designed to be implemented outside of Spark. We may add additional methods
+ * which may break binary compatibility with outside implementations.
+ */
+public interface SparkExecutorInfo extends Serializable {
+ String host();
+ int port();
+ long cacheSize();
+ int numRunningTasks();
+}
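Since the interface is not meant to be implemented outside Spark, downstream code only reads it. Like the existing SparkJobInfo and SparkStageInfo (see StatusAPIImpl.scala below), it is a plain Java interface with arity-0 accessors, which leaves room to add methods later, and it extends Serializable, presumably so instances can be logged or sent off the driver. A sketch of copying it into an ordinary case class for downstream use (ExecutorSnapshot is a hypothetical name, not part of the patch):

    import org.apache.spark.SparkExecutorInfo

    // Hypothetical holder type for this sketch; not part of the patch.
    case class ExecutorSnapshot(host: String, port: Int, cacheSize: Long, numRunningTasks: Int)

    def toSnapshot(info: SparkExecutorInfo): ExecutorSnapshot =
      ExecutorSnapshot(info.host, info.port, info.cacheSize, info.numRunningTasks)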
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index dcb41f3a40..d7cb253d69 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -147,8 +147,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
appName: String,
sparkHome: String = null,
jars: Seq[String] = Nil,
- environment: Map[String, String] = Map()) =
- {
+ environment: Map[String, String] = Map()) = {
this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
}
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index 34ee3a48f8..52c4656c27 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -17,6 +17,8 @@
package org.apache.spark
+import org.apache.spark.scheduler.TaskSchedulerImpl
+
/**
* Low-level status reporting APIs for monitoring job and stage progress.
*
@@ -104,4 +106,22 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
}
}
}
+
+ /**
+ * Returns information about all known executors, including host, port, cacheSize, and numRunningTasks.
+ */
+ def getExecutorInfos: Array[SparkExecutorInfo] = {
+ val executorIdToRunningTasks: Map[String, Int] =
+ sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors()
+
+ sc.getExecutorStorageStatus.map { status =>
+ val bmId = status.blockManagerId
+ new SparkExecutorInfoImpl(
+ bmId.host,
+ bmId.port,
+ status.cacheSize,
+ executorIdToRunningTasks.getOrElse(bmId.executorId, 0)
+ )
+ }
+ }
}
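getExecutorInfos joins two driver-side views keyed by executor ID: storage status from the block managers and running-task counts from the scheduler. Executors the scheduler has no entry for (for example, one that registered but has not run a task yet) fall back to zero via getOrElse. A standalone sketch of that join, with made-up IDs:

    val runningTasks = Map("exec-1" -> 3)          // what the scheduler reports
    val knownExecutors = Seq("exec-1", "exec-2")   // what storage status reports
    val counts = knownExecutors.map(id => id -> runningTasks.getOrElse(id, 0)).toMap
    assert(counts == Map("exec-1" -> 3, "exec-2" -> 0))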
diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
index e5c7c8d0db..c1f24a6377 100644
--- a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
+++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
@@ -18,18 +18,25 @@
package org.apache.spark
private class SparkJobInfoImpl (
- val jobId: Int,
- val stageIds: Array[Int],
- val status: JobExecutionStatus)
- extends SparkJobInfo
+ val jobId: Int,
+ val stageIds: Array[Int],
+ val status: JobExecutionStatus)
+ extends SparkJobInfo
private class SparkStageInfoImpl(
- val stageId: Int,
- val currentAttemptId: Int,
- val submissionTime: Long,
- val name: String,
- val numTasks: Int,
- val numActiveTasks: Int,
- val numCompletedTasks: Int,
- val numFailedTasks: Int)
- extends SparkStageInfo
+ val stageId: Int,
+ val currentAttemptId: Int,
+ val submissionTime: Long,
+ val name: String,
+ val numTasks: Int,
+ val numActiveTasks: Int,
+ val numCompletedTasks: Int,
+ val numFailedTasks: Int)
+ extends SparkStageInfo
+
+private class SparkExecutorInfoImpl(
+ val host: String,
+ val port: Int,
+ val cacheSize: Long,
+ val numRunningTasks: Int)
+ extends SparkExecutorInfo
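The commit message leaves tests as a TODO. A hedged sketch of what a minimal check could look like (the suite name is made up, and plain ScalaTest FunSuite is assumed rather than Spark's internal test base class):

    import org.scalatest.FunSuite
    import org.apache.spark.{SparkConf, SparkContext}

    class StatusTrackerExecutorSuite extends FunSuite {
      test("getExecutorInfos reports a block manager in local mode") {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("test"))
        try {
          val infos = sc.statusTracker.getExecutorInfos
          assert(infos.nonEmpty)                        // local mode still has the driver's block manager
          assert(infos.forall(_.numRunningTasks == 0))  // nothing has been submitted yet
        } finally {
          sc.stop()
        }
      }
    }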
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index f7790fccc6..daed2ff50e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -90,6 +90,8 @@ private[spark] class TaskSchedulerImpl(
// Number of tasks running on each executor
private val executorIdToTaskCount = new HashMap[String, Int]
+ def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap
+
// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
protected val executorsByHost = new HashMap[String, HashSet[String]]
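Note that runningTasksByExecutors copies the scheduler's mutable HashMap into an immutable Map, so callers get a point-in-time snapshot rather than a live view of the counts. A small sketch of the difference (plain collections here, not the scheduler's actual state):

    import scala.collection.mutable

    val live = mutable.HashMap("exec-1" -> 1)
    val snapshot = live.toMap        // independent immutable copy
    live("exec-1") = 5               // the scheduler keeps mutating its counts
    assert(snapshot("exec-1") == 1)  // the snapshot is unaffected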
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 199a5fc270..fb9941bbd9 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -175,7 +175,10 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
def memRemaining: Long = maxMem - memUsed
/** Return the memory used by this block manager. */
- def memUsed: Long = _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
+ def memUsed: Long = _nonRddStorageInfo._1 + cacheSize
+
+ /** Return the memory used by caching RDDs. */
+ def cacheSize: Long = _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
/** Return the disk space used by this block manager. */
def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
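The StorageUtils change is a pure refactoring: memUsed keeps the same value but is now expressed as non-RDD storage plus the new cacheSize term, which getExecutorInfos reports per executor. A sketch of the invariant with illustrative numbers (the 10 MB / 90 MB split is made up):

    // Illustrative values, not taken from the patch.
    val nonRddMemUsed = 10L * 1024 * 1024   // _nonRddStorageInfo._1
    val cacheSize     = 90L * 1024 * 1024   // sum of memUsedByRdd over all cached RDDs
    val memUsed       = nonRddMemUsed + cacheSize
    assert(memUsed == 100L * 1024 * 1024)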