author     Tathagata Das <tathagata.das1565@gmail.com>   2015-11-19 16:50:08 -0800
committer  Andrew Or <andrew@databricks.com>             2015-11-19 16:50:08 -0800
commit     b2cecb80ece59a1c086d4ae7aeebef445a4e7299 (patch)
tree       19ab897316359ae267c11d8358aeeaebc8644692 /streaming
parent     880128f37e1bc0b9d98d1786670be62a06c648f2 (diff)
[SPARK-11845][STREAMING][TEST] Added unit test to verify TrackStateRDD is correctly checkpointed
To make sure that all lineage is correctly truncated for TrackStateRDD when checkpointed.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #9831 from tdas/SPARK-11845.
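The test relies on the general Spark behavior that reliably checkpointing an RDD cuts its lineage, so that recovery reads checkpoint files instead of recomputing a long chain of parent RDDs. As a rough illustration of that mechanism using only the public core API (a minimal sketch, not the suite's RDDCheckpointTester path; the object name and temp-directory handling below are illustrative assumptions):

import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: build an RDD with a long lineage, checkpoint it, and observe that
// the lineage has been truncated down to the checkpoint data.
object LineageTruncationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("LineageTruncationSketch"))
    sc.setCheckpointDir(Files.createTempDirectory("lineage-sketch").toString)

    // Chain many narrow transformations to produce a long lineage.
    var rdd = sc.parallelize(1 to 100, numSlices = 4)
    (1 to 50).foreach { _ => rdd = rdd.map(_ + 1) }

    rdd.checkpoint()   // mark for reliable checkpointing
    rdd.count()        // running an action materializes the checkpoint

    // The RDD now depends on its checkpoint files, not on the 50 map() parents.
    assert(rdd.isCheckpointed)
    assert(rdd.getCheckpointFile.isDefined)
    println(rdd.toDebugString)   // debug string now roots at a ReliableCheckpointRDD

    sc.stop()
  }
}

The added test applies the same kind of check to TrackStateRDD itself, covering both a data RDD with a long lineage and a parent TrackStateRDD with a long lineage.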
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala  60
1 file changed, 57 insertions(+), 3 deletions(-)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala
index 19ef5a14f8..0feb3af1ab 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/TrackStateRDDSuite.scala
@@ -17,31 +17,40 @@
package org.apache.spark.streaming.rdd
+import java.io.File
+
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.scalatest.BeforeAndAfterAll
+import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.util.OpenHashMapBasedStateMap
-import org.apache.spark.streaming.{Time, State}
-import org.apache.spark.{HashPartitioner, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.streaming.{State, Time}
+import org.apache.spark.util.Utils
-class TrackStateRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
+class TrackStateRDDSuite extends SparkFunSuite with RDDCheckpointTester with BeforeAndAfterAll {
private var sc: SparkContext = null
+ private var checkpointDir: File = _
override def beforeAll(): Unit = {
sc = new SparkContext(
new SparkConf().setMaster("local").setAppName("TrackStateRDDSuite"))
+ checkpointDir = Utils.createTempDir()
+ sc.setCheckpointDir(checkpointDir.toString)
}
override def afterAll(): Unit = {
if (sc != null) {
sc.stop()
}
+ Utils.deleteRecursively(checkpointDir)
}
+ override def sparkContext: SparkContext = sc
+
test("creation from pair RDD") {
val data = Seq((1, "1"), (2, "2"), (3, "3"))
val partitioner = new HashPartitioner(10)
@@ -278,6 +287,51 @@ class TrackStateRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
rdd7, Seq(("k3", 2)), Set())
}
+ test("checkpointing") {
+ /**
+ * This tests whether the TrackStateRDD correctly truncates any references to its parent RDDs -
+ * the data RDD and the parent TrackStateRDD.
+ */
+ def rddCollectFunc(rdd: RDD[TrackStateRDDRecord[Int, Int, Int]])
+ : Set[(List[(Int, Int, Long)], List[Int])] = {
+ rdd.map { record => (record.stateMap.getAll().toList, record.emittedRecords.toList) }
+ .collect.toSet
+ }
+
+ /** Generate TrackStateRDD with data RDD having a long lineage */
+ def makeStateRDDWithLongLineageDataRDD(longLineageRDD: RDD[Int])
+ : TrackStateRDD[Int, Int, Int, Int] = {
+ TrackStateRDD.createFromPairRDD(longLineageRDD.map { _ -> 1}, partitioner, Time(0))
+ }
+
+ testRDD(
+ makeStateRDDWithLongLineageDataRDD, reliableCheckpoint = true, rddCollectFunc _)
+ testRDDPartitions(
+ makeStateRDDWithLongLineageDataRDD, reliableCheckpoint = true, rddCollectFunc _)
+
+ /** Generate TrackStateRDD with parent state RDD having a long lineage */
+ def makeStateRDDWithLongLineageParentStateRDD(
+ longLineageRDD: RDD[Int]): TrackStateRDD[Int, Int, Int, Int] = {
+
+ // Create a TrackStateRDD that has a long lineage using the data RDD with a long lineage
+ val stateRDDWithLongLineage = makeStateRDDWithLongLineageDataRDD(longLineageRDD)
+
+ // Create a new TrackStateRDD, with the long-lineage TrackStateRDD as the parent
+ new TrackStateRDD[Int, Int, Int, Int](
+ stateRDDWithLongLineage,
+ stateRDDWithLongLineage.sparkContext.emptyRDD[(Int, Int)].partitionBy(partitioner),
+ (time: Time, key: Int, value: Option[Int], state: State[Int]) => None,
+ Time(10),
+ None
+ )
+ }
+
+ testRDD(
+ makeStateRDDWithLongLineageParentStateRDD, reliableCheckpoint = true, rddCollectFunc _)
+ testRDDPartitions(
+ makeStateRDDWithLongLineageParentStateRDD, reliableCheckpoint = true, rddCollectFunc _)
+ }
+
/** Assert whether the `trackStateByKey` operation generates expected results */
private def assertOperation[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](
testStateRDD: TrackStateRDD[K, V, S, T],