author     Tathagata Das <tathagata.das1565@gmail.com>   2012-11-11 13:20:09 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2012-11-11 13:20:09 -0800
commit     46222dc56db4a521bd613bd3fac5b91868bb339e (patch)
tree       8f0282c0bd409c300519fbec9af294bf706df6ca /streaming
parent     04e9e9d93c512f856116bc2c99c35dfb48b4adee (diff)
Fixed a bug in FileInputDStream that could cause it to miss new files. Added tests to InputStreamsSuite that exercise checkpointing of file and network streams.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/FileInputDStream.scala  |  34
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 136
2 files changed, 148 insertions(+), 22 deletions(-)
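
The crux of the fix, for context before the diff: the FileStatus API reports modification times only at second granularity, so filtering on lastModTime alone can skip a file created within the same second as the newest file of the previous batch. The patch therefore also remembers which files were already seen at that timestamp. Below is a minimal standalone sketch of the idea; the class and method names (NewFileTracker, selectNewFiles) are illustrative and not part of Spark.

import scala.collection.mutable.HashSet

// Sketch only: keeps the latest modification time seen so far and the set of
// paths already processed at exactly that timestamp, so files that share a
// timestamp with the previous batch are neither missed nor reprocessed.
class NewFileTracker {
  private var lastModTime = 0L
  private val lastModTimeFiles = new HashSet[String]()

  /** Returns the paths in `files` (path -> modification time) that have not
    * been processed yet, and records them for the next call. */
  def selectNewFiles(files: Seq[(String, Long)]): Seq[String] = {
    val fresh = files.filter { case (path, modTime) =>
      modTime > lastModTime ||
        (modTime == lastModTime && !lastModTimeFiles.contains(path))
    }
    if (fresh.nonEmpty) {
      val latest = fresh.map(_._2).max
      if (latest != lastModTime) {
        lastModTime = latest
        lastModTimeFiles.clear()
      }
      lastModTimeFiles ++= fresh.collect { case (path, t) if t == latest => path }
    }
    fresh.map(_._1)
  }
}

// Example: a.txt and b.txt arrive in second 1000 and are both picked up; on the
// next call c.txt, written in that same second, is the only file returned.
// tracker.selectNewFiles(Seq("a.txt" -> 1000L, "b.txt" -> 1000L))                      // a.txt, b.txt
// tracker.selectNewFiles(Seq("a.txt" -> 1000L, "b.txt" -> 1000L, "c.txt" -> 1000L))    // c.txt
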
diff --git a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
index 9d7361097b..88856364d2 100644
--- a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
@@ -6,7 +6,8 @@ import spark.rdd.UnionRDD
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import java.io.{ObjectInputStream, IOException}
+
+import scala.collection.mutable.HashSet
class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
@@ -19,7 +20,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
@transient private var path_ : Path = null
@transient private var fs_ : FileSystem = null
- var lastModTime: Long = 0
+ var lastModTime = 0L
+ val lastModTimeFiles = new HashSet[String]()
def path(): Path = {
if (path_ == null) path_ = new Path(directory)
@@ -40,22 +42,37 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
}
override def stop() { }
-
+
+ /**
+ * Finds the files that were modified since the last time this method was called and makes
+ * a union RDD out of them. Note that this also maintains the set of files processed at the
+ * latest modification time seen in the previous call to this method. This is necessary
+ * because the FileStatus API reports modification times only at the granularity of
+ * seconds, so new files may share the latest modification time seen in the previous
+ * call; the saved set of files is used to filter out the ones that have already been
+ * processed.
+ */
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
+ // Create the filter for selecting new files
val newFilter = new PathFilter() {
var latestModTime = 0L
-
+ val latestModTimeFiles = new HashSet[String]()
+
def accept(path: Path): Boolean = {
if (!filter.accept(path)) {
return false
} else {
val modTime = fs.getFileStatus(path).getModificationTime()
- if (modTime <= lastModTime) {
+ if (modTime < lastModTime) {
+ return false
+ } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
return false
}
if (modTime > latestModTime) {
latestModTime = modTime
+ latestModTimeFiles.clear()
}
+ latestModTimeFiles += path.toString
return true
}
}
@@ -64,7 +81,12 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
val newFiles = fs.listStatus(path, newFilter)
logInfo("New files: " + newFiles.map(_.getPath).mkString(", "))
if (newFiles.length > 0) {
- lastModTime = newFilter.latestModTime
+ // Update the modification time and the files processed for that modification time
+ if (lastModTime != newFilter.latestModTime) {
+ lastModTime = newFilter.latestModTime
+ lastModTimeFiles.clear()
+ }
+ lastModTimeFiles ++= newFilter.latestModTimeFiles
}
val newRDD = new UnionRDD(ssc.sc, newFiles.map(
file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString)))
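
The second half of the commit adds tests that run the file and network input streams through a checkpoint-and-restart cycle. Condensed, the pattern they follow is sketched below; this is not a runnable test by itself (master, framework, batchDuration, checkpointDir and checkpointInterval come from TestSuiteBase), only the shape of what the diff that follows does.

// Condensed sketch of the checkpoint/restart pattern used in the new tests.
var ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
ssc.checkpoint(checkpointDir, checkpointInterval)   // enable periodic checkpointing
// ... register an input stream and a TestOutputStream, start, feed a few batches ...
ssc.stop()

// Rebuild the context purely from the checkpoint directory; state held by the
// input DStreams (e.g. FileInputDStream's lastModTime and lastModTimeFiles)
// must survive this round trip for the restarted stream to pick up new data.
ssc = new StreamingContext(checkpointDir)
ssc.start()
// ... feed more batches and assert that output is still produced ...
ssc.stop()
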
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 8f892baab1..0957748603 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -9,12 +9,19 @@ import spark.storage.StorageLevel
import spark.Logging
import scala.util.Random
import org.apache.commons.io.FileUtils
+import org.scalatest.BeforeAndAfter
-class InputStreamsSuite extends TestSuiteBase {
+class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+ override def checkpointDir = "checkpoint"
+
+ after {
+ FileUtils.deleteDirectory(new File(checkpointDir))
+ }
+
test("network input stream") {
// Start the server
val serverPort = 9999
@@ -30,7 +37,7 @@ class InputStreamsSuite extends TestSuiteBase {
ssc.registerOutputStream(outputStream)
ssc.start()
- // Feed data to the server to send to the Spark Streaming network receiver
+ // Feed data to the server to send to the network receiver
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
val expectedOutput = input.map(_.toString)
@@ -52,7 +59,7 @@ class InputStreamsSuite extends TestSuiteBase {
logInfo("Stopping context")
ssc.stop()
- // Verify whether data received by Spark Streaming was as expected
+ // Verify whether data received was as expected
logInfo("--------------------------------")
logInfo("output.size = " + outputBuffer.size)
logInfo("output")
@@ -69,6 +76,49 @@ class InputStreamsSuite extends TestSuiteBase {
}
}
+ test("network input stream with checkpoint") {
+ // Start the server
+ val serverPort = 9999
+ val server = new TestServer(serverPort)
+ server.start()
+
+ // Set up the streaming context and input streams
+ var ssc = new StreamingContext(master, framework)
+ ssc.setBatchDuration(batchDuration)
+ ssc.checkpoint(checkpointDir, checkpointInterval)
+ val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.MEMORY_AND_DISK)
+ var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]])
+ ssc.registerOutputStream(outputStream)
+ ssc.start()
+
+ // Feed data to the server to send to the network receiver
+ var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ for (i <- Seq(1, 2, 3)) {
+ server.send(i.toString + "\n")
+ Thread.sleep(100)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ Thread.sleep(500)
+ assert(outputStream.output.size > 0)
+ ssc.stop()
+
+ // Restart stream computation from checkpoint and feed more data to see whether
+ // they are being received and processed
+ logInfo("*********** RESTARTING ************")
+ ssc = new StreamingContext(checkpointDir)
+ ssc.start()
+ clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ for (i <- Seq(4, 5, 6)) {
+ server.send(i.toString + "\n")
+ Thread.sleep(100)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ Thread.sleep(500)
+ outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]]
+ assert(outputStream.output.size > 0)
+ ssc.stop()
+ }
+
test("file input stream") {
// Create a temporary directory
val dir = {
@@ -76,7 +126,7 @@ class InputStreamsSuite extends TestSuiteBase {
temp.delete()
temp.mkdirs()
temp.deleteOnExit()
- println("Created temp dir " + temp)
+ logInfo("Created temp dir " + temp)
temp
}
@@ -84,7 +134,9 @@ class InputStreamsSuite extends TestSuiteBase {
val ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
val filestream = ssc.textFileStream(dir.toString)
- val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+ def output = outputBuffer.flatMap(x => x)
+
val outputStream = new TestOutputStream(filestream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
@@ -96,36 +148,88 @@ class InputStreamsSuite extends TestSuiteBase {
Thread.sleep(1000)
for (i <- 0 until input.size) {
FileUtils.writeStringToFile(new File(dir, i.toString), input(i).toString + "\n")
- Thread.sleep(500)
+ Thread.sleep(100)
clock.addToTime(batchDuration.milliseconds)
- Thread.sleep(500)
+ Thread.sleep(100)
}
val startTime = System.currentTimeMillis()
- while (outputBuffer.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
- println("output.size = " + outputBuffer.size + ", expectedOutput.size = " + expectedOutput.size)
+ while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+ //println("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size)
Thread.sleep(100)
}
Thread.sleep(1000)
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
- println("Stopping context")
+ logInfo("Stopping context")
ssc.stop()
// Verify whether data received by Spark Streaming was as expected
logInfo("--------------------------------")
- logInfo("output.size = " + outputBuffer.size)
+ logInfo("output.size = " + output.size)
logInfo("output")
- outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("expected output.size = " + expectedOutput.size)
logInfo("expected output")
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")
- assert(outputBuffer.size === expectedOutput.size)
- for (i <- 0 until outputBuffer.size) {
- assert(outputBuffer(i).size === 1)
- assert(outputBuffer(i).head === expectedOutput(i))
+ assert(output.size === expectedOutput.size)
+ for (i <- 0 until output.size) {
+ assert(output(i).size === 1)
+ assert(output(i).head.toString === expectedOutput(i))
+ }
+ }
+
+ test("file input stream with checkpoint") {
+ // Create a temporary directory
+ val dir = {
+ var temp = File.createTempFile(".temp.", Random.nextInt().toString)
+ temp.delete()
+ temp.mkdirs()
+ temp.deleteOnExit()
+ println("Created temp dir " + temp)
+ temp
}
+
+ // Set up the streaming context and input streams
+ var ssc = new StreamingContext(master, framework)
+ ssc.setBatchDuration(batchDuration)
+ ssc.checkpoint(checkpointDir, checkpointInterval)
+ val filestream = ssc.textFileStream(dir.toString)
+ var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]])
+ ssc.registerOutputStream(outputStream)
+ ssc.start()
+
+ // Create files and advance manual clock to process them
+ var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ Thread.sleep(1000)
+ for (i <- Seq(1, 2, 3)) {
+ FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n")
+ Thread.sleep(100)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ Thread.sleep(500)
+ logInfo("Output = " + outputStream.output.mkString(","))
+ assert(outputStream.output.size > 0)
+ ssc.stop()
+
+ // Restart stream computation from checkpoint and create more files to see whether
+ // they are being processed
+ logInfo("*********** RESTARTING ************")
+ ssc = new StreamingContext(checkpointDir)
+ ssc.start()
+ clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ Thread.sleep(500)
+ for (i <- Seq(4, 5, 6)) {
+ FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n")
+ Thread.sleep(100)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ Thread.sleep(500)
+ outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]]
+ logInfo("Output = " + outputStream.output.mkString(","))
+ assert(outputStream.output.size > 0)
+ ssc.stop()
}
}