aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2013-06-28 10:14:30 +0800
committerjerryshao <saisai.shao@intel.com>2013-07-24 14:57:46 +0800
commit7fb574bf666661fdf8a786de779f85efe2f15f0c (patch)
tree7190540550160322d1b275f07de381acf3c30d70
parent4d6dd67fa1f8f031f1ef46a442cec733fa3b1a7a (diff)
downloadspark-7fb574bf666661fdf8a786de779f85efe2f15f0c.tar.gz
spark-7fb574bf666661fdf8a786de779f85efe2f15f0c.tar.bz2
spark-7fb574bf666661fdf8a786de779f85efe2f15f0c.zip
Code clean and remarshal
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala9
-rw-r--r--core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala11
-rw-r--r--core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala18
-rw-r--r--core/src/main/scala/spark/metrics/MetricsSystem.scala5
-rw-r--r--core/src/main/scala/spark/metrics/sink/ConsoleSink.scala1
-rw-r--r--core/src/main/scala/spark/metrics/sink/CsvSink.scala1
-rw-r--r--core/src/main/scala/spark/metrics/sink/Sink.scala1
7 files changed, 15 insertions, 31 deletions
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index e44f5e3168..cc0b2d4295 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -78,6 +78,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
Master.metricsSystem.registerSource(masterInstrumentation)
+ Master.metricsSystem.start()
}
override def postStop() {
@@ -321,22 +322,22 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
removeWorker(worker)
}
}
+
+ override def postStop() {
+ Master.metricsSystem.stop()
+ }
}
private[spark] object Master {
private val systemName = "sparkMaster"
private val actorName = "Master"
private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
-
private val metricsSystem = MetricsSystem.createMetricsSystem("master")
def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
-
- metricsSystem.start()
actorSystem.awaitTermination()
- metricsSystem.stop()
}
/** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
diff --git a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
index 46c90b94d2..61a561c955 100644
--- a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
@@ -1,19 +1,12 @@
package spark.deploy.master
-import java.util.{Map, HashMap => JHashMap}
-
-import com.codahale.metrics.{Gauge, Metric}
-
-import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
+import com.codahale.metrics.{Gauge,MetricRegistry}
import spark.metrics.source.Source
-import spark.Logging
private[spark] class MasterInstrumentation(val master: Master) extends Source {
- val className = classOf[Master].getName()
- val instrumentationName = "master"
val metricRegistry = new MetricRegistry()
- val sourceName = instrumentationName
+ val sourceName = "master"
metricRegistry.register(MetricRegistry.name("workers","number"),
new Gauge[Int] {
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
index 5ce29cf04c..94c20a98c1 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
@@ -1,44 +1,38 @@
package spark.deploy.worker
-import com.codahale.metrics.{Gauge, Metric}
+import com.codahale.metrics.{Gauge, MetricRegistry}
-import java.util.{Map, HashMap => JHashMap}
-
-import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
import spark.metrics.source.Source
private[spark] class WorkerInstrumentation(val worker: Worker) extends Source {
- val className = classOf[Worker].getName()
-
val sourceName = "worker"
-
val metricRegistry = new MetricRegistry()
- metricRegistry.register(MetricRegistry.name(classOf[Worker], "executor", "number"),
+ metricRegistry.register(MetricRegistry.name("executor", "number"),
new Gauge[Int] {
override def getValue: Int = worker.executors.size
})
// Gauge for cores used of this worker
- metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_used", "number"),
+ metricRegistry.register(MetricRegistry.name("core_used", "number"),
new Gauge[Int] {
override def getValue: Int = worker.coresUsed
})
// Gauge for memory used of this worker
- metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"),
+ metricRegistry.register(MetricRegistry.name("mem_used", "MBytes"),
new Gauge[Int] {
override def getValue: Int = worker.memoryUsed
})
// Gauge for cores free of this worker
- metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_free", "number"),
+ metricRegistry.register(MetricRegistry.name("core_free", "number"),
new Gauge[Int] {
override def getValue: Int = worker.coresFree
})
// Gauge for memory used of this worker
- metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_free", "MBytes"),
+ metricRegistry.register(MetricRegistry.name("mem_free", "MBytes"),
new Gauge[Int] {
override def getValue: Int = worker.memoryFree
})
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index a23ccd2692..5bfdc00eaf 100644
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -5,7 +5,6 @@ import scala.collection.mutable
import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
import java.util.Properties
-//import java.util._
import java.util.concurrent.TimeUnit
import spark.Logging
@@ -20,7 +19,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
val sinks = new mutable.ArrayBuffer[Sink]
val sources = new mutable.ArrayBuffer[Source]
- var registry = new MetricRegistry()
+ val registry = new MetricRegistry()
registerSources()
registerSinks()
@@ -35,7 +34,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
def registerSource(source: Source) {
sources += source
- registry.register(source.sourceName,source.metricRegistry)
+ registry.register(source.sourceName, source.metricRegistry)
}
def registerSources() {
diff --git a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
index 9cd17556fa..e2e4197d1d 100644
--- a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
@@ -23,7 +23,6 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry) extend
.convertRatesTo(TimeUnit.SECONDS)
.build()
-
override def start() {
reporter.start(pollPeriod, pollUnit)
}
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
index 62e51be0dc..c2d645331c 100644
--- a/core/src/main/scala/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
@@ -30,7 +30,6 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si
.convertRatesTo(TimeUnit.SECONDS)
.build(new File(pollDir))
-
override def start() {
reporter.start(pollPeriod, pollUnit)
}
diff --git a/core/src/main/scala/spark/metrics/sink/Sink.scala b/core/src/main/scala/spark/metrics/sink/Sink.scala
index 9fef894fde..3ffdcbdaba 100644
--- a/core/src/main/scala/spark/metrics/sink/Sink.scala
+++ b/core/src/main/scala/spark/metrics/sink/Sink.scala
@@ -2,6 +2,5 @@ package spark.metrics.sink
trait Sink {
def start: Unit
-
def stop: Unit
} \ No newline at end of file