aboutsummaryrefslogtreecommitdiff
path: root/external/flume
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-05-31 00:47:56 -0700
committerReynold Xin <rxin@databricks.com>2015-05-31 00:47:56 -0700
commit564bc11e9827915c8652bc06f4bd591809dea4b1 (patch)
tree1a3fe6ff9f78bef3d991897effe554098018a1b4 /external/flume
parent74fdc97c7206c6d715f128ef7c46055e0bb90760 (diff)
downloadspark-564bc11e9827915c8652bc06f4bd591809dea4b1.tar.gz
spark-564bc11e9827915c8652bc06f4bd591809dea4b1.tar.bz2
spark-564bc11e9827915c8652bc06f4bd591809dea4b1.zip
[SPARK-3850] Trim trailing spaces for examples/streaming/yarn.
Author: Reynold Xin <rxin@databricks.com> Closes #6530 from rxin/trim-whitespace-1 and squashes the following commits: 7b7b3a0 [Reynold Xin] Reset again. dc14597 [Reynold Xin] Reset scalastyle. cd556c4 [Reynold Xin] YARN, Kinesis, Flume. 4223fe1 [Reynold Xin] [SPARK-3850] Trim trailing spaces for examples/streaming.
Diffstat (limited to 'external/flume')
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala10
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala2
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala2
3 files changed, 7 insertions, 7 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 60e2994431..1e32a365a1 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -152,9 +152,9 @@ class FlumeReceiver(
val channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())
val channelPipelineFactory = new CompressionChannelPipelineFactory()
-
+
new NettyServer(
- responder,
+ responder,
new InetSocketAddress(host, port),
channelFactory,
channelPipelineFactory,
@@ -188,12 +188,12 @@ class FlumeReceiver(
override def preferredLocation: Option[String] = Option(host)
- /** A Netty Pipeline factory that will decompress incoming data from
+ /** A Netty Pipeline factory that will decompress incoming data from
* and the Netty client and compress data going back to the client.
*
* The compression on the return is required because Flume requires
- * a successful response to indicate it can remove the event/batch
- * from the configured channel
+ * a successful response to indicate it can remove the event/batch
+ * from the configured channel
*/
private[streaming]
class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 92fa5b41be..583e7dca31 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -110,7 +110,7 @@ private[streaming] class FlumePollingReceiver(
}
/**
- * A wrapper around the transceiver and the Avro IPC API.
+ * A wrapper around the transceiver and the Avro IPC API.
* @param transceiver The transceiver to use for communication with Flume
* @param client The client that the callbacks are received on.
*/
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 3d9daeb6e4..c926359987 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -138,7 +138,7 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
val status = client.appendBatch(inputEvents.toList)
status should be (avro.Status.OK)
}
-
+
eventually(timeout(10 seconds), interval(100 milliseconds)) {
val outputEvents = outputBuffer.flatten.map { _.event }
outputEvents.foreach {