diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-23 16:27:00 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-23 16:27:00 -0800 |
commit | 8ca14a1e5157a449d1fa7dc0657079ed82c3c4be (patch) | |
tree | c032a78104eb52b3f2509deb5960f2a54c3552e4 | |
parent | b31e91f927356c50d24286ba70f00fa8f6527e2f (diff) | |
download | spark-8ca14a1e5157a449d1fa7dc0657079ed82c3c4be.tar.gz spark-8ca14a1e5157a449d1fa7dc0657079ed82c3c4be.tar.bz2 spark-8ca14a1e5157a449d1fa7dc0657079ed82c3c4be.zip |
Updated testsuites to work with the slack time of file stream.
3 files changed, 22 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index fb52bcfb67..b526a43662 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -201,7 +201,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas // Creating an RDD from a HDFS file immediately after the file is created sometime returns // an RDD with 0 partitions. To avoid that, we introduce a slack time - files that are older // than slack time from current time is considered for processing. - val slackTime = System.getProperty("spark.streaming.filestream.slackTime", "2000").toLong + val slackTime = System.getProperty("spark.streaming.fileStream.slackTime", "2000").toLong val maxModTime = currentTime - slackTime def accept(path: Path): Boolean = { @@ -237,4 +237,12 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas private[streaming] object FileInputDStream { def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") + + private[streaming] def disableSlackTime() { + System.setProperty("spark.streaming.fileStream.slackTime", "0") + } + + private[streaming] def restoreSlackTime() { + System.clearProperty("spark.streaming.fileStream.slackTime") + } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 4e25c9566c..0347cc1032 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -200,6 +200,9 @@ class CheckpointSuite extends TestSuiteBase { val clockProperty = System.getProperty("spark.streaming.clock") System.clearProperty("spark.streaming.clock") + // Disable slack time of file stream when testing with local file system + FileInputDStream.disableSlackTime() + // Set up the streaming context and input streams val testDir = Files.createTempDir() var ssc = new StreamingContext(master, framework, Seconds(1)) @@ -300,6 +303,9 @@ class CheckpointSuite extends TestSuiteBase { // Enable manual clock back again for other tests if (clockProperty != null) System.setProperty("spark.streaming.clock", clockProperty) + + // Restore the default slack time + FileInputDStream.restoreSlackTime() } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 62a9f120b4..e506c954ac 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -23,7 +23,7 @@ import akka.actor.IOManager import akka.actor.Props import akka.util.ByteString -import org.apache.spark.streaming.dstream.{NetworkReceiver, SparkFlumeEvent} +import org.apache.spark.streaming.dstream.{FileInputDStream, NetworkReceiver, SparkFlumeEvent} import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket} import java.io.{File, BufferedWriter, OutputStreamWriter} import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue} @@ -152,6 +152,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Disable manual clock as FileInputDStream does not work with manual clock System.clearProperty("spark.streaming.clock") + // Disable slack time of file stream when testing with local file system + FileInputDStream.disableSlackTime() + // Set up the streaming context and input streams val testDir = Files.createTempDir() val ssc = new StreamingContext(master, framework, batchDuration) @@ -196,6 +199,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Enable manual clock back again for other tests System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + + // Restore the default slack time + FileInputDStream.restoreSlackTime() } |