author	Hari Shreedharan <hshreedharan@apache.org>	2014-08-27 02:39:02 -0700
committer	Tathagata Das <tathagata.das1565@gmail.com>	2014-08-27 02:39:02 -0700
commit	6f671d04fa98f97fd48c5e749b9f47dd4a8b4f44 (patch)
tree	c56c24a614050b66506baf79d964ba6627117669 /external/flume
parent	171a41cb034f4ea80f6a3c91a6872970de16a14a (diff)
[SPARK-3154][STREAMING] Make FlumePollingInputDStream shutdown cleaner.
Currently a lot of errors get thrown from the Avro IPC layer when the dstream or sink is shut down. This PR cleans that up. Some refactoring is done in the receiver code to put all of the RPC code into a single Try and just recover from that. The sink code has also been cleaned up.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #2065 from harishreedharan/clean-flume-shutdown and squashes the following commits:

f93a07c [Hari Shreedharan] Formatting fixes.
d7427cc [Hari Shreedharan] More fixes!
a0a8852 [Hari Shreedharan] Fix race condition, hopefully! Minor other changes.
4c9ed02 [Hari Shreedharan] Remove unneeded list in Callback handler. Other misc changes.
8fee36f [Hari Shreedharan] Scala-library is required, else maven build fails. Also catch InterruptedException in TxnProcessor.
445e700 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into clean-flume-shutdown
87232e0 [Hari Shreedharan] Refactor Flume Input Stream. Clean up code, better error handling.
9001d26 [Hari Shreedharan] Change log level to debug in TransactionProcessor#shutdown method
e7b8d82 [Hari Shreedharan] Incorporate review feedback
598efa7 [Hari Shreedharan] Clean up some exception handling code
e1027c6 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into clean-flume-shutdown
ed608c8 [Hari Shreedharan] [SPARK-3154][STREAMING] Make FlumePollingInputDStream shutdown cleaner.
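The shape of that refactor - all of the RPC work funneled through one try block with a single recovery path - can be sketched as follows. This is an illustrative, self-contained sketch only: fetch, storeBatch, ack and nack are placeholder names, not the Spark or Flume API, and the real implementation (which additionally unwraps the root cause via Guava's Throwables, since Avro wraps the original exception) is FlumeBatchFetcher.scala in the diff below.

// Sketch of the "single try, single recovery path" pattern described in the commit message.
// All names below are placeholders, not the actual Spark/Flume API.
object SingleTryRecoverySketch {
  def fetch(): Seq[String] = Seq("event-1", "event-2") // stand-in for the Avro RPC call
  def storeBatch(events: Seq[String]): Unit = ()       // stand-in for receiver.store(...)
  def ack(): Unit = ()                                 // stand-in for client.ack(seq)
  def nack(): Unit = ()                                // stand-in for client.nack(seq)

  def runOnce(isStopped: => Boolean): Unit = {
    try {
      // All of the RPC work (fetch, store, ack) happens inside this one block ...
      val events = fetch()
      storeBatch(events)
      ack()
    } catch {
      // ... and every failure funnels into this single recovery path.
      case _: InterruptedException if isStopped => // shutting down: exit quietly
      case e: Exception =>
        nack() // anything else: push the batch back to Flume and log
        println(s"Error while receiving data from Flume: $e")
    }
  }
}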
Diffstat (limited to 'external/flume')
-rw-r--r--	external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala	| 167
-rw-r--r--	external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala	| 77
2 files changed, 177 insertions(+), 67 deletions(-)
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
new file mode 100644
index 0000000000..88cc2aa3bf
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.base.Throwables
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.flume.sink._
+
+/**
+ * This class implements the core functionality of [[FlumePollingReceiver]]. When started, it
+ * pulls data from Flume, stores it to Spark and then sends an Ack or Nack. This class should be
+ * run via an [[java.util.concurrent.Executor]] as it implements [[Runnable]].
+ *
+ * @param receiver The receiver that owns this instance.
+ */
+
+private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
+ Logging {
+
+ def run(): Unit = {
+ while (!receiver.isStopped()) {
+ val connection = receiver.getConnections.poll()
+ val client = connection.client
+ var batchReceived = false
+ var seq: CharSequence = null
+ try {
+ getBatch(client) match {
+ case Some(eventBatch) =>
+ batchReceived = true
+ seq = eventBatch.getSequenceNumber
+ val events = toSparkFlumeEvents(eventBatch.getEvents)
+ if (store(events)) {
+ sendAck(client, seq)
+ } else {
+ sendNack(batchReceived, client, seq)
+ }
+ case None =>
+ }
+ } catch {
+ case e: Exception =>
+ Throwables.getRootCause(e) match {
+            // If the cause was an InterruptedException, check whether the receiver is stopped -
+            // if so, just break out of the loop; otherwise send a Nack and log a warning.
+            // In the unlikely case that the cause was not an Exception,
+            // just throw it out and exit.
+ case interrupted: InterruptedException =>
+ if (!receiver.isStopped()) {
+ logWarning("Interrupted while receiving data from Flume", interrupted)
+ sendNack(batchReceived, client, seq)
+ }
+ case exception: Exception =>
+ logWarning("Error while receiving data from Flume", exception)
+ sendNack(batchReceived, client, seq)
+ }
+ } finally {
+ receiver.getConnections.add(connection)
+ }
+ }
+ }
+
+ /**
+   * Gets a batch of events from the specified client. This method does not handle any
+   * exceptions, which are instead propagated to the caller.
+ * @param client Client to get events from
+ * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
+ */
+ private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
+ val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
+ if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
+ // No error, proceed with processing data
+ logDebug(s"Received batch of ${eventBatch.getEvents.size} events with sequence " +
+ s"number: ${eventBatch.getSequenceNumber}")
+ Some(eventBatch)
+ } else {
+ logWarning("Did not receive events from Flume agent due to error on the Flume agent: " +
+ eventBatch.getErrorMsg)
+ None
+ }
+ }
+
+ /**
+   * Store the events in the buffer to Spark. This method will not propagate any exceptions,
+   * but will propagate any other errors.
+ * @param buffer The buffer to store
+ * @return true if the data was stored without any exception being thrown, else false
+ */
+ private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
+ try {
+ receiver.store(buffer)
+ true
+ } catch {
+ case e: Exception =>
+ logWarning("Error while attempting to store data received from Flume", e)
+ false
+ }
+ }
+
+ /**
+   * Send an ack to the client for the sequence number. This method does not handle any
+   * exceptions, which are instead propagated to the caller.
+   * @param client client to send the ack to
+   * @param seq sequence number of the batch to be ack-ed.
+ */
+ private def sendAck(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = {
+ logDebug("Sending ack for sequence number: " + seq)
+ client.ack(seq)
+ logDebug("Ack sent for sequence number: " + seq)
+ }
+
+ /**
+   * Sends a Nack to the client for the given sequence number if a batch was received. Any
+   * exception thrown by the RPC call is simply thrown out as is - no effort is made
+   * to handle it.
+ * @param batchReceived true if a batch was received. If this is false, no nack is sent
+ * @param client The client to which the nack should be sent
+ * @param seq The sequence number of the batch that is being nack-ed.
+ */
+ private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
+ seq: CharSequence): Unit = {
+ if (batchReceived) {
+ // Let Flume know that the events need to be pushed back into the channel.
+ logDebug("Sending nack for sequence number: " + seq)
+ client.nack(seq) // If the agent is down, even this could fail and throw
+ logDebug("Nack sent for sequence number: " + seq)
+ }
+ }
+
+ /**
+   * Utility method to convert [[SparkSinkEvent]]s to [[SparkFlumeEvent]]s.
+   * @param events - Events to convert to SparkFlumeEvents
+   * @return - The SparkFlumeEvents generated from the SparkSinkEvents
+ */
+ private def toSparkFlumeEvents(events: java.util.List[SparkSinkEvent]):
+ ArrayBuffer[SparkFlumeEvent] = {
+ // Convert each Flume event to a serializable SparkFlumeEvent
+ val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
+ var j = 0
+ while (j < events.size()) {
+ val event = events(j)
+ val sparkFlumeEvent = new SparkFlumeEvent()
+ sparkFlumeEvent.event.setBody(event.getBody)
+ sparkFlumeEvent.event.setHeaders(event.getHeaders)
+ buffer += sparkFlumeEvent
+ j += 1
+ }
+ buffer
+ }
+}
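As the class doc above notes, FlumeBatchFetcher is intended to be run via an Executor. A minimal usage sketch follows; the pool configuration and the object name are assumptions, not committed code, and the receiver's startup code in the second file of this diff does essentially the same thing.

package org.apache.spark.streaming.flume

import java.util.concurrent.{ExecutorService, Executors}

import com.google.common.util.concurrent.ThreadFactoryBuilder

// Illustrative only: FlumeBatchFetcher is private[flume], so a caller has to live in the
// same package. FlumePollingInputDStream.scala below submits fetchers in just this way.
private[flume] object FetcherLauncherSketch {
  def startFetchers(receiver: FlumePollingReceiver, parallelism: Int): ExecutorService = {
    // Daemon threads so the pool never blocks JVM shutdown; the exact factory config is assumed.
    val pool = Executors.newFixedThreadPool(
      parallelism, new ThreadFactoryBuilder().setDaemon(true).build())
    (0 until parallelism).foreach(_ => pool.submit(new FlumeBatchFetcher(receiver)))
    pool
  }
}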
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 148262bb67..92fa5b41be 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -18,10 +18,9 @@ package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}
+import java.util.concurrent.{LinkedBlockingQueue, Executors}
import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -86,61 +85,9 @@ private[streaming] class FlumePollingReceiver(
connections.add(new FlumeConnection(transceiver, client))
})
for (i <- 0 until parallelism) {
- logInfo("Starting Flume Polling Receiver worker threads starting..")
+ logInfo("Starting Flume Polling Receiver worker threads..")
// Threads that pull data from Flume.
- receiverExecutor.submit(new Runnable {
- override def run(): Unit = {
- while (true) {
- val connection = connections.poll()
- val client = connection.client
- try {
- val eventBatch = client.getEventBatch(maxBatchSize)
- if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
- // No error, proceed with processing data
- val seq = eventBatch.getSequenceNumber
- val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
- logDebug(
- "Received batch of " + events.size() + " events with sequence number: " + seq)
- try {
- // Convert each Flume event to a serializable SparkFlumeEvent
- val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
- var j = 0
- while (j < events.size()) {
- buffer += toSparkFlumeEvent(events(j))
- j += 1
- }
- store(buffer)
- logDebug("Sending ack for sequence number: " + seq)
- // Send an ack to Flume so that Flume discards the events from its channels.
- client.ack(seq)
- logDebug("Ack sent for sequence number: " + seq)
- } catch {
- case e: Exception =>
- try {
- // Let Flume know that the events need to be pushed back into the channel.
- logDebug("Sending nack for sequence number: " + seq)
- client.nack(seq) // If the agent is down, even this could fail and throw
- logDebug("Nack sent for sequence number: " + seq)
- } catch {
- case e: Exception => logError(
- "Sending Nack also failed. A Flume agent is down.")
- }
- TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
- logWarning("Error while attempting to store events", e)
- }
- } else {
- logWarning("Did not receive events from Flume agent due to error on the Flume " +
- "agent: " + eventBatch.getErrorMsg)
- }
- } catch {
- case e: Exception =>
- logWarning("Error while reading data from Flume", e)
- } finally {
- connections.add(connection)
- }
- }
- }
- })
+ receiverExecutor.submit(new FlumeBatchFetcher(this))
}
}
@@ -153,16 +100,12 @@ private[streaming] class FlumePollingReceiver(
channelFactory.releaseExternalResources()
}
- /**
- * Utility method to convert [[SparkSinkEvent]] to [[SparkFlumeEvent]]
- * @param event - Event to convert to SparkFlumeEvent
- * @return - The SparkFlumeEvent generated from SparkSinkEvent
- */
- private def toSparkFlumeEvent(event: SparkSinkEvent): SparkFlumeEvent = {
- val sparkFlumeEvent = new SparkFlumeEvent()
- sparkFlumeEvent.event.setBody(event.getBody)
- sparkFlumeEvent.event.setHeaders(event.getHeaders)
- sparkFlumeEvent
+ private[flume] def getConnections: LinkedBlockingQueue[FlumeConnection] = {
+ this.connections
+ }
+
+ private[flume] def getMaxBatchSize: Int = {
+ this.maxBatchSize
}
}
@@ -171,7 +114,7 @@ private[streaming] class FlumePollingReceiver(
* @param transceiver The transceiver to use for communication with Flume
* @param client The client that the callbacks are received on.
*/
-private class FlumeConnection(val transceiver: NettyTransceiver,
+private[flume] class FlumeConnection(val transceiver: NettyTransceiver,
val client: SparkFlumeProtocol.Callback)
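The interrupt handling in FlumeBatchFetcher only pays off if shutdown actually interrupts the fetcher threads and releases the Netty resources. That path is not part of this diff; the sketch below is a hypothetical illustration of such a shutdown sequence (the object, method and parameter names are assumptions), not the committed onStop implementation.

package org.apache.spark.streaming.flume

import java.util.concurrent.{ExecutorService, LinkedBlockingQueue}

// Hypothetical sketch of a clean shutdown sequence; not the committed code.
private[flume] object ShutdownSketch {
  def shutdown(
      receiverExecutor: ExecutorService,
      connections: LinkedBlockingQueue[FlumeConnection]): Unit = {
    // Interrupt fetcher threads that may be blocked inside an Avro RPC call. FlumeBatchFetcher
    // sees the InterruptedException root cause, notices the receiver is stopped, and exits
    // its loop quietly instead of logging spurious errors.
    receiverExecutor.shutdownNow()
    // Close each transceiver so the underlying Netty channels are released.
    var connection = connections.poll()
    while (connection != null) {
      connection.transceiver.close()
      connection = connections.poll()
    }
  }
}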