about | summary | refs | log | tree | commit | diff
path: root/streaming/src/test
diff options
context:
space:
mode:
author: Shixiong Zhu <shixiong@databricks.com> 2016-01-07 17:46:24 -0800
committer: Tathagata Das <tathagata.das1565@gmail.com> 2016-01-07 17:46:24 -0800
commit: 28e0e500a2062baeda8c887e17dc8ab2b7d7d4b4 (patch)
tree: ffe4125cab4f5520a2b1c4159c84c1d26cfc59a1 /streaming/src/test
parent: c94199e977279d9b4658297e8108b46bdf30157b (diff)
download: spark-28e0e500a2062baeda8c887e17dc8ab2b7d7d4b4.tar.gz
spark-28e0e500a2062baeda8c887e17dc8ab2b7d7d4b4.tar.bz2
spark-28e0e500a2062baeda8c887e17dc8ab2b7d7d4b4.zip
[SPARK-12591][STREAMING] Register OpenHashMapBasedStateMap for Kryo
The default serializer in Kryo is FieldSerializer and it ignores transient fields and never calls `writeObject` or `readObject`. So we should register OpenHashMapBasedStateMap using `DefaultSerializer` to make it work with Kryo. Author: Shixiong Zhu <shixiong@databricks.com> Closes #10609 from zsxwing/SPARK-12591.
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala 96
1 files changed, 83 insertions, 13 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
index c4a01eaea7..ea32bbf95c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
@@ -17,15 +17,23 @@
package org.apache.spark.streaming
+import org.apache.spark.streaming.rdd.MapWithStateRDDRecord
+
import scala.collection.{immutable, mutable, Map}
+import scala.reflect.ClassTag
import scala.util.Random
-import org.apache.spark.SparkFunSuite
+import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
+import com.esotericsoftware.kryo.io.{Output, Input}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer._
import org.apache.spark.streaming.util.{EmptyStateMap, OpenHashMapBasedStateMap, StateMap}
-import org.apache.spark.util.Utils
class StateMapSuite extends SparkFunSuite {
+ private val conf = new SparkConf()
+
test("EmptyStateMap") {
val map = new EmptyStateMap[Int, Int]
intercept[scala.NotImplementedError] {
@@ -128,17 +136,17 @@ class StateMapSuite extends SparkFunSuite {
map1.put(2, 200, 2)
testSerialization(map1, "error deserializing and serialized map with data + no delta")
- val map2 = map1.copy()
+ val map2 = map1.copy().asInstanceOf[OpenHashMapBasedStateMap[Int, Int]]
// Do not test compaction
- assert(map2.asInstanceOf[OpenHashMapBasedStateMap[_, _]].shouldCompact === false)
+ assert(map2.shouldCompact === false)
testSerialization(map2, "error deserializing and serialized map with 1 delta + no new data")
map2.put(3, 300, 3)
map2.put(4, 400, 4)
testSerialization(map2, "error deserializing and serialized map with 1 delta + new data")
- val map3 = map2.copy()
- assert(map3.asInstanceOf[OpenHashMapBasedStateMap[_, _]].shouldCompact === false)
+ val map3 = map2.copy().asInstanceOf[OpenHashMapBasedStateMap[Int, Int]]
+ assert(map3.shouldCompact === false)
testSerialization(map3, "error deserializing and serialized map with 2 delta + no new data")
map3.put(3, 600, 3)
map3.remove(2)
@@ -267,18 +275,25 @@ class StateMapSuite extends SparkFunSuite {
assertMap(stateMap, refMap.toMap, time, "Final state map does not match reference map")
}
- private def testSerialization[MapType <: StateMap[Int, Int]](
- map: MapType, msg: String): MapType = {
- val deserMap = Utils.deserialize[MapType](
- Utils.serialize(map), Thread.currentThread().getContextClassLoader)
+ private def testSerialization[T: ClassTag]( // Round-trips `map` through both the Java and the Kryo serializer, asserting on each copy.
+ map: OpenHashMapBasedStateMap[T, T], msg: String): OpenHashMapBasedStateMap[T, T] = {
+ testSerialization(new JavaSerializer(conf), map, msg) // result discarded; only the assertions made inside matter here
+ testSerialization(new KryoSerializer(conf), map, msg) // last expression: the Kryo-deserialized copy is what the caller receives
+ }
+
+ private def testSerialization[T : ClassTag]( // Round-trips `map` through the given serializer and checks the copy against the original.
+ serializer: Serializer,
+ map: OpenHashMapBasedStateMap[T, T],
+ msg: String): OpenHashMapBasedStateMap[T, T] = {
+ val deserMap = serializeAndDeserialize(serializer, map)
assertMap(deserMap, map, 1, msg) // original map is the reference; 1 is passed as assertMap's `time`, msg as its failure message
deserMap // hand the deserialized copy back to the caller
}
// Assert that all the data and operations on a state map match those of a reference state map
- private def assertMap(
- mapToTest: StateMap[Int, Int],
- refMapToTestWith: StateMap[Int, Int],
+ private def assertMap[T](
+ mapToTest: StateMap[T, T],
+ refMapToTestWith: StateMap[T, T],
time: Long,
msg: String): Unit = {
withClue(msg) {
@@ -321,4 +336,59 @@ class StateMapSuite extends SparkFunSuite {
}
}
}
+
+ test("OpenHashMapBasedStateMap - serializing and deserializing with KryoSerializable states") {
+ val map = new OpenHashMapBasedStateMap[KryoState, KryoState]()
+ map.put(new KryoState("a"), new KryoState("b"), 1) // one entry; third argument is presumably the update time - confirm against StateMap.put
+ testSerialization( // serializer overload: Kryo only, because KryoState implements only KryoSerializable
+ new KryoSerializer(conf), map, "error deserializing and serialized KryoSerializable states")
+ }
+
+ test("EmptyStateMap - serializing and deserializing") { // an empty map must come back as an EmptyStateMap from both serializers
+ val map = StateMap.empty[KryoState, KryoState]
+ // Since EmptyStateMap doesn't contain any data, KryoState won't break JavaSerializer.
+ assert(serializeAndDeserialize(new JavaSerializer(conf), map).
+ isInstanceOf[EmptyStateMap[KryoState, KryoState]]) // type arguments are erased at runtime, so this only checks the EmptyStateMap class
+ assert(serializeAndDeserialize(new KryoSerializer(conf), map).
+ isInstanceOf[EmptyStateMap[KryoState, KryoState]])
+ }
+
+ test("MapWithStateRDDRecord - serializing and deserializing with KryoSerializable states") {
+ val map = new OpenHashMapBasedStateMap[KryoState, KryoState]()
+ map.put(new KryoState("a"), new KryoState("b"), 1)
+
+ val record = // wraps the state map plus one mapped element, all typed as KryoState
+ MapWithStateRDDRecord[KryoState, KryoState, KryoState](map, Seq(new KryoState("c")))
+ val deserRecord = serializeAndDeserialize(new KryoSerializer(conf), record) // Kryo only: KryoState implements only KryoSerializable
+ assert(!(record eq deserRecord)) // must be a distinct object, not the same reference handed back
+ assert(record.stateMap.getAll().toSeq === deserRecord.stateMap.getAll().toSeq) // presumably compares entries via KryoState.equals, i.e. by wrapped string
+ assert(record.mappedData === deserRecord.mappedData)
+ }
+
+ private def serializeAndDeserialize[T: ClassTag](serializer: Serializer, t: T): T = { // serializes `t` and immediately deserializes it again
+ val serializerInstance = serializer.newInstance() // the same instance is used for both directions
+ serializerInstance.deserialize[T](
+ serializerInstance.serialize(t), Thread.currentThread().getContextClassLoader) // deserialize is handed the current thread's context class loader
+ }
+}
+
+/** A test state class that only supports Kryo serialization: it implements only KryoSerializable and wraps a single string, which also defines its equality and hash code. */
+private[streaming] final class KryoState(var state: String) extends KryoSerializable { // `state` is a var because read() assigns it
+
+ override def write(kryo: Kryo, output: Output): Unit = {
+ kryo.writeClassAndObject(output, state) // must stay paired with readClassAndObject in read()
+ }
+
+ override def read(kryo: Kryo, input: Input): Unit = {
+ state = kryo.readClassAndObject(input).asInstanceOf[String] // restores the field written by write()
+ }
+
+ override def equals(other: Any): Boolean = other match {
+ case that: KryoState => state == that.state // value equality on the wrapped string
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ if (state == null) 0 else state.hashCode() // a null state hashes to 0 instead of throwing
+ }
}