From 68068977b85d2355223e21ebf4e546a13f0a8585 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 14 Sep 2013 20:51:11 -0700 Subject: Fix build on ubuntu --- project/SparkBuild.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'project') diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 77e211ce03..eb5a89394b 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -81,7 +81,7 @@ object SparkBuild extends Build { organization := "org.apache.spark", version := "0.8.0-SNAPSHOT", scalaVersion := "2.9.3", - scalacOptions := Seq("-unchecked", "-optimize", "-deprecation", + scalacOptions := Seq("-Xmax-classfile-name", "120", "-unchecked", "-optimize", "-deprecation", "-target:" + SCALAC_JVM_VERSION), javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION), unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath }, -- cgit v1.2.3 From 6079721fa17cb2eeb0a9896405c75baaff0e98d7 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Tue, 24 Sep 2013 11:41:51 -0700 Subject: Update build version in master --- assembly/pom.xml | 2 +- bagel/pom.xml | 2 +- core/pom.xml | 2 +- docs/_config.yml | 4 ++-- examples/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- project/SparkBuild.scala | 2 +- python/pyspark/shell.py | 2 +- repl-bin/pom.xml | 2 +- repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 14 files changed, 15 insertions(+), 15 deletions(-) (limited to 'project') diff --git a/assembly/pom.xml b/assembly/pom.xml index 808a829e19..d62332137a 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.8.0-SNAPSHOT + 0.9.0-incubating-SNAPSHOT ../pom.xml diff --git a/bagel/pom.xml b/bagel/pom.xml index 51173c32b2..c4ce006085 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.8.0-SNAPSHOT + 0.9.0-incubating-SNAPSHOT ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index 14cd520aaf..9c2d6046a9 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.8.0-SNAPSHOT + 0.9.0-incubating-SNAPSHOT ../pom.xml diff --git a/docs/_config.yml b/docs/_config.yml index b061764b36..48ecb8d0c9 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -3,8 +3,8 @@ markdown: kramdown # These allow the documentation to be updated with nerw releases # of Spark, Scala, and Mesos. 
-SPARK_VERSION: 0.8.0-SNAPSHOT -SPARK_VERSION_SHORT: 0.8.0 +SPARK_VERSION: 0.9.0-incubating-SNAPSHOT +SPARK_VERSION_SHORT: 0.9.0-SNAPSHOT SCALA_VERSION: 2.9.3 MESOS_VERSION: 0.13.0 SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net diff --git a/examples/pom.xml b/examples/pom.xml index e48f5b50ab..b9cc6f5e0a 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.8.0-SNAPSHOT + 0.9.0-incubating-SNAPSHOT ../pom.xml diff --git a/mllib/pom.xml b/mllib/pom.xml index 966caf6835..4ef4f0ae4e 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.8.0-SNAPSHOT + 0.9.0-incubating-SNAPSHOT ../pom.xml diff --git a/pom.xml b/pom.xml index 4aed1260f0..d74d45adf1 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ org.apache.spark spark-parent - 0.8.0-SNAPSHOT + 0.9.0-incubating-SNAPSHOT pom Spark Project Parent POM http://spark.incubator.apache.org/ diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index ed7671757b..a2e29591fc 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -79,7 +79,7 @@ object SparkBuild extends Build { def sharedSettings = Defaults.defaultSettings ++ Seq( organization := "org.apache.spark", - version := "0.8.0-SNAPSHOT", + version := "0.9.0-incubating-SNAPSHOT", scalaVersion := "2.9.3", scalacOptions := Seq("-unchecked", "-optimize", "-deprecation", "-target:" + SCALAC_JVM_VERSION), diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index dc205b306f..a475959090 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -35,7 +35,7 @@ print """Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ - /__ / .__/\_,_/_/ /_/\_\ version 0.8.0 + /__ / .__/\_,_/_/ /_/\_\ version 0.9.0-SNAPSHOT /_/ """ print "Using Python version %s (%s, %s)" % ( diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index 3685561501..05aadc7bdf 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.8.0-SNAPSHOT + 0.9.0-incubating-SNAPSHOT ../pom.xml diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala index 193ccb48ee..36f54a22cf 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -200,7 +200,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master: ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ - /___/ .__/\_,_/_/ /_/\_\ version 0.8.0 + /___/ .__/\_,_/_/ /_/\_\ version 0.9.0-SNAPSHOT /_/ """) import Properties._ diff --git a/streaming/pom.xml b/streaming/pom.xml index 7bea069b61..b260a72abb 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 0.8.0-SNAPSHOT + 0.9.0-incubating-SNAPSHOT ../pom.xml diff --git a/tools/pom.xml b/tools/pom.xml index 77646a6816..29f0014128 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent - 0.8.0-SNAPSHOT + 0.9.0-incubating-SNAPSHOT ../pom.xml diff --git a/yarn/pom.xml b/yarn/pom.xml index 21b650d1ea..427fcdf545 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent - 0.8.0-SNAPSHOT + 0.9.0-incubating-SNAPSHOT ../pom.xml -- cgit v1.2.3 From 3f283278b00fc0a98a6c8cccd704bfc476f5d765 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 26 Sep 2013 13:58:10 -0700 Subject: Removed scala -optimize flag. 
--- pom.xml | 1 - project/SparkBuild.scala | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) (limited to 'project') diff --git a/pom.xml b/pom.xml index d74d45adf1..ad5051d38a 100644 --- a/pom.xml +++ b/pom.xml @@ -557,7 +557,6 @@ true -unchecked - -optimise -deprecation diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 99cdadb9e7..aef246d8a9 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -81,7 +81,7 @@ object SparkBuild extends Build { organization := "org.apache.spark", version := "0.9.0-incubating-SNAPSHOT", scalaVersion := "2.9.3", - scalacOptions := Seq("-Xmax-classfile-name", "120", "-unchecked", "-optimize", "-deprecation", + scalacOptions := Seq("-Xmax-classfile-name", "120", "-unchecked", "-deprecation", "-target:" + SCALAC_JVM_VERSION), javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION), unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath }, -- cgit v1.2.3

From f549ea33d3d5a584f5d9965bb8e56462a1d6528e Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Thu, 19 Sep 2013 14:40:14 -0700
Subject: Standalone Scheduler fault tolerance using ZooKeeper

This patch implements full distributed fault tolerance for standalone scheduler Masters. There is only one master Leader at a time, which is actively serving scheduling requests. If this Leader crashes, another master will eventually be elected, reconstruct the state from the first Master, and continue serving scheduling requests.

Leader election is performed using the ZooKeeper leader election pattern. We try to minimize the use of ZooKeeper and the assumptions about ZooKeeper's behavior, so there is a layer of retries and session monitoring on top of the ZooKeeper client.

Master failover follows directly from the single-node Master recovery via the file system (patch 194ba4b8), save that the Master state is stored in ZooKeeper instead.

Configuration: By default, no recovery mechanism is enabled (spark.deploy.recoveryMode = NONE). By setting spark.deploy.recoveryMode to ZOOKEEPER and setting spark.deploy.zookeeper.url to an appropriate ZooKeeper URL, ZooKeeper recovery mode is enabled. By setting spark.deploy.recoveryMode to FILESYSTEM and setting spark.deploy.recoveryDirectory to an appropriate directory accessible by the Master, we will keep the behavior from 194ba4b8.

Additionally, places where a Master could be specified by a spark:// url can now take comma-delimited lists to specify backup masters. Note that this is only used for registration of NEW Workers and application Clients. Once a Worker or Client has registered with the Master Leader, it is "in the system" and will never need to register again.

Forthcoming: Documentation, tests (! - only ad hoc testing has been performed so far)

I do not intend for this commit to be merged until tests are added, but this patch should still be mostly reviewable until then.
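As a concrete illustration of the configuration described above, a minimal Scala sketch follows. It is illustrative only: the host names, ports, ZooKeeper ensemble, and recovery directory are hypothetical, and the exact shape of a comma-delimited spark:// URL is an assumption based on this description. The spark.deploy.* recovery properties are read via System.getProperty by the Master, so they must reach each standalone Master's JVM (for example as -D flags), not the driver.

// Hypothetical flags for each standalone Master JVM (read via System.getProperty):
//   -Dspark.deploy.recoveryMode=ZOOKEEPER
//   -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181
// or, for single-node recovery via the file system (patch 194ba4b8):
//   -Dspark.deploy.recoveryMode=FILESYSTEM
//   -Dspark.deploy.recoveryDirectory=/var/spark/recovery
//
// Application side: a spark:// URL may now list backup masters; a new Client registers
// with whichever Master is currently the leader (host names below are made up).
import org.apache.spark.SparkContext

object HAExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("spark://master1:7077,master2:7077", "HAExample")
    println(sc.parallelize(1 to 1000).count())  // served by the current leader
    sc.stop()  // the app stays registered across Master failover, per the commit message above
  }
}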
--- .../main/scala/org/apache/spark/SparkContext.scala | 7 +- .../org/apache/spark/deploy/DeployMessage.scala | 20 +-- .../apache/spark/deploy/ExecutorDescription.scala | 5 + .../apache/spark/deploy/LocalSparkCluster.scala | 7 +- .../org/apache/spark/deploy/client/Client.scala | 58 +++++-- .../spark/deploy/client/ClientListener.scala | 4 + .../apache/spark/deploy/client/TestClient.scala | 7 +- .../spark/deploy/master/ApplicationInfo.scala | 18 +- .../apache/spark/deploy/master/ExecutorInfo.scala | 2 +- .../master/FileSystemPersistenceEngine.scala | 14 +- .../spark/deploy/master/LeaderElectionAgent.scala | 28 ++++ .../org/apache/spark/deploy/master/Master.scala | 145 +++++++++------- .../spark/deploy/master/MasterMessages.scala | 29 ++++ .../apache/spark/deploy/master/MasterState.scala | 4 +- .../spark/deploy/master/PersistenceEngine.scala | 4 +- .../deploy/master/SparkZooKeeperSession.scala | 183 +++++++++++++++++++++ .../apache/spark/deploy/master/WorkerInfo.scala | 24 +-- .../master/ZooKeeperLeaderElectionAgent.scala | 109 ++++++++++++ .../deploy/master/ZooKeeperPersistenceEngine.scala | 64 +++++++ .../org/apache/spark/deploy/worker/Worker.scala | 125 +++++++++----- .../spark/deploy/worker/WorkerArguments.scala | 8 +- .../spark/deploy/worker/ui/WorkerWebUI.scala | 2 +- .../cluster/SparkDeploySchedulerBackend.scala | 11 +- pom.xml | 11 ++ project/SparkBuild.scala | 1 + 25 files changed, 720 insertions(+), 170 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala create mode 100644 core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala create mode 100644 core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala create mode 100644 core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala create mode 100644 core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala (limited to 'project') diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 912ce752fb..5318847276 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -169,7 +169,8 @@ class SparkContext( case SPARK_REGEX(sparkUrl) => val scheduler = new ClusterScheduler(this) - val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName) + val masterUrls = sparkUrl.split(",") + val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName) scheduler.initialize(backend) scheduler @@ -185,8 +186,8 @@ class SparkContext( val scheduler = new ClusterScheduler(this) val localCluster = new LocalSparkCluster( numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt) - val sparkUrl = localCluster.start() - val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName) + val masterUrls = localCluster.start() + val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName) scheduler.initialize(backend) backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => { localCluster.stop() diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 0d0745a480..31d1909279 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -27,6 +27,7 @@ import org.apache.spark.util.Utils private[deploy] 
sealed trait DeployMessage extends Serializable +/** Contains messages sent between Scheduler actor nodes. */ private[deploy] object DeployMessages { // Worker to Master @@ -58,13 +59,14 @@ private[deploy] object DeployMessages { // Master to Worker - case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage + case class RegisteredWorker(masterUrl: String, masterWebUiUrl: String) extends DeployMessage case class RegisterWorkerFailed(message: String) extends DeployMessage - case class KillExecutor(appId: String, execId: Int) extends DeployMessage + case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage case class LaunchExecutor( + masterUrl: String, appId: String, execId: Int, appDesc: ApplicationDescription, @@ -82,7 +84,7 @@ private[deploy] object DeployMessages { // Master to Client - case class RegisteredApplication(appId: String) extends DeployMessage + case class RegisteredApplication(appId: String, masterUrl: String) extends DeployMessage // TODO(matei): replace hostPort with host case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) { @@ -131,16 +133,4 @@ private[deploy] object DeployMessages { assert (port > 0) } - // Actor System to Master - - case object CheckForWorkerTimeOut - - case class BeginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) - - case object EndRecoveryProcess - - case object RequestWebUIPort - - case class WebUIPortResponse(webUIBoundPort: Int) - } diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala index 716ee483d5..2abf0b69dd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala @@ -17,6 +17,11 @@ package org.apache.spark.deploy +/** + * Used to send state on-the-wire about Executors from Worker to Master. + * This state is sufficient for the Master to reconstruct its internal data structures during + * failover. 
+ */ private[spark] class ExecutorDescription( val appId: String, val execId: Int, diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index 10161c8204..308a2bfa22 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -39,22 +39,23 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I private val masterActorSystems = ArrayBuffer[ActorSystem]() private val workerActorSystems = ArrayBuffer[ActorSystem]() - def start(): String = { + def start(): Array[String] = { logInfo("Starting a local Spark cluster with " + numWorkers + " workers.") /* Start the Master */ val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0) masterActorSystems += masterSystem val masterUrl = "spark://" + localHostname + ":" + masterPort + val masters = Array(masterUrl) /* Start the Workers */ for (workerNum <- 1 to numWorkers) { val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker, - memoryPerWorker, masterUrl, null, Some(workerNum)) + memoryPerWorker, masters, null, Some(workerNum)) workerActorSystems += workerSystem } - return masterUrl + return masters } def stop() { diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala index 28548a2ca9..aa2a10a8ad 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala @@ -23,6 +23,7 @@ import akka.actor._ import akka.actor.Terminated import akka.pattern.ask import akka.util.Duration +import akka.util.duration._ import akka.remote.RemoteClientDisconnected import akka.remote.RemoteClientLifeCycleEvent import akka.remote.RemoteClientShutdown @@ -40,27 +41,27 @@ import org.apache.spark.deploy.master.Master */ private[spark] class Client( actorSystem: ActorSystem, - masterUrl: String, + masterUrls: Array[String], appDescription: ApplicationDescription, listener: ClientListener) extends Logging { + val REGISTRATION_TIMEOUT = 60 * 1000 + var actor: ActorRef = null var appId: String = null + var registered = false + var activeMasterUrl: String = null class ClientActor extends Actor with Logging { var master: ActorRef = null var masterAddress: Address = null var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times + var alreadyDead = false // To avoid calling listener.dead() multiple times override def preStart() { - logInfo("Connecting to master " + masterUrl) try { - master = context.actorFor(Master.toAkkaUrl(masterUrl)) - masterAddress = master.path.address - master ! RegisterApplication(appDescription) - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(master) // Doesn't work with remote actors, but useful for testing + connectToMaster() } catch { case e: Exception => logError("Failed to connect to master", e) @@ -69,9 +70,34 @@ private[spark] class Client( } } + def connectToMaster() { + for (masterUrl <- masterUrls) { + logInfo("Connecting to master " + masterUrl + "...") + val actor = context.actorFor(Master.toAkkaUrl(masterUrl)) + actor ! RegisterApplication(appDescription) + } + + context.system.scheduler.scheduleOnce(REGISTRATION_TIMEOUT millis) { + if (!registered) { + logError("All masters are unresponsive! 
Giving up.") + markDead() + } + } + } + + def changeMaster(url: String) { + activeMasterUrl = url + master = context.actorFor(Master.toAkkaUrl(url)) + masterAddress = master.path.address + context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) + context.watch(master) // Doesn't work with remote actors, but useful for testing + } + override def receive = { - case RegisteredApplication(appId_) => + case RegisteredApplication(appId_, masterUrl) => appId = appId_ + registered = true + changeMaster(masterUrl) listener.connected(appId) case ApplicationRemoved(message) => @@ -92,13 +118,12 @@ private[spark] class Client( listener.executorRemoved(fullId, message.getOrElse(""), exitStatus) } - case MasterChanged(materUrl, masterWebUiUrl) => + case MasterChanged(masterUrl, masterWebUiUrl) => logInfo("Master has changed, new master is at " + masterUrl) context.unwatch(master) - master = context.actorFor(Master.toAkkaUrl(masterUrl)) - masterAddress = master.path.address + changeMaster(masterUrl) + alreadyDisconnected = false sender ! MasterChangeAcknowledged(appId) - context.watch(master) case Terminated(actor_) if actor_ == master => logError("Connection to master failed; waiting for master to reconnect...") @@ -113,7 +138,7 @@ private[spark] class Client( markDisconnected() case StopClient => - markDisconnected() + markDead() sender ! true context.stop(self) } @@ -127,6 +152,13 @@ private[spark] class Client( alreadyDisconnected = true } } + + def markDead() { + if (!alreadyDead) { + listener.dead() + alreadyDead = true + } + } } def start() { diff --git a/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala index 4605368c11..be7a11bd15 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala @@ -27,8 +27,12 @@ package org.apache.spark.deploy.client private[spark] trait ClientListener { def connected(appId: String): Unit + /** Disconnection may be a temporary state, as we fail over to a new Master. */ def disconnected(): Unit + /** Dead means that we couldn't find any Masters to connect to, and have given up. 
*/ + def dead(): Unit + def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index d5e9a0e095..5b62d3ba6c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -33,6 +33,11 @@ private[spark] object TestClient { System.exit(0) } + def dead() { + logInfo("Could not connect to master") + System.exit(0) + } + def executorAdded(id: String, workerId: String, hostPort: String, cores: Int, memory: Int) {} def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {} @@ -44,7 +49,7 @@ private[spark] object TestClient { val desc = new ApplicationDescription( "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home", "ignored") val listener = new TestListener - val client = new Client(actorSystem, url, desc, listener) + val client = new Client(actorSystem, Array(url), desc, listener) client.start() actorSystem.awaitTermination() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index e437a0e7ae..8291e29ec3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -23,12 +23,12 @@ import akka.actor.ActorRef import scala.collection.mutable private[spark] class ApplicationInfo( - val startTime: Long, - val id: String, - val desc: ApplicationDescription, - val submitDate: Date, - val driver: ActorRef, - val appUiUrl: String) + val startTime: Long, + val id: String, + val desc: ApplicationDescription, + val submitDate: Date, + val driver: ActorRef, + val appUiUrl: String) extends Serializable { @transient var state: ApplicationState.Value = _ @@ -39,14 +39,14 @@ private[spark] class ApplicationInfo( @transient private var nextExecutorId: Int = _ - init() + init private def readObject(in: java.io.ObjectInputStream) : Unit = { in.defaultReadObject() - init() + init } - private def init() { + private def init = { state = ApplicationState.WAITING executors = new mutable.HashMap[Int, ExecutorInfo] coresGranted = 0 diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala index d235234c13..76db61dd61 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala @@ -28,7 +28,7 @@ private[spark] class ExecutorInfo( var state = ExecutorState.LAUNCHING - /** Copy all state variables from the given on-the-wire ExecutorDescription. */ + /** Copy all state (non-val) variables from the given on-the-wire ExecutorDescription. 
*/ def copyState(execDesc: ExecutorDescription) { state = execDesc.state } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala index 2fc13821bd..c0849ef324 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala @@ -32,8 +32,8 @@ import org.apache.spark.Logging * @param serialization Used to serialize our objects. */ private[spark] class FileSystemPersistenceEngine( - val dir: String, - val serialization: Serialization) + val dir: String, + val serialization: Serialization) extends PersistenceEngine with Logging { new File(dir).mkdir() @@ -57,11 +57,11 @@ private[spark] class FileSystemPersistenceEngine( } override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = { - val sortedFiles = new File(dir).listFiles().sortBy(_.getName()) - val appFiles = sortedFiles.filter(_.getName().startsWith("app_")) - val apps = appFiles.map(deserializeFromFile[ApplicationInfo](_)) - val workerFiles = sortedFiles.filter(_.getName().startsWith("worker_")) - val workers = workerFiles.map(deserializeFromFile[WorkerInfo](_)) + val sortedFiles = new File(dir).listFiles().sortBy(_.getName) + val appFiles = sortedFiles.filter(_.getName.startsWith("app_")) + val apps = appFiles.map(deserializeFromFile[ApplicationInfo]) + val workerFiles = sortedFiles.filter(_.getName.startsWith("worker_")) + val workers = workerFiles.map(deserializeFromFile[WorkerInfo]) (apps, workers) } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala new file mode 100644 index 0000000000..c44a23f8c6 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala @@ -0,0 +1,28 @@ +package org.apache.spark.deploy.master + +import akka.actor.{Actor, ActorRef} + +import org.apache.spark.deploy.master.MasterMessages.ElectedLeader + +/** + * A LeaderElectionAgent keeps track of whether the current Master is the leader, meaning it + * is the only Master serving requests. + * In addition to the API provided, the LeaderElectionAgent will use of the following messages + * to inform the Master of leader changes: + * [[org.apache.spark.deploy.master.MasterMessages.ElectedLeader ElectedLeader]] + * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]] + */ +trait LeaderElectionAgent extends Actor { + val masterActor: ActorRef +} + +/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */ +class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent { + override def preStart() { + masterActor ! 
ElectedLeader + } + + override def receive = { + case _ => + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index c6e039eed4..e13a8cba4a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -34,18 +34,18 @@ import akka.util.{Duration, Timeout} import org.apache.spark.{Logging, SparkException} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ -import org.apache.spark.deploy.master.MasterState.MasterState +import org.apache.spark.deploy.master.MasterMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{AkkaUtils, Utils} - private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000 val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "") + val RECOVERY_MODE = System.getProperty("spark.deploy.recoveryMode", "NONE") var nextAppNumber = 0 val workers = new HashSet[WorkerInfo] @@ -76,75 +76,115 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act if (envVar != null) envVar else host } - var state: MasterState = _ + val masterUrl = "spark://" + host + ":" + port + var masterWebUiUrl: String = _ + + var state = MasterState.STANDBY var persistenceEngine: PersistenceEngine = _ + var leaderElectionAgent: ActorRef = _ + // As a temporary workaround before better ways of configuring memory, we allow users to set // a flag that will perform round-robin scheduling across the nodes (spreading out each app // among all the nodes) instead of trying to consolidate each app onto a small # of nodes. 
val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean override def preStart() { - logInfo("Starting Spark master at spark://" + host + ":" + port) + logInfo("Starting Spark master at " + masterUrl) // Listen for remote client disconnection events, since they don't go through Akka's watch() context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) webUi.start() + masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) masterMetricsSystem.registerSource(masterSource) masterMetricsSystem.start() applicationMetricsSystem.start() - persistenceEngine = - if (RECOVERY_DIR.isEmpty()) { - new BlackHolePersistenceEngine() - } else { + persistenceEngine = RECOVERY_MODE match { + case "ZOOKEEPER" => + logInfo("Persisting recovery state to ZooKeeper") + new ZooKeeperPersistenceEngine(SerializationExtension(context.system)) + case "FILESYSTEM" => logInfo("Persisting recovery state to directory: " + RECOVERY_DIR) new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system)) - } + case _ => + new BlackHolePersistenceEngine() + } - val (storedApps, storedWorkers) = persistenceEngine.readPersistedData() - state = - if (storedApps.isEmpty && storedWorkers.isEmpty) { - MasterState.ALIVE - } else { - self ! BeginRecovery(storedApps, storedWorkers) - MasterState.RECOVERING - } + leaderElectionAgent = context.actorOf(Props( + RECOVERY_MODE match { + case "ZOOKEEPER" => + new ZooKeeperLeaderElectionAgent(self, masterUrl) + case _ => + new MonarchyLeaderAgent(self) + })) + } + + override def preRestart(reason: Throwable, message: Option[Any]) { + logError("Master actor restarted due to exception", reason) } override def postStop() { webUi.stop() masterMetricsSystem.stop() applicationMetricsSystem.stop() + persistenceEngine.close() + context.stop(leaderElectionAgent) } override def receive = { + case ElectedLeader => { + val (storedApps, storedWorkers) = persistenceEngine.readPersistedData() + state = if (storedApps.isEmpty && storedWorkers.isEmpty) + MasterState.ALIVE + else + MasterState.RECOVERING + + logInfo("I have been elected leader! New state: " + state) + + if (state == MasterState.RECOVERING) { + beginRecovery(storedApps, storedWorkers) + context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() } + } + } + + case RevokedLeadership => { + logError("Leadership has been revoked -- master shutting down.") + System.exit(0) + } + case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( host, workerPort, cores, Utils.megabytesToString(memory))) - if (idToWorker.contains(id)) { + if (state == MasterState.STANDBY) { + // ignore, don't send response + } else if (idToWorker.contains(id)) { sender ! RegisterWorkerFailed("Duplicate worker ID") } else { val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress) registerWorker(worker) context.watch(sender) // This doesn't work with remote actors but helps for testing persistenceEngine.addWorker(worker) - sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get) + sender ! 
RegisteredWorker(masterUrl, masterWebUiUrl) schedule() } } case RegisterApplication(description) => { - logInfo("Registering app " + description.name) - val app = createApplication(description, sender) - registerApplication(app) - logInfo("Registered app " + description.name + " with ID " + app.id) - context.watch(sender) // This doesn't work with remote actors but helps for testing - persistenceEngine.addApplication(app) - sender ! RegisteredApplication(app.id) - schedule() + if (state == MasterState.STANDBY) { + // ignore, don't send response + } else { + logInfo("Registering app " + description.name) + val app = createApplication(description, sender) + registerApplication(app) + logInfo("Registered app " + description.name + " with ID " + app.id) + context.watch(sender) // This doesn't work with remote actors but helps for testing + persistenceEngine.addApplication(app) + sender ! RegisteredApplication(app.id, masterUrl) + schedule() + } } case ExecutorStateChanged(appId, execId, state, message, exitStatus) => { @@ -184,27 +224,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } } - case BeginRecovery(storedApps, storedWorkers) => { - context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self, EndRecoveryProcess) - - val masterUrl = "spark://" + host + ":" + port - val masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get - for (app <- storedApps) { - registerApplication(app) - app.state = ApplicationState.UNKNOWN - app.driver ! MasterChanged(masterUrl, masterWebUiUrl) - } - for (worker <- storedWorkers) { - registerWorker(worker) - worker.state = WorkerState.UNKNOWN - worker.actor ! MasterChanged(masterUrl, masterWebUiUrl) - } - } - case MasterChangeAcknowledged(appId) => { - val appOption = idToApp.get(appId) - appOption match { + idToApp.get(appId) match { case Some(app) => + logInfo("Application has been re-registered: " + appId) app.state = ApplicationState.WAITING case None => logWarning("Master change ack from unknown app: " + appId) @@ -216,9 +239,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act case WorkerSchedulerStateResponse(workerId, executors) => { idToWorker.get(workerId) match { case Some(worker) => + logInfo("Worker has been re-registered: " + workerId) worker.state = WorkerState.ALIVE - val validExecutors = executors.filter(exec => idToApp.get(exec.appId) != None) + val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined) for (exec <- validExecutors) { val app = idToApp.get(exec.appId).get val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId)) @@ -232,10 +256,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act if (canCompleteRecovery) { completeRecovery() } } - case EndRecoveryProcess => { - completeRecovery() - } - case Terminated(actor) => { // The disconnected actor could've been either a worker or an app; remove whichever of // those we have an entry for in the corresponding actor hashmap @@ -275,15 +295,29 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act workers.count(_.state == WorkerState.UNKNOWN) == 0 && apps.count(_.state == ApplicationState.UNKNOWN) == 0 + def beginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) { + for (app <- storedApps) { + registerApplication(app) + app.state = ApplicationState.UNKNOWN + app.driver ! 
MasterChanged(masterUrl, masterWebUiUrl) + } + for (worker <- storedWorkers) { + registerWorker(worker) + worker.state = WorkerState.UNKNOWN + worker.actor ! MasterChanged(masterUrl, masterWebUiUrl) + } + } + def completeRecovery() { + // Ensure "only-once" recovery semantics using a short synchronization period. synchronized { if (state != MasterState.RECOVERING) { return } state = MasterState.COMPLETING_RECOVERY } // Kill off any workers and apps that didn't respond to us. - workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker(_)) - apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication(_)) + workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker) + apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication) state = MasterState.ALIVE schedule() @@ -352,7 +386,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) - worker.actor ! LaunchExecutor( + worker.actor ! LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome) exec.application.driver ! ExecutorAdded( exec.id, worker.id, worker.hostPort, exec.cores, exec.memory) @@ -415,6 +449,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act if (firstApp == None) { firstApp = Some(app) } + // TODO: What is firstApp?? Can we remove it? val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= app.desc.memoryPerSlave)) { logWarning("Could not find any workers with enough memory for " + firstApp.get.id) @@ -444,7 +479,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act waitingApps -= app for (exec <- app.executors.values) { exec.worker.removeExecutor(exec) - exec.worker.actor ! KillExecutor(exec.application.id, exec.id) + exec.worker.actor ! KillExecutor(masterUrl, exec.application.id, exec.id) exec.state = ExecutorState.KILLED } app.markFinished(state) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala new file mode 100644 index 0000000000..6e31d40b43 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala @@ -0,0 +1,29 @@ +package org.apache.spark.deploy.master + +import org.apache.spark.util.Utils + +sealed trait MasterMessages extends Serializable + +/** Contains messages seen only by the Master and its associated entities. 
*/ +private[master] object MasterMessages { + + // LeaderElectionAgent to Master + + case object ElectedLeader + + case object RevokedLeadership + + // Actor System to LeaderElectionAgent + + case object CheckLeader + + // Actor System to Master + + case object CheckForWorkerTimeOut + + case class BeginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) + + case object RequestWebUIPort + + case class WebUIPortResponse(webUIBoundPort: Int) +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala index 9ea5e9752e..eec3df3b7a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala @@ -18,9 +18,9 @@ package org.apache.spark.deploy.master private[spark] object MasterState - extends Enumeration("ALIVE", "RECOVERING", "COMPLETING_RECOVERY") { + extends Enumeration("STANDBY", "ALIVE", "RECOVERING", "COMPLETING_RECOVERY") { type MasterState = Value - val ALIVE, RECOVERING, COMPLETING_RECOVERY = Value + val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala index 07d23c6bf3..8c4878bd30 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala @@ -36,9 +36,11 @@ trait PersistenceEngine { /** * Returns the persisted data sorted by their respective ids (which implies that they're - * sorted by time order of creation). + * sorted by time of creation). */ def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) + + def close() {} } class BlackHolePersistenceEngine extends PersistenceEngine { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala new file mode 100644 index 0000000000..5492a3a988 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala @@ -0,0 +1,183 @@ +package org.apache.spark.deploy.master + +import scala.collection.JavaConversions._ +import scala.concurrent.ops._ + +import org.apache.spark.Logging +import org.apache.zookeeper._ +import org.apache.zookeeper.data.Stat +import org.apache.zookeeper.Watcher.Event.KeeperState + +/** + * Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry + * logic. If the ZooKeeper session expires or otherwise dies, a new ZooKeeper session will be + * created. If ZooKeeper remains down after several retries, the given + * [[org.apache.spark.deploy.master.SparkZooKeeperWatcher SparkZooKeeperWatcher]] will be + * informed via zkDown(). + * + * Additionally, all commands sent to ZooKeeper will be retried until they either fail too many + * times or a semantic exception is thrown (e.g.., "node already exists"). 
+ */ +class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging { + val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "") + + val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE + val ZK_TIMEOUT_MILLIS = 30000 + val RETRY_WAIT_MILLIS = 5000 + val ZK_CHECK_PERIOD_MILLIS = 10000 + val MAX_RECONNECT_ATTEMPTS = 3 + + private var zk: ZooKeeper = _ + + private val watcher = new ZooKeeperWatcher() + private var reconnectAttempts = 0 + private var closed = false + + /** Connect to ZooKeeper to start the session. Must be called before anything else. */ + def connect() { + connectToZooKeeper() + spawn(sessionMonitorThread) + } + + def sessionMonitorThread = { + while (!closed) { + Thread.sleep(ZK_CHECK_PERIOD_MILLIS) + if (zk.getState != ZooKeeper.States.CONNECTED) { + reconnectAttempts += 1 + val attemptsLeft = MAX_RECONNECT_ATTEMPTS - reconnectAttempts + if (attemptsLeft <= 0) { + logError("Could not connect to ZooKeeper: system failure") + zkWatcher.zkDown() + close() + } else { + logWarning("ZooKeeper connection failed, retrying " + attemptsLeft + " more times...") + connectToZooKeeper() + } + } + } + } + + def close() { + if (!closed && zk != null) { zk.close() } + closed = true + } + + private def connectToZooKeeper() { + if (zk != null) zk.close() + zk = new ZooKeeper(ZK_URL, ZK_TIMEOUT_MILLIS, watcher) + } + + /** + * Attempts to maintain a live ZooKeeper exception despite (very) transient failures. + * Mainly useful for handling the natural ZooKeeper session expiration. + */ + private class ZooKeeperWatcher extends Watcher { + def process(event: WatchedEvent) { + if (closed) { return } + + event.getState match { + case KeeperState.SyncConnected => + reconnectAttempts = 0 + zkWatcher.zkSessionCreated() + case KeeperState.Expired => + connectToZooKeeper() + case KeeperState.Disconnected => + logWarning("ZooKeeper disconnected, will retry...") + } + } + } + + def create(path: String, bytes: Array[Byte], createMode: CreateMode): String = { + retry { + zk.create(path, bytes, ZK_ACL, createMode) + } + } + + def exists(path: String, watcher: Watcher = null): Stat = { + retry { + zk.exists(path, watcher) + } + } + + def getChildren(path: String, watcher: Watcher = null): List[String] = { + retry { + zk.getChildren(path, watcher).toList + } + } + + def getData(path: String): Array[Byte] = { + retry { + zk.getData(path, false, null) + } + } + + def delete(path: String, version: Int = -1): Unit = { + retry { + zk.delete(path, version) + } + } + + /** + * Creates the given directory (non-recursively) if it doesn't exist. + * All znodes are created in PERSISTENT mode with no data. + */ + def mkdir(path: String) { + if (exists(path) == null) { + try { + create(path, "".getBytes, CreateMode.PERSISTENT) + } catch { + case e: Exception => + // If the exception caused the directory not to be created, bubble it up, + // otherwise ignore it. + if (exists(path) == null) { throw e } + } + } + } + + /** + * Recursively creates all directories up to the given one. + * All znodes are created in PERSISTENT mode with no data. + */ + def mkdirRecursive(path: String) { + var fullDir = "" + for (dentry <- path.split("/").tail) { + fullDir += "/" + dentry + mkdir(fullDir) + } + } + + /** + * Retries the given function up to 3 times. The assumption is that failure is transient, + * UNLESS it is a semantic exception (i.e., trying to get data from a node that doesn't exist), + * in which case the exception will be thrown without retries. + * + * @param fn Block to execute, possibly multiple times. 
+ */ + def retry[T](fn: => T)(implicit n: Int = MAX_RECONNECT_ATTEMPTS): T = { + try { + fn + } catch { + case e: KeeperException.NoNodeException => throw e + case e: KeeperException.NodeExistsException => throw e + case e if n > 0 => + logError("ZooKeeper exception, " + n + " more retries...", e) + Thread.sleep(RETRY_WAIT_MILLIS) + retry(fn)(n-1) + } + } +} + +trait SparkZooKeeperWatcher { + /** + * Called whenever a ZK session is created -- + * this will occur when we create our first session as well as each time + * the session expires or errors out. + */ + def zkSessionCreated() + + /** + * Called if ZK appears to be completely down (i.e., not just a transient error). + * We will no longer attempt to reconnect to ZK, and the SparkZooKeeperSession is considered dead. + */ + def zkDown() +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index 2ab7bb233c..26090c6a9c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -22,14 +22,14 @@ import scala.collection.mutable import org.apache.spark.util.Utils private[spark] class WorkerInfo( - val id: String, - val host: String, - val port: Int, - val cores: Int, - val memory: Int, - val actor: ActorRef, - val webUiPort: Int, - val publicAddress: String) + val id: String, + val host: String, + val port: Int, + val cores: Int, + val memory: Int, + val actor: ActorRef, + val webUiPort: Int, + val publicAddress: String) extends Serializable { Utils.checkHost(host, "Expected hostname") @@ -42,18 +42,18 @@ private[spark] class WorkerInfo( @transient var lastHeartbeat: Long = _ - init() + init def coresFree: Int = cores - coresUsed def memoryFree: Int = memory - memoryUsed private def readObject(in: java.io.ObjectInputStream) : Unit = { in.defaultReadObject() - init() + init } - private def init() { - executors = new mutable.HashMap[String, ExecutorInfo] + private def init = { + executors = new mutable.HashMap state = WorkerState.ALIVE coresUsed = 0 memoryUsed = 0 diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala new file mode 100644 index 0000000000..4ca59e5b24 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala @@ -0,0 +1,109 @@ +package org.apache.spark.deploy.master + +import scala.collection.JavaConversions._ + +import org.apache.spark.deploy.master.MasterMessages.{CheckLeader, ElectedLeader, RevokedLeadership} +import org.apache.spark.Logging +import org.apache.zookeeper._ +import org.apache.zookeeper.Watcher.Event.EventType + +import akka.actor.{Cancellable, ActorRef} +import akka.util.duration._ + +class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String) + extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging { + + val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/leader_election" + + private val watcher = new ZooKeeperWatcher() + private val zk = new SparkZooKeeperSession(this) + private var status = LeadershipStatus.NOT_LEADER + private var myLeaderFile: String = _ + private var leaderUrl: String = _ + + override def preStart() { + logInfo("Starting ZooKeeper LeaderElection agent") + zk.connect() + } + + override def zkSessionCreated() { + zk.mkdirRecursive(WORKING_DIR) + 
myLeaderFile = + zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, CreateMode.EPHEMERAL_SEQUENTIAL) + self ! CheckLeader + } + + override def zkDown() { + logError("ZooKeeper down! LeaderElectionAgent shutting down Master.") + System.exit(1) + } + + override def postStop() { + zk.close() + } + + override def receive = { + case CheckLeader => checkLeader() + } + + private class ZooKeeperWatcher extends Watcher { + def process(event: WatchedEvent) { + if (event.getType == EventType.NodeDeleted) { + logInfo("Leader file disappeared, a master is down!") + self ! CheckLeader + } + } + } + + /** Uses ZK leader election. Navigates several ZK potholes along the way. */ + def checkLeader() { + val masters = zk.getChildren(WORKING_DIR).toList + val leader = masters.sorted.get(0) + val leaderFile = WORKING_DIR + "/" + leader + + // Setup a watch for the current leader. + zk.exists(leaderFile, watcher) + + try { + leaderUrl = new String(zk.getData(leaderFile)) + } catch { + // A NoNodeException may be thrown if old leader died since the start of this method call. + // This is fine -- just check again, since we're guaranteed to see the new values. + case e: KeeperException.NoNodeException => + logInfo("Leader disappeared while reading it -- finding next leader") + checkLeader() + return + } + + val isLeader = myLeaderFile == leaderFile + if (!isLeader && leaderUrl == masterUrl) { + // We found a different master file pointing to this process. + // This can happen in the following two cases: + // (1) The master process was restarted on the same node. + // (2) The ZK server died between creating the node and returning the name of the node. + // For this case, we will end up creating a second file, and MUST explicitly delete the + // first one, since our ZK session is still open. + // Note that this deletion will cause a NodeDeleted event to be fired so we check again for + // leader changes. + logWarning("Cleaning up old ZK master election file that points to this master.") + zk.delete(leaderFile) + } else { + updateLeadershipStatus(isLeader) + } + } + + def updateLeadershipStatus(isLeader: Boolean) { + if (isLeader && status == LeadershipStatus.NOT_LEADER) { + status = LeadershipStatus.LEADER + masterActor ! ElectedLeader + } else if (!isLeader && status == LeadershipStatus.LEADER) { + status = LeadershipStatus.NOT_LEADER + masterActor ! 
RevokedLeadership + } + } + + private object LeadershipStatus extends Enumeration { + type LeadershipStatus = Value + val LEADER, NOT_LEADER = Value + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala new file mode 100644 index 0000000000..f45b62cbdd --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala @@ -0,0 +1,64 @@ +package org.apache.spark.deploy.master + +import org.apache.spark.Logging +import org.apache.zookeeper._ + +import akka.serialization.Serialization + +class ZooKeeperPersistenceEngine(serialization: Serialization) extends PersistenceEngine with SparkZooKeeperWatcher with Logging { + val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/master_status" + + val zk = new SparkZooKeeperSession(this) + + zk.connect() + + override def zkSessionCreated() { + zk.mkdirRecursive(WORKING_DIR) + } + + override def zkDown() { + logError("PersistenceEngine disconnected from ZooKeeper -- ZK looks down.") + } + + override def addApplication(app: ApplicationInfo) { + serializeIntoFile(WORKING_DIR + "/app_" + app.id, app) + } + + override def removeApplication(app: ApplicationInfo) { + zk.delete(WORKING_DIR + "/app_" + app.id) + } + + override def addWorker(worker: WorkerInfo) { + serializeIntoFile(WORKING_DIR + "/worker_" + worker.id, worker) + } + + override def removeWorker(worker: WorkerInfo) { + zk.delete(WORKING_DIR + "/worker_" + worker.id) + } + + override def close() { + zk.close() + } + + override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = { + val sortedFiles = zk.getChildren(WORKING_DIR).toList.sorted + val appFiles = sortedFiles.filter(_.startsWith("app_")) + val apps = appFiles.map(deserializeFromFile[ApplicationInfo]) + val workerFiles = sortedFiles.filter(_.startsWith("worker_")) + val workers = workerFiles.map(deserializeFromFile[WorkerInfo]) + (apps, workers) + } + + private def serializeIntoFile(path: String, value: Serializable) { + val serializer = serialization.findSerializerFor(value) + val serialized = serializer.toBinary(value) + zk.create(path, serialized, CreateMode.PERSISTENT) + } + + def deserializeFromFile[T <: Serializable](filename: String)(implicit m: Manifest[T]): T = { + val fileData = zk.getData("/spark/master_status/" + filename) + val clazz = m.erasure.asInstanceOf[Class[T]] + val serializer = serialization.serializerFor(clazz) + serializer.fromBinary(fileData).asInstanceOf[T] + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 46455aa5ae..73fb0c8bd8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -23,11 +23,11 @@ import java.io.File import scala.collection.mutable.HashMap -import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated} +import akka.actor._ import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} import akka.util.duration._ -import org.apache.spark.{SparkEnv, Logging} +import org.apache.spark.Logging import org.apache.spark.deploy.{ExecutorDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master @@ -35,14 +35,13 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI import 
org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{Utils, AkkaUtils} - private[spark] class Worker( host: String, port: Int, webUiPort: Int, cores: Int, memory: Int, - var masterUrl: String, + masterUrls: Array[String], workDirPath: String = null) extends Actor with Logging { @@ -54,8 +53,16 @@ private[spark] class Worker( // Send a heartbeat every (heartbeat timeout) / 4 milliseconds val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4 + val REGISTRATION_TIMEOUT = 20.seconds + val REGISTRATION_RETRIES = 3 + + // Index into masterUrls that we're currently trying to register with. + var masterIndex = 0 + var master: ActorRef = null - var masterWebUiUrl : String = "" + var activeMasterUrl: String = "" + var activeMasterWebUiUrl : String = "" + var registered = false val workerId = generateWorkerId() var sparkHome: File = null var workDir: File = null @@ -103,35 +110,62 @@ private[spark] class Worker( webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) webUi.start() - connectToMaster() + registerWithMaster() metricsSystem.registerSource(workerSource) metricsSystem.start() } - def connectToMaster() { - logInfo("Connecting to master " + masterUrl) - master = context.actorFor(Master.toAkkaUrl(masterUrl)) - master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress) + def changeMaster(url: String, uiUrl: String) { + activeMasterUrl = url + activeMasterWebUiUrl = uiUrl + master = context.actorFor(Master.toAkkaUrl(activeMasterUrl)) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) context.watch(master) // Doesn't work with remote actors, but useful for testing } + def tryRegisterAllMasters() { + for (masterUrl <- masterUrls) { + logInfo("Connecting to master " + masterUrl + "...") + val actor = context.actorFor(Master.toAkkaUrl(masterUrl)) + actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, + publicAddress) + } + } + + def registerWithMaster() { + tryRegisterAllMasters() + + var retries = 0 + lazy val retryTimer: Cancellable = + context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { + retries += 1 + if (registered) { + retryTimer.cancel() + } else if (retries >= REGISTRATION_RETRIES) { + logError("All masters are unresponsive! Giving up.") + System.exit(1) + } else { + tryRegisterAllMasters() + } + } + retryTimer // start timer + } + override def receive = { - case RegisteredWorker(url) => - masterWebUiUrl = url - logInfo("Successfully registered with master") + case RegisteredWorker(masterUrl, masterWebUiUrl) => + logInfo("Successfully registered with master " + masterUrl) + registered = true + changeMaster(masterUrl, masterWebUiUrl) context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) { master ! Heartbeat(workerId) } - case MasterChanged(url, uiUrl) => - logInfo("Master has changed, new master is at " + url) - masterUrl = url - masterWebUiUrl = uiUrl + case MasterChanged(masterUrl, masterWebUiUrl) => + logInfo("Master has changed, new master is at " + masterUrl) context.unwatch(master) - master = context.actorFor(Master.toAkkaUrl(masterUrl)) - context.watch(master) // Doesn't work with remote actors, but useful for testing + changeMaster(masterUrl, masterWebUiUrl) + val execs = executors.values. map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state)) sender ! 
WorkerSchedulerStateResponse(workerId, execs.toList) @@ -140,15 +174,19 @@ private[spark] class Worker( logError("Worker registration failed: " + message) System.exit(1) - case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) => - logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) - val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, - self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING) - executors(appId + "/" + execId) = manager - manager.start() - coresUsed += cores_ - memoryUsed += memory_ - master ! ExecutorStateChanged(appId, execId, manager.state, None, None) + case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) => + if (masterUrl != activeMasterUrl) { + logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") + } else { + logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) + val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, + self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING) + executors(appId + "/" + execId) = manager + manager.start() + coresUsed += cores_ + memoryUsed += memory_ + master ! ExecutorStateChanged(appId, execId, manager.state, None, None) + } case ExecutorStateChanged(appId, execId, state, message, exitStatus) => master ! ExecutorStateChanged(appId, execId, state, message, exitStatus) @@ -164,14 +202,18 @@ private[spark] class Worker( memoryUsed -= executor.memory } - case KillExecutor(appId, execId) => - val fullId = appId + "/" + execId - executors.get(fullId) match { - case Some(executor) => - logInfo("Asked to kill executor " + fullId) - executor.kill() - case None => - logInfo("Asked to kill unknown executor " + fullId) + case KillExecutor(masterUrl, appId, execId) => + if (masterUrl != activeMasterUrl) { + logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor " + execId) + } else { + val fullId = appId + "/" + execId + executors.get(fullId) match { + case Some(executor) => + logInfo("Asked to kill executor " + fullId) + executor.kill() + case None => + logInfo("Asked to kill unknown executor " + fullId) + } } case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => @@ -179,8 +221,8 @@ private[spark] class Worker( case RequestWorkerState => { sender ! 
WorkerStateResponse(host, port, workerId, executors.values.toList, - finishedExecutors.values.toList, masterUrl, cores, memory, - coresUsed, memoryUsed, masterWebUiUrl) + finishedExecutors.values.toList, activeMasterUrl, cores, memory, + coresUsed, memoryUsed, activeMasterWebUiUrl) } } @@ -203,17 +245,18 @@ private[spark] object Worker { def main(argStrings: Array[String]) { val args = new WorkerArguments(argStrings) val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores, - args.memory, args.master, args.workDir) + args.memory, args.masters, args.workDir) actorSystem.awaitTermination() } def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int, - masterUrl: String, workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = { + masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None) + : (ActorSystem, Int) = { // The LocalSparkCluster runs multiple local sparkWorkerX actor systems val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory, - masterUrl, workDir)), name = "Worker") + masterUrls, workDir)), name = "Worker") (actorSystem, boundPort) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala index 0ae89a864f..16d8686490 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala @@ -29,7 +29,7 @@ private[spark] class WorkerArguments(args: Array[String]) { var webUiPort = 8081 var cores = inferDefaultCores() var memory = inferDefaultMemory() - var master: String = null + var masters: Array[String] = null var workDir: String = null // Check for settings in environment variables @@ -86,14 +86,14 @@ private[spark] class WorkerArguments(args: Array[String]) { printUsageAndExit(0) case value :: tail => - if (master != null) { // Two positional arguments were given + if (masters != null) { // Two positional arguments were given printUsageAndExit(1) } - master = value + masters = value.split(",") parse(tail) case Nil => - if (master == null) { // No positional argument was given + if (masters == null) { // No positional argument was given printUsageAndExit(1) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index 95d6007f3b..800f1cafcc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -105,7 +105,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I val logText = {Utils.offsetBytes(path, startByte, endByte)} - val linkToMaster =

<p><a href={worker.masterWebUiUrl}>Back to Master</a></p>
+    val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>

val range = Bytes {startByte.toString} - {endByte.toString} of {logLength} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index c173cdf449..8a3017e964 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -26,7 +26,7 @@ import org.apache.spark.util.Utils private[spark] class SparkDeploySchedulerBackend( scheduler: ClusterScheduler, sc: SparkContext, - master: String, + masters: Array[String], appName: String) extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem) with ClientListener @@ -52,7 +52,7 @@ private[spark] class SparkDeploySchedulerBackend( val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome, "http://" + sc.ui.appUIAddress) - client = new Client(sc.env.actorSystem, master, appDesc, this) + client = new Client(sc.env.actorSystem, masters, appDesc, this) client.start() } @@ -75,6 +75,13 @@ private[spark] class SparkDeploySchedulerBackend( } } + override def dead() { + if (!stopping) { + logError("Spark cluster looks dead, giving up.") + scheduler.error("Spark cluster looks down") + } + } + override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) { logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format( fullId, hostPort, cores, Utils.megabytesToString(memory))) diff --git a/pom.xml b/pom.xml index d74d45adf1..f8ea2b8920 100644 --- a/pom.xml +++ b/pom.xml @@ -344,6 +344,17 @@ 0.9 test + + org.apache.zookeeper + zookeeper + 3.4.5 + + + org.jboss.netty + netty + + + org.apache.hadoop hadoop-client diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 99cdadb9e7..156f501a04 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -211,6 +211,7 @@ object SparkBuild extends Build { "net.java.dev.jets3t" % "jets3t" % "0.7.1", "org.apache.avro" % "avro" % "1.7.4", "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty), + "org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty), "com.codahale.metrics" % "metrics-core" % "3.0.0", "com.codahale.metrics" % "metrics-jvm" % "3.0.0", "com.codahale.metrics" % "metrics-json" % "3.0.0", -- cgit v1.2.3 From 9fd6bba60d908c1c176e3bbd34add1853ecc1d8d Mon Sep 17 00:00:00 2001 From: Du Li Date: Tue, 1 Oct 2013 15:46:51 -0700 Subject: ask ivy/sbt to check local maven repo under ~/.m2 --- project/SparkBuild.scala | 3 +++ 1 file changed, 3 insertions(+) (limited to 'project') diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index aef246d8a9..cdec6168af 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -97,6 +97,9 @@ object SparkBuild extends Build { // Only allow one test at a time, even across projects, since they run in the same JVM concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), + // also check the local Maven repository ~/.m2 + resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))), + // Shared between both core and streaming. 
resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"), -- cgit v1.2.3 From 213b70a2db5e92ada4b762bc39876c01a3530897 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 7 Oct 2013 10:47:45 -0700 Subject: Merge pull request #31 from sundeepn/branch-0.8 Resolving package conflicts with hadoop 0.23.9 Hadoop 0.23.9 is having a package conflict with easymock's dependencies. (cherry picked from commit 023e3fdf008b3194a36985a07923df9aaf64e520) Signed-off-by: Reynold Xin --- core/src/test/scala/org/apache/spark/ui/UISuite.scala | 7 ++++++- project/SparkBuild.scala | 13 ++++++++----- 2 files changed, 14 insertions(+), 6 deletions(-) (limited to 'project') diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 07c9f2382b..8f0ec6683b 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -26,7 +26,12 @@ class UISuite extends FunSuite { test("jetty port increases under contention") { val startPort = 4040 val server = new Server(startPort) - server.start() + + Try { server.start() } match { + case Success(s) => + case Failure(e) => + // Either case server port is busy hence setup for test complete + } val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq()) val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq()) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index cdec6168af..eb4b96eb47 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -156,6 +156,7 @@ object SparkBuild extends Build { */ + libraryDependencies ++= Seq( "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106", "org.scalatest" %% "scalatest" % "1.9.1" % "test", @@ -178,6 +179,7 @@ object SparkBuild extends Build { val slf4jVersion = "1.7.2" + val excludeCglib = ExclusionRule(organization = "org.sonatype.sisu.inject") val excludeJackson = ExclusionRule(organization = "org.codehaus.jackson") val excludeNetty = ExclusionRule(organization = "org.jboss.netty") val excludeAsm = ExclusionRule(organization = "asm") @@ -210,7 +212,7 @@ object SparkBuild extends Build { "org.apache.mesos" % "mesos" % "0.13.0", "io.netty" % "netty-all" % "4.0.0.Beta2", "org.apache.derby" % "derby" % "10.4.2.0" % "test", - "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), "net.java.dev.jets3t" % "jets3t" % "0.7.1", "org.apache.avro" % "avro" % "1.7.4", "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty), @@ -248,6 +250,7 @@ object SparkBuild extends Build { exclude("log4j","log4j") exclude("org.apache.cassandra.deps", "avro") excludeAll(excludeSnappy) + excludeAll(excludeCglib) ) ) ++ assemblySettings ++ extraAssemblySettings @@ -290,10 +293,10 @@ object SparkBuild extends Build { def yarnEnabledSettings = Seq( libraryDependencies ++= Seq( // Exclude rule required for all ? 
- "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm) + "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), + "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), + "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), + "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib) ) ) -- cgit v1.2.3