Diffstat (limited to 'external/flume-sink')
-rw-r--r--  external/flume-sink/pom.xml                                                                              |  4
-rw-r--r--  external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala  | 56
-rw-r--r--  external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala      | 18
3 files changed, 59 insertions(+), 19 deletions(-)
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index c1e8e65464..b345276b08 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -71,6 +71,10 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+    </dependency>
+    <dependency>
       <!--
         Netty explicitly added in test as it has been excluded from
         Flume dependency (to avoid runtime problems when running with
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
index 7da8eb3e35..e77cf7bfa5 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -19,6 +19,8 @@ package org.apache.spark.streaming.flume.sink
 import java.util.concurrent.{ConcurrentHashMap, Executors}
 import java.util.concurrent.atomic.AtomicLong

+import scala.collection.JavaConversions._
+
 import org.apache.flume.Channel
 import org.apache.commons.lang.RandomStringUtils
 import com.google.common.util.concurrent.ThreadFactoryBuilder
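
The new scala.collection.JavaConversions._ import is there so the handler can iterate a java.util.Collection with Scala's foreach, which the reworked shutdown() below does on sequenceNumberToProcessor.values(). A minimal, self-contained sketch of that implicit conversion (the demo object is hypothetical, not part of the patch):

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConversions._

    object JavaConversionsDemo extends App {
      val map = new ConcurrentHashMap[String, Int]()
      map.put("a", 1)
      map.put("b", 2)
      // map.values() is a java.util.Collection; the implicit conversion from
      // JavaConversions is what gives it Scala's foreach.
      map.values().foreach(v => println(v))
    }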
@@ -45,7 +47,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
     new ThreadFactoryBuilder().setDaemon(true)
       .setNameFormat("Spark Sink Processor Thread - %d").build()))
-  private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]()
+  private val sequenceNumberToProcessor =
+    new ConcurrentHashMap[CharSequence, TransactionProcessor]()
   // This sink will not persist sequence numbers and reuses them if it gets restarted.
   // So it is possible to commit a transaction which may have been meant for the sink before the
   // restart.
@@ -55,6 +58,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   private val seqBase = RandomStringUtils.randomAlphanumeric(8)
   private val seqCounter = new AtomicLong(0)

+  @volatile private var stopped = false
+
   /**
    * Returns a bunch of events to Spark over Avro RPC.
    * @param n Maximum number of events to return in a batch
@@ -63,18 +68,33 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
   override def getEventBatch(n: Int): EventBatch = {
     logDebug("Got getEventBatch call from Spark.")
     val sequenceNumber = seqBase + seqCounter.incrementAndGet()
-    val processor = new TransactionProcessor(channel, sequenceNumber,
-      n, transactionTimeout, backOffInterval, this)
-    transactionExecutorOpt.foreach(executor => {
-      executor.submit(processor)
-    })
-    // Wait until a batch is available - will be an error if error message is non-empty
-    val batch = processor.getEventBatch
-    if (!SparkSinkUtils.isErrorBatch(batch)) {
-      processorMap.put(sequenceNumber.toString, processor)
-      logDebug("Sending event batch with sequence number: " + sequenceNumber)
+    createProcessor(sequenceNumber, n) match {
+      case Some(processor) =>
+        transactionExecutorOpt.foreach(_.submit(processor))
+        // Wait until a batch is available - will be an error if error message is non-empty
+        val batch = processor.getEventBatch
+        if (SparkSinkUtils.isErrorBatch(batch)) {
+          // Remove the processor if it is an error batch since no ACK is sent.
+          removeAndGetProcessor(sequenceNumber)
+          logWarning("Received an error batch - no events were received from channel! ")
+        }
+        batch
+      case None =>
+        new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
+    }
+  }
+
+  private def createProcessor(seq: String, n: Int): Option[TransactionProcessor] = {
+    sequenceNumberToProcessor.synchronized {
+      if (!stopped) {
+        val processor = new TransactionProcessor(
+          channel, seq, n, transactionTimeout, backOffInterval, this)
+        sequenceNumberToProcessor.put(seq, processor)
+        Some(processor)
+      } else {
+        None
+      }
     }
-    batch
   }

   /**
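
The net effect of this hunk: processor creation and registration now happen atomically with the stopped check, so getEventBatch can never register a new TransactionProcessor after shutdown has begun; callers instead receive an explicit error batch. A minimal sketch of the same guarded-creation pattern, with hypothetical Worker and GuardedRegistry names standing in for TransactionProcessor and the handler:

    import java.util.concurrent.ConcurrentHashMap

    class Worker(val id: String)

    class GuardedRegistry {
      private val workers = new ConcurrentHashMap[String, Worker]()
      @volatile private var stopped = false

      // Check-then-register under one lock: once stop() has run, no new
      // worker can ever appear in the map.
      def create(id: String): Option[Worker] = workers.synchronized {
        if (!stopped) {
          val worker = new Worker(id)
          workers.put(id, worker)
          Some(worker)
        } else {
          None
        }
      }

      // Mirrors removeAndGetProcessor: taking the same lock keeps removal
      // consistent with creation and shutdown.
      def remove(id: String): Worker = workers.synchronized {
        workers.remove(id)
      }

      def stop(): Unit = workers.synchronized {
        stopped = true
      }
    }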
@@ -116,7 +136,9 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
    * longer tracked and the caller is responsible for that txn processor.
    */
   private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
-    processorMap.remove(sequenceNumber.toString) // The toString is required!
+    sequenceNumberToProcessor.synchronized {
+      sequenceNumberToProcessor.remove(sequenceNumber.toString)
+    }
   }

   /**
@@ -124,8 +146,10 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
    */
   def shutdown() {
     logInfo("Shutting down Spark Avro Callback Handler")
-    transactionExecutorOpt.foreach(executor => {
-      executor.shutdownNow()
-    })
+    sequenceNumberToProcessor.synchronized {
+      stopped = true
+      sequenceNumberToProcessor.values().foreach(_.shutdown())
+    }
+    transactionExecutorOpt.foreach(_.shutdownNow())
   }
 }
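
The ordering in shutdown() matters: the stopped flag is flipped and every live processor is signalled while holding the map lock, so createProcessor cannot interleave; only afterwards are the executor threads interrupted, which lets each processor treat the resulting interrupt as expected. A compact sketch of that ordering under the same assumptions (illustrative names, not from the patch):

    import java.util.concurrent.{ConcurrentHashMap, Executors}
    import scala.collection.JavaConversions._

    class StoppableWorker {
      @volatile var stopped = false
      def shutdown(): Unit = { stopped = true }
    }

    class Registry {
      private val workers = new ConcurrentHashMap[String, StoppableWorker]()
      private val pool = Executors.newFixedThreadPool(4)
      @volatile private var stopped = false

      def shutdownAll(): Unit = {
        workers.synchronized {
          stopped = true                          // blocks any concurrent create()
          workers.values().foreach(_.shutdown())  // ask each worker to stop cooperatively
        }
        // Interrupt the threads only after the workers know they are stopping,
        // so the resulting InterruptedException can be treated as expected.
        pool.shutdownNow()
      }
    }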
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
index b9e3c786eb..13f3aa94be 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
@@ -60,6 +60,8 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
   // succeeded.
   @volatile private var batchSuccess = false

+  @volatile private var stopped = false
+
   // The transaction that this processor would handle
   var txOpt: Option[Transaction] = None

@@ -88,6 +90,11 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
     batchAckLatch.countDown()
   }

+  private[flume] def shutdown(): Unit = {
+    logDebug("Shutting down transaction processor")
+    stopped = true
+  }
+
   /**
    * Populates events into the event batch. If the batch cannot be populated,
    * this method will not set the events into the event batch, but it sets an error message.
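
The shutdown() method added here is just a write to the @volatile stopped flag; the volatile qualifier is what guarantees the processor thread sees the write promptly. A self-contained demo of that handshake (hypothetical names, and println in place of the sink's logging):

    object VolatileFlagDemo extends App {
      @volatile var stopped = false

      val worker = new Thread(new Runnable {
        override def run(): Unit = {
          // Reads of a @volatile field are never stale, so the write
          // below is observed promptly by this thread.
          while (!stopped) {
            Thread.sleep(10)
          }
          println("worker observed the stop flag and exited")
        }
      })
      worker.start()
      Thread.sleep(50)
      stopped = true // without @volatile this write might never be observed
      worker.join()
    }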
@@ -106,7 +113,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
         var gotEventsInThisTxn = false
         var loopCounter: Int = 0
         loop.breakable {
-          while (events.size() < maxBatchSize
+          while (!stopped && events.size() < maxBatchSize
             && loopCounter < totalAttemptsToRemoveFromChannel) {
             loopCounter += 1
             Option(channel.take()) match {
@@ -115,7 +122,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
                   ByteBuffer.wrap(event.getBody)))
                 gotEventsInThisTxn = true
               case None =>
-                if (!gotEventsInThisTxn) {
+                if (!gotEventsInThisTxn && !stopped) {
                   logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
                     " the current transaction")
                   TimeUnit.MILLISECONDS.sleep(backOffInterval)
@@ -125,7 +132,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
             }
           }
         }
-        if (!gotEventsInThisTxn) {
+        if (!gotEventsInThisTxn && !stopped) {
           val msg = "Tried several times, " +
             "but did not get any events from the channel!"
           logWarning(msg)
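
These three hunks thread the stopped flag through the polling loop: it is re-checked before each channel.take(), before the backoff sleep, and before reporting an empty batch as an error, so a stopped processor exits promptly instead of sleeping or logging spurious warnings. A minimal sketch of the same cooperative-cancellation loop, with pollOnce standing in for channel.take():

    import java.util.concurrent.TimeUnit
    import scala.collection.mutable.ArrayBuffer
    import scala.util.control.Breaks

    class Poller(pollOnce: () => Option[String], maxBatchSize: Int,
        maxAttempts: Int, backOffInterval: Long) {
      @volatile var stopped = false

      def drain(): Seq[String] = {
        val events = new ArrayBuffer[String]()
        var attempts = 0
        val loop = new Breaks
        loop.breakable {
          while (!stopped && events.size < maxBatchSize && attempts < maxAttempts) {
            attempts += 1
            pollOnce() match {
              case Some(event) => events += event
              case None =>
                // Re-check the flag before backing off, otherwise a shutdown
                // issued during an empty poll would still pay the full sleep.
                if (events.isEmpty && !stopped) {
                  TimeUnit.MILLISECONDS.sleep(backOffInterval)
                } else {
                  loop.break()
                }
            }
          }
        }
        events
      }
    }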
@@ -136,6 +143,11 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
         }
       })
     } catch {
+      case interrupted: InterruptedException =>
+        // Don't pollute logs if the InterruptedException came from this being stopped
+        if (!stopped) {
+          logWarning("Error while processing transaction.", interrupted)
+        }
       case e: Exception =>
         logWarning("Error while processing transaction.", e)
         eventBatch.setErrorMsg(e.getMessage)
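
Finally, the new catch clause keeps an interrupt caused by the sink's own shutdownNow() out of the warning logs, while still surfacing interrupts that arrive unexpectedly. A small sketch of that idiom (illustrative names; println in place of logWarning; restoring the interrupt status is an extra nicety not present in the patch itself):

    class InterruptAwareTask {
      @volatile var stopped = false

      def runBatch(body: => Unit): Unit = {
        try {
          body
        } catch {
          case interrupted: InterruptedException =>
            if (!stopped) {
              // Unexpected interrupt: log it and restore the interrupt status.
              println("Error while processing transaction: " + interrupted)
              Thread.currentThread().interrupt()
            }
            // If stopped is true the interrupt came from our own shutdown,
            // so exit quietly instead of polluting the logs.
          case e: Exception =>
            println("Error while processing transaction: " + e)
        }
      }
    }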