author    Aaron Davidson <aaron@databricks.com>  2014-03-02 01:00:42 -0800
committer Reynold Xin <rxin@apache.org>          2014-03-02 01:00:42 -0800
commit    46bcb9551eb918ac4a31cd4cca924b432f6dc352 (patch)
tree      67fa627d64f755f3c974e9912cab8c14b7599bca /core
parent    1fd2bfd3dd6c27a54880f0d7a658b39f358aa804 (diff)
SPARK-1137: Make ZK PersistenceEngine not crash for wrong serialVersionUID
Previously, ZooKeeperPersistenceEngine would crash the whole Master process if there was stored data from a prior Spark version. Now, we just delete these files.

Author: Aaron Davidson <aaron@databricks.com>

Closes #4 from aarondav/zookeeper2 and squashes the following commits:

fa8b40f [Aaron Davidson] SPARK-1137: Make ZK PersistenceEngine not crash for wrong serialVersionUID
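As a rough illustration of the pattern this commit introduces (deserialization returning Option so unreadable entries are skipped and the results flattened), here is a minimal, self-contained Scala sketch. It is not the Spark/Curator code: the object name SkipCorruptEntries, the in-memory store map, and the deserialize helper are hypothetical stand-ins for the ZooKeeper data and Akka serialization used in the real engine.

import java.io._

// Hypothetical sketch: deserialize returns Option so corrupt entries are
// skipped instead of crashing the caller.
object SkipCorruptEntries {

  // Plain Java serialization stands in for Akka's Serialization in this sketch.
  def serialize(value: String): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    bytes.toByteArray
  }

  // Hypothetical store: entry name -> raw bytes (stands in for znodes under WORKING_DIR).
  val store: Map[String, Array[Byte]] = Map(
    "app_1" -> serialize("alpha"),
    "app_2" -> Array[Byte](1, 2, 3), // corrupt entry, e.g. written by an older version
    "app_3" -> serialize("gamma"))

  // Return None on failure (the real patch also deletes the offending file at this point).
  def deserialize(name: String): Option[String] =
    try {
      val in = new ObjectInputStream(new ByteArrayInputStream(store(name)))
      Some(in.readObject().asInstanceOf[String])
    } catch {
      case e: Exception =>
        println("Skipping unreadable entry " + name + ": " + e)
        None
    }

  def main(args: Array[String]): Unit = {
    // map + flatten drops the Nones, mirroring the .flatten added in readPersistedData.
    val recovered = store.keys.toList.sorted.map(deserialize).flatten
    println(recovered) // List(alpha, gamma)
  }
}

With map(...).flatten (equivalent to flatMap), the corrupt entry simply drops out of the recovered data, which is how the Master can now start up instead of crashing when it finds state written with a different serialVersionUID.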
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala | 18
1 file changed, 13 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 939006239d..5413ff671a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -64,11 +64,11 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
   override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
     val sortedFiles = zk.getChildren().forPath(WORKING_DIR).toList.sorted
     val appFiles = sortedFiles.filter(_.startsWith("app_"))
-    val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
+    val apps = appFiles.map(deserializeFromFile[ApplicationInfo]).flatten
     val driverFiles = sortedFiles.filter(_.startsWith("driver_"))
-    val drivers = driverFiles.map(deserializeFromFile[DriverInfo])
+    val drivers = driverFiles.map(deserializeFromFile[DriverInfo]).flatten
     val workerFiles = sortedFiles.filter(_.startsWith("worker_"))
-    val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
+    val workers = workerFiles.map(deserializeFromFile[WorkerInfo]).flatten
     (apps, drivers, workers)
   }
 
@@ -78,10 +78,18 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
     zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized)
   }
 
-  def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): T = {
+  def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): Option[T] = {
     val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
     val clazz = m.runtimeClass.asInstanceOf[Class[T]]
     val serializer = serialization.serializerFor(clazz)
-    serializer.fromBinary(fileData).asInstanceOf[T]
+    try {
+      Some(serializer.fromBinary(fileData).asInstanceOf[T])
+    } catch {
+      case e: Exception => {
+        logWarning("Exception while reading persisted file, deleting", e)
+        zk.delete().forPath(WORKING_DIR + "/" + filename)
+        None
+      }
+    }
   }
 }