author     Bryan Cutler <bjcutler@us.ibm.com>   2015-11-10 16:32:32 -0800
committer  Reynold Xin <rxin@databricks.com>    2015-11-10 16:32:32 -0800
commit     a3989058c0938c8c59c278e7d1a766701cfa255b (patch)
tree       0f0bf059eb3fc02c3c21feb209a22015a6af205a /core
parent     21c562fa03430365f5c2b7d6de1f8f60ab2140d4 (diff)
[SPARK-10827][CORE] AppClient should not use `askWithReply` in `receiveAndReply`
Changed AppClient to be non-blocking in `receiveAndReply` by using a separate thread to wait for the response and reply to the context. The threads are managed by a thread pool. Also added unit tests for the AppClient interface.

Author: Bryan Cutler <bjcutler@us.ibm.com>

Closes #9317 from BryanCutler/appClient-receiveAndReply-SPARK-10827.
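For readers who want the gist before the diff: the change moves the blocking `askWithRetry` call off the RPC event loop and onto a pool of daemon worker threads, which then reply to the original call context. Below is a minimal, self-contained sketch of that hand-off pattern using plain `java.util.concurrent` types; `ReplyContext` and `blockingAsk` are hypothetical stand-ins for Spark's `RpcCallContext` and `askWithRetry`, so this illustrates the idea rather than reproducing the patch itself.

```scala
import java.util.concurrent.{Executors, ThreadFactory}

import scala.util.control.NonFatal

// Hypothetical stand-in for Spark's RpcCallContext.
trait ReplyContext {
  def reply(response: Any): Unit
  def sendFailure(e: Throwable): Unit
}

object AskAndReplySketch {
  // Cached pool of daemon threads, similar in spirit to
  // ThreadUtils.newDaemonCachedThreadPool in the patch below.
  private val pool = Executors.newCachedThreadPool(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "receive-and-reply")
      t.setDaemon(true)
      t
    }
  })

  // Run a potentially blocking ask on a worker thread and reply when it
  // completes, so the RPC event loop itself never blocks.
  def askAndReplyAsync(context: ReplyContext)(blockingAsk: => Boolean): Unit = {
    pool.execute(new Runnable {
      override def run(): Unit = {
        try {
          context.reply(blockingAsk)
        } catch {
          case _: InterruptedException => // cancelled during shutdown
          case NonFatal(t) => context.sendFailure(t)
        }
      }
    })
  }

  // Called on stop so in-flight asks are interrupted rather than leaked.
  def shutdown(): Unit = pool.shutdownNow()
}
```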
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala       33
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala  209
2 files changed, 238 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 25ea692543..3f29da663b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -49,8 +49,8 @@ private[spark] class AppClient(
private val REGISTRATION_TIMEOUT_SECONDS = 20
private val REGISTRATION_RETRIES = 3
- private var endpoint: RpcEndpointRef = null
- private var appId: String = null
+ @volatile private var endpoint: RpcEndpointRef = null
+ @volatile private var appId: String = null
@volatile private var registered = false
private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
@@ -77,6 +77,11 @@ private[spark] class AppClient(
private val registrationRetryThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
+ // A thread pool to perform receive then reply actions in a thread so as not to block the
+ // event loop.
+ private val askAndReplyThreadPool =
+ ThreadUtils.newDaemonCachedThreadPool("appclient-receive-and-reply-threadpool")
+
override def onStart(): Unit = {
try {
registerWithMaster(1)
@@ -200,7 +205,7 @@ private[spark] class AppClient(
case r: RequestExecutors =>
master match {
- case Some(m) => context.reply(m.askWithRetry[Boolean](r))
+ case Some(m) => askAndReplyAsync(m, context, r)
case None =>
logWarning("Attempted to request executors before registering with Master.")
context.reply(false)
@@ -208,13 +213,32 @@ private[spark] class AppClient(
case k: KillExecutors =>
master match {
- case Some(m) => context.reply(m.askWithRetry[Boolean](k))
+ case Some(m) => askAndReplyAsync(m, context, k)
case None =>
logWarning("Attempted to kill executors before registering with Master.")
context.reply(false)
}
}
+ private def askAndReplyAsync[T](
+ endpointRef: RpcEndpointRef,
+ context: RpcCallContext,
+ msg: T): Unit = {
+ // Create a thread to ask a message and reply with the result. Allow thread to be
+ // interrupted during shutdown, otherwise context must be notified of NonFatal errors.
+ askAndReplyThreadPool.execute(new Runnable {
+ override def run(): Unit = {
+ try {
+ context.reply(endpointRef.askWithRetry[Boolean](msg))
+ } catch {
+ case ie: InterruptedException => // Cancelled
+ case NonFatal(t) =>
+ context.sendFailure(t)
+ }
+ }
+ })
+ }
+
override def onDisconnected(address: RpcAddress): Unit = {
if (master.exists(_.address == address)) {
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
@@ -252,6 +276,7 @@ private[spark] class AppClient(
registrationRetryThread.shutdownNow()
registerMasterFutures.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
+ askAndReplyThreadPool.shutdownNow()
}
}
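One detail worth noting from the first hunk above: `endpoint` and `appId` become `@volatile`, presumably because they can now be observed from threads other than the endpoint's event loop, and `@volatile` guarantees that a write made on one thread is visible to reads on another. A minimal illustration of that guarantee, with names invented for the example rather than taken from the patch:

```scala
// Without @volatile, a write from the event-loop thread may never become
// visible to a worker thread reading the same field.
class SharedAppState {
  @volatile private var appId: String = null     // written by the event loop
  def set(id: String): Unit = { appId = id }
  def get: Option[String] = Option(appId)        // read by other threads
}
```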
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
new file mode 100644
index 0000000000..1e5c05a73f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.client
+
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+import scala.concurrent.duration._
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark._
+import org.apache.spark.deploy.{ApplicationDescription, Command}
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.master.{ApplicationInfo, Master}
+import org.apache.spark.deploy.worker.Worker
+import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.util.Utils
+
+/**
+ * End-to-end tests for application client in standalone mode.
+ */
+class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterAll {
+ private val numWorkers = 2
+ private val conf = new SparkConf()
+ private val securityManager = new SecurityManager(conf)
+
+ private var masterRpcEnv: RpcEnv = null
+ private var workerRpcEnvs: Seq[RpcEnv] = null
+ private var master: Master = null
+ private var workers: Seq[Worker] = null
+
+ /**
+ * Start the local cluster.
+ * Note: local-cluster mode is insufficient because we want a reference to the Master.
+ */
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager)
+ workerRpcEnvs = (0 until numWorkers).map { i =>
+ RpcEnv.create(Worker.SYSTEM_NAME + i, "localhost", 0, conf, securityManager)
+ }
+ master = makeMaster()
+ workers = makeWorkers(10, 2048)
+ // Wait until all workers register with master successfully
+ eventually(timeout(60.seconds), interval(10.millis)) {
+ assert(getMasterState.workers.size === numWorkers)
+ }
+ }
+
+ override def afterAll(): Unit = {
+ workerRpcEnvs.foreach(_.shutdown())
+ masterRpcEnv.shutdown()
+ workers.foreach(_.stop())
+ master.stop()
+ workerRpcEnvs = null
+ masterRpcEnv = null
+ workers = null
+ master = null
+ super.afterAll()
+ }
+
+ test("interface methods of AppClient using local Master") {
+ val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)
+
+ ci.client.start()
+
+ // Client should connect with one Master which registers the application
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ val apps = getApplications()
+ assert(ci.listener.connectedIdList.size === 1, "client listener should have one connection")
+ assert(apps.size === 1, "master should have 1 registered app")
+ }
+
+ // Send message to Master to request Executors, verify request by change in executor limit
+ val numExecutorsRequested = 1
+ assert(ci.client.requestTotalExecutors(numExecutorsRequested))
+
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ val apps = getApplications()
+ assert(apps.head.getExecutorLimit === numExecutorsRequested, s"executor request failed")
+ }
+
+ // Send request to kill executor, verify request was made
+ assert {
+ val apps = getApplications()
+ val executorId: String = apps.head.executors.head._2.fullId
+ ci.client.killExecutors(Seq(executorId))
+ }
+
+ // Issue stop command for Client to disconnect from Master
+ ci.client.stop()
+
+ // Verify Client is marked dead and unregistered from Master
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ val apps = getApplications()
+ assert(ci.listener.deadReasonList.size === 1, "client should have been marked dead")
+ assert(apps.isEmpty, "master should have 0 registered apps")
+ }
+ }
+
+ test("request from AppClient before initialized with master") {
+ val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)
+
+ // requests to master should fail immediately
+ assert(ci.client.requestTotalExecutors(3) === false)
+ }
+
+ // ===============================
+ // | Utility methods for testing |
+ // ===============================
+
+ /** Return a SparkConf for applications that want to talk to our Master. */
+ private def appConf: SparkConf = {
+ new SparkConf()
+ .setMaster(masterRpcEnv.address.toSparkURL)
+ .setAppName("test")
+ .set("spark.executor.memory", "256m")
+ }
+
+ /** Make a master to which our application will send executor requests. */
+ private def makeMaster(): Master = {
+ val master = new Master(masterRpcEnv, masterRpcEnv.address, 0, securityManager, conf)
+ masterRpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+ master
+ }
+
+ /** Make a few workers that talk to our master. */
+ private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = {
+ (0 until numWorkers).map { i =>
+ val rpcEnv = workerRpcEnvs(i)
+ val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address),
+ Worker.SYSTEM_NAME + i, Worker.ENDPOINT_NAME, null, conf, securityManager)
+ rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker)
+ worker
+ }
+ }
+
+ /** Get the Master state */
+ private def getMasterState: MasterStateResponse = {
+ master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+ }
+
+ /** Get the applications that are active from Master */
+ private def getApplications(): Seq[ApplicationInfo] = {
+ getMasterState.activeApps
+ }
+
+ /** Application Listener to collect events */
+ private class AppClientCollector extends AppClientListener with Logging {
+ val connectedIdList = new ArrayBuffer[String] with SynchronizedBuffer[String]
+ @volatile var disconnectedCount: Int = 0
+ val deadReasonList = new ArrayBuffer[String] with SynchronizedBuffer[String]
+ val execAddedList = new ArrayBuffer[String] with SynchronizedBuffer[String]
+ val execRemovedList = new ArrayBuffer[String] with SynchronizedBuffer[String]
+
+ def connected(id: String): Unit = {
+ connectedIdList += id
+ }
+
+ def disconnected(): Unit = {
+ synchronized {
+ disconnectedCount += 1
+ }
+ }
+
+ def dead(reason: String): Unit = {
+ deadReasonList += reason
+ }
+
+ def executorAdded(
+ id: String,
+ workerId: String,
+ hostPort: String,
+ cores: Int,
+ memory: Int): Unit = {
+ execAddedList += id
+ }
+
+ def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit = {
+ execRemovedList += id
+ }
+ }
+
+ /** Create AppClient and supporting objects */
+ private class AppClientInst(masterUrl: String) {
+ val rpcEnv = RpcEnv.create("spark", Utils.localHostName(), 0, conf, securityManager)
+ private val cmd = new Command(TestExecutor.getClass.getCanonicalName.stripSuffix("$"),
+ List(), Map(), Seq(), Seq(), Seq())
+ private val desc = new ApplicationDescription("AppClientSuite", Some(1), 512, cmd, "ignored")
+ val listener = new AppClientCollector
+ val client = new AppClient(rpcEnv, Array(masterUrl), desc, listener, new SparkConf)
+ }
+
+}