aboutsummaryrefslogtreecommitdiff
path: root/external/flume
diff options
context:
space:
mode:
authorHari Shreedharan <harishreedharan@gmail.com>2014-07-29 11:11:29 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-07-29 11:11:29 -0700
commit800ecff4b1127d9042d5a8a746348fb4d45aa34b (patch)
tree4cf35f6858fa9cf6e3db120b4785295103f60386 /external/flume
parentfc4d05700026f4ee9cc5544cf493d900039c38f3 (diff)
downloadspark-800ecff4b1127d9042d5a8a746348fb4d45aa34b.tar.gz
spark-800ecff4b1127d9042d5a8a746348fb4d45aa34b.tar.bz2
spark-800ecff4b1127d9042d5a8a746348fb4d45aa34b.zip
[STREAMING] SPARK-1729. Make Flume pull data from source, rather than the current pu...
...sh model Currently Spark uses Flume's internal Avro Protocol to ingest data from Flume. If the executor running the receiver fails, it currently has to be restarted on the same node to be able to receive data. This commit adds a new Sink which can be deployed to a Flume agent. This sink can be polled by a new DStream that is also included in this commit. This model ensures that data can be pulled into Spark from Flume even if the receiver is restarted on a new node. This also allows the receiver to receive data on multiple threads for better performance. Author: Hari Shreedharan <harishreedharan@gmail.com> Author: Hari Shreedharan <hshreedharan@apache.org> Author: Tathagata Das <tathagata.das1565@gmail.com> Author: harishreedharan <hshreedharan@cloudera.com> Closes #807 from harishreedharan/master and squashes the following commits: e7f70a3 [Hari Shreedharan] Merge remote-tracking branch 'asf-git/master' 96cfb6f [Hari Shreedharan] Merge remote-tracking branch 'asf/master' e48d785 [Hari Shreedharan] Documenting flume-sink being ignored for Mima checks. 5f212ce [Hari Shreedharan] Ignore Spark Sink from mima. 981bf62 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' 7a1bc6e [Hari Shreedharan] Fix SparkBuild.scala a082eb3 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' 1f47364 [Hari Shreedharan] Minor fixes. 73d6f6d [Hari Shreedharan] Cleaned up tests a bit. Added some docs in multiple places. 65b76b4 [Hari Shreedharan] Fixing the unit test. e59cc20 [Hari Shreedharan] Use SparkFlumeEvent instead of the new type. Also, Flume Polling Receiver now uses the store(ArrayBuffer) method. f3c99d1 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' 3572180 [Hari Shreedharan] Adding a license header, making Jenkins happy. 799509f [Hari Shreedharan] Fix a compile issue. 3c5194c [Hari Shreedharan] Merge remote-tracking branch 'asf/master' d248d22 [harishreedharan] Merge pull request #1 from tdas/flume-polling 10b6214 [Tathagata Das] Changed public API, changed sink package, and added java unit test to make sure Java API is callable from Java. 1edc806 [Hari Shreedharan] SPARK-1729. Update logging in Spark Sink. 8c00289 [Hari Shreedharan] More debug messages 393bd94 [Hari Shreedharan] SPARK-1729. Use LinkedBlockingQueue instead of ArrayBuffer to keep track of connections. 120e2a1 [Hari Shreedharan] SPARK-1729. Some test changes and changes to utils classes. 9fd0da7 [Hari Shreedharan] SPARK-1729. Use foreach instead of map for all Options. 8136aa6 [Hari Shreedharan] Adding TransactionProcessor to map on returning batch of data 86aa274 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' 205034d [Hari Shreedharan] Merging master in 4b0c7fc [Hari Shreedharan] FLUME-1729. New Flume-Spark integration. bda01fc [Hari Shreedharan] FLUME-1729. Flume-Spark integration. 0d69604 [Hari Shreedharan] FLUME-1729. Better Flume-Spark integration. 3c23c18 [Hari Shreedharan] SPARK-1729. New Spark-Flume integration. 70bcc2a [Hari Shreedharan] SPARK-1729. New Flume-Spark integration. d6fa3aa [Hari Shreedharan] SPARK-1729. New Flume-Spark integration. e7da512 [Hari Shreedharan] SPARK-1729. Fixing import order 9741683 [Hari Shreedharan] SPARK-1729. Fixes based on review. c604a3c [Hari Shreedharan] SPARK-1729. Optimize imports. 0f10788 [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model 87775aa [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model 8df37e4 [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model 03d6c1c [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model 08176ad [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model d24d9d4 [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model 6d6776a [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
Diffstat (limited to 'external/flume')
-rw-r--r--external/flume/pom.xml5
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala72
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala3
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala178
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala144
-rw-r--r--external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java44
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala195
7 files changed, 635 insertions, 6 deletions
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 874b8a7959..9f680b27c3 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -77,6 +77,11 @@
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume-sink_2.10</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
new file mode 100644
index 0000000000..dc629df4f4
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.io.{ObjectOutput, ObjectInput}
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.util.Utils
+import org.apache.spark.Logging
+
+/**
+ * A simple object that provides the implementation of readExternal and writeExternal for both
+ * the wrapper classes for Flume-style Events.
+ */
+private[streaming] object EventTransformer extends Logging {
+ def readExternal(in: ObjectInput): (java.util.HashMap[CharSequence, CharSequence],
+ Array[Byte]) = {
+ val bodyLength = in.readInt()
+ val bodyBuff = new Array[Byte](bodyLength)
+ in.readFully(bodyBuff)
+
+ val numHeaders = in.readInt()
+ val headers = new java.util.HashMap[CharSequence, CharSequence]
+
+ for (i <- 0 until numHeaders) {
+ val keyLength = in.readInt()
+ val keyBuff = new Array[Byte](keyLength)
+ in.readFully(keyBuff)
+ val key: String = Utils.deserialize(keyBuff)
+
+ val valLength = in.readInt()
+ val valBuff = new Array[Byte](valLength)
+ in.readFully(valBuff)
+ val value: String = Utils.deserialize(valBuff)
+
+ headers.put(key, value)
+ }
+ (headers, bodyBuff)
+ }
+
+ def writeExternal(out: ObjectOutput, headers: java.util.Map[CharSequence, CharSequence],
+ body: Array[Byte]) {
+ out.writeInt(body.length)
+ out.write(body)
+ val numHeaders = headers.size()
+ out.writeInt(numHeaders)
+ for ((k,v) <- headers) {
+ val keyBuff = Utils.serialize(k.toString)
+ out.writeInt(keyBuff.length)
+ out.write(keyBuff)
+ val valBuff = Utils.serialize(v.toString)
+ out.writeInt(valBuff.length)
+ out.write(valBuff)
+ }
+ }
+}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 56d2886b26..4b2ea45fb8 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -39,11 +39,8 @@ import org.apache.spark.streaming.receiver.Receiver
import org.jboss.netty.channel.ChannelPipelineFactory
import org.jboss.netty.channel.Channels
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.ChannelFactory
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.compression._
-import org.jboss.netty.handler.execution.ExecutionHandler
private[streaming]
class FlumeInputDStream[T: ClassTag](
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
new file mode 100644
index 0000000000..148262bb67
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume
+
+
+import java.net.InetSocketAddress
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.flume.sink._
+
+/**
+ * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running
+ * [[org.apache.spark.streaming.flume.sink.SparkSink]]s.
+ * @param _ssc Streaming context that will execute this input stream
+ * @param addresses List of addresses at which SparkSinks are listening
+ * @param maxBatchSize Maximum size of a batch
+ * @param parallelism Number of parallel connections to open
+ * @param storageLevel The storage level to use.
+ * @tparam T Class type of the object of this stream
+ */
+private[streaming] class FlumePollingInputDStream[T: ClassTag](
+ @transient _ssc: StreamingContext,
+ val addresses: Seq[InetSocketAddress],
+ val maxBatchSize: Int,
+ val parallelism: Int,
+ storageLevel: StorageLevel
+ ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
+
+ override def getReceiver(): Receiver[SparkFlumeEvent] = {
+ new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
+ }
+}
+
+private[streaming] class FlumePollingReceiver(
+ addresses: Seq[InetSocketAddress],
+ maxBatchSize: Int,
+ parallelism: Int,
+ storageLevel: StorageLevel
+ ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
+
+ lazy val channelFactoryExecutor =
+ Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
+ setNameFormat("Flume Receiver Channel Thread - %d").build())
+
+ lazy val channelFactory =
+ new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
+
+ lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())
+
+ private lazy val connections = new LinkedBlockingQueue[FlumeConnection]()
+
+ override def onStart(): Unit = {
+ // Create the connections to each Flume agent.
+ addresses.foreach(host => {
+ val transceiver = new NettyTransceiver(host, channelFactory)
+ val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+ connections.add(new FlumeConnection(transceiver, client))
+ })
+ for (i <- 0 until parallelism) {
+ logInfo("Starting Flume Polling Receiver worker threads starting..")
+ // Threads that pull data from Flume.
+ receiverExecutor.submit(new Runnable {
+ override def run(): Unit = {
+ while (true) {
+ val connection = connections.poll()
+ val client = connection.client
+ try {
+ val eventBatch = client.getEventBatch(maxBatchSize)
+ if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
+ // No error, proceed with processing data
+ val seq = eventBatch.getSequenceNumber
+ val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
+ logDebug(
+ "Received batch of " + events.size() + " events with sequence number: " + seq)
+ try {
+ // Convert each Flume event to a serializable SparkFlumeEvent
+ val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
+ var j = 0
+ while (j < events.size()) {
+ buffer += toSparkFlumeEvent(events(j))
+ j += 1
+ }
+ store(buffer)
+ logDebug("Sending ack for sequence number: " + seq)
+ // Send an ack to Flume so that Flume discards the events from its channels.
+ client.ack(seq)
+ logDebug("Ack sent for sequence number: " + seq)
+ } catch {
+ case e: Exception =>
+ try {
+ // Let Flume know that the events need to be pushed back into the channel.
+ logDebug("Sending nack for sequence number: " + seq)
+ client.nack(seq) // If the agent is down, even this could fail and throw
+ logDebug("Nack sent for sequence number: " + seq)
+ } catch {
+ case e: Exception => logError(
+ "Sending Nack also failed. A Flume agent is down.")
+ }
+ TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
+ logWarning("Error while attempting to store events", e)
+ }
+ } else {
+ logWarning("Did not receive events from Flume agent due to error on the Flume " +
+ "agent: " + eventBatch.getErrorMsg)
+ }
+ } catch {
+ case e: Exception =>
+ logWarning("Error while reading data from Flume", e)
+ } finally {
+ connections.add(connection)
+ }
+ }
+ }
+ })
+ }
+ }
+
+ override def onStop(): Unit = {
+ logInfo("Shutting down Flume Polling Receiver")
+ receiverExecutor.shutdownNow()
+ connections.foreach(connection => {
+ connection.transceiver.close()
+ })
+ channelFactory.releaseExternalResources()
+ }
+
+ /**
+ * Utility method to convert [[SparkSinkEvent]] to [[SparkFlumeEvent]]
+ * @param event - Event to convert to SparkFlumeEvent
+ * @return - The SparkFlumeEvent generated from SparkSinkEvent
+ */
+ private def toSparkFlumeEvent(event: SparkSinkEvent): SparkFlumeEvent = {
+ val sparkFlumeEvent = new SparkFlumeEvent()
+ sparkFlumeEvent.event.setBody(event.getBody)
+ sparkFlumeEvent.event.setHeaders(event.getHeaders)
+ sparkFlumeEvent
+ }
+}
+
+/**
+ * A wrapper around the transceiver and the Avro IPC API.
+ * @param transceiver The transceiver to use for communication with Flume
+ * @param client The client that the callbacks are received on.
+ */
+private class FlumeConnection(val transceiver: NettyTransceiver,
+ val client: SparkFlumeProtocol.Callback)
+
+
+
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 716db9fa76..4b732c1592 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -17,12 +17,19 @@
package org.apache.spark.streaming.flume
+import java.net.InetSocketAddress
+
+import org.apache.spark.annotation.Experimental
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
object FlumeUtils {
+ private val DEFAULT_POLLING_PARALLELISM = 5
+ private val DEFAULT_POLLING_BATCH_SIZE = 1000
+
/**
* Create a input stream from a Flume source.
* @param ssc StreamingContext object
@@ -56,7 +63,7 @@ object FlumeUtils {
): ReceiverInputDStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](
ssc, hostname, port, storageLevel, enableDecompression)
-
+
inputStream
}
@@ -105,4 +112,135 @@ object FlumeUtils {
): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
}
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param hostname Address of the host on which the Spark Sink is running
+ * @param port Port of the host at which the Spark Sink is listening
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ ssc: StreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param addresses List of InetSocketAddresses representing the hosts to connect to.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ ssc: StreamingContext,
+ addresses: Seq[InetSocketAddress],
+ storageLevel: StorageLevel
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(ssc, addresses, storageLevel,
+ DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * @param addresses List of InetSocketAddresses representing the hosts to connect to.
+ * @param maxBatchSize Maximum number of events to be pulled from the Spark sink in a
+ * single RPC call
+ * @param parallelism Number of concurrent requests this stream should send to the sink. Note
+ * that having a higher number of requests concurrently being pulled will
+ * result in this stream using more threads
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ ssc: StreamingContext,
+ addresses: Seq[InetSocketAddress],
+ storageLevel: StorageLevel,
+ maxBatchSize: Int,
+ parallelism: Int
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
+ parallelism, storageLevel)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param hostname Hostname of the host on which the Spark Sink is running
+ * @param port Port of the host at which the Spark Sink is listening
+ */
+ @Experimental
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param hostname Hostname of the host on which the Spark Sink is running
+ * @param port Port of the host at which the Spark Sink is listening
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param addresses List of InetSocketAddresses on which the Spark Sink is running.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ addresses: Array[InetSocketAddress],
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc, addresses, storageLevel,
+ DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * @param addresses List of InetSocketAddresses on which the Spark Sink is running
+ * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
+ * single RPC call
+ * @param parallelism Number of concurrent requests this stream should send to the sink. Note
+ * that having a higher number of requests concurrently being pulled will
+ * result in this stream using more threads
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ addresses: Array[InetSocketAddress],
+ storageLevel: StorageLevel,
+ maxBatchSize: Int,
+ parallelism: Int
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
+ }
}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
new file mode 100644
index 0000000000..79c5b91654
--- /dev/null
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume;
+
+import java.net.InetSocketAddress;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.junit.Test;
+
+public class JavaFlumePollingStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testFlumeStream() {
+ // tests the API, does not actually test data receiving
+ InetSocketAddress[] addresses = new InetSocketAddress[] {
+ new InetSocketAddress("localhost", 12345)
+ };
+ JavaReceiverInputDStream<SparkFlumeEvent> test1 =
+ FlumeUtils.createPollingStream(ssc, "localhost", 12345);
+ JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createPollingStream(
+ ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createPollingStream(
+ ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream<SparkFlumeEvent> test4 = FlumeUtils.createPollingStream(
+ ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5);
+ }
+}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
new file mode 100644
index 0000000000..47071d0cc4
--- /dev/null
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.streaming.flume
+
+import java.net.InetSocketAddress
+import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+
+import org.apache.flume.Context
+import org.apache.flume.channel.MemoryChannel
+import org.apache.flume.conf.Configurables
+import org.apache.flume.event.EventBuilder
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext}
+import org.apache.spark.streaming.flume.sink._
+
+class FlumePollingStreamSuite extends TestSuiteBase {
+
+ val testPort = 9999
+ val batchCount = 5
+ val eventsPerBatch = 100
+ val totalEventsPerChannel = batchCount * eventsPerBatch
+ val channelCapacity = 5000
+
+ test("flume polling test") {
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(conf, batchDuration)
+ val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+ FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)),
+ StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
+ val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+ with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ outputStream.register()
+
+ // Start the channel and sink.
+ val context = new Context()
+ context.put("capacity", channelCapacity.toString)
+ context.put("transactionCapacity", "1000")
+ context.put("keep-alive", "0")
+ val channel = new MemoryChannel()
+ Configurables.configure(channel, context)
+
+ val sink = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+ Configurables.configure(sink, context)
+ sink.setChannel(channel)
+ sink.start()
+ ssc.start()
+
+ writeAndVerify(Seq(channel), ssc, outputBuffer)
+ assertChannelIsEmpty(channel)
+ sink.stop()
+ channel.stop()
+ }
+
+ test("flume polling test multiple hosts") {
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(conf, batchDuration)
+ val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))
+ val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+ FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
+ eventsPerBatch, 5)
+ val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+ with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ outputStream.register()
+
+ // Start the channel and sink.
+ val context = new Context()
+ context.put("capacity", channelCapacity.toString)
+ context.put("transactionCapacity", "1000")
+ context.put("keep-alive", "0")
+ val channel = new MemoryChannel()
+ Configurables.configure(channel, context)
+
+ val channel2 = new MemoryChannel()
+ Configurables.configure(channel2, context)
+
+ val sink = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+ Configurables.configure(sink, context)
+ sink.setChannel(channel)
+ sink.start()
+
+ val sink2 = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort + 1))
+ Configurables.configure(sink2, context)
+ sink2.setChannel(channel2)
+ sink2.start()
+ ssc.start()
+ writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
+ assertChannelIsEmpty(channel)
+ assertChannelIsEmpty(channel2)
+ sink.stop()
+ channel.stop()
+ }
+
+ def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
+ outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]]) {
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val executor = Executors.newCachedThreadPool()
+ val executorCompletion = new ExecutorCompletionService[Void](executor)
+ channels.map(channel => {
+ executorCompletion.submit(new TxnSubmitter(channel, clock))
+ })
+ for (i <- 0 until channels.size) {
+ executorCompletion.take()
+ }
+ val startTime = System.currentTimeMillis()
+ while (outputBuffer.size < batchCount * channels.size &&
+ System.currentTimeMillis() - startTime < 15000) {
+ logInfo("output.size = " + outputBuffer.size)
+ Thread.sleep(100)
+ }
+ val timeTaken = System.currentTimeMillis() - startTime
+ assert(timeTaken < 15000, "Operation timed out after " + timeTaken + " ms")
+ logInfo("Stopping context")
+ ssc.stop()
+
+ val flattenedBuffer = outputBuffer.flatten
+ assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
+ var counter = 0
+ for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
+ val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
+ String.valueOf(i)).getBytes("utf-8"),
+ Map[String, String]("test-" + i.toString -> "header"))
+ var found = false
+ var j = 0
+ while (j < flattenedBuffer.size && !found) {
+ val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8")
+ if (new String(eventToVerify.getBody, "utf-8") == strToCompare &&
+ eventToVerify.getHeaders.get("test-" + i.toString)
+ .equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) {
+ found = true
+ counter += 1
+ }
+ j += 1
+ }
+ }
+ assert(counter === totalEventsPerChannel * channels.size)
+ }
+
+ def assertChannelIsEmpty(channel: MemoryChannel) = {
+ val queueRemaining = channel.getClass.getDeclaredField("queueRemaining");
+ queueRemaining.setAccessible(true)
+ val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
+ assert(m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] === 5000)
+ }
+
+ private class TxnSubmitter(channel: MemoryChannel, clock: ManualClock) extends Callable[Void] {
+ override def call(): Void = {
+ var t = 0
+ for (i <- 0 until batchCount) {
+ val tx = channel.getTransaction
+ tx.begin()
+ for (j <- 0 until eventsPerBatch) {
+ channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes(
+ "utf-8"),
+ Map[String, String]("test-" + t.toString -> "header")))
+ t += 1
+ }
+ tx.commit()
+ tx.close()
+ Thread.sleep(500) // Allow some time for the events to reach
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ null
+ }
+ }
+}