path: root/external/flume/src/test
author     Tathagata Das <tathagata.das1565@gmail.com>    2014-10-13 22:46:49 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>    2014-10-13 22:46:49 -0700
commit     4d26aca770f7dd50eee1ed7855e9eda68b5a7ffa
tree       6906b2a4071fc1ded7d90994cb17423ca19f0214 /external/flume/src/test
parent     9eb49d4134e23a15142fb592d54d920e89bd8786
[SPARK-3912][Streaming] Fixed flaky FlumeStreamSuite
@harishreedharan @pwendell See JIRA for diagnosis of the problem: https://issues.apache.org/jira/browse/SPARK-3912

The solution was to reimplement it:
1. Find a free port (by binding and releasing a server socket), and then use that port.
2. Remove Thread.sleep()s; instead, repeatedly try to create a sender, send data, and check whether the data was sent. Use eventually() to minimize waiting time.
3. Check whether all the data was received, without caring about batches.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #2773 from tdas/flume-test-fix and squashes the following commits:

93cd7f6 [Tathagata Das] Reimplemented FlumeStreamSuite to be more robust.
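A standalone sketch of the free-port idiom from step 1 (illustrative only, using plain Java sockets; the patch itself wraps this in Spark's Utils.startServiceOnPort, which retries on successive ports, as the diff below shows):

    import java.net.ServerSocket

    // Bind a server socket, record the port the OS assigned, and release it.
    // Passing 0 asks the OS for any free port; the real test starts from a
    // fixed trial port (23456) and lets Utils.startServiceOnPort retry on
    // bind failures.
    def findFreePort(): Int = {
      val socket = new ServerSocket(0)
      try socket.getLocalPort
      finally socket.close()
    }

Releasing the socket before handing the port to the Flume receiver leaves a small race window, which is why the test still retries the send inside eventually() rather than assuming the first connection attempt succeeds.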
Diffstat (limited to 'external/flume/src/test')
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 166
1 file changed, 102 insertions(+), 64 deletions(-)
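Step 2's retry pattern can be sketched in isolation with ScalaTest's Eventually trait (sendBatch is a hypothetical stand-in for building the Avro client and calling appendBatch, which is what the new test retries):

    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    // Retry the whole connect-and-send block until it succeeds or the
    // timeout expires, instead of sleeping for a fixed interval.
    // `sendBatch` is a hypothetical placeholder for creating the Avro
    // client and appending the event batch, as done inside eventually()
    // in the test below.
    def sendUntilAccepted(sendBatch: () => Boolean): Unit = {
      eventually(timeout(10.seconds), interval(100.millis)) {
        assert(sendBatch(), "batch not yet accepted by the receiver")
      }
    }

Because eventually() rethrows the last failure only after the timeout expires, a transient bind or connection failure just triggers another attempt 100 ms later, which is what removes the flakiness of the fixed Thread.sleep() waits.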
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 33235d150b..13943ed544 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -17,103 +17,141 @@
package org.apache.spark.streaming.flume
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
-
-import java.net.InetSocketAddress
+import java.net.{InetSocketAddress, ServerSocket}
import java.nio.ByteBuffer
import java.nio.charset.Charset
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.flume.source.avro
import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.concurrent.Eventually._
+import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
-import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
+import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}
import org.apache.spark.util.Utils
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.jboss.netty.channel.socket.SocketChannel
-import org.jboss.netty.handler.codec.compression._
+class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
+ val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
+
+ var ssc: StreamingContext = null
+ var transceiver: NettyTransceiver = null
-class FlumeStreamSuite extends TestSuiteBase {
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ }
+ if (transceiver != null) {
+ transceiver.close()
+ }
+ }
test("flume input stream") {
- runFlumeStreamTest(false)
+ testFlumeStream(testCompression = false)
}
test("flume input compressed stream") {
- runFlumeStreamTest(true)
+ testFlumeStream(testCompression = true)
+ }
+
+ /** Run test on flume stream */
+ private def testFlumeStream(testCompression: Boolean): Unit = {
+ val input = (1 to 100).map { _.toString }
+ val testPort = findFreePort()
+ val outputBuffer = startContext(testPort, testCompression)
+ writeAndVerify(input, testPort, outputBuffer, testCompression)
+ }
+
+ /** Find a free port */
+ private def findFreePort(): Int = {
+ Utils.startServiceOnPort(23456, (trialPort: Int) => {
+ val socket = new ServerSocket(trialPort)
+ socket.close()
+ (null, trialPort)
+ })._2
}
-
- def runFlumeStreamTest(enableDecompression: Boolean) {
- // Set up the streaming context and input streams
- val ssc = new StreamingContext(conf, batchDuration)
- val (flumeStream, testPort) =
- Utils.startServiceOnPort(9997, (trialPort: Int) => {
- val dstream = FlumeUtils.createStream(
- ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
- (dstream, trialPort)
- })
+ /** Setup and start the streaming context */
+ private def startContext(
+ testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = {
+ ssc = new StreamingContext(conf, Milliseconds(200))
+ val flumeStream = FlumeUtils.createStream(
+ ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
outputStream.register()
ssc.start()
+ outputBuffer
+ }
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- val input = Seq(1, 2, 3, 4, 5)
- Thread.sleep(1000)
- val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
- var client: AvroSourceProtocol = null
-
- if (enableDecompression) {
- client = SpecificRequestor.getClient(
- classOf[AvroSourceProtocol],
- new NettyTransceiver(new InetSocketAddress("localhost", testPort),
- new CompressionChannelFactory(6)))
- } else {
- client = SpecificRequestor.getClient(
- classOf[AvroSourceProtocol], transceiver)
- }
+ /** Send data to the flume receiver and verify whether the data was received */
+ private def writeAndVerify(
+ input: Seq[String],
+ testPort: Int,
+ outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]],
+ enableCompression: Boolean
+ ) {
+ val testAddress = new InetSocketAddress("localhost", testPort)
- for (i <- 0 until input.size) {
+ val inputEvents = input.map { item =>
val event = new AvroFlumeEvent
- event.setBody(ByteBuffer.wrap(input(i).toString.getBytes("utf-8")))
+ event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8")))
event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
- client.append(event)
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
+ event
}
- Thread.sleep(1000)
-
- val startTime = System.currentTimeMillis()
- while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
- logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
- Thread.sleep(100)
+ eventually(timeout(10 seconds), interval(100 milliseconds)) {
+ // if last attempted transceiver had succeeded, close it
+ if (transceiver != null) {
+ transceiver.close()
+ transceiver = null
+ }
+
+ // Create transceiver
+ transceiver = {
+ if (enableCompression) {
+ new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
+ } else {
+ new NettyTransceiver(testAddress)
+ }
+ }
+
+ // Create Avro client with the transceiver
+ val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
+ client should not be null
+
+ // Send data
+ val status = client.appendBatch(inputEvents.toList)
+ status should be (avro.Status.OK)
}
- Thread.sleep(1000)
- val timeTaken = System.currentTimeMillis() - startTime
- assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
- logInfo("Stopping context")
- ssc.stop()
-
- val decoder = Charset.forName("UTF-8").newDecoder()
-
- assert(outputBuffer.size === input.length)
- for (i <- 0 until outputBuffer.size) {
- assert(outputBuffer(i).size === 1)
- val str = decoder.decode(outputBuffer(i).head.event.getBody)
- assert(str.toString === input(i).toString)
- assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
+
+ val decoder = Charset.forName("UTF-8").newDecoder()
+ eventually(timeout(10 seconds), interval(100 milliseconds)) {
+ val outputEvents = outputBuffer.flatten.map { _.event }
+ outputEvents.foreach {
+ event =>
+ event.getHeaders.get("test") should be("header")
+ }
+ val output = outputEvents.map(event => decoder.decode(event.getBody()).toString)
+ output should be (input)
}
}
- class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
+ /** Class to create socket channel with compression */
+ private class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
val encoder = new ZlibEncoder(compressionLevel)
pipeline.addFirst("deflater", encoder)