author	Tathagata Das <tathagata.das1565@gmail.com>	2012-10-24 23:14:37 -0700
committer	Tathagata Das <tathagata.das1565@gmail.com>	2012-10-24 23:14:37 -0700
commit	926e05b0300ad2850d48e5692d73c209c1c90100 (patch)
tree	c87e4a439ba8340115de2a646f06998d49c8d247 /streaming
parent	ed71df46cddc9a4f1363b937c10bfa2a928e564c (diff)
Added tests for the file input stream.
Diffstat (limited to 'streaming')
-rw-r--r--	streaming/src/main/scala/spark/streaming/FileInputDStream.scala	4
-rw-r--r--	streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala	68
2 files changed, 64 insertions(+), 8 deletions(-)
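
The first hunk below touches the path filter that FileInputDStream uses to pick out files modified since the last scan. For readers skimming the diff, here is a minimal, self-contained Scala sketch of that filtering logic; the object name and parameters are hypothetical scaffolding, and the real code additionally tracks the latest modification time seen, which the sketch omits:

    import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}

    // Hypothetical sketch of FileInputDStream's new-file check (not part of
    // this commit): accept a path only if the user-supplied base filter
    // accepts it and it was modified after lastModTime.
    object NewFileFilterSketch {
      def newFilter(fs: FileSystem, baseFilter: PathFilter, lastModTime: Long): PathFilter =
        new PathFilter {
          def accept(path: Path): Boolean = {
            if (!baseFilter.accept(path)) {
              false
            } else {
              val modTime = fs.getFileStatus(path).getModificationTime()
              modTime > lastModTime
            }
          }
        }
    }

A filter built this way is handed to fs.listStatus(path, newFilter), as the second hunk shows.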
diff --git a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
index 78537b8794..69d3504c72 100644
--- a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
@@ -49,7 +49,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
         if (!filter.accept(path)) {
           return false
         } else {
-          val modTime = fs.getFileStatus(path).getModificationTime() 
+          val modTime = fs.getFileStatus(path).getModificationTime()
           if (modTime <= lastModTime) {
             return false
           }
@@ -60,7 +60,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
         }
       }
     }
-
+
     val newFiles = fs.listStatus(path, newFilter)
     logInfo("New files: " + newFiles.map(_.getPath).mkString(", "))
     if (newFiles.length > 0) {
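
The second file's diff below adds the tests themselves. One detail worth a note: the new "file input stream" test builds its watched directory by creating a temp file, deleting it, and re-creating the name as a directory (see the val dir block below). A hypothetical alternative using the JDK 7+ NIO API, not what this commit does, would be:

    // Hypothetical alternative to the create/delete/mkdirs idiom in the test
    // below (not part of this commit; requires Java 7+).
    import java.nio.file.Files

    val dir = Files.createTempDirectory("streaming-test").toFile
    dir.deleteOnExit()

The commit's idiom has the same effect on older JDKs, at the cost of a small race between delete() and mkdirs().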
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index a3f213ebd0..fcf5d22f5c 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -1,43 +1,48 @@
 package spark.streaming
 
 import java.net.{SocketException, Socket, ServerSocket}
-import java.io.{BufferedWriter, OutputStreamWriter}
+import java.io.{File, BufferedWriter, OutputStreamWriter}
 import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}
 import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import util.ManualClock
 import spark.storage.StorageLevel
 import spark.Logging
+import scala.util.Random
+import org.apache.commons.io.FileUtils
 
 class InputStreamsSuite extends TestSuiteBase {
 
   test("network input stream") {
+    // Start the server
     val serverPort = 9999
     val server = new TestServer(9999)
     server.start()
+
+    // Set up the streaming context and input streams
     val ssc = new StreamingContext(master, framework)
     ssc.setBatchDuration(batchDuration)
-
     val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.DISK_AND_MEMORY)
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
     val outputStream = new TestOutputStream(networkStream, outputBuffer)
     ssc.registerOutputStream(outputStream)
     ssc.start()
 
+    // Feed data to the server to send to the Spark Streaming network receiver
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    val input = Seq(1, 2, 3)
+    val input = Seq(1, 2, 3, 4, 5)
     val expectedOutput = input.map(_.toString)
     for (i <- 0 until input.size) {
       server.send(input(i).toString + "\n")
-      Thread.sleep(1000)
-      clock.addToTime(1000)
+      Thread.sleep(500)
+      clock.addToTime(batchDuration.milliseconds)
     }
 
     val startTime = System.currentTimeMillis()
     while (outputBuffer.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
       logInfo("output.size = " + outputBuffer.size + ", expectedOutput.size = " + expectedOutput.size)
       Thread.sleep(100)
     }
-    Thread.sleep(5000)
+    Thread.sleep(1000)
     val timeTaken = System.currentTimeMillis() - startTime
     assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
     logInfo("Stopping server")
@@ -45,6 +50,57 @@ class InputStreamsSuite extends TestSuiteBase {
logInfo("Stopping context")
ssc.stop()
+ // Verify whether data received by Spark Streaming was as expected
+ assert(outputBuffer.size === expectedOutput.size)
+ for (i <- 0 until outputBuffer.size) {
+ assert(outputBuffer(i).size === 1)
+ assert(outputBuffer(i).head === expectedOutput(i))
+ }
+ }
+
+ test("file input stream") {
+ // Create a temporary directory
+ val dir = {
+ var temp = File.createTempFile(".temp.", Random.nextInt().toString)
+ temp.delete()
+ temp.mkdirs()
+ temp.deleteOnExit()
+ println("Created temp dir " + temp)
+ temp
+ }
+
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(master, framework)
+ ssc.setBatchDuration(batchDuration)
+ val filestream = ssc.textFileStream(dir.toString)
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
+ val outputStream = new TestOutputStream(filestream, outputBuffer)
+ ssc.registerOutputStream(outputStream)
+ ssc.start()
+
+ // Create files in the temporary directory so that Spark Streaming can read data from it
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val input = Seq(1, 2, 3, 4, 5)
+ val expectedOutput = input.map(_.toString)
+ Thread.sleep(1000)
+ for (i <- 0 until input.size) {
+ FileUtils.writeStringToFile(new File(dir, i.toString), input(i).toString + "\n")
+ Thread.sleep(500)
+ clock.addToTime(batchDuration.milliseconds)
+ Thread.sleep(500)
+ }
+ val startTime = System.currentTimeMillis()
+ while (outputBuffer.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+ println("output.size = " + outputBuffer.size + ", expectedOutput.size = " + expectedOutput.size)
+ Thread.sleep(100)
+ }
+ Thread.sleep(1000)
+ val timeTaken = System.currentTimeMillis() - startTime
+ assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
+ println("Stopping context")
+ ssc.stop()
+
+ // Verify whether data received by Spark Streaming was as expected
assert(outputBuffer.size === expectedOutput.size)
for (i <- 0 until outputBuffer.size) {
assert(outputBuffer(i).size === 1)
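       assert(outputBuffer(i).head === expectedOutput(i))

Both tests inline the same busy-wait between starting the context and checking the results. A small helper could factor that pattern out; this is a hypothetical refactoring sketch, not part of the commit, assuming only the maxWaitTimeMillis timeout already used above:

    // Hypothetical helper (not in this commit): poll every 100 ms until
    // `condition` holds or `timeoutMillis` elapses; return the elapsed time.
    def waitFor(timeoutMillis: Long)(condition: => Boolean): Long = {
      val startTime = System.currentTimeMillis()
      while (!condition && System.currentTimeMillis() - startTime < timeoutMillis) {
        Thread.sleep(100)
      }
      System.currentTimeMillis() - startTime
    }

With it, each test's wait loop would reduce to val timeTaken = waitFor(maxWaitTimeMillis)(outputBuffer.size >= expectedOutput.size), followed by the existing timeout assertion.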