| author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-10-24 23:14:37 -0700 |
| --- | --- | --- |
| committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-10-24 23:14:37 -0700 |
| commit | 926e05b0300ad2850d48e5692d73c209c1c90100 (patch) | |
| tree | c87e4a439ba8340115de2a646f06998d49c8d247 /streaming | |
| parent | ed71df46cddc9a4f1363b937c10bfa2a928e564c (diff) | |
| download | spark-926e05b0300ad2850d48e5692d73c209c1c90100.tar.gz, spark-926e05b0300ad2850d48e5692d73c209c1c90100.tar.bz2, spark-926e05b0300ad2850d48e5692d73c209c1c90100.zip | |
Added tests for the file input stream.
Diffstat (limited to 'streaming')
| -rw-r--r-- | streaming/src/main/scala/spark/streaming/FileInputDStream.scala | 4 |
| -rw-r--r-- | streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 68 |

2 files changed, 64 insertions(+), 8 deletions(-)
```diff
diff --git a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
index 78537b8794..69d3504c72 100644
--- a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
@@ -49,7 +49,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
         if (!filter.accept(path)) {
           return false
         } else {
-          val modTime = fs.getFileStatus(path).getModificationTime() 
+          val modTime = fs.getFileStatus(path).getModificationTime()
           if (modTime <= lastModTime) {
             return false
           }
@@ -60,7 +60,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
         }
       }
     }
-    
+
     val newFiles = fs.listStatus(path, newFilter)
     logInfo("New files: " + newFiles.map(_.getPath).mkString(", "))
     if (newFiles.length > 0) {
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index a3f213ebd0..fcf5d22f5c 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -1,43 +1,48 @@
 package spark.streaming
 
 import java.net.{SocketException, Socket, ServerSocket}
-import java.io.{BufferedWriter, OutputStreamWriter}
+import java.io.{File, BufferedWriter, OutputStreamWriter}
 import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}
 import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import util.ManualClock
 import spark.storage.StorageLevel
 import spark.Logging
+import scala.util.Random
+import org.apache.commons.io.FileUtils
 
 class InputStreamsSuite extends TestSuiteBase {
 
   test("network input stream") {
+    // Start the server
     val serverPort = 9999
     val server = new TestServer(9999)
     server.start()
+
+    // Set up the streaming context and input streams
     val ssc = new StreamingContext(master, framework)
     ssc.setBatchDuration(batchDuration)
-
     val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.DISK_AND_MEMORY)
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
     val outputStream = new TestOutputStream(networkStream, outputBuffer)
     ssc.registerOutputStream(outputStream)
     ssc.start()
 
+    // Feed data to the server to send to the Spark Streaming network receiver
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    val input = Seq(1, 2, 3)
+    val input = Seq(1, 2, 3, 4, 5)
     val expectedOutput = input.map(_.toString)
     for (i <- 0 until input.size) {
       server.send(input(i).toString + "\n")
-      Thread.sleep(1000)
-      clock.addToTime(1000)
+      Thread.sleep(500)
+      clock.addToTime(batchDuration.milliseconds)
     }
 
     val startTime = System.currentTimeMillis()
     while (outputBuffer.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
       logInfo("output.size = " + outputBuffer.size + ", expectedOutput.size = " + expectedOutput.size)
       Thread.sleep(100)
     }
-    Thread.sleep(5000)
+    Thread.sleep(1000)
     val timeTaken = System.currentTimeMillis() - startTime
     assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
     logInfo("Stopping server")
@@ -45,6 +50,57 @@ class InputStreamsSuite extends TestSuiteBase {
     logInfo("Stopping context")
     ssc.stop()
 
+    // Verify whether data received by Spark Streaming was as expected
+    assert(outputBuffer.size === expectedOutput.size)
+    for (i <- 0 until outputBuffer.size) {
+      assert(outputBuffer(i).size === 1)
+      assert(outputBuffer(i).head === expectedOutput(i))
+    }
+  }
+
+  test("file input stream") {
+    // Create a temporary directory
+    val dir = {
+      var temp = File.createTempFile(".temp.", Random.nextInt().toString)
+      temp.delete()
+      temp.mkdirs()
+      temp.deleteOnExit()
+      println("Created temp dir " + temp)
+      temp
+    }
+
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(master, framework)
+    ssc.setBatchDuration(batchDuration)
+    val filestream = ssc.textFileStream(dir.toString)
+    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+    val outputStream = new TestOutputStream(filestream, outputBuffer)
+    ssc.registerOutputStream(outputStream)
+    ssc.start()
+
+    // Create files in the temporary directory so that Spark Streaming can read data from it
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val input = Seq(1, 2, 3, 4, 5)
+    val expectedOutput = input.map(_.toString)
+    Thread.sleep(1000)
+    for (i <- 0 until input.size) {
+      FileUtils.writeStringToFile(new File(dir, i.toString), input(i).toString + "\n")
+      Thread.sleep(500)
+      clock.addToTime(batchDuration.milliseconds)
+      Thread.sleep(500)
+    }
+    val startTime = System.currentTimeMillis()
+    while (outputBuffer.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+      println("output.size = " + outputBuffer.size + ", expectedOutput.size = " + expectedOutput.size)
+      Thread.sleep(100)
+    }
+    Thread.sleep(1000)
+    val timeTaken = System.currentTimeMillis() - startTime
+    assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
+    println("Stopping context")
+    ssc.stop()
+
+    // Verify whether data received by Spark Streaming was as expected
     assert(outputBuffer.size === expectedOutput.size)
    for (i <- 0 until outputBuffer.size) {
       assert(outputBuffer(i).size === 1)
```
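For context, the pattern the new test exercises can be sketched as a standalone driver. The following is a minimal, hypothetical sketch (not part of the commit), assuming the 2012-era spark.streaming API that the diff itself uses (StreamingContext(master, frameworkName), setBatchDuration, textFileStream) plus Seconds and DStream.print() from the same codebase; the object name, directory, and "local[2]" master URL are illustrative. Files dropped into the watched directory surface in later batches because FileInputDStream keeps only files whose modification time is newer than the last batch.

```scala
import java.io.File
import org.apache.commons.io.FileUtils
import spark.streaming.{Seconds, StreamingContext}

// Hypothetical standalone driver illustrating textFileStream; not taken
// from the commit itself.
object FileStreamSketch {
  def main(args: Array[String]) {
    // Directory that the file input stream will watch
    val dir = new File(System.getProperty("java.io.tmpdir"), "filestream-sketch")
    dir.mkdirs()

    val ssc = new StreamingContext("local[2]", "FileStreamSketch")
    ssc.setBatchDuration(Seconds(1))

    // Each batch, the underlying FileInputDStream lists the directory and
    // keeps only files newer than the previous batch (the
    // modTime <= lastModTime check touched in the diff above)
    val lines = ssc.textFileStream(dir.toString)
    lines.print()
    ssc.start()

    // Write one file per second; each should appear in a subsequent batch
    for (i <- 1 to 3) {
      FileUtils.writeStringToFile(new File(dir, "file-" + i), i + "\n")
      Thread.sleep(1000)
    }
  }
}
```

The test in the commit follows the same shape, but drives time explicitly with a ManualClock and collects batches into a synchronized buffer so it can assert on them deterministically instead of printing.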