Diffstat (limited to 'external/flume')
-rw-r--r--  external/flume/pom.xml                                                                     |   5
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala      |  72
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala     |   3
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala | 178
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala            | 144
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java |  44
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 195
7 files changed, 635 insertions(+), 6 deletions(-)
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 874b8a7959..9f680b27c3 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -77,6 +77,11 @@
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume-sink_2.10</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
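This new compile-scope dependency ties the Flume receiver module to the spark-streaming-flume-sink artifact (presumably built from a companion flume-sink module), which provides the SparkSink, SparkFlumeProtocol, and SparkSinkEvent classes used below. As a hedged sketch, an sbt build wanting the same artifact could declare it like this (the version string is a placeholder, not part of this patch):

    libraryDependencies += "org.apache.spark" % "spark-streaming-flume-sink_2.10" % "<spark-version>"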
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
new file mode 100644
index 0000000000..dc629df4f4
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.io.{ObjectOutput, ObjectInput}
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.util.Utils
+import org.apache.spark.Logging
+
+/**
+ * A simple object that provides the implementation of readExternal and writeExternal for both
+ * the wrapper classes for Flume-style Events.
+ */
+private[streaming] object EventTransformer extends Logging {
+ def readExternal(in: ObjectInput): (java.util.HashMap[CharSequence, CharSequence],
+ Array[Byte]) = {
+ val bodyLength = in.readInt()
+ val bodyBuff = new Array[Byte](bodyLength)
+ in.readFully(bodyBuff)
+
+ val numHeaders = in.readInt()
+ val headers = new java.util.HashMap[CharSequence, CharSequence]
+
+ for (i <- 0 until numHeaders) {
+ val keyLength = in.readInt()
+ val keyBuff = new Array[Byte](keyLength)
+ in.readFully(keyBuff)
+ val key: String = Utils.deserialize(keyBuff)
+
+ val valLength = in.readInt()
+ val valBuff = new Array[Byte](valLength)
+ in.readFully(valBuff)
+ val value: String = Utils.deserialize(valBuff)
+
+ headers.put(key, value)
+ }
+ (headers, bodyBuff)
+ }
+
+ def writeExternal(out: ObjectOutput, headers: java.util.Map[CharSequence, CharSequence],
+ body: Array[Byte]) {
+ out.writeInt(body.length)
+ out.write(body)
+ val numHeaders = headers.size()
+ out.writeInt(numHeaders)
+ for ((k, v) <- headers) {
+ val keyBuff = Utils.serialize(k.toString)
+ out.writeInt(keyBuff.length)
+ out.write(keyBuff)
+ val valBuff = Utils.serialize(v.toString)
+ out.writeInt(valBuff.length)
+ out.write(valBuff)
+ }
+ }
+}
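EventTransformer encodes an event as a length-prefixed body followed by a header count, with each header key and value written as a length-prefixed, Java-serialized string. A minimal round-trip sketch, assuming the caller sits in the same org.apache.spark.streaming.flume package (the object is private[streaming]); the object name and values here are hypothetical:

    package org.apache.spark.streaming.flume

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    object EventTransformerRoundTrip {
      def main(args: Array[String]): Unit = {
        val headers = new java.util.HashMap[CharSequence, CharSequence]()
        headers.put("test-key", "test-value")
        val body = "hello flume".getBytes("utf-8")

        // Write the body and headers out with writeExternal...
        val bytesOut = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytesOut)
        EventTransformer.writeExternal(out, headers, body)
        out.flush()

        // ...and read them back with readExternal.
        val in = new ObjectInputStream(new ByteArrayInputStream(bytesOut.toByteArray))
        val (readHeaders, readBody) = EventTransformer.readExternal(in)
        assert(new String(readBody, "utf-8") == "hello flume")
        assert(readHeaders.get("test-key").toString == "test-value")
      }
    }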
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 56d2886b26..4b2ea45fb8 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -39,11 +39,8 @@ import org.apache.spark.streaming.receiver.Receiver
import org.jboss.netty.channel.ChannelPipelineFactory
import org.jboss.netty.channel.Channels
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.ChannelFactory
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.compression._
-import org.jboss.netty.handler.execution.ExecutionHandler
private[streaming]
class FlumeInputDStream[T: ClassTag](
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
new file mode 100644
index 0000000000..148262bb67
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume
+
+
+import java.net.InetSocketAddress
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.flume.sink._
+
+/**
+ * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running
+ * [[org.apache.spark.streaming.flume.sink.SparkSink]]s.
+ * @param _ssc Streaming context that will execute this input stream
+ * @param addresses List of addresses at which SparkSinks are listening
+ * @param maxBatchSize Maximum number of events to pull from the sink in a single RPC call
+ * @param parallelism Number of parallel connections to open
+ * @param storageLevel The storage level to use.
+ * @tparam T Class type of the object of this stream
+ */
+private[streaming] class FlumePollingInputDStream[T: ClassTag](
+ @transient _ssc: StreamingContext,
+ val addresses: Seq[InetSocketAddress],
+ val maxBatchSize: Int,
+ val parallelism: Int,
+ storageLevel: StorageLevel
+ ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
+
+ override def getReceiver(): Receiver[SparkFlumeEvent] = {
+ new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
+ }
+}
+
+private[streaming] class FlumePollingReceiver(
+ addresses: Seq[InetSocketAddress],
+ maxBatchSize: Int,
+ parallelism: Int,
+ storageLevel: StorageLevel
+ ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
+
+ lazy val channelFactoryExecutor =
+ Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
+ setNameFormat("Flume Receiver Channel Thread - %d").build())
+
+ lazy val channelFactory =
+ new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
+
+ lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())
+
+ private lazy val connections = new LinkedBlockingQueue[FlumeConnection]()
+
+ override def onStart(): Unit = {
+ // Create the connections to each Flume agent.
+ addresses.foreach(host => {
+ val transceiver = new NettyTransceiver(host, channelFactory)
+ val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+ connections.add(new FlumeConnection(transceiver, client))
+ })
+ for (i <- 0 until parallelism) {
+ logInfo("Starting Flume Polling Receiver worker threads starting..")
+ // Threads that pull data from Flume.
+ receiverExecutor.submit(new Runnable {
+ override def run(): Unit = {
+ while (true) {
+ val connection = connections.poll()
+ val client = connection.client
+ try {
+ val eventBatch = client.getEventBatch(maxBatchSize)
+ if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
+ // No error, proceed with processing data
+ val seq = eventBatch.getSequenceNumber
+ val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
+ logDebug(
+ "Received batch of " + events.size() + " events with sequence number: " + seq)
+ try {
+ // Convert each Flume event to a serializable SparkFlumeEvent
+ val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
+ var j = 0
+ while (j < events.size()) {
+ buffer += toSparkFlumeEvent(events(j))
+ j += 1
+ }
+ store(buffer)
+ logDebug("Sending ack for sequence number: " + seq)
+ // Send an ack to Flume so that Flume discards the events from its channels.
+ client.ack(seq)
+ logDebug("Ack sent for sequence number: " + seq)
+ } catch {
+ case e: Exception =>
+ try {
+ // Let Flume know that the events need to be pushed back into the channel.
+ logDebug("Sending nack for sequence number: " + seq)
+ client.nack(seq) // If the agent is down, even this could fail and throw
+ logDebug("Nack sent for sequence number: " + seq)
+ } catch {
+ case nackEx: Exception => logError(
+ "Sending Nack also failed. A Flume agent is down.", nackEx)
+ }
+ TimeUnit.SECONDS.sleep(2L) // Back off for a fixed 2 seconds before the next attempt.
+ logWarning("Error while attempting to store events", e)
+ }
+ } else {
+ logWarning("Did not receive events from Flume agent due to error on the Flume " +
+ "agent: " + eventBatch.getErrorMsg)
+ }
+ } catch {
+ case e: Exception =>
+ logWarning("Error while reading data from Flume", e)
+ } finally {
+ connections.add(connection)
+ }
+ }
+ }
+ })
+ }
+ }
+
+ override def onStop(): Unit = {
+ logInfo("Shutting down Flume Polling Receiver")
+ receiverExecutor.shutdownNow()
+ connections.foreach(connection => {
+ connection.transceiver.close()
+ })
+ channelFactory.releaseExternalResources()
+ }
+
+ /**
+ * Utility method to convert [[SparkSinkEvent]] to [[SparkFlumeEvent]]
+ * @param event Event to convert to SparkFlumeEvent
+ * @return The SparkFlumeEvent generated from SparkSinkEvent
+ */
+ private def toSparkFlumeEvent(event: SparkSinkEvent): SparkFlumeEvent = {
+ val sparkFlumeEvent = new SparkFlumeEvent()
+ sparkFlumeEvent.event.setBody(event.getBody)
+ sparkFlumeEvent.event.setHeaders(event.getHeaders)
+ sparkFlumeEvent
+ }
+}
+
+/**
+ * A wrapper around the transceiver and the Avro IPC API.
+ * @param transceiver The transceiver to use for communication with Flume
+ * @param client The client used to communicate with the sink.
+ */
+private class FlumeConnection(val transceiver: NettyTransceiver,
+ val client: SparkFlumeProtocol.Callback)
+
+
+
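Note the pull-based protocol above: each worker thread takes a connection from the queue, calls getEventBatch, stores the events, and then acks the batch's sequence number so the sink can commit its Flume transaction (or nacks it so the events roll back into the channel). For context, a hedged sketch of the Flume agent configuration such a receiver would poll; the agent/channel names, host, and port are placeholders, and only the sink class name comes from the imports above:

    agent.sinks = spark
    agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
    agent.sinks.spark.hostname = localhost
    agent.sinks.spark.port = 9999
    agent.sinks.spark.channel = memoryChannel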
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 716db9fa76..4b732c1592 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -17,12 +17,19 @@
package org.apache.spark.streaming.flume
+import java.net.InetSocketAddress
+
+import org.apache.spark.annotation.Experimental
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
object FlumeUtils {
+ private val DEFAULT_POLLING_PARALLELISM = 5
+ private val DEFAULT_POLLING_BATCH_SIZE = 1000
+
/**
* Create an input stream from a Flume source.
* @param ssc StreamingContext object
@@ -56,7 +63,7 @@ object FlumeUtils {
): ReceiverInputDStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](
ssc, hostname, port, storageLevel, enableDecompression)
-
+
inputStream
}
@@ -105,4 +112,135 @@ object FlumeUtils {
): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
}
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param hostname Hostname of the host on which the Spark Sink is running
+ * @param port Port of the host at which the Spark Sink is listening
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ ssc: StreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param addresses List of InetSocketAddresses representing the hosts to connect to.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ ssc: StreamingContext,
+ addresses: Seq[InetSocketAddress],
+ storageLevel: StorageLevel
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(ssc, addresses, storageLevel,
+ DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * @param addresses List of InetSocketAddresses representing the hosts to connect to.
+ * @param maxBatchSize Maximum number of events to be pulled from the Spark sink in a
+ * single RPC call
+ * @param parallelism Number of concurrent requests this stream should send to the sink. Note
+ * that a higher number of concurrent requests will result in this stream
+ * using more threads
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ ssc: StreamingContext,
+ addresses: Seq[InetSocketAddress],
+ storageLevel: StorageLevel,
+ maxBatchSize: Int,
+ parallelism: Int
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
+ parallelism, storageLevel)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param hostname Hostname of the host on which the Spark Sink is running
+ * @param port Port of the host at which the Spark Sink is listening
+ */
+ @Experimental
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param hostname Hostname of the host on which the Spark Sink is running
+ * @param port Port of the host at which the Spark Sink is listening
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param addresses List of InetSocketAddresses on which the Spark Sink is running.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ addresses: Array[InetSocketAddress],
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc, addresses, storageLevel,
+ DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * @param addresses List of InetSocketAddresses on which the Spark Sink is running
+ * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
+ * single RPC call
+ * @param parallelism Number of concurrent requests this stream should send to the sink. Note
+ * that a higher number of concurrent requests will result in this stream
+ * using more threads
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ @Experimental
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ addresses: Array[InetSocketAddress],
+ storageLevel: StorageLevel,
+ maxBatchSize: Int,
+ parallelism: Int
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
+ }
}
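Taken together, these overloads expose the polling receiver to Scala and Java with a default batch size of 1000 events and 5 connection threads. A minimal usage sketch, with a hypothetical host, port, and application wiring (none of these names come from the patch):

    import java.net.InetSocketAddress

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object PollingStreamExample {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("FlumePollingExample")
        val ssc = new StreamingContext(sparkConf, Seconds(2))

        // Poll the SparkSink running on the Flume agent at sink-host:9999.
        val stream = FlumeUtils.createPollingStream(
          ssc, Seq(new InetSocketAddress("sink-host", 9999)),
          StorageLevel.MEMORY_AND_DISK_SER_2)

        // Decode each event body as UTF-8 text and print a sample per batch.
        stream.map(e => new String(e.event.getBody.array(), "utf-8")).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }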
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
new file mode 100644
index 0000000000..79c5b91654
--- /dev/null
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume;
+
+import java.net.InetSocketAddress;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.junit.Test;
+
+public class JavaFlumePollingStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testFlumeStream() {
+ // tests the API, does not actually test data receiving
+ InetSocketAddress[] addresses = new InetSocketAddress[] {
+ new InetSocketAddress("localhost", 12345)
+ };
+ JavaReceiverInputDStream<SparkFlumeEvent> test1 =
+ FlumeUtils.createPollingStream(ssc, "localhost", 12345);
+ JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createPollingStream(
+ ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createPollingStream(
+ ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream<SparkFlumeEvent> test4 = FlumeUtils.createPollingStream(
+ ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5);
+ }
+}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
new file mode 100644
index 0000000000..47071d0cc4
--- /dev/null
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.streaming.flume
+
+import java.net.InetSocketAddress
+import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+
+import org.apache.flume.Context
+import org.apache.flume.channel.MemoryChannel
+import org.apache.flume.conf.Configurables
+import org.apache.flume.event.EventBuilder
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext}
+import org.apache.spark.streaming.flume.sink._
+
+class FlumePollingStreamSuite extends TestSuiteBase {
+
+ val testPort = 9999
+ val batchCount = 5
+ val eventsPerBatch = 100
+ val totalEventsPerChannel = batchCount * eventsPerBatch
+ val channelCapacity = 5000
+
+ test("flume polling test") {
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(conf, batchDuration)
+ val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+ FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)),
+ StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
+ val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+ with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ outputStream.register()
+
+ // Start the channel and sink.
+ val context = new Context()
+ context.put("capacity", channelCapacity.toString)
+ context.put("transactionCapacity", "1000")
+ context.put("keep-alive", "0")
+ val channel = new MemoryChannel()
+ Configurables.configure(channel, context)
+
+ val sink = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+ Configurables.configure(sink, context)
+ sink.setChannel(channel)
+ sink.start()
+ ssc.start()
+
+ writeAndVerify(Seq(channel), ssc, outputBuffer)
+ assertChannelIsEmpty(channel)
+ sink.stop()
+ channel.stop()
+ }
+
+ test("flume polling test multiple hosts") {
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(conf, batchDuration)
+ val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))
+ val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+ FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
+ eventsPerBatch, 5)
+ val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+ with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ outputStream.register()
+
+ // Start the channel and sink.
+ val context = new Context()
+ context.put("capacity", channelCapacity.toString)
+ context.put("transactionCapacity", "1000")
+ context.put("keep-alive", "0")
+ val channel = new MemoryChannel()
+ Configurables.configure(channel, context)
+
+ val channel2 = new MemoryChannel()
+ Configurables.configure(channel2, context)
+
+ val sink = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+ Configurables.configure(sink, context)
+ sink.setChannel(channel)
+ sink.start()
+
+ val sink2 = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort + 1))
+ Configurables.configure(sink2, context)
+ sink2.setChannel(channel2)
+ sink2.start()
+ ssc.start()
+ writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
+ assertChannelIsEmpty(channel)
+ assertChannelIsEmpty(channel2)
+ sink.stop()
+ sink2.stop()
+ channel.stop()
+ channel2.stop()
+ }
+
+ def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
+ outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]]) {
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val executor = Executors.newCachedThreadPool()
+ val executorCompletion = new ExecutorCompletionService[Void](executor)
+ channels.foreach(channel => {
+ executorCompletion.submit(new TxnSubmitter(channel, clock))
+ })
+ for (i <- 0 until channels.size) {
+ executorCompletion.take()
+ }
+ val startTime = System.currentTimeMillis()
+ while (outputBuffer.size < batchCount * channels.size &&
+ System.currentTimeMillis() - startTime < 15000) {
+ logInfo("output.size = " + outputBuffer.size)
+ Thread.sleep(100)
+ }
+ val timeTaken = System.currentTimeMillis() - startTime
+ assert(timeTaken < 15000, "Operation timed out after " + timeTaken + " ms")
+ logInfo("Stopping context")
+ ssc.stop()
+
+ val flattenedBuffer = outputBuffer.flatten
+ assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
+ var counter = 0
+ for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
+ val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
+ String.valueOf(i)).getBytes("utf-8"),
+ Map[String, String]("test-" + i.toString -> "header"))
+ var found = false
+ var j = 0
+ while (j < flattenedBuffer.size && !found) {
+ val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8")
+ if (new String(eventToVerify.getBody, "utf-8") == strToCompare &&
+ eventToVerify.getHeaders.get("test-" + i.toString)
+ .equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) {
+ found = true
+ counter += 1
+ }
+ j += 1
+ }
+ }
+ assert(counter === totalEventsPerChannel * channels.size)
+ }
+
+ def assertChannelIsEmpty(channel: MemoryChannel) = {
+ // Peek at the channel's private queueRemaining semaphore via reflection;
+ // when the channel is empty, all of its capacity permits are available.
+ val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
+ queueRemaining.setAccessible(true)
+ val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
+ assert(m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] === channelCapacity)
+ }
+
+ private class TxnSubmitter(channel: MemoryChannel, clock: ManualClock) extends Callable[Void] {
+ override def call(): Void = {
+ var t = 0
+ for (i <- 0 until batchCount) {
+ val tx = channel.getTransaction
+ tx.begin()
+ for (j <- 0 until eventsPerBatch) {
+ channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes(
+ "utf-8"),
+ Map[String, String]("test-" + t.toString -> "header")))
+ t += 1
+ }
+ tx.commit()
+ tx.close()
+ Thread.sleep(500) // Allow some time for the events to reach the receiver.
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ null
+ }
+ }
+}