aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHari Shreedharan <hshreedharan@apache.org>2014-08-27 02:39:02 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-08-27 02:39:02 -0700
commit6f671d04fa98f97fd48c5e749b9f47dd4a8b4f44 (patch)
treec56c24a614050b66506baf79d964ba6627117669
parent171a41cb034f4ea80f6a3c91a6872970de16a14a (diff)
downloadspark-6f671d04fa98f97fd48c5e749b9f47dd4a8b4f44.tar.gz
spark-6f671d04fa98f97fd48c5e749b9f47dd4a8b4f44.tar.bz2
spark-6f671d04fa98f97fd48c5e749b9f47dd4a8b4f44.zip
[SPARK-3154][STREAMING] Make FlumePollingInputDStream shutdown cleaner.
Currently lot of errors get thrown from Avro IPC layer when the dstream or sink is shutdown. This PR cleans it up. Some refactoring is done in the receiver code to put all of the RPC code into a single Try and just recover from that. The sink code has also been cleaned up. Author: Hari Shreedharan <hshreedharan@apache.org> Closes #2065 from harishreedharan/clean-flume-shutdown and squashes the following commits: f93a07c [Hari Shreedharan] Formatting fixes. d7427cc [Hari Shreedharan] More fixes! a0a8852 [Hari Shreedharan] Fix race condition, hopefully! Minor other changes. 4c9ed02 [Hari Shreedharan] Remove unneeded list in Callback handler. Other misc changes. 8fee36f [Hari Shreedharan] Scala-library is required, else maven build fails. Also catch InterruptedException in TxnProcessor. 445e700 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into clean-flume-shutdown 87232e0 [Hari Shreedharan] Refactor Flume Input Stream. Clean up code, better error handling. 9001d26 [Hari Shreedharan] Change log level to debug in TransactionProcessor#shutdown method e7b8d82 [Hari Shreedharan] Incorporate review feedback 598efa7 [Hari Shreedharan] Clean up some exception handling code e1027c6 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into clean-flume-shutdown ed608c8 [Hari Shreedharan] [SPARK-3154][STREAMING] Make FlumePollingInputDStream shutdown cleaner.
-rw-r--r--external/flume-sink/pom.xml4
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala56
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala18
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala167
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala77
5 files changed, 236 insertions, 86 deletions
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index c1e8e65464..b345276b08 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -71,6 +71,10 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+ <dependency>
<!--
Netty explicitly added in test as it has been excluded from
Flume dependency (to avoid runtime problems when running with
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
index 7da8eb3e35..e77cf7bfa5 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -19,6 +19,8 @@ package org.apache.spark.streaming.flume.sink
import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.concurrent.atomic.AtomicLong
+import scala.collection.JavaConversions._
+
import org.apache.flume.Channel
import org.apache.commons.lang.RandomStringUtils
import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -45,7 +47,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Spark Sink Processor Thread - %d").build()))
- private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]()
+ private val sequenceNumberToProcessor =
+ new ConcurrentHashMap[CharSequence, TransactionProcessor]()
// This sink will not persist sequence numbers and reuses them if it gets restarted.
// So it is possible to commit a transaction which may have been meant for the sink before the
// restart.
@@ -55,6 +58,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
private val seqBase = RandomStringUtils.randomAlphanumeric(8)
private val seqCounter = new AtomicLong(0)
+ @volatile private var stopped = false
+
/**
* Returns a bunch of events to Spark over Avro RPC.
* @param n Maximum number of events to return in a batch
@@ -63,18 +68,33 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
override def getEventBatch(n: Int): EventBatch = {
logDebug("Got getEventBatch call from Spark.")
val sequenceNumber = seqBase + seqCounter.incrementAndGet()
- val processor = new TransactionProcessor(channel, sequenceNumber,
- n, transactionTimeout, backOffInterval, this)
- transactionExecutorOpt.foreach(executor => {
- executor.submit(processor)
- })
- // Wait until a batch is available - will be an error if error message is non-empty
- val batch = processor.getEventBatch
- if (!SparkSinkUtils.isErrorBatch(batch)) {
- processorMap.put(sequenceNumber.toString, processor)
- logDebug("Sending event batch with sequence number: " + sequenceNumber)
+ createProcessor(sequenceNumber, n) match {
+ case Some(processor) =>
+ transactionExecutorOpt.foreach(_.submit(processor))
+ // Wait until a batch is available - will be an error if error message is non-empty
+ val batch = processor.getEventBatch
+ if (SparkSinkUtils.isErrorBatch(batch)) {
+ // Remove the processor if it is an error batch since no ACK is sent.
+ removeAndGetProcessor(sequenceNumber)
+ logWarning("Received an error batch - no events were received from channel! ")
+ }
+ batch
+ case None =>
+ new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
+ }
+ }
+
+ private def createProcessor(seq: String, n: Int): Option[TransactionProcessor] = {
+ sequenceNumberToProcessor.synchronized {
+ if (!stopped) {
+ val processor = new TransactionProcessor(
+ channel, seq, n, transactionTimeout, backOffInterval, this)
+ sequenceNumberToProcessor.put(seq, processor)
+ Some(processor)
+ } else {
+ None
+ }
}
- batch
}
/**
@@ -116,7 +136,9 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
* longer tracked and the caller is responsible for that txn processor.
*/
private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
- processorMap.remove(sequenceNumber.toString) // The toString is required!
+ sequenceNumberToProcessor.synchronized {
+ sequenceNumberToProcessor.remove(sequenceNumber.toString)
+ }
}
/**
@@ -124,8 +146,10 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
*/
def shutdown() {
logInfo("Shutting down Spark Avro Callback Handler")
- transactionExecutorOpt.foreach(executor => {
- executor.shutdownNow()
- })
+ sequenceNumberToProcessor.synchronized {
+ stopped = true
+ sequenceNumberToProcessor.values().foreach(_.shutdown())
+ }
+ transactionExecutorOpt.foreach(_.shutdownNow())
}
}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
index b9e3c786eb..13f3aa94be 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
@@ -60,6 +60,8 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
// succeeded.
@volatile private var batchSuccess = false
+ @volatile private var stopped = false
+
// The transaction that this processor would handle
var txOpt: Option[Transaction] = None
@@ -88,6 +90,11 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
batchAckLatch.countDown()
}
+ private[flume] def shutdown(): Unit = {
+ logDebug("Shutting down transaction processor")
+ stopped = true
+ }
+
/**
* Populates events into the event batch. If the batch cannot be populated,
* this method will not set the events into the event batch, but it sets an error message.
@@ -106,7 +113,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
var gotEventsInThisTxn = false
var loopCounter: Int = 0
loop.breakable {
- while (events.size() < maxBatchSize
+ while (!stopped && events.size() < maxBatchSize
&& loopCounter < totalAttemptsToRemoveFromChannel) {
loopCounter += 1
Option(channel.take()) match {
@@ -115,7 +122,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
ByteBuffer.wrap(event.getBody)))
gotEventsInThisTxn = true
case None =>
- if (!gotEventsInThisTxn) {
+ if (!gotEventsInThisTxn && !stopped) {
logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
" the current transaction")
TimeUnit.MILLISECONDS.sleep(backOffInterval)
@@ -125,7 +132,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
}
}
}
- if (!gotEventsInThisTxn) {
+ if (!gotEventsInThisTxn && !stopped) {
val msg = "Tried several times, " +
"but did not get any events from the channel!"
logWarning(msg)
@@ -136,6 +143,11 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
}
})
} catch {
+ case interrupted: InterruptedException =>
+ // Don't pollute logs if the InterruptedException came from this being stopped
+ if (!stopped) {
+ logWarning("Error while processing transaction.", interrupted)
+ }
case e: Exception =>
logWarning("Error while processing transaction.", e)
eventBatch.setErrorMsg(e.getMessage)
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
new file mode 100644
index 0000000000..88cc2aa3bf
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.base.Throwables
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.flume.sink._
+
+/**
+ * This class implements the core functionality of [[FlumePollingReceiver]]. When started it
+ * pulls data from Flume, stores it to Spark and then sends an Ack or Nack. This class should be
+ * run via an [[java.util.concurrent.Executor]] as this implements [[Runnable]]
+ *
+ * @param receiver The receiver that owns this instance.
+ */
+
+private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
+ Logging {
+
+ def run(): Unit = {
+ while (!receiver.isStopped()) {
+ val connection = receiver.getConnections.poll()
+ val client = connection.client
+ var batchReceived = false
+ var seq: CharSequence = null
+ try {
+ getBatch(client) match {
+ case Some(eventBatch) =>
+ batchReceived = true
+ seq = eventBatch.getSequenceNumber
+ val events = toSparkFlumeEvents(eventBatch.getEvents)
+ if (store(events)) {
+ sendAck(client, seq)
+ } else {
+ sendNack(batchReceived, client, seq)
+ }
+ case None =>
+ }
+ } catch {
+ case e: Exception =>
+ Throwables.getRootCause(e) match {
+ // If the cause was an InterruptedException, then check if the receiver is stopped -
+ // if yes, just break out of the loop. Else send a Nack and log a warning.
+ // In the unlikely case, the cause was not an Exception,
+ // then just throw it out and exit.
+ case interrupted: InterruptedException =>
+ if (!receiver.isStopped()) {
+ logWarning("Interrupted while receiving data from Flume", interrupted)
+ sendNack(batchReceived, client, seq)
+ }
+ case exception: Exception =>
+ logWarning("Error while receiving data from Flume", exception)
+ sendNack(batchReceived, client, seq)
+ }
+ } finally {
+ receiver.getConnections.add(connection)
+ }
+ }
+ }
+
+ /**
+ * Gets a batch of events from the specified client. This method does not handle any exceptions
+ * which will be propogated to the caller.
+ * @param client Client to get events from
+ * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
+ */
+ private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
+ val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
+ if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
+ // No error, proceed with processing data
+ logDebug(s"Received batch of ${eventBatch.getEvents.size} events with sequence " +
+ s"number: ${eventBatch.getSequenceNumber}")
+ Some(eventBatch)
+ } else {
+ logWarning("Did not receive events from Flume agent due to error on the Flume agent: " +
+ eventBatch.getErrorMsg)
+ None
+ }
+ }
+
+ /**
+ * Store the events in the buffer to Spark. This method will not propogate any exceptions,
+ * but will propogate any other errors.
+ * @param buffer The buffer to store
+ * @return true if the data was stored without any exception being thrown, else false
+ */
+ private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
+ try {
+ receiver.store(buffer)
+ true
+ } catch {
+ case e: Exception =>
+ logWarning("Error while attempting to store data received from Flume", e)
+ false
+ }
+ }
+
+ /**
+ * Send an ack to the client for the sequence number. This method does not handle any exceptions
+ * which will be propagated to the caller.
+ * @param client client to send the ack to
+ * @param seq sequence number of the batch to be ack-ed.
+ * @return
+ */
+ private def sendAck(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = {
+ logDebug("Sending ack for sequence number: " + seq)
+ client.ack(seq)
+ logDebug("Ack sent for sequence number: " + seq)
+ }
+
+ /**
+ * This method sends a Nack if a batch was received to the client with the given sequence
+ * number. Any exceptions thrown by the RPC call is simply thrown out as is - no effort is made
+ * to handle it.
+ * @param batchReceived true if a batch was received. If this is false, no nack is sent
+ * @param client The client to which the nack should be sent
+ * @param seq The sequence number of the batch that is being nack-ed.
+ */
+ private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
+ seq: CharSequence): Unit = {
+ if (batchReceived) {
+ // Let Flume know that the events need to be pushed back into the channel.
+ logDebug("Sending nack for sequence number: " + seq)
+ client.nack(seq) // If the agent is down, even this could fail and throw
+ logDebug("Nack sent for sequence number: " + seq)
+ }
+ }
+
+ /**
+ * Utility method to convert [[SparkSinkEvent]]s to [[SparkFlumeEvent]]s
+ * @param events - Events to convert to SparkFlumeEvents
+ * @return - The SparkFlumeEvent generated from SparkSinkEvent
+ */
+ private def toSparkFlumeEvents(events: java.util.List[SparkSinkEvent]):
+ ArrayBuffer[SparkFlumeEvent] = {
+ // Convert each Flume event to a serializable SparkFlumeEvent
+ val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
+ var j = 0
+ while (j < events.size()) {
+ val event = events(j)
+ val sparkFlumeEvent = new SparkFlumeEvent()
+ sparkFlumeEvent.event.setBody(event.getBody)
+ sparkFlumeEvent.event.setHeaders(event.getHeaders)
+ buffer += sparkFlumeEvent
+ j += 1
+ }
+ buffer
+ }
+}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 148262bb67..92fa5b41be 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -18,10 +18,9 @@ package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}
+import java.util.concurrent.{LinkedBlockingQueue, Executors}
import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -86,61 +85,9 @@ private[streaming] class FlumePollingReceiver(
connections.add(new FlumeConnection(transceiver, client))
})
for (i <- 0 until parallelism) {
- logInfo("Starting Flume Polling Receiver worker threads starting..")
+ logInfo("Starting Flume Polling Receiver worker threads..")
// Threads that pull data from Flume.
- receiverExecutor.submit(new Runnable {
- override def run(): Unit = {
- while (true) {
- val connection = connections.poll()
- val client = connection.client
- try {
- val eventBatch = client.getEventBatch(maxBatchSize)
- if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
- // No error, proceed with processing data
- val seq = eventBatch.getSequenceNumber
- val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
- logDebug(
- "Received batch of " + events.size() + " events with sequence number: " + seq)
- try {
- // Convert each Flume event to a serializable SparkFlumeEvent
- val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
- var j = 0
- while (j < events.size()) {
- buffer += toSparkFlumeEvent(events(j))
- j += 1
- }
- store(buffer)
- logDebug("Sending ack for sequence number: " + seq)
- // Send an ack to Flume so that Flume discards the events from its channels.
- client.ack(seq)
- logDebug("Ack sent for sequence number: " + seq)
- } catch {
- case e: Exception =>
- try {
- // Let Flume know that the events need to be pushed back into the channel.
- logDebug("Sending nack for sequence number: " + seq)
- client.nack(seq) // If the agent is down, even this could fail and throw
- logDebug("Nack sent for sequence number: " + seq)
- } catch {
- case e: Exception => logError(
- "Sending Nack also failed. A Flume agent is down.")
- }
- TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
- logWarning("Error while attempting to store events", e)
- }
- } else {
- logWarning("Did not receive events from Flume agent due to error on the Flume " +
- "agent: " + eventBatch.getErrorMsg)
- }
- } catch {
- case e: Exception =>
- logWarning("Error while reading data from Flume", e)
- } finally {
- connections.add(connection)
- }
- }
- }
- })
+ receiverExecutor.submit(new FlumeBatchFetcher(this))
}
}
@@ -153,16 +100,12 @@ private[streaming] class FlumePollingReceiver(
channelFactory.releaseExternalResources()
}
- /**
- * Utility method to convert [[SparkSinkEvent]] to [[SparkFlumeEvent]]
- * @param event - Event to convert to SparkFlumeEvent
- * @return - The SparkFlumeEvent generated from SparkSinkEvent
- */
- private def toSparkFlumeEvent(event: SparkSinkEvent): SparkFlumeEvent = {
- val sparkFlumeEvent = new SparkFlumeEvent()
- sparkFlumeEvent.event.setBody(event.getBody)
- sparkFlumeEvent.event.setHeaders(event.getHeaders)
- sparkFlumeEvent
+ private[flume] def getConnections: LinkedBlockingQueue[FlumeConnection] = {
+ this.connections
+ }
+
+ private[flume] def getMaxBatchSize: Int = {
+ this.maxBatchSize
}
}
@@ -171,7 +114,7 @@ private[streaming] class FlumePollingReceiver(
* @param transceiver The transceiver to use for communication with Flume
* @param client The client that the callbacks are received on.
*/
-private class FlumeConnection(val transceiver: NettyTransceiver,
+private[flume] class FlumeConnection(val transceiver: NettyTransceiver,
val client: SparkFlumeProtocol.Callback)