Diffstat (limited to 'streaming/src/test/scala/org')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala          | 30
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala | 10
2 files changed, 30 insertions, 10 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
index 48d3b41b66..c4a01eaea7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
@@ -122,23 +122,27 @@ class StateMapSuite extends SparkFunSuite {
test("OpenHashMapBasedStateMap - serializing and deserializing") {
val map1 = new OpenHashMapBasedStateMap[Int, Int]()
+ testSerialization(map1, "error deserializing the serialized empty map")
+
map1.put(1, 100, 1)
map1.put(2, 200, 2)
+ testSerialization(map1, "error deserializing the serialized map with data + no delta")
val map2 = map1.copy()
+ // Do not test compaction
+ assert(map2.asInstanceOf[OpenHashMapBasedStateMap[_, _]].shouldCompact === false)
+ testSerialization(map2, "error deserializing the serialized map with 1 delta + no new data")
+
map2.put(3, 300, 3)
map2.put(4, 400, 4)
+ testSerialization(map2, "error deserializing the serialized map with 1 delta + new data")
val map3 = map2.copy()
+ assert(map3.asInstanceOf[OpenHashMapBasedStateMap[_, _]].shouldCompact === false)
+ testSerialization(map3, "error deserializing the serialized map with 2 deltas + no new data")
map3.put(3, 600, 3)
map3.remove(2)
-
- // Do not test compaction
- assert(map3.asInstanceOf[OpenHashMapBasedStateMap[_, _]].shouldCompact === false)
-
- val deser_map3 = Utils.deserialize[StateMap[Int, Int]](
- Utils.serialize(map3), Thread.currentThread().getContextClassLoader)
- assertMap(deser_map3, map3, 1, "Deserialized map not same as original map")
+ testSerialization(map3, "error deserializing the serialized map with 2 deltas + new data")
}
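
The sequence above exercises the copy-on-write design of OpenHashMapBasedStateMap: copy() chains a fresh delta map onto its parent, writes land in the delta, and reads fall through to ancestor maps, which is why compaction must be ruled out before each serialization check. A minimal sketch of that pattern, using a hypothetical DeltaMap rather than Spark's actual implementation:

import scala.collection.mutable

// Hypothetical delta-chained map, for illustration only.
class DeltaMap[K, V](parent: Option[DeltaMap[K, V]] = None) {
  private val delta = mutable.Map.empty[K, V]
  private val removals = mutable.Set.empty[K]

  def put(key: K, value: V): Unit = { removals -= key; delta(key) = value }
  def remove(key: K): Unit = { delta -= key; removals += key }

  // Reads check the local delta first, then fall through to the parent chain.
  def get(key: K): Option[V] =
    if (removals.contains(key)) None
    else delta.get(key).orElse(parent.flatMap(_.get(key)))

  // Copying is cheap: it only chains a new empty delta onto this map.
  def copy(): DeltaMap[K, V] = new DeltaMap[K, V](Some(this))

  def deltaChainLength: Int = 1 + parent.map(_.deltaChainLength).getOrElse(0)
}

Mirroring the test: after map2 = map1.copy(), writes to map2 leave map1 untouched, and a later remove only masks the key in the copy that performed it.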
test("OpenHashMapBasedStateMap - serializing and deserializing with compaction") {
@@ -156,11 +160,9 @@ class StateMapSuite extends SparkFunSuite {
assert(map.deltaChainLength > deltaChainThreshold)
assert(map.shouldCompact === true)
- val deser_map = Utils.deserialize[OpenHashMapBasedStateMap[Int, Int]](
- Utils.serialize(map), Thread.currentThread().getContextClassLoader)
+ val deser_map = testSerialization(map, "Deserialized + compacted map not same as original map")
assert(deser_map.deltaChainLength < deltaChainThreshold)
assert(deser_map.shouldCompact === false)
- assertMap(deser_map, map, 1, "Deserialized + compacted map not same as original map")
}
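
The compaction test pins down when flattening happens: serialization. Once deltaChainLength exceeds deltaChainThreshold, the map writes itself out compacted, so the deserialized copy comes back with a short chain and shouldCompact === false, while the in-memory original is left alone. The compact-on-serialize technique can be sketched with Java serialization hooks; the class, threshold, and field layout below are assumptions for illustration, not Spark's code:

import java.io.{ObjectInputStream, ObjectOutputStream}

object ChainedMap { val CompactionThreshold = 20 }

// Hypothetical map that flattens its delta chain while being written out.
class ChainedMap(private var parent: ChainedMap,
                 private var delta: Map[Int, Int]) extends Serializable {

  def chainLength: Int = if (parent == null) 1 else 1 + parent.chainLength

  def flattened: Map[Int, Int] =
    if (parent == null) delta else parent.flattened ++ delta

  // Custom Java serialization hook: a long chain is written as one flattened
  // map with no parent, so the restored copy has chainLength == 1.
  private def writeObject(out: ObjectOutputStream): Unit = {
    if (chainLength > ChainedMap.CompactionThreshold) {
      out.writeObject(null)
      out.writeObject(flattened)
    } else {
      out.writeObject(parent)
      out.writeObject(delta)
    }
  }

  private def readObject(in: ObjectInputStream): Unit = {
    parent = in.readObject().asInstanceOf[ChainedMap]
    delta = in.readObject().asInstanceOf[Map[Int, Int]]
  }
}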
test("OpenHashMapBasedStateMap - all possible sequences of operations with copies ") {
@@ -265,6 +267,14 @@ class StateMapSuite extends SparkFunSuite {
assertMap(stateMap, refMap.toMap, time, "Final state map does not match reference map")
}
+ private def testSerialization[MapType <: StateMap[Int, Int]](
+ map: MapType, msg: String): MapType = {
+ val deserMap = Utils.deserialize[MapType](
+ Utils.serialize(map), Thread.currentThread().getContextClassLoader)
+ assertMap(deserMap, map, 1, msg)
+ deserMap
+ }
+
// Assert that all the data and operations on a state map match those of a reference state map
private def assertMap(
mapToTest: StateMap[Int, Int],
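
The testSerialization helper factors the serialize/deserialize/assertMap boilerplate out of every call site and returns the deserialized map so callers (like the compaction test above) can keep asserting on it. Utils.serialize and Utils.deserialize are Spark-internal; the context class loader is passed so the map classes resolve under test-runner class loaders. Outside Spark, the same round trip is a few lines of JDK object streams; a sketch, assuming the value type is Serializable:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object SerializationTestUtils {
  // Serialize a value to bytes and read it back, as the helper above does
  // with Utils.serialize/Utils.deserialize.
  def roundTrip[T <: Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

Usage: val restored = SerializationTestUtils.roundTrip(map), followed by whatever structural assertions the test needs.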
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala
index 0feb3af1ab..3b2d43f2ce 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala
@@ -332,6 +332,16 @@ class TrackStateRDDSuite extends SparkFunSuite with RDDCheckpointTester with Bef
makeStateRDDWithLongLineageParenttateRDD, reliableCheckpoint = true, rddCollectFunc _)
}
+ test("checkpointing empty state RDD") {
+ val emptyStateRDD = TrackStateRDD.createFromPairRDD[Int, Int, Int, Int](
+ sc.emptyRDD[(Int, Int)], new HashPartitioner(10), Time(0))
+ emptyStateRDD.checkpoint()
+ assert(emptyStateRDD.flatMap { _.stateMap.getAll() }.collect().isEmpty)
+ val cpRDD = sc.checkpointFile[TrackStateRDDRecord[Int, Int, Int]](
+ emptyStateRDD.getCheckpointFile.get)
+ assert(cpRDD.flatMap { _.stateMap.getAll() }.collect().isEmpty)
+ }
+
/** Assert whether the `trackStateByKey` operation generates expected results */
private def assertOperation[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](
testStateRDD: TrackStateRDD[K, V, S, T],
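
The new test covers an edge case: a TrackStateRDD whose partitions hold empty state maps must survive a reliable checkpoint and read back as empty, rather than failing on the empty records. Note that sc.checkpointFile, which the test uses to re-read the checkpoint files directly, is private to the org.apache.spark package; the test can call it only because it lives there. Against the public API, the checkpoint-then-recompute round trip looks like the sketch below, where the local master, checkpoint directory, and partition count are all assumptions:

import org.apache.spark.{SparkConf, SparkContext}

object EmptyCheckpointDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("empty-checkpoint-demo")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/tmp/empty-checkpoint-demo") // assumed writable

    // Ten partitions, all empty -- analogous to the empty state RDD above.
    val empty = sc.parallelize(Seq.empty[(Int, Int)], 10)
    empty.checkpoint()

    // The first action materializes the RDD and writes the checkpoint files.
    assert(empty.collect().isEmpty)
    assert(empty.isCheckpointed)

    // Recomputation now reads from the checkpoint instead of the lineage.
    assert(empty.collect().isEmpty)

    sc.stop()
  }
}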