aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorDenny <dennybritz@gmail.com>2012-11-09 12:26:17 -0800
committerDenny <dennybritz@gmail.com>2012-11-09 12:26:17 -0800
commit2e8f2ee4adbe078a690dbadfa1dbd74fdc824d89 (patch)
treeecd8e96a1e1760c75a220e90ccd422aaf8add7db /streaming/src
parente5a09367870be757a0abb3e2ad7a53e74110b033 (diff)
parentcc2a65f54715ff0990d5873d50eec0dedf64d409 (diff)
downloadspark-2e8f2ee4adbe078a690dbadfa1dbd74fdc824d89.tar.gz
spark-2e8f2ee4adbe078a690dbadfa1dbd74fdc824d89.tar.bz2
spark-2e8f2ee4adbe078a690dbadfa1dbd74fdc824d89.zip
Merge branch 'dev' of github.com:radlab/spark into kafka
Conflicts: streaming/src/main/scala/spark/streaming/DStream.scala
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/spark/streaming/Checkpoint.scala6
-rw-r--r--streaming/src/main/scala/spark/streaming/DStream.scala10
-rw-r--r--streaming/src/main/scala/spark/streaming/Scheduler.scala6
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala17
-rw-r--r--streaming/src/test/scala/spark/streaming/CheckpointSuite.scala75
-rw-r--r--streaming/src/test/scala/spark/streaming/FailureSuite.scala188
-rw-r--r--streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala2
-rw-r--r--streaming/src/test/scala/spark/streaming/TestSuiteBase.scala9
8 files changed, 263 insertions, 50 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 1643f45ffb..a70fb8f73a 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -32,7 +32,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val file = new Path(path, "graph")
val conf = new Configuration()
val fs = file.getFileSystem(conf)
- logDebug("Saved checkpoint for time " + checkpointTime + " to file '" + file + "'")
+ logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
if (fs.exists(file)) {
val bkFile = new Path(file.getParent, file.getName + ".bk")
FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
@@ -43,7 +43,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
oos.writeObject(this)
oos.close()
fs.close()
- logInfo("Saved checkpoint for time " + checkpointTime + " to file '" + file + "'")
+ logInfo("Checkpoint of streaming context for time " + checkpointTime + " saved successfully to file '" + file + "'")
}
def toBytes(): Array[Byte] = {
@@ -58,7 +58,6 @@ object Checkpoint extends Logging {
val fs = new Path(path).getFileSystem(new Configuration())
val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
- var detailedLog: String = ""
attempts.foreach(file => {
if (fs.exists(file)) {
@@ -76,6 +75,7 @@ object Checkpoint extends Logging {
fs.close()
cp.validate()
logInfo("Checkpoint successfully loaded from file '" + file + "'")
+ logInfo("Checkpoint was generated at time " + cp.checkpointTime)
return cp
} catch {
case e: Exception =>
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index f891730317..3219919a24 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -297,16 +297,20 @@ extends Serializable with Logging {
* this method to save custom checkpoint data.
*/
protected[streaming] def updateCheckpointData(currentTime: Time) {
+ // Get the checkpointed RDDs from the generated RDDs
val newRdds = generatedRDDs.filter(_._2.getCheckpointData() != null)
.map(x => (x._1, x._2.getCheckpointData()))
+ // Make a copy of the existing checkpoint data
val oldRdds = checkpointData.rdds.clone()
+ // If the new checkpoint has checkpoints then replace existing with the new one
if (newRdds.size > 0) {
checkpointData.rdds.clear()
checkpointData.rdds ++= newRdds
}
-
+ // Make dependencies update their checkpoint data
dependencies.foreach(_.updateCheckpointData(currentTime))
+ // TODO: remove this, this is just for debugging
newRdds.foreach {
case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") }
}
@@ -321,7 +325,7 @@ extends Serializable with Logging {
}
}
}
- logInfo("Updated checkpoint data")
+ logInfo("Updated checkpoint data for time " + currentTime)
}
/**
@@ -331,6 +335,7 @@ extends Serializable with Logging {
* override the updateCheckpointData() method would also need to override this method.
*/
protected[streaming] def restoreCheckpointData() {
+ // Create RDDs from the checkpoint data
logInfo("Restoring checkpoint data from " + checkpointData.rdds.size + " checkpointed RDDs")
checkpointData.rdds.foreach {
case(time, data) => {
@@ -339,6 +344,7 @@ extends Serializable with Logging {
}
}
dependencies.foreach(_.restoreCheckpointData())
+ logInfo("Restored checkpoint data")
}
@throws(classOf[IOException])
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 2b3f5a4829..de0fb1f3ad 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -29,10 +29,12 @@ extends Logging {
// on this first trigger time of the timer.
if (ssc.isCheckpointPresent) {
// If manual clock is being used for testing, then
- // set manual clock to the last checkpointed time
+ // either set the manual clock to the last checkpointed time,
+ // or if the property is defined set it to that time
if (clock.isInstanceOf[ManualClock]) {
val lastTime = ssc.getInitialCheckpoint.checkpointTime.milliseconds
- clock.asInstanceOf[ManualClock].setTime(lastTime)
+ val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
+ clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
}
timer.restart(graph.zeroTime.milliseconds)
logInfo("Scheduler's timer restarted")
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 770fd61498..d68d2632e7 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -18,7 +18,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import java.util.UUID
-class StreamingContext (
+final class StreamingContext (
sc_ : SparkContext,
cp_ : Checkpoint
) extends Logging {
@@ -61,12 +61,12 @@ class StreamingContext (
}
}
- val nextNetworkInputStreamId = new AtomicInteger(0)
- var networkInputTracker: NetworkInputTracker = null
+ private[streaming] val nextNetworkInputStreamId = new AtomicInteger(0)
+ private[streaming] var networkInputTracker: NetworkInputTracker = null
private[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
- sc.setCheckpointDir(cp_.checkpointDir, true)
+ sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true)
cp_.checkpointDir
} else {
null
@@ -87,7 +87,7 @@ class StreamingContext (
def checkpoint(dir: String, interval: Time) {
if (dir != null) {
- sc.setCheckpointDir(new Path(dir, "rdds-" + UUID.randomUUID.toString).toString)
+ sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(dir))
checkpointDir = dir
checkpointInterval = interval
} else {
@@ -240,8 +240,11 @@ class StreamingContext (
}
def doCheckpoint(currentTime: Time) {
+ val startTime = System.currentTimeMillis()
graph.updateCheckpointData(currentTime)
new Checkpoint(this, currentTime).save(checkpointDir)
+ val stopTime = System.currentTimeMillis()
+ logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms")
}
}
@@ -260,5 +263,9 @@ object StreamingContext {
prefix + "-" + time.milliseconds + "." + suffix
}
}
+
+ def getSparkCheckpointDir(sscCheckpointDir: String): String = {
+ new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
+ }
}
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 9fdfd50be2..038827ddb0 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -52,15 +52,13 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
.checkpoint(stateStreamCheckpointInterval)
.map(t => (t._1, t._2.self))
}
- val ssc = setupStreams(input, operation)
- val stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
+ var ssc = setupStreams(input, operation)
+ var stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
- // Run till a time such that at least one RDD in the stream should have been checkpointed
+ // Run till a time such that at least one RDD in the stream should have been checkpointed,
+ // then check whether some RDD has been checkpointed or not
ssc.start()
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- advanceClock(clock, firstNumBatches)
-
- // Check whether some RDD has been checkpointed or not
+ runStreamsWithRealDelay(ssc, firstNumBatches)
logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.mkString(",\n") + "]")
assert(!stateStream.checkpointData.isEmpty, "No checkpointed RDDs in state stream before first failure")
stateStream.checkpointData.foreach {
@@ -73,42 +71,45 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a further time such that previous checkpoint files in the stream would be deleted
// and check whether the earlier checkpoint files are deleted
val checkpointFiles = stateStream.checkpointData.map(x => new File(x._2.toString))
- advanceClock(clock, secondNumBatches)
+ runStreamsWithRealDelay(ssc, secondNumBatches)
checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
+ ssc.stop()
// Restart stream computation using the checkpoint file and check whether
// checkpointed RDDs have been restored or not
- ssc.stop()
- val sscNew = new StreamingContext(checkpointDir)
- val stateStreamNew = sscNew.graph.getOutputStreams().head.dependencies.head.dependencies.head
- logInfo("Restored data of state stream = \n[" + stateStreamNew.generatedRDDs.mkString("\n") + "]")
- assert(!stateStreamNew.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from first failure")
+ ssc = new StreamingContext(checkpointDir)
+ stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
+ logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
+ assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from first failure")
- // Run one batch to generate a new checkpoint file
- sscNew.start()
- val clockNew = sscNew.scheduler.clock.asInstanceOf[ManualClock]
- advanceClock(clockNew, 1)
-
- // Check whether some RDD is present in the checkpoint data or not
- assert(!stateStreamNew.checkpointData.isEmpty, "No checkpointed RDDs in state stream before second failure")
+ // Run one batch to generate a new checkpoint file and check whether some RDD
+ // is present in the checkpoint data or not
+ ssc.start()
+ runStreamsWithRealDelay(ssc, 1)
+ assert(!stateStream.checkpointData.isEmpty, "No checkpointed RDDs in state stream before second failure")
stateStream.checkpointData.foreach {
case (time, data) => {
val file = new File(data.toString)
- assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before seconds failure does not exist")
+ assert(file.exists(),
+ "Checkpoint file '" + file +"' for time " + time + " for state stream before seconds failure does not exist")
}
}
+ ssc.stop()
// Restart stream computation from the new checkpoint file to see whether that file has
// correct checkpoint data
- sscNew.stop()
- val sscNewNew = new StreamingContext(checkpointDir)
- val stateStreamNewNew = sscNew.graph.getOutputStreams().head.dependencies.head.dependencies.head
- logInfo("Restored data of state stream = \n[" + stateStreamNew.generatedRDDs.mkString("\n") + "]")
- assert(!stateStreamNewNew.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure")
- sscNewNew.start()
- advanceClock(sscNewNew.scheduler.clock.asInstanceOf[ManualClock], 1)
- sscNewNew.stop()
+ ssc = new StreamingContext(checkpointDir)
+ stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
+ logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
+ assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure")
+
+ // Adjust manual clock time as if it is being restarted after a delay
+ System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
+ ssc.start()
+ runStreamsWithRealDelay(ssc, 4)
+ ssc.stop()
+ System.clearProperty("spark.streaming.manualClock.jump")
}
test("map and reduceByKey") {
@@ -123,10 +124,12 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
test("reduceByKeyAndWindowInv") {
val n = 10
val w = 4
- val input = (1 to n).map(x => Seq("a")).toSeq
+ val input = (1 to n).map(_ => Seq("a")).toSeq
val output = Seq(Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 3))) ++ (1 to (n - w + 1)).map(x => Seq(("a", 4)))
val operation = (st: DStream[String]) => {
- st.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, batchDuration * 4, batchDuration)
+ st.map(x => (x, 1))
+ .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
+ .checkpoint(Seconds(2))
}
for (i <- Seq(2, 3, 4)) {
testCheckpointedOperation(input, operation, output, i)
@@ -184,7 +187,14 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true)
}
- def advanceClock(clock: ManualClock, numBatches: Long) {
+ /**
+ * Advances the manual clock on the streaming scheduler by given number of batches.
+ * It also wait for the expected amount of time for each batch.
+ */
+
+
+ def runStreamsWithRealDelay(ssc: StreamingContext, numBatches: Long) {
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.time)
for (i <- 1 to numBatches.toInt) {
clock.addToTime(batchDuration.milliseconds)
@@ -193,4 +203,5 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
logInfo("Manual clock after advancing = " + clock.time)
Thread.sleep(batchDuration.milliseconds)
}
+
} \ No newline at end of file
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
new file mode 100644
index 0000000000..5b414117fc
--- /dev/null
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -0,0 +1,188 @@
+package spark.streaming
+
+import org.scalatest.BeforeAndAfter
+import org.apache.commons.io.FileUtils
+import java.io.File
+import scala.runtime.RichInt
+import scala.util.Random
+import spark.streaming.StreamingContext._
+import collection.mutable.ArrayBuffer
+import spark.Logging
+
+/**
+ * This testsuite tests master failures at random times while the stream is running using
+ * the real clock.
+ */
+class FailureSuite extends TestSuiteBase with BeforeAndAfter {
+
+ before {
+ FileUtils.deleteDirectory(new File(checkpointDir))
+ }
+
+ after {
+ FailureSuite.reset()
+ FileUtils.deleteDirectory(new File(checkpointDir))
+ }
+
+ override def framework = "CheckpointSuite"
+
+ override def batchDuration = Milliseconds(500)
+
+ override def checkpointDir = "checkpoint"
+
+ override def checkpointInterval = batchDuration
+
+ test("multiple failures with updateStateByKey") {
+ val n = 30
+ // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
+ val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq
+ // Last output: [ (a, 465) ] for n=30
+ val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) )
+
+ val operation = (st: DStream[String]) => {
+ val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
+ Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
+ }
+ st.map(x => (x, 1))
+ .updateStateByKey[RichInt](updateFunc)
+ .checkpoint(Seconds(2))
+ .map(t => (t._1, t._2.self))
+ }
+
+ testOperationWithMultipleFailures(input, operation, lastOutput, n, n)
+ }
+
+ test("multiple failures with reduceByKeyAndWindow") {
+ val n = 30
+ val w = 100
+ assert(w > n, "Window should be much larger than the number of input sets in this test")
+ // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
+ val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq
+ // Last output: [ (a, 465) ]
+ val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) )
+
+ val operation = (st: DStream[String]) => {
+ st.map(x => (x, 1))
+ .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
+ .checkpoint(Seconds(2))
+ }
+
+ testOperationWithMultipleFailures(input, operation, lastOutput, n, n)
+ }
+
+
+ /**
+ * Tests stream operation with multiple master failures, and verifies whether the
+ * final set of output values is as expected or not. Checking the final value is
+ * proof that no intermediate data was lost due to master failures.
+ */
+ def testOperationWithMultipleFailures[U: ClassManifest, V: ClassManifest](
+ input: Seq[Seq[U]],
+ operation: DStream[U] => DStream[V],
+ lastExpectedOutput: Seq[V],
+ numBatches: Int,
+ numExpectedOutput: Int
+ ) {
+ var ssc = setupStreams[U, V](input, operation)
+ val mergedOutput = new ArrayBuffer[Seq[V]]()
+
+ var totalTimeRan = 0L
+ while(totalTimeRan <= numBatches * batchDuration.milliseconds * 2) {
+ new KillingThread(ssc, numBatches * batchDuration.milliseconds.toInt / 4).start()
+ val (output, timeRan) = runStreamsWithRealClock[V](ssc, numBatches, numExpectedOutput)
+
+ mergedOutput ++= output
+ totalTimeRan += timeRan
+ logInfo("New output = " + output)
+ logInfo("Merged output = " + mergedOutput)
+ logInfo("Total time spent = " + totalTimeRan)
+ val sleepTime = Random.nextInt(numBatches * batchDuration.milliseconds.toInt / 8)
+ logInfo(
+ "\n-------------------------------------------\n" +
+ " Restarting stream computation in " + sleepTime + " ms " +
+ "\n-------------------------------------------\n"
+ )
+ Thread.sleep(sleepTime)
+ FailureSuite.failed = false
+ ssc = new StreamingContext(checkpointDir)
+ }
+ ssc.stop()
+ ssc = null
+
+ // Verify whether the last output is the expected one
+ val lastOutput = mergedOutput(mergedOutput.lastIndexWhere(!_.isEmpty))
+ assert(lastOutput.toSet === lastExpectedOutput.toSet)
+ logInfo("Finished computation after " + FailureSuite.failureCount + " failures")
+ }
+
+ /**
+ * Runs the streams set up in `ssc` on real clock until the expected max number of
+ */
+ def runStreamsWithRealClock[V: ClassManifest](
+ ssc: StreamingContext,
+ numBatches: Int,
+ maxExpectedOutput: Int
+ ): (Seq[Seq[V]], Long) = {
+
+ System.clearProperty("spark.streaming.clock")
+
+ assert(numBatches > 0, "Number of batches to run stream computation is zero")
+ assert(maxExpectedOutput > 0, "Max expected outputs after " + numBatches + " is zero")
+ logInfo("numBatches = " + numBatches + ", maxExpectedOutput = " + maxExpectedOutput)
+
+ // Get the output buffer
+ val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
+ val output = outputStream.output
+ val waitTime = (batchDuration.millis * (numBatches.toDouble + 0.5)).toLong
+ val startTime = System.currentTimeMillis()
+
+ try {
+ // Start computation
+ ssc.start()
+
+ // Wait until expected number of output items have been generated
+ while (output.size < maxExpectedOutput && System.currentTimeMillis() - startTime < waitTime && !FailureSuite.failed) {
+ logInfo("output.size = " + output.size + ", maxExpectedOutput = " + maxExpectedOutput)
+ Thread.sleep(100)
+ }
+ } catch {
+ case e: Exception => logInfo("Exception while running streams: " + e)
+ } finally {
+ ssc.stop()
+ }
+ val timeTaken = System.currentTimeMillis() - startTime
+ logInfo("" + output.size + " sets of output generated in " + timeTaken + " ms")
+ (output, timeTaken)
+ }
+
+
+}
+
+object FailureSuite {
+ var failed = false
+ var failureCount = 0
+
+ def reset() {
+ failed = false
+ failureCount = 0
+ }
+}
+
+class KillingThread(ssc: StreamingContext, maxKillWaitTime: Int) extends Thread with Logging {
+ initLogging()
+
+ override def run() {
+ var minKillWaitTime = if (FailureSuite.failureCount == 0) 3000 else 1000 // to allow the first checkpoint
+ val killWaitTime = minKillWaitTime + Random.nextInt(maxKillWaitTime)
+ logInfo("Kill wait time = " + killWaitTime)
+ Thread.sleep(killWaitTime.toLong)
+ logInfo(
+ "\n---------------------------------------\n" +
+ "Killing streaming context after " + killWaitTime + " ms" +
+ "\n---------------------------------------\n"
+ )
+ if (ssc != null) ssc.stop()
+ FailureSuite.failed = true
+ FailureSuite.failureCount += 1
+ }
+}
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index c17254b809..8f892baab1 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -12,6 +12,8 @@ import org.apache.commons.io.FileUtils
class InputStreamsSuite extends TestSuiteBase {
+
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
test("network input stream") {
// Start the server
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index b8c7f99603..5fb5cc504c 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -23,12 +23,9 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
def compute(validTime: Time): Option[RDD[T]] = {
logInfo("Computing RDD for time " + validTime)
val index = ((validTime - zeroTime) / slideTime - 1).toInt
- val rdd = if (index < input.size) {
- ssc.sc.makeRDD(input(index), numPartitions)
- } else {
- ssc.sc.makeRDD(Seq[T](), numPartitions)
- }
- logInfo("Created RDD " + rdd.id)
+ val selectedInput = if (index < input.size) input(index) else Seq[T]()
+ val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
+ logInfo("Created RDD " + rdd.id + " with " + selectedInput)
Some(rdd)
}
}