author     jerryshao <saisai.shao@intel.com>  2013-07-02 11:28:32 +0800
committer  jerryshao <saisai.shao@intel.com>  2013-07-24 14:57:47 +0800
commit     1daff54b2ed92d0bcee7030d7d3ab5c274f80d2f (patch)
tree       f2d740deda353efa0d3fc3f5d441e75c5795cbda
parent     5f8802c1fb106cc04c30b6aca0a6ce98fa5c0e15 (diff)
Move the Executor MetricsSystem initialization code into SparkEnv
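
In short, each SparkEnv now creates and starts a single MetricsSystem for its process ("driver" or "executor"), and the driver and executor code register sources against env.metricsSystem instead of each backend owning its own MetricsSystem instance. Below is a condensed sketch of the resulting flow, using only identifiers that appear in this diff; it is an illustration of the pattern, not a compilable standalone program.

    import spark.metrics.MetricsSystem

    // Inside SparkEnv creation: one metrics system per process, named after the
    // role, started once here and stopped later in SparkEnv.stop().
    val metricsSystem =
      if (isDriver) MetricsSystem.createMetricsSystem("driver")
      else MetricsSystem.createMetricsSystem("executor")
    metricsSystem.start()

    // Driver side (SparkContext): register scheduler and block-manager sources
    // on the shared system instead of a locally created one.
    SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
    SparkEnv.get.metricsSystem.registerSource(blockManagerSource)

    // Executor side (Executor.scala): register the instrumentation source.
    val executorInstrumentation = new ExecutorInstrumentation(this)
    env.metricsSystem.registerSource(executorInstrumentation)

    // Teardown now lives in SparkEnv.stop():
    metricsSystem.stop()
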
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                         6
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala                             9
-rw-r--r--  core/src/main/scala/spark/executor/Executor.scala                    6
-rw-r--r--  core/src/main/scala/spark/executor/ExecutorInstrumentation.scala    10
-rw-r--r--  core/src/main/scala/spark/executor/MesosExecutorBackend.scala       10
-rw-r--r--  core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala  12
6 files changed, 22 insertions(+), 31 deletions(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 1255d0c72e..f1d9d5e442 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -275,12 +275,10 @@ class SparkContext(
val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
- val metricsSystem = MetricsSystem.createMetricsSystem("driver")
def initDriverMetrics() = {
- metricsSystem.registerSource(dagSchedulerSource)
- metricsSystem.registerSource(blockManagerSource)
- metricsSystem.start()
+ SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
+ SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
}
initDriverMetrics()
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 125dc55bd8..7b3dc69348 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -54,6 +54,7 @@ class SparkEnv (
val connectionManager: ConnectionManager,
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
+ val metricsSystem: MetricsSystem,
// To be set only as part of initialization of SparkContext.
// (executorId, defaultHostPort) => executorHostPort
// If executorId is NOT found, return defaultHostPort
@@ -69,6 +70,7 @@ class SparkEnv (
broadcastManager.stop()
blockManager.stop()
blockManager.master.stop()
+ metricsSystem.stop()
actorSystem.shutdown()
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
@@ -185,6 +187,12 @@ object SparkEnv extends Logging {
httpFileServer.initialize()
System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
+ val metricsSystem = if (isDriver) {
+ MetricsSystem.createMetricsSystem("driver")
+ } else {
+ MetricsSystem.createMetricsSystem("executor")
+ }
+ metricsSystem.start()
// Set the sparkFiles directory, used when downloading dependencies. In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working
@@ -215,6 +223,7 @@ object SparkEnv extends Logging {
connectionManager,
httpFileServer,
sparkFilesDir,
+ metricsSystem,
None)
}
}
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 2e81151882..7179ed84a8 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -86,10 +86,14 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
}
}
)
+
+ val executorInstrumentation = new ExecutorInstrumentation(this)
// Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
- SparkEnv.set(env)
+ SparkEnv.set(env)
+ env.metricsSystem.registerSource(executorInstrumentation)
+
private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
// Start worker thread pool
diff --git a/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala b/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala
index 80aadb66b0..ebbcbee742 100644
--- a/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala
+++ b/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala
@@ -4,32 +4,32 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import spark.metrics.source.Source
-class ExecutorInstrumentation(val executor: Option[Executor]) extends Source{
+class ExecutorInstrumentation(val executor: Executor) extends Source{
val metricRegistry = new MetricRegistry()
val sourceName = "executor"
// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "active_task", "number"),
new Gauge[Int] {
- override def getValue: Int = executor.map(_.threadPool.getActiveCount()).getOrElse(0)
+ override def getValue: Int = executor.threadPool.getActiveCount()
})
// Gauge for executor thread pool's approximate total number of tasks that have been completed
metricRegistry.register(MetricRegistry.name("threadpool", "complete_task", "count"),
new Gauge[Long] {
- override def getValue: Long = executor.map(_.threadPool.getCompletedTaskCount()).getOrElse(0)
+ override def getValue: Long = executor.threadPool.getCompletedTaskCount()
})
// Gauge for executor thread pool's current number of threads
metricRegistry.register(MetricRegistry.name("threadpool", "current_pool", "size"),
new Gauge[Int] {
- override def getValue: Int = executor.map(_.threadPool.getPoolSize()).getOrElse(0)
+ override def getValue: Int = executor.threadPool.getPoolSize()
})
// Gauge for executor thread pool's largest number of threads that have ever simultaneously been in the pool
metricRegistry.register(MetricRegistry.name("threadpool", "max_pool", "size"),
new Gauge[Int] {
- override def getValue: Int = executor.map(_.threadPool.getMaximumPoolSize()).getOrElse(0)
+ override def getValue: Int = executor.threadPool.getMaximumPoolSize()
})
} \ No newline at end of file
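
ExecutorInstrumentation registers Codahale gauges that sample the executor's thread pool lazily whenever the metrics system polls the registry. For reference, here is a minimal, self-contained illustration of that same Gauge/MetricRegistry pattern; it assumes only the metrics-core library and a plain JDK thread pool, and the object name ThreadPoolGauges is ours, not Spark's.

    import java.util.concurrent.{Executors, ThreadPoolExecutor}
    import com.codahale.metrics.{Gauge, MetricRegistry}

    object ThreadPoolGauges {
      def main(args: Array[String]): Unit = {
        val registry = new MetricRegistry()
        // Any ThreadPoolExecutor works; newFixedThreadPool returns one.
        val pool = Executors.newFixedThreadPool(4).asInstanceOf[ThreadPoolExecutor]

        // getValue is only invoked when the registry is sampled,
        // so registering gauges here is cheap.
        registry.register(MetricRegistry.name("threadpool", "active_task", "number"),
          new Gauge[Int] { override def getValue: Int = pool.getActiveCount() })
        registry.register(MetricRegistry.name("threadpool", "current_pool", "size"),
          new Gauge[Int] { override def getValue: Int = pool.getPoolSize() })

        println(registry.getGauges().keySet())  // prints the registered gauge names
        pool.shutdown()
      }
    }
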
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index 8b6ab0c391..4961c42fad 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -24,7 +24,6 @@ import spark.TaskState.TaskState
import com.google.protobuf.ByteString
import spark.{Utils, Logging}
import spark.TaskState
-import spark.metrics.MetricsSystem
private[spark] class MesosExecutorBackend
extends MesosExecutor
@@ -33,9 +32,6 @@ private[spark] class MesosExecutorBackend
var executor: Executor = null
var driver: ExecutorDriver = null
-
- val executorInstrumentation = new ExecutorInstrumentation(Option(executor))
- MesosExecutorBackend.metricsSystem.start()
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
@@ -83,17 +79,13 @@ private[spark] class MesosExecutorBackend
override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
- override def shutdown(d: ExecutorDriver) {
- MesosExecutorBackend.metricsSystem.stop()
- }
+ override def shutdown(d: ExecutorDriver) {}
}
/**
* Entry point for Mesos executor.
*/
private[spark] object MesosExecutorBackend {
- private val metricsSystem = MetricsSystem.createMetricsSystem("executor")
-
def main(args: Array[String]) {
MesosNativeLibrary.load()
// Create a new Executor and start it running
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index 6ef74cd2ff..f4003da732 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -24,7 +24,6 @@ import spark.util.AkkaUtils
import akka.actor.{ActorRef, Actor, Props, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
-import spark.metrics.MetricsSystem
import spark.scheduler.cluster._
import spark.scheduler.cluster.RegisteredExecutor
import spark.scheduler.cluster.LaunchTask
@@ -46,8 +45,6 @@ private[spark] class StandaloneExecutorBackend(
var executor: Executor = null
var driver: ActorRef = null
-
- val executorInstrumentation = new ExecutorInstrumentation(Option(executor))
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
@@ -55,9 +52,6 @@ private[spark] class StandaloneExecutorBackend(
driver ! RegisterExecutor(executorId, hostPort, cores)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(driver) // Doesn't work with remote actors, but useful for testing
-
- StandaloneExecutorBackend.metricsSystem.registerSource(executorInstrumentation)
- StandaloneExecutorBackend.metricsSystem.start()
}
override def receive = {
@@ -87,15 +81,9 @@ private[spark] class StandaloneExecutorBackend(
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
driver ! StatusUpdate(executorId, taskId, state, data)
}
-
- override def postStop() {
- StandaloneExecutorBackend.metricsSystem.stop()
- }
}
private[spark] object StandaloneExecutorBackend {
- private val metricsSystem = MetricsSystem.createMetricsSystem("executor")
-
def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
SparkHadoopUtil.runAsUser(run0, Tuple4[Any, Any, Any, Any] (driverUrl, executorId, hostname, cores))
}