aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala6
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala1
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala46
3 files changed, 49 insertions, 4 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 389b23d4d5..952511d411 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -239,11 +239,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
/** This is a server to test the network input stream */
-class TestServer() extends Logging {
+class TestServer(portToBind: Int = 0) extends Logging {
val queue = new ArrayBlockingQueue[String](100)
- val serverSocket = new ServerSocket(0)
+ val serverSocket = new ServerSocket(portToBind)
val servingThread = new Thread() {
override def run() {
@@ -282,7 +282,7 @@ class TestServer() extends Logging {
def start() { servingThread.start() }
- def send(msg: String) { queue.add(msg) }
+ def send(msg: String) { queue.put(msg) }
def stop() { servingThread.interrupt() }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 9cc27ef7f0..efd0d22ecb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -161,7 +161,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
}
}
-
test("stop only streaming context") {
ssc = new StreamingContext(master, appName, batchDuration)
sc = ssc.sparkContext
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
new file mode 100644
index 0000000000..35538ec188
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.io.Source
+
+import org.scalatest.FunSuite
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+
+class UISuite extends FunSuite {
+
+ test("streaming tab in spark UI") {
+ val ssc = new StreamingContext("local", "test", Seconds(1))
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString
+ assert(!html.contains("random data that should not be present"))
+ // test if streaming tab exist
+ assert(html.toLowerCase.contains("streaming"))
+ // test if other Spark tabs still exist
+ assert(html.toLowerCase.contains("stages"))
+ }
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL(
+ ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
+ assert(html.toLowerCase.contains("batch"))
+ assert(html.toLowerCase.contains("network"))
+ }
+ }
+}