aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIulian Dragos <jaguarul@gmail.com>2015-04-28 12:08:18 -0700
committerAndrew Or <andrew@databricks.com>2015-04-28 12:08:18 -0700
commit8aab94d8984e9d12194dbda47b2e7d9dbc036889 (patch)
tree6ae73a4d711bb4a44db690390ab7626ad2c40f61
parent52ccf1d3739694826915cdf01642bab02958eb78 (diff)
downloadspark-8aab94d8984e9d12194dbda47b2e7d9dbc036889.tar.gz
spark-8aab94d8984e9d12194dbda47b2e7d9dbc036889.tar.bz2
spark-8aab94d8984e9d12194dbda47b2e7d9dbc036889.zip
[SPARK-4286] Add an external shuffle service that can be run as a daemon.
This allows Mesos deployments to use the shuffle service (and implicitly dynamic allocation). It does so by adding a new "main" class and two corresponding scripts in `sbin`: - `sbin/start-shuffle-service.sh` - `sbin/stop-shuffle-service.sh` Specific options can be passed in `SPARK_SHUFFLE_OPTS`. This is picking up work from #3861 /cc tnachen Author: Iulian Dragos <jaguarul@gmail.com> Closes #4990 from dragos/feature/external-shuffle-service and squashes the following commits: 6c2b148 [Iulian Dragos] Import order and wrong name fixup. 07804ad [Iulian Dragos] Moved ExternalShuffleService to the `deploy` package + other minor tweaks. 4dc1f91 [Iulian Dragos] Reviewer’s comments: 8145429 [Iulian Dragos] Add an external shuffle service that can be run as a daemon.
-rwxr-xr-xconf/spark-env.sh.template3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala (renamed from core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala)59
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala13
-rw-r--r--docs/job-scheduling.md2
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java4
-rwxr-xr-xsbin/start-shuffle-service.sh33
-rwxr-xr-xsbin/stop-shuffle-service.sh25
7 files changed, 124 insertions, 15 deletions
diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template
index 67f81d3336..43c4288912 100755
--- a/conf/spark-env.sh.template
+++ b/conf/spark-env.sh.template
@@ -3,7 +3,7 @@
# This file is sourced when running various Spark programs.
# Copy it as spark-env.sh and edit that to configure Spark for your site.
-# Options read when launching programs locally with
+# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
@@ -39,6 +39,7 @@
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
+# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index b9798963ba..cd16f992a3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -15,7 +15,9 @@
* limitations under the License.
*/
-package org.apache.spark.deploy.worker
+package org.apache.spark.deploy
+
+import java.util.concurrent.CountDownLatch
import org.apache.spark.{Logging, SparkConf, SecurityManager}
import org.apache.spark.network.TransportContext
@@ -23,6 +25,7 @@ import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.sasl.SaslRpcHandler
import org.apache.spark.network.server.TransportServer
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.util.Utils
/**
* Provides a server from which Executors can read shuffle files (rather than reading directly from
@@ -31,8 +34,8 @@ import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
*
* Optionally requires SASL authentication in order to read. See [[SecurityManager]].
*/
-private[worker]
-class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
+private[deploy]
+class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
extends Logging {
private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
@@ -51,16 +54,58 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
/** Starts the external shuffle service if the user has configured us to. */
def startIfEnabled() {
if (enabled) {
- require(server == null, "Shuffle server already started")
- logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
- server = transportContext.createServer(port)
+ start()
}
}
+ /** Start the external shuffle service */
+ def start() {
+ require(server == null, "Shuffle server already started")
+ logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
+ server = transportContext.createServer(port)
+ }
+
def stop() {
- if (enabled && server != null) {
+ if (server != null) {
server.close()
server = null
}
}
}
+
+/**
+ * A main class for running the external shuffle service.
+ */
+object ExternalShuffleService extends Logging {
+ @volatile
+ private var server: ExternalShuffleService = _
+
+ private val barrier = new CountDownLatch(1)
+
+ def main(args: Array[String]): Unit = {
+ val sparkConf = new SparkConf
+ Utils.loadDefaultSparkProperties(sparkConf)
+ val securityManager = new SecurityManager(sparkConf)
+
+ // we override this value since this service is started from the command line
+ // and we assume the user really wants it to be running
+ sparkConf.set("spark.shuffle.service.enabled", "true")
+ server = new ExternalShuffleService(sparkConf, securityManager)
+ server.start()
+
+ installShutdownHook()
+
+ // keep running until the process is terminated
+ barrier.await()
+ }
+
+ private def installShutdownHook(): Unit = {
+ Runtime.getRuntime.addShutdownHook(new Thread("External Shuffle Service shutdown thread") {
+ override def run() {
+ logInfo("Shutting down shuffle service.")
+ server.stop()
+ barrier.countDown()
+ }
+ })
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 3ee2eb69e8..8f3cc54051 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -34,6 +34,7 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.ExternalShuffleService
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
@@ -61,7 +62,7 @@ private[worker] class Worker(
assert (port > 0)
// For worker and executor IDs
- private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
+ private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
@@ -85,10 +86,10 @@ private[worker] class Worker(
private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
// How often worker will clean up old app folders
- private val CLEANUP_INTERVAL_MILLIS =
+ private val CLEANUP_INTERVAL_MILLIS =
conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
// TTL for app folders/data; after TTL expires it will be cleaned up
- private val APP_DATA_RETENTION_SECS =
+ private val APP_DATA_RETENTION_SECS =
conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
private val testing: Boolean = sys.props.contains("spark.testing")
@@ -112,7 +113,7 @@ private[worker] class Worker(
} else {
new File(sys.env.get("SPARK_HOME").getOrElse("."))
}
-
+
var workDir: File = null
val finishedExecutors = new HashMap[String, ExecutorRunner]
val drivers = new HashMap[String, DriverRunner]
@@ -122,7 +123,7 @@ private[worker] class Worker(
val finishedApps = new HashSet[String]
// The shuffle service is not actually started unless configured.
- private val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
+ private val shuffleService = new ExternalShuffleService(conf, securityMgr)
private val publicAddress = {
val envVar = conf.getenv("SPARK_PUBLIC_DNS")
@@ -134,7 +135,7 @@ private[worker] class Worker(
private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
private val workerSource = new WorkerSource(this)
-
+
private var registrationRetryTimer: Option[Cancellable] = None
var coresUsed = 0
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index 963e88a3e1..8d9c2ba204 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -32,7 +32,7 @@ Resource allocation can be configured as follows, based on the cluster type:
* **Standalone mode:** By default, applications submitted to the standalone mode cluster will run in
FIFO (first-in-first-out) order, and each application will try to use all available nodes. You can limit
the number of nodes an application uses by setting the `spark.cores.max` configuration property in it,
- or change the default for applications that don't set this setting through `spark.deploy.defaultCores`.
+ or change the default for applications that don't set this setting through `spark.deploy.defaultCores`.
Finally, in addition to controlling cores, each application's `spark.executor.memory` setting controls
its memory use.
* **Mesos:** To use static partitioning on Mesos, set the `spark.mesos.coarse` configuration property to `true`,
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
index e601a0a19f..d80abf2a86 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -69,6 +69,10 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder {
} else if (className.equals("org.apache.spark.executor.MesosExecutorBackend")) {
javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
memKey = "SPARK_EXECUTOR_MEMORY";
+ } else if (className.equals("org.apache.spark.deploy.ExternalShuffleService")) {
+ javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
+ memKey = "SPARK_DAEMON_MEMORY";
} else if (className.startsWith("org.apache.spark.tools.")) {
String sparkHome = getSparkHome();
File toolsDir = new File(join(File.separator, sparkHome, "tools", "target",
diff --git a/sbin/start-shuffle-service.sh b/sbin/start-shuffle-service.sh
new file mode 100755
index 0000000000..4fddcf7f95
--- /dev/null
+++ b/sbin/start-shuffle-service.sh
@@ -0,0 +1,33 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Starts the external shuffle server on the machine this script is executed on.
+#
+# Usage: start-shuffle-server.sh
+#
+# Use the SPARK_SHUFFLE_OPTS environment variable to set shuffle server configuration.
+#
+
+sbin="`dirname "$0"`"
+sbin="`cd "$sbin"; pwd`"
+
+. "$sbin/spark-config.sh"
+. "$SPARK_PREFIX/bin/load-spark-env.sh"
+
+exec "$sbin"/spark-daemon.sh start org.apache.spark.deploy.ExternalShuffleService 1
diff --git a/sbin/stop-shuffle-service.sh b/sbin/stop-shuffle-service.sh
new file mode 100755
index 0000000000..4cb6891ae2
--- /dev/null
+++ b/sbin/stop-shuffle-service.sh
@@ -0,0 +1,25 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Stops the external shuffle service on the machine this script is executed on.
+
+sbin="`dirname "$0"`"
+sbin="`cd "$sbin"; pwd`"
+
+"$sbin"/spark-daemon.sh stop org.apache.spark.deploy.ExternalShuffleService 1