aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2014-11-11 09:29:48 -0800
committerAaron Davidson <aaron@databricks.com>2014-11-11 09:29:48 -0800
commitdeefd9d7377a8091a1d184b99066febd0e9f6afd (patch)
tree3c9e851051067be41466c49b862731d988c9d7f7 /core
parent6e03de304e0294017d832763fd71e642736f8c33 (diff)
downloadspark-deefd9d7377a8091a1d184b99066febd0e9f6afd.tar.gz
spark-deefd9d7377a8091a1d184b99066febd0e9f6afd.tar.bz2
spark-deefd9d7377a8091a1d184b99066febd0e9f6afd.zip
SPARK-1830 Deploy failover, Make Persistence engine and LeaderAgent Pluggable
Author: Prashant Sharma <prashant.s@imaginea.com> Closes #771 from ScrapCodes/deploy-failover-pluggable and squashes the following commits: 29ba440 [Prashant Sharma] fixed a compilation error fef35ec [Prashant Sharma] Code review 57ee6f0 [Prashant Sharma] SPARK-1830 Deploy failover, Make Persistence engine and LeaderAgent Pluggable.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala62
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala37
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala40
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala70
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala69
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala24
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala56
10 files changed, 211 insertions, 150 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 6ba395be1c..ad7d81747c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import akka.actor.ActorRef
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.ApplicationDescription
import org.apache.spark.util.Utils
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
index 2ac2118688..9d3d7938c6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.master
import java.util.Date
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.DriverDescription
import org.apache.spark.util.Utils
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 08a99bbe68..6ff2aa5244 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -18,10 +18,12 @@
package org.apache.spark.deploy.master
import java.io._
-
-import akka.serialization.Serialization
+import java.nio.ByteBuffer
import org.apache.spark.Logging
+import org.apache.spark.serializer.Serializer
+
+import scala.reflect.ClassTag
/**
* Stores data in a single on-disk directory with one file per application and worker.
@@ -32,65 +34,39 @@ import org.apache.spark.Logging
*/
private[spark] class FileSystemPersistenceEngine(
val dir: String,
- val serialization: Serialization)
+ val serialization: Serializer)
extends PersistenceEngine with Logging {
+ val serializer = serialization.newInstance()
new File(dir).mkdir()
- override def addApplication(app: ApplicationInfo) {
- val appFile = new File(dir + File.separator + "app_" + app.id)
- serializeIntoFile(appFile, app)
- }
-
- override def removeApplication(app: ApplicationInfo) {
- new File(dir + File.separator + "app_" + app.id).delete()
- }
-
- override def addDriver(driver: DriverInfo) {
- val driverFile = new File(dir + File.separator + "driver_" + driver.id)
- serializeIntoFile(driverFile, driver)
+ override def persist(name: String, obj: Object): Unit = {
+ serializeIntoFile(new File(dir + File.separator + name), obj)
}
- override def removeDriver(driver: DriverInfo) {
- new File(dir + File.separator + "driver_" + driver.id).delete()
+ override def unpersist(name: String): Unit = {
+ new File(dir + File.separator + name).delete()
}
- override def addWorker(worker: WorkerInfo) {
- val workerFile = new File(dir + File.separator + "worker_" + worker.id)
- serializeIntoFile(workerFile, worker)
- }
-
- override def removeWorker(worker: WorkerInfo) {
- new File(dir + File.separator + "worker_" + worker.id).delete()
- }
-
- override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
- val sortedFiles = new File(dir).listFiles().sortBy(_.getName)
- val appFiles = sortedFiles.filter(_.getName.startsWith("app_"))
- val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
- val driverFiles = sortedFiles.filter(_.getName.startsWith("driver_"))
- val drivers = driverFiles.map(deserializeFromFile[DriverInfo])
- val workerFiles = sortedFiles.filter(_.getName.startsWith("worker_"))
- val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
- (apps, drivers, workers)
+ override def read[T: ClassTag](prefix: String) = {
+ val files = new File(dir).listFiles().filter(_.getName.startsWith(prefix))
+ files.map(deserializeFromFile[T])
}
private def serializeIntoFile(file: File, value: AnyRef) {
val created = file.createNewFile()
if (!created) { throw new IllegalStateException("Could not create file: " + file) }
- val serializer = serialization.findSerializerFor(value)
- val serialized = serializer.toBinary(value)
-
- val out = new FileOutputStream(file)
+ val out = serializer.serializeStream(new FileOutputStream(file))
try {
- out.write(serialized)
+ out.writeObject(value)
} finally {
out.close()
}
+
}
- def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = {
+ def deserializeFromFile[T](file: File): T = {
val fileData = new Array[Byte](file.length().asInstanceOf[Int])
val dis = new DataInputStream(new FileInputStream(file))
try {
@@ -99,8 +75,6 @@ private[spark] class FileSystemPersistenceEngine(
dis.close()
}
- val clazz = m.runtimeClass.asInstanceOf[Class[T]]
- val serializer = serialization.serializerFor(clazz)
- serializer.fromBinary(fileData).asInstanceOf[T]
+ serializer.deserializeStream(dis).readObject()
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
index 4433a2ec29..cf77c86d76 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
@@ -17,30 +17,27 @@
package org.apache.spark.deploy.master
-import akka.actor.{Actor, ActorRef}
-
-import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
+import org.apache.spark.annotation.DeveloperApi
/**
- * A LeaderElectionAgent keeps track of whether the current Master is the leader, meaning it
- * is the only Master serving requests.
- * In addition to the API provided, the LeaderElectionAgent will use of the following messages
- * to inform the Master of leader changes:
- * [[org.apache.spark.deploy.master.MasterMessages.ElectedLeader ElectedLeader]]
- * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
+ * :: DeveloperApi ::
+ *
+ * A LeaderElectionAgent tracks current master and is a common interface for all election Agents.
*/
-private[spark] trait LeaderElectionAgent extends Actor {
- // TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
- val masterActor: ActorRef
+@DeveloperApi
+trait LeaderElectionAgent {
+ val masterActor: LeaderElectable
+ def stop() {} // to avoid noops in implementations.
}
-/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
-private[spark] class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent {
- override def preStart() {
- masterActor ! ElectedLeader
- }
+@DeveloperApi
+trait LeaderElectable {
+ def electedLeader()
+ def revokedLeadership()
+}
- override def receive = {
- case _ =>
- }
+/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
+private[spark] class MonarchyLeaderAgent(val masterActor: LeaderElectable)
+ extends LeaderElectionAgent {
+ masterActor.electedLeader()
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2f81d472d7..021454e258 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -50,7 +50,7 @@ private[spark] class Master(
port: Int,
webUiPort: Int,
val securityMgr: SecurityManager)
- extends Actor with ActorLogReceive with Logging {
+ extends Actor with ActorLogReceive with Logging with LeaderElectable {
import context.dispatcher // to use Akka's scheduler.schedule()
@@ -61,7 +61,6 @@ private[spark] class Master(
val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
- val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
val workers = new HashSet[WorkerInfo]
@@ -103,7 +102,7 @@ private[spark] class Master(
var persistenceEngine: PersistenceEngine = _
- var leaderElectionAgent: ActorRef = _
+ var leaderElectionAgent: LeaderElectionAgent = _
private var recoveryCompletionTask: Cancellable = _
@@ -130,23 +129,24 @@ private[spark] class Master(
masterMetricsSystem.start()
applicationMetricsSystem.start()
- persistenceEngine = RECOVERY_MODE match {
+ val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
- new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
+ val zkFactory = new ZooKeeperRecoveryModeFactory(conf)
+ (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
- logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
- new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
+ val fsFactory = new FileSystemRecoveryModeFactory(conf)
+ (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
+ case "CUSTOM" =>
+ val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
+ val factory = clazz.getConstructor(conf.getClass)
+ .newInstance(conf).asInstanceOf[StandaloneRecoveryModeFactory]
+ (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
- new BlackHolePersistenceEngine()
+ (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
-
- leaderElectionAgent = RECOVERY_MODE match {
- case "ZOOKEEPER" =>
- context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
- case _ =>
- context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
- }
+ persistenceEngine = persistenceEngine_
+ leaderElectionAgent = leaderElectionAgent_
}
override def preRestart(reason: Throwable, message: Option[Any]) {
@@ -165,7 +165,15 @@ private[spark] class Master(
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
persistenceEngine.close()
- context.stop(leaderElectionAgent)
+ leaderElectionAgent.stop()
+ }
+
+ override def electedLeader() {
+ self ! ElectedLeader
+ }
+
+ override def revokedLeadership() {
+ self ! RevokedLeadership
}
override def receiveWithLogging = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
index e3640ea4f7..2e0e1e7036 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
@@ -17,6 +17,10 @@
package org.apache.spark.deploy.master
+import org.apache.spark.annotation.DeveloperApi
+
+import scala.reflect.ClassTag
+
/**
* Allows Master to persist any state that is necessary in order to recover from a failure.
* The following semantics are required:
@@ -25,36 +29,70 @@ package org.apache.spark.deploy.master
* Given these two requirements, we will have all apps and workers persisted, but
* we might not have yet deleted apps or workers that finished (so their liveness must be verified
* during recovery).
+ *
+ * The implementation of this trait defines how name-object pairs are stored or retrieved.
*/
-private[spark] trait PersistenceEngine {
- def addApplication(app: ApplicationInfo)
+@DeveloperApi
+trait PersistenceEngine {
- def removeApplication(app: ApplicationInfo)
+ /**
+ * Defines how the object is serialized and persisted. Implementation will
+ * depend on the store used.
+ */
+ def persist(name: String, obj: Object)
- def addWorker(worker: WorkerInfo)
+ /**
+ * Defines how the object referred by its name is removed from the store.
+ */
+ def unpersist(name: String)
- def removeWorker(worker: WorkerInfo)
+ /**
+ * Gives all objects, matching a prefix. This defines how objects are
+ * read/deserialized back.
+ */
+ def read[T: ClassTag](prefix: String): Seq[T]
- def addDriver(driver: DriverInfo)
+ final def addApplication(app: ApplicationInfo): Unit = {
+ persist("app_" + app.id, app)
+ }
- def removeDriver(driver: DriverInfo)
+ final def removeApplication(app: ApplicationInfo): Unit = {
+ unpersist("app_" + app.id)
+ }
+
+ final def addWorker(worker: WorkerInfo): Unit = {
+ persist("worker_" + worker.id, worker)
+ }
+
+ final def removeWorker(worker: WorkerInfo): Unit = {
+ unpersist("worker_" + worker.id)
+ }
+
+ final def addDriver(driver: DriverInfo): Unit = {
+ persist("driver_" + driver.id, driver)
+ }
+
+ final def removeDriver(driver: DriverInfo): Unit = {
+ unpersist("driver_" + driver.id)
+ }
/**
* Returns the persisted data sorted by their respective ids (which implies that they're
* sorted by time of creation).
*/
- def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo])
+ final def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
+ (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
+ }
def close() {}
}
private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {
- override def addApplication(app: ApplicationInfo) {}
- override def removeApplication(app: ApplicationInfo) {}
- override def addWorker(worker: WorkerInfo) {}
- override def removeWorker(worker: WorkerInfo) {}
- override def addDriver(driver: DriverInfo) {}
- override def removeDriver(driver: DriverInfo) {}
-
- override def readPersistedData() = (Nil, Nil, Nil)
+
+ override def persist(name: String, obj: Object): Unit = {}
+
+ override def unpersist(name: String): Unit = {}
+
+ override def read[T: ClassTag](name: String): Seq[T] = Nil
+
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
new file mode 100644
index 0000000000..d9d36c1ed5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.serializer.JavaSerializer
+
+/**
+ * ::DeveloperApi::
+ *
+ * Implementation of this class can be plugged in as recovery mode alternative for Spark's
+ * Standalone mode.
+ *
+ */
+@DeveloperApi
+abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
+
+ /**
+ * PersistenceEngine defines how the persistent data(Information about worker, driver etc..)
+ * is handled for recovery.
+ *
+ */
+ def createPersistenceEngine(): PersistenceEngine
+
+ /**
+ * Create an instance of LeaderAgent that decides who gets elected as master.
+ */
+ def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent
+}
+
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from filesystem.
+ */
+private[spark] class FileSystemRecoveryModeFactory(conf: SparkConf)
+ extends StandaloneRecoveryModeFactory(conf) with Logging {
+ val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
+
+ def createPersistenceEngine() = {
+ logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
+ new FileSystemPersistenceEngine(RECOVERY_DIR, new JavaSerializer(conf))
+ }
+
+ def createLeaderElectionAgent(master: LeaderElectable) = new MonarchyLeaderAgent(master)
+}
+
+private[spark] class ZooKeeperRecoveryModeFactory(conf: SparkConf)
+ extends StandaloneRecoveryModeFactory(conf) {
+ def createPersistenceEngine() = new ZooKeeperPersistenceEngine(new JavaSerializer(conf), conf)
+
+ def createLeaderElectionAgent(master: LeaderElectable) =
+ new ZooKeeperLeaderElectionAgent(master, conf)
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index d221b0f6cc..473ddc23ff 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable
import akka.actor.ActorRef
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils
private[spark] class WorkerInfo(
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 285f9b014e..8eaa0ad948 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -24,9 +24,8 @@ import org.apache.spark.deploy.master.MasterMessages._
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}
-private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
- masterUrl: String, conf: SparkConf)
- extends LeaderElectionAgent with LeaderLatchListener with Logging {
+private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
+ conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
@@ -34,30 +33,21 @@ private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
private var leaderLatch: LeaderLatch = _
private var status = LeadershipStatus.NOT_LEADER
- override def preStart() {
+ start()
+ def start() {
logInfo("Starting ZooKeeper LeaderElection agent")
zk = SparkCuratorUtil.newClient(conf)
leaderLatch = new LeaderLatch(zk, WORKING_DIR)
leaderLatch.addListener(this)
-
leaderLatch.start()
}
- override def preRestart(reason: scala.Throwable, message: scala.Option[scala.Any]) {
- logError("LeaderElectionAgent failed...", reason)
- super.preRestart(reason, message)
- }
-
- override def postStop() {
+ override def stop() {
leaderLatch.close()
zk.close()
}
- override def receive = {
- case _ =>
- }
-
override def isLeader() {
synchronized {
// could have lost leadership by now.
@@ -85,10 +75,10 @@ private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
def updateLeadershipStatus(isLeader: Boolean) {
if (isLeader && status == LeadershipStatus.NOT_LEADER) {
status = LeadershipStatus.LEADER
- masterActor ! ElectedLeader
+ masterActor.electedLeader()
} else if (!isLeader && status == LeadershipStatus.LEADER) {
status = LeadershipStatus.NOT_LEADER
- masterActor ! RevokedLeadership
+ masterActor.revokedLeadership()
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 834dfedee5..96c2139eb0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -19,72 +19,54 @@ package org.apache.spark.deploy.master
import scala.collection.JavaConversions._
-import akka.serialization.Serialization
import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.CreateMode
import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.serializer.Serializer
+import java.nio.ByteBuffer
-class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
+import scala.reflect.ClassTag
+
+
+private[spark] class ZooKeeperPersistenceEngine(val serialization: Serializer, conf: SparkConf)
extends PersistenceEngine
with Logging
{
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
- SparkCuratorUtil.mkdir(zk, WORKING_DIR)
-
- override def addApplication(app: ApplicationInfo) {
- serializeIntoFile(WORKING_DIR + "/app_" + app.id, app)
- }
+ val serializer = serialization.newInstance()
- override def removeApplication(app: ApplicationInfo) {
- zk.delete().forPath(WORKING_DIR + "/app_" + app.id)
- }
+ SparkCuratorUtil.mkdir(zk, WORKING_DIR)
- override def addDriver(driver: DriverInfo) {
- serializeIntoFile(WORKING_DIR + "/driver_" + driver.id, driver)
- }
- override def removeDriver(driver: DriverInfo) {
- zk.delete().forPath(WORKING_DIR + "/driver_" + driver.id)
+ override def persist(name: String, obj: Object): Unit = {
+ serializeIntoFile(WORKING_DIR + "/" + name, obj)
}
- override def addWorker(worker: WorkerInfo) {
- serializeIntoFile(WORKING_DIR + "/worker_" + worker.id, worker)
+ override def unpersist(name: String): Unit = {
+ zk.delete().forPath(WORKING_DIR + "/" + name)
}
- override def removeWorker(worker: WorkerInfo) {
- zk.delete().forPath(WORKING_DIR + "/worker_" + worker.id)
+ override def read[T: ClassTag](prefix: String) = {
+ val file = zk.getChildren.forPath(WORKING_DIR).filter(_.startsWith(prefix))
+ file.map(deserializeFromFile[T]).flatten
}
override def close() {
zk.close()
}
- override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
- val sortedFiles = zk.getChildren().forPath(WORKING_DIR).toList.sorted
- val appFiles = sortedFiles.filter(_.startsWith("app_"))
- val apps = appFiles.map(deserializeFromFile[ApplicationInfo]).flatten
- val driverFiles = sortedFiles.filter(_.startsWith("driver_"))
- val drivers = driverFiles.map(deserializeFromFile[DriverInfo]).flatten
- val workerFiles = sortedFiles.filter(_.startsWith("worker_"))
- val workers = workerFiles.map(deserializeFromFile[WorkerInfo]).flatten
- (apps, drivers, workers)
- }
-
private def serializeIntoFile(path: String, value: AnyRef) {
- val serializer = serialization.findSerializerFor(value)
- val serialized = serializer.toBinary(value)
- zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized)
+ val serialized = serializer.serialize(value)
+ zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized.array())
}
- def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): Option[T] = {
+ def deserializeFromFile[T](filename: String): Option[T] = {
val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
- val clazz = m.runtimeClass.asInstanceOf[Class[T]]
- val serializer = serialization.serializerFor(clazz)
try {
- Some(serializer.fromBinary(fileData).asInstanceOf[T])
+ Some(serializer.deserialize(ByteBuffer.wrap(fileData)))
} catch {
case e: Exception => {
logWarning("Exception while reading persisted file, deleting", e)