author    Prashant Sharma <prashant.s@imaginea.com>  2014-11-22 14:05:38 -0800
committer Patrick Wendell <pwendell@gmail.com>       2014-11-22 14:05:38 -0800
commit    9b2a3c6126e4fe8485e506f8a56a26cb72509a5f (patch)
tree      6367cee11073628da9b4dbb27329ba87dc33bb1c /core/src
parent    b5d17ef10e2509d9886c660945449a89750c8116 (diff)
[SPARK-4377] Fixed serialization issue by switching to akka provided serializer.
... - there is no way around this for deserializing actorRef(s).

Author: Prashant Sharma <prashant.s@imaginea.com>

Closes #3402 from ScrapCodes/SPARK-4377/troubleDeserializing and squashes the following commits:

77233fd [Prashant Sharma] Style fixes
9b35c6e [Prashant Sharma] Scalastyle fixes
29880da [Prashant Sharma] [SPARK-4377] Fixed serialization issue by switching to akka provided serializer - there is no way around this for deserializing actorRef(s).
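The crux of the change: plain Java serialization cannot restore an ActorRef, because resolving the ref on read requires a live ActorSystem, which Akka's own Serialization extension threads through. A minimal round-trip sketch, assuming Akka 2.3.x on the classpath; the Echo actor and ActorRefRoundTrip object are illustrative, not part of this patch:

    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import akka.serialization.SerializationExtension

    class Echo extends Actor {
      def receive = { case msg => sender() ! msg }
    }

    object ActorRefRoundTrip extends App {
      val system = ActorSystem("demo")
      val ref: ActorRef = system.actorOf(Props[Echo], "echo")

      // Ask Akka for the serializer bound to this object's class...
      val serialization = SerializationExtension(system)
      val bytes = serialization.findSerializerFor(ref).toBinary(ref)

      // ...and let the same extension resolve the bytes back into a ref
      // bound to the running system.
      val restored = serialization
        .serializerFor(classOf[ActorRef])
        .fromBinary(bytes)
        .asInstanceOf[ActorRef]

      println(restored == ref) // expected: true (same path, same system)
      system.shutdown()
    }

The two calls shown here, findSerializerFor on write and serializerFor on read, are what the hunks below thread through both persistence engines.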
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala  26
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala                       12
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala          17
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala   22
4 files changed, 42 insertions(+), 35 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 6ff2aa5244..36a2e2c6a6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -18,12 +18,13 @@
package org.apache.spark.deploy.master
import java.io._
-import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+
+import akka.serialization.Serialization
import org.apache.spark.Logging
-import org.apache.spark.serializer.Serializer
-import scala.reflect.ClassTag
/**
* Stores data in a single on-disk directory with one file per application and worker.
@@ -34,10 +35,9 @@ import scala.reflect.ClassTag
*/
private[spark] class FileSystemPersistenceEngine(
val dir: String,
- val serialization: Serializer)
+ val serialization: Serialization)
extends PersistenceEngine with Logging {
- val serializer = serialization.newInstance()
new File(dir).mkdir()
override def persist(name: String, obj: Object): Unit = {
@@ -56,17 +56,17 @@ private[spark] class FileSystemPersistenceEngine(
private def serializeIntoFile(file: File, value: AnyRef) {
val created = file.createNewFile()
if (!created) { throw new IllegalStateException("Could not create file: " + file) }
-
- val out = serializer.serializeStream(new FileOutputStream(file))
+ val serializer = serialization.findSerializerFor(value)
+ val serialized = serializer.toBinary(value)
+ val out = new FileOutputStream(file)
try {
- out.writeObject(value)
+ out.write(serialized)
} finally {
out.close()
}
-
}
- def deserializeFromFile[T](file: File): T = {
+ private def deserializeFromFile[T](file: File)(implicit m: ClassTag[T]): T = {
val fileData = new Array[Byte](file.length().asInstanceOf[Int])
val dis = new DataInputStream(new FileInputStream(file))
try {
@@ -74,7 +74,9 @@ private[spark] class FileSystemPersistenceEngine(
} finally {
dis.close()
}
-
- serializer.deserializeStream(dis).readObject()
+ val clazz = m.runtimeClass.asInstanceOf[Class[T]]
+ val serializer = serialization.serializerFor(clazz)
+ serializer.fromBinary(fileData).asInstanceOf[T]
}
+
}
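Both persistence engines in this patch now share the same two-step shape: findSerializerFor(value) on write, serializerFor(m.runtimeClass) on read. Distilled into a standalone sketch; BinaryFileStore is an illustrative name, not part of the patch:

    import java.io.{DataInputStream, File, FileInputStream, FileOutputStream}

    import scala.reflect.ClassTag

    import akka.serialization.Serialization

    class BinaryFileStore(serialization: Serialization) {

      def write(file: File, value: AnyRef): Unit = {
        // Pick the serializer Akka has bound to this value's class.
        val bytes = serialization.findSerializerFor(value).toBinary(value)
        val out = new FileOutputStream(file)
        try out.write(bytes) finally out.close()
      }

      def read[T](file: File)(implicit m: ClassTag[T]): T = {
        val bytes = new Array[Byte](file.length().toInt)
        val in = new DataInputStream(new FileInputStream(file))
        try in.readFully(bytes) finally in.close()
        // Recover a serializer from the statically known target class.
        val clazz = m.runtimeClass.asInstanceOf[Class[T]]
        serialization.serializerFor(clazz).fromBinary(bytes).asInstanceOf[T]
      }
    }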
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 021454e258..7b32c505de 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -30,6 +30,7 @@ import scala.util.Random
import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import akka.serialization.Serialization
import akka.serialization.SerializationExtension
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -132,15 +133,18 @@ private[spark] class Master(
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
- val zkFactory = new ZooKeeperRecoveryModeFactory(conf)
+ val zkFactory =
+ new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
- val fsFactory = new FileSystemRecoveryModeFactory(conf)
+ val fsFactory =
+ new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
- val factory = clazz.getConstructor(conf.getClass)
- .newInstance(conf).asInstanceOf[StandaloneRecoveryModeFactory]
+ val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
+ .newInstance(conf, SerializationExtension(context.system))
+ .asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
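In the CUSTOM branch above, the user-supplied factory is constructed reflectively, with the extension passed through as the second argument. A hedged sketch of that lookup using explicit classOf tokens; com.example.MyRecoveryModeFactory is hypothetical and must declare a (SparkConf, Serialization) constructor:

    import akka.actor.ActorSystem
    import akka.serialization.{Serialization, SerializationExtension}

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.master.StandaloneRecoveryModeFactory

    object CustomFactoryLookup extends App {
      val conf = new SparkConf()
        .set("spark.deploy.recoveryMode.factory", "com.example.MyRecoveryModeFactory")
      val system = ActorSystem("demo")

      // getConstructor only matches if the named class declares exactly
      // a (SparkConf, Serialization) constructor.
      val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
      val factory = clazz
        .getConstructor(classOf[SparkConf], classOf[Serialization])
        .newInstance(conf, SerializationExtension(system))
        .asInstanceOf[StandaloneRecoveryModeFactory]
    }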
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
index d9d36c1ed5..1096eb0368 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
@@ -17,9 +17,10 @@
package org.apache.spark.deploy.master
+import akka.serialization.Serialization
+
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.serializer.JavaSerializer
/**
* ::DeveloperApi::
@@ -29,7 +30,7 @@ import org.apache.spark.serializer.JavaSerializer
*
*/
@DeveloperApi
-abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
+abstract class StandaloneRecoveryModeFactory(conf: SparkConf, serializer: Serialization) {
/**
* PersistenceEngine defines how the persistent data(Information about worker, driver etc..)
@@ -48,21 +49,21 @@ abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
* LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
* recovery is made by restoring from filesystem.
*/
-private[spark] class FileSystemRecoveryModeFactory(conf: SparkConf)
- extends StandaloneRecoveryModeFactory(conf) with Logging {
+private[spark] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer: Serialization)
+ extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
def createPersistenceEngine() = {
logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
- new FileSystemPersistenceEngine(RECOVERY_DIR, new JavaSerializer(conf))
+ new FileSystemPersistenceEngine(RECOVERY_DIR, serializer)
}
def createLeaderElectionAgent(master: LeaderElectable) = new MonarchyLeaderAgent(master)
}
-private[spark] class ZooKeeperRecoveryModeFactory(conf: SparkConf)
- extends StandaloneRecoveryModeFactory(conf) {
- def createPersistenceEngine() = new ZooKeeperPersistenceEngine(new JavaSerializer(conf), conf)
+private[spark] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serialization)
+ extends StandaloneRecoveryModeFactory(conf, serializer) {
+ def createPersistenceEngine() = new ZooKeeperPersistenceEngine(conf, serializer)
def createLeaderElectionAgent(master: LeaderElectable) =
new ZooKeeperLeaderElectionAgent(master, conf)
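With the base constructor widened, any subclass has to accept and forward the Serialization handle. A sketch of a hypothetical factory written against the new two-argument contract, placed in the master package because the engine and agent types are private[spark]:

    package org.apache.spark.deploy.master

    import akka.serialization.Serialization

    import org.apache.spark.SparkConf

    // Hypothetical subclass; the filesystem engine and monarchy agent are
    // reused purely for illustration.
    private[spark] class LocalDirRecoveryModeFactory(conf: SparkConf, serializer: Serialization)
      extends StandaloneRecoveryModeFactory(conf, serializer) {

      def createPersistenceEngine() =
        new FileSystemPersistenceEngine(
          conf.get("spark.deploy.recoveryDirectory", "/tmp/recovery"), serializer)

      def createLeaderElectionAgent(master: LeaderElectable) =
        new MonarchyLeaderAgent(master)
    }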
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 96c2139eb0..e11ac031fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -17,27 +17,24 @@
package org.apache.spark.deploy.master
+import akka.serialization.Serialization
+
import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.CreateMode
import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.serializer.Serializer
-import java.nio.ByteBuffer
-import scala.reflect.ClassTag
-
-private[spark] class ZooKeeperPersistenceEngine(val serialization: Serializer, conf: SparkConf)
+private[spark] class ZooKeeperPersistenceEngine(conf: SparkConf, val serialization: Serialization)
extends PersistenceEngine
with Logging
{
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
- val serializer = serialization.newInstance()
-
SparkCuratorUtil.mkdir(zk, WORKING_DIR)
@@ -59,14 +56,17 @@ private[spark] class ZooKeeperPersistenceEngine(val serialization: Serializer, conf: SparkConf)
}
private def serializeIntoFile(path: String, value: AnyRef) {
- val serialized = serializer.serialize(value)
- zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized.array())
+ val serializer = serialization.findSerializerFor(value)
+ val serialized = serializer.toBinary(value)
+ zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized)
}
- def deserializeFromFile[T](filename: String): Option[T] = {
+ def deserializeFromFile[T](filename: String)(implicit m: ClassTag[T]): Option[T] = {
val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
+ val clazz = m.runtimeClass.asInstanceOf[Class[T]]
+ val serializer = serialization.serializerFor(clazz)
try {
- Some(serializer.deserialize(ByteBuffer.wrap(fileData)))
+ Some(serializer.fromBinary(fileData).asInstanceOf[T])
} catch {
case e: Exception => {
logWarning("Exception while reading persisted file, deleting", e)
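On the read side, callers never pick a serializer by hand: the type argument alone drives deserialization, because the compiler materializes the implicit ClassTag. A call-site sketch, assuming a reachable ZooKeeper ensemble; the object name and the "app_app-0000" znode name are made up:

    package org.apache.spark.deploy.master

    import akka.actor.ActorSystem
    import akka.serialization.SerializationExtension

    import org.apache.spark.SparkConf

    object ReadBack extends App {
      val conf = new SparkConf()
      val system = ActorSystem("demo")
      val engine = new ZooKeeperPersistenceEngine(conf, SerializationExtension(system))

      // ClassTag[ApplicationInfo] is supplied implicitly and turned into
      // serializerFor(classOf[ApplicationInfo]) inside deserializeFromFile.
      val app: Option[ApplicationInfo] =
        engine.deserializeFromFile[ApplicationInfo]("app_app-0000")
    }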