aboutsummaryrefslogtreecommitdiff
path: root/external/flume-sink
diff options
context:
space:
mode:
authorHari Shreedharan <hshreedharan@apache.org>2014-08-27 02:39:02 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-08-27 02:39:02 -0700
commit6f671d04fa98f97fd48c5e749b9f47dd4a8b4f44 (patch)
treec56c24a614050b66506baf79d964ba6627117669 /external/flume-sink
parent171a41cb034f4ea80f6a3c91a6872970de16a14a (diff)
downloadspark-6f671d04fa98f97fd48c5e749b9f47dd4a8b4f44.tar.gz
spark-6f671d04fa98f97fd48c5e749b9f47dd4a8b4f44.tar.bz2
spark-6f671d04fa98f97fd48c5e749b9f47dd4a8b4f44.zip
[SPARK-3154][STREAMING] Make FlumePollingInputDStream shutdown cleaner.
Currently, a lot of errors get thrown from the Avro IPC layer when the DStream or sink is shut down. This PR cleans it up. Some refactoring is done in the receiver code to put all of the RPC code into a single Try and just recover from that. The sink code has also been cleaned up. Author: Hari Shreedharan <hshreedharan@apache.org> Closes #2065 from harishreedharan/clean-flume-shutdown and squashes the following commits: f93a07c [Hari Shreedharan] Formatting fixes. d7427cc [Hari Shreedharan] More fixes! a0a8852 [Hari Shreedharan] Fix race condition, hopefully! Minor other changes. 4c9ed02 [Hari Shreedharan] Remove unneeded list in Callback handler. Other misc changes. 8fee36f [Hari Shreedharan] Scala-library is required, else maven build fails. Also catch InterruptedException in TxnProcessor. 445e700 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into clean-flume-shutdown 87232e0 [Hari Shreedharan] Refactor Flume Input Stream. Clean up code, better error handling. 9001d26 [Hari Shreedharan] Change log level to debug in TransactionProcessor#shutdown method e7b8d82 [Hari Shreedharan] Incorporate review feedback 598efa7 [Hari Shreedharan] Clean up some exception handling code e1027c6 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into clean-flume-shutdown ed608c8 [Hari Shreedharan] [SPARK-3154][STREAMING] Make FlumePollingInputDStream shutdown cleaner.
Diffstat (limited to 'external/flume-sink')
-rw-r--r--external/flume-sink/pom.xml4
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala56
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala18
3 files changed, 59 insertions, 19 deletions
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index c1e8e65464..b345276b08 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -71,6 +71,10 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+ <dependency>
<!--
Netty explicitly added in test as it has been excluded from
Flume dependency (to avoid runtime problems when running with
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
index 7da8eb3e35..e77cf7bfa5 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -19,6 +19,8 @@ package org.apache.spark.streaming.flume.sink
import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.concurrent.atomic.AtomicLong
+import scala.collection.JavaConversions._
+
import org.apache.flume.Channel
import org.apache.commons.lang.RandomStringUtils
import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -45,7 +47,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Spark Sink Processor Thread - %d").build()))
- private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]()
+ private val sequenceNumberToProcessor =
+ new ConcurrentHashMap[CharSequence, TransactionProcessor]()
// This sink will not persist sequence numbers and reuses them if it gets restarted.
// So it is possible to commit a transaction which may have been meant for the sink before the
// restart.
@@ -55,6 +58,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
private val seqBase = RandomStringUtils.randomAlphanumeric(8)
private val seqCounter = new AtomicLong(0)
+ @volatile private var stopped = false
+
/**
* Returns a bunch of events to Spark over Avro RPC.
* @param n Maximum number of events to return in a batch
@@ -63,18 +68,33 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
override def getEventBatch(n: Int): EventBatch = {
logDebug("Got getEventBatch call from Spark.")
val sequenceNumber = seqBase + seqCounter.incrementAndGet()
- val processor = new TransactionProcessor(channel, sequenceNumber,
- n, transactionTimeout, backOffInterval, this)
- transactionExecutorOpt.foreach(executor => {
- executor.submit(processor)
- })
- // Wait until a batch is available - will be an error if error message is non-empty
- val batch = processor.getEventBatch
- if (!SparkSinkUtils.isErrorBatch(batch)) {
- processorMap.put(sequenceNumber.toString, processor)
- logDebug("Sending event batch with sequence number: " + sequenceNumber)
+ createProcessor(sequenceNumber, n) match {
+ case Some(processor) =>
+ transactionExecutorOpt.foreach(_.submit(processor))
+ // Wait until a batch is available - will be an error if error message is non-empty
+ val batch = processor.getEventBatch
+ if (SparkSinkUtils.isErrorBatch(batch)) {
+ // Remove the processor if it is an error batch since no ACK is sent.
+ removeAndGetProcessor(sequenceNumber)
+ logWarning("Received an error batch - no events were received from channel! ")
+ }
+ batch
+ case None =>
+ new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
+ }
+ }
+
+ private def createProcessor(seq: String, n: Int): Option[TransactionProcessor] = {
+ sequenceNumberToProcessor.synchronized {
+ if (!stopped) {
+ val processor = new TransactionProcessor(
+ channel, seq, n, transactionTimeout, backOffInterval, this)
+ sequenceNumberToProcessor.put(seq, processor)
+ Some(processor)
+ } else {
+ None
+ }
}
- batch
}
/**
@@ -116,7 +136,9 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
* longer tracked and the caller is responsible for that txn processor.
*/
private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
- processorMap.remove(sequenceNumber.toString) // The toString is required!
+ sequenceNumberToProcessor.synchronized {
+ sequenceNumberToProcessor.remove(sequenceNumber.toString)
+ }
}
/**
@@ -124,8 +146,10 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
*/
def shutdown() {
logInfo("Shutting down Spark Avro Callback Handler")
- transactionExecutorOpt.foreach(executor => {
- executor.shutdownNow()
- })
+ sequenceNumberToProcessor.synchronized {
+ stopped = true
+ sequenceNumberToProcessor.values().foreach(_.shutdown())
+ }
+ transactionExecutorOpt.foreach(_.shutdownNow())
}
}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
index b9e3c786eb..13f3aa94be 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
@@ -60,6 +60,8 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
// succeeded.
@volatile private var batchSuccess = false
+ @volatile private var stopped = false
+
// The transaction that this processor would handle
var txOpt: Option[Transaction] = None
@@ -88,6 +90,11 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
batchAckLatch.countDown()
}
+ private[flume] def shutdown(): Unit = {
+ logDebug("Shutting down transaction processor")
+ stopped = true
+ }
+
/**
* Populates events into the event batch. If the batch cannot be populated,
* this method will not set the events into the event batch, but it sets an error message.
@@ -106,7 +113,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
var gotEventsInThisTxn = false
var loopCounter: Int = 0
loop.breakable {
- while (events.size() < maxBatchSize
+ while (!stopped && events.size() < maxBatchSize
&& loopCounter < totalAttemptsToRemoveFromChannel) {
loopCounter += 1
Option(channel.take()) match {
@@ -115,7 +122,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
ByteBuffer.wrap(event.getBody)))
gotEventsInThisTxn = true
case None =>
- if (!gotEventsInThisTxn) {
+ if (!gotEventsInThisTxn && !stopped) {
logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
" the current transaction")
TimeUnit.MILLISECONDS.sleep(backOffInterval)
@@ -125,7 +132,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
}
}
}
- if (!gotEventsInThisTxn) {
+ if (!gotEventsInThisTxn && !stopped) {
val msg = "Tried several times, " +
"but did not get any events from the channel!"
logWarning(msg)
@@ -136,6 +143,11 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
}
})
} catch {
+ case interrupted: InterruptedException =>
+ // Don't pollute logs if the InterruptedException came from this being stopped
+ if (!stopped) {
+ logWarning("Error while processing transaction.", interrupted)
+ }
case e: Exception =>
logWarning("Error while processing transaction.", e)
eventBatch.setErrorMsg(e.getMessage)