diff options
author | Denny <dennybritz@gmail.com> | 2012-11-13 13:16:18 -0800 |
---|---|---|
committer | Denny <dennybritz@gmail.com> | 2012-11-13 13:16:18 -0800 |
commit | 2aceae25be0562726a24b1b9e596e975f764cf30 (patch) | |
tree | a0cb1c645e2f00ea65ab1b35230602790bc66fe0 /streaming/src | |
parent | b6f7ba813e93916dad9dbb0f06819362a5fb7cf7 (diff) | |
parent | 26fec8f0b850e7eb0b6cfe63770f2e68cd50441b (diff) | |
download | spark-2aceae25be0562726a24b1b9e596e975f764cf30.tar.gz spark-2aceae25be0562726a24b1b9e596e975f764cf30.tar.bz2 spark-2aceae25be0562726a24b1b9e596e975f764cf30.zip |
Merge branch 'dev' into kafka
Conflicts:
streaming/src/main/scala/spark/streaming/DStream.scala
Diffstat (limited to 'streaming/src')
9 files changed, 155 insertions, 123 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index a70fb8f73a..770f7b0cc0 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -5,7 +5,7 @@ import spark.{Logging, Utils} import org.apache.hadoop.fs.{FileUtil, Path} import org.apache.hadoop.conf.Configuration -import java.io.{InputStream, ObjectStreamClass, ObjectInputStream, ObjectOutputStream} +import java.io._ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) @@ -18,8 +18,6 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val checkpointDir = ssc.checkpointDir val checkpointInterval = ssc.checkpointInterval - validate() - def validate() { assert(master != null, "Checkpoint.master is null") assert(framework != null, "Checkpoint.framework is null") @@ -27,35 +25,50 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) assert(checkpointTime != null, "Checkpoint.checkpointTime is null") logInfo("Checkpoint for time " + checkpointTime + " validated") } +} - def save(path: String) { - val file = new Path(path, "graph") - val conf = new Configuration() - val fs = file.getFileSystem(conf) - logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") - if (fs.exists(file)) { - val bkFile = new Path(file.getParent, file.getName + ".bk") - FileUtil.copy(fs, file, fs, bkFile, true, true, conf) - logDebug("Moved existing checkpoint file to " + bkFile) +/** + * Convenience class to speed up the writing of graph checkpoint to file + */ +class CheckpointWriter(checkpointDir: String) extends Logging { + val file = new Path(checkpointDir, "graph") + val conf = new Configuration() + var fs = file.getFileSystem(conf) + val maxAttempts = 3 + + def write(checkpoint: Checkpoint) { + // TODO: maybe do this in a different thread from the main stream execution thread + var attempts = 0 + while (attempts < maxAttempts) { + attempts += 1 + try { + logDebug("Saving checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'") + if (fs.exists(file)) { + val bkFile = new Path(file.getParent, file.getName + ".bk") + FileUtil.copy(fs, file, fs, bkFile, true, true, conf) + logDebug("Moved existing checkpoint file to " + bkFile) + } + val fos = fs.create(file) + val oos = new ObjectOutputStream(fos) + oos.writeObject(checkpoint) + oos.close() + logInfo("Checkpoint for time " + checkpoint.checkpointTime + " saved to file '" + file + "'") + fos.close() + return + } catch { + case ioe: IOException => + logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe) + } } - val fos = fs.create(file) - val oos = new ObjectOutputStream(fos) - oos.writeObject(this) - oos.close() - fs.close() - logInfo("Checkpoint of streaming context for time " + checkpointTime + " saved successfully to file '" + file + "'") - } - - def toBytes(): Array[Byte] = { - val bytes = Utils.serialize(this) - bytes + logError("Could not write checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'") } } -object Checkpoint extends Logging { - def load(path: String): Checkpoint = { +object CheckpointReader extends Logging { + + def read(path: String): Checkpoint = { val fs = new Path(path).getFileSystem(new Configuration()) val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk")) @@ -82,17 +95,11 @@ object Checkpoint extends Logging { logError("Error loading checkpoint from file '" + file + "'", e) } } else { - logWarning("Could not load checkpoint from file '" + file + "' as it does not exist") + logWarning("Could not read checkpoint from file '" + file + "' as it does not exist") } }) - throw new Exception("Could not load checkpoint from path '" + path + "'") - } - - def fromBytes(bytes: Array[Byte]): Checkpoint = { - val cp = Utils.deserialize[Checkpoint](bytes) - cp.validate() - cp + throw new Exception("Could not read checkpoint from path '" + path + "'") } } diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index e8bbf7d1c0..1235ce3e4d 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -233,7 +233,7 @@ extends Serializable with Logging { } if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval)) { newRDD.checkpoint() - logInfo("Marking RDD for time " + time + " for checkpointing at time " + time) + logInfo("Marking RDD " + newRDD + " for time " + time + " for checkpointing at time " + time) } generatedRDDs.put(time, newRDD) Some(newRDD) @@ -300,6 +300,9 @@ extends Serializable with Logging { * this method to save custom checkpoint data. */ protected[streaming] def updateCheckpointData(currentTime: Time) { + + logInfo("Updating checkpoint data for time " + currentTime) + // Get the checkpointed RDDs from the generated RDDs val newRdds = generatedRDDs.filter(_._2.getCheckpointData() != null) .map(x => (x._1, x._2.getCheckpointData())) @@ -342,8 +345,11 @@ extends Serializable with Logging { logInfo("Restoring checkpoint data from " + checkpointData.rdds.size + " checkpointed RDDs") checkpointData.rdds.foreach { case(time, data) => { - logInfo("Restoring checkpointed RDD for time " + time + " from file") - generatedRDDs += ((time, ssc.sc.objectFile[T](data.toString))) + logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'") + val rdd = ssc.sc.objectFile[T](data.toString) + // Set the checkpoint file name to identify this RDD as a checkpointed RDD by updateCheckpointData() + rdd.checkpointFile = data.toString + generatedRDDs += ((time, rdd)) } } dependencies.foreach(_.restoreCheckpointData()) diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala index b07d51fa6b..8b484e6acf 100644 --- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala @@ -57,7 +57,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( override def checkpoint(interval: Time): DStream[(K, V)] = { super.checkpoint(interval) - reducedStream.checkpoint(interval) + //reducedStream.checkpoint(interval) this } diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index de0fb1f3ad..e2dca91179 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -16,8 +16,16 @@ extends Logging { initLogging() val graph = ssc.graph + val concurrentJobs = System.getProperty("spark.stream.concurrentJobs", "1").toInt val jobManager = new JobManager(ssc, concurrentJobs) + + val checkpointWriter = if (ssc.checkpointInterval != null && ssc.checkpointDir != null) { + new CheckpointWriter(ssc.checkpointDir) + } else { + null + } + val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock") val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] val timer = new RecurringTimer(clock, ssc.graph.batchDuration, generateRDDs(_)) @@ -52,19 +60,23 @@ extends Logging { logInfo("Scheduler stopped") } - def generateRDDs(time: Time) { + private def generateRDDs(time: Time) { SparkEnv.set(ssc.env) logInfo("\n-----------------------------------------------------\n") - graph.generateRDDs(time).foreach(submitJob) - logInfo("Generated RDDs for time " + time) + graph.generateRDDs(time).foreach(jobManager.runJob) graph.forgetOldRDDs(time) - if (ssc.checkpointInterval != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointInterval)) { - ssc.doCheckpoint(time) - } + doCheckpoint(time) + logInfo("Generated RDDs for time " + time) } - def submitJob(job: Job) { - jobManager.runJob(job) + private def doCheckpoint(time: Time) { + if (ssc.checkpointInterval != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointInterval)) { + val startTime = System.currentTimeMillis() + ssc.graph.updateCheckpointData(time) + checkpointWriter.write(new Checkpoint(ssc, time)) + val stopTime = System.currentTimeMillis() + logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms") + } } } diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala index cb261808f5..b7e4c1c30c 100644 --- a/streaming/src/main/scala/spark/streaming/StateDStream.scala +++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala @@ -7,20 +7,11 @@ import spark.rdd.MapPartitionsRDD import spark.SparkContext._ import spark.storage.StorageLevel - -class StateRDD[U: ClassManifest, T: ClassManifest]( - prev: RDD[T], - f: Iterator[T] => Iterator[U], - rememberPartitioner: Boolean - ) extends MapPartitionsRDD[U, T](prev, f) { - override val partitioner = if (rememberPartitioner) prev.partitioner else None -} - class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest]( parent: DStream[(K, V)], updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, - rememberPartitioner: Boolean + preservePartitioning: Boolean ) extends DStream[(K, S)](parent.ssc) { super.persist(StorageLevel.MEMORY_ONLY_SER) @@ -53,7 +44,7 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife updateFuncLocal(i) } val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) - val stateRDD = new StateRDD(cogroupedRDD, finalFunc, rememberPartitioner) + val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning) //logDebug("Generating state RDD for time " + validTime) return Some(stateRDD) } @@ -78,7 +69,7 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife } val groupedRDD = parentRDD.groupByKey(partitioner) - val sessionRDD = new StateRDD(groupedRDD, finalFunc, rememberPartitioner) + val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning) //logDebug("Generating state RDD for time " + validTime + " (first)") return Some(sessionRDD) } diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 4cba525f86..5e11e6d734 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -28,7 +28,7 @@ final class StreamingContext ( def this(master: String, frameworkName: String, sparkHome: String = null, jars: Seq[String] = Nil) = this(new SparkContext(master, frameworkName, sparkHome, jars), null) - def this(path: String) = this(null, Checkpoint.load(path)) + def this(path: String) = this(null, CheckpointReader.read(path)) def this(cp_ : Checkpoint) = this(null, cp_) @@ -85,7 +85,7 @@ final class StreamingContext ( graph.setRememberDuration(duration) } - def checkpoint(dir: String, interval: Time) { + def checkpoint(dir: String, interval: Time = null) { if (dir != null) { sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(dir)) checkpointDir = dir @@ -211,12 +211,29 @@ final class StreamingContext ( graph.addOutputStream(outputStream) } + def validate() { + assert(graph != null, "Graph is null") + graph.validate() + + assert( + checkpointDir == null || checkpointInterval != null, + "Checkpoint directory has been set, but the graph checkpointing interval has " + + "not been set. Please use StreamingContext.checkpoint() to set the interval." + ) + + + } + + /** * This function starts the execution of the streams. */ def start() { - assert(graph != null, "Graph is null") - graph.validate() + if (checkpointDir != null && checkpointInterval == null && graph != null) { + checkpointInterval = graph.batchDuration + } + + validate() val networkInputStreams = graph.getInputStreams().filter(s => s match { case n: NetworkInputDStream[_] => true @@ -250,14 +267,6 @@ final class StreamingContext ( case e: Exception => logWarning("Error while stopping", e) } } - - def doCheckpoint(currentTime: Time) { - val startTime = System.currentTimeMillis() - graph.updateCheckpointData(currentTime) - new Checkpoint(this, currentTime).save(checkpointDir) - val stopTime = System.currentTimeMillis() - logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms") - } } diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties index 02fe16866e..33774b463d 100644 --- a/streaming/src/test/resources/log4j.properties +++ b/streaming/src/test/resources/log4j.properties @@ -1,5 +1,5 @@ # Set everything to be logged to the console -log4j.rootCategory=WARN, console +log4j.rootCategory=INFO, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index 0bcf207082..0d82b2f1ea 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -24,7 +24,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { override def framework = "CheckpointSuite" - override def batchDuration = Milliseconds(200) + override def batchDuration = Milliseconds(500) override def checkpointDir = "checkpoint" @@ -34,7 +34,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { test("basic stream+rdd recovery") { - assert(batchDuration === Milliseconds(200), "batchDuration for this test must be 1 second") + assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second") assert(checkpointInterval === batchDuration, "checkpointInterval for this test much be same as batchDuration") System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") @@ -134,9 +134,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { val operation = (st: DStream[String]) => { st.map(x => (x, 1)) .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration) - .checkpoint(Seconds(2)) + .checkpoint(batchDuration * 2) } - testCheckpointedOperation(input, operation, output, 3) + testCheckpointedOperation(input, operation, output, 7) } test("updateStateByKey") { @@ -148,14 +148,18 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { } st.map(x => (x, 1)) .updateStateByKey[RichInt](updateFunc) - .checkpoint(Seconds(2)) + .checkpoint(batchDuration * 2) .map(t => (t._1, t._2.self)) } - testCheckpointedOperation(input, operation, output, 3) + testCheckpointedOperation(input, operation, output, 7) } - - + /** + * Tests a streaming operation under checkpointing, by restart the operation + * from checkpoint file and verifying whether the final output is correct. + * The output is assumed to have come from a reliable queue which an replay + * data as required. + */ def testCheckpointedOperation[U: ClassManifest, V: ClassManifest]( input: Seq[Seq[U]], operation: DStream[U] => DStream[V], @@ -170,8 +174,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { val initialNumExpectedOutputs = initialNumBatches val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs - // Do half the computation (half the number of batches), create checkpoint file and quit - + // Do the computation for initial number of batches, create checkpoint file and quit ssc = setupStreams[U, V](input, operation) val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs) verifyOutput[V](output, expectedOutput.take(initialNumBatches), true) @@ -193,8 +196,6 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { * Advances the manual clock on the streaming scheduler by given number of batches. * It also wait for the expected amount of time for each batch. */ - - def runStreamsWithRealDelay(ssc: StreamingContext, numBatches: Long) { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] logInfo("Manual clock before advancing = " + clock.time) diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 0957748603..3e99440226 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -16,24 +16,36 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + val testPort = 9999 + var testServer: TestServer = null + var testDir: File = null + override def checkpointDir = "checkpoint" after { FileUtils.deleteDirectory(new File(checkpointDir)) + if (testServer != null) { + testServer.stop() + testServer = null + } + if (testDir != null && testDir.exists()) { + FileUtils.deleteDirectory(testDir) + testDir = null + } } test("network input stream") { // Start the server - val serverPort = 9999 - val server = new TestServer(9999) - server.start() + testServer = new TestServer(testPort) + testServer.start() // Set up the streaming context and input streams val ssc = new StreamingContext(master, framework) ssc.setBatchDuration(batchDuration) - val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.MEMORY_AND_DISK) + val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]] val outputStream = new TestOutputStream(networkStream, outputBuffer) + def output = outputBuffer.flatMap(x => x) ssc.registerOutputStream(outputStream) ssc.start() @@ -41,21 +53,15 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] val input = Seq(1, 2, 3, 4, 5) val expectedOutput = input.map(_.toString) + Thread.sleep(1000) for (i <- 0 until input.size) { - server.send(input(i).toString + "\n") + testServer.send(input(i).toString + "\n") Thread.sleep(500) clock.addToTime(batchDuration.milliseconds) } - val startTime = System.currentTimeMillis() - while (outputBuffer.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { - logInfo("output.size = " + outputBuffer.size + ", expectedOutput.size = " + expectedOutput.size) - Thread.sleep(100) - } Thread.sleep(1000) - val timeTaken = System.currentTimeMillis() - startTime - assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") logInfo("Stopping server") - server.stop() + testServer.stop() logInfo("Stopping context") ssc.stop() @@ -69,24 +75,24 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) logInfo("--------------------------------") - assert(outputBuffer.size === expectedOutput.size) - for (i <- 0 until outputBuffer.size) { - assert(outputBuffer(i).size === 1) - assert(outputBuffer(i).head === expectedOutput(i)) + // Verify whether all the elements received are as expected + // (whether the elements were received one in each interval is not verified) + assert(output.size === expectedOutput.size) + for (i <- 0 until output.size) { + assert(output(i) === expectedOutput(i)) } } test("network input stream with checkpoint") { // Start the server - val serverPort = 9999 - val server = new TestServer(9999) - server.start() + testServer = new TestServer(testPort) + testServer.start() // Set up the streaming context and input streams var ssc = new StreamingContext(master, framework) ssc.setBatchDuration(batchDuration) ssc.checkpoint(checkpointDir, checkpointInterval) - val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.MEMORY_AND_DISK) + val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]]) ssc.registerOutputStream(outputStream) ssc.start() @@ -94,7 +100,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Feed data to the server to send to the network receiver var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] for (i <- Seq(1, 2, 3)) { - server.send(i.toString + "\n") + testServer.send(i.toString + "\n") Thread.sleep(100) clock.addToTime(batchDuration.milliseconds) } @@ -109,7 +115,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { ssc.start() clock = ssc.scheduler.clock.asInstanceOf[ManualClock] for (i <- Seq(4, 5, 6)) { - server.send(i.toString + "\n") + testServer.send(i.toString + "\n") Thread.sleep(100) clock.addToTime(batchDuration.milliseconds) } @@ -120,12 +126,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } test("file input stream") { + // Create a temporary directory - val dir = { + testDir = { var temp = File.createTempFile(".temp.", Random.nextInt().toString) temp.delete() temp.mkdirs() - temp.deleteOnExit() logInfo("Created temp dir " + temp) temp } @@ -133,10 +139,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Set up the streaming context and input streams val ssc = new StreamingContext(master, framework) ssc.setBatchDuration(batchDuration) - val filestream = ssc.textFileStream(dir.toString) + val filestream = ssc.textFileStream(testDir.toString) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] def output = outputBuffer.flatMap(x => x) - val outputStream = new TestOutputStream(filestream, outputBuffer) ssc.registerOutputStream(outputStream) ssc.start() @@ -147,16 +152,16 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val expectedOutput = input.map(_.toString) Thread.sleep(1000) for (i <- 0 until input.size) { - FileUtils.writeStringToFile(new File(dir, i.toString), input(i).toString + "\n") - Thread.sleep(100) + FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n") + Thread.sleep(500) clock.addToTime(batchDuration.milliseconds) - Thread.sleep(100) + //Thread.sleep(100) } val startTime = System.currentTimeMillis() - while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { - //println("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size) + /*while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { + logInfo("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size) Thread.sleep(100) - } + }*/ Thread.sleep(1000) val timeTaken = System.currentTimeMillis() - startTime assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") @@ -165,14 +170,16 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Verify whether data received by Spark Streaming was as expected logInfo("--------------------------------") - logInfo("output.size = " + output.size) + logInfo("output.size = " + outputBuffer.size) logInfo("output") - output.foreach(x => logInfo("[" + x.mkString(",") + "]")) + outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) logInfo("expected output.size = " + expectedOutput.size) logInfo("expected output") expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) logInfo("--------------------------------") + // Verify whether all the elements received are as expected + // (whether the elements were received one in each interval is not verified) assert(output.size === expectedOutput.size) for (i <- 0 until output.size) { assert(output(i).size === 1) @@ -182,12 +189,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { test("file input stream with checkpoint") { // Create a temporary directory - val dir = { + testDir = { var temp = File.createTempFile(".temp.", Random.nextInt().toString) temp.delete() temp.mkdirs() - temp.deleteOnExit() - println("Created temp dir " + temp) + logInfo("Created temp dir " + temp) temp } @@ -195,7 +201,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { var ssc = new StreamingContext(master, framework) ssc.setBatchDuration(batchDuration) ssc.checkpoint(checkpointDir, checkpointInterval) - val filestream = ssc.textFileStream(dir.toString) + val filestream = ssc.textFileStream(testDir.toString) var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]]) ssc.registerOutputStream(outputStream) ssc.start() @@ -204,7 +210,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] Thread.sleep(1000) for (i <- Seq(1, 2, 3)) { - FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n") + FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") Thread.sleep(100) clock.addToTime(batchDuration.milliseconds) } @@ -221,7 +227,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { clock = ssc.scheduler.clock.asInstanceOf[ManualClock] Thread.sleep(500) for (i <- Seq(4, 5, 6)) { - FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n") + FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") Thread.sleep(100) clock.addToTime(batchDuration.milliseconds) } |