path: root/streaming/src/test/scala
author Tathagata Das <tathagata.das1565@gmail.com> 2012-11-04 12:12:06 -0800
committer Tathagata Das <tathagata.das1565@gmail.com> 2012-11-04 12:12:06 -0800
commit d1542387891018914fdd6b647f17f0b05acdd40e (patch)
tree 51d0bdbd9014daa6f6f87bb9547acdf110300463 /streaming/src/test/scala
parent 596154eabe51961733789a18a47067748fb72e8e (diff)
Made checkpointing of the DStream graph work with checkpointing of RDDs. For streams requiring checkpointing of their RDDs, the default checkpoint interval is set to 10 seconds.
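For orientation, here is a minimal sketch of how the checkpointing APIs exercised by this commit fit together, built only from calls that appear in the diffs below (ssc.checkpoint(dir, interval), DStream.checkpoint(interval), and recovery via new StreamingContext(dir)). The master string, checkpoint directory, window sizes, and the input DStream passed in as a parameter are placeholders for illustration, not part of this change.

import spark.streaming._
import spark.streaming.StreamingContext._

object CheckpointSketch {
  // The input DStream is taken as a parameter so that no particular input-source
  // API has to be assumed; "checkpoint" is an arbitrary directory name.
  def setup(ssc: StreamingContext, lines: DStream[String]) {
    // Checkpoint the DStream graph to a directory, once per batch.
    ssc.checkpoint("checkpoint", Seconds(1))

    // A stream whose RDDs require checkpointing can set its own interval;
    // if none is given, the new default of 10 seconds applies.
    lines.map(x => (x, 1))
         .reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(1))
         .checkpoint(Seconds(10))
  }

  // After a failure, the whole graph is rebuilt from the checkpoint directory:
  //   val recovered = new StreamingContext("checkpoint")
}

The CheckpointSuite changes below test exactly this round trip: run a few batches, let the graph checkpoint itself, then restart from the checkpoint directory and verify the remaining output.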
Diffstat (limited to 'streaming/src/test/scala')
-rw-r--r-- streaming/src/test/scala/spark/streaming/CheckpointSuite.scala       | 77
-rw-r--r-- streaming/src/test/scala/spark/streaming/TestSuiteBase.scala         | 37
-rw-r--r-- streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala |  4
3 files changed, 92 insertions, 26 deletions
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 6dcedcf463..dfe31b5771 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -2,52 +2,95 @@ package spark.streaming
import spark.streaming.StreamingContext._
import java.io.File
+import collection.mutable.ArrayBuffer
+import runtime.RichInt
+import org.scalatest.BeforeAndAfter
+import org.apache.hadoop.fs.Path
+import org.apache.commons.io.FileUtils
-class CheckpointSuite extends TestSuiteBase {
+class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
+
+ before {
+ FileUtils.deleteDirectory(new File(checkpointDir))
+ }
+
+ after {
+ FileUtils.deleteDirectory(new File(checkpointDir))
+ }
override def framework() = "CheckpointSuite"
- override def checkpointFile() = "checkpoint"
+ override def batchDuration() = Seconds(1)
+
+ override def checkpointDir() = "checkpoint"
+
+ override def checkpointInterval() = batchDuration
def testCheckpointedOperation[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]],
- useSet: Boolean = false
+ initialNumBatches: Int
) {
// Current code assumes that:
// number of inputs = number of outputs = number of batches to be run
-
val totalNumBatches = input.size
- val initialNumBatches = input.size / 2
val nextNumBatches = totalNumBatches - initialNumBatches
val initialNumExpectedOutputs = initialNumBatches
+ val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs
// Do half the computation (half the number of batches), create checkpoint file and quit
val ssc = setupStreams[U, V](input, operation)
val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs)
- verifyOutput[V](output, expectedOutput.take(initialNumBatches), useSet)
+ verifyOutput[V](output, expectedOutput.take(initialNumBatches), true)
Thread.sleep(1000)
// Restart and complete the computation from checkpoint file
- val sscNew = new StreamingContext(checkpointFile)
- sscNew.setCheckpointDetails(null, null)
- val outputNew = runStreams[V](sscNew, nextNumBatches, expectedOutput.size)
- verifyOutput[V](outputNew, expectedOutput, useSet)
-
- new File(checkpointFile).delete()
- new File(checkpointFile + ".bk").delete()
- new File("." + checkpointFile + ".crc").delete()
- new File("." + checkpointFile + ".bk.crc").delete()
+ val sscNew = new StreamingContext(checkpointDir)
+ //sscNew.checkpoint(null, null)
+ val outputNew = runStreams[V](sscNew, nextNumBatches, nextNumExpectedOutputs)
+ verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true)
}
- test("simple per-batch operation") {
+
+ test("map and reduceByKey") {
testCheckpointedOperation(
Seq( Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq() ),
(s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
Seq( Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq() ),
- true
+ 3
)
}
+
+ test("reduceByKeyAndWindowInv") {
+ val n = 10
+ val w = 4
+ val input = (1 to n).map(x => Seq("a")).toSeq
+ val output = Seq(Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 3))) ++ (1 to (n - w + 1)).map(x => Seq(("a", 4)))
+ val operation = (st: DStream[String]) => {
+ st.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(w), Seconds(1))
+ }
+ for (i <- Seq(3, 5, 7)) {
+ testCheckpointedOperation(input, operation, output, i)
+ }
+ }
+
+ test("updateStateByKey") {
+ val input = (1 to 10).map(_ => Seq("a")).toSeq
+ val output = (1 to 10).map(x => Seq(("a", x))).toSeq
+ val operation = (st: DStream[String]) => {
+ val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
+ Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
+ }
+ st.map(x => (x, 1))
+ .updateStateByKey[RichInt](updateFunc)
+ .checkpoint(Seconds(5))
+ .map(t => (t._1, t._2.self))
+ }
+ for (i <- Seq(3, 5, 7)) {
+ testCheckpointedOperation(input, operation, output, i)
+ }
+ }
+
}
\ No newline at end of file
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index c9bc454f91..e441feea19 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -5,10 +5,16 @@ import util.ManualClock
import collection.mutable.ArrayBuffer
import org.scalatest.FunSuite
import collection.mutable.SynchronizedBuffer
+import java.io.{ObjectInputStream, IOException}
+
+/**
* This is an input stream just for the test suites. It is equivalent to a checkpointable,
* replayable, reliable message queue like Kafka. It takes a sequence as input and
* returns the i-th element at the i-th batch under a manual clock.
+ */
class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
extends InputDStream[T](ssc_) {
- var currentIndex = 0
def start() {}
@@ -23,17 +29,32 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
ssc.sc.makeRDD(Seq[T](), numPartitions)
}
logInfo("Created RDD " + rdd.id)
- //currentIndex += 1
Some(rdd)
}
}
+/**
* This is an output stream just for the test suites. All the output is collected into an
* ArrayBuffer. This buffer is wiped clean when it is restored from a checkpoint.
+ */
class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
extends PerRDDForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
- })
+ }) {
+
+ // This clears the output buffer every time it is read from a checkpoint
+ @throws(classOf[IOException])
+ private def readObject(ois: ObjectInputStream) {
+ ois.defaultReadObject()
+ output.clear()
+ }
+}
+/**
* This is the base trait for Spark Streaming test suites. It provides basic functionality
* to run a user-defined set of inputs through user-defined stream operations and verify the output.
+ */
trait TestSuiteBase extends FunSuite with Logging {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
@@ -44,7 +65,7 @@ trait TestSuiteBase extends FunSuite with Logging {
def batchDuration() = Seconds(1)
- def checkpointFile() = null.asInstanceOf[String]
+ def checkpointDir() = null.asInstanceOf[String]
def checkpointInterval() = batchDuration
@@ -60,8 +81,8 @@ trait TestSuiteBase extends FunSuite with Logging {
// Create StreamingContext
val ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
- if (checkpointFile != null) {
- ssc.setCheckpointDetails(checkpointFile, checkpointInterval())
+ if (checkpointDir != null) {
+ ssc.checkpoint(checkpointDir, checkpointInterval())
}
// Setup the stream computation
@@ -82,8 +103,8 @@ trait TestSuiteBase extends FunSuite with Logging {
// Create StreamingContext
val ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
- if (checkpointFile != null) {
- ssc.setCheckpointDetails(checkpointFile, checkpointInterval())
+ if (checkpointDir != null) {
+ ssc.checkpoint(checkpointDir, checkpointInterval())
}
// Setup the stream computation
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index d7d8d5bd36..e282f0fdd5 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -283,7 +283,9 @@ class WindowOperationsSuite extends TestSuiteBase {
test("reduceByKeyAndWindowInv - " + name) {
val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
val operation = (s: DStream[(String, Int)]) => {
- s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime).persist()
+ s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime)
+ .persist()
+ .checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing
}
testOperation(input, operation, expectedOutput, numBatches, true)
}