From dabeb6f160f7ad7df1c54b1b8b069700dd4b74dd Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Fri, 7 Mar 2014 10:22:27 -0800 Subject: SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1 This patch allows the FaultToleranceTest to work in newer versions of Docker. See https://spark-project.atlassian.net/browse/SPARK-1136 for more details. Besides changing the Docker and FaultToleranceTest internals, this patch also changes the behavior of Master to accept new Workers which share an address with a Worker that we are currently trying to recover. This can only happen when the Worker itself was restarted and got the same IP address/port at the same time as a Master recovery occurs. Finally, this adds a good bit of ASCII art to the test to make failures, successes, and actions more apparent. This is very much needed. Author: Aaron Davidson Closes #5 from aarondav/zookeeper and squashes the following commits: 5d7a72a [Aaron Davidson] SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1 --- .../apache/spark/deploy/FaultToleranceTest.scala | 41 ++++++++++++++++++---- .../org/apache/spark/deploy/master/Master.scala | 11 ++++-- .../spark/deploy/master/SparkCuratorUtil.scala | 13 ++++++- docker/README.md | 4 ++- docker/spark-test/master/default_cmd | 8 ++++- docker/spark-test/worker/default_cmd | 8 ++++- 6 files changed, 73 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index d48c1892ae..f4eb1601be 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -30,20 +30,24 @@ import scala.sys.process._ import org.json4s._ import org.json4s.jackson.JsonMethods -import org.apache.spark.{Logging, SparkContext} -import org.apache.spark.deploy.master.RecoveryState +import org.apache.spark.{Logging, SparkConf, SparkContext} +import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil} /** * This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master. * In order to mimic a real distributed cluster more closely, Docker is used. * Execute using - * ./spark-class org.apache.spark.deploy.FaultToleranceTest + * ./bin/spark-class org.apache.spark.deploy.FaultToleranceTest * - * Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS: + * Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS + * *and* SPARK_JAVA_OPTS: * - spark.deploy.recoveryMode=ZOOKEEPER * - spark.deploy.zookeeper.url=172.17.42.1:2181 * Note that 172.17.42.1 is the default docker ip for the host and 2181 is the default ZK port. * + * In case of failure, make sure to kill off prior docker containers before restarting: + * docker kill $(docker ps -q) + * * Unfortunately, due to the Docker dependency this suite cannot be run automatically without a * working installation of Docker. In addition to having Docker, the following are assumed: * - Docker can run without sudo (see http://docs.docker.io/en/latest/use/basics/) @@ -51,10 +55,16 @@ import org.apache.spark.deploy.master.RecoveryState * docker/ directory. Run 'docker/spark-test/build' to generate these. 
*/ private[spark] object FaultToleranceTest extends App with Logging { + + val conf = new SparkConf() + val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + val masters = ListBuffer[TestMasterInfo]() val workers = ListBuffer[TestWorkerInfo]() var sc: SparkContext = _ + val zk = SparkCuratorUtil.newClient(conf) + var numPassed = 0 var numFailed = 0 @@ -72,6 +82,10 @@ private[spark] object FaultToleranceTest extends App with Logging { sc = null } terminateCluster() + + // Clear ZK directories in between tests (for speed purposes) + SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/spark_leader") + SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/master_status") } test("sanity-basic") { @@ -168,26 +182,34 @@ private[spark] object FaultToleranceTest extends App with Logging { try { fn numPassed += 1 + logInfo("==============================================") logInfo("Passed: " + name) + logInfo("==============================================") } catch { case e: Exception => numFailed += 1 + logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") logError("FAILED: " + name, e) + logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") + sys.exit(1) } afterEach() } def addMasters(num: Int) { + logInfo(s">>>>> ADD MASTERS $num <<<<<") (1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) } } def addWorkers(num: Int) { + logInfo(s">>>>> ADD WORKERS $num <<<<<") val masterUrls = getMasterUrls(masters) (1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) } } /** Creates a SparkContext, which constructs a Client to interact with our cluster. */ def createClient() = { + logInfo(">>>>> CREATE CLIENT <<<<<") if (sc != null) { sc.stop() } // Counter-hack: Because of a hack in SparkEnv#create() that changes this // property, we need to reset it. @@ -206,6 +228,7 @@ private[spark] object FaultToleranceTest extends App with Logging { } def killLeader(): Unit = { + logInfo(">>>>> KILL LEADER <<<<<") masters.foreach(_.readState()) val leader = getLeader masters -= leader @@ -215,6 +238,7 @@ private[spark] object FaultToleranceTest extends App with Logging { def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis) def terminateCluster() { + logInfo(">>>>> TERMINATE CLUSTER <<<<<") masters.foreach(_.kill()) workers.foreach(_.kill()) masters.clear() @@ -245,6 +269,7 @@ private[spark] object FaultToleranceTest extends App with Logging { * are all alive in a proper configuration (e.g., only one leader). */ def assertValidClusterState() = { + logInfo(">>>>> ASSERT VALID CLUSTER STATE <<<<<") assertUsable() var numAlive = 0 var numStandby = 0 @@ -326,7 +351,11 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val val workers = json \ "workers" val liveWorkers = workers.children.filter(w => (w \ "state").extract[String] == "ALIVE") - liveWorkerIPs = liveWorkers.map(w => (w \ "host").extract[String]) + // Extract the worker IP from "webuiaddress" (rather than "host") because the host name + // on containers is a weird hash instead of the actual IP address. 
+ liveWorkerIPs = liveWorkers.map { + w => (w \ "webuiaddress").extract[String].stripPrefix("http://").stripSuffix(":8081") + } numLiveApps = (json \ "activeapps").children.size @@ -403,7 +432,7 @@ private[spark] object Docker extends Logging { def makeRunCmd(imageTag: String, args: String = "", mountDir: String = ""): ProcessBuilder = { val mountCmd = if (mountDir != "") { " -v " + mountDir } else "" - val cmd = "docker run %s %s %s".format(mountCmd, imageTag, args) + val cmd = "docker run -privileged %s %s %s".format(mountCmd, imageTag, args) logDebug("Run command: " + cmd) cmd } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 2d6d0c33fa..b8dfa44102 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -531,8 +531,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int, val workerAddress = worker.actor.path.address if (addressToWorker.contains(workerAddress)) { - logInfo("Attempted to re-register worker at same address: " + workerAddress) - return false + val oldWorker = addressToWorker(workerAddress) + if (oldWorker.state == WorkerState.UNKNOWN) { + // A worker registering from UNKNOWN implies that the worker was restarted during recovery. + // The old worker must thus be dead, so we will remove it and accept the new worker. + removeWorker(oldWorker) + } else { + logInfo("Attempted to re-register worker at same address: " + workerAddress) + return false + } } workers += worker diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala index 2d35397035..4781a80d47 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala @@ -17,11 +17,13 @@ package org.apache.spark.deploy.master -import org.apache.spark.{SparkConf, Logging} +import scala.collection.JavaConversions._ + import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} import org.apache.curator.retry.ExponentialBackoffRetry import org.apache.zookeeper.KeeperException +import org.apache.spark.{Logging, SparkConf} object SparkCuratorUtil extends Logging { @@ -50,4 +52,13 @@ object SparkCuratorUtil extends Logging { } } } + + def deleteRecursive(zk: CuratorFramework, path: String) { + if (zk.checkExists().forPath(path) != null) { + for (child <- zk.getChildren.forPath(path)) { + zk.delete().forPath(path + "/" + child) + } + zk.delete().forPath(path) + } + } } diff --git a/docker/README.md b/docker/README.md index bf59e77d11..40ba9c3065 100644 --- a/docker/README.md +++ b/docker/README.md @@ -2,4 +2,6 @@ Spark docker files =========== Drawn from Matt Massie's docker files (https://github.com/massie/dockerfiles), -as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker). \ No newline at end of file +as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker). + +Tested with Docker version 0.8.1. 
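The -privileged flag added to Docker.makeRunCmd above is presumably what allows the container start scripts below to run "umount /etc/hosts", something an unprivileged container is not permitted to do. As a rough sketch (the image tag and mount string are hypothetical placeholders, not values taken from this patch), the command the helper now builds expands to something like:

    # Illustrative expansion of Docker.makeRunCmd only; the mount string and
    # image tag are hypothetical placeholders for whatever the test passes in.
    MOUNT_DIR="/home/user/spark:/opt/spark"   # placeholder for dockerMountDir
    docker run -privileged -v "$MOUNT_DIR" spark-test-master

The suite drives this through SparkDocker and Docker.makeRunCmd itself, so nothing here needs to be typed by hand; the expansion is shown only to make the effect of the new flag visible.
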
diff --git a/docker/spark-test/master/default_cmd b/docker/spark-test/master/default_cmd index a5b1303c2e..5a7da3446f 100755 --- a/docker/spark-test/master/default_cmd +++ b/docker/spark-test/master/default_cmd @@ -19,4 +19,10 @@ IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }') echo "CONTAINER_IP=$IP" -/opt/spark/spark-class org.apache.spark.deploy.master.Master -i $IP +export SPARK_LOCAL_IP=$IP +export SPARK_PUBLIC_DNS=$IP + +# Avoid the default Docker behavior of mapping our IP address to an unreachable host name +umount /etc/hosts + +/opt/spark/bin/spark-class org.apache.spark.deploy.master.Master -i $IP diff --git a/docker/spark-test/worker/default_cmd b/docker/spark-test/worker/default_cmd index ab6336f70c..31b06cb0eb 100755 --- a/docker/spark-test/worker/default_cmd +++ b/docker/spark-test/worker/default_cmd @@ -19,4 +19,10 @@ IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }') echo "CONTAINER_IP=$IP" -/opt/spark/spark-class org.apache.spark.deploy.worker.Worker $1 +export SPARK_LOCAL_IP=$IP +export SPARK_PUBLIC_DNS=$IP + +# Avoid the default Docker behavior of mapping our IP address to an unreachable host name +umount /etc/hosts + +/opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker $1 -- cgit v1.2.3
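For reference, a minimal manual run of the suite, following the scaladoc in FaultToleranceTest.scala, might look like the sketch below. It assumes Docker 0.8.x running without sudo and a ZooKeeper ensemble reachable at the default docker bridge address noted above; adjust both for your environment.

    # Sketch only; the ZooKeeper URL is the default mentioned in the scaladoc.
    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
      -Dspark.deploy.zookeeper.url=172.17.42.1:2181"
    export SPARK_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS"   # the suite needs both sets of opts

    # Build the Spark test images under docker/ that the suite launches.
    docker/spark-test/build

    # Run the fault tolerance suite.
    ./bin/spark-class org.apache.spark.deploy.FaultToleranceTest

    # If a run dies partway through, kill any leftover containers before retrying.
    docker kill $(docker ps -q)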