Diffstat (limited to 'external/flume/src/main')
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala          72
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala           3
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala  178
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala                144
4 files changed, 391 insertions, 6 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
new file mode 100644
index 0000000000..dc629df4f4
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.io.{ObjectOutput, ObjectInput}
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.util.Utils
+import org.apache.spark.Logging
+
+/**
+ * A simple object that provides the implementation of readExternal and writeExternal for both
+ * the wrapper classes for Flume-style Events.
+ */
+private[streaming] object EventTransformer extends Logging {
+ def readExternal(in: ObjectInput): (java.util.HashMap[CharSequence, CharSequence],
+ Array[Byte]) = {
+ val bodyLength = in.readInt()
+ val bodyBuff = new Array[Byte](bodyLength)
+ in.readFully(bodyBuff)
+
+ val numHeaders = in.readInt()
+ val headers = new java.util.HashMap[CharSequence, CharSequence]
+
+ for (i <- 0 until numHeaders) {
+ val keyLength = in.readInt()
+ val keyBuff = new Array[Byte](keyLength)
+ in.readFully(keyBuff)
+ val key: String = Utils.deserialize(keyBuff)
+
+ val valLength = in.readInt()
+ val valBuff = new Array[Byte](valLength)
+ in.readFully(valBuff)
+ val value: String = Utils.deserialize(valBuff)
+
+ headers.put(key, value)
+ }
+ (headers, bodyBuff)
+ }
+
+ def writeExternal(out: ObjectOutput, headers: java.util.Map[CharSequence, CharSequence],
+ body: Array[Byte]) {
+ out.writeInt(body.length)
+ out.write(body)
+ val numHeaders = headers.size()
+ out.writeInt(numHeaders)
+ for ((k,v) <- headers) {
+ val keyBuff = Utils.serialize(k.toString)
+ out.writeInt(keyBuff.length)
+ out.write(keyBuff)
+ val valBuff = Utils.serialize(v.toString)
+ out.writeInt(valBuff.length)
+ out.write(valBuff)
+ }
+ }
+}
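
For orientation (this sketch is not part of the patch), a wrapper class can implement java.io.Externalizable by delegating to this object. The class and field names below are purely illustrative; the wrapper sits in the same package so it can see the private[streaming] object:

package org.apache.spark.streaming.flume

import java.io.{Externalizable, ObjectInput, ObjectOutput}

// Illustrative wrapper (hypothetical name) for a Flume-style event that
// delegates its custom Java serialization to EventTransformer.
private class SimpleFlumeEventWrapper extends Externalizable {
  var headers: java.util.Map[CharSequence, CharSequence] =
    new java.util.HashMap[CharSequence, CharSequence]()
  var body: Array[Byte] = Array.empty[Byte]

  // Reads the body bytes and the header map in the order writeExternal emits them.
  override def readExternal(in: ObjectInput): Unit = {
    val (readHeaders, readBody) = EventTransformer.readExternal(in)
    headers = readHeaders
    body = readBody
  }

  // Writes the body length and bytes, then each serialized header key/value pair.
  override def writeExternal(out: ObjectOutput): Unit = {
    EventTransformer.writeExternal(out, headers, body)
  }
}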
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 56d2886b26..4b2ea45fb8 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -39,11 +39,8 @@ import org.apache.spark.streaming.receiver.Receiver
import org.jboss.netty.channel.ChannelPipelineFactory
import org.jboss.netty.channel.Channels
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.ChannelFactory
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.compression._
-import org.jboss.netty.handler.execution.ExecutionHandler
private[streaming]
class FlumeInputDStream[T: ClassTag](
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
new file mode 100644
index 0000000000..148262bb67
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume
+
+
+import java.net.InetSocketAddress
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.flume.sink._
+
+/**
+ * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running
+ * [[org.apache.spark.streaming.flume.sink.SparkSink]]s.
+ * @param _ssc Streaming context that will execute this input stream
+ * @param addresses List of addresses at which SparkSinks are listening
+ * @param maxBatchSize Maximum number of events to pull in a single RPC call
+ * @param parallelism Number of parallel connections to open
+ * @param storageLevel The storage level to use.
+ * @tparam T Class type of the object of this stream
+ */
+private[streaming] class FlumePollingInputDStream[T: ClassTag](
+ @transient _ssc: StreamingContext,
+ val addresses: Seq[InetSocketAddress],
+ val maxBatchSize: Int,
+ val parallelism: Int,
+ storageLevel: StorageLevel
+ ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
+
+ override def getReceiver(): Receiver[SparkFlumeEvent] = {
+ new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
+ }
+}
+
+private[streaming] class FlumePollingReceiver(
+ addresses: Seq[InetSocketAddress],
+ maxBatchSize: Int,
+ parallelism: Int,
+ storageLevel: StorageLevel
+ ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
+
+ lazy val channelFactoryExecutor =
+ Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
+ setNameFormat("Flume Receiver Channel Thread - %d").build())
+
+ lazy val channelFactory =
+ new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
+
+ lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())
+
+ private lazy val connections = new LinkedBlockingQueue[FlumeConnection]()
+
+ override def onStart(): Unit = {
+ // Create the connections to each Flume agent.
+ addresses.foreach(host => {
+ val transceiver = new NettyTransceiver(host, channelFactory)
+ val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+ connections.add(new FlumeConnection(transceiver, client))
+ })
+ for (i <- 0 until parallelism) {
+ logInfo("Starting Flume Polling Receiver worker threads starting..")
+ // Threads that pull data from Flume.
+ receiverExecutor.submit(new Runnable {
+ override def run(): Unit = {
+ while (true) {
+ val connection = connections.poll()
+ val client = connection.client
+ try {
+ val eventBatch = client.getEventBatch(maxBatchSize)
+ if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
+ // No error, proceed with processing data
+ val seq = eventBatch.getSequenceNumber
+ val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
+ logDebug(
+ "Received batch of " + events.size() + " events with sequence number: " + seq)
+ try {
+ // Convert each Flume event to a serializable SparkFlumeEvent
+ val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
+ var j = 0
+ while (j < events.size()) {
+ buffer += toSparkFlumeEvent(events(j))
+ j += 1
+ }
+ store(buffer)
+ logDebug("Sending ack for sequence number: " + seq)
+ // Send an ack to Flume so that Flume discards the events from its channels.
+ client.ack(seq)
+ logDebug("Ack sent for sequence number: " + seq)
+ } catch {
+ case e: Exception =>
+ try {
+ // Let Flume know that the events need to be pushed back into the channel.
+ logDebug("Sending nack for sequence number: " + seq)
+ client.nack(seq) // If the agent is down, even this could fail and throw
+ logDebug("Nack sent for sequence number: " + seq)
+ } catch {
+ case e: Exception => logError(
+ "Sending Nack also failed. A Flume agent is down.")
+ }
+ TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
+ logWarning("Error while attempting to store events", e)
+ }
+ } else {
+ logWarning("Did not receive events from Flume agent due to error on the Flume " +
+ "agent: " + eventBatch.getErrorMsg)
+ }
+ } catch {
+ case e: Exception =>
+ logWarning("Error while reading data from Flume", e)
+ } finally {
+ connections.add(connection)
+ }
+ }
+ }
+ })
+ }
+ }
+
+ override def onStop(): Unit = {
+ logInfo("Shutting down Flume Polling Receiver")
+ receiverExecutor.shutdownNow()
+ connections.foreach(connection => {
+ connection.transceiver.close()
+ })
+ channelFactory.releaseExternalResources()
+ }
+
+ /**
+ * Utility method to convert [[SparkSinkEvent]] to [[SparkFlumeEvent]]
+ * @param event - Event to convert to SparkFlumeEvent
+ * @return - The SparkFlumeEvent generated from SparkSinkEvent
+ */
+ private def toSparkFlumeEvent(event: SparkSinkEvent): SparkFlumeEvent = {
+ val sparkFlumeEvent = new SparkFlumeEvent()
+ sparkFlumeEvent.event.setBody(event.getBody)
+ sparkFlumeEvent.event.setHeaders(event.getHeaders)
+ sparkFlumeEvent
+ }
+}
+
+/**
+ * A wrapper around the transceiver and the Avro IPC API.
+ * @param transceiver The transceiver to use for communication with Flume
+ * @param client The client that the callbacks are received on.
+ */
+private class FlumeConnection(val transceiver: NettyTransceiver,
+ val client: SparkFlumeProtocol.Callback)
+
+
+
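
To make the receive loop above easier to follow, here is a standalone sketch (not part of the patch) of a single poll/ack cycle against one SparkSink. The host, port, and batch size are placeholders, and error handling is reduced to the ack-or-nack decision:

import java.net.InetSocketAddress

import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor

import org.apache.spark.streaming.flume.sink.{SparkFlumeProtocol, SparkSinkUtils}

object SinglePollSketch {
  def main(args: Array[String]): Unit = {
    // Open an Avro IPC connection to the SparkSink (address is a placeholder).
    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 9999))
    val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
    try {
      // Ask the sink for up to 100 events in a single RPC call.
      val batch = client.getEventBatch(100)
      if (!SparkSinkUtils.isErrorBatch(batch)) {
        val seq = batch.getSequenceNumber
        try {
          // A real receiver would convert and store each event here.
          println("Received " + batch.getEvents.size() + " events for sequence " + seq)
          // Ack: the sink commits its Flume channel transaction and discards the events.
          client.ack(seq)
        } catch {
          // Nack: the sink rolls the events back into the Flume channel for redelivery.
          case e: Exception => client.nack(seq)
        }
      }
    } finally {
      transceiver.close()
    }
  }
}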
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 716db9fa76..4b732c1592 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -17,12 +17,19 @@
package org.apache.spark.streaming.flume
+import java.net.InetSocketAddress
+
+import org.apache.spark.annotation.Experimental
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
object FlumeUtils {
+ private val DEFAULT_POLLING_PARALLELISM = 5
+ private val DEFAULT_POLLING_BATCH_SIZE = 1000
+
/**
* Create an input stream from a Flume source.
* @param ssc StreamingContext object
@@ -56,7 +63,7 @@ object FlumeUtils {
): ReceiverInputDStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](
ssc, hostname, port, storageLevel, enableDecompression)
-
+
inputStream
}
@@ -105,4 +112,135 @@ object FlumeUtils {
): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
}
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param hostname Address of the host on which the Spark Sink is running
+ * @param port Port of the host at which the Spark Sink is listening
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ ssc: StreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param addresses List of InetSocketAddresses representing the hosts to connect to.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ ssc: StreamingContext,
+ addresses: Seq[InetSocketAddress],
+ storageLevel: StorageLevel
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(ssc, addresses, storageLevel,
+ DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * @param addresses List of InetSocketAddresses representing the hosts to connect to.
+ * @param maxBatchSize Maximum number of events to be pulled from the Spark sink in a
+ * single RPC call
+ * @param parallelism Number of concurrent requests this stream should send to the sink. Note
+ * that having a higher number of requests concurrently being pulled will
+ * result in this stream using more threads
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ ssc: StreamingContext,
+ addresses: Seq[InetSocketAddress],
+ storageLevel: StorageLevel,
+ maxBatchSize: Int,
+ parallelism: Int
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
+ parallelism, storageLevel)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param hostname Hostname of the host on which the Spark Sink is running
+ * @param port Port of the host at which the Spark Sink is listening
+ */
+ @Experimental
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param hostname Hostname of the host on which the Spark Sink is running
+ * @param port Port of the host at which the Spark Sink is listening
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param addresses List of InetSocketAddresses on which the Spark Sink is running.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ addresses: Array[InetSocketAddress],
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc, addresses, storageLevel,
+ DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * @param addresses List of InetSocketAddresses on which the Spark Sink is running
+ * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
+ * single RPC call
+ * @param parallelism Number of concurrent requests this stream should send to the sink. Note
+ * that having a higher number of requests concurrently being pulled will
+ * result in this stream using more threads
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ addresses: Array[InetSocketAddress],
+ storageLevel: StorageLevel,
+ maxBatchSize: Int,
+ parallelism: Int
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
+ }
}
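
As a usage illustration (not part of this patch), a driver program could consume the new polling API as in the following sketch; the application name, sink address, and batch interval are placeholders:

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePollingExample {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("FlumePollingExample")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Poll the SparkSink running on the given host and port (placeholders).
    val addresses = Seq(new InetSocketAddress("sink-host", 9999))
    val stream = FlumeUtils.createPollingStream(
      ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2)

    // Report how many Flume events arrived in each batch.
    stream.count().map(count => "Received " + count + " flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}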