Diffstat (limited to 'external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala')
-rw-r--r--  external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala | 10
1 file changed, 10 insertions, 0 deletions
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
index 98ae7d783a..14dffb15fe 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -138,6 +138,16 @@ class SparkSink extends AbstractSink with Logging with Configurable {
throw new RuntimeException("Server was not started!")
)
}
+
+ /**
+ * Pass in a [[CountDownLatch]] for testing purposes. This latch is counted down once each
+ * time a batch is received. A test can simply await on this latch until the expected number
+ * of batches has been received.
+ * @param latch the latch to count down once per received batch
+ */
+ private[flume] def countdownWhenBatchReceived(latch: CountDownLatch) {
+ handler.foreach(_.countDownWhenBatchAcked(latch))
+ }
}
/**
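The doc comment added above describes the intended test pattern: a test hands a CountDownLatch to the sink via countdownWhenBatchReceived, and the sink counts it down once per acked batch while the test awaits it. The following is a rough, self-contained sketch of that await pattern only; it is not part of this diff and not the actual Spark test code, and the object name, batch count, and timings are made up for illustration.

import java.util.concurrent.{CountDownLatch, TimeUnit}

// Illustrative sketch only: simulates the latch hand-off without a real SparkSink.
object LatchAwaitSketch {
  def main(args: Array[String]): Unit = {
    val expectedBatches = 3                         // hypothetical number of batches
    val latch = new CountDownLatch(expectedBatches)

    // Stand-in for the sink side: each completed batch counts the latch down once,
    // mirroring what handler.foreach(_.countDownWhenBatchAcked(latch)) arranges in the diff.
    val sinkSide = new Thread(new Runnable {
      override def run(): Unit = {
        (1 to expectedBatches).foreach { _ =>
          Thread.sleep(100)                         // simulate receiving and acking a batch
          latch.countDown()
        }
      }
    })
    sinkSide.start()

    // Test side: block until all expected batches have been counted down, or time out.
    assert(latch.await(10, TimeUnit.SECONDS), "timed out waiting for batches")
    sinkSide.join()
  }
}

In a real test against this class, the same latch would instead be registered with the sink through sink.countdownWhenBatchReceived(latch) before events are pushed through the channel, and the test would then await it exactly as above.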