aboutsummaryrefslogtreecommitdiff
path: root/external/flume-sink
diff options
context:
space:
mode:
authorHari Shreedharan <hshreedharan@apache.org>2014-09-25 22:56:43 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-09-25 22:56:43 -0700
commitb235e013638685758885842dc3268e9800af3678 (patch)
tree0810d346ef6d6e270cd0606ba6f845dbca072c63 /external/flume-sink
parent86bce764983f2b14e1bd87fc3f4f938f7a217e1b (diff)
downloadspark-b235e013638685758885842dc3268e9800af3678.tar.gz
spark-b235e013638685758885842dc3268e9800af3678.tar.bz2
spark-b235e013638685758885842dc3268e9800af3678.zip
[SPARK-3686][STREAMING] Wait for sink to commit the channel before check...
...ing for the channel size. Author: Hari Shreedharan <hshreedharan@apache.org> Closes #2531 from harishreedharan/sparksinksuite-fix and squashes the following commits: 30393c1 [Hari Shreedharan] Use more deterministic method to figure out when batches come in. 6ce9d8b [Hari Shreedharan] [SPARK-3686][STREAMING] Wait for sink to commit the channel before checking for the channel size.
Diffstat (limited to 'external/flume-sink')
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala14
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala10
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala12
-rw-r--r--external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala22
4 files changed, 48 insertions, 10 deletions
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
index e77cf7bfa5..3c656a381b 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.streaming.flume.sink
-import java.util.concurrent.{ConcurrentHashMap, Executors}
+import java.util.concurrent.{CountDownLatch, ConcurrentHashMap, Executors}
import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConversions._
@@ -58,8 +58,12 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
private val seqBase = RandomStringUtils.randomAlphanumeric(8)
private val seqCounter = new AtomicLong(0)
+
@volatile private var stopped = false
+ @volatile private var isTest = false
+ private var testLatch: CountDownLatch = null
+
/**
* Returns a bunch of events to Spark over Avro RPC.
* @param n Maximum number of events to return in a batch
@@ -90,6 +94,9 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
val processor = new TransactionProcessor(
channel, seq, n, transactionTimeout, backOffInterval, this)
sequenceNumberToProcessor.put(seq, processor)
+ if (isTest) {
+ processor.countDownWhenBatchAcked(testLatch)
+ }
Some(processor)
} else {
None
@@ -141,6 +148,11 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
}
}
+ private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
+ testLatch = latch
+ isTest = true
+ }
+
/**
* Shuts down the executor used to process transactions.
*/
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
index 98ae7d783a..14dffb15fe 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -138,6 +138,16 @@ class SparkSink extends AbstractSink with Logging with Configurable {
throw new RuntimeException("Server was not started!")
)
}
+
+ /**
+ * Pass in a [[CountDownLatch]] for testing purposes. This batch is counted down when each
+ * batch is received. The test can simply call await on this latch till the expected number of
+ * batches are received.
+ * @param latch
+ */
+ private[flume] def countdownWhenBatchReceived(latch: CountDownLatch) {
+ handler.foreach(_.countDownWhenBatchAcked(latch))
+ }
}
/**
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
index 13f3aa94be..ea45b14294 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
@@ -62,6 +62,10 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
@volatile private var stopped = false
+ @volatile private var isTest = false
+
+ private var testLatch: CountDownLatch = null
+
// The transaction that this processor would handle
var txOpt: Option[Transaction] = None
@@ -182,6 +186,9 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
rollbackAndClose(tx, close = false) // tx will be closed later anyway
} finally {
tx.close()
+ if (isTest) {
+ testLatch.countDown()
+ }
}
} else {
logWarning("Spark could not commit transaction, NACK received. Rolling back transaction.")
@@ -237,4 +244,9 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
processAckOrNack()
null
}
+
+ private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
+ testLatch = latch
+ isTest = true
+ }
}
diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
index 75a6668c62..a2b2cc6149 100644
--- a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
+++ b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
@@ -38,7 +38,7 @@ class SparkSinkSuite extends FunSuite {
val channelCapacity = 5000
test("Success with ack") {
- val (channel, sink) = initializeChannelAndSink()
+ val (channel, sink, latch) = initializeChannelAndSink()
channel.start()
sink.start()
@@ -51,6 +51,7 @@ class SparkSinkSuite extends FunSuite {
val events = client.getEventBatch(1000)
client.ack(events.getSequenceNumber)
assert(events.getEvents.size() === 1000)
+ latch.await(1, TimeUnit.SECONDS)
assertChannelIsEmpty(channel)
sink.stop()
channel.stop()
@@ -58,7 +59,7 @@ class SparkSinkSuite extends FunSuite {
}
test("Failure with nack") {
- val (channel, sink) = initializeChannelAndSink()
+ val (channel, sink, latch) = initializeChannelAndSink()
channel.start()
sink.start()
putEvents(channel, eventsPerBatch)
@@ -70,6 +71,7 @@ class SparkSinkSuite extends FunSuite {
val events = client.getEventBatch(1000)
assert(events.getEvents.size() === 1000)
client.nack(events.getSequenceNumber)
+ latch.await(1, TimeUnit.SECONDS)
assert(availableChannelSlots(channel) === 4000)
sink.stop()
channel.stop()
@@ -77,7 +79,7 @@ class SparkSinkSuite extends FunSuite {
}
test("Failure with timeout") {
- val (channel, sink) = initializeChannelAndSink(Map(SparkSinkConfig
+ val (channel, sink, latch) = initializeChannelAndSink(Map(SparkSinkConfig
.CONF_TRANSACTION_TIMEOUT -> 1.toString))
channel.start()
sink.start()
@@ -88,7 +90,7 @@ class SparkSinkSuite extends FunSuite {
val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
val events = client.getEventBatch(1000)
assert(events.getEvents.size() === 1000)
- Thread.sleep(1000)
+ latch.await(1, TimeUnit.SECONDS)
assert(availableChannelSlots(channel) === 4000)
sink.stop()
channel.stop()
@@ -106,7 +108,7 @@ class SparkSinkSuite extends FunSuite {
def testMultipleConsumers(failSome: Boolean): Unit = {
implicit val executorContext = ExecutionContext
.fromExecutorService(Executors.newFixedThreadPool(5))
- val (channel, sink) = initializeChannelAndSink()
+ val (channel, sink, latch) = initializeChannelAndSink(Map.empty, 5)
channel.start()
sink.start()
(1 to 5).foreach(_ => putEvents(channel, eventsPerBatch))
@@ -136,7 +138,7 @@ class SparkSinkSuite extends FunSuite {
}
})
batchCounter.await()
- TimeUnit.SECONDS.sleep(1) // Allow the sink to commit the transactions.
+ latch.await(1, TimeUnit.SECONDS)
executorContext.shutdown()
if(failSome) {
assert(availableChannelSlots(channel) === 3000)
@@ -148,8 +150,8 @@ class SparkSinkSuite extends FunSuite {
transceiversAndClients.foreach(x => x._1.close())
}
- private def initializeChannelAndSink(overrides: Map[String, String] = Map.empty): (MemoryChannel,
- SparkSink) = {
+ private def initializeChannelAndSink(overrides: Map[String, String] = Map.empty,
+ batchCounter: Int = 1): (MemoryChannel, SparkSink, CountDownLatch) = {
val channel = new MemoryChannel()
val channelContext = new Context()
@@ -165,7 +167,9 @@ class SparkSinkSuite extends FunSuite {
sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
sink.configure(sinkContext)
sink.setChannel(channel)
- (channel, sink)
+ val latch = new CountDownLatch(batchCounter)
+ sink.countdownWhenBatchReceived(latch)
+ (channel, sink, latch)
}
private def putEvents(ch: MemoryChannel, count: Int): Unit = {