diff options
author | Hari Shreedharan <hshreedharan@apache.org> | 2014-08-27 02:39:02 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-08-27 02:39:02 -0700 |
commit | 6f671d04fa98f97fd48c5e749b9f47dd4a8b4f44 (patch) | |
tree | c56c24a614050b66506baf79d964ba6627117669 /external/flume-sink | |
parent | 171a41cb034f4ea80f6a3c91a6872970de16a14a (diff) | |
download | spark-6f671d04fa98f97fd48c5e749b9f47dd4a8b4f44.tar.gz spark-6f671d04fa98f97fd48c5e749b9f47dd4a8b4f44.tar.bz2 spark-6f671d04fa98f97fd48c5e749b9f47dd4a8b4f44.zip |
[SPARK-3154][STREAMING] Make FlumePollingInputDStream shutdown cleaner.
Currently a lot of errors get thrown from the Avro IPC layer when the dstream
or sink is shutdown. This PR cleans it up. Some refactoring is done in the
receiver code to put all of the RPC code into a single Try and just recover
from that. The sink code has also been cleaned up.
Author: Hari Shreedharan <hshreedharan@apache.org>
Closes #2065 from harishreedharan/clean-flume-shutdown and squashes the following commits:
f93a07c [Hari Shreedharan] Formatting fixes.
d7427cc [Hari Shreedharan] More fixes!
a0a8852 [Hari Shreedharan] Fix race condition, hopefully! Minor other changes.
4c9ed02 [Hari Shreedharan] Remove unneeded list in Callback handler. Other misc changes.
8fee36f [Hari Shreedharan] Scala-library is required, else maven build fails. Also catch InterruptedException in TxnProcessor.
445e700 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into clean-flume-shutdown
87232e0 [Hari Shreedharan] Refactor Flume Input Stream. Clean up code, better error handling.
9001d26 [Hari Shreedharan] Change log level to debug in TransactionProcessor#shutdown method
e7b8d82 [Hari Shreedharan] Incorporate review feedback
598efa7 [Hari Shreedharan] Clean up some exception handling code
e1027c6 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into clean-flume-shutdown
ed608c8 [Hari Shreedharan] [SPARK-3154][STREAMING] Make FlumePollingInputDStream shutdown cleaner.
Diffstat (limited to 'external/flume-sink')
3 files changed, 59 insertions, 19 deletions
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index c1e8e65464..b345276b08 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -71,6 +71,10 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </dependency> + <dependency> <!-- Netty explicitly added in test as it has been excluded from Flume dependency (to avoid runtime problems when running with diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala index 7da8eb3e35..e77cf7bfa5 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala @@ -19,6 +19,8 @@ package org.apache.spark.streaming.flume.sink import java.util.concurrent.{ConcurrentHashMap, Executors} import java.util.concurrent.atomic.AtomicLong +import scala.collection.JavaConversions._ + import org.apache.flume.Channel import org.apache.commons.lang.RandomStringUtils import com.google.common.util.concurrent.ThreadFactoryBuilder @@ -45,7 +47,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads, new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("Spark Sink Processor Thread - %d").build())) - private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]() + private val sequenceNumberToProcessor = + new ConcurrentHashMap[CharSequence, TransactionProcessor]() // This sink will not persist sequence numbers and reuses them if it gets restarted. // So it is possible to commit a transaction which may have been meant for the sink before the // restart. 
@@ -55,6 +58,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha private val seqBase = RandomStringUtils.randomAlphanumeric(8) private val seqCounter = new AtomicLong(0) + @volatile private var stopped = false + /** * Returns a bunch of events to Spark over Avro RPC. * @param n Maximum number of events to return in a batch @@ -63,18 +68,33 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha override def getEventBatch(n: Int): EventBatch = { logDebug("Got getEventBatch call from Spark.") val sequenceNumber = seqBase + seqCounter.incrementAndGet() - val processor = new TransactionProcessor(channel, sequenceNumber, - n, transactionTimeout, backOffInterval, this) - transactionExecutorOpt.foreach(executor => { - executor.submit(processor) - }) - // Wait until a batch is available - will be an error if error message is non-empty - val batch = processor.getEventBatch - if (!SparkSinkUtils.isErrorBatch(batch)) { - processorMap.put(sequenceNumber.toString, processor) - logDebug("Sending event batch with sequence number: " + sequenceNumber) + createProcessor(sequenceNumber, n) match { + case Some(processor) => + transactionExecutorOpt.foreach(_.submit(processor)) + // Wait until a batch is available - will be an error if error message is non-empty + val batch = processor.getEventBatch + if (SparkSinkUtils.isErrorBatch(batch)) { + // Remove the processor if it is an error batch since no ACK is sent. + removeAndGetProcessor(sequenceNumber) + logWarning("Received an error batch - no events were received from channel! 
") + } + batch + case None => + new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList()) + } + } + + private def createProcessor(seq: String, n: Int): Option[TransactionProcessor] = { + sequenceNumberToProcessor.synchronized { + if (!stopped) { + val processor = new TransactionProcessor( + channel, seq, n, transactionTimeout, backOffInterval, this) + sequenceNumberToProcessor.put(seq, processor) + Some(processor) + } else { + None + } } - batch } /** @@ -116,7 +136,9 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha * longer tracked and the caller is responsible for that txn processor. */ private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = { - processorMap.remove(sequenceNumber.toString) // The toString is required! + sequenceNumberToProcessor.synchronized { + sequenceNumberToProcessor.remove(sequenceNumber.toString) + } } /** @@ -124,8 +146,10 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha */ def shutdown() { logInfo("Shutting down Spark Avro Callback Handler") - transactionExecutorOpt.foreach(executor => { - executor.shutdownNow() - }) + sequenceNumberToProcessor.synchronized { + stopped = true + sequenceNumberToProcessor.values().foreach(_.shutdown()) + } + transactionExecutorOpt.foreach(_.shutdownNow()) } } diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala index b9e3c786eb..13f3aa94be 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala @@ -60,6 +60,8 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, // succeeded. 
@volatile private var batchSuccess = false + @volatile private var stopped = false + // The transaction that this processor would handle var txOpt: Option[Transaction] = None @@ -88,6 +90,11 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, batchAckLatch.countDown() } + private[flume] def shutdown(): Unit = { + logDebug("Shutting down transaction processor") + stopped = true + } + /** * Populates events into the event batch. If the batch cannot be populated, * this method will not set the events into the event batch, but it sets an error message. @@ -106,7 +113,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, var gotEventsInThisTxn = false var loopCounter: Int = 0 loop.breakable { - while (events.size() < maxBatchSize + while (!stopped && events.size() < maxBatchSize && loopCounter < totalAttemptsToRemoveFromChannel) { loopCounter += 1 Option(channel.take()) match { @@ -115,7 +122,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, ByteBuffer.wrap(event.getBody))) gotEventsInThisTxn = true case None => - if (!gotEventsInThisTxn) { + if (!gotEventsInThisTxn && !stopped) { logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" + " the current transaction") TimeUnit.MILLISECONDS.sleep(backOffInterval) @@ -125,7 +132,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, } } } - if (!gotEventsInThisTxn) { + if (!gotEventsInThisTxn && !stopped) { val msg = "Tried several times, " + "but did not get any events from the channel!" 
logWarning(msg) @@ -136,6 +143,11 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, } }) } catch { + case interrupted: InterruptedException => + // Don't pollute logs if the InterruptedException came from this being stopped + if (!stopped) { + logWarning("Error while processing transaction.", interrupted) + } case e: Exception => logWarning("Error while processing transaction.", e) eventBatch.setErrorMsg(e.getMessage) |