From a3989058c0938c8c59c278e7d1a766701cfa255b Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 10 Nov 2015 16:32:32 -0800 Subject: [SPARK-10827][CORE] AppClient should not use `askWithReply` in `receiveAndReply` Changed AppClient to be non-blocking in `receiveAndReply` by using a separate thread to wait for response and reply to the context. The threads are managed by a thread pool. Also added unit tests for the AppClient interface. Author: Bryan Cutler Closes #9317 from BryanCutler/appClient-receiveAndReply-SPARK-10827. --- .../org/apache/spark/deploy/client/AppClient.scala | 33 +++- .../spark/deploy/client/AppClientSuite.scala | 209 +++++++++++++++++++++ 2 files changed, 238 insertions(+), 4 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 25ea692543..3f29da663b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -49,8 +49,8 @@ private[spark] class AppClient( private val REGISTRATION_TIMEOUT_SECONDS = 20 private val REGISTRATION_RETRIES = 3 - private var endpoint: RpcEndpointRef = null - private var appId: String = null + @volatile private var endpoint: RpcEndpointRef = null + @volatile private var appId: String = null @volatile private var registered = false private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint @@ -77,6 +77,11 @@ private[spark] class AppClient( private val registrationRetryThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread") + // A thread pool to perform receive then reply actions in a thread so as not to block the + // event loop. + private val askAndReplyThreadPool = + ThreadUtils.newDaemonCachedThreadPool("appclient-receive-and-reply-threadpool") + override def onStart(): Unit = { try { registerWithMaster(1) @@ -200,7 +205,7 @@ private[spark] class AppClient( case r: RequestExecutors => master match { - case Some(m) => context.reply(m.askWithRetry[Boolean](r)) + case Some(m) => askAndReplyAsync(m, context, r) case None => logWarning("Attempted to request executors before registering with Master.") context.reply(false) @@ -208,13 +213,32 @@ private[spark] class AppClient( case k: KillExecutors => master match { - case Some(m) => context.reply(m.askWithRetry[Boolean](k)) + case Some(m) => askAndReplyAsync(m, context, k) case None => logWarning("Attempted to kill executors before registering with Master.") context.reply(false) } } + private def askAndReplyAsync[T]( + endpointRef: RpcEndpointRef, + context: RpcCallContext, + msg: T): Unit = { + // Create a thread to ask a message and reply with the result. Allow thread to be + // interrupted during shutdown, otherwise context must be notified of NonFatal errors. + askAndReplyThreadPool.execute(new Runnable { + override def run(): Unit = { + try { + context.reply(endpointRef.askWithRetry[Boolean](msg)) + } catch { + case ie: InterruptedException => // Cancelled + case NonFatal(t) => + context.sendFailure(t) + } + } + }) + } + override def onDisconnected(address: RpcAddress): Unit = { if (master.exists(_.address == address)) { logWarning(s"Connection to $address failed; waiting for master to reconnect...") @@ -252,6 +276,7 @@ private[spark] class AppClient( registrationRetryThread.shutdownNow() registerMasterFutures.foreach(_.cancel(true)) registerMasterThreadPool.shutdownNow() + askAndReplyThreadPool.shutdownNow() } } diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala new file mode 100644 index 0000000000..1e5c05a73f --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.client + +import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} +import scala.concurrent.duration._ + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.Eventually._ + +import org.apache.spark._ +import org.apache.spark.deploy.{ApplicationDescription, Command} +import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} +import org.apache.spark.deploy.master.{ApplicationInfo, Master} +import org.apache.spark.deploy.worker.Worker +import org.apache.spark.rpc.RpcEnv +import org.apache.spark.util.Utils + +/** + * End-to-end tests for application client in standalone mode. + */ +class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterAll { + private val numWorkers = 2 + private val conf = new SparkConf() + private val securityManager = new SecurityManager(conf) + + private var masterRpcEnv: RpcEnv = null + private var workerRpcEnvs: Seq[RpcEnv] = null + private var master: Master = null + private var workers: Seq[Worker] = null + + /** + * Start the local cluster. + * Note: local-cluster mode is insufficient because we want a reference to the Master. + */ + override def beforeAll(): Unit = { + super.beforeAll() + masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager) + workerRpcEnvs = (0 until numWorkers).map { i => + RpcEnv.create(Worker.SYSTEM_NAME + i, "localhost", 0, conf, securityManager) + } + master = makeMaster() + workers = makeWorkers(10, 2048) + // Wait until all workers register with master successfully + eventually(timeout(60.seconds), interval(10.millis)) { + assert(getMasterState.workers.size === numWorkers) + } + } + + override def afterAll(): Unit = { + workerRpcEnvs.foreach(_.shutdown()) + masterRpcEnv.shutdown() + workers.foreach(_.stop()) + master.stop() + workerRpcEnvs = null + masterRpcEnv = null + workers = null + master = null + super.afterAll() + } + + test("interface methods of AppClient using local Master") { + val ci = new AppClientInst(masterRpcEnv.address.toSparkURL) + + ci.client.start() + + // Client should connect with one Master which registers the application + eventually(timeout(10.seconds), interval(10.millis)) { + val apps = getApplications() + assert(ci.listener.connectedIdList.size === 1, "client listener should have one connection") + assert(apps.size === 1, "master should have 1 registered app") + } + + // Send message to Master to request Executors, verify request by change in executor limit + val numExecutorsRequested = 1 + assert(ci.client.requestTotalExecutors(numExecutorsRequested)) + + eventually(timeout(10.seconds), interval(10.millis)) { + val apps = getApplications() + assert(apps.head.getExecutorLimit === numExecutorsRequested, s"executor request failed") + } + + // Send request to kill executor, verify request was made + assert { + val apps = getApplications() + val executorId: String = apps.head.executors.head._2.fullId + ci.client.killExecutors(Seq(executorId)) + } + + // Issue stop command for Client to disconnect from Master + ci.client.stop() + + // Verify Client is marked dead and unregistered from Master + eventually(timeout(10.seconds), interval(10.millis)) { + val apps = getApplications() + assert(ci.listener.deadReasonList.size === 1, "client should have been marked dead") + assert(apps.isEmpty, "master should have 0 registered apps") + } + } + + test("request from AppClient before initialized with master") { + val ci = new AppClientInst(masterRpcEnv.address.toSparkURL) + + // requests to master should fail immediately + assert(ci.client.requestTotalExecutors(3) === false) + } + + // =============================== + // | Utility methods for testing | + // =============================== + + /** Return a SparkConf for applications that want to talk to our Master. */ + private def appConf: SparkConf = { + new SparkConf() + .setMaster(masterRpcEnv.address.toSparkURL) + .setAppName("test") + .set("spark.executor.memory", "256m") + } + + /** Make a master to which our application will send executor requests. */ + private def makeMaster(): Master = { + val master = new Master(masterRpcEnv, masterRpcEnv.address, 0, securityManager, conf) + masterRpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master) + master + } + + /** Make a few workers that talk to our master. */ + private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = { + (0 until numWorkers).map { i => + val rpcEnv = workerRpcEnvs(i) + val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address), + Worker.SYSTEM_NAME + i, Worker.ENDPOINT_NAME, null, conf, securityManager) + rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker) + worker + } + } + + /** Get the Master state */ + private def getMasterState: MasterStateResponse = { + master.self.askWithRetry[MasterStateResponse](RequestMasterState) + } + + /** Get the applictions that are active from Master */ + private def getApplications(): Seq[ApplicationInfo] = { + getMasterState.activeApps + } + + /** Application Listener to collect events */ + private class AppClientCollector extends AppClientListener with Logging { + val connectedIdList = new ArrayBuffer[String] with SynchronizedBuffer[String] + @volatile var disconnectedCount: Int = 0 + val deadReasonList = new ArrayBuffer[String] with SynchronizedBuffer[String] + val execAddedList = new ArrayBuffer[String] with SynchronizedBuffer[String] + val execRemovedList = new ArrayBuffer[String] with SynchronizedBuffer[String] + + def connected(id: String): Unit = { + connectedIdList += id + } + + def disconnected(): Unit = { + synchronized { + disconnectedCount += 1 + } + } + + def dead(reason: String): Unit = { + deadReasonList += reason + } + + def executorAdded( + id: String, + workerId: String, + hostPort: String, + cores: Int, + memory: Int): Unit = { + execAddedList += id + } + + def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit = { + execRemovedList += id + } + } + + /** Create AppClient and supporting objects */ + private class AppClientInst(masterUrl: String) { + val rpcEnv = RpcEnv.create("spark", Utils.localHostName(), 0, conf, securityManager) + private val cmd = new Command(TestExecutor.getClass.getCanonicalName.stripSuffix("$"), + List(), Map(), Seq(), Seq(), Seq()) + private val desc = new ApplicationDescription("AppClientSuite", Some(1), 512, cmd, "ignored") + val listener = new AppClientCollector + val client = new AppClient(rpcEnv, Array(masterUrl), desc, listener, new SparkConf) + } + +} -- cgit v1.2.3