From b235e013638685758885842dc3268e9800af3678 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Thu, 25 Sep 2014 22:56:43 -0700
Subject: [SPARK-3686][STREAMING] Wait for sink to commit the channel before
 checking for the channel size.

Author: Hari Shreedharan

Closes #2531 from harishreedharan/sparksinksuite-fix and squashes the following
commits:

30393c1 [Hari Shreedharan] Use more deterministic method to figure out when batches come in.
6ce9d8b [Hari Shreedharan] [SPARK-3686][STREAMING] Wait for sink to commit the channel before checking for the channel size.
---
 .../flume/sink/SparkAvroCallbackHandler.scala      | 14 +++++++++++++-
 .../spark/streaming/flume/sink/SparkSink.scala     | 10 ++++++++++
 .../flume/sink/TransactionProcessor.scala          | 12 ++++++++++++
 .../streaming/flume/sink/SparkSinkSuite.scala      | 22 +++++++++++++---------
 4 files changed, 48 insertions(+), 10 deletions(-)
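Background for reviewers: the suite previously slept for a fixed interval (Thread.sleep(1000) / TimeUnit.SECONDS.sleep(1)) and hoped the sink had finished committing before it asserted on the channel size. The change instead threads a CountDownLatch through the sink: each TransactionProcessor counts the latch down in its finally block, after the transaction is committed and closed, so a test can block on latch.await(...) and resume as soon as the commits have actually happened. A minimal, self-contained sketch of that pattern (plain Scala with invented names, not the actual sink classes):

    import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
    import java.util.concurrent.ConcurrentLinkedQueue

    // Workers "commit" on other threads; the latch is counted down only
    // after each commit is visible, so await() gives a deterministic wait.
    object DeterministicWaitSketch {
      def main(args: Array[String]): Unit = {
        val expectedBatches = 5
        val latch = new CountDownLatch(expectedBatches)
        val committed = new ConcurrentLinkedQueue[Int]()
        val pool = Executors.newFixedThreadPool(expectedBatches)

        (1 to expectedBatches).foreach { batch =>
          pool.execute(new Runnable {
            override def run(): Unit = {
              committed.add(batch) // the side effect the test wants to observe
              latch.countDown()    // signal only after the side effect is done
            }
          })
        }

        // Returns true as soon as all batches are committed, or false on
        // timeout -- no guessing at how long the workers might need.
        val ok = latch.await(10, TimeUnit.SECONDS)
        assert(ok && committed.size == expectedBatches)
        pool.shutdown()
      }
    }

The ordering guarantee the real change relies on is the same: countDown() runs only after the transaction has been committed and closed, so once await() returns, the channel's state is settled and the size assertions are race-free.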
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
index e77cf7bfa5..3c656a381b 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.streaming.flume.sink
 
-import java.util.concurrent.{ConcurrentHashMap, Executors}
+import java.util.concurrent.{CountDownLatch, ConcurrentHashMap, Executors}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConversions._
@@ -58,8 +58,12 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   private val seqBase = RandomStringUtils.randomAlphanumeric(8)
   private val seqCounter = new AtomicLong(0)
 
   @volatile private var stopped = false
 
+  @volatile private var isTest = false
+
+  private var testLatch: CountDownLatch = null
+
   /**
    * Returns a bunch of events to Spark over Avro RPC.
    * @param n Maximum number of events to return in a batch
@@ -90,6 +94,9 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
       val processor = new TransactionProcessor(
         channel, seq, n, transactionTimeout, backOffInterval, this)
       sequenceNumberToProcessor.put(seq, processor)
+      if (isTest) {
+        processor.countDownWhenBatchAcked(testLatch)
+      }
       Some(processor)
     } else {
       None
@@ -141,6 +148,11 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
     }
   }
 
+  private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
+    testLatch = latch
+    isTest = true
+  }
+
   /**
    * Shuts down the executor used to process transactions.
    */
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
index 98ae7d783a..14dffb15fe 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -138,6 +138,16 @@ class SparkSink extends AbstractSink with Logging with Configurable {
       throw new RuntimeException("Server was not started!")
     )
   }
+
+  /**
+   * Pass in a [[CountDownLatch]] for testing purposes. This latch is counted down each time a
+   * batch is received. The test can simply await on this latch until the expected number of
+   * batches have been received.
+   * @param latch
+   */
+  private[flume] def countdownWhenBatchReceived(latch: CountDownLatch) {
+    handler.foreach(_.countDownWhenBatchAcked(latch))
+  }
 }
 
 /**
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
index 13f3aa94be..ea45b14294 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
@@ -62,6 +62,10 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
 
   @volatile private var stopped = false
 
+  @volatile private var isTest = false
+
+  private var testLatch: CountDownLatch = null
+
   // The transaction that this processor would handle
   var txOpt: Option[Transaction] = None
 
@@ -182,6 +186,9 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
           rollbackAndClose(tx, close = false) // tx will be closed later anyway
         } finally {
           tx.close()
+          if (isTest) {
+            testLatch.countDown()
+          }
         }
       } else {
         logWarning("Spark could not commit transaction, NACK received. Rolling back transaction.")
@@ -237,4 +244,9 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
     processAckOrNack()
     null
   }
+
+  private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
+    testLatch = latch
+    isTest = true
+  }
 }
diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
index 75a6668c62..a2b2cc6149 100644
--- a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
+++ b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
@@ -38,7 +38,7 @@ class SparkSinkSuite extends FunSuite {
   val channelCapacity = 5000
 
   test("Success with ack") {
-    val (channel, sink) = initializeChannelAndSink()
+    val (channel, sink, latch) = initializeChannelAndSink()
     channel.start()
     sink.start()
 
@@ -51,6 +51,7 @@ class SparkSinkSuite extends FunSuite {
     val events = client.getEventBatch(1000)
     client.ack(events.getSequenceNumber)
     assert(events.getEvents.size() === 1000)
+    latch.await(1, TimeUnit.SECONDS)
     assertChannelIsEmpty(channel)
     sink.stop()
     channel.stop()
@@ -58,7 +59,7 @@ class SparkSinkSuite extends FunSuite {
   }
 
   test("Failure with nack") {
-    val (channel, sink) = initializeChannelAndSink()
+    val (channel, sink, latch) = initializeChannelAndSink()
     channel.start()
     sink.start()
     putEvents(channel, eventsPerBatch)
@@ -70,6 +71,7 @@ class SparkSinkSuite extends FunSuite {
     val events = client.getEventBatch(1000)
     assert(events.getEvents.size() === 1000)
     client.nack(events.getSequenceNumber)
+    latch.await(1, TimeUnit.SECONDS)
     assert(availableChannelSlots(channel) === 4000)
     sink.stop()
     channel.stop()
@@ -77,7 +79,7 @@ class SparkSinkSuite extends FunSuite {
   }
 
   test("Failure with timeout") {
-    val (channel, sink) = initializeChannelAndSink(Map(SparkSinkConfig
+    val (channel, sink, latch) = initializeChannelAndSink(Map(SparkSinkConfig
       .CONF_TRANSACTION_TIMEOUT -> 1.toString))
     channel.start()
     sink.start()
@@ -88,7 +90,7 @@ class SparkSinkSuite extends FunSuite {
     val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
     val events = client.getEventBatch(1000)
     assert(events.getEvents.size() === 1000)
-    Thread.sleep(1000)
+    latch.await(1, TimeUnit.SECONDS)
     assert(availableChannelSlots(channel) === 4000)
     sink.stop()
     channel.stop()
@@ -106,7 +108,7 @@ class SparkSinkSuite extends FunSuite {
   def testMultipleConsumers(failSome: Boolean): Unit = {
     implicit val executorContext = ExecutionContext
       .fromExecutorService(Executors.newFixedThreadPool(5))
-    val (channel, sink) = initializeChannelAndSink()
+    val (channel, sink, latch) = initializeChannelAndSink(Map.empty, 5)
     channel.start()
     sink.start()
     (1 to 5).foreach(_ => putEvents(channel, eventsPerBatch))
@@ -136,7 +138,7 @@ class SparkSinkSuite extends FunSuite {
       }
     })
     batchCounter.await()
-    TimeUnit.SECONDS.sleep(1) // Allow the sink to commit the transactions.
+    latch.await(1, TimeUnit.SECONDS)
     executorContext.shutdown()
     if(failSome) {
       assert(availableChannelSlots(channel) === 3000)
@@ -148,8 +150,8 @@ class SparkSinkSuite extends FunSuite {
     transceiversAndClients.foreach(x => x._1.close())
   }
 
-  private def initializeChannelAndSink(overrides: Map[String, String] = Map.empty): (MemoryChannel,
-    SparkSink) = {
+  private def initializeChannelAndSink(overrides: Map[String, String] = Map.empty,
+    batchCounter: Int = 1): (MemoryChannel, SparkSink, CountDownLatch) = {
     val channel = new MemoryChannel()
     val channelContext = new Context()
 
@@ -165,7 +167,9 @@ class SparkSinkSuite extends FunSuite {
     sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
     sink.configure(sinkContext)
     sink.setChannel(channel)
-    (channel, sink)
+    val latch = new CountDownLatch(batchCounter)
+    sink.countdownWhenBatchReceived(latch)
+    (channel, sink, latch)
   }
 
   private def putEvents(ch: MemoryChannel, count: Int): Unit = {
-- 
cgit v1.2.3
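One behavioral note on the new waits: CountDownLatch.await(timeout, unit) returns a Boolean and does not throw on timeout, so a latch that is never counted down (for instance, if handler is still empty when countdownWhenBatchReceived is called) quietly turns each await into a bounded one-second sleep. A tiny stand-alone illustration of that contract (plain Scala, JDK-only):

    import java.util.concurrent.{CountDownLatch, TimeUnit}

    object AwaitReturnsBoolean {
      def main(args: Array[String]): Unit = {
        val never = new CountDownLatch(1)
        // No countDown() ever happens: await blocks for the full timeout,
        // then reports failure by returning false instead of throwing.
        assert(!never.await(1, TimeUnit.SECONDS))

        val once = new CountDownLatch(1)
        once.countDown()
        // Count already at zero: await returns true immediately.
        assert(once.await(1, TimeUnit.SECONDS))
      }
    }

Checking the returned value, as this snippet does, is what would make a missed commit fail loudly instead of silently reproducing the old fixed-sleep timing.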