author     Raymond Liu <raymond.liu@intel.com>    2014-02-24 23:20:38 -0800
committer  Aaron Davidson <aaron@databricks.com>  2014-02-24 23:20:38 -0800
commit     c852201ce95c7c982ff3794c114427eb33e92922 (patch)
tree       35a14cde2a51f33528959c1e608e216a0207240f
parent     1f4c7f7ecc9d2393663fc4d059e71fe4c70bad84 (diff)
download   spark-c852201ce95c7c982ff3794c114427eb33e92922.tar.gz
           spark-c852201ce95c7c982ff3794c114427eb33e92922.tar.bz2
           spark-c852201ce95c7c982ff3794c114427eb33e92922.zip
For SPARK-1082, Use Curator for ZK interaction in standalone cluster
Author: Raymond Liu <raymond.liu@intel.com>

Closes #611 from colorant/curator and squashes the following commits:

7556aa1 [Raymond Liu] Address review comments
af92e1f [Raymond Liu] Fix coding style
964f3c2 [Raymond Liu] Ignore NodeExists exception
6df2966 [Raymond Liu] Rewrite zookeeper client code with curator
-rw-r--r--  core/pom.xml                                                                         |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala        |   1
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala             |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala           |  53
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala      | 205
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala |  94
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala |  30
-rw-r--r--  pom.xml                                                                              |   6
-rw-r--r--  project/SparkBuild.scala                                                             |   2

9 files changed, 99 insertions(+), 300 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 5576b0c3b4..f209704f31 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -55,8 +55,8 @@
<artifactId>avro-ipc</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
index f25a1ad3bf..a730fe1f59 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
* [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
*/
private[spark] trait LeaderElectionAgent extends Actor {
+ // TODO: LeaderElectionAgent does not need to be an Actor anymore; this needs refactoring.
val masterActor: ActorRef
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
index 74a9f8cd82..db72d8ae9b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
@@ -28,10 +28,6 @@ private[master] object MasterMessages {
case object RevokedLeadership
- // Actor System to LeaderElectionAgent
-
- case object CheckLeader
-
// Actor System to Master
case object CheckForWorkerTimeOut
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
new file mode 100644
index 0000000000..2d35397035
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.zookeeper.KeeperException
+
+
+object SparkCuratorUtil extends Logging {
+
+ val ZK_CONNECTION_TIMEOUT_MILLIS = 15000
+ val ZK_SESSION_TIMEOUT_MILLIS = 60000
+ val RETRY_WAIT_MILLIS = 5000
+ val MAX_RECONNECT_ATTEMPTS = 3
+
+ def newClient(conf: SparkConf): CuratorFramework = {
+ val ZK_URL = conf.get("spark.deploy.zookeeper.url")
+ val zk = CuratorFrameworkFactory.newClient(ZK_URL,
+ ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS,
+ new ExponentialBackoffRetry(RETRY_WAIT_MILLIS, MAX_RECONNECT_ATTEMPTS))
+ zk.start()
+ zk
+ }
+
+ def mkdir(zk: CuratorFramework, path: String) {
+ if (zk.checkExists().forPath(path) == null) {
+ try {
+ zk.create().creatingParentsIfNeeded().forPath(path)
+ } catch {
+ case nodeExist: KeeperException.NodeExistsException =>
+ // do nothing, ignore node existing exception.
+ case e: Exception => throw e
+ }
+ }
+ }
+}
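For context, a minimal sketch of how this new utility might be exercised (the quorum address below is illustrative; it assumes a ZooKeeper ensemble reachable at the configured spark.deploy.zookeeper.url):

import org.apache.spark.SparkConf
import org.apache.spark.deploy.master.SparkCuratorUtil

val conf = new SparkConf()
  .set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181") // illustrative quorum
val zk = SparkCuratorUtil.newClient(conf) // client is already started on return
try {
  SparkCuratorUtil.mkdir(zk, "/spark/demo") // idempotent: NodeExists is swallowed
} finally {
  zk.close()
}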
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
deleted file mode 100644
index 57758055b1..0000000000
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.master
-
-import scala.collection.JavaConversions._
-
-import org.apache.zookeeper._
-import org.apache.zookeeper.Watcher.Event.KeeperState
-import org.apache.zookeeper.data.Stat
-
-import org.apache.spark.{Logging, SparkConf}
-
-/**
- * Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry
- * logic. If the ZooKeeper session expires or otherwise dies, a new ZooKeeper session will be
- * created. If ZooKeeper remains down after several retries, the given
- * [[org.apache.spark.deploy.master.SparkZooKeeperWatcher SparkZooKeeperWatcher]] will be
- * informed via zkDown().
- *
- * Additionally, all commands sent to ZooKeeper will be retried until they either fail too many
- * times or a semantic exception is thrown (e.g., "node already exists").
- */
-private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher,
- conf: SparkConf) extends Logging {
- val ZK_URL = conf.get("spark.deploy.zookeeper.url", "")
-
- val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
- val ZK_TIMEOUT_MILLIS = 30000
- val RETRY_WAIT_MILLIS = 5000
- val ZK_CHECK_PERIOD_MILLIS = 10000
- val MAX_RECONNECT_ATTEMPTS = 3
-
- private var zk: ZooKeeper = _
-
- private val watcher = new ZooKeeperWatcher()
- private var reconnectAttempts = 0
- private var closed = false
-
- /** Connect to ZooKeeper to start the session. Must be called before anything else. */
- def connect() {
- connectToZooKeeper()
-
- new Thread() {
- override def run() = sessionMonitorThread()
- }.start()
- }
-
- def sessionMonitorThread(): Unit = {
- while (!closed) {
- Thread.sleep(ZK_CHECK_PERIOD_MILLIS)
- if (zk.getState != ZooKeeper.States.CONNECTED) {
- reconnectAttempts += 1
- val attemptsLeft = MAX_RECONNECT_ATTEMPTS - reconnectAttempts
- if (attemptsLeft <= 0) {
- logError("Could not connect to ZooKeeper: system failure")
- zkWatcher.zkDown()
- close()
- } else {
- logWarning("ZooKeeper connection failed, retrying " + attemptsLeft + " more times...")
- connectToZooKeeper()
- }
- }
- }
- }
-
- def close() {
- if (!closed && zk != null) { zk.close() }
- closed = true
- }
-
- private def connectToZooKeeper() {
- if (zk != null) zk.close()
- zk = new ZooKeeper(ZK_URL, ZK_TIMEOUT_MILLIS, watcher)
- }
-
- /**
- * Attempts to maintain a live ZooKeeper session despite (very) transient failures.
- * Mainly useful for handling the natural ZooKeeper session expiration.
- */
- private class ZooKeeperWatcher extends Watcher {
- def process(event: WatchedEvent) {
- if (closed) { return }
-
- event.getState match {
- case KeeperState.SyncConnected =>
- reconnectAttempts = 0
- zkWatcher.zkSessionCreated()
- case KeeperState.Expired =>
- connectToZooKeeper()
- case KeeperState.Disconnected =>
- logWarning("ZooKeeper disconnected, will retry...")
- case s => // Do nothing
- }
- }
- }
-
- def create(path: String, bytes: Array[Byte], createMode: CreateMode): String = {
- retry {
- zk.create(path, bytes, ZK_ACL, createMode)
- }
- }
-
- def exists(path: String, watcher: Watcher = null): Stat = {
- retry {
- zk.exists(path, watcher)
- }
- }
-
- def getChildren(path: String, watcher: Watcher = null): List[String] = {
- retry {
- zk.getChildren(path, watcher).toList
- }
- }
-
- def getData(path: String): Array[Byte] = {
- retry {
- zk.getData(path, false, null)
- }
- }
-
- def delete(path: String, version: Int = -1): Unit = {
- retry {
- zk.delete(path, version)
- }
- }
-
- /**
- * Creates the given directory (non-recursively) if it doesn't exist.
- * All znodes are created in PERSISTENT mode with no data.
- */
- def mkdir(path: String) {
- if (exists(path) == null) {
- try {
- create(path, "".getBytes, CreateMode.PERSISTENT)
- } catch {
- case e: Exception =>
- // If the exception caused the directory not to be created, bubble it up,
- // otherwise ignore it.
- if (exists(path) == null) { throw e }
- }
- }
- }
-
- /**
- * Recursively creates all directories up to the given one.
- * All znodes are created in PERSISTENT mode with no data.
- */
- def mkdirRecursive(path: String) {
- var fullDir = ""
- for (dentry <- path.split("/").tail) {
- fullDir += "/" + dentry
- mkdir(fullDir)
- }
- }
-
- /**
- * Retries the given function up to 3 times. The assumption is that failure is transient,
- * UNLESS it is a semantic exception (i.e., trying to get data from a node that doesn't exist),
- * in which case the exception will be thrown without retries.
- *
- * @param fn Block to execute, possibly multiple times.
- */
- def retry[T](fn: => T, n: Int = MAX_RECONNECT_ATTEMPTS): T = {
- try {
- fn
- } catch {
- case e: KeeperException.NoNodeException => throw e
- case e: KeeperException.NodeExistsException => throw e
- case e: Exception if n > 0 =>
- logError("ZooKeeper exception, " + n + " more retries...", e)
- Thread.sleep(RETRY_WAIT_MILLIS)
- retry(fn, n-1)
- }
- }
-}
-
-trait SparkZooKeeperWatcher {
- /**
- * Called whenever a ZK session is created --
- * this will occur when we create our first session as well as each time
- * the session expires or errors out.
- */
- def zkSessionCreated()
-
- /**
- * Called if ZK appears to be completely down (i.e., not just a transient error).
- * We will no longer attempt to reconnect to ZK, and the SparkZooKeeperSession is considered dead.
- */
- def zkDown()
-}
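All of the hand-rolled session monitoring, reconnection, and retry() machinery deleted above is what Curator subsumes. A minimal standalone sketch of the replacement behaviour, using the same timeout constants as SparkCuratorUtil (the connect string is illustrative):

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

// ExponentialBackoffRetry sleeps roughly baseSleepTimeMs * random(1 .. 2^n)
// between attempts, replacing the fixed RETRY_WAIT_MILLIS sleep loop above.
val client = CuratorFrameworkFactory.newClient(
  "zk1:2181,zk2:2181",                  // illustrative connect string
  60000,                                // session timeout (ms)
  15000,                                // connection timeout (ms)
  new ExponentialBackoffRetry(5000, 3)) // base sleep 5s, at most 3 retries
client.start()
// Operations issued through `client` are retried per the policy, so callers
// no longer need a per-call retry { ... } wrapper.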
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 47b8f67f8a..285f9b014e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -18,105 +18,67 @@
package org.apache.spark.deploy.master
import akka.actor.ActorRef
-import org.apache.zookeeper._
-import org.apache.zookeeper.Watcher.Event.EventType
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.master.MasterMessages._
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}
private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
masterUrl: String, conf: SparkConf)
- extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {
+ extends LeaderElectionAgent with LeaderLatchListener with Logging {
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
- private val watcher = new ZooKeeperWatcher()
- private val zk = new SparkZooKeeperSession(this, conf)
+ private var zk: CuratorFramework = _
+ private var leaderLatch: LeaderLatch = _
private var status = LeadershipStatus.NOT_LEADER
- private var myLeaderFile: String = _
- private var leaderUrl: String = _
override def preStart() {
+
logInfo("Starting ZooKeeper LeaderElection agent")
- zk.connect()
- }
+ zk = SparkCuratorUtil.newClient(conf)
+ leaderLatch = new LeaderLatch(zk, WORKING_DIR)
+ leaderLatch.addListener(this)
- override def zkSessionCreated() {
- synchronized {
- zk.mkdirRecursive(WORKING_DIR)
- myLeaderFile =
- zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, CreateMode.EPHEMERAL_SEQUENTIAL)
- self ! CheckLeader
- }
+ leaderLatch.start()
}
override def preRestart(reason: scala.Throwable, message: scala.Option[scala.Any]) {
- logError("LeaderElectionAgent failed, waiting " + zk.ZK_TIMEOUT_MILLIS + "...", reason)
- Thread.sleep(zk.ZK_TIMEOUT_MILLIS)
+ logError("LeaderElectionAgent failed...", reason)
super.preRestart(reason, message)
}
- override def zkDown() {
- logError("ZooKeeper down! LeaderElectionAgent shutting down Master.")
- System.exit(1)
- }
-
override def postStop() {
+ leaderLatch.close()
zk.close()
}
override def receive = {
- case CheckLeader => checkLeader()
+ case _ =>
}
- private class ZooKeeperWatcher extends Watcher {
- def process(event: WatchedEvent) {
- if (event.getType == EventType.NodeDeleted) {
- logInfo("Leader file disappeared, a master is down!")
- self ! CheckLeader
+ override def isLeader() {
+ synchronized {
+ // could have lost leadership by now.
+ if (!leaderLatch.hasLeadership) {
+ return
}
- }
- }
- /** Uses ZK leader election. Navigates several ZK potholes along the way. */
- def checkLeader() {
- val masters = zk.getChildren(WORKING_DIR).toList
- val leader = masters.sorted.head
- val leaderFile = WORKING_DIR + "/" + leader
-
- // Setup a watch for the current leader.
- zk.exists(leaderFile, watcher)
-
- try {
- leaderUrl = new String(zk.getData(leaderFile))
- } catch {
- // A NoNodeException may be thrown if old leader died since the start of this method call.
- // This is fine -- just check again, since we're guaranteed to see the new values.
- case e: KeeperException.NoNodeException =>
- logInfo("Leader disappeared while reading it -- finding next leader")
- checkLeader()
- return
+ logInfo("We have gained leadership")
+ updateLeadershipStatus(true)
}
+ }
- // Synchronization used to ensure no interleaving between the creation of a new session and the
- // checking of a leader, which could cause us to delete our real leader file erroneously.
+ override def notLeader() {
synchronized {
- val isLeader = myLeaderFile == leaderFile
- if (!isLeader && leaderUrl == masterUrl) {
- // We found a different master file pointing to this process.
- // This can happen in the following two cases:
- // (1) The master process was restarted on the same node.
- // (2) The ZK server died between creating the file and returning the name of the file.
- // For this case, we will end up creating a second file, and MUST explicitly delete the
- // first one, since our ZK session is still open.
- // Note that this deletion will cause a NodeDeleted event to be fired so we check again for
- // leader changes.
- assert(leaderFile < myLeaderFile)
- logWarning("Cleaning up old ZK master election file that points to this master.")
- zk.delete(leaderFile)
- } else {
- updateLeadershipStatus(isLeader)
+ // could have gained leadership by now.
+ if (leaderLatch.hasLeadership) {
+ return
}
+
+ logInfo("We have lost leadership")
+ updateLeadershipStatus(false)
}
}
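The LeaderLatch recipe used above replaces the manual EPHEMERAL_SEQUENTIAL election and its watcher plumbing. A minimal standalone sketch of the recipe outside Spark (the client setup and znode path are illustrative):

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchListener}
import org.apache.curator.retry.ExponentialBackoffRetry

val client = CuratorFrameworkFactory.newClient(
  "zk1:2181", new ExponentialBackoffRetry(1000, 3)) // illustrative client
client.start()

val latch = new LeaderLatch(client, "/demo/leader_election") // illustrative path
latch.addListener(new LeaderLatchListener {
  // Callbacks can arrive late, so re-check hasLeadership before acting,
  // exactly as the agent above does inside its synchronized blocks.
  override def isLeader(): Unit =
    if (latch.hasLeadership) println("gained leadership")
  override def notLeader(): Unit =
    if (!latch.hasLeadership) println("lost leadership")
})
latch.start() // joins the election; latch.close() withdraws from it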
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 48b2fc06a9..939006239d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -17,36 +17,28 @@
package org.apache.spark.deploy.master
+import scala.collection.JavaConversions._
+
import akka.serialization.Serialization
-import org.apache.zookeeper._
+import org.apache.zookeeper.CreateMode
import org.apache.spark.{Logging, SparkConf}
class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
extends PersistenceEngine
- with SparkZooKeeperWatcher
with Logging
{
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
+ val zk = SparkCuratorUtil.newClient(conf)
- val zk = new SparkZooKeeperSession(this, conf)
-
- zk.connect()
-
- override def zkSessionCreated() {
- zk.mkdirRecursive(WORKING_DIR)
- }
-
- override def zkDown() {
- logError("PersistenceEngine disconnected from ZooKeeper -- ZK looks down.")
- }
+ SparkCuratorUtil.mkdir(zk, WORKING_DIR)
override def addApplication(app: ApplicationInfo) {
serializeIntoFile(WORKING_DIR + "/app_" + app.id, app)
}
override def removeApplication(app: ApplicationInfo) {
- zk.delete(WORKING_DIR + "/app_" + app.id)
+ zk.delete().forPath(WORKING_DIR + "/app_" + app.id)
}
override def addDriver(driver: DriverInfo) {
@@ -54,7 +46,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
}
override def removeDriver(driver: DriverInfo) {
- zk.delete(WORKING_DIR + "/driver_" + driver.id)
+ zk.delete().forPath(WORKING_DIR + "/driver_" + driver.id)
}
override def addWorker(worker: WorkerInfo) {
@@ -62,7 +54,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
}
override def removeWorker(worker: WorkerInfo) {
- zk.delete(WORKING_DIR + "/worker_" + worker.id)
+ zk.delete().forPath(WORKING_DIR + "/worker_" + worker.id)
}
override def close() {
@@ -70,7 +62,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
}
override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
- val sortedFiles = zk.getChildren(WORKING_DIR).toList.sorted
+ val sortedFiles = zk.getChildren().forPath(WORKING_DIR).toList.sorted
val appFiles = sortedFiles.filter(_.startsWith("app_"))
val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
val driverFiles = sortedFiles.filter(_.startsWith("driver_"))
@@ -83,11 +75,11 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
private def serializeIntoFile(path: String, value: AnyRef) {
val serializer = serialization.findSerializerFor(value)
val serialized = serializer.toBinary(value)
- zk.create(path, serialized, CreateMode.PERSISTENT)
+ zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized)
}
def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): T = {
- val fileData = zk.getData(WORKING_DIR + "/" + filename)
+ val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
val clazz = m.runtimeClass.asInstanceOf[Class[T]]
val serializer = serialization.serializerFor(clazz)
serializer.fromBinary(fileData).asInstanceOf[T]
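The persistence engine now talks to ZooKeeper through Curator's fluent builders rather than the deleted one-shot wrappers. A short sketch of the three operations it relies on (the path and payload are illustrative; `zk` is assumed to be a started CuratorFramework such as the one SparkCuratorUtil.newClient returns):

import org.apache.zookeeper.CreateMode

val path = "/spark/master_status/app_demo" // illustrative znode
zk.create().withMode(CreateMode.PERSISTENT).forPath(path, "payload".getBytes)
val bytes: Array[Byte] = zk.getData().forPath(path) // raw znode contents
zk.delete().forPath(path)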
diff --git a/pom.xml b/pom.xml
index 3a530685b8..4f1e8398d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -393,9 +393,9 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.4.5</version>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>2.4.0</version>
<exclusions>
<exclusion>
<groupId>org.jboss.netty</groupId>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f0d2e74148..220894affb 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -277,7 +277,7 @@ object SparkBuild extends Build {
"org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib, excludeCommonsLogging, excludeSLF4J),
"org.apache.avro" % "avro" % "1.7.4",
"org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty),
- "org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty),
+ "org.apache.curator" % "curator-recipes" % "2.4.0" excludeAll(excludeNetty),
"com.codahale.metrics" % "metrics-core" % "3.0.0",
"com.codahale.metrics" % "metrics-jvm" % "3.0.0",
"com.codahale.metrics" % "metrics-json" % "3.0.0",