author     Shixiong Zhu <shixiong@databricks.com>        2015-11-13 00:30:27 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2015-11-13 00:30:27 -0800
commit     ec80c0c2fc63360ee6b5872c24e6c67779ac63f4 (patch)
tree       fc7186f159d172f9799b5c0b5c5b8737502d1b76 /external
parent     ad960885bfee7850c18eb5338546cecf2b2e9876 (diff)
[SPARK-11706][STREAMING] Fix the bug that Streaming Python tests cannot report failures
This PR just checks the test results and returns 1 if the test fails, so that `run-tests.py` can mark it as failed.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #9669 from zsxwing/streaming-python-tests.
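The mechanism described above is plain exit-status propagation; the patched Python entry point itself is outside this 'external' diff. A minimal sketch of the pattern, with a hypothetical runner rather than the actual patched file:

    # Hypothetical test entry point (illustrative, not the patched file): the
    # only failure signal run-tests.py sees is the process exit status, so a
    # failing suite must be translated into a non-zero exit code.
    import sys
    import unittest

    if __name__ == "__main__":
        suite = unittest.defaultTestLoader.discover(".")
        result = unittest.TextTestRunner(verbosity=2).run(suite)
        sys.exit(1 if not result.wasSuccessful() else 0)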
Diffstat (limited to 'external')
-rw-r--r-- external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala          | 5
-rw-r--r-- external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala   | 9
-rw-r--r-- external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 2
-rw-r--r-- external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala        | 2
4 files changed, 10 insertions, 8 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
index 70018c86f9..fe5dcc8e4b 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.flume
import java.net.{InetSocketAddress, ServerSocket}
import java.nio.ByteBuffer
+import java.util.{List => JList}
import java.util.Collections
import scala.collection.JavaConverters._
@@ -59,10 +60,10 @@ private[flume] class FlumeTestUtils {
}
/** Send data to the flume receiver */
- def writeInput(input: Seq[String], enableCompression: Boolean): Unit = {
+ def writeInput(input: JList[String], enableCompression: Boolean): Unit = {
val testAddress = new InetSocketAddress("localhost", testPort)
- val inputEvents = input.map { item =>
+ val inputEvents = input.asScala.map { item =>
val event = new AvroFlumeEvent
event.setBody(ByteBuffer.wrap(item.getBytes(UTF_8)))
event.setHeaders(Collections.singletonMap("test", "header"))
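The signature change above is what makes `writeInput` callable from Python: Py4J can pass a `java.util.List` into the JVM but has no notion of `scala.Seq`, hence the `JList` parameter plus the `asScala` conversion inside. A sketch of the Python side, assuming a running Py4J gateway (the `gateway` setup and the `utils` handle are illustrative, not PySpark's actual test wiring):

    from py4j.java_gateway import JavaGateway
    from py4j.java_collections import ListConverter

    gateway = JavaGateway()  # connects to a JVM running a py4j GatewayServer
    # A Python list has to be converted to java.util.List before it crosses
    # the bridge; that is the type the new JList[String] parameter accepts.
    jinput = ListConverter().convert(["event-1", "event-2"],
                                     gateway._gateway_client)
    # utils.writeInput(jinput, False)  # 'utils' would be a FlumeTestUtils handle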
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
index a2ab320957..bfe7548d4f 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
@@ -18,7 +18,7 @@
package org.apache.spark.streaming.flume
import java.util.concurrent._
-import java.util.{Map => JMap, Collections}
+import java.util.{Collections, List => JList, Map => JMap}
import scala.collection.mutable.ArrayBuffer
@@ -137,7 +137,8 @@ private[flume] class PollingFlumeTestUtils {
/**
* A Python-friendly method to assert the output
*/
- def assertOutput(outputHeaders: Seq[JMap[String, String]], outputBodies: Seq[String]): Unit = {
+ def assertOutput(
+ outputHeaders: JList[JMap[String, String]], outputBodies: JList[String]): Unit = {
require(outputHeaders.size == outputBodies.size)
val eventSize = outputHeaders.size
if (eventSize != totalEventsPerChannel * channels.size) {
@@ -151,8 +152,8 @@ private[flume] class PollingFlumeTestUtils {
var found = false
var j = 0
while (j < eventSize && !found) {
- if (eventBodyToVerify == outputBodies(j) &&
- eventHeaderToVerify == outputHeaders(j)) {
+ if (eventBodyToVerify == outputBodies.get(j) &&
+ eventHeaderToVerify == outputHeaders.get(j)) {
found = true
counter += 1
}
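`assertOutput` gets the same treatment, with the extra wrinkle that its headers are maps, so element access switches from Seq's `apply` to `JList.get`. Continuing the illustrative gateway sketch, py4j's `MapConverter` turns a Python dict into a `java.util.Map`:

    from py4j.java_gateway import JavaGateway
    from py4j.java_collections import ListConverter, MapConverter

    gateway = JavaGateway()
    header = MapConverter().convert({"test": "header"}, gateway._gateway_client)
    jheaders = ListConverter().convert([header], gateway._gateway_client)
    jbodies = ListConverter().convert(["body"], gateway._gateway_client)
    # utils.assertOutput(jheaders, jbodies)  # hypothetical PollingFlumeTestUtils handle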
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index ff2fb8eed2..5fd2711f5f 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -120,7 +120,7 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
case (key, value) => (key.toString, value.toString)
}).map(_.asJava)
val bodies = flattenOutputBuffer.map(e => new String(e.event.getBody.array(), UTF_8))
- utils.assertOutput(headers, bodies)
+ utils.assertOutput(headers.asJava, bodies.asJava)
}
} finally {
ssc.stop()
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 5ffb60bd60..f315e0a7ca 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -54,7 +54,7 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
val outputBuffer = startContext(utils.getTestPort(), testCompression)
eventually(timeout(10 seconds), interval(100 milliseconds)) {
- utils.writeInput(input, testCompression)
+ utils.writeInput(input.asJava, testCompression)
}
eventually(timeout(10 seconds), interval(100 milliseconds)) {