aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala18
1 files changed, 9 insertions, 9 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 5fa14ad7c4..5185954521 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -56,9 +56,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testServer.start()
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
- val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
ssc.registerOutputStream(outputStream)
@@ -101,7 +101,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("flume input stream") {
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
@@ -150,11 +150,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("file input stream") {
// Disable manual clock as FileInputDStream does not work with manual clock
- System.clearProperty("spark.streaming.clock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val fileStream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
@@ -195,7 +195,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
FileUtils.deleteDirectory(testDir)
// Enable manual clock back again for other tests
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
}
@@ -206,7 +206,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testServer.start()
// Set up the streaming context and input streams
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
StorageLevel.MEMORY_AND_DISK) //Had to pass the local value of port to prevent from closing over entire scope
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
@@ -250,7 +250,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
test("kafka input stream") {
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val topics = Map("my-topic" -> 1)
val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK)
@@ -273,7 +273,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
MultiThreadTestReceiver.haveAllThreadsFinished = false
// set up the network stream using the test receiver
- val ssc = new StreamingContext(master, framework, batchDuration)
+ val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
val networkStream = ssc.networkStream[Int](testReceiver)
val countStream = networkStream.count
val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]