author    Andrew xia <junluan.xia@intel.com>  2013-07-01 21:37:21 +0800
committer jerryshao <saisai.shao@intel.com>  2013-07-24 14:57:47 +0800
commit    7d2eada451686824bd467641bf1763e82011f2a6 (patch)
tree      790aa61e8e252dab567e17ae7a40f2a81083333a
parent    e9ac88754d4c5d58aedd4de8768787300b15eada (diff)
Add metrics sources for DAGScheduler and BlockManager

Conflicts:
    core/src/main/scala/spark/SparkContext.scala
    core/src/main/scala/spark/SparkEnv.scala
 core/src/main/scala/spark/SparkContext.scala                 | 13 +++++++++++--
 core/src/main/scala/spark/SparkEnv.scala                     |  6 ++++++
 core/src/main/scala/spark/scheduler/DAGScheduler.scala       |  3 +--
 core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala | 31 +++++++++++++++++++++++++++++++
 core/src/main/scala/spark/storage/BlockManagerSource.scala   | 35 +++++++++++++++++++++++++++++++++++
 5 files changed, 84 insertions(+), 4 deletions(-)
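Both new files implement Spark's metrics Source abstraction. The trait itself is not part of this commit; a minimal sketch of the contract the two classes below compile against (assuming spark.metrics.source.Source exposes exactly a name and a codahale registry, as their implementations suggest) looks like:

    package spark.metrics.source

    import com.codahale.metrics.MetricRegistry

    // A source is a named bundle of codahale metrics; the MetricsSystem
    // registers it and exposes its registry to the configured sinks.
    private[spark] trait Source {
      def sourceName: String
      def metricRegistry: MetricRegistry
    }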
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 24ba605646..a6128a9f30 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -60,11 +60,11 @@ import org.apache.mesos.MesosNativeLibrary
import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
-import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
+import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
+import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
import ui.{SparkUI}
@@ -270,6 +270,15 @@ class SparkContext(
// Post init
taskScheduler.postStartHook()
+ val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
+ val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
+ def initDriverMetrics() = {
+ SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
+ SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
+ }
+
+ initDriverMetrics()
+
// Methods for creating RDDs
/** Distribute a local Scala collection to form an RDD. */
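The registerSource calls above hand each source to the driver's MetricsSystem; that method is defined in the metrics system rather than in this diff. One plausible shape, relying on the fact that codahale's MetricRegistry implements MetricSet and can be mounted wholesale under a prefix, is sketched here; under it, the gauges defined later would surface as e.g. DAGScheduler.stage.runningStage:

    // Hypothetical sketch only; the real method lives in spark.metrics.MetricsSystem,
    // where `registry` would be the system's own MetricRegistry.
    def registerSource(source: Source) {
      // Registering a MetricSet prefixes every contained metric with sourceName.
      registry.register(source.sourceName, source.metricRegistry)
    }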
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index f2bdc11bdb..d34dafecc5 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -30,6 +30,7 @@ import spark.network.ConnectionManager
import spark.serializer.{Serializer, SerializerManager}
import spark.util.AkkaUtils
import spark.api.python.PythonWorkerFactory
+import spark.metrics._
/**
@@ -53,6 +54,7 @@ class SparkEnv (
val connectionManager: ConnectionManager,
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
+ val metricsSystem: MetricsSystem,
// To be set only as part of initialization of SparkContext.
// (executorId, defaultHostPort) => executorHostPort
// If executorId is NOT found, return defaultHostPort
@@ -184,6 +186,9 @@ object SparkEnv extends Logging {
httpFileServer.initialize()
System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
+ val metricsSystem = MetricsSystem.createMetricsSystem("driver")
+ metricsSystem.start()
+
// Set the sparkFiles directory, used when downloading dependencies. In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working
// directory.
@@ -213,6 +218,7 @@ object SparkEnv extends Logging {
connectionManager,
httpFileServer,
sparkFilesDir,
+ metricsSystem,
None)
}
}
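The start() call above wires the registered sources to whatever sinks are configured for the "driver" instance. The sink side is not part of this diff; in codahale terms, a console sink reduces to a ConsoleReporter polling the system's registry, roughly as follows (a sketch, assuming the MetricsSystem exposes its backing MetricRegistry; startConsoleSink is a hypothetical name):

    import java.util.concurrent.TimeUnit
    import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

    // Sketch of what a console sink amounts to: poll the metrics system's
    // registry every 10 seconds and print every gauge it contains.
    def startConsoleSink(registry: MetricRegistry): ConsoleReporter = {
      val reporter = ConsoleReporter.forRegistry(registry)
        .convertRatesTo(TimeUnit.SECONDS)
        .convertDurationsTo(TimeUnit.MILLISECONDS)
        .build()
      reporter.start(10, TimeUnit.SECONDS)
      reporter
    }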
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 9b45fc2938..781e49bdec 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -30,7 +30,7 @@ import spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialRe
import spark.scheduler.cluster.TaskInfo
import spark.storage.{BlockManager, BlockManagerMaster}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
-
+import spark.metrics.MetricsSystem
/**
* A Scheduler subclass that implements stage-oriented scheduling. It computes a DAG of stages for
* each job, keeps track of which RDDs and stage outputs are materialized, and computes a minimal
@@ -126,7 +126,6 @@ class DAGScheduler(
val resultStageToJob = new HashMap[Stage, ActiveJob]
val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
-
// Start a thread to run the DAGScheduler event loop
def start() {
new Thread("DAGScheduler") {
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
new file mode 100644
index 0000000000..57aa74512c
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
@@ -0,0 +1,31 @@
+package spark.scheduler
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
+ val metricRegistry = new MetricRegistry()
+ val sourceName = "DAGScheduler"
+
+
+ metricRegistry.register(MetricRegistry.name("stage","failedStage"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.failed.size
+ })
+
+ metricRegistry.register(MetricRegistry.name("stage","runningStage"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.running.size
+ })
+
+ metricRegistry.register(MetricRegistry.name("stage","waitingStage"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.waiting.size
+ })
+
+ metricRegistry.register(MetricRegistry.name("job","allJobs"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.nextRunId.get()
+ })
+
+ metricRegistry.register(MetricRegistry.name("job","ActiveJobs"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.activeJobs.size
+ })
+}
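Every metric above is a codahale Gauge: the closure is registered once and re-evaluated each time a sink polls the registry, so the values track live scheduler state with no extra bookkeeping in DAGScheduler itself. MetricRegistry.name("stage", "runningStage") produces the dotted key "stage.runningStage". A self-contained sketch of that read-time behavior (the mutable counter stands in for dagScheduler.running.size):

    import com.codahale.metrics.{Gauge, MetricRegistry}

    object GaugeSemantics extends App {
      val registry = new MetricRegistry()
      var running = 0 // stand-in for dagScheduler.running.size

      registry.register(MetricRegistry.name("stage", "runningStage"), new Gauge[Int] {
        override def getValue: Int = running
      })

      running = 3
      // Gauges are lazy: the value is computed at read time, not at registration.
      println(registry.getGauges.get("stage.runningStage").getValue) // prints 3
    }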
diff --git a/core/src/main/scala/spark/storage/BlockManagerSource.scala b/core/src/main/scala/spark/storage/BlockManagerSource.scala
new file mode 100644
index 0000000000..c0ce9259c8
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerSource.scala
@@ -0,0 +1,35 @@
+package spark.storage
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+import spark.storage._
+
+private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
+ val metricRegistry = new MetricRegistry()
+ val sourceName = "BlockManager"
+
+ metricRegistry.register(MetricRegistry.name("memory","maxMem"), new Gauge[Long] {
+ override def getValue: Long = {
+ val storageStatusList = blockManager.master.getStorageStatus
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+ maxMem
+ }
+ })
+
+ metricRegistry.register(MetricRegistry.name("memory","remainingMem"), new Gauge[Long] {
+ override def getValue: Long = {
+ val storageStatusList = blockManager.master.getStorageStatus
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+ remainingMem
+ }
+ })
+
+ metricRegistry.register(MetricRegistry.name("disk","diskSpaceUsed"), new Gauge[Long] {
+ override def getValue: Long = {
+ val storageStatusList = blockManager.master.getStorageStatus
+ val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).reduceOption(_+_).getOrElse(0L)
+ diskSpaceUsed
+ }
+ })
+}
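A note on the aggregation style above: Scala's reduce throws UnsupportedOperationException on an empty collection, so summing with fold(0L)(_ + _) or reduceOption(_ + _).getOrElse(0L) keeps the gauges total even if no storage status has been reported yet. In miniature:

    object EmptyAggregation extends App {
      val none = Seq.empty[Long]
      // none.reduce(_ + _)  // would throw UnsupportedOperationException: empty.reduce
      println(none.fold(0L)(_ + _))                   // 0
      println(none.reduceOption(_ + _).getOrElse(0L)) // 0
    }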