From ec80c0c2fc63360ee6b5872c24e6c67779ac63f4 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 13 Nov 2015 00:30:27 -0800 Subject: [SPARK-11706][STREAMING] Fix the bug that Streaming Python tests cannot report failures This PR just checks the test results and returns 1 if the test fails, so that `run-tests.py` can mark it fail. Author: Shixiong Zhu Closes #9669 from zsxwing/streaming-python-tests. --- .../scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala | 5 +++-- .../org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala | 9 +++++---- .../apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 2 +- .../org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 2 +- 4 files changed, 10 insertions(+), 8 deletions(-) (limited to 'external') diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala index 70018c86f9..fe5dcc8e4b 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala @@ -19,6 +19,7 @@ package org.apache.spark.streaming.flume import java.net.{InetSocketAddress, ServerSocket} import java.nio.ByteBuffer +import java.util.{List => JList} import java.util.Collections import scala.collection.JavaConverters._ @@ -59,10 +60,10 @@ private[flume] class FlumeTestUtils { } /** Send data to the flume receiver */ - def writeInput(input: Seq[String], enableCompression: Boolean): Unit = { + def writeInput(input: JList[String], enableCompression: Boolean): Unit = { val testAddress = new InetSocketAddress("localhost", testPort) - val inputEvents = input.map { item => + val inputEvents = input.asScala.map { item => val event = new AvroFlumeEvent event.setBody(ByteBuffer.wrap(item.getBytes(UTF_8))) event.setHeaders(Collections.singletonMap("test", "header")) diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala index a2ab320957..bfe7548d4f 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.flume import java.util.concurrent._ -import java.util.{Map => JMap, Collections} +import java.util.{Collections, List => JList, Map => JMap} import scala.collection.mutable.ArrayBuffer @@ -137,7 +137,8 @@ private[flume] class PollingFlumeTestUtils { /** * A Python-friendly method to assert the output */ - def assertOutput(outputHeaders: Seq[JMap[String, String]], outputBodies: Seq[String]): Unit = { + def assertOutput( + outputHeaders: JList[JMap[String, String]], outputBodies: JList[String]): Unit = { require(outputHeaders.size == outputBodies.size) val eventSize = outputHeaders.size if (eventSize != totalEventsPerChannel * channels.size) { @@ -151,8 +152,8 @@ private[flume] class PollingFlumeTestUtils { var found = false var j = 0 while (j < eventSize && !found) { - if (eventBodyToVerify == outputBodies(j) && - eventHeaderToVerify == outputHeaders(j)) { + if (eventBodyToVerify == outputBodies.get(j) && + eventHeaderToVerify == outputHeaders.get(j)) { found = true counter += 1 } diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index ff2fb8eed2..5fd2711f5f 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -120,7 +120,7 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log case (key, value) => (key.toString, value.toString) }).map(_.asJava) val bodies = flattenOutputBuffer.map(e => new String(e.event.getBody.array(), UTF_8)) - utils.assertOutput(headers, bodies) + utils.assertOutput(headers.asJava, bodies.asJava) } } finally { ssc.stop() diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index 5ffb60bd60..f315e0a7ca 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -54,7 +54,7 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w val outputBuffer = startContext(utils.getTestPort(), testCompression) eventually(timeout(10 seconds), interval(100 milliseconds)) { - utils.writeInput(input, testCompression) + utils.writeInput(input.asJava, testCompression) } eventually(timeout(10 seconds), interval(100 milliseconds)) { -- cgit v1.2.3