author    Iulian Dragos <jaguarul@gmail.com>  2015-04-28 12:08:18 -0700
committer Andrew Or <andrew@databricks.com>  2015-04-28 12:08:18 -0700
commit    8aab94d8984e9d12194dbda47b2e7d9dbc036889 (patch)
tree      6ae73a4d711bb4a44db690390ab7626ad2c40f61 /core
parent    52ccf1d3739694826915cdf01642bab02958eb78 (diff)
[SPARK-4286] Add an external shuffle service that can be run as a daemon.
This allows Mesos deployments to use the shuffle service (and implicitly dynamic allocation). It does so by adding a new "main" class and two corresponding scripts in `sbin`:

- `sbin/start-shuffle-service.sh`
- `sbin/stop-shuffle-service.sh`

Specific options can be passed in `SPARK_SHUFFLE_OPTS`.

This is picking up work from #3861

/cc tnachen

Author: Iulian Dragos <jaguarul@gmail.com>

Closes #4990 from dragos/feature/external-shuffle-service and squashes the following commits:

6c2b148 [Iulian Dragos] Import order and wrong name fixup.
07804ad [Iulian Dragos] Moved ExternalShuffleService to the `deploy` package + other minor tweaks.
4dc1f91 [Iulian Dragos] Reviewer’s comments:
8145429 [Iulian Dragos] Add an external shuffle service that can be run as a daemon.
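For context, once the daemon launched by these scripts is running on each node, an application opts in through configuration. The Scala sketch below is illustrative and not part of the patch; it assumes the standard spark.dynamicAllocation.enabled setting alongside the spark.shuffle.service.enabled flag that this service checks.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical application-side configuration (not part of this commit).
val conf = new SparkConf()
  .setAppName("ShuffleServiceExample")
  .set("spark.shuffle.service.enabled", "true")    // executors fetch shuffle files from the daemon
  .set("spark.dynamicAllocation.enabled", "true")  // dynamic allocation relies on the external shuffle service
val sc = new SparkContext(conf)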
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala (renamed from core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala) | 59
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 13
2 files changed, 59 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index b9798963ba..cd16f992a3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -15,7 +15,9 @@
* limitations under the License.
*/
-package org.apache.spark.deploy.worker
+package org.apache.spark.deploy
+
+import java.util.concurrent.CountDownLatch
import org.apache.spark.{Logging, SparkConf, SecurityManager}
import org.apache.spark.network.TransportContext
@@ -23,6 +25,7 @@ import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.sasl.SaslRpcHandler
import org.apache.spark.network.server.TransportServer
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.util.Utils
/**
* Provides a server from which Executors can read shuffle files (rather than reading directly from
@@ -31,8 +34,8 @@ import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
*
* Optionally requires SASL authentication in order to read. See [[SecurityManager]].
*/
-private[worker]
-class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
+private[deploy]
+class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
extends Logging {
private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
@@ -51,16 +54,58 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
/** Starts the external shuffle service if the user has configured us to. */
def startIfEnabled() {
if (enabled) {
- require(server == null, "Shuffle server already started")
- logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
- server = transportContext.createServer(port)
+ start()
}
}
+ /** Start the external shuffle service */
+ def start() {
+ require(server == null, "Shuffle server already started")
+ logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
+ server = transportContext.createServer(port)
+ }
+
def stop() {
- if (enabled && server != null) {
+ if (server != null) {
server.close()
server = null
}
}
}
+
+/**
+ * A main class for running the external shuffle service.
+ */
+object ExternalShuffleService extends Logging {
+ @volatile
+ private var server: ExternalShuffleService = _
+
+ private val barrier = new CountDownLatch(1)
+
+ def main(args: Array[String]): Unit = {
+ val sparkConf = new SparkConf
+ Utils.loadDefaultSparkProperties(sparkConf)
+ val securityManager = new SecurityManager(sparkConf)
+
+ // we override this value since this service is started from the command line
+ // and we assume the user really wants it to be running
+ sparkConf.set("spark.shuffle.service.enabled", "true")
+ server = new ExternalShuffleService(sparkConf, securityManager)
+ server.start()
+
+ installShutdownHook()
+
+ // keep running until the process is terminated
+ barrier.await()
+ }
+
+ private def installShutdownHook(): Unit = {
+ Runtime.getRuntime.addShutdownHook(new Thread("External Shuffle Service shutdown thread") {
+ override def run() {
+ logInfo("Shutting down shuffle service.")
+ server.stop()
+ barrier.countDown()
+ }
+ })
+ }
+}
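Outside the diff proper: the refactoring above splits startIfEnabled() from an unconditional start(), so the same class serves both the worker's embedded path and the new standalone daemon. A minimal sketch of the embedded usage, assuming a SparkConf is already in hand (the value names here are illustrative, not from the patch):

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.ExternalShuffleService

// Only starts a server if spark.shuffle.service.enabled is true in the conf.
val conf = new SparkConf()
val shuffleService = new ExternalShuffleService(conf, new SecurityManager(conf))
shuffleService.startIfEnabled()
// ... later, during shutdown; stop() is a no-op if the server was never started.
shuffleService.stop()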
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 3ee2eb69e8..8f3cc54051 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -34,6 +34,7 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.ExternalShuffleService
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
@@ -61,7 +62,7 @@ private[worker] class Worker(
assert (port > 0)
// For worker and executor IDs
- private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
+ private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
@@ -85,10 +86,10 @@ private[worker] class Worker(
private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
// How often worker will clean up old app folders
- private val CLEANUP_INTERVAL_MILLIS =
+ private val CLEANUP_INTERVAL_MILLIS =
conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
// TTL for app folders/data; after TTL expires it will be cleaned up
- private val APP_DATA_RETENTION_SECS =
+ private val APP_DATA_RETENTION_SECS =
conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
private val testing: Boolean = sys.props.contains("spark.testing")
@@ -112,7 +113,7 @@ private[worker] class Worker(
} else {
new File(sys.env.get("SPARK_HOME").getOrElse("."))
}
-
+
var workDir: File = null
val finishedExecutors = new HashMap[String, ExecutorRunner]
val drivers = new HashMap[String, DriverRunner]
@@ -122,7 +123,7 @@ private[worker] class Worker(
val finishedApps = new HashSet[String]
// The shuffle service is not actually started unless configured.
- private val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
+ private val shuffleService = new ExternalShuffleService(conf, securityMgr)
private val publicAddress = {
val envVar = conf.getenv("SPARK_PUBLIC_DNS")
@@ -134,7 +135,7 @@ private[worker] class Worker(
private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
private val workerSource = new WorkerSource(this)
-
+
private var registrationRetryTimer: Option[Cancellable] = None
var coresUsed = 0