aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorYangyang Liu <yangyangliu@fb.com>2016-07-12 10:13:58 -0700
committerReynold Xin <rxin@databricks.com>2016-07-12 10:13:58 -0700
commit68df47aca55e99406b7b67ef3d4b1008abf1b8b6 (patch)
tree3f55af3bf21e5079e4b33e1f7e0e523e1b2e1fc5 /core
parentd513c99c19e229f72d03006e251725a43c13fefd (diff)
downloadspark-68df47aca55e99406b7b67ef3d4b1008abf1b8b6.tar.gz
spark-68df47aca55e99406b7b67ef3d4b1008abf1b8b6.tar.bz2
spark-68df47aca55e99406b7b67ef3d4b1008abf1b8b6.zip
[SPARK-16405] Add metrics and source for external shuffle service
## What changes were proposed in this pull request? Since externalShuffleService is essential for spark, better monitoring for shuffle service is necessary. In order to do so, we added various metrics in shuffle service and imported into ExternalShuffleServiceSource for metric system. Metrics added in shuffle service: * registeredExecutorsSize * openBlockRequestLatencyMillis * registerExecutorRequestLatencyMillis * blockTransferRateBytes JIRA Issue: https://issues.apache.org/jira/browse/SPARK-16405 ## How was this patch tested? Some test cases are added to verify metrics as expected in metric system. Those unit test cases are shown in `ExternalShuffleBlockHandlerSuite ` Author: Yangyang Liu <yangyangliu@fb.com> Closes #14080 from lovexi/yangyang-metrics.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/ExternalShuffleServiceSource.scala37
2 files changed, 45 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index adc0de1e91..37a19a495b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.TransportContext
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.sasl.SaslServerBootstrap
@@ -41,6 +42,8 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
private[deploy]
class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
extends Logging {
+ protected val masterMetricsSystem =
+ MetricsSystem.createMetricsSystem("shuffleService", sparkConf, securityManager)
private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
@@ -54,6 +57,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private var server: TransportServer = _
+ private val shuffleServiceSource = new ExternalShuffleServiceSource(blockHandler)
+
/** Create a new shuffle block handler. Factored out for subclasses to override. */
protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
new ExternalShuffleBlockHandler(conf, null)
@@ -77,6 +82,9 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
Nil
}
server = transportContext.createServer(port, bootstraps.asJava)
+
+ masterMetricsSystem.registerSource(shuffleServiceSource)
+ masterMetricsSystem.start()
}
/** Clean up all shuffle files associated with an application that has exited. */
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleServiceSource.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleServiceSource.scala
new file mode 100644
index 0000000000..e917679c83
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleServiceSource.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import javax.annotation.concurrent.ThreadSafe
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+
+/**
+ * Provides metrics source for external shuffle service
+ */
+@ThreadSafe
+private class ExternalShuffleServiceSource
+(blockHandler: ExternalShuffleBlockHandler) extends Source {
+ override val metricRegistry = new MetricRegistry()
+ override val sourceName = "shuffleService"
+
+ metricRegistry.registerAll(blockHandler.getAllMetrics)
+}