aboutsummaryrefslogtreecommitdiff
path: root/external/flume
diff options
context:
space:
mode:
Diffstat (limited to 'external/flume')
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala10
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala2
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala2
3 files changed, 7 insertions, 7 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 60e2994431..1e32a365a1 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -152,9 +152,9 @@ class FlumeReceiver(
val channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())
val channelPipelineFactory = new CompressionChannelPipelineFactory()
-
+
new NettyServer(
- responder,
+ responder,
new InetSocketAddress(host, port),
channelFactory,
channelPipelineFactory,
@@ -188,12 +188,12 @@ class FlumeReceiver(
override def preferredLocation: Option[String] = Option(host)
- /** A Netty Pipeline factory that will decompress incoming data from
+ /** A Netty Pipeline factory that will decompress incoming data from
* and the Netty client and compress data going back to the client.
*
* The compression on the return is required because Flume requires
- * a successful response to indicate it can remove the event/batch
- * from the configured channel
+ * a successful response to indicate it can remove the event/batch
+ * from the configured channel
*/
private[streaming]
class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 92fa5b41be..583e7dca31 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -110,7 +110,7 @@ private[streaming] class FlumePollingReceiver(
}
/**
- * A wrapper around the transceiver and the Avro IPC API.
+ * A wrapper around the transceiver and the Avro IPC API.
* @param transceiver The transceiver to use for communication with Flume
* @param client The client that the callbacks are received on.
*/
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 3d9daeb6e4..c926359987 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -138,7 +138,7 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
val status = client.appendBatch(inputEvents.toList)
status should be (avro.Status.OK)
}
-
+
eventually(timeout(10 seconds), interval(100 milliseconds)) {
val outputEvents = outputBuffer.flatten.map { _.event }
outputEvents.foreach {