author     Shixiong Zhu <shixiong@databricks.com>    2016-03-25 17:37:16 -0700
committer  Reynold Xin <rxin@databricks.com>         2016-03-25 17:37:16 -0700
commit     24587ce433aa30f30a5d1ed6566365f24c222a27 (patch)
tree       b1ff8ffa17b643d3e833be33debecf209d20ff6d /external/flume
parent     54d13bed87fcf2968f77e1f1153e85184ec91d78 (diff)
[SPARK-14073][STREAMING][TEST-MAVEN] Move flume back to Spark
## What changes were proposed in this pull request?

This PR moves flume back to Spark as per the discussion in the dev mailing list.

## How was this patch tested?

Existing Jenkins tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11895 from zsxwing/move-flume-back.
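For orientation (not part of the diff itself), the module's public entry point is FlumeUtils, introduced below. A minimal push-based sketch, assuming a Flume agent with an Avro sink pointed at localhost:9999 (host, port and app name are placeholders), would look like this:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    // Spark runs an Avro server on the given host/port; the Flume Avro sink pushes events to it.
    val conf = new SparkConf().setAppName("FlumeEventCountSketch")
    val ssc = new StreamingContext(conf, Seconds(2))
    val stream = FlumeUtils.createStream(ssc, "localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.count().map(cnt => s"Received $cnt flume events.").print()
    ssc.start()
    ssc.awaitTermination()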
Diffstat (limited to 'external/flume')
-rw-r--r--  external/flume/pom.xml | 78
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala | 72
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala | 166
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala | 205
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala | 123
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala | 117
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala | 311
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala | 209
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java | 21
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala | 23
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java | 44
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java | 44
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java | 36
-rw-r--r--  external/flume/src/test/resources/log4j.properties | 28
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala | 48
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 130
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 103
17 files changed, 1758 insertions(+), 0 deletions(-)
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
new file mode 100644
index 0000000000..d650dd034d
--- /dev/null
+++ b/external/flume/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume_2.11</artifactId>
+ <properties>
+ <sbt.project.name>streaming-flume</sbt.project.name>
+ </properties>
+ <packaging>jar</packaging>
+ <name>Spark Project External Flume</name>
+ <url>http://spark.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume-sink_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+</project>
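The coordinates declared in this pom (groupId org.apache.spark, artifactId spark-streaming-flume_2.11) are what downstream applications would depend on. A hedged sbt sketch, with the version string taken from the snapshot above and meant only as a placeholder for the Spark release actually used:

    // build.sbt -- 2.0.0-SNAPSHOT is the version from this pom, shown only as a placeholder
    libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.0.0-SNAPSHOT"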
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
new file mode 100644
index 0000000000..07c5286477
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.io.{ObjectInput, ObjectOutput}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * A simple object that provides the implementation of readExternal and writeExternal for both
+ * the wrapper classes for Flume-style Events.
+ */
+private[streaming] object EventTransformer extends Logging {
+ def readExternal(in: ObjectInput): (java.util.HashMap[CharSequence, CharSequence],
+ Array[Byte]) = {
+ val bodyLength = in.readInt()
+ val bodyBuff = new Array[Byte](bodyLength)
+ in.readFully(bodyBuff)
+
+ val numHeaders = in.readInt()
+ val headers = new java.util.HashMap[CharSequence, CharSequence]
+
+ for (i <- 0 until numHeaders) {
+ val keyLength = in.readInt()
+ val keyBuff = new Array[Byte](keyLength)
+ in.readFully(keyBuff)
+ val key: String = Utils.deserialize(keyBuff)
+
+ val valLength = in.readInt()
+ val valBuff = new Array[Byte](valLength)
+ in.readFully(valBuff)
+ val value: String = Utils.deserialize(valBuff)
+
+ headers.put(key, value)
+ }
+ (headers, bodyBuff)
+ }
+
+ def writeExternal(out: ObjectOutput, headers: java.util.Map[CharSequence, CharSequence],
+ body: Array[Byte]) {
+ out.writeInt(body.length)
+ out.write(body)
+ val numHeaders = headers.size()
+ out.writeInt(numHeaders)
+ for ((k, v) <- headers.asScala) {
+ val keyBuff = Utils.serialize(k.toString)
+ out.writeInt(keyBuff.length)
+ out.write(keyBuff)
+ val valBuff = Utils.serialize(v.toString)
+ out.writeInt(valBuff.length)
+ out.write(valBuff)
+ }
+ }
+}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
new file mode 100644
index 0000000000..5f234b1f0c
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.base.Throwables
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.flume.sink._
+
+/**
+ * This class implements the core functionality of [[FlumePollingReceiver]]. When started, it
+ * pulls data from Flume, stores it in Spark and then sends an Ack or Nack. This class should be
+ * run via an [[java.util.concurrent.Executor]] as this implements [[Runnable]].
+ *
+ * @param receiver The receiver that owns this instance.
+ */
+
+private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
+ Logging {
+
+ def run(): Unit = {
+ while (!receiver.isStopped()) {
+ val connection = receiver.getConnections.poll()
+ val client = connection.client
+ var batchReceived = false
+ var seq: CharSequence = null
+ try {
+ getBatch(client) match {
+ case Some(eventBatch) =>
+ batchReceived = true
+ seq = eventBatch.getSequenceNumber
+ val events = toSparkFlumeEvents(eventBatch.getEvents)
+ if (store(events)) {
+ sendAck(client, seq)
+ } else {
+ sendNack(batchReceived, client, seq)
+ }
+ case None =>
+ }
+ } catch {
+ case e: Exception =>
+ Throwables.getRootCause(e) match {
+ // If the cause was an InterruptedException, then check if the receiver is stopped -
+ // if yes, just break out of the loop. Else send a Nack and log a warning.
+ // In the unlikely case, the cause was not an Exception,
+ // then just throw it out and exit.
+ case interrupted: InterruptedException =>
+ if (!receiver.isStopped()) {
+ logWarning("Interrupted while receiving data from Flume", interrupted)
+ sendNack(batchReceived, client, seq)
+ }
+ case exception: Exception =>
+ logWarning("Error while receiving data from Flume", exception)
+ sendNack(batchReceived, client, seq)
+ }
+ } finally {
+ receiver.getConnections.add(connection)
+ }
+ }
+ }
+
+ /**
+ * Gets a batch of events from the specified client. This method does not handle any exceptions
+ * which will be propagated to the caller.
+ * @param client Client to get events from
+ * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
+ */
+ private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
+ val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
+ if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
+ // No error, proceed with processing data
+ logDebug(s"Received batch of ${eventBatch.getEvents.size} events with sequence " +
+ s"number: ${eventBatch.getSequenceNumber}")
+ Some(eventBatch)
+ } else {
+ logWarning("Did not receive events from Flume agent due to error on the Flume agent: " +
+ eventBatch.getErrorMsg)
+ None
+ }
+ }
+
+ /**
+ * Store the events in the buffer to Spark. This method will not propagate any exceptions,
+ * but will propagate any other errors.
+ * @param buffer The buffer to store
+ * @return true if the data was stored without any exception being thrown, else false
+ */
+ private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
+ try {
+ receiver.store(buffer)
+ true
+ } catch {
+ case e: Exception =>
+ logWarning("Error while attempting to store data received from Flume", e)
+ false
+ }
+ }
+
+ /**
+ * Send an ack to the client for the sequence number. This method does not handle any exceptions
+ * which will be propagated to the caller.
+ * @param client client to send the ack to
+ * @param seq sequence number of the batch to be ack-ed.
+ * @return
+ */
+ private def sendAck(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = {
+ logDebug("Sending ack for sequence number: " + seq)
+ client.ack(seq)
+ logDebug("Ack sent for sequence number: " + seq)
+ }
+
+ /**
+ * This method sends a Nack to the client for the given sequence number if a batch was
+ * received. Any exceptions thrown by the RPC call are simply thrown out as is - no effort is
+ * made to handle them.
+ * @param batchReceived true if a batch was received. If this is false, no nack is sent
+ * @param client The client to which the nack should be sent
+ * @param seq The sequence number of the batch that is being nack-ed.
+ */
+ private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
+ seq: CharSequence): Unit = {
+ if (batchReceived) {
+ // Let Flume know that the events need to be pushed back into the channel.
+ logDebug("Sending nack for sequence number: " + seq)
+ client.nack(seq) // If the agent is down, even this could fail and throw
+ logDebug("Nack sent for sequence number: " + seq)
+ }
+ }
+
+ /**
+ * Utility method to convert [[SparkSinkEvent]]s to [[SparkFlumeEvent]]s
+ * @param events - Events to convert to SparkFlumeEvents
+ * @return - The SparkFlumeEvent generated from SparkSinkEvent
+ */
+ private def toSparkFlumeEvents(events: java.util.List[SparkSinkEvent]):
+ ArrayBuffer[SparkFlumeEvent] = {
+ // Convert each Flume event to a serializable SparkFlumeEvent
+ val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
+ var j = 0
+ while (j < events.size()) {
+ val event = events.get(j)
+ val sparkFlumeEvent = new SparkFlumeEvent()
+ sparkFlumeEvent.event.setBody(event.getBody)
+ sparkFlumeEvent.event.setHeaders(event.getHeaders)
+ buffer += sparkFlumeEvent
+ j += 1
+ }
+ buffer
+ }
+}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
new file mode 100644
index 0000000000..7dc9606913
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import java.net.InetSocketAddress
+import java.nio.ByteBuffer
+import java.util.concurrent.Executors
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.avro.ipc.NettyServer
+import org.apache.avro.ipc.specific.SpecificResponder
+import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol, Status}
+import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
+
+private[streaming]
+class FlumeInputDStream[T: ClassTag](
+ _ssc: StreamingContext,
+ host: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
+
+ override def getReceiver(): Receiver[SparkFlumeEvent] = {
+ new FlumeReceiver(host, port, storageLevel, enableDecompression)
+ }
+}
+
+/**
+ * A wrapper class for AvroFlumeEvents with a custom serialization format.
+ *
+ * This is necessary because AvroFlumeEvent uses inner data structures
+ * which are not serializable.
+ */
+class SparkFlumeEvent() extends Externalizable {
+ var event: AvroFlumeEvent = new AvroFlumeEvent()
+
+ /* De-serialize from bytes. */
+ def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
+ val bodyLength = in.readInt()
+ val bodyBuff = new Array[Byte](bodyLength)
+ in.readFully(bodyBuff)
+
+ val numHeaders = in.readInt()
+ val headers = new java.util.HashMap[CharSequence, CharSequence]
+
+ for (i <- 0 until numHeaders) {
+ val keyLength = in.readInt()
+ val keyBuff = new Array[Byte](keyLength)
+ in.readFully(keyBuff)
+ val key: String = Utils.deserialize(keyBuff)
+
+ val valLength = in.readInt()
+ val valBuff = new Array[Byte](valLength)
+ in.readFully(valBuff)
+ val value: String = Utils.deserialize(valBuff)
+
+ headers.put(key, value)
+ }
+
+ event.setBody(ByteBuffer.wrap(bodyBuff))
+ event.setHeaders(headers)
+ }
+
+ /* Serialize to bytes. */
+ def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
+ val body = event.getBody
+ out.writeInt(body.remaining())
+ Utils.writeByteBuffer(body, out)
+
+ val numHeaders = event.getHeaders.size()
+ out.writeInt(numHeaders)
+ for ((k, v) <- event.getHeaders.asScala) {
+ val keyBuff = Utils.serialize(k.toString)
+ out.writeInt(keyBuff.length)
+ out.write(keyBuff)
+ val valBuff = Utils.serialize(v.toString)
+ out.writeInt(valBuff.length)
+ out.write(valBuff)
+ }
+ }
+}
+
+private[streaming] object SparkFlumeEvent {
+ def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent = {
+ val event = new SparkFlumeEvent
+ event.event = in
+ event
+ }
+}
+
+/** A simple server that implements Flume's Avro protocol. */
+private[streaming]
+class FlumeEventServer(receiver: FlumeReceiver) extends AvroSourceProtocol {
+ override def append(event: AvroFlumeEvent): Status = {
+ receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
+ Status.OK
+ }
+
+ override def appendBatch(events: java.util.List[AvroFlumeEvent]): Status = {
+ events.asScala.foreach(event => receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
+ Status.OK
+ }
+}
+
+/** A NetworkReceiver which listens for events using the
+ * Flume Avro interface. */
+private[streaming]
+class FlumeReceiver(
+ host: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+ ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
+
+ lazy val responder = new SpecificResponder(
+ classOf[AvroSourceProtocol], new FlumeEventServer(this))
+ var server: NettyServer = null
+
+ private def initServer() = {
+ if (enableDecompression) {
+ val channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool())
+ val channelPipelineFactory = new CompressionChannelPipelineFactory()
+
+ new NettyServer(
+ responder,
+ new InetSocketAddress(host, port),
+ channelFactory,
+ channelPipelineFactory,
+ null)
+ } else {
+ new NettyServer(responder, new InetSocketAddress(host, port))
+ }
+ }
+
+ def onStart() {
+ synchronized {
+ if (server == null) {
+ server = initServer()
+ server.start()
+ } else {
+ logWarning("Flume receiver being asked to start more then once with out close")
+ }
+ }
+ logInfo("Flume receiver started")
+ }
+
+ def onStop() {
+ synchronized {
+ if (server != null) {
+ server.close()
+ server = null
+ }
+ }
+ logInfo("Flume receiver stopped")
+ }
+
+ override def preferredLocation: Option[String] = Option(host)
+
+ /** A Netty Pipeline factory that will decompress incoming data from
+ * the Netty client and compress data going back to the client.
+ *
+ * The compression on the return is required because Flume requires
+ * a successful response to indicate it can remove the event/batch
+ * from the configured channel
+ */
+ private[streaming]
+ class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
+ def getPipeline(): ChannelPipeline = {
+ val pipeline = Channels.pipeline()
+ val encoder = new ZlibEncoder(6)
+ pipeline.addFirst("deflater", encoder)
+ pipeline.addFirst("inflater", new ZlibDecoder())
+ pipeline
+ }
+ }
+}
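The enableDecompression flag wired through FlumeInputDStream and FlumeReceiver above installs the zlib pipeline from CompressionChannelPipelineFactory. A hedged sketch of turning it on via the FlumeUtils API defined later in this diff (host, port and app name are placeholders; the sending Flume agent is assumed to be configured for deflate compression, which is outside this diff):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    val ssc = new StreamingContext(new SparkConf().setAppName("FlumeCompressedSketch"), Seconds(2))
    // enableDecompression = true makes the embedded Netty server inflate incoming event batches.
    val compressed = FlumeUtils.createStream(ssc, "localhost", 9999,
      StorageLevel.MEMORY_AND_DISK_SER_2, enableDecompression = true)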
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
new file mode 100644
index 0000000000..250bfc1718
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume
+
+
+import java.net.InetSocketAddress
+import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.flume.sink._
+import org.apache.spark.streaming.receiver.Receiver
+
+/**
+ * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running
+ * [[org.apache.spark.streaming.flume.sink.SparkSink]]s.
+ * @param _ssc Streaming context that will execute this input stream
+ * @param addresses List of addresses at which SparkSinks are listening
+ * @param maxBatchSize Maximum size of a batch
+ * @param parallelism Number of parallel connections to open
+ * @param storageLevel The storage level to use.
+ * @tparam T Class type of the object of this stream
+ */
+private[streaming] class FlumePollingInputDStream[T: ClassTag](
+ _ssc: StreamingContext,
+ val addresses: Seq[InetSocketAddress],
+ val maxBatchSize: Int,
+ val parallelism: Int,
+ storageLevel: StorageLevel
+ ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
+
+ override def getReceiver(): Receiver[SparkFlumeEvent] = {
+ new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
+ }
+}
+
+private[streaming] class FlumePollingReceiver(
+ addresses: Seq[InetSocketAddress],
+ maxBatchSize: Int,
+ parallelism: Int,
+ storageLevel: StorageLevel
+ ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
+
+ lazy val channelFactoryExecutor =
+ Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
+ setNameFormat("Flume Receiver Channel Thread - %d").build())
+
+ lazy val channelFactory =
+ new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
+
+ lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())
+
+ private lazy val connections = new LinkedBlockingQueue[FlumeConnection]()
+
+ override def onStart(): Unit = {
+ // Create the connections to each Flume agent.
+ addresses.foreach(host => {
+ val transceiver = new NettyTransceiver(host, channelFactory)
+ val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+ connections.add(new FlumeConnection(transceiver, client))
+ })
+ for (i <- 0 until parallelism) {
+ logInfo("Starting Flume Polling Receiver worker threads..")
+ // Threads that pull data from Flume.
+ receiverExecutor.submit(new FlumeBatchFetcher(this))
+ }
+ }
+
+ override def onStop(): Unit = {
+ logInfo("Shutting down Flume Polling Receiver")
+ receiverExecutor.shutdown()
+ // Wait up to a minute for the threads to die
+ if (!receiverExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+ receiverExecutor.shutdownNow()
+ }
+ connections.asScala.foreach(_.transceiver.close())
+ channelFactory.releaseExternalResources()
+ }
+
+ private[flume] def getConnections: LinkedBlockingQueue[FlumeConnection] = {
+ this.connections
+ }
+
+ private[flume] def getMaxBatchSize: Int = {
+ this.maxBatchSize
+ }
+}
+
+/**
+ * A wrapper around the transceiver and the Avro IPC API.
+ * @param transceiver The transceiver to use for communication with Flume
+ * @param client The client that the callbacks are received on.
+ */
+private[flume] class FlumeConnection(val transceiver: NettyTransceiver,
+ val client: SparkFlumeProtocol.Callback)
+
+
+
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
new file mode 100644
index 0000000000..945cfa7295
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.net.{InetSocketAddress, ServerSocket}
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.util.{List => JList}
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.commons.lang3.RandomUtils
+import org.apache.flume.source.avro
+import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}
+
+import org.apache.spark.util.Utils
+import org.apache.spark.SparkConf
+
+/**
+ * Shared code for Scala and Python unit tests
+ */
+private[flume] class FlumeTestUtils {
+
+ private var transceiver: NettyTransceiver = null
+
+ private val testPort: Int = findFreePort()
+
+ def getTestPort(): Int = testPort
+
+ /** Find a free port */
+ private def findFreePort(): Int = {
+ val candidatePort = RandomUtils.nextInt(1024, 65536)
+ Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
+ val socket = new ServerSocket(trialPort)
+ socket.close()
+ (null, trialPort)
+ }, new SparkConf())._2
+ }
+
+ /** Send data to the flume receiver */
+ def writeInput(input: JList[String], enableCompression: Boolean): Unit = {
+ val testAddress = new InetSocketAddress("localhost", testPort)
+
+ val inputEvents = input.asScala.map { item =>
+ val event = new AvroFlumeEvent
+ event.setBody(ByteBuffer.wrap(item.getBytes(StandardCharsets.UTF_8)))
+ event.setHeaders(Collections.singletonMap("test", "header"))
+ event
+ }
+
+ // if last attempted transceiver had succeeded, close it
+ close()
+
+ // Create transceiver
+ transceiver = {
+ if (enableCompression) {
+ new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
+ } else {
+ new NettyTransceiver(testAddress)
+ }
+ }
+
+ // Create Avro client with the transceiver
+ val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
+ if (client == null) {
+ throw new AssertionError("Cannot create client")
+ }
+
+ // Send data
+ val status = client.appendBatch(inputEvents.asJava)
+ if (status != avro.Status.OK) {
+ throw new AssertionError("Sent events unsuccessfully")
+ }
+ }
+
+ def close(): Unit = {
+ if (transceiver != null) {
+ transceiver.close()
+ transceiver = null
+ }
+ }
+
+ /** Class to create socket channel with compression */
+ private class CompressionChannelFactory(compressionLevel: Int)
+ extends NioClientSocketChannelFactory {
+
+ override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+ val encoder = new ZlibEncoder(compressionLevel)
+ pipeline.addFirst("deflater", encoder)
+ pipeline.addFirst("inflater", new ZlibDecoder())
+ super.newChannel(pipeline)
+ }
+ }
+
+}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
new file mode 100644
index 0000000000..3e3ed712f0
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.io.{ByteArrayOutputStream, DataOutputStream}
+import java.net.InetSocketAddress
+import java.util.{List => JList, Map => JMap}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.api.java.function.PairFunction
+import org.apache.spark.api.python.PythonRDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+object FlumeUtils {
+ private val DEFAULT_POLLING_PARALLELISM = 5
+ private val DEFAULT_POLLING_BATCH_SIZE = 1000
+
+ /**
+ * Create an input stream from a Flume source.
+ * @param ssc StreamingContext object
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream (
+ ssc: StreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ createStream(ssc, hostname, port, storageLevel, false)
+ }
+
+ /**
+ * Create an input stream from a Flume source.
+ * @param ssc StreamingContext object
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ * @param enableDecompression should netty server decompress input stream
+ */
+ def createStream (
+ ssc: StreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ val inputStream = new FlumeInputDStream[SparkFlumeEvent](
+ ssc, hostname, port, storageLevel, enableDecompression)
+
+ inputStream
+ }
+
+ /**
+ * Creates an input stream from a Flume source.
+ * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createStream(jssc.ssc, hostname, port)
+ }
+
+ /**
+ * Creates an input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createStream(jssc.ssc, hostname, port, storageLevel, false)
+ }
+
+ /**
+ * Creates an input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ * @param enableDecompression should netty server decompress input stream
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param hostname Address of the host on which the Spark Sink is running
+ * @param port Port of the host at which the Spark Sink is listening
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createPollingStream(
+ ssc: StreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param addresses List of InetSocketAddresses representing the hosts to connect to.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createPollingStream(
+ ssc: StreamingContext,
+ addresses: Seq[InetSocketAddress],
+ storageLevel: StorageLevel
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(ssc, addresses, storageLevel,
+ DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * @param addresses List of InetSocketAddresses representing the hosts to connect to.
+ * @param maxBatchSize Maximum number of events to be pulled from the Spark sink in a
+ * single RPC call
+ * @param parallelism Number of concurrent requests this stream should send to the sink. Note
+ * that having a higher number of requests concurrently being pulled will
+ * result in this stream using more threads
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createPollingStream(
+ ssc: StreamingContext,
+ addresses: Seq[InetSocketAddress],
+ storageLevel: StorageLevel,
+ maxBatchSize: Int,
+ parallelism: Int
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
+ parallelism, storageLevel)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param hostname Hostname of the host on which the Spark Sink is running
+ * @param port Port of the host at which the Spark Sink is listening
+ */
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param hostname Hostname of the host on which the Spark Sink is running
+ * @param port Port of the host at which the Spark Sink is listening
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+ * @param addresses List of InetSocketAddresses on which the Spark Sink is running.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ addresses: Array[InetSocketAddress],
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc, addresses, storageLevel,
+ DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
+ }
+
+ /**
+ * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ * This stream will poll the sink for data and will pull events as they are available.
+ * @param addresses List of InetSocketAddresses on which the Spark Sink is running
+ * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
+ * single RPC call
+ * @param parallelism Number of concurrent requests this stream should send to the sink. Note
+ * that having a higher number of requests concurrently being pulled will
+ * result in this stream using more threads
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ addresses: Array[InetSocketAddress],
+ storageLevel: StorageLevel,
+ maxBatchSize: Int,
+ parallelism: Int
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
+ }
+}
+
+/**
+ * This is a helper class that wraps the methods in FlumeUtils into a more Python-friendly class
+ * and functions so that it can be easily instantiated and called from Python's FlumeUtils.
+ */
+private[flume] class FlumeUtilsPythonHelper {
+
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+ ): JavaPairDStream[Array[Byte], Array[Byte]] = {
+ val dstream = FlumeUtils.createStream(jssc, hostname, port, storageLevel, enableDecompression)
+ FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
+ }
+
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ hosts: JList[String],
+ ports: JList[Int],
+ storageLevel: StorageLevel,
+ maxBatchSize: Int,
+ parallelism: Int
+ ): JavaPairDStream[Array[Byte], Array[Byte]] = {
+ assert(hosts.size() == ports.size())
+ val addresses = hosts.asScala.zip(ports.asScala).map {
+ case (host, port) => new InetSocketAddress(host, port)
+ }
+ val dstream = FlumeUtils.createPollingStream(
+ jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
+ FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
+ }
+
+}
+
+private object FlumeUtilsPythonHelper {
+
+ private def stringMapToByteArray(map: JMap[CharSequence, CharSequence]): Array[Byte] = {
+ val byteStream = new ByteArrayOutputStream()
+ val output = new DataOutputStream(byteStream)
+ try {
+ output.writeInt(map.size)
+ map.asScala.foreach { kv =>
+ PythonRDD.writeUTF(kv._1.toString, output)
+ PythonRDD.writeUTF(kv._2.toString, output)
+ }
+ byteStream.toByteArray
+ }
+ finally {
+ output.close()
+ }
+ }
+
+ private def toByteArrayPairDStream(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
+ JavaPairDStream[Array[Byte], Array[Byte]] = {
+ dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], Array[Byte]] {
+ override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], Array[Byte]) = {
+ val event = sparkEvent.event
+ val byteBuffer = event.getBody
+ val body = new Array[Byte](byteBuffer.remaining())
+ byteBuffer.get(body)
+ (stringMapToByteArray(event.getHeaders), body)
+ }
+ })
+ }
+}
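createPollingStream, defined above, is the pull-based counterpart to createStream: Spark connects out to a SparkSink running inside the Flume agent and acks or nacks each pulled batch (see FlumeBatchFetcher earlier in this diff). A minimal sketch, assuming a SparkSink listening on localhost:9988 (placeholder address and app name):

    import java.net.InetSocketAddress
    import java.nio.charset.StandardCharsets
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    val ssc = new StreamingContext(new SparkConf().setAppName("FlumePollingSketch"), Seconds(2))
    // One address per SparkSink; several addresses can be passed to pull from multiple agents.
    val addresses = Seq(new InetSocketAddress("localhost", 9988))
    val events = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2)
    events.map { e =>
      // Copy the body out of the Avro event's ByteBuffer before decoding it.
      val buf = e.event.getBody.duplicate()
      val bytes = new Array[Byte](buf.remaining())
      buf.get(bytes)
      new String(bytes, StandardCharsets.UTF_8)
    }.print()
    ssc.start()
    ssc.awaitTermination()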
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
new file mode 100644
index 0000000000..1a96df6e94
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.nio.charset.StandardCharsets
+import java.util.{Collections, List => JList, Map => JMap}
+import java.util.concurrent._
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.flume.event.EventBuilder
+import org.apache.flume.Context
+import org.apache.flume.channel.MemoryChannel
+import org.apache.flume.conf.Configurables
+
+import org.apache.spark.streaming.flume.sink.{SparkSink, SparkSinkConfig}
+
+/**
+ * Shared code for Scala and Python unit tests
+ */
+private[flume] class PollingFlumeTestUtils {
+
+ private val batchCount = 5
+ val eventsPerBatch = 100
+ private val totalEventsPerChannel = batchCount * eventsPerBatch
+ private val channelCapacity = 5000
+
+ def getTotalEvents: Int = totalEventsPerChannel * channels.size
+
+ private val channels = new ArrayBuffer[MemoryChannel]
+ private val sinks = new ArrayBuffer[SparkSink]
+
+ /**
+ * Start a sink and return the port of this sink
+ */
+ def startSingleSink(): Int = {
+ channels.clear()
+ sinks.clear()
+
+ // Start the channel and sink.
+ val context = new Context()
+ context.put("capacity", channelCapacity.toString)
+ context.put("transactionCapacity", "1000")
+ context.put("keep-alive", "0")
+ val channel = new MemoryChannel()
+ Configurables.configure(channel, context)
+
+ val sink = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
+ Configurables.configure(sink, context)
+ sink.setChannel(channel)
+ sink.start()
+
+ channels += (channel)
+ sinks += sink
+
+ sink.getPort()
+ }
+
+ /**
+ * Start 2 sinks and return the ports
+ */
+ def startMultipleSinks(): Seq[Int] = {
+ channels.clear()
+ sinks.clear()
+
+ // Start the channel and sink.
+ val context = new Context()
+ context.put("capacity", channelCapacity.toString)
+ context.put("transactionCapacity", "1000")
+ context.put("keep-alive", "0")
+ val channel = new MemoryChannel()
+ Configurables.configure(channel, context)
+
+ val channel2 = new MemoryChannel()
+ Configurables.configure(channel2, context)
+
+ val sink = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
+ Configurables.configure(sink, context)
+ sink.setChannel(channel)
+ sink.start()
+
+ val sink2 = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
+ Configurables.configure(sink2, context)
+ sink2.setChannel(channel2)
+ sink2.start()
+
+ sinks += sink
+ sinks += sink2
+ channels += channel
+ channels += channel2
+
+ sinks.map(_.getPort())
+ }
+
+ /**
+ * Send data and wait until all data has been received
+ */
+ def sendDatAndEnsureAllDataHasBeenReceived(): Unit = {
+ val executor = Executors.newCachedThreadPool()
+ val executorCompletion = new ExecutorCompletionService[Void](executor)
+
+ val latch = new CountDownLatch(batchCount * channels.size)
+ sinks.foreach(_.countdownWhenBatchReceived(latch))
+
+ channels.foreach(channel => {
+ executorCompletion.submit(new TxnSubmitter(channel))
+ })
+
+ for (i <- 0 until channels.size) {
+ executorCompletion.take()
+ }
+
+ latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
+ }
+
+ /**
+ * A Python-friendly method to assert the output
+ */
+ def assertOutput(
+ outputHeaders: JList[JMap[String, String]], outputBodies: JList[String]): Unit = {
+ require(outputHeaders.size == outputBodies.size)
+ val eventSize = outputHeaders.size
+ if (eventSize != totalEventsPerChannel * channels.size) {
+ throw new AssertionError(
+ s"Expected ${totalEventsPerChannel * channels.size} events, but was $eventSize")
+ }
+ var counter = 0
+ for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
+ val eventBodyToVerify = s"${channels(k).getName}-$i"
+ val eventHeaderToVerify: JMap[String, String] = Collections.singletonMap(s"test-$i", "header")
+ var found = false
+ var j = 0
+ while (j < eventSize && !found) {
+ if (eventBodyToVerify == outputBodies.get(j) &&
+ eventHeaderToVerify == outputHeaders.get(j)) {
+ found = true
+ counter += 1
+ }
+ j += 1
+ }
+ }
+ if (counter != totalEventsPerChannel * channels.size) {
+ throw new AssertionError(
+ s"111 Expected ${totalEventsPerChannel * channels.size} events, but was $counter")
+ }
+ }
+
+ def assertChannelsAreEmpty(): Unit = {
+ channels.foreach(assertChannelIsEmpty)
+ }
+
+ private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
+ val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
+ queueRemaining.setAccessible(true)
+ val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
+ if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != 5000) {
+ throw new AssertionError(s"Channel ${channel.getName} is not empty")
+ }
+ }
+
+ def close(): Unit = {
+ sinks.foreach(_.stop())
+ sinks.clear()
+ channels.foreach(_.stop())
+ channels.clear()
+ }
+
+ private class TxnSubmitter(channel: MemoryChannel) extends Callable[Void] {
+ override def call(): Void = {
+ var t = 0
+ for (i <- 0 until batchCount) {
+ val tx = channel.getTransaction
+ tx.begin()
+ for (j <- 0 until eventsPerBatch) {
+ channel.put(EventBuilder.withBody(
+ s"${channel.getName}-$t".getBytes(StandardCharsets.UTF_8),
+ Collections.singletonMap(s"test-$t", "header")))
+ t += 1
+ }
+ tx.commit()
+ tx.close()
+ Thread.sleep(500) // Allow some time for the events to reach
+ }
+ null
+ }
+ }
+
+}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
new file mode 100644
index 0000000000..d31aa5f5c0
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Spark streaming receiver for Flume.
+ */
+package org.apache.spark.streaming.flume;
\ No newline at end of file
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
new file mode 100644
index 0000000000..9bfab68c4b
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+/**
+ * Spark streaming receiver for Flume.
+ */
+package object flume
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000000..cfedb5a042
--- /dev/null
+++ b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+
+ protected transient JavaStreamingContext ssc;
+
+ @Before
+ public void setUp() {
+ SparkConf conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
+ ssc = new JavaStreamingContext(conf, new Duration(1000));
+ ssc.checkpoint("checkpoint");
+ }
+
+ @After
+ public void tearDown() {
+ ssc.stop();
+ ssc = null;
+ }
+}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
new file mode 100644
index 0000000000..79c5b91654
--- /dev/null
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume;
+
+import java.net.InetSocketAddress;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.junit.Test;
+
+public class JavaFlumePollingStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testFlumeStream() {
+ // tests the API, does not actually test data receiving
+ InetSocketAddress[] addresses = new InetSocketAddress[] {
+ new InetSocketAddress("localhost", 12345)
+ };
+ JavaReceiverInputDStream<SparkFlumeEvent> test1 =
+ FlumeUtils.createPollingStream(ssc, "localhost", 12345);
+ JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createPollingStream(
+ ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createPollingStream(
+ ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream<SparkFlumeEvent> test4 = FlumeUtils.createPollingStream(
+ ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5);
+ }
+}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
new file mode 100644
index 0000000000..3b5e0c7746
--- /dev/null
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.junit.Test;
+
+public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testFlumeStream() {
+ // Tests the API only; it does not actually test receiving data.
+ JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
+ JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
+ StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
+ StorageLevel.MEMORY_AND_DISK_SER_2(), false);
+ }
+}
diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..75e3b53a09
--- /dev/null
+++ b/external/flume/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN
+
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
new file mode 100644
index 0000000000..c97a27ca7c
--- /dev/null
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
+import org.apache.spark.util.Utils
+
+/**
+ * This is an output stream just for the test suites. All the output is collected into a
+ * ConcurrentLinkedQueue. This queue is wiped clean on being restored from checkpoint.
+ *
+ * The queue contains a sequence of RDDs, each containing a sequence of items.
+ */
+class TestOutputStream[T: ClassTag](parent: DStream[T],
+ val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]]())
+ extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
+ val collected = rdd.collect()
+ output.add(collected)
+ }, false) {
+
+ // This is to clear the output queue every time it is read from a checkpoint
+ @throws(classOf[IOException])
+ private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
+ ois.defaultReadObject()
+ output.clear()
+ }
+}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
new file mode 100644
index 0000000000..156712483d
--- /dev/null
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.net.InetSocketAddress
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext, TestOutputStream}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.util.{ManualClock, Utils}
+
+class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging {
+
+ val maxAttempts = 5
+ val batchDuration = Seconds(1)
+
+ val conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
+
+ val utils = new PollingFlumeTestUtils
+
+ test("flume polling test") {
+ testMultipleTimes(testFlumePolling)
+ }
+
+ test("flume polling test multiple hosts") {
+ testMultipleTimes(testFlumePollingMultipleHost)
+ }
+
+ /**
+ * Run the given test until it no longer throws a java.net.BindException,
+ * retrying at most `maxAttempts` times.
+ */
+ private def testMultipleTimes(test: () => Unit): Unit = {
+ var testPassed = false
+ var attempt = 0
+ while (!testPassed && attempt < maxAttempts) {
+ try {
+ test()
+ testPassed = true
+ } catch {
+ case e: Exception if Utils.isBindCollision(e) =>
+ logWarning("Exception when running flume polling test: " + e)
+ attempt += 1
+ }
+ }
+ assert(testPassed, s"Test failed after $attempt attempts!")
+ }
+
+ private def testFlumePolling(): Unit = {
+ try {
+ val port = utils.startSingleSink()
+
+ writeAndVerify(Seq(port))
+ utils.assertChannelsAreEmpty()
+ } finally {
+ utils.close()
+ }
+ }
+
+ private def testFlumePollingMultipleHost(): Unit = {
+ try {
+ val ports = utils.startMultipleSinks()
+ writeAndVerify(ports)
+ utils.assertChannelsAreEmpty()
+ } finally {
+ utils.close()
+ }
+ }
+
+ def writeAndVerify(sinkPorts: Seq[Int]): Unit = {
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(conf, batchDuration)
+ val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port))
+ val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+ FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
+ utils.eventsPerBatch, 5)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputQueue)
+ outputStream.register()
+
+ ssc.start()
+ try {
+ utils.sendDatAndEnsureAllDataHasBeenReceived()
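+ // With the ManualClock configured above, batches run only when the clock is advanced explicitly.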
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ clock.advance(batchDuration.milliseconds)
+
+ // The eventually block is required to ensure that all data in the batch has been processed.
+ eventually(timeout(10 seconds), interval(100 milliseconds)) {
+ val flattenOutput = outputQueue.asScala.toSeq.flatten
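+ // Flume header keys and values are CharSequences; convert them to Strings before comparing.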
+ val headers = flattenOutput.map(_.event.getHeaders.asScala.map {
+ case (key, value) => (key.toString, value.toString)
+ }).map(_.asJava)
+ val bodies = flattenOutput.map(e => JavaUtils.bytesToString(e.event.getBody))
+ utils.assertOutput(headers.asJava, bodies.asJava)
+ }
+ } finally {
+ ssc.stop()
+ }
+ }
+
+}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
new file mode 100644
index 0000000000..7bac1cc4b0
--- /dev/null
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+import org.scalatest.{BeforeAndAfter, Matchers}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
+
+class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
+ val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
+ var ssc: StreamingContext = null
+
+ test("flume input stream") {
+ testFlumeStream(testCompression = false)
+ }
+
+ test("flume input compressed stream") {
+ testFlumeStream(testCompression = true)
+ }
+
+ /** Run a test on the Flume stream */
+ private def testFlumeStream(testCompression: Boolean): Unit = {
+ val input = (1 to 100).map { _.toString }
+ val utils = new FlumeTestUtils
+ try {
+ val outputQueue = startContext(utils.getTestPort(), testCompression)
+
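+ // Writing may fail until the receiver's Avro server is listening, so retry within eventually.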
+ eventually(timeout(10 seconds), interval(100 milliseconds)) {
+ utils.writeInput(input.asJava, testCompression)
+ }
+
+ eventually(timeout(10 seconds), interval(100 milliseconds)) {
+ val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event }
+ outputEvents.foreach {
+ event =>
+ event.getHeaders.get("test") should be("header")
+ }
+ val output = outputEvents.map(event => JavaUtils.bytesToString(event.getBody))
+ output should be (input)
+ }
+ } finally {
+ if (ssc != null) {
+ ssc.stop()
+ }
+ utils.close()
+ }
+ }
+
+ /** Set up and start the streaming context */
+ private def startContext(
+ testPort: Int, testCompression: Boolean): (ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]) = {
+ ssc = new StreamingContext(conf, Milliseconds(200))
+ val flumeStream = FlumeUtils.createStream(
+ ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputQueue)
+ outputStream.register()
+ ssc.start()
+ outputQueue
+ }
+
+ /** Channel factory that creates socket channels with zlib compression */
+ private class CompressionChannelFactory(compressionLevel: Int)
+ extends NioClientSocketChannelFactory {
+
+ override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
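+ // Add the zlib handlers at the head of the pipeline so all bytes on the wire are compressed.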
+ val encoder = new ZlibEncoder(compressionLevel)
+ pipeline.addFirst("deflater", encoder)
+ pipeline.addFirst("inflater", new ZlibDecoder())
+ super.newChannel(pipeline)
+ }
+ }
+}