authorHari Shreedharan <hshreedharan@apache.org>2015-05-13 16:43:30 -0700
committerAndrew Or <andrew@databricks.com>2015-05-13 16:43:30 -0700
commit61d1e87c0d3d12dac0b724d1b84436f748227e99 (patch)
tree0511f757c1aacf1b9b3a7926ae2c041fd227733a /external
parentbb6dec3b160b54488892a509965fee70a530deff (diff)
[SPARK-7356] [STREAMING] Fix flakey tests in FlumePollingStreamSuite using SparkSink's batch CountDownLatch.
This is meant to make the FlumePollingStreamSuite deterministic. Instead of sleeping for arbitrary periods of time, we now count the number of batches that have completed and then verify the results.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #5918 from harishreedharan/flume-test-fix and squashes the following commits:

93f24f3 [Hari Shreedharan] Add an eventually block to ensure that all received data is processed. Refactor the dstream creation and remove redundant code.
1108804 [Hari Shreedharan] [SPARK-7356][STREAMING] Fix flakey tests in FlumePollingStreamSuite using SparkSink's batch CountDownLatch.
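For context, here is a minimal, self-contained sketch of the latch-based handshake this change relies on. The FakeSink and its countdownWhenBatchReceived hook are stand-ins mirroring the SparkSink API exercised in the diff below, not Spark code itself:

import java.util.concurrent.{CountDownLatch, TimeUnit}

// Stand-in for a sink that signals once per completed batch.
class FakeSink {
  @volatile private var latch: CountDownLatch = null
  def countdownWhenBatchReceived(l: CountDownLatch): Unit = { latch = l }
  def onBatchCompleted(): Unit = if (latch != null) latch.countDown()
}

object LatchDemo extends App {
  val batchCount = 5
  val sink = new FakeSink
  val latch = new CountDownLatch(batchCount)
  sink.countdownWhenBatchReceived(latch)

  // Simulated receiver: completes batchCount batches on another thread.
  new Thread(new Runnable {
    override def run(): Unit = (1 to batchCount).foreach(_ => sink.onBatchCompleted())
  }).start()

  // Deterministic wait: returns as soon as every batch is in, or fails after 15s,
  // instead of sleeping for a fixed interval and hoping the data has arrived.
  assert(latch.await(15, TimeUnit.SECONDS), "timed out waiting for batches")
}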
Diffstat (limited to 'external')
-rw-r--r-- external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 110
1 file changed, 51 insertions(+), 59 deletions(-)
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 43c1b865b6..93afe50c21 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -18,15 +18,18 @@
package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
-import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
+import java.util.concurrent._
import scala.collection.JavaConversions._
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.concurrent.duration._
+import scala.language.postfixOps
import org.apache.flume.Context
import org.apache.flume.channel.MemoryChannel
import org.apache.flume.conf.Configurables
import org.apache.flume.event.EventBuilder
+import org.scalatest.concurrent.Eventually._
import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -57,11 +60,11 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
before(beforeFunction())
- ignore("flume polling test") {
+ test("flume polling test") {
testMultipleTimes(testFlumePolling)
}
- ignore("flume polling test multiple hosts") {
+ test("flume polling test multiple hosts") {
testMultipleTimes(testFlumePollingMultipleHost)
}
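(In ScalaTest's FunSuite, ignore and test share a signature, so re-enabling a previously ignored test is exactly the one-word change shown above. A minimal sketch:)

import org.scalatest.FunSuite

class ToggleSuite extends FunSuite {
  // Reported as ignored; the body never runs.
  ignore("disabled test") {
    assert(1 + 1 === 2)
  }
  // Swapping ignore for test re-enables it with no other changes.
  test("enabled test") {
    assert(1 + 1 === 2)
  }
}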
@@ -100,18 +103,8 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
Configurables.configure(sink, context)
sink.setChannel(channel)
sink.start()
- // Set up the streaming context and input streams
- val ssc = new StreamingContext(conf, batchDuration)
- val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
- FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", sink.getPort())),
- StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
- val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
- with SynchronizedBuffer[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputBuffer)
- outputStream.register()
- ssc.start()
- writeAndVerify(Seq(channel), ssc, outputBuffer)
+ writeAndVerify(Seq(sink), Seq(channel))
assertChannelIsEmpty(channel)
sink.stop()
channel.stop()
@@ -142,10 +135,22 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
Configurables.configure(sink2, context)
sink2.setChannel(channel2)
sink2.start()
+ try {
+ writeAndVerify(Seq(sink, sink2), Seq(channel, channel2))
+ assertChannelIsEmpty(channel)
+ assertChannelIsEmpty(channel2)
+ } finally {
+ sink.stop()
+ sink2.stop()
+ channel.stop()
+ channel2.stop()
+ }
+ }
+ def writeAndVerify(sinks: Seq[SparkSink], channels: Seq[MemoryChannel]) {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
- val addresses = Seq(sink.getPort(), sink2.getPort()).map(new InetSocketAddress("localhost", _))
+ val addresses = sinks.map(sink => new InetSocketAddress("localhost", sink.getPort()))
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
eventsPerBatch, 5)
@@ -155,61 +160,49 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
outputStream.register()
ssc.start()
- try {
- writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
- assertChannelIsEmpty(channel)
- assertChannelIsEmpty(channel2)
- } finally {
- sink.stop()
- sink2.stop()
- channel.stop()
- channel2.stop()
- }
- }
-
- def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
- outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]]) {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val executor = Executors.newCachedThreadPool()
val executorCompletion = new ExecutorCompletionService[Void](executor)
- channels.map(channel => {
+
+ val latch = new CountDownLatch(batchCount * channels.size)
+ sinks.foreach(_.countdownWhenBatchReceived(latch))
+
+ channels.foreach(channel => {
executorCompletion.submit(new TxnSubmitter(channel, clock))
})
+
for (i <- 0 until channels.size) {
executorCompletion.take()
}
- val startTime = System.currentTimeMillis()
- while (outputBuffer.size < batchCount * channels.size &&
- System.currentTimeMillis() - startTime < 15000) {
- logInfo("output.size = " + outputBuffer.size)
- Thread.sleep(100)
- }
- val timeTaken = System.currentTimeMillis() - startTime
- assert(timeTaken < 15000, "Operation timed out after " + timeTaken + " ms")
- logInfo("Stopping context")
- ssc.stop()
- val flattenedBuffer = outputBuffer.flatten
- assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
- var counter = 0
- for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
- val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
- String.valueOf(i)).getBytes("utf-8"),
- Map[String, String]("test-" + i.toString -> "header"))
- var found = false
- var j = 0
- while (j < flattenedBuffer.size && !found) {
- val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8")
- if (new String(eventToVerify.getBody, "utf-8") == strToCompare &&
- eventToVerify.getHeaders.get("test-" + i.toString)
- .equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) {
- found = true
- counter += 1
+ latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
+ clock.advance(batchDuration.milliseconds)
+
+ // The eventually is required to ensure that all data in the batch has been processed.
+ eventually(timeout(10 seconds), interval(100 milliseconds)) {
+ val flattenedBuffer = outputBuffer.flatten
+ assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
+ var counter = 0
+ for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
+ val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
+ String.valueOf(i)).getBytes("utf-8"),
+ Map[String, String]("test-" + i.toString -> "header"))
+ var found = false
+ var j = 0
+ while (j < flattenedBuffer.size && !found) {
+ val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8")
+ if (new String(eventToVerify.getBody, "utf-8") == strToCompare &&
+ eventToVerify.getHeaders.get("test-" + i.toString)
+ .equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) {
+ found = true
+ counter += 1
+ }
+ j += 1
}
- j += 1
}
+ assert(counter === totalEventsPerChannel * channels.size)
}
- assert(counter === totalEventsPerChannel * channels.size)
+ ssc.stop()
}
def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
@@ -234,7 +227,6 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
tx.commit()
tx.close()
Thread.sleep(500) // Allow some time for the events to reach
- clock.advance(batchDuration.milliseconds)
}
null
}
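Two of the synchronization patterns above, sketched in isolation. First, ScalaTest's eventually (from org.scalatest.concurrent.Eventually) retries an assertion block until it passes or the timeout elapses; the buffer and writer thread here are stand-ins for the suite's output buffer and receiver:

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.{Millis, Seconds, Span}
import scala.collection.mutable.ArrayBuffer

object EventuallyDemo extends App {
  val results = new ArrayBuffer[Int]()
  new Thread(new Runnable {
    override def run(): Unit = { Thread.sleep(200); results.synchronized { results += 1 } }
  }).start()

  // Retries the block every 100 ms until it passes, failing after 10 seconds.
  eventually(timeout(Span(10, Seconds)), interval(Span(100, Millis))) {
    results.synchronized { assert(results.nonEmpty) }
  }
}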
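Second, the ExecutorCompletionService fan-out/join that writeAndVerify uses to run one transaction writer per channel and block until all of them finish; the channel names here are placeholders for the suite's MemoryChannels:

import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}

object CompletionDemo extends App {
  val executor = Executors.newCachedThreadPool()
  val completion = new ExecutorCompletionService[Void](executor)
  val channels = Seq("channel-1", "channel-2") // placeholders for MemoryChannels

  // One writer task per channel, mirroring the suite's TxnSubmitter.
  channels.foreach { name =>
    completion.submit(new Callable[Void] {
      override def call(): Void = { println("writing events to " + name); null }
    })
  }

  // take() blocks until some task finishes; get() rethrows any writer failure.
  (1 to channels.size).foreach(_ => completion.take().get())
  executor.shutdown()
}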