-rwxr-xr-x  dev/run-tests.py                                                                               |   7
-rw-r--r--  dev/sparktestsupport/modules.py                                                                |  15
-rw-r--r--  docs/streaming-flume-integration.md                                                            |  18
-rw-r--r--  docs/streaming-programming-guide.md                                                            |   2
-rw-r--r--  examples/src/main/python/streaming/flume_wordcount.py                                          |  55
-rw-r--r--  external/flume-assembly/pom.xml                                                                | 135
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala            | 116
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala                |  76
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala     | 209
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala   | 173
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala          | 106
-rw-r--r--  pom.xml                                                                                        |   1
-rw-r--r--  project/SparkBuild.scala                                                                       |   6
-rw-r--r--  python/pyspark/streaming/flume.py                                                              | 147
-rw-r--r--  python/pyspark/streaming/tests.py                                                              | 179
15 files changed, 1009 insertions, 236 deletions
diff --git a/dev/run-tests.py b/dev/run-tests.py
index 4596e07014..1f0d218514 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -96,8 +96,8 @@ def determine_modules_to_test(changed_modules):
['examples', 'graphx']
>>> x = sorted(x.name for x in determine_modules_to_test([modules.sql]))
>>> x # doctest: +NORMALIZE_WHITESPACE
- ['examples', 'hive-thriftserver', 'mllib', 'pyspark-core', 'pyspark-ml', \
- 'pyspark-mllib', 'pyspark-sql', 'pyspark-streaming', 'sparkr', 'sql']
+ ['examples', 'hive-thriftserver', 'mllib', 'pyspark-ml', \
+ 'pyspark-mllib', 'pyspark-sql', 'sparkr', 'sql']
"""
# If we're going to have to run all of the tests, then we can just short-circuit
# and return 'root'. No module depends on root, so if it appears then it will be
@@ -293,7 +293,8 @@ def build_spark_sbt(hadoop_version):
build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
sbt_goals = ["package",
"assembly/assembly",
- "streaming-kafka-assembly/assembly"]
+ "streaming-kafka-assembly/assembly",
+ "streaming-flume-assembly/assembly"]
profiles_and_goals = build_profiles + sbt_goals
print("[info] Building Spark (w/Hive 0.13.1) using SBT with these arguments: ",
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index efe3a897e9..993583e2f4 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -203,7 +203,7 @@ streaming_flume_sink = Module(
streaming_flume = Module(
- name="streaming_flume",
+ name="streaming-flume",
dependencies=[streaming],
source_file_regexes=[
"external/flume",
@@ -214,6 +214,15 @@ streaming_flume = Module(
)
+streaming_flume_assembly = Module(
+ name="streaming-flume-assembly",
+ dependencies=[streaming_flume, streaming_flume_sink],
+ source_file_regexes=[
+ "external/flume-assembly",
+ ]
+)
+
+
mllib = Module(
name="mllib",
dependencies=[streaming, sql],
@@ -241,7 +250,7 @@ examples = Module(
pyspark_core = Module(
name="pyspark-core",
- dependencies=[mllib, streaming, streaming_kafka],
+ dependencies=[],
source_file_regexes=[
"python/(?!pyspark/(ml|mllib|sql|streaming))"
],
@@ -281,7 +290,7 @@ pyspark_sql = Module(
pyspark_streaming = Module(
name="pyspark-streaming",
- dependencies=[pyspark_core, streaming, streaming_kafka],
+ dependencies=[pyspark_core, streaming, streaming_kafka, streaming_flume_assembly],
source_file_regexes=[
"python/pyspark/streaming"
],
diff --git a/docs/streaming-flume-integration.md b/docs/streaming-flume-integration.md
index 8d6e743709..de0461010d 100644
--- a/docs/streaming-flume-integration.md
+++ b/docs/streaming-flume-integration.md
@@ -58,6 +58,15 @@ configuring Flume agents.
See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java).
</div>
+ <div data-lang="python" markdown="1">
+ from pyspark.streaming.flume import FlumeUtils
+
+ flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
+
+ By default, the Python API will decode the Flume event body as UTF-8 encoded strings. You can specify a custom decoding function to decode the body byte arrays in Flume events to an arbitrary data type.
+ See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils)
+ and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/flume_wordcount.py).
+ </div>
</div>
Note that the hostname should be the same as the one used by the resource manager in the
@@ -135,6 +144,15 @@ configuring Flume agents.
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);
</div>
+ <div data-lang="python" markdown="1">
+ from pyspark.streaming.flume import FlumeUtils
+
+ addresses = [([sink machine hostname 1], [sink port 1]), ([sink machine hostname 2], [sink port 2])]
+ flumeStream = FlumeUtils.createPollingStream(streamingContext, addresses)
+
+ By default, the Python API will decode the Flume event body as UTF-8 encoded strings. You can specify a custom decoding function to decode the body byte arrays in Flume events to an arbitrary data type.
+ See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils).
+ </div>
</div>
See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala).
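For concreteness, below is a minimal, hypothetical sketch of the pull-based Python API added by this change; the sink hostnames and ports are placeholder assumptions, not values from this commit.

    # Hypothetical sketch of the new createPollingStream Python API; the
    # sink addresses below are assumed placeholders for illustration only.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.flume import FlumeUtils

    sc = SparkContext(appName="FlumePollingSketch")
    ssc = StreamingContext(sc, 2)  # 2-second batches

    # (hostname, port) pairs where Spark Sinks are assumed to be running
    addresses = [("sink-host-1", 9999), ("sink-host-2", 9999)]
    events = FlumeUtils.createPollingStream(ssc, addresses)

    # Each element is a (headers, body) pair; count the events in every batch
    events.count().pprint()

    ssc.start()
    ssc.awaitTermination()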
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index b784d59666..e72d5580da 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -683,7 +683,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
{:.no_toc}
<span class="badge" style="background-color: grey">Python API</span> As of Spark {{site.SPARK_VERSION_SHORT}},
-out of these sources, *only* Kafka is available in the Python API. We will add more advanced sources in the Python API in future.
+out of these sources, *only* Kafka and Flume are available in the Python API. We will add more advanced sources to the Python API in the future.
This category of sources require interfacing with external non-Spark libraries, some of them with
complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts
diff --git a/examples/src/main/python/streaming/flume_wordcount.py b/examples/src/main/python/streaming/flume_wordcount.py
new file mode 100644
index 0000000000..091b64d8c4
--- /dev/null
+++ b/examples/src/main/python/streaming/flume_wordcount.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF-8 encoded text received from Flume every second.
+ Usage: flume_wordcount.py <hostname> <port>
+
+ To run this on your local machine, you need to setup Flume first, see
+ https://flume.apache.org/documentation.html
+
+ and then run the example
+ `$ bin/spark-submit --jars external/flume-assembly/target/scala-*/\
+ spark-streaming-flume-assembly-*.jar examples/src/main/python/streaming/flume_wordcount.py \
+ localhost 12345`
+"""
+from __future__ import print_function
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.streaming.flume import FlumeUtils
+
+if __name__ == "__main__":
+ if len(sys.argv) != 3:
+ print("Usage: flume_wordcount.py <hostname> <port>", file=sys.stderr)
+ exit(-1)
+
+ sc = SparkContext(appName="PythonStreamingFlumeWordCount")
+ ssc = StreamingContext(sc, 1)
+
+ hostname, port = sys.argv[1:]
+ kvs = FlumeUtils.createStream(ssc, hostname, int(port))
+ lines = kvs.map(lambda x: x[1])
+ counts = lines.flatMap(lambda line: line.split(" ")) \
+ .map(lambda word: (word, 1)) \
+ .reduceByKey(lambda a, b: a+b)
+ counts.pprint()
+
+ ssc.start()
+ ssc.awaitTermination()
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
new file mode 100644
index 0000000000..8565cd83ed
--- /dev/null
+++ b/external/flume-assembly/pom.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.10</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume-assembly_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project External Flume Assembly</name>
+ <url>http://spark.apache.org/</url>
+
+ <properties>
+ <sbt.project.name>streaming-flume-assembly</sbt.project.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc</artifactId>
+ <version>${avro.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.velocity</groupId>
+ <artifactId>velocity</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-flume-assembly-${project.version}.jar</outputFile>
+ <artifactSet>
+ <includes>
+ <include>*:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>reference.conf</resource>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+ <resource>log4j.properties</resource>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+</build>
+</project>
+
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
new file mode 100644
index 0000000000..9d9c3b1894
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.net.{InetSocketAddress, ServerSocket}
+import java.nio.ByteBuffer
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+
+import com.google.common.base.Charsets.UTF_8
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.commons.lang3.RandomUtils
+import org.apache.flume.source.avro
+import org.apache.flume.source.avro.{AvroSourceProtocol, AvroFlumeEvent}
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}
+
+import org.apache.spark.util.Utils
+import org.apache.spark.SparkConf
+
+/**
+ * Shared code for the Scala and Python unit tests
+ */
+private[flume] class FlumeTestUtils {
+
+ private var transceiver: NettyTransceiver = null
+
+ private val testPort: Int = findFreePort()
+
+ def getTestPort(): Int = testPort
+
+ /** Find a free port */
+ private def findFreePort(): Int = {
+ val candidatePort = RandomUtils.nextInt(1024, 65536)
+ Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
+ val socket = new ServerSocket(trialPort)
+ socket.close()
+ (null, trialPort)
+ }, new SparkConf())._2
+ }
+
+ /** Send data to the flume receiver */
+ def writeInput(input: JList[String], enableCompression: Boolean): Unit = {
+ val testAddress = new InetSocketAddress("localhost", testPort)
+
+ val inputEvents = input.map { item =>
+ val event = new AvroFlumeEvent
+ event.setBody(ByteBuffer.wrap(item.getBytes(UTF_8)))
+ event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
+ event
+ }
+
+ // If the previous transceiver is still open, close it before creating a new one
+ close()
+
+ // Create transceiver
+ transceiver = {
+ if (enableCompression) {
+ new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
+ } else {
+ new NettyTransceiver(testAddress)
+ }
+ }
+
+ // Create Avro client with the transceiver
+ val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
+ if (client == null) {
+ throw new AssertionError("Cannot create client")
+ }
+
+ // Send data
+ val status = client.appendBatch(inputEvents.toList)
+ if (status != avro.Status.OK) {
+ throw new AssertionError("Failed to send events")
+ }
+ }
+
+ def close(): Unit = {
+ if (transceiver != null) {
+ transceiver.close()
+ transceiver = null
+ }
+ }
+
+ /** Class to create socket channel with compression */
+ private class CompressionChannelFactory(compressionLevel: Int)
+ extends NioClientSocketChannelFactory {
+
+ override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+ val encoder = new ZlibEncoder(compressionLevel)
+ pipeline.addFirst("deflater", encoder)
+ pipeline.addFirst("inflater", new ZlibDecoder())
+ super.newChannel(pipeline)
+ }
+ }
+
+}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 44dec45c22..095bfb0c73 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -18,10 +18,16 @@
package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
+import java.io.{DataOutputStream, ByteArrayOutputStream}
+import java.util.{List => JList, Map => JMap}
+import scala.collection.JavaConversions._
+
+import org.apache.spark.api.java.function.PairFunction
+import org.apache.spark.api.python.PythonRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
@@ -236,3 +242,71 @@ object FlumeUtils {
createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
}
}
+
+/**
+ * This is a helper class that wraps the methods in FlumeUtils into more Python-friendly class and
+ * function so that it can be easily instantiated and called from Python's FlumeUtils.
+ */
+private class FlumeUtilsPythonHelper {
+
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+ ): JavaPairDStream[Array[Byte], Array[Byte]] = {
+ val dstream = FlumeUtils.createStream(jssc, hostname, port, storageLevel, enableDecompression)
+ FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
+ }
+
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ hosts: JList[String],
+ ports: JList[Int],
+ storageLevel: StorageLevel,
+ maxBatchSize: Int,
+ parallelism: Int
+ ): JavaPairDStream[Array[Byte], Array[Byte]] = {
+ assert(hosts.length == ports.length)
+ val addresses = hosts.zip(ports).map {
+ case (host, port) => new InetSocketAddress(host, port)
+ }
+ val dstream = FlumeUtils.createPollingStream(
+ jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
+ FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
+ }
+
+}
+
+private object FlumeUtilsPythonHelper {
+
+ private def stringMapToByteArray(map: JMap[CharSequence, CharSequence]): Array[Byte] = {
+ val byteStream = new ByteArrayOutputStream()
+ val output = new DataOutputStream(byteStream)
+ try {
+ output.writeInt(map.size)
+ map.foreach { kv =>
+ PythonRDD.writeUTF(kv._1.toString, output)
+ PythonRDD.writeUTF(kv._2.toString, output)
+ }
+ byteStream.toByteArray
+ }
+ finally {
+ output.close()
+ }
+ }
+
+ private def toByteArrayPairDStream(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
+ JavaPairDStream[Array[Byte], Array[Byte]] = {
+ dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], Array[Byte]] {
+ override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], Array[Byte]) = {
+ val event = sparkEvent.event
+ val byteBuffer = event.getBody
+ val body = new Array[Byte](byteBuffer.remaining())
+ byteBuffer.get(body)
+ (stringMapToByteArray(event.getHeaders), body)
+ }
+ })
+ }
+}
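A note on the wire format above: FlumeUtilsPythonHelper serializes each event's headers as a 4-byte pair count followed by length-prefixed UTF-8 key/value strings (DataOutputStream.writeInt plus PythonRDD.writeUTF). The following standalone Python sketch of decoding that layout is illustrative only and is not part of this commit; the helper name and inputs are assumptions.

    # Illustrative decoder for the header byte layout produced by
    # stringMapToByteArray above; not shipped with this commit.
    import struct
    from io import BytesIO

    def read_int(stream):
        # 4-byte big-endian signed int, as written by DataOutputStream.writeInt
        return struct.unpack("!i", stream.read(4))[0]

    def decode_headers(header_bytes):
        stream = BytesIO(header_bytes)
        headers = {}
        for _ in range(read_int(stream)):            # number of key/value pairs
            key = stream.read(read_int(stream))      # PythonRDD.writeUTF: length + UTF-8 bytes
            value = stream.read(read_int(stream))
            headers[key.decode("utf-8")] = value.decode("utf-8")
        return headers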
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
new file mode 100644
index 0000000000..91d63d49db
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.util.concurrent._
+import java.util.{List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.base.Charsets.UTF_8
+import org.apache.flume.event.EventBuilder
+import org.apache.flume.Context
+import org.apache.flume.channel.MemoryChannel
+import org.apache.flume.conf.Configurables
+
+import org.apache.spark.streaming.flume.sink.{SparkSinkConfig, SparkSink}
+
+/**
+ * Shared code for the Scala and Python unit tests
+ */
+private[flume] class PollingFlumeTestUtils {
+
+ private val batchCount = 5
+ val eventsPerBatch = 100
+ private val totalEventsPerChannel = batchCount * eventsPerBatch
+ private val channelCapacity = 5000
+
+ def getTotalEvents: Int = totalEventsPerChannel * channels.size
+
+ private val channels = new ArrayBuffer[MemoryChannel]
+ private val sinks = new ArrayBuffer[SparkSink]
+
+ /**
+ * Start a sink and return the port of this sink
+ */
+ def startSingleSink(): Int = {
+ channels.clear()
+ sinks.clear()
+
+ // Start the channel and sink.
+ val context = new Context()
+ context.put("capacity", channelCapacity.toString)
+ context.put("transactionCapacity", "1000")
+ context.put("keep-alive", "0")
+ val channel = new MemoryChannel()
+ Configurables.configure(channel, context)
+
+ val sink = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
+ Configurables.configure(sink, context)
+ sink.setChannel(channel)
+ sink.start()
+
+ channels += (channel)
+ sinks += sink
+
+ sink.getPort()
+ }
+
+ /**
+ * Start 2 sinks and return the ports
+ */
+ def startMultipleSinks(): JList[Int] = {
+ channels.clear()
+ sinks.clear()
+
+ // Start the channel and sink.
+ val context = new Context()
+ context.put("capacity", channelCapacity.toString)
+ context.put("transactionCapacity", "1000")
+ context.put("keep-alive", "0")
+ val channel = new MemoryChannel()
+ Configurables.configure(channel, context)
+
+ val channel2 = new MemoryChannel()
+ Configurables.configure(channel2, context)
+
+ val sink = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
+ Configurables.configure(sink, context)
+ sink.setChannel(channel)
+ sink.start()
+
+ val sink2 = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
+ Configurables.configure(sink2, context)
+ sink2.setChannel(channel2)
+ sink2.start()
+
+ sinks += sink
+ sinks += sink2
+ channels += channel
+ channels += channel2
+
+ sinks.map(_.getPort())
+ }
+
+ /**
+ * Send data and wait until all data has been received
+ */
+ def sendDataAndEnsureAllDataHasBeenReceived(): Unit = {
+ val executor = Executors.newCachedThreadPool()
+ val executorCompletion = new ExecutorCompletionService[Void](executor)
+
+ val latch = new CountDownLatch(batchCount * channels.size)
+ sinks.foreach(_.countdownWhenBatchReceived(latch))
+
+ channels.foreach(channel => {
+ executorCompletion.submit(new TxnSubmitter(channel))
+ })
+
+ for (i <- 0 until channels.size) {
+ executorCompletion.take()
+ }
+
+ latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
+ }
+
+ /**
+ * A Python-friendly method to assert the output
+ */
+ def assertOutput(
+ outputHeaders: JList[JMap[String, String]], outputBodies: JList[String]): Unit = {
+ require(outputHeaders.size == outputBodies.size)
+ val eventSize = outputHeaders.size
+ if (eventSize != totalEventsPerChannel * channels.size) {
+ throw new AssertionError(
+ s"Expected ${totalEventsPerChannel * channels.size} events, but was $eventSize")
+ }
+ var counter = 0
+ for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
+ val eventBodyToVerify = s"${channels(k).getName}-$i"
+ val eventHeaderToVerify: JMap[String, String] = Map[String, String](s"test-$i" -> "header")
+ var found = false
+ var j = 0
+ while (j < eventSize && !found) {
+ if (eventBodyToVerify == outputBodies.get(j) &&
+ eventHeaderToVerify == outputHeaders.get(j)) {
+ found = true
+ counter += 1
+ }
+ j += 1
+ }
+ }
+ if (counter != totalEventsPerChannel * channels.size) {
+ throw new AssertionError(
+ s"Expected ${totalEventsPerChannel * channels.size} events, but was $counter")
+ }
+ }
+
+ def assertChannelsAreEmpty(): Unit = {
+ channels.foreach(assertChannelIsEmpty)
+ }
+
+ private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
+ val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
+ queueRemaining.setAccessible(true)
+ val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
+ if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != channelCapacity) {
+ throw new AssertionError(s"Channel ${channel.getName} is not empty")
+ }
+ }
+
+ def close(): Unit = {
+ sinks.foreach(_.stop())
+ sinks.clear()
+ channels.foreach(_.stop())
+ channels.clear()
+ }
+
+ private class TxnSubmitter(channel: MemoryChannel) extends Callable[Void] {
+ override def call(): Void = {
+ var t = 0
+ for (i <- 0 until batchCount) {
+ val tx = channel.getTransaction
+ tx.begin()
+ for (j <- 0 until eventsPerBatch) {
+ channel.put(EventBuilder.withBody(s"${channel.getName}-$t".getBytes(UTF_8),
+ Map[String, String](s"test-$t" -> "header")))
+ t += 1
+ }
+ tx.commit()
+ tx.close()
+ Thread.sleep(500) // Allow some time for the events to reach the sink
+ }
+ null
+ }
+ }
+
+}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index d772b9ca9b..d5f9a0aa38 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -18,47 +18,33 @@
package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
-import java.util.concurrent._
import scala.collection.JavaConversions._
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps
-import org.apache.flume.Context
-import org.apache.flume.channel.MemoryChannel
-import org.apache.flume.conf.Configurables
-import org.apache.flume.event.EventBuilder
-import org.scalatest.concurrent.Eventually._
-
+import com.google.common.base.Charsets.UTF_8
import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually._
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, TestOutputStream, StreamingContext}
-import org.apache.spark.streaming.flume.sink._
import org.apache.spark.util.{ManualClock, Utils}
class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging {
- val batchCount = 5
- val eventsPerBatch = 100
- val totalEventsPerChannel = batchCount * eventsPerBatch
- val channelCapacity = 5000
val maxAttempts = 5
val batchDuration = Seconds(1)
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
- def beforeFunction() {
- logInfo("Using manual clock")
- conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
- }
-
- before(beforeFunction())
+ val utils = new PollingFlumeTestUtils
test("flume polling test") {
testMultipleTimes(testFlumePolling)
@@ -89,146 +75,55 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
}
private def testFlumePolling(): Unit = {
- // Start the channel and sink.
- val context = new Context()
- context.put("capacity", channelCapacity.toString)
- context.put("transactionCapacity", "1000")
- context.put("keep-alive", "0")
- val channel = new MemoryChannel()
- Configurables.configure(channel, context)
-
- val sink = new SparkSink()
- context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
- context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
- Configurables.configure(sink, context)
- sink.setChannel(channel)
- sink.start()
-
- writeAndVerify(Seq(sink), Seq(channel))
- assertChannelIsEmpty(channel)
- sink.stop()
- channel.stop()
+ try {
+ val port = utils.startSingleSink()
+
+ writeAndVerify(Seq(port))
+ utils.assertChannelsAreEmpty()
+ } finally {
+ utils.close()
+ }
}
private def testFlumePollingMultipleHost(): Unit = {
- // Start the channel and sink.
- val context = new Context()
- context.put("capacity", channelCapacity.toString)
- context.put("transactionCapacity", "1000")
- context.put("keep-alive", "0")
- val channel = new MemoryChannel()
- Configurables.configure(channel, context)
-
- val channel2 = new MemoryChannel()
- Configurables.configure(channel2, context)
-
- val sink = new SparkSink()
- context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
- context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
- Configurables.configure(sink, context)
- sink.setChannel(channel)
- sink.start()
-
- val sink2 = new SparkSink()
- context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
- context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
- Configurables.configure(sink2, context)
- sink2.setChannel(channel2)
- sink2.start()
try {
- writeAndVerify(Seq(sink, sink2), Seq(channel, channel2))
- assertChannelIsEmpty(channel)
- assertChannelIsEmpty(channel2)
+ val ports = utils.startMultipleSinks()
+ writeAndVerify(ports)
+ utils.assertChannelsAreEmpty()
} finally {
- sink.stop()
- sink2.stop()
- channel.stop()
- channel2.stop()
+ utils.close()
}
}
- def writeAndVerify(sinks: Seq[SparkSink], channels: Seq[MemoryChannel]) {
+ def writeAndVerify(sinkPorts: Seq[Int]): Unit = {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
- val addresses = sinks.map(sink => new InetSocketAddress("localhost", sink.getPort()))
+ val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port))
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
- eventsPerBatch, 5)
+ utils.eventsPerBatch, 5)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
outputStream.register()
ssc.start()
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- val executor = Executors.newCachedThreadPool()
- val executorCompletion = new ExecutorCompletionService[Void](executor)
-
- val latch = new CountDownLatch(batchCount * channels.size)
- sinks.foreach(_.countdownWhenBatchReceived(latch))
-
- channels.foreach(channel => {
- executorCompletion.submit(new TxnSubmitter(channel, clock))
- })
-
- for (i <- 0 until channels.size) {
- executorCompletion.take()
- }
-
- latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
- clock.advance(batchDuration.milliseconds)
-
- // The eventually is required to ensure that all data in the batch has been processed.
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val flattenedBuffer = outputBuffer.flatten
- assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
- var counter = 0
- for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
- val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
- String.valueOf(i)).getBytes("utf-8"),
- Map[String, String]("test-" + i.toString -> "header"))
- var found = false
- var j = 0
- while (j < flattenedBuffer.size && !found) {
- val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8")
- if (new String(eventToVerify.getBody, "utf-8") == strToCompare &&
- eventToVerify.getHeaders.get("test-" + i.toString)
- .equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) {
- found = true
- counter += 1
- }
- j += 1
- }
- }
- assert(counter === totalEventsPerChannel * channels.size)
- }
- ssc.stop()
- }
-
- def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
- val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
- queueRemaining.setAccessible(true)
- val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
- assert(m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] === 5000)
- }
-
- private class TxnSubmitter(channel: MemoryChannel, clock: ManualClock) extends Callable[Void] {
- override def call(): Void = {
- var t = 0
- for (i <- 0 until batchCount) {
- val tx = channel.getTransaction
- tx.begin()
- for (j <- 0 until eventsPerBatch) {
- channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes(
- "utf-8"),
- Map[String, String]("test-" + t.toString -> "header")))
- t += 1
- }
- tx.commit()
- tx.close()
- Thread.sleep(500) // Allow some time for the events to reach
+ try {
+ utils.sendDataAndEnsureAllDataHasBeenReceived()
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ clock.advance(batchDuration.milliseconds)
+
+ // The eventually is required to ensure that all data in the batch has been processed.
+ eventually(timeout(10 seconds), interval(100 milliseconds)) {
+ val flattenOutputBuffer = outputBuffer.flatten
+ val headers = flattenOutputBuffer.map(_.event.getHeaders.map {
+ case kv => (kv._1.toString, kv._2.toString)
+ }).map(mapAsJavaMap)
+ val bodies = flattenOutputBuffer.map(e => new String(e.event.getBody.array(), UTF_8))
+ utils.assertOutput(headers, bodies)
}
- null
+ } finally {
+ ssc.stop()
}
}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index c926359987..5bc4cdf653 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -17,20 +17,12 @@
package org.apache.spark.streaming.flume
-import java.net.{InetSocketAddress, ServerSocket}
-import java.nio.ByteBuffer
-
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps
import com.google.common.base.Charsets
-import org.apache.avro.ipc.NettyTransceiver
-import org.apache.avro.ipc.specific.SpecificRequestor
-import org.apache.commons.lang3.RandomUtils
-import org.apache.flume.source.avro
-import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.socket.SocketChannel
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
@@ -41,22 +33,10 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
-import org.apache.spark.util.Utils
class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
-
var ssc: StreamingContext = null
- var transceiver: NettyTransceiver = null
-
- after {
- if (ssc != null) {
- ssc.stop()
- }
- if (transceiver != null) {
- transceiver.close()
- }
- }
test("flume input stream") {
testFlumeStream(testCompression = false)
@@ -69,19 +49,29 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
/** Run test on flume stream */
private def testFlumeStream(testCompression: Boolean): Unit = {
val input = (1 to 100).map { _.toString }
- val testPort = findFreePort()
- val outputBuffer = startContext(testPort, testCompression)
- writeAndVerify(input, testPort, outputBuffer, testCompression)
- }
+ val utils = new FlumeTestUtils
+ try {
+ val outputBuffer = startContext(utils.getTestPort(), testCompression)
- /** Find a free port */
- private def findFreePort(): Int = {
- val candidatePort = RandomUtils.nextInt(1024, 65536)
- Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
- val socket = new ServerSocket(trialPort)
- socket.close()
- (null, trialPort)
- }, conf)._2
+ eventually(timeout(10 seconds), interval(100 milliseconds)) {
+ utils.writeInput(input, testCompression)
+ }
+
+ eventually(timeout(10 seconds), interval(100 milliseconds)) {
+ val outputEvents = outputBuffer.flatten.map { _.event }
+ outputEvents.foreach {
+ event =>
+ event.getHeaders.get("test") should be("header")
+ }
+ val output = outputEvents.map(event => new String(event.getBody.array(), Charsets.UTF_8))
+ output should be (input)
+ }
+ } finally {
+ if (ssc != null) {
+ ssc.stop()
+ }
+ utils.close()
+ }
}
/** Setup and start the streaming context */
@@ -98,58 +88,6 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
outputBuffer
}
- /** Send data to the flume receiver and verify whether the data was received */
- private def writeAndVerify(
- input: Seq[String],
- testPort: Int,
- outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]],
- enableCompression: Boolean
- ) {
- val testAddress = new InetSocketAddress("localhost", testPort)
-
- val inputEvents = input.map { item =>
- val event = new AvroFlumeEvent
- event.setBody(ByteBuffer.wrap(item.getBytes(Charsets.UTF_8)))
- event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
- event
- }
-
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- // if last attempted transceiver had succeeded, close it
- if (transceiver != null) {
- transceiver.close()
- transceiver = null
- }
-
- // Create transceiver
- transceiver = {
- if (enableCompression) {
- new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
- } else {
- new NettyTransceiver(testAddress)
- }
- }
-
- // Create Avro client with the transceiver
- val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
- client should not be null
-
- // Send data
- val status = client.appendBatch(inputEvents.toList)
- status should be (avro.Status.OK)
- }
-
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val outputEvents = outputBuffer.flatten.map { _.event }
- outputEvents.foreach {
- event =>
- event.getHeaders.get("test") should be("header")
- }
- val output = outputEvents.map(event => new String(event.getBody.array(), Charsets.UTF_8))
- output should be (input)
- }
- }
-
/** Class to create socket channel with compression */
private class CompressionChannelFactory(compressionLevel: Int)
extends NioClientSocketChannelFactory {
diff --git a/pom.xml b/pom.xml
index 94dd512cfb..211da9ee74 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,6 +102,7 @@
<module>external/twitter</module>
<module>external/flume</module>
<module>external/flume-sink</module>
+ <module>external/flume-assembly</module>
<module>external/mqtt</module>
<module>external/zeromq</module>
<module>examples</module>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f5f1c9a1a2..4ef4dc8bdc 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -45,8 +45,8 @@ object BuildCommons {
sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
"kinesis-asl").map(ProjectRef(buildLocation, _))
- val assemblyProjects@Seq(assembly, examples, networkYarn, streamingKafkaAssembly) =
- Seq("assembly", "examples", "network-yarn", "streaming-kafka-assembly")
+ val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly) =
+ Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly")
.map(ProjectRef(buildLocation, _))
val tools = ProjectRef(buildLocation, "tools")
@@ -347,7 +347,7 @@ object Assembly {
.getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
},
jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
- if (mName.contains("streaming-kafka-assembly")) {
+ if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly")) {
// This must match the same name used in maven (see external/kafka-assembly/pom.xml)
s"${mName}-${v}.jar"
} else {
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
new file mode 100644
index 0000000000..cbb573f226
--- /dev/null
+++ b/python/pyspark/streaming/flume.py
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+if sys.version >= "3":
+ from io import BytesIO
+else:
+ from StringIO import StringIO
+from py4j.java_gateway import Py4JJavaError
+
+from pyspark.storagelevel import StorageLevel
+from pyspark.serializers import PairDeserializer, NoOpSerializer, UTF8Deserializer, read_int
+from pyspark.streaming import DStream
+
+__all__ = ['FlumeUtils', 'utf8_decoder']
+
+
+def utf8_decoder(s):
+ """ Decode the bytes as a UTF-8 string """
+ return s and s.decode('utf-8')
+
+
+class FlumeUtils(object):
+
+ @staticmethod
+ def createStream(ssc, hostname, port,
+ storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+ enableDecompression=False,
+ bodyDecoder=utf8_decoder):
+ """
+ Create an input stream that pulls events from Flume.
+
+ :param ssc: StreamingContext object
+ :param hostname: Hostname of the slave machine to which the flume data will be sent
+ :param port: Port of the slave machine to which the flume data will be sent
+ :param storageLevel: Storage level to use for storing the received objects
+ :param enableDecompression: Should netty server decompress input stream
+ :param bodyDecoder: A function used to decode body (default is utf8_decoder)
+ :return: A DStream object
+ """
+ jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+
+ try:
+ helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
+ .loadClass("org.apache.spark.streaming.flume.FlumeUtilsPythonHelper")
+ helper = helperClass.newInstance()
+ jstream = helper.createStream(ssc._jssc, hostname, port, jlevel, enableDecompression)
+ except Py4JJavaError as e:
+ if 'ClassNotFoundException' in str(e.java_exception):
+ FlumeUtils._printErrorMsg(ssc.sparkContext)
+ raise e
+
+ return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
+
+ @staticmethod
+ def createPollingStream(ssc, addresses,
+ storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+ maxBatchSize=1000,
+ parallelism=5,
+ bodyDecoder=utf8_decoder):
+ """
+ Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+ This stream will poll the sink for data and will pull events as they are available.
+
+ :param ssc: StreamingContext object
+ :param addresses: List of (host, port)s on which the Spark Sink is running.
+ :param storageLevel: Storage level to use for storing the received objects
+ :param maxBatchSize: The maximum number of events to be pulled from the Spark sink
+ in a single RPC call
+ :param parallelism: Number of concurrent requests this stream should send to the sink.
+ Note that having a higher number of requests concurrently being pulled
+ will result in this stream using more threads
+ :param bodyDecoder: A function used to decode body (default is utf8_decoder)
+ :return: A DStream object
+ """
+ jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+ hosts = []
+ ports = []
+ for (host, port) in addresses:
+ hosts.append(host)
+ ports.append(port)
+
+ try:
+ helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+ .loadClass("org.apache.spark.streaming.flume.FlumeUtilsPythonHelper")
+ helper = helperClass.newInstance()
+ jstream = helper.createPollingStream(
+ ssc._jssc, hosts, ports, jlevel, maxBatchSize, parallelism)
+ except Py4JJavaError as e:
+ if 'ClassNotFoundException' in str(e.java_exception):
+ FlumeUtils._printErrorMsg(ssc.sparkContext)
+ raise e
+
+ return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
+
+ @staticmethod
+ def _toPythonDStream(ssc, jstream, bodyDecoder):
+ ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+ stream = DStream(jstream, ssc, ser)
+
+ def func(event):
+ headersBytes = BytesIO(event[0]) if sys.version >= "3" else StringIO(event[0])
+ headers = {}
+ strSer = UTF8Deserializer()
+ for i in range(0, read_int(headersBytes)):
+ key = strSer.loads(headersBytes)
+ value = strSer.loads(headersBytes)
+ headers[key] = value
+ body = bodyDecoder(event[1])
+ return (headers, body)
+ return stream.map(func)
+
+ @staticmethod
+ def _printErrorMsg(sc):
+ print("""
+________________________________________________________________________________________________
+
+ Spark Streaming's Flume libraries not found in class path. Try one of the following.
+
+ 1. Include the Flume library and its dependencies within the
+ spark-submit command as
+
+ $ bin/spark-submit --packages org.apache.spark:spark-streaming-flume:%s ...
+
+ 2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+ Group Id = org.apache.spark, Artifact Id = spark-streaming-flume-assembly, Version = %s.
+ Then, include the jar in the spark-submit command as
+
+ $ bin/spark-submit --jars <spark-streaming-flume-assembly.jar> ...
+
+________________________________________________________________________________________________
+
+""" % (sc.version, sc.version))
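Since flume.py above exposes a bodyDecoder hook, a custom decoder can replace the default utf8_decoder. Below is a hedged sketch assuming Flume event bodies carry JSON and that the receiver listens on localhost:12345; both are assumptions for illustration, not values from this commit.

    # Hypothetical custom bodyDecoder: parse JSON event bodies instead of
    # returning plain UTF-8 strings. Host/port values are assumed placeholders.
    import json
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.flume import FlumeUtils

    def json_decoder(body):
        # body is the raw event body as bytes; pass None through unchanged
        return json.loads(body.decode("utf-8")) if body is not None else None

    sc = SparkContext(appName="FlumeJsonSketch")
    ssc = StreamingContext(sc, 1)
    stream = FlumeUtils.createStream(ssc, "localhost", 12345, bodyDecoder=json_decoder)
    stream.map(lambda kv: kv[1]).pprint()   # kv == (headers, decoded_body)

    ssc.start()
    ssc.awaitTermination()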
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 91ce681fbe..188c8ff120 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -38,6 +38,7 @@ else:
from pyspark.context import SparkConf, SparkContext, RDD
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
+from pyspark.streaming.flume import FlumeUtils
class PySparkStreamingTestCase(unittest.TestCase):
@@ -677,7 +678,156 @@ class KafkaStreamTests(PySparkStreamingTestCase):
rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
self._validateRddResult(sendData, rdd)
-if __name__ == "__main__":
+
+class FlumeStreamTests(PySparkStreamingTestCase):
+ timeout = 20 # seconds
+ duration = 1
+
+ def setUp(self):
+ super(FlumeStreamTests, self).setUp()
+
+ utilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+ .loadClass("org.apache.spark.streaming.flume.FlumeTestUtils")
+ self._utils = utilsClz.newInstance()
+
+ def tearDown(self):
+ if self._utils is not None:
+ self._utils.close()
+ self._utils = None
+
+ super(FlumeStreamTests, self).tearDown()
+
+ def _startContext(self, n, compressed):
+ # Start the StreamingContext and also collect the result
+ dstream = FlumeUtils.createStream(self.ssc, "localhost", self._utils.getTestPort(),
+ enableDecompression=compressed)
+ result = []
+
+ def get_output(_, rdd):
+ for event in rdd.collect():
+ if len(result) < n:
+ result.append(event)
+ dstream.foreachRDD(get_output)
+ self.ssc.start()
+ return result
+
+ def _validateResult(self, input, result):
+ # Validate both the header and the body
+ header = {"test": "header"}
+ self.assertEqual(len(input), len(result))
+ for i in range(0, len(input)):
+ self.assertEqual(header, result[i][0])
+ self.assertEqual(input[i], result[i][1])
+
+ def _writeInput(self, input, compressed):
+ # Try to write input to the receiver until success or timeout
+ start_time = time.time()
+ while True:
+ try:
+ self._utils.writeInput(input, compressed)
+ break
+ except:
+ if time.time() - start_time < self.timeout:
+ time.sleep(0.01)
+ else:
+ raise
+
+ def test_flume_stream(self):
+ input = [str(i) for i in range(1, 101)]
+ result = self._startContext(len(input), False)
+ self._writeInput(input, False)
+ self.wait_for(result, len(input))
+ self._validateResult(input, result)
+
+ def test_compressed_flume_stream(self):
+ input = [str(i) for i in range(1, 101)]
+ result = self._startContext(len(input), True)
+ self._writeInput(input, True)
+ self.wait_for(result, len(input))
+ self._validateResult(input, result)
+
+
+class FlumePollingStreamTests(PySparkStreamingTestCase):
+ timeout = 20 # seconds
+ duration = 1
+ maxAttempts = 5
+
+ def setUp(self):
+ utilsClz = \
+ self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+ .loadClass("org.apache.spark.streaming.flume.PollingFlumeTestUtils")
+ self._utils = utilsClz.newInstance()
+
+ def tearDown(self):
+ if self._utils is not None:
+ self._utils.close()
+ self._utils = None
+
+ def _writeAndVerify(self, ports):
+ # Set up the streaming context and input streams
+ ssc = StreamingContext(self.sc, self.duration)
+ try:
+ addresses = [("localhost", port) for port in ports]
+ dstream = FlumeUtils.createPollingStream(
+ ssc,
+ addresses,
+ maxBatchSize=self._utils.eventsPerBatch(),
+ parallelism=5)
+ outputBuffer = []
+
+ def get_output(_, rdd):
+ for e in rdd.collect():
+ outputBuffer.append(e)
+
+ dstream.foreachRDD(get_output)
+ ssc.start()
+ self._utils.sendDataAndEnsureAllDataHasBeenReceived()
+
+ self.wait_for(outputBuffer, self._utils.getTotalEvents())
+ outputHeaders = [event[0] for event in outputBuffer]
+ outputBodies = [event[1] for event in outputBuffer]
+ self._utils.assertOutput(outputHeaders, outputBodies)
+ finally:
+ ssc.stop(False)
+
+ def _testMultipleTimes(self, f):
+ attempt = 0
+ while True:
+ try:
+ f()
+ break
+ except:
+ attempt += 1
+ if attempt >= self.maxAttempts:
+ raise
+ else:
+ import traceback
+ traceback.print_exc()
+
+ def _testFlumePolling(self):
+ try:
+ port = self._utils.startSingleSink()
+ self._writeAndVerify([port])
+ self._utils.assertChannelsAreEmpty()
+ finally:
+ self._utils.close()
+
+ def _testFlumePollingMultipleHosts(self):
+ try:
+ ports = self._utils.startMultipleSinks()
+ self._writeAndVerify(ports)
+ self._utils.assertChannelsAreEmpty()
+ finally:
+ self._utils.close()
+
+ def test_flume_polling(self):
+ self._testMultipleTimes(self._testFlumePolling)
+
+ def test_flume_polling_multiple_hosts(self):
+ self._testMultipleTimes(self._testFlumePollingMultipleHosts)
+
+
+def search_kafka_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-assembly")
jars = glob.glob(
@@ -692,5 +842,30 @@ if __name__ == "__main__":
raise Exception(("Found multiple Spark Streaming Kafka assembly JARs in %s; please "
"remove all but one") % kafka_assembly_dir)
else:
- os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars[0]
+ return jars[0]
+
+
+def search_flume_assembly_jar():
+ SPARK_HOME = os.environ["SPARK_HOME"]
+ flume_assembly_dir = os.path.join(SPARK_HOME, "external/flume-assembly")
+ jars = glob.glob(
+ os.path.join(flume_assembly_dir, "target/scala-*/spark-streaming-flume-assembly-*.jar"))
+ if not jars:
+ raise Exception(
+ ("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
+ "You need to build Spark with "
+ "'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
+ "'build/mvn package' before running this test")
+ elif len(jars) > 1:
+ raise Exception(("Found multiple Spark Streaming Flume assembly JARs in %s; please "
+ "remove all but one") % flume_assembly_dir)
+ else:
+ return jars[0]
+
+if __name__ == "__main__":
+ kafka_assembly_jar = search_kafka_assembly_jar()
+ flume_assembly_jar = search_flume_assembly_jar()
+ jars = "%s,%s" % (kafka_assembly_jar, flume_assembly_jar)
+
+ os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
unittest.main()