diff options
author | Sandeep Singh <sandeep@techaddict.me> | 2016-11-01 13:18:11 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-11-01 13:18:11 +0000 |
commit | ec6f479bb1d14c9eb45e0418353007be0416e4c5 (patch) | |
tree | 0cec54fd0e9a9bfa1328e53f46f377086d05869b /mesos | |
parent | e34b4e12673fb76c92f661d7c03527410857a0f8 (diff) | |
download | spark-ec6f479bb1d14c9eb45e0418353007be0416e4c5.tar.gz spark-ec6f479bb1d14c9eb45e0418353007be0416e4c5.tar.bz2 spark-ec6f479bb1d14c9eb45e0418353007be0416e4c5.zip |
[SPARK-16881][MESOS] Migrate Mesos configs to use ConfigEntry
## What changes were proposed in this pull request?
Migrate Mesos configs to use ConfigEntry
## How was this patch tested?
Jenkins Tests
Author: Sandeep Singh <sandeep@techaddict.me>
Closes #15654 from techaddict/SPARK-16881.
Diffstat (limited to 'mesos')
4 files changed, 68 insertions, 6 deletions
diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala index 73b6ca3844..7d6693b4cd 100644 --- a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala +++ b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala @@ -20,6 +20,7 @@ package org.apache.spark.deploy.mesos import java.util.concurrent.CountDownLatch import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.deploy.mesos.config._ import org.apache.spark.deploy.mesos.ui.MesosClusterUI import org.apache.spark.deploy.rest.mesos.MesosRestServer import org.apache.spark.internal.Logging @@ -51,7 +52,7 @@ private[mesos] class MesosClusterDispatcher( extends Logging { private val publicAddress = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host) - private val recoveryMode = conf.get("spark.deploy.recoveryMode", "NONE").toUpperCase() + private val recoveryMode = conf.get(RECOVERY_MODE).toUpperCase() logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode) private val engineFactory = recoveryMode match { @@ -74,7 +75,7 @@ private[mesos] class MesosClusterDispatcher( def start(): Unit = { webUi.bind() - scheduler.frameworkUrl = conf.get("spark.mesos.dispatcher.webui.url", webUi.activeWebUiUrl) + scheduler.frameworkUrl = conf.get(DISPATCHER_WEBUI_URL).getOrElse(webUi.activeWebUiUrl) scheduler.start() server.start() } @@ -99,8 +100,8 @@ private[mesos] object MesosClusterDispatcher extends Logging { conf.setMaster(dispatcherArgs.masterUrl) conf.setAppName(dispatcherArgs.name) dispatcherArgs.zookeeperUrl.foreach { z => - conf.set("spark.deploy.recoveryMode", "ZOOKEEPER") - conf.set("spark.deploy.zookeeper.url", z) + conf.set(RECOVERY_MODE, "ZOOKEEPER") + conf.set(ZOOKEEPER_URL, z) } val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf) dispatcher.start() diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala index 6b297c4600..859aa836a3 100644 --- a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala +++ b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.ExternalShuffleService +import org.apache.spark.deploy.mesos.config._ import org.apache.spark.internal.Logging import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler @@ -114,7 +115,7 @@ private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManage protected override def newShuffleBlockHandler( conf: TransportConf): ExternalShuffleBlockHandler = { - val cleanerIntervalS = this.conf.getTimeAsSeconds("spark.shuffle.cleaner.interval", "30s") + val cleanerIntervalS = this.conf.get(SHUFFLE_CLEANER_INTERVAL_S) new MesosExternalShuffleBlockHandler(conf, cleanerIntervalS) } } diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala new file mode 100644 index 0000000000..19e253394f --- /dev/null +++ b/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.mesos + +import java.util.concurrent.TimeUnit + +import org.apache.spark.internal.config.ConfigBuilder + +package object config { + + /* Common app configuration. */ + + private[spark] val SHUFFLE_CLEANER_INTERVAL_S = + ConfigBuilder("spark.shuffle.cleaner.interval") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("30s") + + private[spark] val RECOVERY_MODE = + ConfigBuilder("spark.deploy.recoveryMode") + .stringConf + .createWithDefault("NONE") + + private[spark] val DISPATCHER_WEBUI_URL = + ConfigBuilder("spark.mesos.dispatcher.webui.url") + .doc("Set the Spark Mesos dispatcher webui_url for interacting with the " + + "framework. If unset it will point to Spark's internal web UI.") + .stringConf + .createOptional + + private[spark] val ZOOKEEPER_URL = + ConfigBuilder("spark.deploy.zookeeper.url") + .doc("When `spark.deploy.recoveryMode` is set to ZOOKEEPER, this " + + "configuration is used to set the zookeeper URL to connect to.") + .stringConf + .createOptional + + private[spark] val HISTORY_SERVER_URL = + ConfigBuilder("spark.mesos.dispatcher.historyServer.url") + .doc("Set the URL of the history server. The dispatcher will then " + + "link each driver to its entry in the history server.") + .stringConf + .createOptional + +} diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala index 8dcbdaad86..13ba7d311e 100644 --- a/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala +++ b/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala @@ -23,12 +23,13 @@ import scala.xml.Node import org.apache.mesos.Protos.TaskStatus +import org.apache.spark.deploy.mesos.config._ import org.apache.spark.deploy.mesos.MesosDriverDescription import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState import org.apache.spark.ui.{UIUtils, WebUIPage} private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") { - private val historyServerURL = parent.conf.getOption("spark.mesos.dispatcher.historyServer.url") + private val historyServerURL = parent.conf.get(HISTORY_SERVER_URL) def render(request: HttpServletRequest): Seq[Node] = { val state = parent.scheduler.getSchedulerState() |