path: root/external/flume-sink
author     hyukjinkwon <gurwls223@gmail.com>  2016-04-14 09:43:41 +0100
committer  Sean Owen <sowen@cloudera.com>     2016-04-14 09:43:41 +0100
commit     6fc3dc8839eaed673c64ec87af6dfe24f8cebe0c (patch)
tree       db47cd619d84a7890ff1cacc78a44046ace85633 /external/flume-sink
parent     478af2f45595913c9b8f560d13e8d88447486f99 (diff)
[MINOR][SQL] Remove extra anonymous closure within functional transformations
## What changes were proposed in this pull request?

This PR removes extra anonymous closures within functional transformations. For example:

```scala
.map(item => {
  ...
})
```

can simply be written as:

```scala
.map { item =>
  ...
}
```

## How was this patch tested?

Related unit tests and `sbt scalastyle`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #12382 from HyukjinKwon/minor-extra-closers.
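As a self-contained sketch of the two styles (the collection and values here are made up for illustration, not taken from the patch; both forms are equivalent at runtime):

```scala
object ClosureStyleExample {
  def main(args: Array[String]): Unit = {
    val items = Seq(1, 2, 3)

    // Before: an anonymous closure wrapped in an extra pair of parentheses and braces.
    val before = items.map(item => {
      item * 2
    })

    // After: the same lambda written as a brace block, as done in this patch.
    val after = items.map { item =>
      item * 2
    }

    assert(before == after) // Both evaluate to Seq(2, 4, 6).
  }
}
```

The two forms compile to the same result; the braced form just drops the redundant `(...)`/`{...}` nesting, which is the conventional Scala style for multi-line lambdas.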
Diffstat (limited to 'external/flume-sink')
-rw-r--r--  external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala |  4
-rw-r--r--  external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala                | 12
-rw-r--r--  external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala     | 12
3 files changed, 14 insertions, 14 deletions
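Every hunk below applies the same mechanical rewrite to `Option.foreach` calls. As a minimal sketch of that pattern, using a hypothetical stand-in for the sink's server handle (not Flume's real `NettyServer`):

```scala
object OptionForeachExample {
  // Hypothetical stand-in for the Avro server handle held by SparkSink;
  // not Flume's actual NettyServer type.
  final case class StubServer(name: String) {
    def start(): Unit = println(s"$name started")
  }

  def main(args: Array[String]): Unit = {
    val serverOpt: Option[StubServer] = Some(StubServer("avro-sink"))

    // Option.foreach runs the block only when a value is present,
    // replacing an explicit isDefined/get check.
    serverOpt.foreach { server =>
      server.start()
    }
  }
}
```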
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
index 719fca0938..8050ec357e 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -129,9 +129,9 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
    * @param success Whether the batch was successful or not.
    */
   private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
-    removeAndGetProcessor(sequenceNumber).foreach(processor => {
+    removeAndGetProcessor(sequenceNumber).foreach { processor =>
       processor.batchProcessed(success)
-    })
+    }
   }
 
   /**
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
index 14dffb15fe..41f27e9376 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -88,23 +88,23 @@ class SparkSink extends AbstractSink with Logging with Configurable {
     // dependencies which are being excluded in the build. In practice,
     // Netty dependencies are already available on the JVM as Flume would have pulled them in.
     serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
-    serverOpt.foreach(server => {
+    serverOpt.foreach { server =>
       logInfo("Starting Avro server for sink: " + getName)
       server.start()
-    })
+    }
     super.start()
   }
 
   override def stop() {
     logInfo("Stopping Spark Sink: " + getName)
-    handler.foreach(callbackHandler => {
+    handler.foreach { callbackHandler =>
       callbackHandler.shutdown()
-    })
-    serverOpt.foreach(server => {
+    }
+    serverOpt.foreach { server =>
       logInfo("Stopping Avro Server for sink: " + getName)
       server.close()
       server.join()
-    })
+    }
     blockingLatch.countDown()
     super.stop()
   }
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
index b15c2097e5..19e736f016 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
@@ -110,7 +110,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
         eventBatch.setErrorMsg("Something went wrong. Channel was " +
           "unable to create a transaction!")
       }
-      txOpt.foreach(tx => {
+      txOpt.foreach { tx =>
         tx.begin()
         val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
         val loop = new Breaks
@@ -145,7 +145,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
           // At this point, the events are available, so fill them into the event batch
           eventBatch = new EventBatch("", seqNum, events)
         }
-      })
+      }
     } catch {
       case interrupted: InterruptedException =>
         // Don't pollute logs if the InterruptedException came from this being stopped
@@ -156,9 +156,9 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
         logWarning("Error while processing transaction.", e)
         eventBatch.setErrorMsg(e.getMessage)
         try {
-          txOpt.foreach(tx => {
+          txOpt.foreach { tx =>
             rollbackAndClose(tx, close = true)
-          })
+          }
         } finally {
           txOpt = None
         }
@@ -174,7 +174,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
    */
   private def processAckOrNack() {
     batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS)
-    txOpt.foreach(tx => {
+    txOpt.foreach { tx =>
       if (batchSuccess) {
         try {
           logDebug("Committing transaction")
@@ -197,7 +197,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
       // cause issues. This is required to ensure the TransactionProcessor instance is not leaked
       parent.removeAndGetProcessor(seqNum)
     }
-    })
+    }
   }
 
   /**