aboutsummaryrefslogtreecommitdiff
path: root/external/flume
diff options
context:
space:
mode:
authorhyukjinkwon <gurwls223@gmail.com>2016-04-14 09:43:41 +0100
committerSean Owen <sowen@cloudera.com>2016-04-14 09:43:41 +0100
commit6fc3dc8839eaed673c64ec87af6dfe24f8cebe0c (patch)
treedb47cd619d84a7890ff1cacc78a44046ace85633 /external/flume
parent478af2f45595913c9b8f560d13e8d88447486f99 (diff)
downloadspark-6fc3dc8839eaed673c64ec87af6dfe24f8cebe0c.tar.gz
spark-6fc3dc8839eaed673c64ec87af6dfe24f8cebe0c.tar.bz2
spark-6fc3dc8839eaed673c64ec87af6dfe24f8cebe0c.zip
[MINOR][SQL] Remove extra anonymous closure within functional transformations
## What changes were proposed in this pull request? This PR removes extra anonymous closure within functional transformations. For example, ```scala .map(item => { ... }) ``` which can be just simply as below: ```scala .map { item => ... } ``` ## How was this patch tested? Related unit tests and `sbt scalastyle`. Author: hyukjinkwon <gurwls223@gmail.com> Closes #12382 from HyukjinKwon/minor-extra-closers.
Diffstat (limited to 'external/flume')
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala4
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala4
2 files changed, 4 insertions, 4 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 250bfc1718..54565840fa 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -79,11 +79,11 @@ private[streaming] class FlumePollingReceiver(
override def onStart(): Unit = {
// Create the connections to each Flume agent.
- addresses.foreach(host => {
+ addresses.foreach { host =>
val transceiver = new NettyTransceiver(host, channelFactory)
val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
connections.add(new FlumeConnection(transceiver, client))
- })
+ }
for (i <- 0 until parallelism) {
logInfo("Starting Flume Polling Receiver worker threads..")
// Threads that pull data from Flume.
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
index 1a96df6e94..6a4dafb8ed 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
@@ -123,9 +123,9 @@ private[flume] class PollingFlumeTestUtils {
val latch = new CountDownLatch(batchCount * channels.size)
sinks.foreach(_.countdownWhenBatchReceived(latch))
- channels.foreach(channel => {
+ channels.foreach { channel =>
executorCompletion.submit(new TxnSubmitter(channel))
- })
+ }
for (i <- 0 until channels.size) {
executorCompletion.take()