From c852201ce95c7c982ff3794c114427eb33e92922 Mon Sep 17 00:00:00 2001 From: Raymond Liu Date: Mon, 24 Feb 2014 23:20:38 -0800 Subject: For SPARK-1082, Use Curator for ZK interaction in standalone cluster Author: Raymond Liu Closes #611 from colorant/curator and squashes the following commits: 7556aa1 [Raymond Liu] Address review comments af92e1f [Raymond Liu] Fix coding style 964f3c2 [Raymond Liu] Ignore NodeExists exception 6df2966 [Raymond Liu] Rewrite zookeeper client code with curator --- core/pom.xml | 4 +- .../spark/deploy/master/LeaderElectionAgent.scala | 1 + .../spark/deploy/master/MasterMessages.scala | 4 - .../spark/deploy/master/SparkCuratorUtil.scala | 53 ++++++ .../deploy/master/SparkZooKeeperSession.scala | 205 --------------------- .../master/ZooKeeperLeaderElectionAgent.scala | 94 +++------- .../deploy/master/ZooKeeperPersistenceEngine.scala | 30 ++- pom.xml | 6 +- project/SparkBuild.scala | 2 +- 9 files changed, 99 insertions(+), 300 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala delete mode 100644 core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala diff --git a/core/pom.xml b/core/pom.xml index 5576b0c3b4..f209704f31 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -55,8 +55,8 @@ avro-ipc - org.apache.zookeeper - zookeeper + org.apache.curator + curator-recipes org.eclipse.jetty diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala index f25a1ad3bf..a730fe1f59 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala @@ -30,6 +30,7 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]] */ private[spark] trait LeaderElectionAgent extends Actor { + //TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring. val masterActor: ActorRef } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala index 74a9f8cd82..db72d8ae9b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala @@ -28,10 +28,6 @@ private[master] object MasterMessages { case object RevokedLeadership - // Actor System to LeaderElectionAgent - - case object CheckLeader - // Actor System to Master case object CheckForWorkerTimeOut diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala new file mode 100644 index 0000000000..2d35397035 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.master + +import org.apache.spark.{SparkConf, Logging} +import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} +import org.apache.curator.retry.ExponentialBackoffRetry +import org.apache.zookeeper.KeeperException + + +object SparkCuratorUtil extends Logging { + + val ZK_CONNECTION_TIMEOUT_MILLIS = 15000 + val ZK_SESSION_TIMEOUT_MILLIS = 60000 + val RETRY_WAIT_MILLIS = 5000 + val MAX_RECONNECT_ATTEMPTS = 3 + + def newClient(conf: SparkConf): CuratorFramework = { + val ZK_URL = conf.get("spark.deploy.zookeeper.url") + val zk = CuratorFrameworkFactory.newClient(ZK_URL, + ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS, + new ExponentialBackoffRetry(RETRY_WAIT_MILLIS, MAX_RECONNECT_ATTEMPTS)) + zk.start() + zk + } + + def mkdir(zk: CuratorFramework, path: String) { + if (zk.checkExists().forPath(path) == null) { + try { + zk.create().creatingParentsIfNeeded().forPath(path) + } catch { + case nodeExist: KeeperException.NodeExistsException => + // do nothing, ignore node existing exception. + case e: Exception => throw e + } + } + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala deleted file mode 100644 index 57758055b1..0000000000 --- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.master - -import scala.collection.JavaConversions._ - -import org.apache.zookeeper._ -import org.apache.zookeeper.Watcher.Event.KeeperState -import org.apache.zookeeper.data.Stat - -import org.apache.spark.{Logging, SparkConf} - -/** - * Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry - * logic. If the ZooKeeper session expires or otherwise dies, a new ZooKeeper session will be - * created. If ZooKeeper remains down after several retries, the given - * [[org.apache.spark.deploy.master.SparkZooKeeperWatcher SparkZooKeeperWatcher]] will be - * informed via zkDown(). - * - * Additionally, all commands sent to ZooKeeper will be retried until they either fail too many - * times or a semantic exception is thrown (e.g., "node already exists"). - */ -private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher, - conf: SparkConf) extends Logging { - val ZK_URL = conf.get("spark.deploy.zookeeper.url", "") - - val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE - val ZK_TIMEOUT_MILLIS = 30000 - val RETRY_WAIT_MILLIS = 5000 - val ZK_CHECK_PERIOD_MILLIS = 10000 - val MAX_RECONNECT_ATTEMPTS = 3 - - private var zk: ZooKeeper = _ - - private val watcher = new ZooKeeperWatcher() - private var reconnectAttempts = 0 - private var closed = false - - /** Connect to ZooKeeper to start the session. Must be called before anything else. */ - def connect() { - connectToZooKeeper() - - new Thread() { - override def run() = sessionMonitorThread() - }.start() - } - - def sessionMonitorThread(): Unit = { - while (!closed) { - Thread.sleep(ZK_CHECK_PERIOD_MILLIS) - if (zk.getState != ZooKeeper.States.CONNECTED) { - reconnectAttempts += 1 - val attemptsLeft = MAX_RECONNECT_ATTEMPTS - reconnectAttempts - if (attemptsLeft <= 0) { - logError("Could not connect to ZooKeeper: system failure") - zkWatcher.zkDown() - close() - } else { - logWarning("ZooKeeper connection failed, retrying " + attemptsLeft + " more times...") - connectToZooKeeper() - } - } - } - } - - def close() { - if (!closed && zk != null) { zk.close() } - closed = true - } - - private def connectToZooKeeper() { - if (zk != null) zk.close() - zk = new ZooKeeper(ZK_URL, ZK_TIMEOUT_MILLIS, watcher) - } - - /** - * Attempts to maintain a live ZooKeeper exception despite (very) transient failures. - * Mainly useful for handling the natural ZooKeeper session expiration. - */ - private class ZooKeeperWatcher extends Watcher { - def process(event: WatchedEvent) { - if (closed) { return } - - event.getState match { - case KeeperState.SyncConnected => - reconnectAttempts = 0 - zkWatcher.zkSessionCreated() - case KeeperState.Expired => - connectToZooKeeper() - case KeeperState.Disconnected => - logWarning("ZooKeeper disconnected, will retry...") - case s => // Do nothing - } - } - } - - def create(path: String, bytes: Array[Byte], createMode: CreateMode): String = { - retry { - zk.create(path, bytes, ZK_ACL, createMode) - } - } - - def exists(path: String, watcher: Watcher = null): Stat = { - retry { - zk.exists(path, watcher) - } - } - - def getChildren(path: String, watcher: Watcher = null): List[String] = { - retry { - zk.getChildren(path, watcher).toList - } - } - - def getData(path: String): Array[Byte] = { - retry { - zk.getData(path, false, null) - } - } - - def delete(path: String, version: Int = -1): Unit = { - retry { - zk.delete(path, version) - } - } - - /** - * Creates the given directory (non-recursively) if it doesn't exist. - * All znodes are created in PERSISTENT mode with no data. - */ - def mkdir(path: String) { - if (exists(path) == null) { - try { - create(path, "".getBytes, CreateMode.PERSISTENT) - } catch { - case e: Exception => - // If the exception caused the directory not to be created, bubble it up, - // otherwise ignore it. - if (exists(path) == null) { throw e } - } - } - } - - /** - * Recursively creates all directories up to the given one. - * All znodes are created in PERSISTENT mode with no data. - */ - def mkdirRecursive(path: String) { - var fullDir = "" - for (dentry <- path.split("/").tail) { - fullDir += "/" + dentry - mkdir(fullDir) - } - } - - /** - * Retries the given function up to 3 times. The assumption is that failure is transient, - * UNLESS it is a semantic exception (i.e., trying to get data from a node that doesn't exist), - * in which case the exception will be thrown without retries. - * - * @param fn Block to execute, possibly multiple times. - */ - def retry[T](fn: => T, n: Int = MAX_RECONNECT_ATTEMPTS): T = { - try { - fn - } catch { - case e: KeeperException.NoNodeException => throw e - case e: KeeperException.NodeExistsException => throw e - case e: Exception if n > 0 => - logError("ZooKeeper exception, " + n + " more retries...", e) - Thread.sleep(RETRY_WAIT_MILLIS) - retry(fn, n-1) - } - } -} - -trait SparkZooKeeperWatcher { - /** - * Called whenever a ZK session is created -- - * this will occur when we create our first session as well as each time - * the session expires or errors out. - */ - def zkSessionCreated() - - /** - * Called if ZK appears to be completely down (i.e., not just a transient error). - * We will no longer attempt to reconnect to ZK, and the SparkZooKeeperSession is considered dead. - */ - def zkDown() -} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala index 47b8f67f8a..285f9b014e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala @@ -18,105 +18,67 @@ package org.apache.spark.deploy.master import akka.actor.ActorRef -import org.apache.zookeeper._ -import org.apache.zookeeper.Watcher.Event.EventType import org.apache.spark.{Logging, SparkConf} import org.apache.spark.deploy.master.MasterMessages._ +import org.apache.curator.framework.CuratorFramework +import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch} private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String, conf: SparkConf) - extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging { + extends LeaderElectionAgent with LeaderLatchListener with Logging { val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election" - private val watcher = new ZooKeeperWatcher() - private val zk = new SparkZooKeeperSession(this, conf) + private var zk: CuratorFramework = _ + private var leaderLatch: LeaderLatch = _ private var status = LeadershipStatus.NOT_LEADER - private var myLeaderFile: String = _ - private var leaderUrl: String = _ override def preStart() { + logInfo("Starting ZooKeeper LeaderElection agent") - zk.connect() - } + zk = SparkCuratorUtil.newClient(conf) + leaderLatch = new LeaderLatch(zk, WORKING_DIR) + leaderLatch.addListener(this) - override def zkSessionCreated() { - synchronized { - zk.mkdirRecursive(WORKING_DIR) - myLeaderFile = - zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, CreateMode.EPHEMERAL_SEQUENTIAL) - self ! CheckLeader - } + leaderLatch.start() } override def preRestart(reason: scala.Throwable, message: scala.Option[scala.Any]) { - logError("LeaderElectionAgent failed, waiting " + zk.ZK_TIMEOUT_MILLIS + "...", reason) - Thread.sleep(zk.ZK_TIMEOUT_MILLIS) + logError("LeaderElectionAgent failed...", reason) super.preRestart(reason, message) } - override def zkDown() { - logError("ZooKeeper down! LeaderElectionAgent shutting down Master.") - System.exit(1) - } - override def postStop() { + leaderLatch.close() zk.close() } override def receive = { - case CheckLeader => checkLeader() + case _ => } - private class ZooKeeperWatcher extends Watcher { - def process(event: WatchedEvent) { - if (event.getType == EventType.NodeDeleted) { - logInfo("Leader file disappeared, a master is down!") - self ! CheckLeader + override def isLeader() { + synchronized { + // could have lost leadership by now. + if (!leaderLatch.hasLeadership) { + return } - } - } - /** Uses ZK leader election. Navigates several ZK potholes along the way. */ - def checkLeader() { - val masters = zk.getChildren(WORKING_DIR).toList - val leader = masters.sorted.head - val leaderFile = WORKING_DIR + "/" + leader - - // Setup a watch for the current leader. - zk.exists(leaderFile, watcher) - - try { - leaderUrl = new String(zk.getData(leaderFile)) - } catch { - // A NoNodeException may be thrown if old leader died since the start of this method call. - // This is fine -- just check again, since we're guaranteed to see the new values. - case e: KeeperException.NoNodeException => - logInfo("Leader disappeared while reading it -- finding next leader") - checkLeader() - return + logInfo("We have gained leadership") + updateLeadershipStatus(true) } + } - // Synchronization used to ensure no interleaving between the creation of a new session and the - // checking of a leader, which could cause us to delete our real leader file erroneously. + override def notLeader() { synchronized { - val isLeader = myLeaderFile == leaderFile - if (!isLeader && leaderUrl == masterUrl) { - // We found a different master file pointing to this process. - // This can happen in the following two cases: - // (1) The master process was restarted on the same node. - // (2) The ZK server died between creating the file and returning the name of the file. - // For this case, we will end up creating a second file, and MUST explicitly delete the - // first one, since our ZK session is still open. - // Note that this deletion will cause a NodeDeleted event to be fired so we check again for - // leader changes. - assert(leaderFile < myLeaderFile) - logWarning("Cleaning up old ZK master election file that points to this master.") - zk.delete(leaderFile) - } else { - updateLeadershipStatus(isLeader) + // could have gained leadership by now. + if (leaderLatch.hasLeadership) { + return } + + logInfo("We have lost leadership") + updateLeadershipStatus(false) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala index 48b2fc06a9..939006239d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala @@ -17,36 +17,28 @@ package org.apache.spark.deploy.master +import scala.collection.JavaConversions._ + import akka.serialization.Serialization -import org.apache.zookeeper._ +import org.apache.zookeeper.CreateMode import org.apache.spark.{Logging, SparkConf} class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf) extends PersistenceEngine - with SparkZooKeeperWatcher with Logging { val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status" + val zk = SparkCuratorUtil.newClient(conf) - val zk = new SparkZooKeeperSession(this, conf) - - zk.connect() - - override def zkSessionCreated() { - zk.mkdirRecursive(WORKING_DIR) - } - - override def zkDown() { - logError("PersistenceEngine disconnected from ZooKeeper -- ZK looks down.") - } + SparkCuratorUtil.mkdir(zk, WORKING_DIR) override def addApplication(app: ApplicationInfo) { serializeIntoFile(WORKING_DIR + "/app_" + app.id, app) } override def removeApplication(app: ApplicationInfo) { - zk.delete(WORKING_DIR + "/app_" + app.id) + zk.delete().forPath(WORKING_DIR + "/app_" + app.id) } override def addDriver(driver: DriverInfo) { @@ -54,7 +46,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf) } override def removeDriver(driver: DriverInfo) { - zk.delete(WORKING_DIR + "/driver_" + driver.id) + zk.delete().forPath(WORKING_DIR + "/driver_" + driver.id) } override def addWorker(worker: WorkerInfo) { @@ -62,7 +54,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf) } override def removeWorker(worker: WorkerInfo) { - zk.delete(WORKING_DIR + "/worker_" + worker.id) + zk.delete().forPath(WORKING_DIR + "/worker_" + worker.id) } override def close() { @@ -70,7 +62,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf) } override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = { - val sortedFiles = zk.getChildren(WORKING_DIR).toList.sorted + val sortedFiles = zk.getChildren().forPath(WORKING_DIR).toList.sorted val appFiles = sortedFiles.filter(_.startsWith("app_")) val apps = appFiles.map(deserializeFromFile[ApplicationInfo]) val driverFiles = sortedFiles.filter(_.startsWith("driver_")) @@ -83,11 +75,11 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf) private def serializeIntoFile(path: String, value: AnyRef) { val serializer = serialization.findSerializerFor(value) val serialized = serializer.toBinary(value) - zk.create(path, serialized, CreateMode.PERSISTENT) + zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized) } def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): T = { - val fileData = zk.getData(WORKING_DIR + "/" + filename) + val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename) val clazz = m.runtimeClass.asInstanceOf[Class[T]] val serializer = serialization.serializerFor(clazz) serializer.fromBinary(fileData).asInstanceOf[T] diff --git a/pom.xml b/pom.xml index 3a530685b8..4f1e8398d9 100644 --- a/pom.xml +++ b/pom.xml @@ -393,9 +393,9 @@ test - org.apache.zookeeper - zookeeper - 3.4.5 + org.apache.curator + curator-recipes + 2.4.0 org.jboss.netty diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index f0d2e74148..220894affb 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -277,7 +277,7 @@ object SparkBuild extends Build { "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib, excludeCommonsLogging, excludeSLF4J), "org.apache.avro" % "avro" % "1.7.4", "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty), - "org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty), + "org.apache.curator" % "curator-recipes" % "2.4.0" excludeAll(excludeNetty), "com.codahale.metrics" % "metrics-core" % "3.0.0", "com.codahale.metrics" % "metrics-jvm" % "3.0.0", "com.codahale.metrics" % "metrics-json" % "3.0.0", -- cgit v1.2.3