aboutsummaryrefslogtreecommitdiff
path: root/external/flume-sink
diff options
context:
space:
mode:
Diffstat (limited to 'external/flume-sink')
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala4
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala12
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala12
3 files changed, 14 insertions, 14 deletions
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
index 719fca0938..8050ec357e 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -129,9 +129,9 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
* @param success Whether the batch was successful or not.
*/
private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
- removeAndGetProcessor(sequenceNumber).foreach(processor => {
+ removeAndGetProcessor(sequenceNumber).foreach { processor =>
processor.batchProcessed(success)
- })
+ }
}
/**
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
index 14dffb15fe..41f27e9376 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -88,23 +88,23 @@ class SparkSink extends AbstractSink with Logging with Configurable {
// dependencies which are being excluded in the build. In practice,
// Netty dependencies are already available on the JVM as Flume would have pulled them in.
serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
- serverOpt.foreach(server => {
+ serverOpt.foreach { server =>
logInfo("Starting Avro server for sink: " + getName)
server.start()
- })
+ }
super.start()
}
override def stop() {
logInfo("Stopping Spark Sink: " + getName)
- handler.foreach(callbackHandler => {
+ handler.foreach { callbackHandler =>
callbackHandler.shutdown()
- })
- serverOpt.foreach(server => {
+ }
+ serverOpt.foreach { server =>
logInfo("Stopping Avro Server for sink: " + getName)
server.close()
server.join()
- })
+ }
blockingLatch.countDown()
super.stop()
}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
index b15c2097e5..19e736f016 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
@@ -110,7 +110,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
eventBatch.setErrorMsg("Something went wrong. Channel was " +
"unable to create a transaction!")
}
- txOpt.foreach(tx => {
+ txOpt.foreach { tx =>
tx.begin()
val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
val loop = new Breaks
@@ -145,7 +145,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
// At this point, the events are available, so fill them into the event batch
eventBatch = new EventBatch("", seqNum, events)
}
- })
+ }
} catch {
case interrupted: InterruptedException =>
// Don't pollute logs if the InterruptedException came from this being stopped
@@ -156,9 +156,9 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
logWarning("Error while processing transaction.", e)
eventBatch.setErrorMsg(e.getMessage)
try {
- txOpt.foreach(tx => {
+ txOpt.foreach { tx =>
rollbackAndClose(tx, close = true)
- })
+ }
} finally {
txOpt = None
}
@@ -174,7 +174,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
*/
private def processAckOrNack() {
batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS)
- txOpt.foreach(tx => {
+ txOpt.foreach { tx =>
if (batchSuccess) {
try {
logDebug("Committing transaction")
@@ -197,7 +197,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
// cause issues. This is required to ensure the TransactionProcessor instance is not leaked
parent.removeAndGetProcessor(seqNum)
}
- })
+ }
}
/**