author     Jacek Lewandowski <lewandowski.jacek@gmail.com>    2015-05-08 11:38:09 -0700
committer  Josh Rosen <joshrosen@databricks.com>              2015-05-08 11:38:09 -0700
commit     89d94878fd26b8431f7ec814730832de5832de17
tree       9ab212a3c91ce0ba8080c9d7719a314a655273fb
parent     4f01f5b563819e2ce7d3ac7ea86162b4e76935a3
[SPARK-7436] Fixed instantiation of custom recovery mode factory and added tests
Author: Jacek Lewandowski <lewandowski.jacek@gmail.com>

Closes #5976 from jacek-lewandowski/SPARK-7436-1.4 and squashes the following commits:

6298313 [Jacek Lewandowski] SPARK-7436: Fixed instantiation of custom recovery mode factory and added tests
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala                    |   2 +-
 core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala | 110 ++++++++++
 core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala               | 100 +++++++++-
 3 files changed, 208 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 53e1903a3d..fccceb3ea5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -165,7 +165,7 @@ private[master] class Master(
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
- val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
+ val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serialization])
.newInstance(conf, SerializationExtension(context.system))
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
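Why the original line failed: in Scala, calling .getClass on a companion object returns the class of the companion itself (here Serialization$), not classOf[Serialization], and Class.getConstructor matches declared parameter types exactly. The old lookup therefore asked for a constructor taking a Serialization$ argument, which no factory extending StandaloneRecoveryModeFactory(conf: SparkConf, serialization: Serialization) declares, so it failed with NoSuchMethodException. A minimal, self-contained sketch of the distinction (the Box class is hypothetical, not from the patch):

class Box(val v: Int)
object Box // companion object

object GetClassVsClassOf {
  def main(args: Array[String]): Unit = {
    println(Box.getClass.getName) // prints "Box$" -- the companion object's class
    println(classOf[Box].getName) // prints "Box"  -- the class itself

    // getConstructor matches parameter types exactly, so only classOf[...] works:
    val ctor = classOf[Box].getConstructor(classOf[Int])
    println(ctor.newInstance(42.asInstanceOf[AnyRef]).v) // prints 42

    // classOf[Box].getConstructor(Box.getClass) // would throw NoSuchMethodException
  }
}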
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala b/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala
new file mode 100644
index 0000000000..f4e56632e4
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file is placed in a different package to make sure all of these components
+// work well when they are outside of org.apache.spark.
+package other.supplier
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import akka.serialization.Serialization
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.master._
+
+class CustomRecoveryModeFactory(
+ conf: SparkConf,
+ serialization: Serialization
+) extends StandaloneRecoveryModeFactory(conf, serialization) {
+
+ CustomRecoveryModeFactory.instantiationAttempts += 1
+
+ /**
+ * PersistenceEngine defines how persistent data (information about workers,
+ * drivers, etc.) is handled for recovery.
+ */
+ override def createPersistenceEngine(): PersistenceEngine =
+ new CustomPersistenceEngine(serialization)
+
+ /**
+ * Create an instance of LeaderElectionAgent that decides who gets elected as master.
+ */
+ override def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent =
+ new CustomLeaderElectionAgent(master)
+}
+
+object CustomRecoveryModeFactory {
+ @volatile var instantiationAttempts = 0
+}
+
+class CustomPersistenceEngine(serialization: Serialization) extends PersistenceEngine {
+ val data = mutable.HashMap[String, Array[Byte]]()
+
+ CustomPersistenceEngine.lastInstance = Some(this)
+
+ /**
+ * Defines how the object is serialized and persisted. Implementation will
+ * depend on the store used.
+ */
+ override def persist(name: String, obj: Object): Unit = {
+ CustomPersistenceEngine.persistAttempts += 1
+ serialization.serialize(obj) match {
+ case util.Success(bytes) => data += name -> bytes
+ case util.Failure(cause) => throw new RuntimeException(cause)
+ }
+ }
+
+ /**
+ * Defines how the object referred to by its name is removed from the store.
+ */
+ override def unpersist(name: String): Unit = {
+ CustomPersistenceEngine.unpersistAttempts += 1
+ data -= name
+ }
+
+ /**
+ * Returns all objects matching a prefix. This defines how objects are
+ * read/deserialized back.
+ */
+ override def read[T: ClassTag](prefix: String): Seq[T] = {
+ CustomPersistenceEngine.readAttempts += 1
+ val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
+ val results = for ((name, bytes) <- data; if name.startsWith(prefix))
+ yield serialization.deserialize(bytes, clazz)
+
+ results.find(_.isFailure).foreach {
+ case util.Failure(cause) => throw new RuntimeException(cause)
+ }
+
+ results.flatMap(_.toOption).toSeq
+ }
+}
+
+object CustomPersistenceEngine {
+ @volatile var persistAttempts = 0
+ @volatile var unpersistAttempts = 0
+ @volatile var readAttempts = 0
+
+ @volatile var lastInstance: Option[CustomPersistenceEngine] = None
+}
+
+class CustomLeaderElectionAgent(val masterActor: LeaderElectable) extends LeaderElectionAgent {
+ masterActor.electedLeader()
+}
+
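The custom engine above follows PersistenceEngine's contract: objects are persisted under string keys and recovered by prefix scan, which is what readPersistedData() in the test below relies on to read applications, drivers, and workers back. A standalone sketch of that keying scheme with no Spark or Akka dependencies (the object name and the "app_"/"driver_" keys are illustrative of the convention, not taken from the patch):

import scala.collection.mutable

object PrefixStoreSketch {
  // In-memory map standing in for a durable store.
  private val data = mutable.HashMap[String, String]()

  def persist(name: String, obj: String): Unit = data += name -> obj
  def unpersist(name: String): Unit = data -= name
  def read(prefix: String): List[String] =
    data.collect { case (k, v) if k.startsWith(prefix) => v }.toList

  def main(args: Array[String]): Unit = {
    persist("app_test_app", "serialized ApplicationInfo")
    persist("driver_test_driver", "serialized DriverInfo")
    println(read("app_"))    // List(serialized ApplicationInfo)
    println(read("driver_")) // List(serialized DriverInfo)
  }
}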
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 34c74d87f0..0faa8f650e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -17,12 +17,20 @@
package org.apache.spark.deploy.master
+import java.util.Date
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
import akka.actor.Address
-import org.scalatest.FunSuite
+import org.scalatest.{FunSuite, Matchers}
+import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
-import org.apache.spark.{SSLOptions, SparkConf, SparkException}
+import org.apache.spark.deploy._
+import org.apache.spark.{SparkConf, SparkException}
-class MasterSuite extends FunSuite {
+class MasterSuite extends FunSuite with Matchers {
test("toAkkaUrl") {
val conf = new SparkConf(loadDefaults = false)
@@ -63,4 +71,90 @@ class MasterSuite extends FunSuite {
}
assert("Invalid master URL: spark://1.2. 3.4:1234" === e.getMessage)
}
+
+ test("can use a custom recovery mode factory") {
+ val conf = new SparkConf(loadDefaults = false)
+ conf.set("spark.deploy.recoveryMode", "CUSTOM")
+ conf.set("spark.deploy.recoveryMode.factory",
+ classOf[CustomRecoveryModeFactory].getCanonicalName)
+
+ val instantiationAttempts = CustomRecoveryModeFactory.instantiationAttempts
+
+ val commandToPersist = new Command(
+ mainClass = "",
+ arguments = Nil,
+ environment = Map.empty,
+ classPathEntries = Nil,
+ libraryPathEntries = Nil,
+ javaOpts = Nil
+ )
+
+ val appToPersist = new ApplicationInfo(
+ startTime = 0,
+ id = "test_app",
+ desc = new ApplicationDescription(
+ name = "",
+ maxCores = None,
+ memoryPerExecutorMB = 0,
+ command = commandToPersist,
+ appUiUrl = "",
+ eventLogDir = None,
+ eventLogCodec = None,
+ coresPerExecutor = None),
+ submitDate = new Date(),
+ driver = null,
+ defaultCores = 0
+ )
+
+ val driverToPersist = new DriverInfo(
+ startTime = 0,
+ id = "test_driver",
+ desc = new DriverDescription(
+ jarUrl = "",
+ mem = 0,
+ cores = 0,
+ supervise = false,
+ command = commandToPersist
+ ),
+ submitDate = new Date()
+ )
+
+ val workerToPersist = new WorkerInfo(
+ id = "test_worker",
+ host = "127.0.0.1",
+ port = 10000,
+ cores = 0,
+ memory = 0,
+ actor = null,
+ webUiPort = 0,
+ publicAddress = ""
+ )
+
+ val (actorSystem, port, uiPort, restPort) =
+ Master.startSystemAndActor("127.0.0.1", 7077, 8080, conf)
+
+ try {
+ Await.result(actorSystem.actorSelection("/user/Master").resolveOne(10 seconds), 10 seconds)
+
+ CustomPersistenceEngine.lastInstance.isDefined shouldBe true
+ val persistenceEngine = CustomPersistenceEngine.lastInstance.get
+
+ persistenceEngine.addApplication(appToPersist)
+ persistenceEngine.addDriver(driverToPersist)
+ persistenceEngine.addWorker(workerToPersist)
+
+ val (apps, drivers, workers) = persistenceEngine.readPersistedData()
+
+ apps.map(_.id) should contain(appToPersist.id)
+ drivers.map(_.id) should contain(driverToPersist.id)
+ workers.map(_.id) should contain(workerToPersist.id)
+
+ } finally {
+ actorSystem.shutdown()
+ actorSystem.awaitTermination()
+ }
+
+ CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts
+ }
+
}
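The suite's start/resolve/shutdown sequence is the usual idiom for Akka 2.3-era actor systems: start the system, block until the actor is resolvable, exercise it, then shut the system down in a finally block. A toy sketch of just that idiom (the Echo actor is a hypothetical stand-in for the Master, not part of the suite):

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.{Actor, ActorSystem, Props}

class Echo extends Actor {
  def receive = { case m => sender() ! m } // stand-in behavior
}

object ResolveSketch extends App {
  val system = ActorSystem("sketch")
  system.actorOf(Props[Echo], name = "Master")
  try {
    // Block until the actor is registered, as the suite does for /user/Master.
    val ref = Await.result(
      system.actorSelection("/user/Master").resolveOne(10.seconds), 10.seconds)
    println(s"resolved: $ref")
  } finally {
    system.shutdown() // Akka 2.3 API, matching the suite
    system.awaitTermination()
  }
}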