aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-23 16:27:00 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-23 16:27:00 -0800
commit8ca14a1e5157a449d1fa7dc0657079ed82c3c4be (patch)
treec032a78104eb52b3f2509deb5960f2a54c3552e4 /streaming/src/test
parentb31e91f927356c50d24286ba70f00fa8f6527e2f (diff)
downloadspark-8ca14a1e5157a449d1fa7dc0657079ed82c3c4be.tar.gz
spark-8ca14a1e5157a449d1fa7dc0657079ed82c3c4be.tar.bz2
spark-8ca14a1e5157a449d1fa7dc0657079ed82c3c4be.zip
Updated testsuites to work with the slack time of file stream.
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala6
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala8
2 files changed, 13 insertions, 1 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 4e25c9566c..0347cc1032 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -200,6 +200,9 @@ class CheckpointSuite extends TestSuiteBase {
val clockProperty = System.getProperty("spark.streaming.clock")
System.clearProperty("spark.streaming.clock")
+ // Disable slack time of file stream when testing with local file system
+ FileInputDStream.disableSlackTime()
+
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
var ssc = new StreamingContext(master, framework, Seconds(1))
@@ -300,6 +303,9 @@ class CheckpointSuite extends TestSuiteBase {
// Enable manual clock back again for other tests
if (clockProperty != null)
System.setProperty("spark.streaming.clock", clockProperty)
+
+ // Restore the default slack time
+ FileInputDStream.restoreSlackTime()
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 62a9f120b4..e506c954ac 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -23,7 +23,7 @@ import akka.actor.IOManager
import akka.actor.Props
import akka.util.ByteString
-import org.apache.spark.streaming.dstream.{NetworkReceiver, SparkFlumeEvent}
+import org.apache.spark.streaming.dstream.{FileInputDStream, NetworkReceiver, SparkFlumeEvent}
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
@@ -152,6 +152,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Disable manual clock as FileInputDStream does not work with manual clock
System.clearProperty("spark.streaming.clock")
+ // Disable slack time of file stream when testing with local file system
+ FileInputDStream.disableSlackTime()
+
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
val ssc = new StreamingContext(master, framework, batchDuration)
@@ -196,6 +199,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Enable manual clock back again for other tests
System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+
+ // Restore the default slack time
+ FileInputDStream.restoreSlackTime()
}